SpringKafka全局事务id在程序结束后保持打开状态

z9smfwbn  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(277)

我在springboot下创建了一个kafka-spring生产者,它将数据发送到kafka,然后写入数据库;我希望所有的工作都在一个交易。我对Kafka还不熟悉,对Spring也不在行,我遇到了一些困难。非常感谢你的指点。
到目前为止,我的代码在一个循环中成功地写入了kafka。我还没有设置db,但是通过在配置中向producerfactory添加transactionidprefix来设置全局事务处理:

producerFactory.setTransactionIdPrefix("MY_SERVER");

并将@transactional添加到kafka发送的方法中。最终我计划用同样的方法来做我的数据库工作。
问题:代码在第一次运行时运行得很好。但是如果我停止程序,即使是干净地停止,我发现代码在第二次运行它时会挂起,只要它进入@transactional方法。如果我注解掉@transactional,它将进入方法,但挂起在kafa template send()上。
问题似乎是事务id。如果我更改前缀并重新运行,程序在第一次运行时会再次正常运行,但在再次运行时会挂起,直到选择了新的前缀。因为在重新启动后,trans id计数器从零开始,如果trans id前缀没有改变,那么在重新启动时将使用相同的trans id。
在我看来,原来的transid仍然在服务器上打开,而且从未提交(我可以使用console consumer读取主题之外的数据,但这将读取uncommitted)。但是如果是这样的话,我如何让spring提交trans呢?我想我的配置一定是错的。或者——问题是trans-id永远不能被重用吗(在这种情况下,如何解决呢?)
这是我的相关代码。配置为:

@SpringBootApplication
public class MYApplication {

@Autowired
private static ChangeSweeper changeSweeper;

@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> producerFactory=new DefaultKafkaProducerFactory<>(configProps); 
        producerFactory.setTransactionIdPrefix("MY_SERVER"); 
        return  producerFactory;
}

@Bean
public KafkaTransactionManager<String, String> KafkaTransactionManager() {
    return new KafkaTransactionManager<String, String>((producerFactory()));
}

@Bean(name="kafkaProducerTemplate")
public KafkaTemplate<String, String> kafkaProducerTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

进行交易的方法是:

@Transactional
public void send( final List<Record> records) {
    logger.debug("sending {} records; batchSize={}; topic={}", records.size(),batchSize,  kafkaTopic);

    // Divide the record set into batches of size batchSize and send each batch with a kafka transaction:
    for (int batchStartIndex = 0; batchStartIndex < records.size(); batchStartIndex += batchSize ) {
        int batchEndIndex=Math.min(records.size()-1, batchStartIndex+batchSize-1);
        List<Record> nextBatch = records.subList(batchStartIndex, batchEndIndex);
        logger.debug("## batch is from " + batchStartIndex + " to " + batchEndIndex);           

        for (Record record : nextBatch) {

            kafkaProducerTemplate.send( kafkaTopic, record.getKey().toString(), record.getData().toString());   
            logger.debug("Sending> " + record);
        }

// I will put the DB writes here

}
b1zrtrql

b1zrtrql1#

无论运行多少次,这对我来说都很好(但是我必须在本地机器上运行3个代理示例,因为默认情况下事务需要这样做)。。。

@SpringBootApplication
@EnableTransactionManagement
public class So47817034Application {

    public static void main(String[] args) {
        SpringApplication.run(So47817034Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(2);

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("foo");
            foo.send("bar");
            this.latch.await(10, TimeUnit.SECONDS);
        };
    }

    @Bean
    public KafkaTransactionManager<Object, Object> KafkaTransactionManager(KafkaProperties properties) {
        return new KafkaTransactionManager<Object, Object>(kafkaProducerFactory(properties));
    }

    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties properties) {
        DefaultKafkaProducerFactory<Object, Object> factory =
                new DefaultKafkaProducerFactory<Object, Object>(properties.buildProducerProperties());
        factory.setTransactionIdPrefix("foo-");
        return factory;
    }

    @KafkaListener(id = "foo", topics = "so47817034")
    public void listen(String in) {
        System.out.println(in);
        this.latch.countDown();
    }

    @Component
    public static class Foo {

        @Autowired
        private KafkaTemplate<Object, Object> template;

        @Transactional
        public void send(String go) {
            this.template.send("so47817034", go);
        }

    }

}

相关问题