KSQL LEFT JOIN not working

Posted by mum43rcc on 2021-06-07 in Kafka

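I'm new to Stack Overflow, so if I've done anything wrong, please let me know.
I looked for an answer but couldn't find any KSQL join questions on the site, so I'm posting this one. I have tried several ways of running this query, but I always get a NullPointerException.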
I have two Kafka Avro topics, deal and expense. The data contained a lot of whitespace, so I created the stream and table below with trimmed data: DEAL_STREAM and EXPENSE_TABLE.

```
ksql> describe EXPENSE_TABLE;

 Field      | Type
 ROWTIME    | BIGINT (system)
 ROWKEY     | VARCHAR(STRING) (system)
 KSQL_COL_0 | VARCHAR(STRING)
 KSQL_COL_1 | VARCHAR(STRING)
 KSQL_COL_2 | VARCHAR(STRING)

ksql> describe deal_stream;

 Field      | Type
 ROWTIME    | BIGINT (system)
 ROWKEY     | VARCHAR(STRING) (system)
 KSQL_COL_0 | VARCHAR(STRING)
 KSQL_COL_1 | VARCHAR(STRING)
 KSQL_COL_2 | VARCHAR(STRING)
```

When I run the queries below, I get a NullPointerException. These are the queries I tried.

1:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

2:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

3:

CREATE STREAM deal_expense_trimmed AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0 WHERE td.KSQL_COL_0 IS NOT NULL;

Error:

Message
Stream created and running
ksql> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-115" java.lang.NullPointerException
    at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
(The same NullPointerException, with an identical stack trace, is then printed for StreamThread-116, StreamThread-113 and StreamThread-114 of the same query.)

Answer 1 (lsmd5eda):

This bug should be fixed in the latest master. The fix will be included in the next monthly release. Here is the GitHub issue: https://github.com/confluentinc/ksql/issues/521
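
Until a release containing the fix is available, one possible workaround is sketched below. It rests on an assumption that the question does not confirm: that the NullPointerException is triggered by rows whose join column is null (the trace fails in `selectKey`, which is where the stream gets re-keyed on the join column). The idea is to filter such rows into an intermediate stream first and then join from that stream. The name `deal_stream_nonnull` is made up for illustration, and the sketch has not been verified against the affected KSQL version.

```sql
-- Assumption: null values in KSQL_COL_0 cause the NPE during re-keying.
-- Step 1: materialize a stream that contains only rows with a non-null join column.
-- deal_stream_nonnull is a hypothetical name, not part of the original question.
CREATE STREAM deal_stream_nonnull AS
  SELECT KSQL_COL_0, KSQL_COL_1, KSQL_COL_2
  FROM deal_stream
  WHERE KSQL_COL_0 IS NOT NULL;

-- Step 2: run the same LEFT JOIN as in the question, reading from the filtered stream.
CREATE STREAM deal_expense_new AS
  SELECT td.KSQL_COL_0 AS KSQL_COL_0, te.KSQL_COL_1
  FROM deal_stream_nonnull td
  LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
```

Because the filter runs as its own query, the null rows are dropped before the join query ever reads them. If the exception persists with this setup, the null-key assumption is probably wrong and upgrading is the remaining option.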
