使用ApacheSpark从/到elasticsearch处理纯文本

kr98yfug  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(201)

我想利用apachespark对elasticsearch索引中的文本数据进行预处理。使用elasticsearch hadoop,我正在检索apachespark的索引。我得到一个rdd类型:rdd[(string,scala.collection.map[string,anyref])]
第一个元素看起来像:document:(string,scala.collection.map[string,anyref])=(file:document_id,Map(创建->周一1月20日11:50:35 cet 2014,修改->周五10月23日12:46:40 cest 2015,索引->周五3月25日18:05:37 cet 2016,mimetype->应用程序/pdf,内容->文档的纯文本)
现在关键的部分是使用nlp工具箱处理上述内容字段,并将结果存储在elasticsearch中。第一部分很好。我使用stanfordcorenlp在stackoverflow上发现了一个类似的问题(不幸的是spark本身没有提供这个,我无法直接从elasticsearch检索令牌)。因此,我得到了每个文档的标记rdd[seq[string]],但我不知道如何将其引入elasticsearch。
显然,我需要一个outputdd来连接文档和相关的令牌。类似于:map(“document\u id\u 1”->“tokens for id\u 1”,“document\u id\u 2”->“tokens for id\u 2”)。也许有人想提供一个如何到达那里的提示,或者有一个更好的想法来解决问题。非常感谢您的帮助。

import org.apache.spark._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd._
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
import scala.collection.JavaConversions._
import java.util.Properties 

object Stemming {
    def main(args: Array[String]) {
        val conf = new SparkConf() .setMaster("local[*]") .setAppName("SparkLemma")
            .set("es.nodes", "hadoop-m:9200" )
            .set("es.write.operation", "upsert")
            .set("es.mapping.id", "id")
          val esIndex = "elastic/documents"
        val sc = new SparkContext(conf)

    // Read data from ES
    val esRDD = sc.esRDD(esIndex)
    val id:RDD[String] = esRDD.map(_._1.toString)
    val content:RDD[String] = esRDD.map(_._2("content").toString)
    val plainText: RDD[(String, String)] = id.zip(content)
    val stopWords = sc.broadcast(scala.io.Source.fromFile("stopwords.txt").getLines().toSet).value

    def createNLPPipeline(): StanfordCoreNLP = {
        val props = new Properties()
        props.put("annotators", "tokenize, ssplit, pos, lemma")
        new StanfordCoreNLP(props)
    }

    def plainTextToLemmas(content: String, stopWords: Set[String], nlp: StanfordCoreNLP) : Seq[String] = {
        val doc = new Annotation(content)
        nlp.annotate(doc)
        val lemmas = new ArrayBuffer[String]()
        val sentences = doc.get(classOf[SentencesAnnotation])
        for (sentence <- sentences;
            token <- sentence.get(classOf[TokensAnnotation])) {
            val lemma = token.get(classOf[LemmaAnnotation])
            if (lemma.length > 3 && !stopWords.contains(lemma)) {
                if (lemmas.isEmpty) {
                    lemmas += id += lemma.toLowerCase
            }
                else {
                    lemmas += lemma.toLowerCase
                }
            }
}
lemmas
    }
    val lemmatized: RDD[Seq[String]] = plainText.mapPartitions(strings => {
        val nlp = createNLPPipeline()
        strings.map{case(id, content) => plainTextToLemmas(content, stopWords, nlp)}
    })

def writeTokensToES(row:Seq[String]): Map[String,String] = {
    val tokens = row.drop(1).mkString(" ")
    Map("id" -> row.head, "content" -> tokens, "last-run" -> getDate())
}
val outputRDD = lemmatized.map(row => writeTokensToES(row))
EsSpark.saveToEs(outputRDD, esIndex)
sc.stop()
}
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题