Getting committed messages from Confluent Kafka using the kafka-node Node.js library

Asked by ewm0tg9j on 2021-06-04 in Kafka

I have set up cp-kafka and cp-zookeeper with the docker-compose file below:

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ES_JAVA_OPTS: "-Xms512m -Xmx3000m"
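
With this listener setup, clients running outside the Docker network have to use the advertised OUTSIDE listener (localhost:9094). Below is a minimal connectivity-check sketch from Node.js, assuming kafka-node is installed; the Admin.listTopics call is only there as a round-trip test and is not part of my application code.

/* Connectivity check sketch: connects to the advertised OUTSIDE listener
   from the compose file above (localhost:9094) and lists the broker's topics */
const kafka  = require('kafka-node');
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9094' });

client.on('ready', function () {
    console.log('Connected to the broker on the OUTSIDE listener');
    const admin = new kafka.Admin(client);
    admin.listTopics(function (err, res) {
        console.log(err || res);
        client.close();
    });
});

client.on('error', function (err) {
    console.log('Broker connection failed:', err);
});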

Using the kafka-node library from a Node.js application, I am trying to produce and consume messages.
My producer code (works fine):

class kafkaProducer{

    constructor(){
        /* Get a reference to the kafka-node message broker library */
        this.kafkaBrokerObj                     = require('kafka-node');
        this.strDebug                           = "----------KAFKA PRODUCER --> ";
    }

    /***************************************************************************/
    /*Purpose   : Send a message to the given topic.
    /*Input     : pStrTopicName :: Topic name,
                : pStrMessageBody   :: Message body
    /*Response  : Message send status :: TRUE/FALSE
    /***************************************************************************/
    setMessageInTopic(pStrTopicName, pStrMessageBody, callback){
        try{
            /* Variable and object init */
            const Producer      = this.kafkaBrokerObj.Producer;
            const kafkaClient   = new this.kafkaBrokerObj.KafkaClient({kafkaHost:process.env.KAFAK_BROKER_ENDPOINT});
            const producerObj   = new Producer(kafkaClient);
            const strDebug      = this.strDebug;

            console.log(strDebug+"Received Topic : "+pStrTopicName);

            /* Topic and message definition */
            let arrPayloads = [
                                {
                                    topic: pStrTopicName,
                                    retention: 1000,
                                    messages: JSON.stringify(pStrMessageBody)
                                }
                            ];
            console.log(strDebug+"Paylod : "+arrPayloads);

            /* if producer ready then do needful */
            producerObj.on('ready', async function() {
                console.log(strDebug+"Producer is ready");
                /* Producing the message to the topic, create if not exists */
                producerObj.send(arrPayloads, (err, data) => {
                    console.log(strDebug+"Producer send callback invoked");
                    /* if any error occurred then do needful */
                    if (err) {
                        console.log(strDebug+"["+pStrTopicName+"]: broker update failed");
                        console.log(err);
                        /* Send operation failed response */
                        callback(false);
                    } else {
                        console.log(strDebug+"["+pStrTopicName+"]: broker update success");
                        /* Send operation success response */
                        callback(data[pStrTopicName][0]);
                    }
                    /* Closing the Kafka object */
                    //producerObj.close();
                    //kafkaClient.close();
                });
            });

            /* if any error occurred then do needful */
            producerObj.on('error', function(err) {
                console.log(err);
                console.log(strDebug+"["+pStrTopicName+"]: connection errored");

                /* Closing the Kafka object */
                //producerObj.close();
                //kafkaClient.close();

                /* Send operation failed response */
                callback(false);
            });
        }catch(e) {
            console.log(e);
            /* Send operation failed response */
            callback(false);
        }finally{

        }
    }
}

/* Expose the Kafka producer class to other modules */
module.exports = kafkaProducer;
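
For reference, here is a minimal usage sketch of the producer class above. The file name, topic and payload are placeholder values, and KAFAK_BROKER_ENDPOINT has to point at the advertised listener (e.g. localhost:9094).

/* Usage sketch for the producer class above (assumptions: the class is saved as
   kafkaProducer.js and KAFAK_BROKER_ENDPOINT is set, e.g. to localhost:9094;
   the topic and payload are placeholder values, not from the original code) */
const kafkaProducer = require('./kafkaProducer');
const producerObj   = new kafkaProducer();

producerObj.setMessageInTopic('getCustomerInfossds', { requestBody: { _id: 1 } }, function (result) {
    /* result is false on failure, otherwise the offset assigned by the broker for partition 0 */
    console.log('send result:', result);
});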

The consumer code:

/*************************************************************************/
    /*Purpose   : Reading the message from kafka queue for given topics.
    /*************************************************************************/

    class kafkaConsumer{

        constructor(){
            /* Get a reference to the kafka-node message broker library */
            this.kafkaBrokerObj                     = require('kafka-node');
            this.strDebug                           = "----------KAFKA CONSUMER --> ";
        }

        /***************************************************************************/
        /*Purpose   : Read messages from the given Kafka topics.
        /*Input     : pStrTopicNameArr  :: Topic name array,
        /*Response  : Send the received message from topic
        /***************************************************************************/
        getMessageFromKafak(pStrTopicNameArr, callback){
            try{
                var Consumer    = this.kafkaBrokerObj.Consumer;
                var kafkaClient = new this.kafkaBrokerObj.KafkaClient({kafkaHost:process.env.KAFAK_BROKER_ENDPOINT});
                /* Offset API object, used by the 'offsetOutOfRange' handler below */
                var offsetObj   = new this.kafkaBrokerObj.Offset(kafkaClient);
                var strDebug    = this.strDebug;

                var options = {
                    // Auto commit config
                    autoCommit: false,
                    autoCommitMsgCount: 100,
                    autoCommitIntervalMs: 1000,
                    // Fetch message config
                    fetchMaxWaitMs: 5000,
                    fetchMinBytes: 1,
                    fetchMaxBytes: 1024 * 10,
                    fromOffset: false,
                    fromBeginning: false
                };

                let consumerObj = new Consumer(kafkaClient,pStrTopicNameArr,options);

                console.log(strDebug+"RESPONE MESSAGE PROCESS "+pStrTopicNameArr)
                console.log(pStrTopicNameArr);

                for(var pStrTopicObj of pStrTopicNameArr){
                    var strTopicName = pStrTopicObj.topic;
                    console.log("Topic Added "+strTopicName);
                }
                /* If kafka consumer is on then do read the message */
                consumerObj.on('message', function (pStrMessage) {
                    console.log(strDebug+"RESPONSE COMMIT MESSAGE ----------------------");
                    /* Manually commit the offset of the message that was just read */
                    consumerObj.commit({'force':true}, function(err, data) {
                        console.log(strDebug+"RESPONSE COMMIT MESSAGE DONE ----------------------");
                        console.log(err);
                        console.log(data);
                        /* Return the message */
                        return callback(pStrMessage);
                    });
                    console.log(strDebug+"RESPONSE MESSAGE ----------------------");
                    /* consumerObj.close();
                    kafkaClient.close(); */
                }).on('error', function (pStrError) {
                    console.log(strDebug+"RESPONSE MESSAGE ERROR ----------------------");
                    console.log(pStrError);
                    /* consumerObj.close();
                    kafkaClient.close(); */
                    /* Return negative response */
                    return callback(false);
                }).on('offsetOutOfRange', function (topic) {
                    /* Fetch the valid offset range and rewind to the smallest available offset */
                    topic.maxNum = 2;
                    offsetObj.fetch([topic], function (err, offsets) {
                        if (err) { return console.log(err); }
                        var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
                        consumerObj.setOffset(topic.topic, topic.partition, min);
                    });
                });

                consumerObj.addTopics(pStrTopicNameArr, function (err, added) {
                    console.log("***************");
                    console.log(err);
                    console.log(added);
                });

                /* pStrTopicNameArr.forEach(function(pIndex, pStrValue){
                    var strTopicName = pIndex.topic;
                    console.log("Topic Added "+strTopicName);
                    consumerObj.addTopics(strTopicName, function (err, added) {
                        console.log("***************");
                        console.log(err);
                        console.log(added);
                    });
                }); */
            }catch(e) {
                console.log(e);
                /* Return negative response */
                return callback(false);
            }finally{
                /* Closing the Kafka object */
                //consumerObj.close();
                //kafkaClient.close();
            }
        }
    }

    /* Expose the Kafka consumer class to other modules */
    module.exports = kafkaConsumer;
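
Since part of the problem is knowing whether the manual commit is actually stored, a small verification sketch can be run separately. As far as I understand the kafka-node defaults, the plain Consumer above commits under the default consumer group 'kafka-node-group' because no groupId is set in the options; the Offset API can read that committed offset back. The group name and topic below are therefore assumptions based on those defaults and on the output shown further down.

/* Verification sketch (assumption: the Consumer above commits under kafka-node's
   default consumer group 'kafka-node-group', since no groupId is configured) */
const kafka     = require('kafka-node');
const client    = new kafka.KafkaClient({ kafkaHost: process.env.KAFAK_BROKER_ENDPOINT });
const offsetApi = new kafka.Offset(client);

offsetApi.fetchCommits('kafka-node-group', [
    { topic: 'getCustomerInfossds', partition: 0 }
], function (err, data) {
    /* Expected shape: { getCustomerInfossds: { '0': <committed offset> } };
       -1 means nothing has ever been committed for this group/partition */
    console.log(err || data);
});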

The consumer is invoked from the function below; when it is called, a message does get read and processed successfully:

function resetTheConsumer(intOffset){
    try{
        /* if any topic is added in the message queue then do needful */
        if(strTopicArr.length > 0){
            /* Got the response */
            kafkaConsumerObj.getMessageFromKafak(strTopicCollection, function(pResponseMessage){
                /* received message */
                echo("Received message from Message Queue");
                echo(pResponseMessage);
                sendMessage(pResponseMessage);
            });
        }else{
            echo("No Topic Found");
        }
    }catch(exception){
        echo("Error occured while reading the mesasge from Message Queue sending the exception");
        echo(exception);
    }
}

What we have tried (consumer side):

1. Set autoCommit: false and commit each message manually after it has been read successfully.
   Result: the consumer still returns already-read messages from the old offset.
   Output of the manual commit: { getCustomerInfossds: { partition: 0, errorCode: 0 } }
2. In addition to step (1), added consumerObj.addTopics.
   Result: addTopics works, but already-read messages are still fetched from the old offset.
   Output of addTopics: [ 'getCustomerInfossds' ] (the topic name)

And the received message (the same message comes back every time the consumer is executed):

{ topic: 'getCustomerInfossds',
  value:
   '{"headers":{"Authorization":"Bearer clpVU3kwSFNKZlA5TUtrOl9jUGtLb3VwLXMtZGlSQlZHN1ZKZ040Vmh1QQ==","token":"dd8f4eee15c338ebf2484e772a15a05a","wsSessionID":"Dc55aclJlBEYJOj1N7GFql7lzsJNbL","keepAlive":18000,"operation":"getCustomerInfo/","messageSendTimeStamp":"25/6/2020 2:53:15:655","corRelationId":"HqwS0R4aHY4JxpWapDaIYv75EQOtM6udUhajQaVE"},"requestBody":{"topic":"getCustomerInfossds","data":{"getCustomerInfo":{"_id":1}}}}',
  offset: 66,
  partition: 0,
  highWaterOffset: 67,
  key: null }
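
One thing that may be worth ruling out (this is an assumption on my side, not something I have verified against this setup): Kafka tracks committed offsets per consumer group, so a consumer only resumes from the committed position when it reconnects under the same groupId, and the plain Consumer above never sets a groupId explicitly. A sketch of the same flow using kafka-node's ConsumerGroup with a fixed groupId and manual commits might look roughly like this; the group name is made up, and fromOffset: 'latest' only applies when the group has no committed offset yet.

/* ConsumerGroup sketch (assumptions: the group name 'customer-info-group' is
   made up; the topic name is the one from the output above) */
const kafka = require('kafka-node');

const consumerGroup = new kafka.ConsumerGroup({
    kafkaHost: process.env.KAFAK_BROKER_ENDPOINT,
    groupId: 'customer-info-group',   /* committed offsets are stored per groupId */
    autoCommit: false,
    fromOffset: 'latest'              /* only used when the group has no committed offset */
}, ['getCustomerInfossds']);

consumerGroup.on('message', function (message) {
    console.log('received', message.topic, message.partition, message.offset);
    /* Commit after handling the message so a restart resumes after this offset */
    consumerGroup.commit(function (err, data) {
        console.log('commit result:', err || data);
    });
});

consumerGroup.on('error', function (err) {
    console.log('consumer group error:', err);
});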

