【09】Flink 之 DataSet API(三):DataSet Sink 操作

x33g5p2x  于2021-12-25 转载在 其他  
字(1.2k)|赞(0)|评价(0)|浏览(430)

1、DataSet Sink 数据输出

在Data Source部分和其他部分使用过写文件和打印操作,代码相同,只对理论进行介绍

1.1、分类

  1. writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
  2. writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
  3. print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

2、数据输出详解

通过对批量数据的读写(DataSource)及转换(Transformation)操作,最终形成用户期望的结果数据集,然后需要将数据写入不同的外部介质中进行存储,进而完成整改批量数据处理过程。Flink中对应数据输出功能被称为DataSinks操作,和DataSource Operator操作类似,为了能够让用户更加灵活地使用外部数据,Flink抽象出通用的OutputFormat接口,批量数据输出全部实现与OutputFormat接口。Flink内置了常用数据存储介质对应的OutputFormat,如HadoopOutputFormat、JDBCOutputFormat等。同样,用户也可以自定义实现OutputFormat接口。
Flink在DataSet API中的数据输出共分为三种类型:

  1. 基于文件实现,对应DataSet的write( )方法;
  2. 基于通用存储介质实现,对应DataSet的output( )方法,如JDBCOutputFormat;
  3. 客户端输出,直接将DataSet数据从不同的节点收集到Client,如print( )方法

其中,第一种和第三种在之前程序中进行了使用,对第二种进行简单介绍。

2.1、通过输出接口

可以使用自定义OutputFormat方法来定义介质对应的OutputFormat。

//	读取数据集并转换为(word , count) 类型数据
val dataSet:DataSet[{String,  Long}] = ...
//	将数据集的格式转换成[Text, LongWritable] 类型
val words = dataset.map( ... )
//	定义HadoopOutputFormat
val hadoopOutputFormat = new HadoopOutputFormat[ Text,  LongWritable ](
	new TextOutputFormat[ Text,  LongWritable ] ,
	new JobConf
)
//	指定输出路径
FileOutputFormat.setOutputPath( hadoopOutputFormat .getJobConf , new Path(resultPath) )
//	调用 Output 方法将数据写入Hadoop文件系统
words.output(hadoopOutputFormat  )

相关文章