本文整理了Java中org.apache.spark.streaming.Durations
类的一些代码示例,展示了Durations
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Durations
类的具体详情如下:
包路径:org.apache.spark.streaming.Durations
类名称:Durations
暂无
代码示例来源:origin: cloudera-labs/envelope
/**
 * Initializes the streaming context for this job. The batch interval is read
 * (in milliseconds) from BATCH_MILLISECONDS_PROPERTY and the context is built
 * on top of the existing SparkSession's SparkContext.
 */
private static void initializeStreamingJob() {
  int batchMillis = INSTANCE.config.getInt(BATCH_MILLISECONDS_PROPERTY);
  JavaSparkContext sparkContext = new JavaSparkContext(getSparkSession().sparkContext());
  INSTANCE.jsc = new JavaStreamingContext(sparkContext, Durations.milliseconds(batchMillis));
}
代码示例来源:origin: baghelamit/iot-traffic-monitor
/**
 * Computes windowed traffic counts per vehicle type for each route
 * (30-second window sliding every 10 seconds) and writes the results
 * to the Cassandra window_traffic table.
 *
 * @param filteredIotDataStream IoT data stream
 */
public void processWindowTrafficData(JavaDStream<IoTData> filteredIotDataStream) {
  // Key each record by (routeId, vehicleType) and count it over the sliding window.
  JavaPairDStream<AggregateKey, Long> windowedCounts = filteredIotDataStream
      .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
      .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30), Durations.seconds(10));
  // Convert each (key, count) pair into a WindowTrafficData row.
  JavaDStream<WindowTrafficData> trafficDStream = windowedCounts.map(windowTrafficDataFunc);
  // Java field name -> Cassandra column name.
  Map<String, String> columnNameMappings = new HashMap<>();
  columnNameMappings.put("routeId", "routeid");
  columnNameMappings.put("vehicleType", "vehicletype");
  columnNameMappings.put("totalCount", "totalcount");
  columnNameMappings.put("timeStamp", "timestamp");
  columnNameMappings.put("recordDate", "recorddate");
  // Persist the mapped stream into Cassandra.
  javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "window_traffic",
      CassandraJavaUtil.mapToRow(WindowTrafficData.class, columnNameMappings)).saveToCassandra();
}
代码示例来源:origin: org.apache.spark/spark-streaming_2.10
@Test
public void testMinutes() {
  // Two minutes expressed in milliseconds.
  Duration twoMinutes = new Duration(2 * 60 * 1000);
  Assert.assertEquals(twoMinutes, Durations.minutes(2));
}
代码示例来源:origin: ebi-wp/kafka-streams-api-websockets
// Create the streaming context over sparkConf with a 5-second batch interval.
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
代码示例来源:origin: org.apache.spark/spark-streaming-kafka_2.10
@Before
public void setUp() {
  // Bring up the embedded Kafka fixture before each test.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Local 4-thread master; app named after the concrete test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // Short 200 ms batches keep the test fast.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: org.apache.spark/spark-streaming_2.11
@Test
public void testMinutes() {
  // Two minutes expressed in milliseconds.
  Duration expected = new Duration(2 * 60 * 1000);
  Assert.assertEquals(expected, Durations.minutes(2));
}
代码示例来源:origin: dibbhatt/kafka-spark-consumer
// Create the streaming context over _sparkConf with a 30-second batch interval.
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));
代码示例来源:origin: org.apache.spark/spark-streaming-kafka_2.11
@Before
public void setUp() {
  // Embedded Kafka fixture for the test run.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Local 4-thread master, app named after the test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // 200 ms batch interval keeps the streaming test quick.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: nivdul/spark-in-practice
/**
 * Loads the tweet stream via TwitterUtils: returns a DStream of tweets.
 *
 * More about TwitterUtils:
 * https://spark.apache.org/docs/1.4.0/api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html
 *
 * @return a DStream of tweet statuses (the Status class carries all tweet
 *         information — see http://twitter4j.org/javadoc/twitter4j/Status.html)
 */
public JavaDStream<Status> loadData() {
  // Local Spark configuration; multiple contexts are explicitly allowed.
  SparkConf conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.driver.allowMultipleContexts", "true")
      .setAppName("Spark Streaming");
  // Streaming context with 2-second batches.
  jssc = new JavaStreamingContext(conf, Durations.seconds(2));
  System.out.println("Initializing Twitter stream...");
  // Remember to fill in the keys and tokens in the StreamUtils class!
  return TwitterUtils.createStream(jssc, StreamUtils.getAuth());
}
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-10
@Before
public void setUp() {
  // Start the embedded Kafka broker used by the tests.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Local 4-thread master; the app name mirrors the test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // Fast 200 ms batch interval.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: jgperrin/net.jgp.labs.spark
/**
 * Starts a local two-thread streaming job that prints every new text file
 * appearing in the input directory, then blocks until termination.
 */
private void start() {
  // Local StreamingContext with two worker threads and a 5-second batch
  // interval. (Previous comment claimed 1 second; the code uses 5.)
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
      "NetworkWordCount");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations
      .seconds(5));
  JavaDStream<String> msgDataStream = jssc.textFileStream(StreamingUtils
      .getInputDirectory());
  msgDataStream.print();
  jssc.start();
  try {
    jssc.awaitTermination();
  } catch (InterruptedException e) {
    e.printStackTrace();
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
  }
}
}
代码示例来源:origin: org.apache.spark/spark-streaming-kafka
@Before
public void setUp() {
  // Embedded Kafka fixture, created fresh per test.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Local 4-thread master named after the test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // 200 ms batch interval for quick test turnaround.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: co.cask.cdap/hydrator-spark-core2
/**
 * Applies a sliding window over the underlying DStream, counting records on
 * the way in and out of the window stage. Width and slide interval are taken
 * from the Windower, in seconds.
 */
@Override
public SparkCollection<T> window(StageSpec stageSpec, Windower windower) {
String stageName = stageSpec.getName();
// "records.in" is counted before windowing and "records.out" after; the
// out-count additionally attaches the stage's data tracer.
return wrap(stream.transform(new CountingTransformFunction<T>(stageName, sec.getMetrics(), "records.in", null))
.window(Durations.seconds(windower.getWidth()), Durations.seconds(windower.getSlideInterval()))
.transform(new CountingTransformFunction<T>(stageName, sec.getMetrics(), "records.out",
sec.getDataTracer(stageName))));
}
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-10_2.11
@Before
public void setUp() {
  // Spin up the embedded Kafka broker for this test.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // 4 local threads; app name derived from the concrete test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // Use short 200 ms batches.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: baghelamit/iot-traffic-monitor
/**
 * Aggregates a running total of vehicle counts per (route, vehicle type)
 * and persists the totals to the Cassandra total_traffic table.
 *
 * @param filteredIotDataStream IoT data stream
 */
public void processTotalTrafficData(JavaDStream<IoTData> filteredIotDataStream) {
  // Count vehicles per (routeId, vehicleType) within each batch.
  JavaPairDStream<AggregateKey, Long> batchCounts = filteredIotDataStream
      .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
      .reduceByKey((a, b) -> a + b);
  // Fold batch counts into a running total; state expires after one hour.
  JavaMapWithStateDStream<AggregateKey, Long, Long, Tuple2<AggregateKey, Long>> statefulCounts = batchCounts
      .mapWithState(StateSpec.function(totalSumFunc).timeout(Durations.seconds(3600)));
  // Unwrap the state stream, then convert each (key, total) into TotalTrafficData.
  JavaDStream<Tuple2<AggregateKey, Long>> totals = statefulCounts.map(tuple2 -> tuple2);
  JavaDStream<TotalTrafficData> trafficDStream = totals.map(totalTrafficDataFunc);
  // Java field name -> Cassandra column name.
  Map<String, String> columnNameMappings = new HashMap<>();
  columnNameMappings.put("routeId", "routeid");
  columnNameMappings.put("vehicleType", "vehicletype");
  columnNameMappings.put("totalCount", "totalcount");
  columnNameMappings.put("timeStamp", "timestamp");
  columnNameMappings.put("recordDate", "recorddate");
  // Persist the mapped stream into Cassandra.
  javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "total_traffic",
      CassandraJavaUtil.mapToRow(TotalTrafficData.class, columnNameMappings)).saveToCassandra();
}
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-8_2.11
@Before
public void setUp() {
  // Embedded Kafka broker for the test.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Local 4-thread master; app named after this test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // 200 ms batches keep the streaming assertions snappy.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: org.apache.spark/spark-streaming_2.10
@Test
public void testSeconds() {
  // Thirty seconds expressed in milliseconds.
  Duration thirtySeconds = new Duration(30 * 1000);
  Assert.assertEquals(thirtySeconds, Durations.seconds(30));
}
代码示例来源:origin: org.apache.spark/spark-streaming-kafka-0-8
@Before
public void setUp() {
  // Bring up the embedded Kafka fixture.
  kafkaTestUtils = new KafkaTestUtils();
  kafkaTestUtils.setup();
  // Four local threads; name the app after the concrete test class.
  SparkConf conf = new SparkConf()
      .setAppName(this.getClass().getSimpleName())
      .setMaster("local[4]");
  // Short 200 ms batch interval.
  ssc = new JavaStreamingContext(conf, Durations.milliseconds(200));
}
代码示例来源:origin: org.apache.spark/spark-streaming_2.11
@Test
public void testSeconds() {
  // Thirty seconds expressed in milliseconds.
  Duration expected = new Duration(30 * 1000);
  Assert.assertEquals(expected, Durations.seconds(30));
}
代码示例来源:origin: org.apache.spark/spark-streaming_2.10
@Test
public void testMilliseconds() {
  // 100 ms should round-trip unchanged.
  Duration hundredMillis = new Duration(100);
  Assert.assertEquals(hundredMillis, Durations.milliseconds(100));
}
内容来源于网络,如有侵权,请联系作者删除!