kafka在第一秒内生成消息的速度很慢

5ssjco0h  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(256)

我和Kafka合作,我做了一个这样的制片人:

synchronized (obj) {

        while (true){

            long start = Instant.now().toEpochMilli();
            for (int i=0; i< NUM_MSG_SEC  ; i++)  
            {

                PriceStreamingData data = PriceStreamingData.newBuilder()
                        .setUser(getRequest().getUser())
                        .setSecurity(getRequest().getSecurity())
                        .setTimestamp(Instant.now().toEpochMilli())
                        .setPrice(new Random().nextDouble()*200)
                        .build();

                record = new ProducerRecord<>(topic, keyBuilder.build(data), 
                        data);

                producer.send(record,new Callback(){
                    @Override
                    public void onCompletion(RecordMetadata arg0, Exception arg1) {
                        counter.incrementAndGet();
                        if(arg1 != null){
                            arg1.printStackTrace();
                        }

                    } 
                });

            }
            long diffCiclo = Instant.now().toEpochMilli() - start;
            long diff = Instant.now().toEpochMilli() - startTime;

            System.out.println("Number of sent: " + counter.get() +  
                    " Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff );

            try {
                if(diffCiclo >= 1000){
                    System.out.println("over 1 second: "  + diffCiclo);

                }
                else {
                    obj.wait( 1000 - diffCiclo );

                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

正如您所看到的,它非常简单,只需创建一条新消息并发送它。如果我看到日志:

NumberOfSent/Diff(K)

在前10秒,它的表现非常糟糕

30k per second

60秒后我

180k  per second

为什么?我怎样才能开始这个过程,已经达到180k了?
我的Kafka制片人配置如下

Async producer ( but also with sync producer the situation dose not change)
  ACKS_CONFIG = 0
  BATCH_SIZE_CONFIG = 20000 
  COMPRESSION_TYPE_CONFIG = none
  LINGER_MS_CONFIG = 0

最后一个细节:

NUM_MSG_SEC is set to 200000 or bigger number
zy1mlcev

zy1mlcev1#

我自己找到了解决办法,希望这篇文章也能对其他人有用。
问题就在眼前

ProducerConfig.BATCH_SIZE_CONFIG

以及

ProducerConfig.LINGER_MS_CONFIG

我的参数是20000和0,为了解决这个问题,我将它们设置为更高的值200000和1000。最后,我用以下参数启动了jvm:

-XX:MinMetaspaceFreeRatio=100
-XX:MaxMetaspaceFreeRatio=100

因为我看到将元空间设置为合适的值需要更长的时间。
现在生产商开始直接在140k和1秒已经是180k。

相关问题