Spark Structured Streaming performance optimization

gblwokeq · posted 2021-07-09 in Spark

I want to use Spark Structured Streaming on a MemoryStream of historical data. The test code below runs correctly: data is sent to the MemoryStream periodically, and the streaming query is triggered manually by calling the processAllAvailable method. Each iteration of the while loop (processing the batch and printing the result) takes about 20 seconds, even though the test code only performs a simple word count. How can I optimize the performance of this code? I am running it locally on Windows.

        MemoryStream<String> testStream = new MemoryStream<String>(100, sparkSession.sqlContext(), null, Encoders.STRING());
        List<String> testList = Arrays.asList("one first", "two second", "three third");

        Dataset<Row> dataset = testStream.toDF();

        // Split the lines into words
        Dataset<String> words = dataset
                .as(Encoders.STRING())
                .flatMap((FlatMapFunction<String, String>)
                        x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        DataStreamWriter<Row> streamWriter = wordCounts
                .writeStream()
                .outputMode("complete")
                .format("memory")
                .queryName("testQuery");

        StreamingQuery query = streamWriter.start();

        while (query.isActive()) {
            try {
                Thread.sleep(3000);
                for (int i = 0; i < 100; i++) {
                    testStream.addData(JavaConverters.asScalaIteratorConverter(testList.iterator()).asScala().toSeq());
                }
                query.processAllAvailable();
                sparkSession.sql("select * from testQuery").show();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

This is the result of calling lastProgress on the query after the third loop iteration.

{
  "id" : "076a6971-8b9c-482c-88db-e9a6b8ced285",
  "runId" : "c285839e-d052-4848-ba72-280043c68a50",
  "name" : "testQuery",
  "timestamp" : "2021-03-31T14:21:15.023Z",
  "batchId" : 2,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 20000.0,
  "processedRowsPerSecond" : 14.566642388929353,
  "durationMs" : {
    "addBatch" : 20221,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 13,
    "triggerExecution" : 20595,
    "walCommit" : 162
  },
  "stateOperators" : [ {
    "numRowsTotal" : 6,
    "numRowsUpdated" : 6,
    "memoryUsedBytes" : 73488,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 800,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 18560
    }
  } ],
  "sources" : [ {
    "description" : "MemoryStream[value#1]",
    "startOffset" : 199,
    "endOffset" : 299,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 20000.0,
    "processedRowsPerSecond" : 14.566642388929353
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 6
  }
}
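Reading the durationMs block above, almost all of the ~20.6 s trigger time is spent in addBatch. A common cause when a stateful aggregation like groupBy().count() runs locally is the default spark.sql.shuffle.partitions = 200: every micro-batch then executes 200 shuffle tasks, each with its own state-store instance, which is sized for a cluster rather than a single Windows machine. A minimal sketch of lowering it when building the session (this assumes you construct sparkSession yourself; the class name LocalWordCountConfig and app name are illustrative, while spark.sql.shuffle.partitions is a real Spark setting):

```java
import org.apache.spark.sql.SparkSession;

public class LocalWordCountConfig {
    public static void main(String[] args) {
        // Fewer shuffle partitions means fewer tasks and fewer state-store
        // instances per micro-batch; for a tiny local word count, one or a
        // handful of partitions is usually enough.
        SparkSession sparkSession = SparkSession.builder()
                .appName("memoryStreamWordCount")
                .master("local[*]")
                .config("spark.sql.shuffle.partitions", "1")
                .getOrCreate();

        // ... build the MemoryStream and streaming query as in the snippet above
    }
}
```

Whether this accounts for the full 20 seconds is something only the query's own metrics can confirm; comparing addBatch in lastProgress before and after changing the setting would show the effect directly.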
