Client application tries to produce to a topic on Kafka, but something goes wrong: no error message comes back and it is unclear how to debug it

Asked by 7rfyedvj on 2021-06-04, tagged Kafka

We have deployed the Kafka and ZooKeeper pods on a Kubernetes cluster, and the two connect to each other fine. But when we try to produce to a topic through a client application, the PUT request hangs and no message comes back even after a long time. How can we debug this situation? The .yaml files for Kafka, ZooKeeper, and the client application are shown below:
Kafka.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null

  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.42:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
        resources: {}
        volumeMounts:
        - mountPath: /var/run/docker.sock
          name: kafka-claim0
      hostname: kafka
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

Zookeeper.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        image: wurstmeister/zookeeper
        name: zookeeper
        ports:
        - containerPort: 2181
        resources: {}
      hostname: zookeeper
      restartPolicy: Always
status: {}

App1 (the client application deployment):

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    service: broker-service
  name: broker-app
spec:
  replicas: 1
  template:
    metadata:
      labels:
        service: broker-service
    spec:
      imagePullSecrets:
      - name: pullsecret
      containers:
      - env:
        - name: OHH_COMMON_REDEPLOY
          value: THIS_WILL_BE_REPLACED
        - name: ASPNETCORE_ENVIRONMENT
          value: docker
        image: localgitlabregistry/broker.app:v0.01
        name: broker-app
        imagePullPolicy: "Always"
        ports:
        - containerPort: 80
        - containerPort: 443
      nodeSelector:
        role: slave1
      restartPolicy: Always

The services are as follows:
kafka-service.yaml:

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  - name: "9094"
    port: 9094
    targetPort: 9094
  clusterIP: None
  # type: NodePort
  selector:
    io.kompose.service: kafka
status:
  loadBalancer: {}

zookeeper-service.yaml:

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    io.kompose.service: zookeeper
status:
  loadBalancer: {}

app-service.yaml:

apiVersion: v1
kind: Service
metadata:
  labels:
    service: broker-service
  name: broker-service
spec:
  ports:
  - name: "57270"
    port: 80
    targetPort: 80
  - name: "44348"
    port: 443
    targetPort: 443
  selector:
    service: broker-service
  type: NodePort

The logs of the Kafka pod are as follows:

waiting for kafka to be ready
[Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect.timeout.ms' in '/opt/kafka/config/server.properties'
[2019-09-29 08:06:56,783] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-29 08:06:57,767] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-29 08:06:57,768] INFO starting (kafka.server.KafkaServer)
[2019-09-29 08:06:57,769] INFO Connecting to zookeeper on 192.168.88.42:30573 (kafka.server.KafkaServer)
[2019-09-29 08:06:57,796] INFO [ZooKeeperClient Kafka server] Initializing a new session to 

.
.
.
[2019-09-29 08:06:57,804] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.version=4.4.0-116-generic (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,806] INFO Initiating client connection, connectString=192.168.88.42:30573 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@2667f029 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,822] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:57,847] INFO Opening socket connection to server 192.168.88.42/192.168.88.42:30573. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,865] INFO Socket connection established to 192.168.88.42/192.168.88.42:30573, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,880] INFO Session establishment complete on server 192.168.88.42/192.168.88.42:30573, sessionid = 0x10005366a620042, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,886] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:58,448] INFO Cluster ID = b8bTvrC2T6iidAcNqD482A (kafka.server.KafkaServer)
[2019-09-29 08:06:58,455] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:58,632] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
.
.
.
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,659] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true

    kafka.metrics.reporters = []
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    listeners = null
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms = 9223372036854775807
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
        .
.
.
    unclean.leader.election.enable = false
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,721] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,722] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,724] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,797] INFO Log directory /kafka/kafka-logs-kafka not found, creating it. (kafka.log.LogManager)
[2019-09-29 08:06:58,814] INFO Loading logs. (kafka.log.LogManager)
[2019-09-29 08:06:58,834] INFO Logs loading complete in 20 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,869] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,877] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-09-29 08:06:59,505] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-09-29 08:06:59,549] INFO [SocketServer brokerId=1033] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-09-29 08:06:59,550] INFO [SocketServer brokerId=1033] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-09-29 08:06:59,587] INFO [ExpirationReaper-1033-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,600] INFO [ExpirationReaper-1033-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,614] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-09-29 08:06:59,716] INFO Creating /brokers/ids/1033 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,743] INFO Stat of the created znode at /brokers/ids/1033 is: 776,776,1569744419734,1569744419734,1,0,0,72063325309108290,180,0,776
 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,745] INFO Registered broker 1033 at path /brokers/ids/1033 with addresses: ArrayBuffer(EndPoint(kafka,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 776 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,748] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:59,882] INFO [ExpirationReaper-1033-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,888] INFO [ExpirationReaper-1033-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,895] INFO [ExpirationReaper-1033-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,940] INFO [GroupCoordinator 1033]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,949] INFO [GroupCoordinator 1033]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,961] INFO [GroupMetadataManager brokerId=1033] Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-09-29 08:06:59,990] INFO [ProducerId Manager 1033]: Acquired new producerId block (brokerId:1033,blockStartProducerId:21000,blockEndProducerId:21999) by writing to Zk with path version 22 (kafka.coordinator.transaction.ProducerIdManager)
[2019-09-29 08:07:00,044] INFO [TransactionCoordinator id=1033] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,056] INFO [Transaction Marker Channel Manager 1033]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-09-29 08:07:00,061] INFO [TransactionCoordinator id=1033] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,207] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-09-29 08:07:00,289] INFO [SocketServer brokerId=1033] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-09-29 08:07:00,326] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka startTimeMs: 1569744420299 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,341] INFO [KafkaServer id=1033] started (kafka.server.KafkaServer)
creating topics: newsrawdata:1:1

Logs of the ZooKeeper pod:

2019-09-29 08:06:58,003 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:create cxid:0xd zxid:0x306 txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode = NodeExists for /config/brokers
2019-09-29 08:07:00,421 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:multi cxid:0x3f zxid:0x30d txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2019-09-29 08:07:07,512 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.44.0.0:39244
2019-09-29 08:07:07,519 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /10.44.0.0:39244
2019-09-29 08:07:07,521 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@694] - Established session 0x10005366a620043 with negotiated timeout 30000 for client /10.44.0.0:39244
2019-09-29 08:07:08,034 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10005366a620043
2019-09-29 08:07:08,045 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /10.44.0.0:39244 which had sessionid 0x10005366a620043
2019-09-29 08:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 08:07:13,181 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2019-09-29 09:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 09:07:13,182 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.

Logs from the client application:

Kafka Ip Server:kafka:9092
warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
      No XML encryptor configured. Key {16b9a9aa-732a-47ab-bd31-ce341be7f812} may be persisted to storage in unencrypted form.
Hosting environment: docker
Content root path: /app
Now listening on: http://[::]:80
Application started. Press Ctrl+C to shut down.

We set the bootstrap server to "kafka:9092" in the client application. It seems the client can resolve "kafka" inside the cluster and sees the Kafka pod's IP, but nothing happens when the PUT request is sent. Notably, with this same configuration under docker-compose instead of the Kubernetes cluster, everything works as expected. What is wrong with this configuration?
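
One way to narrow a hang like this down is to run a throwaway Pod inside the cluster and point the Kafka command-line tools at the same bootstrap address the client uses; if the CLI also hangs, the problem lies in the broker/listener configuration rather than in the client application. This is only a sketch: the script path is an assumption about the layout of the wurstmeister/kafka image, and the Pod name is made up.

# Hypothetical one-off debug Pod: lists topics through the advertised
# in-cluster address kafka:9092, the same address the client application uses.
apiVersion: v1
kind: Pod
metadata:
  name: kafka-debug
spec:
  restartPolicy: Never
  containers:
  - name: kafka-debug
    image: wurstmeister/kafka
    # overrides the image entrypoint so the Pod runs the CLI once and exits
    command: ["/opt/kafka/bin/kafka-topics.sh"]
    args: ["--bootstrap-server", "kafka:9092", "--list"]

Checking this Pod's output afterwards (kubectl logs kafka-debug) shows either the topic list or the connection errors that the client application may be swallowing.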

ukqbszuj (Answer 1):

First, make sure your node carries the label that the nodeSelector expects: the node where the broker app is deployed must have the role: slave1 label. Otherwise, simply remove the nodeSelector lines from the broker deployment file.
Then add a selector block under spec in the deployment file:

  selector:
    matchLabels:
      io.kompose.service: kafka

This block is for the Kafka deployment.
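
Presumably the other deployments need the analogous block matching their own pod labels. A minimal sketch for the ZooKeeper deployment, using the newer apps/v1 API (where spec.selector is mandatory) and reusing the names and labels from the manifests above, would look like this:

# Sketch only: the same spec.selector idea applied to the ZooKeeper deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
  labels:
    io.kompose.service: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: zookeeper  # must match the pod template labels below
  template:
    metadata:
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: wurstmeister/zookeeper
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        ports:
        - containerPort: 2181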
You do not have to add labels to the Services; specifying the selector is sufficient, so you can remove the labels field from the service config files.
Then, in the Kafka deployment config file, change these lines:

- name: KAFKA_ZOOKEEPER_CONNECT
  # value: 192.168.88.42:30573
  value: your_zookeeper_service_ip:2181

The value should contain the IP of the zookeeper Service and port 2181; the original value is only appropriate if 192.168.88.42 really is that Service's IP.
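
Since a zookeeper Service already exists in the cluster (see zookeeper-service.yaml above), another common option is to reference it by its in-cluster DNS name instead of a hard-coded IP. This is a sketch that assumes the broker deployment and the zookeeper Service run in the same namespace:

- name: KAFKA_ZOOKEEPER_CONNECT
  # the cluster DNS resolves the Service name "zookeeper" to its ClusterIP
  value: zookeeper:2181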

pbossiut (Answer 2):

Actually, the problem was related to the Kafka listener configuration (KAFKA_ADVERTISED_LISTENERS, KAFKA_INTER_BROKER_LISTENER_NAME, KAFKA_LISTENERS). In any case, the deployment.yaml configuration below now works fine for me, without any changes to the service files.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
        - name: KAFKA_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.207:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
      hostname: kafka
      restartPolicy: Always

The Kafka instance now connects to ZooKeeper, and the client application can produce to and consume from Kafka correctly.
