从spark rdd中提取数据,并在scala中填充元组

3lxsmp7m  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(467)

我在hadoop/spark框架的顶部使用scala。
实际上,我的数据属于以下类型:

RDD[(List[(String, Int)], Long)]

这是这个数据湖中前两行的一个例子:

(List(("COD_LOCALE_PROGETTO",0), ("CUP",1), ("OC_TITOLO_PROGETTO",2), ("OC_SINTESI_PROGETTO",3), ("OC_LINK",4), ("OC_COD_CICLO",5), ("OC_DESCR_CICLO",6), ("OC_COD_TEMA_SINTETICO",7), ("OC_TEMA_SINTETICO",8), ("COD_GRANDE_PROGETTO",9), ("DESCRIZIONE_GRANDE_PROGETTO",10)),0)

(List(("10CAPORTO-POZZUOLI 1",0), ("J86G08000450003",1), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",2), ("INTERVENTO C11 2° LOTTO ¿ 1° STRALCIO FUNZIONALE ¿COLLEGAMENTO TRA TANGENZIALE DI NAPOLI (VIA CAMPANA), RETE VIARIA COSTIERA E PORTO DI POZZUOLI""",3), ("www.opencoesione.gov.it/progetti/10caporto-pozzuoli-1",4), (1,5), ("Ciclo di programmazione 2007-2013",6), ("07",7), ("Trasporti e infrastrutture a rete",8), (" ",9), (" ",10)),1)

在实际情况中,每一行可以容纳194列,我总共有160多万条记录。
使用此数据集,我将填充一个新列表,类型为:

List[(String, Int, Int, Int)]

其中第一个“int”是每行的每个字段(cod\u locale\u progetto,cup…),第二个字段是每个字段的大小(19,3,…),第三个字段是每个字段的位置,已经编码在变量中,就在字符串之后,最后一个“int”是整个数据集中每行的位置。
我试过这个剧本:

| val Dimensione = item._1.size;
     | for(i <- 0 until Dimensione){
     | ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"","").toString,
     | item._1(i)._1.replace("\"","").toString.size,
     | item._1(i)._2.toInt,
     | item._2.toLong)}
     | })

但是失败了,元组列表,我称之为“ComponentOpenCoesione”并没有填充。
最后,这个变量是这样定义的:

var ComponentiOpenCoesione : List[(String, Int, Int, Long)] = List();

有人能帮我吗?如何从rdd中提取数据并将其加载到列表中?
非常感谢。

acruukt9

acruukt91#

在scala中,返回函数的最后一条语句。在这里,函数将不返回任何内容,因为它的最后一个语句是 for 不返回任何内容的循环。
你只要把 ComponentiOpenCoesione 作为你最后的陈述。所以,如果你只是计划 RDD[(List[(String, Int)], Long)] 得到 RDD[List[(String, Int, Int, Long)]] ,您的代码应该是:

rdd.map(item => {
  var ComponentiOpenCoesione: List[(String, Int, Int, Long)] = List();
  val Dimensione = item._1.size;
  for (i <- 0 until Dimensione) {
    ComponentiOpenCoesione :+= (item._1(i)._1.replace("\"", "").toString,
      item._1(i)._1.replace("\"", "").toString.size,
      item._1(i)._2.toInt,
      item._2.toLong)
  }
  ComponentiOpenCoesione
})

您可以查看scala问题的答案中的return,以掌握值在scala中是如何返回的。

相关问题