postgresql copymanager copyin在与stdin的copy一起使用时,似乎什么也不做

uttx8gqw  于 2021-06-27  发布在  Java
关注(0)|答案(1)|浏览(4360)

我正在尝试将postgresql copymanagercopyin功能用于 COPY FROM STDIN 正如文档中建议的那样,可以非常快速地从inputstream复制到数据库表中。我正在考虑使用它来连续地流式传输当我接收/处理一个表时要写入表的行。然而,下面的快速而肮脏的示例代码似乎被卡住了 copyIn 不会写入表。
有人知道我在这里遗漏了什么,或者我的理解是不是错了?

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        String copySql = "COPY dcm.testtbl FROM STDIN";
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        Thread.sleep(10000);
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

表的架构 testtbl 在下面,

create table testtbl (
id  integer primary key,
jsnclm  jsonb
)

控制台输出是(它不返回,需要使用ctrl+c终止它),

C:\Users\ml410408\Documents\Useful Lookups\POSTGRESQL>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1       {"id":1, "somefield":"row1"}
2       {"id":2, "somefield":"row2"}
3       {"id":3, "somefield":"row3"}
4       {"id":4, "somefield":"row4"}
5       {"id":5, "somefield":"row5"}
6       {"id":6, "somefield":"row6"}
7       {"id":7, "somefield":"row7"}
8       {"id":8, "somefield":"row8"}
9       {"id":9, "somefield":"row9"}
WRITTEN!
QUITTING

更新:
一旦我改变了 COPY sql命令将默认文本转换为csv,并传入csv记录它不再被卡住,而是什么也不做(意味着表中没有记录),尽管它返回的结果与以前不同。

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr, i, i, ("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.write("'\\.'\n");
                            System.out.println("'\\.'\n");
                            bw.flush();
                            os.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        CopyManager copyManager = new CopyManager(pgcon);
                        String copySql = "COPY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                        executorService.submit(() -> copyManager.copyIn(copySql, br));
                        Thread.sleep(5000);
                        System.out.println(br.ready());
                        while (br.ready()) {
                            System.out.println("LINE : " + br.readLine());
                        }
                        executorService.shutdown();
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
                System.out.println("QUITTING FINALLY");
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

谢谢

4ngedf3f

4ngedf3f1#

这里似乎有几个不同的问题。
程序挂起,因为 ExecutorService 是让它活着;打电话 shutdown() 在提交任务后,它将按预期终止。
什么都没写的主要原因是 copyIn() 正在引发异常:流中的尾随换行符( bw.write("\n") )触发 ERROR: invalid input syntax for integer: "" 因为它找不到 id 列。
即使如此,由于资源清理的时间安排,这看起来仍然受到一些竞争条件的影响。这个 copyIn() 调用将被阻止,直到到达其 InputStream ,如果是 PipedInputStream “终点”是 PipedOutputStream 已关闭。但是在溪流关闭后 copyIn() 调用被取消阻止,输入流和数据库连接被连续快速关闭,可能在拷贝有机会完成之前关闭。充其量,它似乎成功地提交到表中,但随后出错为“取消复制操作时数据库连接失败”。
要确保这些资源在仍在使用时不会被释放,请执行以下操作:
等待编写器完成
关闭 OutputStream 等待复印机完成
关闭 InputStream / Connection 等待任务完成还有一个额外的好处,就是将任何异常传播到主线程。
还有一个潜在的僵局是由于 newSingleThreadExecutor() :如果writer线程填充管道的缓冲区,它将阻塞,直到读卡器开始使用数据,如果按顺序执行,则永远不会发生这种情况。使用 newFixedThreadPool(2) 我们应该解决这个问题。
考虑到这些:

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
      try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
          BaseConnection pgcon = (BaseConnection) connection;
          PipedInputStream is = new PipedInputStream();
          BufferedReader br = new BufferedReader(new InputStreamReader(is));
      ) {
        Future write;
        Future copy;
        try (
            PipedOutputStream os = new PipedOutputStream(is);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
          write = executorService.submit(() -> {
            String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
            String row = null;
            for (int i = 1; i < 1000; i++) {
              row = String.format(frmtStr, i, i, ("row" + i));
              System.out.print(row);
              bw.write(row);
            }
            bw.flush();
            System.out.println("WRITTEN!");
            return true;
          });
          System.out.println(connection);
          CopyManager copyManager = new CopyManager(pgcon);
          String copySql = "COPY dcm.testtbl FROM STDIN";
          copy = executorService.submit(() -> copyManager.copyIn(copySql, br));
          System.out.println("QUITTING");
          write.get();
        }
        copy.get();
      }
    } catch (Exception ex) {
      System.out.println(ex);
    } finally {
      executorService.shutdown();
    }
  }

相关问题