Apache Samza's checkpoint tool does not output partition offsets

Posted by ercv8c1e on 2021-06-08 in Kafka

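I am trying to rewind the input feed of one of my Samza jobs with the checkpoint tool, as described here and here. For some reason the checkpoint tool does not print the offsets as promised, even though I know for a fact that the job has already consumed multiple messages from the partition in question.
Here is a truncated version of the output the checkpoint tool gives me: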

2015-06-11 16:31:04 ZkClient [INFO] zookeeper state changed (SyncConnected)
2015-06-11 16:31:04 ZkEventThread [INFO] Terminate ZkClient event thread.
2015-06-11 16:31:04 ZooKeeper [INFO] Session: 0x14de25b502e01b4 closed
2015-06-11 16:31:04 ClientCnxn [INFO] EventThread shut down
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1 already exists.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Validating checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Successfully validated checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Reading checkpoint for taskName Partition 0
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] No TaskName to checkpoint mapping provided.  Reading for first time.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Connecting to leader pavels-mbp.it.local:9092 for topic __samza_checkpoint_ver_1_for_test-job1_1 and to fetch all checkpoint messages.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0. Attempting to fetch messages for checkpoint log.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Get latest offset 31 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 0: Checkpoint [offsets={}]
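
When the full, untruncated output is long, it may help to scan it for tasks whose checkpoint came back empty. The following is only a minimal sketch: the regular expression is based solely on the "Got checkpoint state" line shown above, the function names are hypothetical, and the text inside a non-empty `offsets={...}` is returned raw because its exact format is not shown here.

```python
import re

# Pattern derived only from the "Got checkpoint state" log line shown above.
# The content between the braces is captured as raw text (assumption: its
# format for non-empty checkpoints is not known from this output).
STATE_LINE = re.compile(
    r"Got checkpoint state for taskName (?P<task>.+?): "
    r"Checkpoint \[offsets=\{(?P<offsets>.*)\}\]"
)


def find_checkpoint_states(log_text):
    """Map each task name to the raw text found inside offsets={...}."""
    states = {}
    for line in log_text.splitlines():
        match = STATE_LINE.search(line)
        if match:
            states[match.group("task")] = match.group("offsets").strip()
    return states


def tasks_without_offsets(log_text):
    """Return the sorted names of tasks whose checkpoint has no offsets."""
    states = find_checkpoint_states(log_text)
    return sorted(task for task, raw in states.items() if not raw)


sample = (
    "2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got checkpoint state "
    "for taskName Partition 0: Checkpoint [offsets={}]"
)
print(tasks_without_offsets(sample))
```

Run against the last line of the output above, this reports `Partition 0` as a task with an empty checkpoint, which is exactly the symptom described.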

Here is my test_job.properties file:


# Job

job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=test-job1

# Task

task.class=com.xim.test.TestTaskClass
task.inputs=kafka.EnergyPurchaseEvent

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

As you can see, checkpointing is enabled.
