kafka和strom-clojure实现

cbjzeqam  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(353)

这是第一次实现流处理基础设施,我的毒药是storm 1.0.1、kafka 0.9.0和clojure 1.5。
现在我有了使用消息传递系统(rabbitmq)的背景知识,我喜欢它有几个原因。
易于安装和维护
漂亮的前端门户网站
持久消息状态被维护,在这里我可以启动一个消费者,它知道哪些消息没有被消费。i、 e.“正好一次”
但是它不能达到我想要的吞吐量。
现在已经经历了Kafka,它很大程度上依赖于手动维护偏移量(内部在Kafka代理、zookeper或外部)
我终于设法在clojure创建了一个喷口,源头是Kafka经纪人,这是一场噩梦。
现在像大多数场景一样,我想要的是“一次准确的消息传递”,并且按照Kafka的文档状态
因此,kafka有效地保证了默认情况下至少一次传递,并允许用户通过禁用生产者上的重试并在处理一批消息之前提交其偏移量来实现最多一次传递。只有一次交付需要与目标存储系统合作,但kafka提供了补偿,这使得实现这一点变得简单。
这对于clojure kafka来说意味着什么,很难概念化。
我可能有几个博尔茨沿途,但终点是postgres集群。我是否要将偏移量存储在数据库中(听起来像是等待发生的竞争危险),并在我的storm集群初始化时从postgres获取偏移量?
也有任何危险设置我的平行Kafka喷口的数字大于1?
我通常以此为出发点,因为很多东西的例子在clojure中是不可用的。对我使用的版本进行了一些小的调整(我的信息并不像我期望的那样,但至少我能看到)

(def ^{:private true
   :doc "kafka spout config definition"}
   spout-config (let [cfg (SpoutConfig. (ZkHosts. "127.0.0.1:2181") "test" "/broker"  (.toString (UUID/randomUUID)))]
             ;;(set! (. cfg scheme) (StringScheme.)) depricated
             (set! (. cfg scheme) (SchemeAsMultiScheme. (StringScheme.)))                
             ;;(.forceStartOffsetTime cfg -2) 
             cfg))

 (defn mk-topology []
 (topology
  {;;"1" (spout-spec sentence-spout)
    "1" (spout-spec my-kafka-spout :p 1)
    "2" (spout-spec (sentence-spout-parameterized
                 ["the cat jumped over the door"
                  "greetings from a faraway land"])
                 :p 2)}
   {"3" (bolt-spec {"1" :shuffle}
               split-sentence
               :p 5)
    "4" (bolt-spec {"3" ["word"]}
               word-count
               :p 1)}))
vc6uscn9

vc6uscn91#

在任何分布式系统中,都不可能保证要完成的部分工作只工作一次。在某个点上,某些东西会失败,它需要重试(这称为“至少一次”处理)或不重试(这称为“最多一次”处理),尽管您不能完全处于中间并得到“完全一次”处理。你能得到的是非常接近一次处理。
诀窍是,在你的过程结束时,扔掉第二个副本,如果你发现工作做了两次。这就是索引的来源。将结果保存到数据库中时,请查看是否使用比已保存的索引更高的索引。如果你发现后面的工作已经存在,那就扔掉这个工作,不要保存它。至于文件,这种解释对做过很多次的人来说只是“向前看”。。。

相关问题