Kafka topic seems to be consumed by only one KafkaConsumer instance in Airflow

rvpgvaaj · posted 2022-09-21 · in Kafka

I have a system in which I start two separate processes: one produces messages to a Kafka topic, and the other runs multiple KafkaConsumer instances that consume those messages. However, only one of the consumers actually seems to consume messages, while the second one does nothing.

I used Kafdrop to create a Kafka topic with four partitions. As far as I understand, this should mean that if I have two processes in Airflow each running a KafkaConsumer, they should split the available partitions between them and consume messages from their "assigned" partitions. Both KafkaConsumer instances belong to the same group id.
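That expected split can be sketched with a tiny model of Kafka's default range assignor. This is a simplified illustration of the concept, not kafka-python's actual implementation:

```python
# Simplified sketch of Kafka's range assignor: consumers in a group are
# sorted, and each one gets a contiguous slice of the topic's partitions.
# This illustrates the concept only; it is not kafka-python's code.
def range_assign(consumers, num_partitions):
    consumers = sorted(consumers)
    per_consumer = num_partitions // len(consumers)
    extra = num_partitions % len(consumers)
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment

# Two consumers in the same group on a 4-partition topic:
print(range_assign(['consumer-1', 'consumer-2'], 4))
# → {'consumer-1': [0, 1], 'consumer-2': [2, 3]}
```

So with four partitions and two members of `my-group`, each consumer should end up with two partitions, which is exactly what the log further below shows (partitions 2 and 3 for the "idle" consumer).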

In the Airflow logs, I only see one of the KafkaConsumers reporting that it is processing messages. Below I have attached the Python code used to produce and consume the Kafka messages, as well as the Airflow scripts that start the producing and consuming processes. Finally, I have also included the log from the process that does not consume any messages.

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from preprocessing.testers import create_msg, print_current_time
import random

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'provide_context': True,                     
}

dag = DAG(
    dag_id='creating_msgs',
    default_args=args,
    schedule_interval= '@once',
    catchup=True,
)

create_msgs = [
    PythonOperator(
        task_id='create_msg_'+str(i+1),
        python_callable=create_msg,
        op_kwargs={'task_id': i+1, 'nbr': random.randrange(50,100)},
        dag=dag) for i in range(2)
]

print_time_task = PythonOperator(
        task_id='print_time',
        python_callable=print_current_time,
        dag=dag
    )

create_msgs >> print_time_task

Script that produces the Kafka messages:

import json
import logging
import random
from time import sleep

from kafka import KafkaProducer

def create_msg(**kwargs):
    # set up producer
    producer = KafkaProducer(
        bootstrap_servers=['kafka:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    for i in range(kwargs['nbr']):
        task_name = 'Task' + str(kwargs['task_id']) + '_' + str(i+1)
        logging.warning('Producing message of task: ' + task_name)
        producer.send('TopicA', {'name': task_name})
        sleep_time = random.randrange(1, 5)
        logging.warning('Sleeping for ' + str(sleep_time))
        sleep(sleep_time)
    producer.close()
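Note that `producer.send` above passes no key, so which partitions actually receive data is left to the client's partitioner (kafka-python picks a random partition for keyless messages). Sending with an explicit key gives a stable key-to-partition mapping instead. The sketch below illustrates that idea with CRC32 as a stand-in hash; Kafka's clients actually hash keys with murmur2:

```python
import zlib

# Illustrative key -> partition mapping for a 4-partition topic.
# Kafka clients hash keys with murmur2; CRC32 is used here only to show
# the idea that the same key always lands on the same partition.
NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    return zlib.crc32(key.encode('utf-8')) % NUM_PARTITIONS

# The same key always maps to the same partition:
assert partition_for('Task1_1') == partition_for('Task1_1')

counts = {p: 0 for p in range(NUM_PARTITIONS)}
for i in range(100):
    counts[partition_for('Task1_' + str(i))] += 1
print(counts)  # distinct keys spread across the 4 partitions
```

In the real producer this would mean passing `key=` (with a `key_serializer`) to `producer.send`, so messages are distributed over partitions by key rather than at the partitioner's discretion.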

Script that consumes the Kafka messages:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from preprocessing.testers import print_current_time
from crawling.crawler import start_task
import random

args = { 
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1), 
    'provide_context': True,
}

dag = DAG(
    dag_id='crawl_listings',
    default_args=args,
    schedule_interval= '@once',
    catchup=True,
)

start_tasks = [
    PythonOperator(
        task_id='start_task'+str(i+1),
        python_callable=start_task,
        op_kwargs={'task_id': i+1},
        dag=dag) for i in range(2)
]

print_time_task = PythonOperator(
        task_id='print_time',
        python_callable=print_current_time,
        dag=dag
    )

start_tasks >> print_time_task

The task that runs the KafkaConsumer (two instances of this task are created):

import json
import logging
import random
from time import sleep

from kafka import KafkaConsumer

def start_task(**kwargs):
    consumer = KafkaConsumer(
        'TopicA',
        bootstrap_servers=['kafka:9092'],
        consumer_timeout_ms=3000,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my-group',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    try:
        for message in consumer:
            logging.info('Processing: ' + message.value['name'])
            sleep_time = random.randrange(1, 5)
            logging.warning('Sleeping for ' + str(sleep_time))
            sleep(sleep_time)

        logging.info('Done')
        consumer.close()
    except Exception as e:
        logging.error('Error: ' + str(e))
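One detail worth noting in `start_task`: with `consumer_timeout_ms=3000`, iterating over the consumer simply stops once no message is fetched within 3 seconds, so a consumer whose assigned partitions hold no data falls straight through the `for` loop and logs "Done", which matches the log below. The behavior can be modeled with a plain generator (a sketch of the iteration semantics, not kafka-python code):

```python
import time

# Sketch of consumer_timeout_ms semantics: iteration over the consumer
# ends once no message arrives within the timeout, which ends the
# caller's for-loop. This models the idea; it is not kafka-python's code.
def timed_consumer(messages, timeout_ms):
    deadline = time.monotonic() + timeout_ms / 1000.0
    for msg in messages:
        if time.monotonic() > deadline:
            return  # like consumer_timeout_ms expiring: StopIteration
        yield msg
        deadline = time.monotonic() + timeout_ms / 1000.0  # reset per message

# With nothing to consume, the loop body never runs and "Done" follows:
processed = list(timed_consumer([], timeout_ms=3000))
print(processed)  # → []
```

So an early "Done" by itself does not mean the consumer failed to join the group; it only means nothing arrived on its partitions before the timeout.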

Below is the Airflow log for the task whose KafkaConsumer does not process any messages (the other process does consume them):

[2022-09-15 13:23:11,737] {{taskinstance.py:887}} INFO - Executing <Task(PythonOperator): start_crawling_task2> on 2022-09-14T00:00:00+00:00
[2022-09-15 13:23:11,739] {{standard_task_runner.py:53}} INFO - Started process 3319 to run task
[2022-09-15 13:23:11,785] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: crawl_listings.start_crawling_task2 2022-09-14T00:00:00+00:00 [running]> f68c0ccde6c0
[2022-09-15 13:23:11,805] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,804] {{conn.py:372}} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('172.19.0.6', 9092)]>: connecting to kafka:9092 [('172.19.0.6', 9092) IPv4]
[2022-09-15 13:23:11,805] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,805] {{conn.py:1049}} INFO - Probing node bootstrap-0 broker version
[2022-09-15 13:23:11,807] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,806] {{conn.py:401}} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('172.19.0.6', 9092)]>: Connection complete.
[2022-09-15 13:23:11,913] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,913] {{conn.py:1106}} INFO - Broker version identifed as 1.0.0
[2022-09-15 13:23:11,914] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,913] {{conn.py:1108}} INFO - Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
[2022-09-15 13:23:11,914] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:11,914] {{subscription_state.py:171}} INFO - Updating subscribed topics to: ('TopicA',)
[2022-09-15 13:23:12,121] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,121] {{conn.py:372}} INFO - <BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.6', 9092)]>: connecting to kafka:9092 [('172.19.0.6', 9092) IPv4]
[2022-09-15 13:23:12,222] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,222] {{conn.py:401}} INFO - <BrokerConnection node_id=1 host=kafka:9092 <connecting> [IPv4 ('172.19.0.6', 9092)]>: Connection complete.
[2022-09-15 13:23:12,222] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,222] {{conn.py:811}} INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connected> [IPv4 ('172.19.0.6', 9092)]>: Closing connection. 
[2022-09-15 13:23:12,427] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,427] {{cluster.py:376}} INFO - Group coordinator for my-group is BrokerMetadata(nodeId=1, host='kafka', port=9092, rack=None)
[2022-09-15 13:23:12,428] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,427] {{base.py:688}} INFO - Discovered coordinator 1 for group my-group
[2022-09-15 13:23:12,428] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,428] {{base.py:735}} INFO - Starting new heartbeat thread
[2022-09-15 13:23:12,428] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,428] {{consumer.py:342}} INFO - Revoking previously assigned partitions set() for group my-group
[2022-09-15 13:23:12,429] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:12,429] {{base.py:447}} INFO - (Re-)joining group my-group
[2022-09-15 13:23:16,647] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,647] {{base.py:333}} INFO - Successfully joined group my-group with generation 2
[2022-09-15 13:23:16,647] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,647] {{subscription_state.py:257}} INFO - Updated partition assignment: [TopicPartition(topic='TopicA', partition=2), TopicPartition(topic='TopicA', partition=3)]
[2022-09-15 13:23:16,648] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,648] {{consumer.py:239}} INFO - Setting newly assigned partitions {TopicPartition(topic='TopicA', partition=2), TopicPartition(topic='TopicA', partition=3)} for group my-group
[2022-09-15 13:23:16,651] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,650] {{crawler.py:25}} INFO - Done
[2022-09-15 13:23:16,654] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,654] {{base.py:742}} INFO - Stopping heartbeat thread
[2022-09-15 13:23:16,654] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,654] {{base.py:767}} INFO - Leaving consumer group (my-group).
[2022-09-15 13:23:16,661] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,661] {{conn.py:811}} INFO - <BrokerConnection node_id=1 host=kafka:9092 <connected> [IPv4 ('172.19.0.6', 9092)]>: Closing connection. 
[2022-09-15 13:23:16,662] {{python_operator.py:114}} INFO - Done. Returned value was: None
[2022-09-15 13:23:16,670] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=crawl_listings, task_id=start_crawling_task2, execution_date=20220914T000000, start_date=20220915T132311, end_date=20220915T132316
[2022-09-15 13:23:21,700] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:21,700] {{local_task_job.py:103}} INFO - Task exited with return code 0
Answer by 1mrurvl1:

The log you copied shows exactly 2 partitions assigned to this consumer:

[2022-09-15 13:23:16,647] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,647] {{subscription_state.py:257}} INFO - Updated partition assignment: [TopicPartition(topic='TopicA', partition=2), TopicPartition(topic='TopicA', partition=3)]
[2022-09-15 13:23:16,648] {{logging_mixin.py:112}} INFO - [2022-09-15 13:23:16,648] {{consumer.py:239}} INFO - Setting newly assigned partitions {TopicPartition(topic='TopicA', partition=2), TopicPartition(topic='TopicA', partition=3)} for group my-group

If the other consumer was assigned partitions 0 and 1 but processes no data, those partitions may simply contain no data.
