kafka.consumer.Consumer.createJavaConsumerConnector()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(11.3k)|赞(0)|评价(0)|浏览(421)

本文整理了Java中kafka.consumer.Consumer.createJavaConsumerConnector()方法的一些代码示例,展示了Consumer.createJavaConsumerConnector()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Consumer.createJavaConsumerConnector()方法的具体详情如下:
包路径:kafka.consumer.Consumer
类名称:Consumer
方法名:createJavaConsumerConnector

Consumer.createJavaConsumerConnector介绍

暂无

代码示例

代码示例来源:origin: apache/incubator-gobblin

/**
 * Builds the Kafka consumer connector used by this instance.
 *
 * @return the connector created from {@code this.consumerConfig}
 */
protected ConsumerConnector createConsumerConnector() {
 final ConsumerConnector connector = Consumer.createJavaConsumerConnector(this.consumerConfig);
 return connector;
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates a consumer that reads one stream of the configured topic.
 *
 * <p>The {@code topic} and {@code zookeeper.connect} values are read from {@code props}; the schema
 * registry and Avro deserializer are set up next, and finally a single Kafka stream is opened and
 * its iterator stored.
 *
 * @param props settings from which {@code topic} and {@code zookeeper.connect} are read; also
 *        handed to the schema registry factory
 * @param checkpoint not referenced by this constructor
 */
public SimpleKafkaConsumer(Properties props, KafkaCheckpoint checkpoint)
{
 final Config parsedConfig = ConfigFactory.parseProperties(props);
 topic = parsedConfig.getString("topic");
 final String zookeeperQuorum = parsedConfig.getString("zookeeper.connect");

 schemaRegistry = KafkaSchemaRegistryFactory.getSchemaRegistry(props);
 deserializer = new LiAvroDeserializer(schemaRegistry);
 /* TODO: Make Confluent schema registry integration configurable
  * HashMap<String, String> avroSerDeConfig = new HashMap<>();
  * avroSerDeConfig.put("schema.registry.url", "http://localhost:8081");
  * deserializer = new io.confluent.kafka.serializers.KafkaAvroDeserializer();
  * deserializer.configure(avroSerDeConfig, false);
  */

 final Properties consumerSettings = new Properties();
 consumerSettings.setProperty("zookeeper.connect", zookeeperQuorum);
 // The nano-time suffix gives every instance its own consumer group id.
 consumerSettings.setProperty("group.id", "gobblin-tool-" + System.nanoTime());
 consumerSettings.setProperty("zookeeper.session.timeout.ms", "10000");
 consumerSettings.setProperty("zookeeper.sync.time.ms", "10000");
 consumerSettings.setProperty("auto.commit.interval.ms", "10000");
 consumerSettings.setProperty("auto.offset.reset", "smallest");
 consumerSettings.setProperty("auto.commit.enable", "false");
 // "consumer.timeout.ms" is deliberately left unset here (it was commented out in the original).

 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerSettings));
 // Request exactly one stream for the topic and keep its iterator.
 final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
   consumer.createMessageStreams(ImmutableMap.of(topic, 1));
 stream = streamsByTopic.get(topic).get(0);
 iterator = stream.iterator();
}

代码示例来源:origin: pinterest/secor

/**
 * Initializes the reader: stores the config, creates the consumer connector, selects a topic
 * filter, and opens the first stream matching that filter.
 *
 * @param config configuration supplying the topic blacklist/filter and the timestamp class
 * @throws UnknownHostException declared by the signature
 * @throws RuntimeException if both a topic blacklist and a topic filter are configured
 */
@Override
public void init(SecorConfig config) throws UnknownHostException {
  this.mConfig = config;
  mConsumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig());

  // A blacklist and a whitelist filter are mutually exclusive.
  if (!mConfig.getKafkaTopicBlacklist().isEmpty() && !mConfig.getKafkaTopicFilter().isEmpty()) {
    throw new RuntimeException("Topic filter and blacklist cannot be both specified.");
  }

  final TopicFilter topicFilter;
  if (mConfig.getKafkaTopicBlacklist().isEmpty()) {
    topicFilter = new Whitelist(mConfig.getKafkaTopicFilter());
  } else {
    topicFilter = new Blacklist(mConfig.getKafkaTopicBlacklist());
  }
  LOG.debug("Use TopicFilter {}({})", topicFilter.getClass(), topicFilter);

  // Only the first stream returned for the filter is consumed.
  final List<KafkaStream<byte[], byte[]>> matchingStreams =
      mConsumerConnector.createMessageStreamsByFilter(topicFilter);
  mIterator = matchingStreams.get(0).iterator();

  mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}

代码示例来源:origin: Graylog2/graylog2-server

// Create the consumer connector from consumerConfig and keep it in cc.
cc = Consumer.createJavaConsumerConnector(consumerConfig);

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates a consumer suite that reads a single stream of {@code topic}.
 *
 * <p>Builds the consumer properties, creates the connector, requests one stream for the topic and
 * stores that stream and its iterator.
 *
 * @param zkConnectString value used for the {@code zookeeper.connect} consumer property
 * @param topic topic to consume; also used as the prefix of the generated consumer group id
 */
KafkaConsumerSuite(String zkConnectString, String topic)
{
 _topic = topic;
 Properties consumeProps = new Properties();
 consumeProps.put("zookeeper.connect", zkConnectString);
 // The nano-time suffix gives every suite instance its own consumer group id.
 consumeProps.put("group.id", _topic+"-"+System.nanoTime());
 consumeProps.put("zookeeper.session.timeout.ms", "10000");
 consumeProps.put("zookeeper.sync.time.ms", "10000");
 consumeProps.put("auto.commit.interval.ms", "10000");
 // The Kafka setting is named "consumer.timeout.ms". The previous key "_consumer.timeout.ms" is
 // not a Kafka consumer property, so the intended 10 s timeout was never applied.
 consumeProps.put("consumer.timeout.ms", "10000");
 _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
 // Request exactly one stream for the topic.
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
   _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
 _stream = streams.get(0);
 _iterator = _stream.iterator();
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates a consumer suite that reads a single stream of {@code topic}.
 *
 * <p>Builds the consumer properties, creates the connector, requests one stream for the topic and
 * stores that stream and its iterator.
 *
 * @param zkConnectString value used for the {@code zookeeper.connect} consumer property
 * @param topic topic to consume; also used as the prefix of the generated consumer group id
 */
KafkaConsumerSuite(String zkConnectString, String topic)
{
 _topic = topic;
 Properties consumeProps = new Properties();
 consumeProps.put("zookeeper.connect", zkConnectString);
 // The nano-time suffix gives every suite instance its own consumer group id.
 consumeProps.put("group.id", _topic+"-"+System.nanoTime());
 consumeProps.put("zookeeper.session.timeout.ms", "10000");
 consumeProps.put("zookeeper.sync.time.ms", "10000");
 consumeProps.put("auto.commit.interval.ms", "10000");
 // The Kafka setting is named "consumer.timeout.ms". The previous key "_consumer.timeout.ms" is
 // not a Kafka consumer property, so the intended 10 s timeout was never applied.
 consumeProps.put("consumer.timeout.ms", "10000");
 _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
 // Request exactly one stream for the topic.
 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
   _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
 _stream = streams.get(0);
 _iterator = _stream.iterator();
}

代码示例来源:origin: apache/incubator-pinot

// Create a consumer connector from the stream config's Kafka consumer config.
// In this excerpt the returned connector is not assigned to anything.
kafka.consumer.Consumer.createJavaConsumerConnector(kafkaHighLevelStreamConfig.getKafkaConsumerConfig());

代码示例来源:origin: apache/incubator-druid

// Wrap consumerProps in a ConsumerConfig and create the connector from it.
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

代码示例来源:origin: apache/incubator-gobblin

/**
 * Starts the test server, creates {@code topic}, waits for its metadata to propagate, and opens
 * a single consumer stream on it.
 *
 * @param topic name of the topic to create and consume
 * @throws InterruptedException declared by the signature
 * @throws RuntimeException declared by the signature
 */
public KafkaTestBase(String topic) throws InterruptedException, RuntimeException {
 startServer();
 this.topic = topic;

 // Create the topic, then wait (up to the 5000 passed below) for its metadata to reach the server.
 AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());
 final List<KafkaServer> brokers = new ArrayList<>();
 brokers.add(kafkaServer);
 TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(brokers), topic, 0, 5000);

 final Properties consumerSettings = new Properties();
 consumerSettings.setProperty("zookeeper.connect", zkConnect);
 consumerSettings.setProperty("group.id", "testConsumer");
 consumerSettings.setProperty("zookeeper.session.timeout.ms", "10000");
 consumerSettings.setProperty("zookeeper.sync.time.ms", "10000");
 consumerSettings.setProperty("auto.commit.interval.ms", "10000");
 consumerSettings.setProperty("consumer.timeout.ms", "10000");
 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerSettings));

 // Request exactly one stream for the topic and keep its iterator.
 final Map<String, Integer> streamsPerTopic = new HashMap<>();
 streamsPerTopic.put(this.topic, 1);
 final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
   consumer.createMessageStreams(streamsPerTopic);
 stream = streamsByTopic.get(this.topic).get(0);
 iterator = stream.iterator();
}

代码示例来源:origin: apache/nifi

// Create the consumer connector from consumerConfig and keep it in consumer.
consumer = Consumer.createJavaConsumerConnector(consumerConfig);

代码示例来源:origin: uber/chaperone

/**
 * Registers this consumer's metrics, opens one Kafka stream on the audit topic, and sets up the
 * deduplicator when deduplication is enabled.
 *
 * <p>A failure while registering metrics is logged and does not stop initialization.
 */
private void init() {
 // Kafka offset lag metrics are registered per consumer, hence the consumer name in each metric.
 final MetricRegistry metrics = Metrics.getRegistry();
 try {
  fetchedMsgCounter = metrics.meter("kafkaIngesterConsumer." + this.getName() + "-msgFetchRate");
  failedToIngestCounter = metrics.meter("kafkaIngesterConsumer." + this.getName() + "-failedToIngest");
  final JmxAttributeGauge lagGauge = new JmxAttributeGauge(new ObjectName(maxLagMetricName), "Value");
  kafkaOffsetLagGauge =
    metrics.register("kafkaIngesterConsumer." + this.getName() + "-kafkaOffsetLag", lagGauge);
 } catch (MalformedObjectNameException | IllegalArgumentException e) {
  logger.error("Register failure for metrics of KafkaIngesterConsumer", e);
 }

 final TopicFilter auditTopicFilter = new Whitelist(AuditConfig.AUDIT_TOPIC_NAME);
 logger.info("{}: Topic filter is {}", getName(), AuditConfig.AUDIT_TOPIC_NAME);

 this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
 // One stream is requested for the filter; its iterator is what this consumer reads from.
 final KafkaStream<byte[], byte[]> auditStream =
   consumer.createMessageStreamsByFilter(auditTopicFilter, 1).get(0);
 iterator = auditStream.iterator();
 logger.info("KafkaIngesterConsumer thread {} is initialized successfully", getName());

 if (!AuditConfig.INGESTER_ENABLE_DEDUP) {
  deduplicator = null;
  return;
 }
 deduplicator =
   new Deduplicator(threadId, AuditConfig.INGESTER_REDIS_HOST, AuditConfig.INGESTER_REDIS_PORT,
     AuditConfig.INGESTER_REDIS_KEY_TTL_SEC, AuditConfig.INGESTER_DUP_HOST_PREFIX,
     AuditConfig.INGESTER_HOSTS_WITH_DUP);
 deduplicator.open();
}

代码示例来源:origin: io.zipkin.zipkin2/zipkin-collector-kafka08

/**
 * Returns the connector, creating it from {@code config} on first use.
 *
 * <p>The field is checked once without locking and again inside {@code synchronized (this)}, so
 * creation happens inside the lock and only when the field is still {@code null}.
 *
 * @return the stored {@link ZookeeperConsumerConnector}
 */
// NOTE(review): the unlocked first check is only safe if `connector` is declared volatile;
// the field declaration is not visible here, so confirm.
ZookeeperConsumerConnector get() {
 if (connector == null) {
  synchronized (this) {
   // Re-check under the lock: another thread may have created it in the meantime.
   if (connector == null) {
    connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
   }
  }
 }
 return connector;
}

代码示例来源:origin: apache/incubator-edgent

/**
 * Returns the consumer connector, creating it on the first call.
 *
 * <p>The method is {@code synchronized}, so the null check and the creation happen under this
 * object's monitor.
 *
 * @return the stored connector
 */
private synchronized ConsumerConnector client() {
  if (consumer != null) {
    return consumer;
  }
  consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
  return consumer;
}

代码示例来源:origin: ningg/flume-ng-extends-source

/**
 * Creates a consumer connector from the given Kafka properties.
 *
 * @param kafkaProps properties wrapped in a {@code ConsumerConfig}
 * @return a newly created connector
 */
public static ConsumerConnector getConsumer(Properties kafkaProps) {
 return Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProps));
}

代码示例来源:origin: vakinge/jeesuite-libs

/**
 * Creates a topic consumer backed by the old (ZooKeeper-based) Kafka consumer API.
 *
 * <p>If the context properties name a class under {@code value.deserializer}, that class is
 * instantiated through its no-argument constructor and stored in {@code deserializer}. This step
 * is best-effort: on any failure {@code deserializer} is left unchanged and construction
 * continues. The connector is then created from the same properties.
 *
 * @param context consumer context supplying the Kafka properties
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {
  super(context);
  try {
    String deserializerClassName = context.getProperties().getProperty("value.deserializer");
    // Explicit check instead of relying on Class.forName(null) throwing a NullPointerException.
    if (deserializerClassName != null) {
      Class<?> deserializerClass = Class.forName(deserializerClassName);
      // Class.newInstance() is deprecated; its documented replacement is the no-arg constructor.
      deserializer = (Deserializer<Object>) deserializerClass.getDeclaredConstructor().newInstance();
    }
  } catch (Exception ignored) {
    // Deliberately best-effort: an unloadable or unusable deserializer class must not prevent the
    // consumer from being constructed, so the field simply keeps its current value.
    // NOTE(review): consider logging this failure with the project's logger.
  }
  this.connector = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));
}

代码示例来源:origin: com.netflix.suro/suro-kafka

/**
 * Creates the connector, opens the single stream for {@code topic}, and starts taking traffic.
 *
 * @throws RuntimeException if the topic does not yield exactly one stream
 * @throws Exception declared by the signature
 */
@Override
public void start() throws Exception {
  connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));

  // One stream is requested for the topic; anything other than exactly one is rejected below.
  final List<KafkaStream<byte[], byte[]>> topicStreams =
      connector.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
  final boolean exactlyOneStream = topicStreams != null && topicStreams.size() == 1;
  if (!exactlyOneStream) {
    throw new RuntimeException(topic + " is not valid");
  }

  stream = topicStreams.get(0).iterator();
  startTakingTraffic();
}

代码示例来源:origin: locationtech/geowave

/**
 * Builds a consumer connector from the properties held by {@code kafkaOptions}.
 *
 * @return a newly created connector
 */
private ConsumerConnector buildKafkaConsumer() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(kafkaOptions.getProperties());
 return Consumer.createJavaConsumerConnector(consumerConfig);
}

代码示例来源:origin: com.github.hackerwin7/jlib-utils

/**
 * Creates a high-level Kafka consumer from the given configuration.
 *
 * @param conf supplies the consumer properties (via {@code getProps()}) and the topic, read
 *        from the {@code KafkaConf.HIGH_TOPIC} entry
 */
public KafkaHighConsumer(KafkaConf conf) {
  consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(conf.getProps()));
  topic = conf.getProp(KafkaConf.HIGH_TOPIC);
}

代码示例来源:origin: apache/apex-malhar

/**
 * Creates a test consumer for the given topic.
 *
 * @param topic topic stored on this consumer
 * @param zkaddress address stored on this consumer
 */
public KafkaTestConsumer(String topic, String zkaddress) {
 this.topic = topic;
 this.zkaddress = zkaddress;
 // Both fields are assigned before the connector's config is built by createConsumerConfig().
 consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
}

代码示例来源:origin: mariamhakobyan/elasticsearch-river-kafka

/**
 * Creates the consumer connector for the river and opens the message streams of its topic.
 *
 * <p>{@code AMOUNT_OF_THREADS_PER_CONSUMER} streams are requested for the topic and stored in
 * {@code streams}.
 *
 * @param riverConfig river settings supplying the consumer config, the topic and the index name
 */
public KafkaConsumer(final RiverConfig riverConfig) {
  consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(riverConfig));

  // Map the river's topic to the number of streams requested for it.
  final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
  streamsPerTopic.put(riverConfig.getTopic(), AMOUNT_OF_THREADS_PER_CONSUMER);

  final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
      consumerConnector.createMessageStreams(streamsPerTopic);
  streams = streamsByTopic.get(riverConfig.getTopic());

  logger.debug("Index: {}: Started kafka consumer for topic: {} with {} partitions in it.",
      riverConfig.getIndexName(), riverConfig.getTopic(), streams.size());
}

相关文章

微信公众号

最新文章

更多