kafka流协同分区与交互式查询

5cg8jx4n  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(297)

我有以下拓扑结构:

topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer()
            , utilService.getTopicByType(TopicType.CONNECTION_EVENTS_TOPIC))
            .addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
            .addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
            .addSink(WS_STATUS_SINK, utilService.getTopicByType(TopicType.ONLINE_STATUS_TOPIC),
                    stringSerializer, stringSerializer
                    , SESSION_PROCESSOR)

            //WS session routing
            .addSource(WS_NOTIFICATIONS_SOURCE, new StringDeserializer(), new StringDeserializer(),
                    utilService.getTopicByType(TopicType.NOTIFICATION_TOPIC))
            .addProcessor(WS_NOTIFICATIONS_ROUTE_PROCESSOR, SessionRoutingEventGenerator::new,
                    WS_NOTIFICATIONS_SOURCE)
            .addSink(WS_NOTIFICATIONS_DELIVERY_SINK, new NodeTopicNameExtractor(), WS_NOTIFICATIONS_ROUTE_PROCESSOR)
            .addStateStore(userConnectedNodesStoreBuilder, WS_NOTIFICATIONS_ROUTE_PROCESSOR, SESSION_PROCESSOR);

如您所见,有两个源主题。状态存储是从第一个主题构建的,第二个流读取状态存储。当我启动拓扑时,我看到那些流线程被分配了两个源主题的相同分区(共分区)。我假设这是因为状态存储是由第二个主题流访问的。
这在功能上工作得很好。但是有一个性能问题。当更新状态存储的第一个源主题的输入数据量激增时,第二个主题处理就会延迟。
对我来说,第二个主题应该尽快处理。延迟处理第一个主题是可以的。
我正在考虑以下策略:

Current configuration:
     WS_CONNECTION_SOURCE - 30 partitions
     WS_NOTIFICATIONS_SOURCE - 30 partitions
     streamThreads: 10
     appInstances: 3 

New configuration:
    WS_CONNECTION_SOURCE - 15 partitions
    WS_NOTIFICATIONS_SOURCE - 30 partitions
    streamThreads: 10
    appInstances: 3
    Since there is no co-partitioning, tasks has to use interactive query to access store

这个想法是在10个线程中,5个线程只处理第二个主题,这可以在第一个主题出现激增时缓解当前的问题。
以下是我的问题:

1. Is this strategy correct? To avoid co-partitioning and use interactive query
2. Is there a chance that Kafka will assign 10 partitions of WS_CONNECTION_SOURCE 
   to one instance since there are 10 threads and one instance won't get any?
3. Is there any better approach to solve the performance problem?
neekobn8

neekobn81#

状态存储和交互式查询是kafka流抽象。要使用交互式查询,您必须定义状态存储(使用kafka streams api),并强制您为输入主题使用相同数量的分区。我认为你的解决办法行不通。交互式查询用于公开在kafka流之外查询状态存储的能力(不用于处理器api内的访问)
也许您可以查看会话处理器源代码,从其他拓扑中提取更多要处理的工作,并将结果发布到中间主题,然后基于该主题构建状态存储。
另外:
目前kafka streams不支持输入主题的优先级排序。关于源主题的优先级有kip:kip-349。不幸的是,链接的jira门票被关闭,因为无法修复(https://issues.apache.org/jira/browse/kafka-6690)

相关问题