Failed to find data source: kafka, using the Docker all-spark-notebook image with Spark 3.1.1

new9mtju posted on 2021-07-09 in Spark

I'm very new to Spark and Kafka, and I'm trying to run some example code in Python (Jupyter), using docker-compose to download, configure and run the Docker images. I'm running Ubuntu 20.04 under Windows 10 WSL.
Here are the steps I have followed so far:
Using the docker-compose.yml file shown below, I ran `docker-compose up -d`
All containers started successfully
I opened the notebook at http://127.0.0.1:8888/?token=
I execute the Python script cell by cell, and it fails at

```
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "cloud") \
    .option("startingOffsets", "earliest") \
    .load()
```

with the error:

```
error preamble... AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
```

I am obviously missing a step, and I have searched for a solution in vain. Other answers to this problem suggest adding:

```
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
```

but that did not work for me. Which step am I missing?
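For reference, `PYSPARK_SUBMIT_ARGS` is only read when the session's JVM is launched, so it has to be set before the SparkSession is created; an equivalent route is to let the builder resolve the Kafka connector itself via `spark.jars.packages` (the commented-out `spark.jars` line in the notebook below hints at this). A minimal sketch, assuming the same package coordinates as in the question:

```
from pyspark.sql import SparkSession

# Ask Spark to download and attach the Kafka connector when the session starts.
# This only takes effect if no SparkSession/SparkContext is already running.
spark = SparkSession.builder \
    .appName("My App") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
    .getOrCreate()
```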
docker-compose.yml

```
---
version: '2'
services:
  spark:
    image: jupyter/all-spark-notebook:latest
    ports:
      - "8888:8888"
    working_dir: /home/$USER/work
    volumes:
      - $PWD/work:/home/$USER/work

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
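      # PLAINTEXT://kafka:9092 is the address other containers on the compose
      # network (e.g. the notebook) use to reach the broker;
      # PLAINTEXT_HOST://localhost:29092 is for clients running on the host machine.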
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect=DEBUG
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        curl http://repo.odysseusinc.com/artifactory/community-libs-release-local/org/netezza/nzjdbc/1.0/nzjdbc-1.0.jar -o nzjdbc-1.0.jar
        /etc/confluent/docker/run &
        sleep infinity
```
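As a side note, here is a rough sketch of how the "cloud" topic the notebook subscribes to could be created and fed a test message, using the standard CLI tools shipped in the confluentinc/cp-kafka image (commands and message are illustrative only):

```
# list the topics the broker currently knows about
docker exec -it kafka kafka-topics --bootstrap-server kafka:9092 --list

# create the "cloud" topic the notebook subscribes to
docker exec -it kafka kafka-topics --bootstrap-server kafka:9092 \
  --create --topic cloud --partitions 1 --replication-factor 1

# produce a test message (type a line, then Ctrl-C to exit)
docker exec -it kafka kafka-console-producer --bootstrap-server kafka:9092 --topic cloud
```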

Python code


```
#!/usr/bin/env python

# coding: utf-8

# In[ ]:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
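# Note: PYSPARK_SUBMIT_ARGS is only consumed when the JVM behind the
# SparkSession is launched, so it must be set before getOrCreate() runs below.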

# In[ ]:

# Spark

import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Spark Streaming

from pyspark.streaming import StreamingContext

# json parsing

import json

# In[ ]:

findspark.init()

# In[ ]:

spark = SparkSession \
    .builder \
    .appName("My App") \
    .getOrCreate()

# .config("spark.jars", "/path/to/jar.jar,/path/to/another/jar.jar") \

# In[ ]:

spark.version

# In[ ]:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "cloud") \
    .option("startingOffsets", "earliest") \
    .load()
```
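For completeness, once `load()` succeeds a stream like this is usually sanity-checked by casting the binary key/value columns to strings and writing them to the console; this is a generic Structured Streaming sketch, not part of the original notebook:

```
# Kafka delivers key/value as binary; cast to strings for readability
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()
```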
