无法连接到ksql中的外部主题

cbjzeqam  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(290)

我对融合ksql非常陌生,但对Kafka并不陌生。我有Kafka中作为avro序列化数据存在的现有主题。我已经启动并运行了合流模式注册表,并配置ksql以指向注册表。
当我试图根据我的一个主题创建一个表时,ksql抱怨它找不到流。当我尝试在ksql中创建一个流,该流只是在ksql中对我的主题进行流式处理时,似乎无法指向注册表中有引用的avro序列化主题。
有人知道如何解决这两个问题吗?我想使用ksql的方式是否不适合它的功能?
更新
这里有更多的细节

ksql> show topics;

 Kafka Topic                                                                                 | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA                              | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_CPATRACKINGCALLBACK                                             | false      | 2          | 2                  | 0         | 0
 COM_FINDOLOGY_MODEL_TRAFFIC_ENTRYPOINTCLICK                                                 | true       | 10         | 3                  | 0         | 0

ksql配置


# bootstrap.servers=localhost:9092

bootstrap.servers=host1:9092,host2:9092,host3:9092,host4:9092,host5:9092

# listeners=http://localhost:8088

listeners=http://localhost:59093

ksql.server.ui.enabled=true

ksql.schema.registry.url=http://host1:59092

注册表配置


# The host name advertised in ZooKeeper. Make sure to set this if running Schema Registry with multiple nodes.

host.name: x.x.x.x
listeners=http://0.0.0.0:59092

# Zookeeper connection string for the Zookeeper cluster used by your Kafka cluster

# (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# kafkastore.connection.url=localhost:2181

# Alternatively, Schema Registry can now operate without Zookeeper, handling all coordination via

# Kafka brokers. Use this setting to specify the bootstrap servers for your Kafka cluster and it

# will be used both for selecting the master schema registry instance and for storing the data for

# registered schemas.

# (Note that you cannot mix the two modes; use this mode only on new deployments or by shutting down

# all instances, switching to the new configuration, and then starting the schema registry

# instances again.)

kafkastore.bootstrap.servers=PLAINTEXT://host1:9092,PLAINTEXT://host2:9092,PLAINTEXT://host3:9092,PLAINTEXT://host4:9092,PLAINTEXT://host5:9092

# The name of the topic to store schemas in

kafkastore.topic=_schemas

# If true, API requests that fail will include extra debugging information, including stack traces

debug=false

试图通过声明外部主题来解决问题

ksql> register  topic xxx with (value_format='avro', kafka_topic='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');
You need to provide avro schema file path for topics in avro format.
dxpyg8gm

dxpyg8gm1#

我解决了问题后,我改变了什么信息,我从Kafka主题使用,而不是使用整个主题的内容。本主题包含使用 ReflectionData . KSql 在处理流中的非标准项时有问题,但只要存在相应的ksql数据类型,就可以处理reflectiondata项。我通过在ksql中创建一个新的流来解决这个问题,该流只选择了我所需要的与ksql兼容的项。当这一切完成后,我可以处理我需要从更大的流。
注解我认为ksql中的一个缺陷是您必须在其中创建新的实际中介主题 Kafka 处理数据。我认为更好的解决方案是将中介流作为 View 进入真正的溪流。我理解,在将其解析为ktable之前,需要中介主题来保存累积和处理的项。

30byixjq

30byixjq2#

REGISTER TOPIC 已弃用语法。你应该使用 CREATE STREAM (或 CREATE TABLE ,具体取决于您的数据访问要求)。
所以你的陈述应该是这样的:

CREATE STREAM MY_STREAM_1 \
  WITH (VALUE_FORMAT='AVRO', \
  KAFKA_TOPIC='COM_FINDOLOGY_MODEL_REPORTING_OUTGOINGFEEDADVERTISERSEARCHDATA');

注意我用过 \ 为了可读性而断章取义;你不必这么做。

相关问题