flinkjdbc连接

mlmc2os5  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(184)

我有以下两门课。主要类是aloscalculation,它有一个包含特定值的变量finaloutput。 DataSet<Tuple4<String, String, Double, String>> finalOutput . 试图将该值插入到postgresql中。变量connectionvalues具有json,它具有各种参数,如username、password、drivername,这些参数是从名为outermap的hashmap接收的。从writealos我可以发送行对象,这是试图插入一个计算。但是我无法发送变量connectionvalue来设置drivername、password、url和username。请告诉我怎么做。相应地计算组织度量结果、组织度量组织标识等值。
我现在用一种新的方法编辑代码来解决这个问题。但是现在新问题出现在writealos方法中,如果匹配没有发生,则返回一个空对象。因此,当数据库插入我得到一个异常说空值正在形成,不能插入。请问谁能解决这个错误?

public class Writealos implements MapFunction<Tuple4<String, String, Double, String>, Row> 
    {
        @Autowired
        private Tenantresource outermap;
        public Row map(Tuple4<String, String, Double, String> arg0)throws Exception
        {
            if(arg0.f0.equals(currentKey))
            {
               Row obj = new Row(7);
               String string = RandomStringUtils.randomAlphanumeric(32);
               obj.setField(0, string);
               obj.setField(1, alos);
               obj.setField(2, org_metric_result.toString());
               obj.setField(3, Start_Date.toString());
               obj.setField(4, End_Date.toString());
               obj.setField(5, Execution_Date.toString());
               obj.setField(6, org_metric_orgid);
         }
            return obj; 
        }
    }

    public class Aloscalculation
    {
        @Autowired
    private static Tenantresource outermap;
        public static void calculateAlos(Fhirresource fhir_resource ) throws Exception 
        {
            String query = "insert into reports (org_metric_id,org_metric_topic, org_metric_result, org_metric_from, org_metric_to, org_metric_executed_on, org_metric_orgid) values (?,?,cast(? as json),cast(? as timestamp),cast(? as timestamp),cast(? as timestamp),?)";
         for(String currentKey : outermap.tenantresourcereturn().keySet()) 
         { 
             JSONObject connection =new JSONObject( outermap.tenantresourcereturn().get(currentKey));
             System.out.println(currentKey);
             String url=connection.getString("jdbc_url")+":"+connection.getString("port")+"/"+connection.getString("db_name");
             System.out.println(url);
            finaloutput.map(new Writealos(fhir_resource,currentKey))
                .output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername()
                .setDBUrl()
                .setUsername()
                .setPassword()
                .setQuery(query)
                .finish());
        }
    }
mpgws1up

mpgws1up1#

你是怎么暴露的 public Row map(Tuple4<String, String, Double, String> arg0) ,是通过服务。spring提供了一个注解@ ResponseBody . 您需要有一个匹配的pojo,spring将负责转换。其次,为什么不打算使用任何orm工具(hibernate、ibatis等),它可以简化crud操作。
spring@responsebody注解在这个restful应用程序示例中是如何工作的?

相关问题