Kafka consumer "Group authorization failed" error, but only occasionally?

af7jpaap  posted on 2021-06-05  in  Kafka

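I have a PHP program in which two PHP processes consume Kafka messages. Sometimes an error like "Group authorization failed" appears. Is it related to there being two processes? They use the same topic and the same group id, and the ACL policy type is, of course, GSSAPI. What I really don't understand is why it happens only occasionally.
The code is as follows: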

    public static function consumer(string $topic, string $group, $handler, string $instance = 'test')
    {
        $conf = new Conf();

        // Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, ?array $partitions = null) {

            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(null);
                    break;

                default:
                    // $err is an integer code; turn it into a readable message
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        // Configure the group.id. All consumers with the same group.id will consume
        // different partitions.
        $conf->set('group.id', $group);

        // Initial list of Kafka brokers
        $conf->set('metadata.broker.list', config("kafka.$instance"));

        // Set where to start consuming messages when there is no initial offset in
        // offset store or the desired offset is out of range.
        // 'smallest': start from the beginning
        $conf->set('auto.offset.reset', 'smallest');

        // kafka version: 0.10.0
        // %5|1583908574.751|MAXPOLL|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/102: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (10000) with this broker version
        // $conf->set('max.poll.interval.ms', '10000');

        $consumer = new KafkaConsumer($conf);
        // Subscribe to the requested topic
        $consumer->subscribe([$topic]);

        // Waiting for partition assignment... (may take some time when
        // quickly re-joining the group after leaving it.)

        while (true) {
            /** @var \RdKafka\Message $message */
            $message = $consumer->consume(120 * 1000); // timeout in milliseconds
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $handler($message->payload);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    // "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }
