本文整理了Java中org.apache.spark.streaming.kafka.KafkaUtils.createStream()
方法的一些代码示例,展示了KafkaUtils.createStream()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。KafkaUtils.createStream()
方法的具体详情如下:
包路径:org.apache.spark.streaming.kafka.KafkaUtils
类名称:KafkaUtils
方法名:createStream
暂无
代码示例来源:origin: databricks/learning-spark
public static void main(String[] args) throws Exception {
    String zookeeperQuorum = args[0];
    String consumerGroup = args[1];
    SparkConf sparkConf = new SparkConf().setAppName("KafkaInput");
    // Streaming context with a one-second batch interval.
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(1000));
    // Map each topic name to the number of receiver threads for it.
    Map<String, Integer> topicThreadCounts = new HashMap<>();
    topicThreadCounts.put("pandas", 1);
    JavaPairDStream<String, String> kafkaInput =
        KafkaUtils.createStream(streamingContext, zookeeperQuorum, consumerGroup, topicThreadCounts);
    kafkaInput.print();
    // Start the job, let it run for ten seconds, then shut down.
    // To run indefinitely, call awaitTermination() without a timeout.
    streamingContext.start();
    streamingContext.awaitTermination(10000);
    streamingContext.stop();
}
}
代码示例来源:origin: jetoile/hadoop-unit
public void run() {
    // Consume the configured topic with a single receiver thread.
    Map<String, Integer> topicToThreads = new HashMap<>();
    topicToThreads.put(topic, 1);
    JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(scc, zkString, "groupId", topicToThreads);
    // Keep only the message payload (the value of each key/value tuple).
    JavaDStream<String> payloads = kafkaStream.map(record -> record._2());
    payloads.foreachRDD(rdd -> {
        System.out.println("========================");
        System.out.println(rdd);
    });
}
}
代码示例来源:origin: gwenshap/kafka-examples
KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
代码示例来源:origin: wankunde/logcount
JavaPairReceiverInputDStream<String, String> logstream = KafkaUtils.createStream(ssc,
"10.10.102.191:2181,10.10.102.192:2181,10.10.102.193:2181", "recsys_group1", topicMap);
代码示例来源:origin: Stratio/Decision
private void configureDataContext(JavaStreamingContext context) {
    // One receiver thread per configured data topic.
    Map<String, Integer> topicThreads = new HashMap<>();
    configurationContext.getDataTopics().forEach(dataTopic -> topicThreads.put(dataTopic, 1));
    kafkaTopicService.createTopicsIfNotExist(configurationContext.getDataTopics(),
        configurationContext.getKafkaReplicationFactor(), configurationContext.getKafkaPartitions());
    HashMap<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", configurationContext.getZookeeperHostsQuorumWithPath());
    kafkaParams.put("group.id", configurationContext.getGroupId());
    /*
     * group.id must be the cluster-wide group id: Kafka assigns each partition of a
     * topic to exactly one consumer within a group. Decision topics have a single
     * partition by default, so if two or more Decision instances (consumers) read the
     * same topic with the same group id, only one instance will receive data from it.
     */
    JavaPairDStream<String, byte[]> rawMessages = KafkaUtils.createStream(context, String.class, byte[].class,
        kafka.serializer.StringDecoder.class, kafka.serializer.DefaultDecoder.class, kafkaParams, topicThreads,
        StorageLevel.MEMORY_AND_DISK_SER());
    // Deserialize Avro payloads, keeping only INSERT operations.
    AvroDeserializeMessageFunction avroDeserializeMessageFunction = new AvroDeserializeMessageFunction();
    JavaDStream<StratioStreamingMessage> insertRequests = rawMessages
        .filter(new FilterAvroMessagesByOperationFunction(STREAM_OPERATIONS.MANIPULATION.INSERT))
        .map(avroDeserializeMessageFunction);
    // Forward every insert request into the target stream.
    InsertIntoStreamFunction insertIntoStreamFunction = new InsertIntoStreamFunction(streamOperationService,
        configurationContext.getZookeeperHostsQuorum());
    insertRequests.foreachRDD(insertIntoStreamFunction);
}
代码示例来源:origin: zhang637/kafka_spark_hbase_demo
Map<String, Integer> topicMap = Maps.newHashMap();
topicMap.put("recsys", 3);
JavaPairReceiverInputDStream<String, String> logstream = KafkaUtils.createStream(ssc,
"hdp1:2181", "recsys_group0", topicMap);
logstream.print();
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-8
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
String.class,
String.class,
代码示例来源:origin: mvalleavila/Kafka-Spark-Hbase-Example
JavaPairDStream<String, String> messages = KafkaUtils.createStream(sc, args[1], args[2], topicMap);
代码示例来源:origin: org.apache.spark/spark-streaming-kafka_2.10
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
String.class,
String.class,
代码示例来源:origin: org.apache.spark/spark-streaming-kafka_2.11
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
String.class,
String.class,
代码示例来源:origin: org.apache.spark/spark-streaming-kafka
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
String.class,
String.class,
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-8_2.11
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
String.class,
String.class,
代码示例来源:origin: Stratio/Decision
JavaPairDStream<String, byte[]> messages = KafkaUtils.createStream(context, String.class, byte[].class,
kafka.serializer.StringDecoder.class, kafka.serializer.DefaultDecoder.class, kafkaParams, baseTopicMap,
StorageLevel.MEMORY_AND_DISK_SER());
代码示例来源:origin: XavientInformationSystems/Data-Ingestion-Platform
/**
 * Entry point: reads tweets from a Kafka receiver stream and fans them out to
 * HDFS, HBase and two JDBC top-N aggregations.
 *
 * Fix: the "topN" property was parsed twice via {@code Integer.valueOf};
 * it is now parsed once with {@code Integer.parseInt} (no duplicate parse,
 * no boxing, and a single failure point if the property is malformed).
 */
public static void main(String[] args) throws DataIngestException {
    CmdLineParser cmdLineParser = new CmdLineParser();
    final AppArgs appArgs = cmdLineParser.validateArgs(args);
    System.setProperty("HADOOP_USER_NAME", appArgs.getProperty(DiPConfiguration.HADOOP_USER_NAME));
    SparkConf conf = new SparkConf().setAppName("SparkTwitterStreaming")
        .setMaster("local[*]");
    // try-with-resources guarantees the streaming context is stopped on exit.
    try (JavaStreamingContext jsc = new JavaStreamingContext(new JavaSparkContext(conf), new Duration(1000))) {
        JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(jsc,
            appArgs.getProperty(DiPConfiguration.ZK_HOST) + ":" + appArgs.getProperty(DiPConfiguration.ZK_PORT),
            "spark-stream", getKafkaTopics(appArgs));
        // cache(): the flattened stream feeds four downstream consumers below.
        JavaDStream<Object[]> twitterStreams = stream.map(tuple -> FlatJsonConverter.convertToValuesArray(tuple._2))
            .cache();
        SparkHdfsWriter.write(twitterStreams, appArgs);
        new SparkHBaseWriter(jsc.sparkContext(), appArgs).write(twitterStreams);
        SparkJdbcSourceWriter jdbcSourceWriter = new SparkJdbcSourceWriter(new SQLContext(jsc.sparkContext()),
            appArgs);
        // Parse the shared "topN" property exactly once.
        final int topN = Integer.parseInt(appArgs.getProperty("topN"));
        new TopNLocationByTweets(jdbcSourceWriter, topN).compute(twitterStreams);
        new TopNUsersWithMaxFollowers(jdbcSourceWriter, topN).compute(twitterStreams);
        jsc.start();
        jsc.awaitTermination();
    }
}
代码示例来源:origin: Stratio/Decision
JavaPairDStream<String, String> messages = KafkaUtils.createStream(context,
configurationContext.getZookeeperHostsQuorumWithPath(), configurationContext.getGroupId(), baseTopicMap);
messages.cache();
内容来源于网络,如有侵权,请联系作者删除!