Usage of the org.apache.spark.streaming.Durations Class, with Code Examples


This article collects Java code examples for the org.apache.spark.streaming.Durations class and shows how it is used in practice. The examples were extracted from selected open-source projects found on GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the class:
Package: org.apache.spark.streaming
Class: Durations

About Durations

Durations is a helper class in Spark Streaming that provides static factory methods, milliseconds(long), seconds(long), and minutes(long), each of which returns a Duration. A Duration specifies the batch interval of a streaming context, as well as window lengths and slide intervals in windowed operations.
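
As a quick illustration of the three factory methods, a minimal standalone sketch (the class name DurationsDemo is ours, not from any of the projects below; the expected outputs follow from the Duration constructor, which takes milliseconds):

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;

public class DurationsDemo {
 public static void main(String[] args) {
  // Each factory method converts its argument to milliseconds
  Duration halfSecond = Durations.milliseconds(500);
  Duration tenSeconds = Durations.seconds(10);
  Duration twoMinutes = Durations.minutes(2);
  System.out.println(halfSecond.milliseconds());              // 500
  System.out.println(tenSeconds.equals(new Duration(10000))); // true
  System.out.println(twoMinutes.milliseconds());              // 120000
 }
}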

Code Examples

Example source: cloudera-labs/envelope

private static void initializeStreamingJob() {
 // Read the batch interval, in milliseconds, from the job configuration
 int batchMilliseconds = INSTANCE.config.getInt(BATCH_MILLISECONDS_PROPERTY);
 final Duration batchDuration = Durations.milliseconds(batchMilliseconds);
 // Reuse the existing SparkSession's context for the streaming context
 JavaStreamingContext jsc = new JavaStreamingContext(new JavaSparkContext(getSparkSession().sparkContext()),
   batchDuration);
 INSTANCE.jsc = jsc;
}

Example source: baghelamit/iot-traffic-monitor

/**
 * Method to get windowed traffic counts for the different types of vehicles on each route.
 * Window duration = 30 seconds, slide interval = 10 seconds.
 * 
 * @param filteredIotDataStream IoT data stream
 */
public void processWindowTrafficData(JavaDStream<IoTData> filteredIotDataStream) {
  // reduce by key and window (30 sec window and 10 sec slide).
  JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
      .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
      .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30), Durations.seconds(10));
  // Transform to dstream of TrafficData
  JavaDStream<WindowTrafficData> trafficDStream = countDStreamPair.map(windowTrafficDataFunc);
  // Map Cassandra table column
  Map<String, String> columnNameMappings = new HashMap<String, String>();
  columnNameMappings.put("routeId", "routeid");
  columnNameMappings.put("vehicleType", "vehicletype");
  columnNameMappings.put("totalCount", "totalcount");
  columnNameMappings.put("timeStamp", "timestamp");
  columnNameMappings.put("recordDate", "recorddate");
  // call CassandraStreamingJavaUtil function to save in DB
  javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "window_traffic",
      CassandraJavaUtil.mapToRow(WindowTrafficData.class, columnNameMappings)).saveToCassandra();
}
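
Note that Spark Streaming requires both the window duration and the slide interval passed to reduceByKeyAndWindow to be multiples of the streaming context's batch interval. A minimal self-contained sketch of that constraint (the socket source, host, and port are illustrative assumptions, not part of the project above):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class WindowIntervalDemo {
 public static void main(String[] args) throws InterruptedException {
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WindowIntervalDemo");
  // Batch interval: 10 seconds; window and slide below must be multiples of it
  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
  JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  JavaPairDStream<String, Long> counts = lines
    .mapToPair(line -> new Tuple2<>(line, 1L))
    // Valid: 30 s window, 10 s slide, both multiples of the 10 s batch interval;
    // a 15 s window would be rejected at runtime
    .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30), Durations.seconds(10));
  counts.print();
  jssc.start();
  jssc.awaitTermination();
 }
}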

Example source: org.apache.spark/spark-streaming_2.10 (the same test appears verbatim in spark-streaming_2.11)

@Test
public void testMinutes() {
 Assert.assertEquals(new Duration(2 * 60 * 1000), Durations.minutes(2));
}

Example source: ebi-wp/kafka-streams-api-websockets

// Create a streaming context with a 5-second batch interval
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));

Example source: org.apache.spark/spark-streaming-kafka_2.10 (the same setUp appears verbatim in the spark-streaming-kafka, spark-streaming-kafka_2.11, spark-streaming-kafka-0-8, spark-streaming-kafka-0-8_2.11, spark-streaming-kafka-0-10, and spark-streaming-kafka-0-10_2.11 artifacts)

@Before
public void setUp() {
 kafkaTestUtils = new KafkaTestUtils();
 kafkaTestUtils.setup();
 SparkConf sparkConf = new SparkConf()
  .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
 // A short 200 ms batch interval keeps the test batches fast
 ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
}
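
For completeness, a matching cleanup would stop the context and the embedded Kafka broker. A sketch of the counterpart @After method (assuming KafkaTestUtils exposes a teardown() method mirroring its setup()):

@After
public void tearDown() {
 if (ssc != null) {
  ssc.stop();  // also stops the underlying SparkContext by default
  ssc = null;
 }
 if (kafkaTestUtils != null) {
  kafkaTestUtils.teardown();  // shut down the embedded Kafka/ZooKeeper
  kafkaTestUtils = null;
 }
}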

Example source: dibbhatt/kafka-spark-consumer

// Create a streaming context with a 30-second batch interval
JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf, Durations.seconds(30));

Example source: nivdul/spark-in-practice

/**
 *  Load the data using TwitterUtils: we obtain a DStream of tweets
 *
 *  More about TwitterUtils:
 *  https://spark.apache.org/docs/1.4.0/api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html
 */
public JavaDStream<Status> loadData() {
 // create the spark configuration and spark context
 SparkConf conf = new SparkConf()
   .setAppName("Spark Streaming")
   .set("spark.driver.allowMultipleContexts", "true")
   .setMaster("local[*]");
 // create a Java streaming context with a 2-second batch interval
 jssc = new JavaStreamingContext(conf, Durations.seconds(2));
 System.out.println("Initializing Twitter stream...");
 // create a DStream (sequence of RDD). The object tweetsStream is a DStream of tweet statuses:
 // - the Status class contains all information of a tweet
 // See http://twitter4j.org/javadoc/twitter4j/Status.html
 // and fill in the keys and tokens in the StreamUtils class!
 JavaDStream<Status> tweetsStream = TwitterUtils.createStream(jssc, StreamUtils.getAuth());
 return tweetsStream;
}
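
A caller would then attach an output operation to the returned stream and start the context. A hypothetical driver for loadData() might look like this (the run() method is our assumption; jssc is the field set inside loadData() above):

public void run() throws InterruptedException {
 JavaDStream<Status> tweets = loadData();
 // Print a sample of tweet texts from each 2-second batch
 tweets.map(Status::getText).print();
 jssc.start();
 jssc.awaitTermination();
}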

Example source: jgperrin/net.jgp.labs.spark

private void start() {
  // Create a local StreamingContext with two worker threads and a
  // batch interval of 5 seconds
  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

  // Watch the input directory for new text files and print each batch
  JavaDStream<String> msgDataStream = jssc.textFileStream(StreamingUtils.getInputDirectory());
  msgDataStream.print();

  jssc.start();
  try {
    jssc.awaitTermination();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
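
As a variant, the wait can be bounded and the shutdown made graceful using the standard JavaStreamingContext API (the 60-second timeout below is an arbitrary illustrative choice):

jssc.start();
try {
  // Wait up to 60 seconds for the streaming job to finish
  jssc.awaitTerminationOrTimeout(60000);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
} finally {
  // stopSparkContext = true, stopGracefully = true: drain in-flight batches before exiting
  jssc.stop(true, true);
}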

Example source: co.cask.cdap/hydrator-spark-core2

@Override
public SparkCollection<T> window(StageSpec stageSpec, Windower windower) {
 String stageName = stageSpec.getName();
 // Count records flowing in, apply the configured window, then count records flowing out
 return wrap(stream.transform(new CountingTransformFunction<T>(stageName, sec.getMetrics(), "records.in", null))
        .window(Durations.seconds(windower.getWidth()), Durations.seconds(windower.getSlideInterval()))
        .transform(new CountingTransformFunction<T>(stageName, sec.getMetrics(), "records.out",
                              sec.getDataTracer(stageName))));
}

Example source: baghelamit/iot-traffic-monitor

/**
 * Method to get total traffic counts for the different types of vehicles on each route.
 * 
 * @param filteredIotDataStream IoT data stream
 */
public void processTotalTrafficData(JavaDStream<IoTData> filteredIotDataStream) {
   // Count vehicles grouped by routeId and vehicleType
  JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
      .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
      .reduceByKey((a, b) -> a + b);
  
  // Need to keep state for total count
  JavaMapWithStateDStream<AggregateKey, Long, Long, Tuple2<AggregateKey, Long>> countDStreamWithStatePair = countDStreamPair
       .mapWithState(StateSpec.function(totalSumFunc).timeout(Durations.seconds(3600))); // maintain state for one hour
  // Transform to dstream of TrafficData
  JavaDStream<Tuple2<AggregateKey, Long>> countDStream = countDStreamWithStatePair.map(tuple2 -> tuple2);
  JavaDStream<TotalTrafficData> trafficDStream = countDStream.map(totalTrafficDataFunc);
  // Map Cassandra table column
  Map<String, String> columnNameMappings = new HashMap<String, String>();
  columnNameMappings.put("routeId", "routeid");
  columnNameMappings.put("vehicleType", "vehicletype");
  columnNameMappings.put("totalCount", "totalcount");
  columnNameMappings.put("timeStamp", "timestamp");
  columnNameMappings.put("recordDate", "recorddate");
  // call CassandraStreamingJavaUtil function to save in DB
  javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "total_traffic",
      CassandraJavaUtil.mapToRow(TotalTrafficData.class, columnNameMappings)).saveToCassandra();
}
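
The snippet references a totalSumFunc state-update function without showing it. A hypothetical sketch of what it could look like, inferred from the mapWithState type parameters above (it ignores the state-timeout edge case for brevity; requires org.apache.spark.api.java.Optional, org.apache.spark.api.java.function.Function3, org.apache.spark.streaming.State, and scala.Tuple2):

// Hypothetical state-update function: adds each batch's count to the running total
private static final Function3<AggregateKey, Optional<Long>, State<Long>, Tuple2<AggregateKey, Long>>
    totalSumFunc = (key, currentCount, state) -> {
      long newCount = currentCount.isPresent() ? currentCount.get() : 0L;
      long totalSum = newCount + (state.exists() ? state.get() : 0L);
      state.update(totalSum);  // persist the new total for the next batch
      return new Tuple2<>(key, totalSum);
    };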

Example source: org.apache.spark/spark-streaming_2.10 (the same test appears verbatim in spark-streaming_2.11)

@Test
public void testSeconds() {
 Assert.assertEquals(new Duration(30 * 1000), Durations.seconds(30));
}

Example source: org.apache.spark/spark-streaming_2.10

@Test
public void testMilliseconds() {
 Assert.assertEquals(new Duration(100), Durations.milliseconds(100));
}
