我想试试这个 Match_Recognize
来自sql客户端的flink sql中的运算符。为此,我对源表进行了以下设置
# A typical table source definition looks like:
- name: TaxiRides
type: source
update-mode: append
connector:
type: filesystem
path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
format:
type: csv
fields:
- name: rideId
type: LONG
- name: taxiId
type: LONG
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rideTime
type: TIMESTAMP
- name: psgCnt
type: INT
line-delimiter: "\n"
field-delimiter: ","
schema:
- name: rideId
type: LONG
- name: taxiId
type: LONG
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: rideTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "eventTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: psgCnt
type: INT
启动会话时,出现以下错误
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No factory supports all properties.
所以,我的问题是:是可以从文件中以表的形式读取源流,还是必须从kafka中读取?
更新:我正在使用flink版本1.9.1
1条答案
按热度按时间eblbsuwk1#
不幸的是,您遇到了csv文件系统连接器的限制。此连接器不支持行时属性。
在1.10中,我们开始用一种稍微不同的方式来表达水印和时间属性。请参阅以获取参考:https://issues.apache.org/jira/browse/flink-14320.
您可以尝试从ddl创建带有水印声明的表,如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-表它只适用于blink planner(blink planner是从1.10版本开始的sql客户机中的默认实现)。
另一个选择是用csv格式阅读Kafka。
顺便说一句,Flink1.10改进了这个特殊的异常消息。从现在起,Flink将告诉有问题的属性。