I want to use Spark Structured Streaming on an in-memory stream of historical data. The test code below works correctly: data is periodically added to a MemoryStream, and the streaming query is triggered manually by calling processAllAvailable. However, each iteration of the while loop (one batch plus the print) takes about 20 seconds, even though the query is a trivial word count. How can I optimize the performance of this code? I am running it locally on Windows.
MemoryStream<String> testStream = new MemoryStream<String>(100, sparkSession.sqlContext(), null, Encoders.STRING());
List<String> testList = Arrays.asList("one first", "two second", "three third");
Dataset<Row> dataset = testStream.toDF();

// Split the lines into words
Dataset<String> words = dataset
        .as(Encoders.STRING())
        .flatMap((FlatMapFunction<String, String>)
                x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

DataStreamWriter<Row> streamWriter = wordCounts
        .writeStream()
        .outputMode("complete")
        .format("memory")
        .queryName("testQuery");
StreamingQuery query = streamWriter.start();

while (query.isActive()) {
    try {
        Thread.sleep(3000);
        for (int i = 0; i < 100; i++) {
            testStream.addData(JavaConverters.asScalaIteratorConverter(testList.iterator()).asScala().toSeq());
        }
        query.processAllAvailable();
        sparkSession.sql("select * from testQuery").show();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Here is the result of calling lastProgress on the query after the third loop iteration:
{
  "id" : "076a6971-8b9c-482c-88db-e9a6b8ced285",
  "runId" : "c285839e-d052-4848-ba72-280043c68a50",
  "name" : "testQuery",
  "timestamp" : "2021-03-31T14:21:15.023Z",
  "batchId" : 2,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 20000.0,
  "processedRowsPerSecond" : 14.566642388929353,
  "durationMs" : {
    "addBatch" : 20221,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 13,
    "triggerExecution" : 20595,
    "walCommit" : 162
  },
  "stateOperators" : [ {
    "numRowsTotal" : 6,
    "numRowsUpdated" : 6,
    "memoryUsedBytes" : 73488,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 800,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 18560
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1]",
    "startOffset" : 199,
    "endOffset" : 299,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 20000.0,
    "processedRowsPerSecond" : 14.566642388929353
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 6
  }
}
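One detail worth noting in this progress report: almost all of the 20 seconds is spent in addBatch. The groupBy aggregation shuffles into spark.sql.shuffle.partitions partitions, which defaults to 200, so every micro-batch schedules 200 tiny stateful tasks on a single local machine even though the aggregate state holds only 6 rows. A common mitigation is to shrink that number when building the session. The sketch below shows the idea; the master URL and app name are assumptions about your setup, and the key line is the shuffle-partition config:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: build the session with far fewer shuffle partitions for a tiny local stream.
// "local[*]" and the app name are placeholders; the essential setting is
// spark.sql.shuffle.partitions, whose default of 200 makes each micro-batch
// pay scheduling and state-store overhead for 200 near-empty tasks.
SparkSession sparkSession = SparkSession.builder()
        .master("local[*]")
        .appName("memoryStreamTest")
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate();
```

After this change the same query should run with one shuffle task per micro-batch; comparing durationMs.addBatch in lastProgress before and after is an easy way to confirm whether the per-partition overhead was the bottleneck.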