force在junit结束时停止spring引导

n3h0vuf2  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(438)

实际上,我有一个spring引导应用程序,我使用了kafka嵌入而没有使用spring-kafka,但是使用了合流库。
我配置了自定义拓扑,当我启动junit时,流仍然处于侦听状态,而spring引导服务器不会结束。
我尝试使用@dirtiescontext,但问题仍然存在。
从我用的junit开始

@RunWith(SpringRunner.class)
@SpringBootTest

当消费者处于循环中时,我在控制台中看到以下消息:
[producer clientid=application1-3c4587c8-23f0-4c8b-8ef0-75bc1e0f966c-streamthread-1-producer]无法建立到节点-1的连接。代理可能不可用。
提示?
谢谢

9lowa7mx

9lowa7mx1#

出现错误的原因似乎是代理没有运行。但Kafka团队正在运行。
为了修复它,我认为您需要在测试时阻止kafkastreams运行。
@springboottest注解提供了一个属性类。您可以指定将注册为bean的类
您可以通过指定classes属性来防止注册与kafkastreams相关的bean。
例如,您可以按以下步骤进行测试。在这种情况下,不会出现上述问题,因为kafkastreams相关bean没有注册。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {ServiceImpl.class, CommonConfig.class})
public class SomeClassTest {
    @Autowired
    private ServiceImpl articleServiceImpl;

    // do test
}

使用@springboottest时,最好只注册要测试的bean。
如果您不熟悉这种方法,请尝试模拟管理kafkastreams对象的bean
例如,我创建以下bean来开始和结束kafkastreams对象。

@Component
public class ManageableStream implements DisposableBean, InitializingBean {

    private final KafkaStreams kafkaStreams;

    public ManageableStream() {
        StreamsConfig config = buildStreamConfig();
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("source.topic");
        inputStream.to("destination.topic");

        Topology topology = builder.build();
        kafkaStreams = new KafkaStreams(topology, config);
    }

    @Override
    public void destroy() throws Exception {
        kafkaStreams.close();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        kafkaStreams.start();
    }

    private StreamsConfig buildStreamConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-stream-application");
        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        return new StreamsConfig(properties);
    }
}

测试时,模拟管理kafkastreams对象的bean。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @MockBean
    private ManageableStream manageableStream;

    @Test
    public void contextLoads() {
    }

}

那么Kafka团队就不会启动,所以上述问题就不会发生。
如果您想测试kafkastreams的拓扑结构,请参见下面的链接
https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html
我希望我的回答是有帮助的

相关问题