本章对于工程师和数据科学家都较为实用。工程师会了解到更多的输出格式,有利于找到非常适合用于下游处理程序的格式。数据科学家则可能更关心数据的现有的组织形式。
###1 动机
Spark支持很多输入输出源。一部分原始时Spark本身是基于Hadoop生态圈而构建,特别是Spark可以通过Hadoop的InputFormat和OutputFormat接口访问数据,大部分文件格式与存储系统都支持这样的接口,例如:S3,HDFS,Cassandra,HBase等。
本章会介绍三类常见的数据源:
文件格式与文件系统
对于存储在本地文件系统或分布式文件系统(NFS,HDFS,Amazon S3等)中的数据,Spark可以访问多种文件格式的数据:文本文件,Json,SequenceFile,以及protocol buffer。
Spark SQL中的结构化数据
针对Json和Hive在内的结构化数据。
数据库与键值存储
Spark自带库和一些第三方库,可以用来连接Cassandra、HBase、Elasticsearch以及JDBC源。
Spark支持从诸如文本文件的非结构化的文件, 到诸如 JSON 格式的半结构化的文件,再到诸如 SequenceFile 这样的结构化的文件。
表5-1:Spark支持的一些常见格式
格式名称 | 结构化 | 备注 |
---|---|---|
文本文件 | 否 | 普通文本文件,每一行一条记录 |
JSON | 半结构化 | 大多数库都要求每行一条记录 |
CSV | 是 | 通常用于电子表格 |
SequenceFiles | 是 | 用于键值对数据的Hadoop文件格式 |
Protocol buffers | 是 | 一种快速、节约空间的跨语言格式 |
对象文件 | 是 | 用于将Spark作业中的数据存储下来以让共享的读取,改变类时会失效,因为它依赖于 Java 序列化 |
Spark将文本文件的每一行读取为RDD的一个元素,也可以将多个文本文件读取为PairRDD,文件名为Key,文件内容为Value。
使用SparkContext的textFile(filePath, minPartitions)读取文本文件。
# 例 5-1:在 Python 中读取一个文本文件
input = sc.textFile("file:///home/holden/repos/spark/README.md")
// 例 5-2:在 Scala 中读取一个文本文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
// 例 5-3:在 Java 中读取一个文本文件
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
多个输入文件以目录的形式出现,可以用两种方式来读取:
spark支持:
使用saveTextFile( )方法,文件目录作为入参,将RDD中内容作为存储为多个文件在路径下。在这个方法中,我 们不能控制数据的哪一部分输出到哪个文件中,不过有些输出格式支持控制。
JSON是一种广泛使用的半结构化数据格式。读取 JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用 JSON 解析器来对 RDD 中的值进行映射操作。在 Java 和 Scala 中也可以使用一个自定义 Hadoop 格式来操作 JSON 数据,使用的语言中 构建一个 JSON 解析器的开销较大,你可以使用 mapPartitions() 来重用解析器。另外Spark SQL也可以读取JSON数据。
将数据作为文本文件读取,然后对JSON数据进行解析,这种方法假设文件中的每一行都是一个JSON串,如果你有跨行的 JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。
python,Java和Scala有大量可用的第三方JSON解析库可以用于JSON解析。例如:python自带JSON库,Java中的FastJson,JackSon等。
# 例 5-6:在 Python 中读取非结构化的 JSON
import json
data = input.map(lambda x: json.loads(x))
在Scala和Java中,通常将记录读入到一个代表结构信息的类中。
// 例 5-7:在 Scala 中读取 JSON
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // 必须是顶级类
...
// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None) // 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Person]))
} catch {
case e: Exception => None
}})
// 例 5-8:在 Java 中读取 JSON
class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
public Iterable<Person> call(Iterator<String> lines) throws Exception {
ArrayList<Person> people = new ArrayList<Person>();
ObjectMapper mapper = new ObjectMapper();
while (lines.hasNext()) {
String line = lines.next();
try {
people.add(mapper.readValue(line, Person.class));
} catch (Exception e) {
// 跳过失败的数据
}
}
return people;
}
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());
**注意:**上面代码中,有数据转换异常捕获非常重要,对于读取JSON这样半结构化的数据。小数据集可以接受在遇到错误的输入时停止程序(程序失败),但是对于大规模数据集来说,格式错误是家常便饭。捕获异常时不中断程序,使用累加器来跟踪错误的个数。
使用第三方JSON库或自带JSON库,将结构化数据组成的RDD转换为字符串RDD,最后使用Spark的文本文件API写入。
# 例 5-9:在 Python 保存为 JSON
data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile)
// 例 5-10:在 Scala 中保存为 JSON
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
//
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
public Iterable<String> call(Iterator<Person> people) throws Exception {
ArrayList<String> text = new ArrayList<String>();
ObjectMapper mapper = new ObjectMapper();
while (people.hasNext()) {
Person person = people.next();
text.add(mapper.writeValueAsString(person));
}
return text;
}
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile);
逗号分隔值文件(CSV)每一行有固定数目的字段,字段间用逗号隔开;制表符分隔值(TSV文件)字段间使用制表符分隔,通常是每行数据对应一条记录。CSV 文件和 TSV 文件有时支持的标准并不一致,主要是在处理换行符、转义字符、非 ASCII字符、非整数值等方面。CSV 原生并不支持嵌套字段,所以需要手动组合 和分解特定的字段。
与JSON中的字段不一样的是,每行记录没有关联的字段名,通常做法是使用第一行的值作为字段名。
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数 据,再对数据进行处理。与 JSON 一样,CSV 也有很多不同的库,例如:python自带的CSV库,Java和Scala第三方opencsv库。
Tips
Hadoop InputFormat 中的 CSVInputFormat也可以用于在 Scala 和 Java 中读取 CSV 数据。不过它不支持包含换行符的记录。
下面示例(5-12至5-14)是CSV文件中所有字段均不包括换行符。
# 例 5-12:在 Python 中使用 textFile() 读取 CSV
import csv
import StringIO
def loadRecord(line):
"""解析一行CSV记录"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
//例 5-13:在 Scala 中使用 textFile() 读取 CSV
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
// 例 5-14:在 Java 中使用 textFile() 读取 CSV
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
...
public static class ParseLine implements Function<String, String[]> {
public String[] call(String line) throws Exception {
CSVReader reader = new CSVReader(new StringReader(line));
return reader.readNext();
}
}
JavaRDD<String> csvFile1 = sc.textFile(inputFile);
JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());
如果字段中包含有换行符,需要完整读入文件,然后解析各个字段,如果每个文件很大,读取和解析过程会成为性能瓶颈。如例 5-15 至例 5-17 所示。
Tips 换行符
在windows下:\r\n代表换行,拆分两个代码是:回到行首+换到下一行。在linux下:\n代表换行。
# 例 5-15:在 Python 中完整读取 CSV
import csv
def loadRecords(fileNameContents):
"""读取给定文件中的所有记录"""
input = StringIO.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
// 例 5-16:在 Scala 中完整读取 CSV
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0), x(1)))
}
// 例 5-17:在 Java 中完整读取 CSV
public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
CSVReader reader = new CSVReader(new StringReader(file._2()));
return reader.readAll();
}
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
与JSON数据一样,可以通过重用输出编码器来加速。由于在 CSV 中我们不会在每条记录中输出字段名,因此为了使输出保持一致,需要创建一种映射关系。简单做法是写一个函数,用于将各字段转为指定顺序的数组。在 Python 中,如果输出字典,CSV 输出器会根据创建输出器时给定的 fieldnames 的顺序帮 我们完成这一行为。
我们所使用的 CSV 库要输出到文件或者输出器,所以可以使用 StringWriter 或 StringIO 来将结果放到 RDD 中,
# 例 5-18:在 Python 中写 CSV
def writeRecords(records):
"""写出一些CSV记录"""
output = StringIO.StringIO()
writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
// 例 5-19:在 Scala 中写 CSV
import opencsv
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
SequenceFile是由没有相对关系结构的键值组成的常用Hadoop格式,SequenceFile文件有同步标记,Spark可以用它来定位到文件中的某个点,因此可以使用多个节点高效并行读取。
SequenceFile是由实现Hadoop的Writable接口的元素组成,可以通过 重栽org.apache.hadoop.io.Writable 中的 readfields 和 write 来实现自己的 Writable 类。
表5-2:Hadoop Writable类型对应表
Scala类型 | Java类型 | Hadoop Writable类 |
---|---|---|
Int | Integer | IntWritable 或 VIntWritable |
Long | Long | LongWritable或VLongWritable |
Float | Float | FloatWitable |
Double | Double | DouleWritable |
Boolean | Boolean | BooleanWritable |
Array[Byte] | byte | ByteWritable |
String | String | String |
Array[T] | T[] | ArrayWritable<TW> |
List[T] | List<T> | ArrayWritable<TW> |
Map<K, V> | Map<K, V> | MapWritable<KW, VW> |
**注意:**TW表示模板类型T也必须使用 Writable 类型。Spark 的 Python API 只能将 Hadoop 中存在的基本 Writable 类型转为 Python 类型,并尽量基于可用的 getter 方法处理别的类型。
在SparkContext中,调用sequenceFile(path, KeyClass, ValueClass, minPartitions)。KeyClass, ValueClass必须使用基本Writable类型或者重栽Writable方法的类。
# 例 5-20:在 Python 读取 SequenceFile
val data = sc.sequenceFile(inFile,"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
// 例 5-21:在 Scala 中读取 SequenceFile
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case (x, y) => (x.toString, y.get())}
// 例 5-22:在 Java 中读取 SequenceFile
public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
return new Tuple2(record._1.toString(), record._2.get());
}
}
JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class,IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
因为 SequenceFile 存储的是键值对,所以需要创建一个由可以写出到 SequenceFile 的类型构成的 PairRDD。Spark中已经将Scala原生数据类型与Hadoop Writable类型做了隐式转换,如果 Scala原生类型直接调用 saveSequenceFile(path) 保存你的 PairRDD。如果键和值不能自动转为 Writable 类型,或者想使用变长类型(比如 VIntWritable),需要在保存之前进行类型转换。
// 例 5-23:在 Scala 中保存 SequenceFile
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
// Spark 已经将Scala String转为Text,int转为intWritable
data.saveAsSequenceFile(outputFile)
在 Java 中保存 SequenceFile 要稍微复杂一些,因为 JavaPairRDD 上没有 saveAsSequenceFile() 方法。我们要使用 Spark 保存自定义 Hadoop 格式的功能来实现。5.2.6节会详细描述。
对象文件基于Java对象序列化,一旦类发生改变,则已生成的文件就不可读。使用对象文件有几点需要注意:
使用方法:SparkContext的saveAsObject方法保存;objectFile( path)读取
优点:可以用来保存任意对象而不需要额外的工作。
Tips
对象文件在 Python 中无法使用,不过 Python 中的 RDD 和 SparkContext 支持 saveAsPickleFile() 和 pickleFile() 方法作为替代,这使用了 Python 的 pickle 序列化库。
Spark除了封装的格式之外,也可以与任何 Hadoop 支持的文件格式交互。
使用SparkContext的newAPIHadoopFile( )方法读取,接收四个参数:filePath表示文件格式,fClass表示输入文件类型,KeyClass,ValueClass。
/*
* @param fClass storage format of the data to be read
* @param kClass `Class` of the key associated with the `fClass` parameter
* @param vClass `Class` of the value associated with the `fClass` parameter
* @param conf Hadoop configuration
* @return RDD of tuples of key and corresponding value
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
KeyValueTextInputFormat 是最简单的 Hadoop 输入格式之一,可以用于从文本文件中读取 键值对数据(如例 5-24 所示)。
// 例 5-25:在 Scala 中使用 Elephant Bird 读取 LZO 算法压缩的 JSON 文件
val input = sc.newAPIHadoopFile(inputFile, classOf[KeyValueTextInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
// "输入"中的每个MapWritable代表一个JSON对象
Java API 中没有易用的保存pairRDD的函数,因此在Java中使用saveAsHadoopFile( )保存Hadoop支持的文件格式,如SequenceFile。
/** Output the RDD to any Hadoop-supported file system. */
def saveAsHadoopFile[F <: OutputFormat[_, _]](
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[F])
{
rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
// 例 5-26:在 Java 保存 SequenceFile
public static class ConvertToWritableTypes implements
PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes()); result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
SequenceFileOutputFormat.class);
除 了 hadoopFile() 和 saveAsHadoopFile() 这 一 大 类 函 数, 还 可 以 使 用 hadoopDataset/ saveAsHadoopDataSet(旧API) 和 newAPIHadoopDataset/saveAsNewAPIHadoopDataset(新API) 来 访 问 Hadoop 所 支持的非文件系统的存储格式。
/**
* Output the RDD to any Hadoop-supported storage system, using
* a Configuration object for that storage system.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration) {
rdd.saveAsNewAPIHadoopDataset(conf)
}
hadoopDataset() 这一组函数只接收一个 Configuration 对象,这个对象用来设置访问数据源所必需的 Hadoop 属性(Configuration.setXXXX)。与MapReduce作业驱动类配置Configuration类似。
protocol buffers(简称PB)是Google开发,用于RPC过程。PB 是结构化数据,它要求字段和类型都要明确定义。它们是经过优化的,编解码速度快,而且占用空间也很小。
PB由可选字段,必须字段,重复字段三种字段组成。在解析时,可选字段缺失不会导致解析失败,必须字段缺失则会导致解析失败。因此最好将新字段设为可选字段。
// 例 5-27:PB 定义示例
message Venue {
required int32 id = 1;
required string name = 2;
required VenueType type = 3;
optional string address = 4;
enum VenueType {
COFFEESHOP = 0;
WORKPLACE = 1;
CLUB = 2;
OMNOMNOM = 3;
OTHER = 4;
} }
message VenueResponse {
repeated Venue results = 1;
}
在大数据工作中,经常需要对数据进行压缩以节省存储空间和网络传输开销。对于大多数 Hadoop 输出格式来说,可以指定一种压缩编解码器来压缩数据。这些压缩选项只支持压缩Hadoop格式,即写入到文件系统的格式,写入到数据库的Hadoop格式一般没有实现压缩支持。
Spark 这 样的分布式系统,通常会尝试从多个不同机器上读入数据。要实现这种情况,每个工作节点都必须能够找到一条新记录的开端。有些压缩格式不支持这样读取,必须要单个节点来读入所有数据,这就很容易产生性能瓶颈。能从多个节点上并行读取的格式被称为**“可分割”**的格式。
格式 | 可分割 | 压缩速度 | 压缩效率 | Hadoop压缩解码器 | 纯Java实现 | 原生 | 备注 |
---|---|---|---|---|---|---|---|
gzip | 否 | 快 | 高 | org.apache.hadoop.io.compress.GzipCodec | 是 | 是 | |
Lzo | 是 | 非常快 | 中等 | com.hadoop. compression.lzo.LzoCodec | 是 | 是 | 需要每个节点装Lzo |
Bzip2 | 是 | 慢 | 非常高 | org.apache.hadoop.io.compress.Bzip2Codec | 是 | 是 | 为可分割版本使用纯 Java |
zlib | 否 | 慢 | 中等 | org.apache.hadoop.io.compress.DefaultCodec | 是 | 是 | Hadoop 的默认压缩编解码器 |
Snappy | 是 | 非常快 | 低 | org.apache.hadoop.io.compress.SnappyCodec | 否 | 是 | Snappy有纯 Java 的移植版,但是 在 Spark/Hadoop 中不能用 |
**注意:**Spark的textFile可以处理压缩输入,但是即使压缩文件可以分割处理,使用多台机器并行读取,Spark也不会打开splittable,因此推荐使用 newAPIHadoopFile 或者 hadoopFile,并指定正确的压缩编解码器,读取压缩输入数据。
Spark支持读写很多文件系统。
Spark支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。
一些像NFS,AFS以及MapR的NFS layer这样的网络文件系统会把文件以常规文件系统的形式暴露给用户,如果数据已经在这些系统中,只需要指定输入为一个 file:// 路径;只要这个文件系统挂载在集群每个节点的同一个路径下,Spark 就会自动处理(如例 5-29 所示)。
// 例 5-29:在 Scala 中从本地文件系统读取一个压缩的文本文件
val rdd = sc.textFile("file:///home/holden/happypandas.gz")
Tips NFS
NFS就是Network File System的缩写,它最大的功能就是可以通过网络,让不同的机器、不同的操作系统可以共享彼此的文件。
NFS服务器可以让PC将网络中的NFS服务器共享的目录挂载到本地端的文件系统中,而在本地端的系统中来看,那个远程主机的目录就好像是自己的一个磁盘分区一样。
如上图示:当我们在NFS服务器设置好一个共享目录/home/public后,其他的有权访问NFS服务器的NFS客户端就可以将这个目录挂载到自己文件系统的某个挂载点,这个挂载点可以自己定义,如上图客户端A与客户端B挂载的目录就不相同。并且挂载好后我们在本地能够看到服务端/home/public的所有数据。如果服务器端配置的客户端只读,那么客户端就只能够只读。如果配置读写,客户端就能够进行读写。挂载后,NFS客户端查看磁盘信息命令:#df –h。
用 Amazon S3 来存储大量数据正日益流行。当计算节点部署在 Amazon EC2 上的时候,使用 S3 作为存储尤其快,但是在需要通过公网访问数据时性能会差很多。
要在 Spark 中访问 S3 数据,应该首先把 S3 访问凭据设置为 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY 环境变量。可以从 Amazon Web Service 控制台创建这些凭据。 接下来,将一个以 s3n:// 开头的路径以 s3n://bucket/path-within-bucket 的形式传给 Spark 的输入方法。和其他所有文件系统一样,Spark 也能在 S3 路径中支持通配字符,例 如 s3n://bucket/my-Files/*.txt。
Hadoop分布式文件系统(HDFS)是一种广泛使用的文件系统,Spark能够很好的使用它,HDFS被设计成可以在廉价的硬件上工作,有弹性地应对节点失败,同时提高吞吐量,Spark和HDFS可以部署在同一批机器上,这样 Spark 可以利用数据分布来尽量避免一些 网络开销。输入文件路径: hdfs://master:port/path。
Tips
HDFS 协议随 Hadoop 版本改变而变化,因此如果你使用的 Spark 是依赖于另一个版本的 Hadoop 编译的,那么读取会失败。如果从源代码编译,你可以在环境变量中指定 SPARK_ HADOOP_VERSION= 来基于另一个版本的 Hadoop 进行编译;也可以直接下载预 编译好的 Spark 版本。
Spark SQL是Spark 1.0版本新加入的组件,结构化数据指的是有结构信息的数据——所有的数据具有一致字段结构,Spark SQL支持多种结构化数据源作为输入。
Spark SQL查询结果是Row对象组成的RDD,每个Row对象代表一条记录。在Java和Scala中,Row对象的访问是基于下标的,每个 Row 都有一个 get() 方法,返回一个一般类型让我们可以进行类型转换。另外还有针对常见基本类型 的专用 get() 方 法( 例 如 getFloat()、getInt()、getLong()、getString()、getShort()、 getBoolean() 等)。在 Python 中,可以使用 row[column_number] 以及 row.column_name 来访问元素。
Apache Hive是Hadoop上常见的结构化数据源,Hive可以在HDFS内核或者其他存储系统上存储多种格式的表,从普通文本到列式存储格式。
要把 Spark SQL 连接到已有的 Hive 上,需要将 hive-site. xml 文件复制到 Spark 的 ./conf/ 目录下,再创建出 HiveContext 对象(Spark SQL 的入口),就可以使用 Hive查询语言(HQL)来对表进行查询,并以由Row组成的 RDD 的形式拿到返回数据,如例 5-30 至例 5-32 所示。
# 例 5-30:用 Python 创建 HiveContext 并查询数据
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print(firstRow.name)
// 例 5-31:用 Scala 创建 HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段
// 例 5-32:用 Java 创建 HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");
Row firstRow = rows.first();
System.out.println(firstRow.getString(0)); // 字段0是name字段
会在 9.3.1 节更详细地介绍如何从 Hive 中读取数据。
如果你有记录结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录。读取JSON格式的数据,首先需要和使用 Hive一样创建 HiveContext(不过在这种情况下不需要安装好 Hive,也就是说不需要 hive-site.xml 文件)然后使用 HiveContext.jsonFile 方法来从整个文件中获取由 Row 对象组成的 RDD。也可以将RDD注册为一张表,方便使用SQL操作。
// 例 5-33:JSON 中的示例推文
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
例 5-34:在 Python 中使用 Spark SQL 读取 JSON 数据
tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")
例 5-35:在 Scala 中使用 Spark SQL 读取 JSON 数据
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
例 5-36:在 Java 中使用 Spark SQL 读取 JSON 数据
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
// 将RDD注册成一张表
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");
通过数据库提供的Hadoop连接器与Spark连接器,Spark可以访问一些常见的数据库。
Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括:MySQL,Postgres等。需要构建一个 org.apache.spark.rdd. JdbcRDD,将 SparkContext 和其他参数一起传给它。
例 5-37:Scala 中的 JdbcRDD
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc,
createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
JDBCRDD需要接收几个参数:
Spark 可以通Hadoop输入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase. client.Result。Result 类包含多种根据列获取值的方法。
// 例 5-45:从 HBase 读取数据的 Scala 示例
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 、classOf[ImmutableBytesWritable],classOf[Result])
Spark 可以使用 Elasticsearch-Hadoop从 Elasticsearch中读写数据。Elasticsearch连接器会忽略我们提供的路径信息,而依赖于SparkContext中的配置项。Elasticsearc 的OutputFormat连接器也没有用到Spark所封装的类型,所以使用 saveAsHadoopDataSet 来代替,这意味着需要手动设置更多属性。
例 5-46:在 Scala 中使用 Elasticsearch 输出
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.
mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)
例 5-47:在 Scala 中使用 Elasticsearch 输入
def mapWritableToInput(in: MapWritable): Map[String, String] = {
in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
classOf[MapWritable])
// 仅提取map
// 将MapWritable[Text, Text]转为Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
本章讲述了Spark如何从各种类型数据源读取数据,并以写入各种文件格式。
内容来源于网络,如有侵权,请联系作者删除!