Error creating a DataFrame with a value and a timestamp on Azure Databricks

kzmpq1sx posted on 2021-05-27 in Spark

I'm not very familiar with Spark, but I have to use it to consume some data. I've tried every syntax I could find to create a DataFrame containing a value and a timestamp, which I want to write to a database for tracking whenever I get an update from the data source. The errors are endless, I'm out of ideas, and I can't see why something this simple won't work. Here is an example of the code I'm trying to use:

sc = spark.sparkContext
df = sc.parallelize([[1,pyspark.sql.functions.current_timestamp()]]).toDF(("Value","CreatedAt"))

The error isn't really helpful:

py4j.Py4JException: Method __getstate__([]) does not exist
 ---------------------------------------------------------------------------
 Py4JError                                 Traceback (most recent call last)
 <command-1699228214903488> in <module>
      29 
      30 sc = spark.sparkContext
 ---> 31 df = sc.parallelize([[1,pyspark.sql.functions.current_timestamp()]]).toDF(("Value","CreatedAt"))

 /databricks/spark/python/pyspark/context.py in parallelize(self, c, numSlices)
     557                 return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
     558 
 --> 559             jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
     560 
     561         return RDD(jrdd, self, serializer)

 /databricks/spark/python/pyspark/context.py in _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer)
     590             try:
     591                 try:
 --> 592                     serializer.dump_stream(data, tempFile)
     593                 finally:
     594                     tempFile.close()

I also tried this:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc) # sc is the spark context

df = sqlContext.createDataFrame(
     [( current_timestamp(), '12a345')],
     ['CreatedAt','Value'] # the row header/column labels should be entered here
)

which fails with:

AssertionError: dataType <py4j.java_gateway.JavaMember object at 0x7f43d97c6ba8> should be an instance of <class 'pyspark.sql.types.DataType'>
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<command-2294571935273349> in <module>
     33 df = sqlContext.createDataFrame(
     34     [( current_timestamp(), '12a345')],
---> 35     ['CreatedAt','Value'] # the row header/column labels should be entered here
     36 )
     37 

/databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    305         Py4JJavaError: ...
    306         """
--> 307         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    308 
    309     @since(1.3)

/databricks/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    815                 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    816             else:
--> 817                 rdd, schema = self._createFromLocal(map(prepare, data), schema)
    818             jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())

krugob8w1#

OK, I eventually got some code working. I couldn't get it to work with TimestampType(), though; Spark chokes when inserting the data. I suspect that is a runtime issue rather than a coding problem.

import adal
import datetime
from pyspark.sql.types import *

# Set Access Token
access_token = token["accessToken"]
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc) # sc is the spark context

schema = StructType([
    StructField("CreatedAt", StringType(), True),
    StructField("value", StringType(), True)
])

da = datetime.datetime.now().strftime("%m/%d/%Y %H:%M:%S")

df = sqlContext.createDataFrame(
    [(da, '12a345')], schema
)

df.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .option("url", url) \
  .option("dbtable", "dbo.RunStart") \
  .option("accessToken", access_token) \
  .option("databaseName", database_name) \
  .option("encrypt", "true") \
  .option("hostNameInCertificate", "*.database.windows.net") \
  .option("applicationintent", "ReadWrite") \
  .mode("append") \
  .save()
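
For completeness, here is a minimal sketch (untested, assuming the default `spark` SparkSession available in a Databricks notebook) of how the original timestamp problem can be avoided. `current_timestamp()` returns a Column expression, so it cannot be passed as a literal value inside the Python rows given to `parallelize` or `createDataFrame`; that is what triggers the py4j serialization errors shown above. It can instead be replaced with a plain Python `datetime`, or added afterwards with `withColumn`:

import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Option 1: pass a plain Python datetime and declare the column as TimestampType
schema = StructType([
    StructField("CreatedAt", TimestampType(), True),
    StructField("Value", StringType(), True)
])
df = spark.createDataFrame([(datetime.datetime.now(), "12a345")], schema)

# Option 2: let Spark stamp the row itself; current_timestamp() is a Column
# expression, so it belongs in withColumn rather than inside the Python data
df2 = (spark.createDataFrame([("12a345",)], ["Value"])
            .withColumn("CreatedAt", F.current_timestamp()))

Either form yields a proper TimestampType column rather than the formatted string used above; whether the downstream JDBC write accepts it would still need to be verified against the target table.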
