This article collects code examples for the Java class org.apache.gobblin.util.WriterUtils and shows how the class is used. The examples are drawn from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the WriterUtils class:

Package: org.apache.gobblin.util.WriterUtils
Class: WriterUtils
Description: Utility class for use with the org.apache.gobblin.writer.DataWriter class.
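Many of the examples below resolve per-branch configuration keys through ForkOperatorUtils.getPropertyNameForBranch. As orientation before reading them, the naming convention can be sketched with plain JDK code; this is a simplified re-implementation for illustration, not Gobblin's actual source:

```java
// Simplified re-implementation of the branch-suffix naming convention (the
// real logic lives in ForkOperatorUtils.getPropertyNameForBranch): with a
// single branch the plain key is used; with multiple branches each branch
// reads its own ".<branchId>"-suffixed key, matching the ".0"/".1" keys
// exercised by the testGetWriterDir example at the end of this article.
public class BranchPropertyNames {

  static String propertyNameForBranch(String key, int numBranches, int branchId) {
    return numBranches > 1 ? key + "." + branchId : key;
  }

  public static void main(String[] args) {
    System.out.println(propertyNameForBranch("writer.output.dir", 1, 0)); // writer.output.dir
    System.out.println(propertyNameForBranch("writer.output.dir", 2, 1)); // writer.output.dir.1
  }
}
```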
Code example source: apache/incubator-gobblin

/**
 * Cleanup staging data of a Gobblin task.
 *
 * @param state a {@link State} instance storing task configuration properties
 * @param logger a {@link Logger} used for logging
 */
public static void cleanTaskStagingData(State state, Logger logger) throws IOException {
  int numBranches = state.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
  for (int branchId = 0; branchId < numBranches; branchId++) {
    String writerFsUri = state.getProp(
        ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId),
        ConfigurationKeys.LOCAL_FS_URI);
    FileSystem fs = getFsWithProxy(state, writerFsUri, WriterUtils.getFsConfiguration(state));
    Path stagingPath = WriterUtils.getWriterStagingDir(state, numBranches, branchId);
    if (fs.exists(stagingPath)) {
      logger.info("Cleaning up staging directory " + stagingPath.toUri().getPath());
      if (!fs.delete(stagingPath, true)) {
        throw new IOException("Clean up staging directory " + stagingPath.toUri().getPath() + " failed");
      }
    }
    Path outputPath = WriterUtils.getWriterOutputDir(state, numBranches, branchId);
    if (fs.exists(outputPath)) {
      logger.info("Cleaning up output directory " + outputPath.toUri().getPath());
      if (!fs.delete(outputPath, true)) {
        throw new IOException("Clean up output directory " + outputPath.toUri().getPath() + " failed");
      }
    }
  }
}
Code example source: apache/incubator-gobblin

public static FileSystem getWriterFs(State state) throws IOException {
  return getWriterFS(state, 1, 0);
}
Code example source: apache/incubator-gobblin

private void moveDirectory(String sourceDir, String targetDir) throws IOException {
  // If targetDir exists, delete it
  if (this.fs.exists(new Path(targetDir))) {
    deleteDirectory(targetDir);
  }
  // Create parent directories of targetDir
  WriterUtils.mkdirsWithRecursivePermission(this.fs, new Path(targetDir).getParent(),
      FsPermission.getCachePoolDefault());
  // Move directory
  log.info("Moving directory: " + sourceDir + " to: " + targetDir);
  if (!this.fs.rename(new Path(sourceDir), new Path(targetDir))) {
    throw new IOException(String.format("Unable to move %s to %s", sourceDir, targetDir));
  }
}
Code example source: apache/incubator-gobblin

public static FileSystem getWriterFS(State state, int numBranches, int branchId) throws IOException {
  URI uri = URI.create(state.getProp(
      ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId),
      ConfigurationKeys.LOCAL_FS_URI));
  Configuration hadoopConf = getFsConfiguration(state);
  if (state.getPropAsBoolean(ConfigurationKeys.SHOULD_FS_PROXY_AS_USER,
      ConfigurationKeys.DEFAULT_SHOULD_FS_PROXY_AS_USER)) {
    // Initialize file system for a proxy user.
    String authMethod =
        state.getProp(ConfigurationKeys.FS_PROXY_AUTH_METHOD, ConfigurationKeys.DEFAULT_FS_PROXY_AUTH_METHOD);
    if (authMethod.equalsIgnoreCase(ConfigurationKeys.TOKEN_AUTH)) {
      return getWriterFsUsingToken(state, uri);
    } else if (authMethod.equalsIgnoreCase(ConfigurationKeys.KERBEROS_AUTH)) {
      return getWriterFsUsingKeytab(state, uri);
    }
  }
  // Initialize file system as the current user.
  return FileSystem.get(uri, hadoopConf);
}
Code example source: apache/incubator-gobblin

// Excerpt from a publisher's publishData logic; the opening of the enclosing
// if/else block is truncated in the source.
  Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
  WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
      publisherOutputDir, this.permissions.get(branchId), retrierConfig);
  addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
} else {
  WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
      publisherOutputDir.getParent(), this.permissions.get(branchId), retrierConfig);
Code example source: apache/incubator-gobblin

// Excerpt from a writer constructor; the assignment targets of the ternary
// and of the output-file Path are truncated in the source, so the names
// writerStagingDir and this.outputFile are reconstructed from their later use.
this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);
this.fileContext = FileContext.getFileContext(conf);
Path writerStagingDir = this.writerAttemptIdOptional.isPresent()
    ? WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId, this.writerAttemptIdOptional.get())
    : WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId);
this.stagingFile = new Path(writerStagingDir, this.fileName);
this.outputFile =
    new Path(WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId), this.fileName);
this.allOutputFilesPropName = ForkOperatorUtils
    .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, this.branchId);
WriterUtils.mkdirsWithRecursivePermission(this.fs, this.outputFile.getParent(), this.dirPermission);
this.bytesWritten = Optional.absent();
Code example source: apache/incubator-gobblin

/**
 * Create the given dir as well as all missing ancestor dirs. All created dirs will have the given permission.
 * This should be used instead of {@link FileSystem#mkdirs(Path, FsPermission)}, since that method only sets
 * the permission for the given dir, and not recursively for the ancestor dirs.
 *
 * @param fs FileSystem
 * @param path The dir to be created
 * @param perm The permission to be set
 * @throws IOException if failing to create dir or set permission.
 */
public static void mkdirsWithRecursivePermission(final FileSystem fs, final Path path, FsPermission perm)
    throws IOException {
  mkdirsWithRecursivePermissionWithRetry(fs, path, perm, NO_RETRY_CONFIG);
}
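The recursive-permission idea behind mkdirsWithRecursivePermission can be illustrated with only the JDK's java.nio.file API. This is a sketch of the technique under simplified assumptions (local POSIX file system, no retries), not Gobblin's implementation, which works against a Hadoop FileSystem:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;

// JDK-only sketch of the recursive-permission idea: create every missing
// ancestor of a path and set the same permission on each directory created.
// (Files.createDirectories alone would leave ancestors with umask-derived
// permissions, which is the problem the Gobblin method works around.)
public class RecursiveMkdirs {

  static void mkdirsWithPermission(Path dir, Set<PosixFilePermission> perm) throws IOException {
    // Walk from the leaf upward, remembering every directory that is missing.
    Deque<Path> missing = new ArrayDeque<>();
    for (Path p = dir; p != null && !Files.exists(p); p = p.getParent()) {
      missing.push(p);
    }
    // Pop shallowest-first, creating each dir and setting its permission explicitly.
    while (!missing.isEmpty()) {
      Path p = missing.pop();
      Files.createDirectory(p);
      Files.setPosixFilePermissions(p, perm); // explicit set, so umask does not apply
    }
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("mkdirs-demo");
    Path leaf = base.resolve("a").resolve("b").resolve("c");
    mkdirsWithPermission(leaf, PosixFilePermissions.fromString("rwxr-x---"));
    System.out.println(Files.getPosixFilePermissions(leaf));
  }
}
```

Note the design choice this mirrors: the permission is applied per created directory rather than only to the leaf, which is exactly why the Javadoc above warns against plain FileSystem#mkdirs.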
Code example source: apache/incubator-gobblin

// Excerpt from a publisher constructor; the first statement is truncated in
// the source, so the key passed to getProp and the ternary's assignment
// target are reconstructed here.
String uriStr = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
    ConfigurationKeys.LOCAL_FS_URI);
Configuration conf = WriterUtils.getFsConfiguration(state);
URI uri = URI.create(uriStr);
this.fs = FileSystem.get(uri, conf);
Path writerStagingDir = this.writerAttemptIdOptional.isPresent()
    ? WriterUtils.getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get())
    : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
this.outputDir = getOutputDir(state);
// this.copyableDatasetMetadata = ... (truncated in the source)
Code example source: apache/incubator-gobblin

/**
 * Get the staging {@link Path} for {@link org.apache.gobblin.writer.DataWriter} that has attemptId in the path.
 */
public static Path getWriterStagingDir(State state, int numBranches, int branchId, String attemptId) {
  Preconditions.checkArgument(attemptId != null && !attemptId.isEmpty(),
      "AttemptId cannot be null or empty: " + attemptId);
  return new Path(getWriterStagingDir(state, numBranches, branchId), attemptId);
}
Code example source: apache/incubator-gobblin

public static void cleanUpOldJobData(State state, Logger logger, boolean stagingDirProvided,
    boolean outputDirProvided) throws IOException {
  Set<Path> jobPaths = new HashSet<>();
  String writerFsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
  FileSystem fs = FileSystem.get(URI.create(writerFsUri), WriterUtils.getFsConfiguration(state));
  Path jobPath;
  if (stagingDirProvided) {
    jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent();
  } else {
    jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent().getParent();
  }
  jobPaths.add(jobPath);
  if (outputDirProvided) {
    jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent();
  } else {
    jobPath = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent().getParent();
  }
  jobPaths.add(jobPath);
  for (Path jobPathToDelete : jobPaths) {
    logger.info("Cleaning up old job directory " + jobPathToDelete);
    HadoopUtils.deletePath(fs, jobPathToDelete, true);
  }
}
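cleanUpOldJobData delegates the actual deletion to HadoopUtils.deletePath(fs, path, true). The same kind of recursive cleanup can be sketched with only the JDK; this is an illustration of the technique, not Gobblin's Hadoop-based implementation:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// JDK-only sketch of recursive cleanup: remove a directory tree, deleting
// children before their parents (analogous to deletePath with recursive=true).
public class RecursiveDelete {

  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return; // nothing to clean up
    }
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse lexicographic order puts each child before its parent.
      for (Path p : walk.sorted(Comparator.reverseOrder()).toArray(Path[]::new)) {
        Files.delete(p);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path job = Files.createTempDirectory("job-dir");
    Files.createDirectories(job.resolve("task-staging").resolve("attempt-0"));
    deleteRecursively(job);
    System.out.println(Files.exists(job)); // prints false
  }
}
```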
Code example source: apache/incubator-gobblin

/**
 * Get the {@link Path} corresponding to the directory where a given {@link org.apache.gobblin.writer.DataWriter}
 * should write its output data. The output data directory is determined by combining the
 * {@link ConfigurationKeys#WRITER_OUTPUT_DIR} and the {@link ConfigurationKeys#WRITER_FILE_PATH}.
 *
 * @param state is the {@link State} corresponding to a specific {@link org.apache.gobblin.writer.DataWriter}.
 * @param numBranches is the total number of branches for the given {@link State}.
 * @param branchId is the id for the specific branch that the {@link org.apache.gobblin.writer.DataWriter} will write to.
 * @return a {@link Path} specifying the directory where the {@link org.apache.gobblin.writer.DataWriter} will write to.
 */
public static Path getWriterOutputDir(State state, int numBranches, int branchId) {
  String writerOutputDirKey =
      ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR, numBranches, branchId);
  Preconditions.checkArgument(state.contains(writerOutputDirKey), "Missing required property " + writerOutputDirKey);
  return new Path(state.getProp(writerOutputDirKey), WriterUtils.getWriterFilePath(state, numBranches, branchId));
}
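The combination described in the Javadoc, a required output directory plus a writer file path appended to it, can be mimicked with plain java.nio.file types. The Map below stands in for Gobblin's State object, and the key name is only illustrative:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

// Sketch of the output-dir combination: the branch's output-dir property is
// required, and the writer file path is appended to it.
public class WriterOutputDirSketch {

  static Path writerOutputDir(Map<String, String> props, String outputDirKey, String writerFilePath) {
    String outputDir = props.get(outputDirKey);
    // Mirrors the Preconditions.checkArgument guard in the original.
    if (outputDir == null) {
      throw new IllegalArgumentException("Missing required property " + outputDirKey);
    }
    return Paths.get(outputDir, writerFilePath);
  }

  public static void main(String[] args) {
    Map<String, String> props = Map.of("writer.output.dir", "/data/output");
    System.out.println(writerOutputDir(props, "writer.output.dir", "my_table/daily"));
    // prints /data/output/my_table/daily
  }
}
```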
Code example source: apache/incubator-gobblin

public RestorableHivePartitionDatasetFinder(State state) throws IOException {
  this(WriterUtils.getWriterFs(new State(state)), state);
}
Code example source: apache/incubator-gobblin

/**
 * Get the output directory path this {@link BaseDataPublisher} will write to.
 *
 * <p>
 * This is the default implementation. Subclasses of {@link BaseDataPublisher} may override this
 * to write to a custom directory or write using a custom directory structure or naming pattern.
 * </p>
 *
 * @param workUnitState a {@link WorkUnitState} object
 * @param branchId the fork branch ID
 * @return the output directory path this {@link BaseDataPublisher} will write to
 */
protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) {
  return WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId);
}
Code example source: apache/incubator-gobblin

public String getDefaultEventBusId() {
  State destinationCfg = getDestination().getProperties();
  String eventBusIdKey =
      ForkOperatorUtils.getPathForBranch(destinationCfg, FULL_EVENTBUSID_KEY, getBranches(), getBranch());
  if (destinationCfg.contains(eventBusIdKey)) {
    return destinationCfg.getProp(eventBusIdKey);
  } else {
    return WriterUtils.getWriterOutputDir(destinationCfg, getBranches(), getBranch()).toString();
  }
}
Code example source: apache/incubator-gobblin

@Test
public void testGetCodecFactoryIgnoresCase() {
  CodecFactory codecFactory = WriterUtils.getCodecFactory(Optional.of("SNAPPY"), Optional.<String>absent());
  Assert.assertEquals(codecFactory.toString(), "snappy");
  codecFactory = WriterUtils.getCodecFactory(Optional.of("snappy"), Optional.<String>absent());
  Assert.assertEquals(codecFactory.toString(), "snappy");
}
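The test above shows that getCodecFactory treats the codec name case-insensitively. The underlying technique is a normalized lookup, sketched here with a JDK-only map; the codec table and method names are illustrative, not Gobblin's API:

```java
import java.util.Locale;
import java.util.Map;

// Sketch of case-insensitive codec resolution: normalize the requested name
// before looking it up, so "SNAPPY" and "snappy" resolve identically.
public class CodecLookup {

  private static final Map<String, String> CODECS =
      Map.of("snappy", "snappy", "deflate", "deflate", "bzip2", "bzip2");

  static String resolveCodec(String requested) {
    String codec = CODECS.get(requested.toLowerCase(Locale.ROOT));
    if (codec == null) {
      throw new IllegalArgumentException("Unsupported codec: " + requested);
    }
    return codec;
  }

  public static void main(String[] args) {
    System.out.println(resolveCodec("SNAPPY")); // prints snappy
  }
}
```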
Code example source: apache/incubator-gobblin

/**
 * Make sure directory exists before running {@link BaseDataPublisher#publishData(WorkUnitState, int, boolean, Set)}
 * so that tables will be moved one at a time rather than all at once.
 */
@Override
protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
    Set<Path> writerOutputPathsMoved) throws IOException {
  Path publisherOutputDir = getPublisherOutputDir(state, branchId);
  if (!this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
    WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
        publisherOutputDir, this.permissions.get(branchId), this.retrierConfig);
  }
  super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
}
Code example source: apache/incubator-gobblin

@Test
public void testGetWriterDir() {
  State state = new State();
  state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TEST_WRITER_STAGING_DIR);
  state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TEST_WRITER_OUTPUT_DIR);
  state.setProp(ConfigurationKeys.WRITER_FILE_PATH, TEST_WRITER_FILE_PATH);
  Assert.assertEquals(WriterUtils.getWriterStagingDir(state, 0, 0),
      new Path(TEST_WRITER_STAGING_DIR, TEST_WRITER_FILE_PATH));
  state.setProp(ConfigurationKeys.WRITER_STAGING_DIR + ".0", TEST_WRITER_STAGING_DIR);
  state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + ".0", TEST_WRITER_OUTPUT_DIR);
  state.setProp(ConfigurationKeys.WRITER_FILE_PATH + ".0", TEST_WRITER_FILE_PATH);
  Assert.assertEquals(WriterUtils.getWriterStagingDir(state, 2, 0),
      new Path(TEST_WRITER_STAGING_DIR, TEST_WRITER_FILE_PATH));
  state.setProp(ConfigurationKeys.WRITER_STAGING_DIR + ".1", TEST_WRITER_STAGING_DIR);
  state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + ".1", TEST_WRITER_OUTPUT_DIR);
  state.setProp(ConfigurationKeys.WRITER_FILE_PATH + ".1", TEST_WRITER_FILE_PATH);
  Assert.assertEquals(WriterUtils.getWriterStagingDir(state, 2, 1),
      new Path(TEST_WRITER_STAGING_DIR, TEST_WRITER_FILE_PATH));
}