我很难想出一个安全的方法来附加到文件中 HDFS
.
我用的是小的, 3-node Hadoop cluster (CDH v.5.3.9 to be specific)
. 我们的过程是一个数据流水线 multi-threaded (8 threads)
它还有一个stage,将分隔文本行附加到服务器上专用目录中的文件中 HDFS
. 我使用锁来同步线程对附加数据的缓冲写入程序的访问。
我的第一个问题是决定一般的方法。
方法a是打开文件,附加到文件中,然后对附加的每一行关闭它。这似乎是缓慢的,似乎会造成太多的小块,或至少我看到一些这样的情绪在不同的职位。
方法b是缓存写入程序,但定期刷新它们,以确保写入程序列表不会无限增长(目前,流水线处理的每个输入文件都有一个写入程序)。这似乎是一种更有效的方法,但我认为在一段时间内不管如何控制打开的流可能是一个问题,特别是对于输出文件读取器(?)
除此之外,我真正的问题有两个。我用的是 FileSystem Java Hadoop API
要执行追加操作,我会间歇性地获取以下两个异常:
org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file. org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Constructionblk_1073760252_545 40{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConst ruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}
有人对这两个有什么想法吗?
对于第一个问题,我尝试了插入本文中讨论的逻辑,但似乎没有帮助。
我也对 dfs.support.append
财产,如适用。
获取文件系统的代码:
userGroupInfo = UserGroupInformation.createRemoteUser("hdfs"); Configuration conf = new Configuration();
conf.set(key1, val1);
...
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
获取outputstream的代码:
org.apache.hadoop.fs.path.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {
OutputStream os = null;
synchronized (file) {
if (isFile()) {
os = (append) ? fs.append(file) : fs.create(file, true);
} else if (append) {
// Create the file first, to avoid "failed to append to non-existent file" exception
FSDataOutputStream dos = fs.create(file);
dos.close();
// or, this can be: fs.createNewFile(file);
os = fs.append(file);
}
// Creating a new file
else {
os = fs.create(file);
}
}
return os;
}
1条答案
按热度按时间iq3niunx1#
我在使用cdh5.3/hdfs2.5.0时得到了一个附加文件。我的结论如下:
不能让一个专用线程对每个文件执行附加操作,也不能让多个线程向多个文件写入数据,无论我们是通过同一个hdfsapi文件系统示例写入数据,还是通过不同的示例写入数据。
无法刷新(即关闭并重新打开)写入程序;它们必须保持开放。
最后一项偶尔会导致相对罕见的closedchannelexception,它似乎是可恢复的(通过重试追加)。
我们使用带有阻塞队列的单线程执行器服务(一个用于附加到所有文件);每个文件有一个writer,writer保持打开状态(直到处理结束时关闭)。
当我们升级到比5.3更新的cdh时,我们想重新讨论一下这个问题,看看什么线程策略是有意义的:一个且只有一个线程,每个文件一个线程,多个线程写入多个文件。此外,我们还想看看是否可以/需要定期关闭和重新打开写入程序。
此外,我还看到了以下错误,通过在客户端将“dfs.client.block.write.replace datanode on failure.policy”设置为“never”,可以消除此错误。