我在springboot下创建了一个kafka-spring生产者,它将数据发送到kafka,然后写入数据库;我希望所有的工作都在一个交易。我对Kafka还不熟悉,对Spring也不在行,我遇到了一些困难。非常感谢你的指点。
到目前为止,我的代码在一个循环中成功地写入了kafka。我还没有设置db,但是通过在配置中向producerfactory添加transactionidprefix来设置全局事务处理:
producerFactory.setTransactionIdPrefix("MY_SERVER");
并将@transactional添加到kafka发送的方法中。最终我计划用同样的方法来做我的数据库工作。
问题:代码在第一次运行时运行得很好。但是如果我停止程序,即使是干净地停止,我发现代码在第二次运行它时会挂起,只要它进入@transactional方法。如果我注解掉@transactional,它将进入方法,但挂起在kafa template send()上。
问题似乎是事务id。如果我更改前缀并重新运行,程序在第一次运行时会再次正常运行,但在再次运行时会挂起,直到选择了新的前缀。因为在重新启动后,trans id计数器从零开始,如果trans id前缀没有改变,那么在重新启动时将使用相同的trans id。
在我看来,原来的transid仍然在服务器上打开,而且从未提交(我可以使用console consumer读取主题之外的数据,但这将读取uncommitted)。但是如果是这样的话,我如何让spring提交trans呢?我想我的配置一定是错的。或者——问题是trans-id永远不能被重用吗(在这种情况下,如何解决呢?)
这是我的相关代码。配置为:
@SpringBootApplication
public class MYApplication {
@Autowired
private static ChangeSweeper changeSweeper;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> producerFactory=new DefaultKafkaProducerFactory<>(configProps);
producerFactory.setTransactionIdPrefix("MY_SERVER");
return producerFactory;
}
@Bean
public KafkaTransactionManager<String, String> KafkaTransactionManager() {
return new KafkaTransactionManager<String, String>((producerFactory()));
}
@Bean(name="kafkaProducerTemplate")
public KafkaTemplate<String, String> kafkaProducerTemplate() {
return new KafkaTemplate<>(producerFactory());
}
进行交易的方法是:
@Transactional
public void send( final List<Record> records) {
logger.debug("sending {} records; batchSize={}; topic={}", records.size(),batchSize, kafkaTopic);
// Divide the record set into batches of size batchSize and send each batch with a kafka transaction:
for (int batchStartIndex = 0; batchStartIndex < records.size(); batchStartIndex += batchSize ) {
int batchEndIndex=Math.min(records.size()-1, batchStartIndex+batchSize-1);
List<Record> nextBatch = records.subList(batchStartIndex, batchEndIndex);
logger.debug("## batch is from " + batchStartIndex + " to " + batchEndIndex);
for (Record record : nextBatch) {
kafkaProducerTemplate.send( kafkaTopic, record.getKey().toString(), record.getData().toString());
logger.debug("Sending> " + record);
}
// I will put the DB writes here
}
1条答案
按热度按时间b1zrtrql1#
无论运行多少次,这对我来说都很好(但是我必须在本地机器上运行3个代理示例,因为默认情况下事务需要这样做)。。。