How to handle TaskRejectedException in Spring Boot

sauutmhj · posted on 2021-06-06 in Kafka

I am using Spring Boot 2.0.2 with Spring Kafka 2.1.10 to consume Kafka messages and insert them into Elasticsearch 6.x. I receive batches of 100 messages that I want to insert into Elasticsearch in parallel, and I use manual acknowledgment in my @KafkaListener. I understand that when the queue of my ThreadPoolTaskExecutor is full I get a TaskRejectedException; I simply catch it, log it, and rethrow it.
My assumption was that ack.acknowledge() would then not be called, so the messages would be redelivered by Kafka. But out of a load of 30,000 messages, I am apparently missing a few (around 10). I wonder whether I am mishandling the exception in a way that causes messages to be dropped. Any help would be appreciated.
Here is my ThreadPoolTaskExecutor:

import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class CommonBeanConfig {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaSubscriber-Async#");
        executor.initialize();
        return executor;
    }
}

Here is the @Async method:

@Async
public CompletableFuture<Integer> publishToElasticSearch(String key, EcrAvro avroData) throws Exception {
    logger.warn("ECR avro key=" + key + " - ECR Avro value= " + avroData.toString());
    // check if the record exists, then update, else insert (upsert);
    // the elided call yields 'res', the response from Elasticsearch
    return CompletableFuture.completedFuture(res.getStatusLine().getStatusCode());
}
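
(The upsert call itself is elided above; res.getStatusLine() suggests the Elasticsearch low-level REST client, i.e. org.elasticsearch.client.Response. Purely as a hypothetical illustration of what the elided part might look like against Elasticsearch 6.x, with the index name, the _doc type, and the injected restClient all assumed rather than taken from the question:)

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;

// Hypothetical sketch only: "my-index", "_doc" and restClient are assumed names.
// doc_as_upsert makes the update API insert the document if it does not exist yet.
Request request = new Request("POST", "/my-index/_doc/" + key + "/_update");
request.setJsonEntity("{\"doc\": " + avroData.toString() + ", \"doc_as_upsert\": true}");
Response res = restClient.performRequest(request);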

Here is my @KafkaListener:

@KafkaListener(topics = "${topic}", containerFactory = "ecrAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EcrAvro>> messages) throws Exception {
    try {
        List<CompletableFuture<Integer>> completableFuturesList = new ArrayList<>();
        for (ConsumerRecord<String, EcrAvro> kafkaRecord : messages) {
            String key = kafkaRecord.key();
            EcrAvro avroData = kafkaRecord.value();
            completableFuturesList.add(publishToElasticService.publishToElasticSearch(key, avroData));
        }
        CompletableFuture.allOf(completableFuturesList.toArray(new CompletableFuture[completableFuturesList.size()])).join();
        logger.warn("******all threads joined ..!************\n\n");
        ack.acknowledge();
        logger.warn("******acknowledge done..!************\n\n");
    } catch (TaskRejectedException trje) {
        logger.warn("******task rejected!************\n\n");
        throw trje;
    }
}

My consumer configuration is shown here as well:

// Builds the consumer factory required for @KafkaListener
protected ConcurrentKafkaListenerContainerFactory<Object, Object> buildConcurrentKafkaListenerFactory(String consumerType) {
    Map<String, Object> properties = initializeCommonConsumerConfig();

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("group.id"));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", environment.getProperty("kafka.schema.registry.url"));
    properties.put("specific.avro.reader", "true");

    logger.info("Consumer Factory Properties: " + getPropertyAsString(properties));

    final ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(environment.getProperty("accountupdate.concurrent.consumer.count")));
    factory.setBatchListener(true);
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<Object, Object>(properties));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    // by default Spring Kafka is configured to send an ack on error; disable that
    factory.getContainerProperties().setAckOnError(false);

    return factory;
}

public Map<String, Object> initializeCommonConsumerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", environment.getProperty("kafka.bootstrap.servers"));
    props.put("enable.auto.commit", environment.getProperty("enable.auto.commit"));
    props.put("session.timeout.ms", environment.getProperty("session.timeout.ms"));
    props.put("auto.offset.reset", environment.getProperty("auto.offset.reset"));
    props.put("fetch.max.wait.ms", environment.getProperty("fetch.max.wait.ms"));
    props.put("max.partition.fetch.bytes", environment.getProperty("max.partition.fetch.bytes"));
    props.put("max.poll.records", environment.getProperty("max.poll.records"));

    String jaasFile = environment.getProperty("jaasfile");
    System.out.println("Jaas file is " + jaasFile);

    if (jaasFile != null) {
        props.put("security.protocol", environment.getProperty("security.protocol"));
        props.put("sasl.kerberos.service.name", environment.getProperty("sasl.kerberos.service.name"));

        try {
            System.setProperty("java.security.auth.login.config", this.resourceLoader.getResource(jaasFile).getURI().toString());
            System.out.println("java.security.auth.login.config::" + System.getProperty("java.security.auth.login.config"));
            System.setProperty("java.security.krb5.realm", environment.getProperty("realm"));
            System.setProperty("java.security.krb5.kdc", environment.getProperty("kdc"));
            System.setProperty("sun.security.krb5.debug", environment.getProperty("krb.debug"));
            System.setProperty("sun.security.krb5.principal", environment.getProperty("principal"));
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

    return props;
}

6yoyoihd · 1#

You can use a "caller runs" rejection policy on the executor.
However, Kafka does not track acknowledgment of individual messages, only an offset per topic/partition.
If you skip offset 9 and acknowledge offset 10, offset 9 will never be redelivered.
So you cannot use Kafka this way.
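
A minimal sketch of that suggestion, applied to the executor bean from the question (pool sizes and bean name carried over from the question; CallerRunsPolicy is standard java.util.concurrent, not something specific to this codebase):

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(100);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("KafkaSubscriber-Async#");
    // When both the queue and the pool are saturated, run the task on the
    // submitting thread instead of throwing TaskRejectedException. This
    // applies backpressure to the @KafkaListener thread, so no record in
    // the batch is skipped at submission time.
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}

With this policy in place, publishToElasticSearch(...) can no longer throw TaskRejectedException, so ack.acknowledge() is only reached after every record in the batch has been submitted and joined.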
