使用bufferedreader.lines()并行中断

vktxenjb  于 2021-07-12  发布在  Java
关注(0)|答案(2)|浏览(246)

我正在写一些代码来读取日志行,并在后台对这些数据进行处理。这种处理可能受益于并行化,比如stream.parallel方法提供的内容,我正试图使用这种方法。这是我开始使用的代码,它非常有效。

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

这个代码连接并打印出我所有的数据。我非常希望将此代码重组如下:

public static void main(String[] args) {
    try (Socket socket = new Socket(ADDRESS, PORT); 
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

但遗憾的是,这不起作用。更糟糕的是,回到原始代码,这甚至不起作用:

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().parallel().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

这里添加的只是一个并行调用,这完全不起作用。它只是坐在那里,什么都没有打印出来。
我可以活得很好,没有第二个版本使用修改过的 try(A a = new A()) {} 因为在这种情况下看起来不太好。我不能没有的是弄明白为什么这个并行调用会破坏一切。
我假设修改后的try语句会在我从流中掉出来时立即关闭流(就在我们启动foreach之后),所以在操作之前它们会被杀死和gc'd。我一辈子都搞不懂平行电话到底是怎么回事。
这里请求的是在该代码的.parellel()版本上运行的jstack的输出。

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    - locked <0x00000000ec086790> (a java.net.SocksSocketImpl)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at com.gravypod.Test.main(Test.java:48)

"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition 

JNI global references: 18

测试。java:48 line 是 Socket socket = new Socket 线路。这是完全工作的非并行代码的结果(仅使用.lines())。

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at java.io.BufferedReader$1.hasNext(BufferedReader.java:571)
    at java.util.Iterator.forEachRemaining(Iterator.java:115)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.gravypod.Test.main(Test.java:51)

"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition 

JNI global references: 319

线路测试。java:51 is 这个 reader.lines().forEach 线路。

mzillmmw

mzillmmw1#

从技术上讲,应用程序似乎没有挂起,只是在执行可观察的工作之前等待大量输入。这是两个实现细节的组合。当您启动并行流操作时,它将首先尝试分割工作负载,直到每个cpu核心都有事情要做,然后才真正开始处理元素。这与reader#lines()结合在一起,由于批量大小不可配置的问题,并行性很差。
简单地说,当一个流有一个未知的大小时,实现将尝试缓冲大小为 1024 ,在每个裂口上生长。这个伟大的答案表明,对于一个具有多个核心的未知大小的流,分裂将如何发生,表明 1024 元素将在过程中得到缓冲。这可能需要很长一段时间,才能让消费者接受 forEach 被调用过。
注意,通过非短路处理无限源 forEach 在流api的范围之外。假设一个及时的副作用是关于流的处理顺序的一个假设,但是没有关于它的保证。
这个答案会引导你找到解决方法。你可以用

try(Socket socket = new Socket(ADDRESS, PORT);
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream()))) {

    socket.getOutputStream().write(QUERY);
    Stream.generate(() -> {
        try { return reader.readLine(); }
        catch (IOException ex) { throw new UncheckedIOException(ex); }
    }).parallel().forEach(System.out::println);
} catch(IOException|UncheckedIOException e) {
    e.printStackTrace();
}

但是,如前所述,这不是流api的预期用例…

hsvhsicv

hsvhsicv2#

我设想并行流上的parallel()或foreach()在并行化任务之前等待读取所有输入。因为服务器从不关闭连接,所以它将永远等待。
你的任务不是真正的可并行化。数据是按顺序通过导线传输的,因此并行读取数据是行不通的。

相关问题