java.lang.ArrayIndexOutOfBoundsException: 3 raised by PySpark code

zzwlnbp8 · posted 2021-05-29 in Spark

I am running the code below in PySpark. I can print the DataFrame's count and select specific columns from it, but as soon as I call show() on the DataFrame it throws an array index out of bounds exception.
Any help with this issue would be appreciated.
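
Roughly, the behaviour is this (`test4_na` is the final DataFrame built by the code below; the `count()` after the `select` is only a stand-in action, not necessarily the exact call I ran):

```python
# Works: counting the final DataFrame completes without error
print(test4_na.count())

# Works: selecting specific columns (count() used here as a stand-in action)
test4_na.select("server", "expense_gl_hkont").count()

# Fails: java.lang.ArrayIndexOutOfBoundsException: 3 (full stack trace below)
test4_na.show()
```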


**PySpark code**

from pyspark_llap import HiveWarehouseSession
from pyspark.sql.functions import lit,unix_timestamp
import datetime
import time
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql import SQLContext
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("preprod_fms_con_ap")
data = hive.executeQuery("select * from rp2_tds_table_2")
df_test = data.filter(data.cpudt_cpudt >= "20191101")
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("preprod_fms_trf_ap")
gl_master = hive.executeQuery("select * from preprod_fms_trf_ap.gl_master")

# Preprocessing

gl_master = gl_master.withColumn('gl_code', gl_master["gl_code"].cast("string"))
gl_master = gl_master.withColumn('gl_code',f.concat(f.lit('000'), f.col('gl_code')))
gl_master = gl_master.withColumnRenamed('gl_code','expense_gl_hkont')

# GL Mismatch

df_common_check_gl = df_test.join(gl_master,how='left',on=['server','expense_gl_hkont'])
df_common_check_gl = df_common_check_gl.withColumn('tds_section',when(col('tds_section').isNull(), "No GL - Section code combination found in Master").otherwise(col('tds_section')))
df_common_check_gl = df_common_check_gl.withColumn('section_wt_qscod',  array(lit(df_common_check_gl['section_wt_qscod'])))
df_common_check_gl = df_common_check_gl.withColumn('tds_section',  array(lit(df_common_check_gl['tds_section'])))
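# UDF factory: intersects two array columns element-wise, returning None when either input is null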
intersect = lambda type: (udf(lambda x, y: (list(set(x) & set(y)) if x is not None and y is not None else None),ArrayType(type)))
string_intersect = intersect(StringType())
anomaly_test4 = df_common_check_gl.where(size(string_intersect("section_wt_qscod", "tds_section")) <= 0)
anomaly_test4 = anomaly_test4.withColumn('anomalous_flag',lit('GL Mismatch'))
anomaly_test4 = anomaly_test4.withColumn('tds_section', anomaly_test4["tds_section"].cast("string"))
anomaly_test4 = anomaly_test4.withColumn('ml_remarks',concat(coalesce(lit("Section code in master:")), coalesce(col("tds_section"))))
anomaly_test4 = anomaly_test4.drop('tds_section','long_text','short_text','remarks')

if anomaly_test4.count() != 0:
    anomaly_test4 = anomaly_test4.withColumn('section_wt_qscod', concat_ws(",", "section_wt_qscod"))
    anomaly_test4 = anomaly_test4.withColumn('wht_rate_qsatz', concat_ws(",", "wht_rate_qsatz"))

test4_na = df_test.join(anomaly_test4,on=['server','company_code_bukrs','documentno_belnr','year_gjahr','expense_gl_hkont','vendor_customer_code_wt_acco','wht_type_witht','calculated_rate_caliculated_field','wht_code_wt_withcd','wht_rate_qsatz','tds_amount_wt_qbshh','tds_base_amount_wt_qsshh','exemption_number_wt_wtexmn','date_of_deduction_zzdeddat','po_ebeln','hsn_sac_hsn_sac','po_description_txz01','po_order_type_bsart','section_wt_qscod','posting_date_budat','document_date_bldat','document_type_blart','invoice_reference_no_xblnr','user_id_usnam','section_description_text40','wht_type_description_text40','wht_code_description_text40','gl_description_txt50','user_mail_id_smtp_addr','ap_persona_responsible_id_personresp','ap_code_ap_code','vendor_custome_name_name1','pan_j_1ipanno','hsn_sac_description_sacdesc','cpudt_cpudt'],how='left_anti')

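# Note: this second left_anti join reassigns test4_na and overwrites the result of the join above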
test4_na = df_test.join(anomaly_test4,on=['server','expense_gl_hkont'],how='left_anti')

test4_na.show()

It throws the following ArrayIndexOutOfBoundsException:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 78.0 failed 4 times, most recent failure: Lost task 4.3 in stage 78.0 (TID 20587, sjdcpphdpn4.ril.com, executor 2): java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 3
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:61
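
One thing the trace does pin down: the index-3 failure happens inside ColumnarBatch.column during the DataSourceV2 scan (datasourcev2scan_nextBatch), i.e. while an executor is reading batches from the Hive Warehouse Connector and writing shuffle data for a join. As a diagnostic, the read can be narrowed from `select *` to an explicit column list; the sketch below does that with the columns the code above actually uses from rp2_tds_table_2 (which columns belong to which table is only inferred from the code, so treat the list as an assumption):

```python
from pyspark_llap import HiveWarehouseSession

hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("preprod_fms_con_ap")

# Read only the columns the pipeline touches instead of "select *",
# so the vectorized scan has a small, explicit schema.
data = hive.executeQuery(
    "select server, expense_gl_hkont, cpudt_cpudt, section_wt_qscod "
    "from rp2_tds_table_2"
)

# If this show() works while the "select *" version fails, the problem is in
# how the full-width scan batches are built rather than in the joins themselves.
data.filter(data.cpudt_cpudt >= "20191101").show()
```

If the narrowed read still fails, comparing data.printSchema() against the table definition in Hive would be the next thing to check.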
