spark, cassandra, streaming, python, error, database, kafka

hsvhsicv · posted 2021-06-08 in Kafka

I am trying to save my streaming data from Spark to Cassandra. Spark is connected to Kafka and that part works fine, but saving to Cassandra is driving me crazy. I am using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.
This is how I submit the job to Spark:

spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing

This is my code. It is only a small modification of the Spark example and it runs, but I cannot save the data to Cassandra.
This is essentially what I am trying to do, only counting the results: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/

from __future__ import print_function
import sys
import os
import time
import pyspark_cassandra
import pyspark_cassandra.streaming
from pyspark_cassandra import CassandraSparkContext
import urllib
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max
from pyspark.sql.types import FloatType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    sqlContext = SQLContext(sc)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts=lines.count()
    counts.saveToCassandra("spark", "count")
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

I get this error:

Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88
    counts.saveToCassandra("spark", "count")


yv5phkfx1#

pyspark-cassandra stopped being maintained a while ago; the latest release only supports Spark 1.6: https://github.com/targetholding/pyspark-cassandra
Also:

```python
counts = lines.count()  # per-batch counts, not rows that saveToCassandra can write
```

`counts` is no longer something `saveToCassandra` can be called on as written: `count()` on a DStream yields a stream of per-batch counts (on a plain RDD it simply returns a number to the driver), while `saveToCassandra` is a function for RDDs and DStreams of rows.
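If the goal is the word-count-to-Cassandra flow from the blog post linked in the question, the job usually has to be restructured so that `saveToCassandra` is called on a DStream of rows (here, dicts keyed by column name), with `pyspark_cassandra.streaming` imported so that the method is available on DStreams. Below is a minimal sketch, not a drop-in fix: the column names `word` and `total` are assumptions and must match the actual schema of the `spark.count` table, and, as noted above, pyspark-cassandra itself may still misbehave on Spark 2.0.2 since it only claims support up to Spark 1.6.

```python
# Sketch only: assumes a table spark.count with columns "word" and "total"
# (hypothetical names), and that pyspark-cassandra's streaming module still
# adds saveToCassandra to DStreams on this Spark version.
from __future__ import print_function
import sys

import pyspark_cassandra.streaming  # makes saveToCassandra available on DStreams
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)

    brokers, topic = sys.argv[1:]
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])

    # Build a DStream of dicts whose keys match the Cassandra columns,
    # instead of calling count(), whose result saveToCassandra cannot write.
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .map(lambda wc: {"word": wc[0], "total": wc[1]}))

    counts.saveToCassandra("spark", "count")
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
```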
