Usage of org.apache.spark.streaming.kafka.KafkaUtils.createStream() with code examples


This article collects Java code examples for the org.apache.spark.streaming.kafka.KafkaUtils.createStream() method and shows how it is used in practice. The examples come from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, and should be useful as references. Method details:
Package: org.apache.spark.streaming.kafka
Class: KafkaUtils
Method: createStream

About KafkaUtils.createStream

createStream creates a receiver-based input DStream that consumes messages from Kafka through the old high-level consumer: the receiver connects via a ZooKeeper quorum, and consumed offsets are tracked in ZooKeeper rather than by Spark. The method has two main flavors: a convenience overload taking a ZooKeeper quorum, a consumer group id, and a topic-to-thread-count map (with String keys and values), and a full overload that additionally takes key/value types, decoder classes, a Kafka parameter map, and a storage level. This receiver-based API lives in the legacy spark-streaming-kafka-0-8 module and has been superseded by the direct (receiver-less) createDirectStream API.
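Before the collected snippets, here is a minimal, self-contained sketch of the simplest overload; the ZooKeeper quorum, group id, and topic name are placeholder values:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class CreateStreamMinimal {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("CreateStreamMinimal");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

    // Topic name -> number of consumer threads used by the receiver
    Map<String, Integer> topics = new HashMap<>();
    topics.put("my-topic", 1);

    // ZooKeeper quorum and consumer group id are placeholder values
    JavaPairReceiverInputDStream<String, String> stream =
        KafkaUtils.createStream(jssc, "zk-host:2181", "my-group", topics);

    // Print the message values of each batch
    stream.map(t -> t._2()).print();

    jssc.start();
    jssc.awaitTermination();
  }
}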

Code examples

Code example source: databricks/learning-spark

public static void main(String[] args) throws Exception {
  String zkQuorum = args[0];
  String group = args[1];
  SparkConf conf = new SparkConf().setAppName("KafkaInput");
  // Create a StreamingContext with a 1 second batch size
  JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put("pandas", 1);
  JavaPairDStream<String, String> input = KafkaUtils.createStream(jssc, zkQuorum, group, topics);
  input.print();
  // start our streaming context and wait for it to "finish"
  jssc.start();
  // Wait 10 seconds, then exit. To run forever, call awaitTermination() with no timeout
  jssc.awaitTermination(10000);
  // Stop the streaming context
  jssc.stop();
  }
}

Code example source: jetoile/hadoop-unit

public void run() {
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, 1);

    JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
        scc,
        zkString,
        "groupId",
        topicMap);

    JavaDStream<String> messages = stream.map(r -> r._2());
    messages.foreachRDD(r -> {
        System.out.println("========================");
        System.out.println(r);
      });
  }
}

Code example source: gwenshap/kafka-examples

KafkaUtils.createStream(ssc, args[0], args[1], topicMap);

Code example source: wankunde/logcount

JavaPairReceiverInputDStream<String, String> logstream = KafkaUtils.createStream(ssc,
    "10.10.102.191:2181,10.10.102.192:2181,10.10.102.193:2181", "recsys_group1", topicMap);

Code example source: Stratio/Decision

private void configureDataContext(JavaStreamingContext context) {
  Map<String, Integer> baseTopicMap = new HashMap<>();
  configurationContext.getDataTopics().forEach( dataTopic -> baseTopicMap.put(dataTopic, 1));
  kafkaTopicService.createTopicsIfNotExist(configurationContext.getDataTopics(), configurationContext
      .getKafkaReplicationFactor(), configurationContext.getKafkaPartitions());
  HashMap<String, String> kafkaParams = new HashMap<>();
  kafkaParams.put("zookeeper.connect", configurationContext.getZookeeperHostsQuorumWithPath());
  kafkaParams.put("group.id", configurationContext.getGroupId());
   /*
   groupId must be the cluster groupId. Kafka assigns each partition of a topic to one, and only one, consumer in
   the group.
   Decision topics have only one partition (by default), so if two or more Decision instances (consumers) read the
   same topic with the same groupId, only one instance will be able to read from the topic.
   */
  JavaPairDStream<String, byte[]> messages = KafkaUtils.createStream(context, String.class, byte[].class,
      kafka.serializer.StringDecoder.class, kafka.serializer.DefaultDecoder.class, kafkaParams, baseTopicMap,
      StorageLevel.MEMORY_AND_DISK_SER());
  AvroDeserializeMessageFunction avroDeserializeMessageFunction = new AvroDeserializeMessageFunction();
  JavaDStream<StratioStreamingMessage>  insertRequests = messages.filter(
      new FilterAvroMessagesByOperationFunction(STREAM_OPERATIONS.MANIPULATION.INSERT))
      .map(avroDeserializeMessageFunction);
  InsertIntoStreamFunction insertIntoStreamFunction = new InsertIntoStreamFunction(streamOperationService,
      configurationContext.getZookeeperHostsQuorum());
  insertRequests.foreachRDD(insertIntoStreamFunction);
}
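
The comment in the example above describes Kafka's group semantics: each partition is assigned to exactly one consumer in a group. With the receiver-based API, the usual way to consume a multi-partition topic in parallel is therefore to create several streams in the same group and union them. A sketch of that pattern, assuming an existing JavaStreamingContext jssc and a hypothetical four-partition topic named "events":

// One receiver per partition; "events", the quorum, and the group id are placeholders
int numReceivers = 4;
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("events", 1);

List<JavaPairDStream<String, String>> streams = new ArrayList<>(numReceivers);
for (int i = 0; i < numReceivers; i++) {
  streams.add(KafkaUtils.createStream(jssc, "zk-host:2181", "my-group", topicMap));
}

// Union the per-receiver streams into a single DStream for downstream processing
JavaPairDStream<String, String> unioned = streams.get(0);
for (int i = 1; i < streams.size(); i++) {
  unioned = unioned.union(streams.get(i));
}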

Code example source: zhang637/kafka_spark_hbase_demo

Map<String, Integer> topicMap = Maps.newHashMap();
topicMap.put("recsys", 3);
JavaPairReceiverInputDStream<String, String> logstream = KafkaUtils.createStream(ssc,
    "hdp1:2181", "recsys_group0", topicMap);
logstream.print();

Code example source: org.apache.spark/spark-streaming-kafka (the same snippet appears verbatim in the spark-streaming-kafka_2.10, spark-streaming-kafka_2.11, spark-streaming-kafka-0-8, and spark-streaming-kafka-0-8_2.11 artifacts)

kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
 String.class,
 String.class,
 StringDecoder.class,   // the original snippet is truncated after the second String.class;
 StringDecoder.class,   // the remaining arguments follow the full createStream signature
 kafkaParams,           // (decoder classes, consumer params, topic map, storage level);
 topicMap,              // the variable names kafkaParams and topicMap are assumed from context
 StorageLevel.MEMORY_ONLY_SER());
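
The decoder-based overload above expects a Map<String, String> of old-consumer properties. A sketch of the setup the snippet assumes (host name, group id, and topic are placeholders):

// Consumer properties for the old (ZooKeeper-based) high-level consumer
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "zk-host:2181"); // placeholder quorum
kafkaParams.put("group.id", "test-group");            // placeholder group id
kafkaParams.put("auto.offset.reset", "smallest");     // "smallest" = start from the earliest offset

// Topic name -> receiver thread count
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("test-topic", 1); // placeholder topic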

Code example source: mvalleavila/Kafka-Spark-Hbase-Example

JavaPairDStream<String, String> messages = KafkaUtils.createStream(sc, args[1], args[2], topicMap);

Code example source: XavientInformationSystems/Data-Ingestion-Platform

public static void main(String[] args) throws DataIngestException {
  CmdLineParser cmdLineParser = new CmdLineParser();
  final AppArgs appArgs = cmdLineParser.validateArgs(args);
  
  System.setProperty("HADOOP_USER_NAME", appArgs.getProperty(DiPConfiguration.HADOOP_USER_NAME));
  SparkConf conf = new SparkConf().setAppName("SparkTwitterStreaming")
      .setMaster("local[*]");
  try (JavaStreamingContext jsc = new JavaStreamingContext(new JavaSparkContext(conf), new Duration(1000))) {
    JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(jsc,
        appArgs.getProperty(DiPConfiguration.ZK_HOST)+":"+appArgs.getProperty(DiPConfiguration.ZK_PORT), "spark-stream", getKafkaTopics(appArgs));
    JavaDStream<Object[]> twitterStreams = stream.map(tuple -> FlatJsonConverter.convertToValuesArray(tuple._2))
        .cache();
    
    SparkHdfsWriter.write(twitterStreams, appArgs);
    new SparkHBaseWriter(jsc.sparkContext(), appArgs).write(twitterStreams);
    SparkJdbcSourceWriter jdbcSourceWriter = new SparkJdbcSourceWriter(new SQLContext(jsc.sparkContext()),
        appArgs);
    new TopNLocationByTweets(jdbcSourceWriter, Integer.valueOf(appArgs.getProperty("topN"))).compute(twitterStreams);
    new TopNUsersWithMaxFollowers(jdbcSourceWriter, Integer.valueOf(appArgs.getProperty("topN"))).compute(twitterStreams);
    jsc.start();
    jsc.awaitTermination();
  }
}

Code example source: Stratio/Decision

JavaPairDStream<String, String> messages = KafkaUtils.createStream(context,
    configurationContext.getZookeeperHostsQuorumWithPath(), configurationContext.getGroupId(), baseTopicMap);
messages.cache();
