我的用例是,我使用Kafka生产者(Kafka客户端)创建一个函数,该函数从rest API端点接收数据,然后将其发送到Kafka集群。端点可以具有非常高的负载。
我最初的解决方案如下:在包含处理传入rest API终结点的方法的类中,我创建了一个共享的Kafka生成器,每当调用该终结点时,Kafka生成器函数都会使用它。this thread的第二点也提到了这种情况。
尽管Kafka生产者是thread safe,但我不确定这个单一的生产者示例是否能够处理大量输入。
因此,我想尝试另一种方法,suggested by Azure:
事件服务器还使用多个滑动队列来控制来自客户端的未完成请求的数量。新请求被排队到事件服务器示例中的多个队列之一,然后由多个并行Kafka生成器线程处理。每个线程示例化一个生产者。滑动队列的数量由线程池大小控制。
我不太精通并发编程。我的问题是如何创建线程,每个线程示例化一个Kafka生产者。我用的是Scala。
暂无答案!
目前还没有任何答案,快来回答吧!