spark在写入elasticsearch时不支持arraylist吗?

l3zydbqr  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(489)

我有以下结构:

mylist = [{"key1":"val1"}, {"key2":"val2"}]
myrdd = value_counts.map(lambda item: ('key', { 
    'field': somelist 
}))

我得到错误:15/02/10 15:54:08 info scheduler.tasksetmanager:在executor ip-10-80-15-145.ec2.internal上的stage 2.0(tid 6)中丢失了task 1.0(不能使用java.util.arraylist类型的数据)[duplicate 1]

rdd.saveAsNewAPIHadoopFile( 
            path='-', 
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
            keyClass="org.apache.hadoop.io.NullWritable", 
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
            conf={ 
        "es.nodes" : "localhost", 
        "es.port" : "9200", 
        "es.resource" : "mboyd/mboydtype" 
    })

当我将文档写入es时,我希望它的结果是:

{
field:[{"key1":"val1"}, {"key2":"val2"}]
}
7ivaypg9

7ivaypg91#

刚遇到这个问题,解决方法是将所有列表转换为元组。转换为json也是如此。

vshtjzan

vshtjzan2#

比赛有点晚了,但这是我们昨天遇到这个问题后提出的解决方案。添加 'es.input.json': 'true' 回到你的形态,然后跑 json.dumps() 在你的数据上。
修改您的示例,如下所示:

import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
json_rdd = rdd.map(json.dumps)
json_rdd.saveAsNewAPIHadoopFile( 
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ 
        "es.nodes" : "localhost", 
        "es.port" : "9200", 
        "es.resource" : "mboyd/mboydtype",
        "es.input.json": "true"
    }
)
6za6bjd0

6za6bjd03#

我觉得在其他答案中缺少了一些要点,比如你必须从rdd返回一个2元组(我不知道为什么),还需要elasticsearch hadoop jar文件来让它工作。所以我会写下整个过程,我必须遵循,使之工作。
下载elasticsearch hadoop jar文件。您可以从中央maven存储库下载它(最新版本在大多数情况下都可以使用-更多信息请查看他们的官方需求自述)。
创建一个文件 run.py 下面是演示的最小代码片段。

import json

import pymongo_spark
pymongo_spark.activate()

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('demo').setMaster('local')
sc = SparkContext(conf=conf)

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
final_rdd = rdd.map(json.dumps).map(lambda x: ('key', x))

final_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes" : "<server-ip>",
        "es.port" : "9200",
        "es.resource" : "index_name/doc_type_name",
        "es.input.json": "true"
    }
)

使用以下命令运行spark作业 ./bin/spark-submit --jars /path/to/your/jar/file/elasticsearch-hadoop-5.6.4.jar --driver-class-path /path/to/you/jar/file/elasticsearch-hadoop-5.6.4.jar --master yarn /path/to/your/run/file/run.py 嗯!

相关问题