Flink SQL Client environment configuration: reading a CSV file as a streaming source table

wmtdaxz3 · posted 2021-06-24 in Flink

I want to try out the MATCH_RECOGNIZE operator in Flink SQL from the SQL Client. To that end, I set up the source table as follows:


# A typical table source definition looks like:

 - name: TaxiRides
   type: source
   update-mode: append
   connector: 
       type: filesystem
       path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
   format: 
       type: csv
       fields:
           - name: rideId
             type: LONG
           - name: taxiId
             type: LONG
           - name: isStart
             type: BOOLEAN
           - name: lon
             type: FLOAT
           - name: lat
             type: FLOAT
           - name: rideTime
             type: TIMESTAMP
           - name: psgCnt
             type: INT
       line-delimiter: "\n"
       field-delimiter: ","

   schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT

When starting the session, I get the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

So my question is: is it possible to read a source stream as a table from a file, or does the stream have to come from Kafka?
Update: I am using Flink version 1.9.1.

eblbsuwk · Answer 1

Unfortunately, you have hit a limitation of the CSV filesystem connector: it does not support rowtime attributes.
In 1.10, we started expressing watermarks and time attributes in a slightly different way. See https://issues.apache.org/jira/browse/FLINK-14320 for reference.
You can try creating the table via DDL with a WATERMARK declaration, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table. Note that this only works with the Blink planner (the Blink planner is the default implementation in the SQL Client as of 1.10); a sketch of such a statement follows.
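For illustration, here is a minimal sketch of such a DDL statement, assuming Flink 1.10+ with the Blink planner and the legacy connector property keys from the 1.10 documentation (the exact keys and the required format jars depend on your distribution):

CREATE TABLE TaxiRides (
  rideId BIGINT,
  taxiId BIGINT,
  isStart BOOLEAN,
  lon FLOAT,
  lat FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt INT,
  -- declare rideTime as the rowtime attribute with a 60 s bounded watermark,
  -- mirroring the 60000 ms delay in the YAML above
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = 'file:///home/bitnami/Match_Recognize/TaxiRide.csv',
  'format.type' = 'csv'
);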
Another option is to read from Kafka with the CSV format, as sketched below.
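Again only a sketch, using the legacy 1.10 Kafka connector properties; the topic name and broker address are placeholders you would need to adapt:

CREATE TABLE TaxiRides (
  rideId BIGINT,
  taxiId BIGINT,
  isStart BOOLEAN,
  lon FLOAT,
  lat FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt INT,
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'taxi-rides',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'csv'
);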
By the way, Flink 1.10 improved this particular exception message: from then on, Flink reports which properties were problematic.
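Once the table exposes rideTime as a rowtime attribute, a MATCH_RECOGNIZE query over it could look roughly like the following; the pattern itself (a ride start followed by the matching end, per taxi) is purely illustrative:

SELECT *
FROM TaxiRides
MATCH_RECOGNIZE (
  PARTITION BY taxiId
  ORDER BY rideTime          -- must be ordered by the rowtime attribute
  MEASURES
    S.rideId   AS rideId,
    S.rideTime AS startTime,
    E.rideTime AS endTime
  ONE ROW PER MATCH
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)
  DEFINE
    S AS S.isStart = TRUE,
    E AS E.isStart = FALSE
);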
