Spark数据读取与保存

x33g5p2x  于2020-09-30 发布在 Spark  
字(19.6k)|赞(0)|评价(0)|浏览(1924)

本章对于工程师和数据科学家都较为实用。工程师会了解到更多的输出格式,有利于找到非常适合用于下游处理程序的格式。数据科学家则可能更关心数据的现有的组织形式。

###1 动机

Spark支持很多输入输出源。一部分原始时Spark本身是基于Hadoop生态圈而构建,特别是Spark可以通过Hadoop的InputFormat和OutputFormat接口访问数据,大部分文件格式与存储系统都支持这样的接口,例如:S3,HDFS,Cassandra,HBase等。

本章会介绍三类常见的数据源:

  • 文件格式与文件系统

    对于存储在本地文件系统或分布式文件系统(NFS,HDFS,Amazon S3等)中的数据,Spark可以访问多种文件格式的数据:文本文件,Json,SequenceFile,以及protocol buffer。

  • Spark SQL中的结构化数据

    针对Json和Hive在内的结构化数据。

  • 数据库与键值存储

    Spark自带库和一些第三方库,可以用来连接Cassandra、HBase、Elasticsearch以及JDBC源。

2 文件格式

Spark支持从诸如文本文件的非结构化的文件, 到诸如 JSON 格式的半结构化的文件,再到诸如 SequenceFile 这样的结构化的文件。

表5-1:Spark支持的一些常见格式

格式名称结构化备注
文本文件普通文本文件,每一行一条记录
JSON半结构化大多数库都要求每行一条记录
CSV通常用于电子表格
SequenceFiles用于键值对数据的Hadoop文件格式
Protocol buffers一种快速、节约空间的跨语言格式
对象文件用于将Spark作业中的数据存储下来以让共享的读取,改变类时会失效,因为它依赖于 Java 序列化

2.1 文本文件

Spark将文本文件的每一行读取为RDD的一个元素,也可以将多个文本文件读取为PairRDD,文件名为Key,文件内容为Value。

1. 读取文本文件

使用SparkContext的textFile(filePath, minPartitions)读取文本文件。

# 例 5-1:在 Python 中读取一个文本文件
input = sc.textFile("file:///home/holden/repos/spark/README.md")
// 例 5-2:在 Scala 中读取一个文本文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
// 例 5-3:在 Java 中读取一个文本文件
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")

多个输入文件以目录的形式出现,可以用两种方式来读取:

  • textFile函数,文件路径入参为目录,会讲所有文件中的数据读取到RDD
  • wholeTextFiles函数,返回PairRDD,Key时文件名,Value是文件内容,wholeTextFiles适合处理时间序列的数据(将数据分时间段存储在不同文件中)

2. 文件路径

spark支持:

  1. 指定文件所在路径+文件名读取单个文件;
  2. 指定文件目录路径,读取多个文件;
  3. 文件路径使用通配符(如 part-*.txt)。大规模数据集通常存放在多个文件中,尤其是在同一目录中存在一些别的文件(比如成功标记文件)的时候。

3. 保存文本文件

使用saveTextFile( )方法,文件目录作为入参,将RDD中内容作为存储为多个文件在路径下。在这个方法中,我 们不能控制数据的哪一部分输出到哪个文件中,不过有些输出格式支持控制。

2.2 JSON

JSON是一种广泛使用的半结构化数据格式。读取 JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用 JSON 解析器来对 RDD 中的值进行映射操作。在 Java 和 Scala 中也可以使用一个自定义 Hadoop 格式来操作 JSON 数据,使用的语言中 构建一个 JSON 解析器的开销较大,你可以使用 mapPartitions() 来重用解析器。另外Spark SQL也可以读取JSON数据。

1. 读取JSON

将数据作为文本文件读取,然后对JSON数据进行解析,这种方法假设文件中的每一行都是一个JSON串,如果你有跨行的 JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。

python,Java和Scala有大量可用的第三方JSON解析库可以用于JSON解析。例如:python自带JSON库,Java中的FastJson,JackSon等。

# 例 5-6:在 Python 中读取非结构化的 JSON 
import json
data = input.map(lambda x: json.loads(x))

在Scala和Java中,通常将记录读入到一个代表结构信息的类中。

// 例 5-7:在 Scala 中读取 JSON
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类
...
// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None) // 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
       try {
         Some(mapper.readValue(record, classOf[Person]))
       } catch {
         case e: Exception => None
}})
//  例 5-8:在 Java 中读取 JSON
class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
       public Iterable<Person> call(Iterator<String> lines) throws Exception {
         ArrayList<Person> people = new ArrayList<Person>();
         ObjectMapper mapper = new ObjectMapper();
         while (lines.hasNext()) {
           String line = lines.next();
           try {
             people.add(mapper.readValue(line, Person.class));
           } catch (Exception e) {
								// 跳过失败的数据 
           }
         }
         return people;
       }
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());

**注意:**上面代码中,有数据转换异常捕获非常重要,对于读取JSON这样半结构化的数据。小数据集可以接受在遇到错误的输入时停止程序(程序失败),但是对于大规模数据集来说,格式错误是家常便饭。捕获异常时不中断程序,使用累加器来跟踪错误的个数。

2. 保存JSON

使用第三方JSON库或自带JSON库,将结构化数据组成的RDD转换为字符串RDD,最后使用Spark的文本文件API写入。

# 例 5-9:在 Python 保存为 JSON
data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
       .saveAsTextFile(outputFile)
// 例 5-10:在 Scala 中保存为 JSON
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
       .saveAsTextFile(outputFile)
//
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
       public Iterable<String> call(Iterator<Person> people) throws Exception {
         ArrayList<String> text = new ArrayList<String>();
         ObjectMapper mapper = new ObjectMapper();
         while (people.hasNext()) {
           Person person = people.next();
           text.add(mapper.writeValueAsString(person));
           }
         return text;
       }
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
       new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
     formatted.saveAsTextFile(outfile);

2.3 逗号分隔值与制表符分隔值

逗号分隔值文件(CSV)每一行有固定数目的字段,字段间用逗号隔开;制表符分隔值(TSV文件)字段间使用制表符分隔,通常是每行数据对应一条记录。CSV 文件和 TSV 文件有时支持的标准并不一致,主要是在处理换行符、转义字符、非 ASCII字符、非整数值等方面。CSV 原生并不支持嵌套字段,所以需要手动组合 和分解特定的字段。

与JSON中的字段不一样的是,每行记录没有关联的字段名,通常做法是使用第一行的值作为字段名。

1. 读取CSV文件

读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数 据,再对数据进行处理。与 JSON 一样,CSV 也有很多不同的库,例如:python自带的CSV库,Java和Scala第三方opencsv库。

Tips

Hadoop InputFormat 中的 CSVInputFormat也可以用于在 Scala 和 Java 中读取 CSV 数据。不过它不支持包含换行符的记录。

下面示例(5-12至5-14)是CSV文件中所有字段均不包括换行符。

# 例 5-12:在 Python 中使用 textFile() 读取 CSV 
import csv
import StringIO
def loadRecord(line):
		"""解析一行CSV记录"""
		input = StringIO.StringIO(line)
		reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) 
		return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
//例 5-13:在 Scala 中使用 textFile() 读取 CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
     val reader = new CSVReader(new StringReader(line));
     reader.readNext();
}
// 例 5-14:在 Java 中使用 textFile() 读取 CSV
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
...
public static class ParseLine implements Function<String, String[]> {
       public String[] call(String line) throws Exception {
         CSVReader reader = new CSVReader(new StringReader(line));
         return reader.readNext();
       } 
}
JavaRDD<String> csvFile1 = sc.textFile(inputFile);
JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());

如果字段中包含有换行符,需要完整读入文件,然后解析各个字段,如果每个文件很大,读取和解析过程会成为性能瓶颈。如例 5-15 至例 5-17 所示。

Tips 换行符

在windows下:\r\n代表换行,拆分两个代码是:回到行首+换到下一行。在linux下:\n代表换行。

# 例 5-15:在 Python 中完整读取 CSV
import csv
def loadRecords(fileNameContents):
	"""读取给定文件中的所有记录"""
	input = StringIO.StringIO(fileNameContents[1])
	reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) 
  return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
// 例 5-16:在 Scala 中完整读取 CSV
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
    reader.readAll().map(x => Person(x(0), x(1)))
}
 // 例 5-17:在 Java 中完整读取 CSV
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
       public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
         CSVReader reader = new CSVReader(new StringReader(file._2()));
         return reader.readAll();
       }
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());

2. 保存CSV

与JSON数据一样,可以通过重用输出编码器来加速。由于在 CSV 中我们不会在每条记录中输出字段名,因此为了使输出保持一致,需要创建一种映射关系。简单做法是写一个函数,用于将各字段转为指定顺序的数组。在 Python 中,如果输出字典,CSV 输出器会根据创建输出器时给定的 fieldnames 的顺序帮 我们完成这一行为。

我们所使用的 CSV 库要输出到文件或者输出器,所以可以使用 StringWriter 或 StringIO 来将结果放到 RDD 中,

# 例 5-18:在 Python 中写 CSV
def writeRecords(records):
		"""写出一些CSV记录"""
		output = StringIO.StringIO()
		writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) 
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
// 例 5-19:在 Scala 中写 CSV
import opencsv
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
     .mapPartitions{people =>
       val stringWriter = new StringWriter();
       val csvWriter = new CSVWriter(stringWriter);
       csvWriter.writeAll(people.toList)
       Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

2.4 SequenceFile

SequenceFile是由没有相对关系结构的键值组成的常用Hadoop格式,SequenceFile文件有同步标记,Spark可以用它来定位到文件中的某个点,因此可以使用多个节点高效并行读取。

SequenceFile是由实现Hadoop的Writable接口的元素组成,可以通过 重栽org.apache.hadoop.io.Writable 中的 readfields 和 write 来实现自己的 Writable 类。

表5-2:Hadoop Writable类型对应表

Scala类型Java类型Hadoop Writable类
IntIntegerIntWritable 或 VIntWritable
LongLongLongWritable或VLongWritable
FloatFloatFloatWitable
DoubleDoubleDouleWritable
BooleanBooleanBooleanWritable
Array[Byte]byteByteWritable
StringStringString
Array[T]T[]ArrayWritable<TW>
List[T]List<T>ArrayWritable<TW>
Map<K, V>Map<K, V>MapWritable<KW, VW>

**注意:**TW表示模板类型T也必须使用 Writable 类型。Spark 的 Python API 只能将 Hadoop 中存在的基本 Writable 类型转为 Python 类型,并尽量基于可用的 getter 方法处理别的类型。

1. 读取SequenceFile

在SparkContext中,调用sequenceFile(path, KeyClass, ValueClass, minPartitions)。KeyClass, ValueClass必须使用基本Writable类型或者重栽Writable方法的类。

# 例 5-20:在 Python 读取 SequenceFile 
val data = sc.sequenceFile(inFile,"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
// 例 5-21:在 Scala 中读取 SequenceFile
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
       map{case (x, y) => (x.toString, y.get())}
// 例 5-22:在 Java 中读取 SequenceFile
public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
      public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
         return new Tuple2(record._1.toString(), record._2.get());
       }
}
JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class,IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());

2. 保存SequenceFile

因为 SequenceFile 存储的是键值对,所以需要创建一个由可以写出到 SequenceFile 的类型构成的 PairRDD。Spark中已经将Scala原生数据类型与Hadoop Writable类型做了隐式转换,如果 Scala原生类型直接调用 saveSequenceFile(path) 保存你的 PairRDD。如果键和值不能自动转为 Writable 类型,或者想使用变长类型(比如 VIntWritable),需要在保存之前进行类型转换。

// 例 5-23:在 Scala 中保存 SequenceFile
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
// Spark 已经将Scala String转为Text,int转为intWritable
data.saveAsSequenceFile(outputFile)

在 Java 中保存 SequenceFile 要稍微复杂一些,因为 JavaPairRDD 上没有 saveAsSequenceFile() 方法。我们要使用 Spark 保存自定义 Hadoop 格式的功能来实现。5.2.6节会详细描述。

2.5 对象文件

对象文件基于Java对象序列化,一旦类发生改变,则已生成的文件就不可读。使用对象文件有几点需要注意:

  • 对于同样的对象,对象文件的输出和 Hadoop 的输出不一样;
  • 与其他文件格式不同的是,对象文件通常用于 Spark 作业间的通信;
  • Java 序列化过程很慢;

使用方法:SparkContext的saveAsObject方法保存;objectFile( path)读取

优点:可以用来保存任意对象而不需要额外的工作。

Tips

对象文件在 Python 中无法使用,不过 Python 中的 RDD 和 SparkContext 支持 saveAsPickleFile() 和 pickleFile() 方法作为替代,这使用了 Python 的 pickle 序列化库。

2.6 Hadoop输入输出格式

Spark除了封装的格式之外,也可以与任何 Hadoop 支持的文件格式交互。

1. 读取其他Hadoop输入格式

使用SparkContext的newAPIHadoopFile( )方法读取,接收四个参数:filePath表示文件格式,fClass表示输入文件类型,KeyClass,ValueClass。

/*
	 * @param fClass storage format of the data to be read
   * @param kClass `Class` of the key associated with the `fClass` parameter
   * @param vClass `Class` of the value associated with the `fClass` parameter
   * @param conf Hadoop configuration
   * @return RDD of tuples of key and corresponding value
   */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]

KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取 键值对数据(如例 5-24 所示)。

// 例 5-25:在 Scala 中使用 Elephant Bird 读取 LZO 算法压缩的 JSON 文件
val input = sc.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
       classOf[LongWritable], classOf[MapWritable], conf)
// "输入"中的每个MapWritable代表一个JSON对象

2. 保存Hadoop输出格式

Java API 中没有易用的保存pairRDD的函数,因此在Java中使用saveAsHadoopFile( )保存Hadoop支持的文件格式,如SequenceFile。

/** Output the RDD to any Hadoop-supported file system. */
  def saveAsHadoopFile[F <: OutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F]) 
  {
    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }
// 例 5-26:在 Java 保存 SequenceFile
public static class ConvertToWritableTypes implements
     PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
     public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
        return new Tuple2(new Text(record._1), new IntWritable(record._2));
     }
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes()); result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
       SequenceFileOutputFormat.class);

3. 非文件系统数据源

除 了 hadoopFile() 和 saveAsHadoopFile() 这 一 大 类 函 数, 还 可 以 使 用 hadoopDataset/ saveAsHadoopDataSet(旧API) 和 newAPIHadoopDataset/saveAsNewAPIHadoopDataset(新API) 来 访 问 Hadoop 所 支持的非文件系统的存储格式。

/**
   * Output the RDD to any Hadoop-supported storage system, using
   * a Configuration object for that storage system.
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration) {
    rdd.saveAsNewAPIHadoopDataset(conf)
  }

hadoopDataset() 这一组函数只接收一个 Configuration 对象,这个对象用来设置访问数据源所必需的 Hadoop 属性(Configuration.setXXXX)。与MapReduce作业驱动类配置Configuration类似。

4. 示例 protocol buffers

protocol buffers(简称PB)是Google开发,用于RPC过程。PB 是结构化数据,它要求字段和类型都要明确定义。它们是经过优化的,编解码速度快,而且占用空间也很小。

PB由可选字段,必须字段,重复字段三种字段组成。在解析时,可选字段缺失不会导致解析失败,必须字段缺失则会导致解析失败。因此最好将新字段设为可选字段。

// 例 5-27:PB 定义示例
message Venue {
        required int32 id = 1;
        required string name = 2;
        required VenueType type = 3;
        optional string address = 4;
        enum VenueType {
          COFFEESHOP = 0;
          WORKPLACE = 1;
          CLUB = 2;
          OMNOMNOM = 3;
          OTHER = 4;
} }
      message VenueResponse {
        repeated Venue results = 1;
}

2.7 文件压缩

在大数据工作中,经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数 Hadoop 输出格式来说,可以指定一种压缩编解码器来压缩数据。这些压缩选项只支持压缩Hadoop格式,即写入到文件系统的格式,写入到数据库的Hadoop格式一般没有实现压缩支持。

Spark 这 样的分布式系统,通常会尝试从多个不同机器上读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式不支持这样读取,必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。能从多个节点上并行读取的格式被称为**“可分割”**的格式。

格式可分割压缩速度压缩效率Hadoop压缩解码器纯Java实现原生备注
gziporg.apache.hadoop.io.compress.GzipCodec
Lzo非常快中等com.hadoop. compression.lzo.LzoCodec需要每个节点装Lzo
Bzip2非常高org.apache.hadoop.io.compress.Bzip2Codec为可分割版本使用纯 Java
zlib中等org.apache.hadoop.io.compress.DefaultCodecHadoop 的默认压缩编解码器
Snappy非常快org.apache.hadoop.io.compress.SnappyCodecSnappy有纯 Java 的移植版,但是 在 Spark/Hadoop 中不能用

**注意:**Spark的textFile可以处理压缩输入,但是即使压缩文件可以分割处理,使用多台机器并行读取,Spark也不会打开splittable,因此推荐使用 newAPIHadoopFile 或者 hadoopFile,并指定正确的压缩编解码器,读取压缩输入数据。

3 文件系统

Spark支持读写很多文件系统。

3.1 本地/常规文件系统

Spark支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。

一些像NFS,AFS以及MapR的NFS layer这样的网络文件系统会把文件以常规文件系统的形式暴露给用户,如果数据已经在这些系统中,只需要指定输入为一个 file:// 路径;只要这个文件系统挂载在集群每个节点的同一个路径下,Spark 就会自动处理(如例 5-29 所示)。

// 例 5-29:在 Scala 中从本地文件系统读取一个压缩的文本文件 
val rdd = sc.textFile("file:///home/holden/happypandas.gz")

Tips NFS

NFS就是Network File System的缩写,它最大的功能就是可以通过网络,让不同的机器、不同的操作系统可以共享彼此的文件。

NFS服务器可以让PC将网络中的NFS服务器共享的目录挂载到本地端的文件系统中,而在本地端的系统中来看,那个远程主机的目录就好像是自己的一个磁盘分区一样。

如上图示:当我们在NFS服务器设置好一个共享目录/home/public后,其他的有权访问NFS服务器的NFS客户端就可以将这个目录挂载到自己文件系统的某个挂载点,这个挂载点可以自己定义,如上图客户端A与客户端B挂载的目录就不相同。并且挂载好后我们在本地能够看到服务端/home/public的所有数据。如果服务器端配置的客户端只读,那么客户端就只能够只读。如果配置读写,客户端就能够进行读写。挂载后,NFS客户端查看磁盘信息命令:#df –h。

3.2 Amazon S3

用 Amazon S3 来存储大量数据正日益流行。当计算节点部署在 Amazon EC2 上的时候,使用 S3 作为存储尤其快,但是在需要通过公网访问数据时性能会差很多。

要在 Spark 中访问 S3 数据,应该首先把 S3 访问凭据设置为 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY 环境变量。可以从 Amazon Web Service 控制台创建这些凭据。 接下来,将一个以 s3n:// 开头的路径以 s3n://bucket/path-within-bucket 的形式传给 Spark 的输入方法。和其他所有文件系统一样,Spark 也能在 S3 路径中支持通配字符,例 如 s3n://bucket/my-Files/*.txt。

3.3 HDFS

Hadoop分布式文件系统(HDFS)是一种广泛使用的文件系统,Spark能够很好的使用它,HDFS被设计成可以在廉价的硬件上工作,有弹性地应对节点失败,同时提高吞吐量,Spark和HDFS可以部署在同一批机器上,这样 Spark 可以利用数据分布来尽量避免一些 网络开销。输入文件路径: hdfs://master:port/path。

Tips

HDFS 协议随 Hadoop 版本改变而变化,因此如果你使用的 Spark 是依赖于另一个版本的 Hadoop 编译的,那么读取会失败。如果从源代码编译,你可以在环境变量中指定 SPARK_ HADOOP_VERSION= 来基于另一个版本的 Hadoop 进行编译;也可以直接下载预 编译好的 Spark 版本。

4 Spark SQL中的结构化数据

Spark SQL是Spark 1.0版本新加入的组件,结构化数据指的是有结构信息的数据——所有的数据具有一致字段结构,Spark SQL支持多种结构化数据源作为输入。

Spark SQL查询结果是Row对象组成的RDD,每个Row对象代表一条记录。在Java和Scala中,Row对象的访问是基于下标的,每个 Row 都有一个 get() 方法,返回一个一般类型让我们可以进行类型转换。另外还有针对常见基本类型 的专用 get() 方 法( 例 如 getFloat()、getInt()、getLong()、getString()、getShort()、 getBoolean() 等)。在 Python 中,可以使用 row[column_number] 以及 row.column_name 来访问元素。

4.1 Apache Hive

Apache Hive是Hadoop上常见的结构化数据源,Hive可以在HDFS内核或者其他存储系统上存储多种格式的表,从普通文本到列式存储格式。

要把 Spark SQL 连接到已有的 Hive 上,需要将 hive-site. xml 文件复制到 Spark 的 ./conf/ 目录下,再创建出 HiveContext 对象(Spark SQL 的入口),就可以使用 Hive查询语言(HQL)来对表进行查询,并以由Row组成的 RDD 的形式拿到返回数据,如例 5-30 至例 5-32 所示。

# 例 5-30:用 Python 创建 HiveContext 并查询数据 
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print(firstRow.name)
// 例 5-31:用 Scala 创建 HiveContext 并查询数据 
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc) 
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段
// 例 5-32:用 Java 创建 HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users"); 
Row firstRow = rows.first(); 
System.out.println(firstRow.getString(0)); // 字段0是name字段

会在 9.3.1 节更详细地介绍如何从 Hive 中读取数据。

4.2 JSON

如果你有记录结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录。读取JSON格式的数据,首先需要和使用 Hive一样创建 HiveContext(不过在这种情况下不需要安装好 Hive,也就是说不需要 hive-site.xml 文件)然后使用 HiveContext.jsonFile 方法来从整个文件中获取由 Row 对象组成的 RDD。也可以将RDD注册为一张表,方便使用SQL操作。

// 例 5-33:JSON 中的示例推文
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

例 5-34:在 Python 中使用 Spark SQL 读取 JSON 数据

     tweets = hiveCtx.jsonFile("tweets.json")
     tweets.registerTempTable("tweets")
     results = hiveCtx.sql("SELECT user.name, text FROM tweets")

例 5-35:在 Scala 中使用 Spark SQL 读取 JSON 数据

     val tweets = hiveCtx.jsonFile("tweets.json")
     tweets.registerTempTable("tweets")
     val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

例 5-36:在 Java 中使用 Spark SQL 读取 JSON 数据

     SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
// 将RDD注册成一张表
     tweets.registerTempTable("tweets");
     SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");

5 数据库

通过数据库提供的Hadoop连接器与Spark连接器,Spark可以访问一些常见的数据库。

5.1 Java数据库连接

Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括:MySQL,Postgres等。需要构建一个 org.apache.spark.rdd. JdbcRDD,将 SparkContext 和其他参数一起传给它。

例 5-37:Scala 中的 JdbcRDD

def createConnection() = {
  	Class.forName("com.mysql.jdbc.Driver").newInstance();
  	DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc,
                       createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
                       lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

JDBCRDD需要接收几个参数:

  • 对数据库创建连接的函数,让每个节点在连接必要的配置后创建自己读取数据的连接;
  • 读取一定范围内数据的查询(Where 条件),以及查询参数中 lowerBound 和 upperBound 的值。这些参数可以让 Spark 在不同机器上查询不同范围的数据,这样就不会因尝试在一个节点上读取所有数据而遭遇性能瓶颈。
  • 最后一个参数是将输出结果从 java.sql.ResultSet转为对操作数据有用的格式的函数。如果这个参数空缺,Spark 会自动将每行结果转为一个对象数组。

5.2 Cassandra

5.3 HBase

Spark 可以通Hadoop输入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase. client.Result。Result 类包含多种根据列获取值的方法。

// 例 5-45:从 HBase 读取数据的 Scala 示例
     import org.apache.hadoop.hbase.HBaseConfiguration
     import org.apache.hadoop.hbase.client.Result
     import org.apache.hadoop.hbase.io.ImmutableBytesWritable
     import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create() 
conf.set(TableInputFormat.INPUT_TABLE, "tablename") 
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 、classOf[ImmutableBytesWritable],classOf[Result])

5.4 Elasticsearch

Spark 可以使用 Elasticsearch-Hadoop从 Elasticsearch中读写数据。Elasticsearch连接器会忽略我们提供的路径信息,而依赖于SparkContext中的配置项。Elasticsearc 的OutputFormat连接器也没有用到Spark所封装的类型,所以使用 saveAsHadoopDataSet 来代替,这意味着需要手动设置更多属性。

例 5-46:在 Scala 中使用 Elasticsearch 输出

     val jobConf = new JobConf(sc.hadoopConfiguration)
     jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
     mr.EsOutputFormat")
     jobConf.setOutputCommitter(classOf[FileOutputCommitter])
     jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
     jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
     FileOutputFormat.setOutputPath(jobConf, new Path("-"))
     output.saveAsHadoopDataset(jobConf)

例 5-47:在 Scala 中使用 Elasticsearch 输入

def mapWritableToInput(in: MapWritable): Map[String, String] = {
       in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
     val jobConf = new JobConf(sc.hadoopConfiguration)
     jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
     jobConf.set(ConfigurationOptions.ES_NODES, args(2))
     val currentTweets = sc.hadoopRDD(jobConf,
       classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
classOf[MapWritable])
// 仅提取map
// 将MapWritable[Text, Text]转为Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }

6 总结

本章讲述了Spark如何从各种类型数据源读取数据,并以写入各种文件格式。

相关文章