org.apache.gobblin.util.WriterUtils类的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(13.6k)|赞(0)|评价(0)|浏览(157)

本文整理了Java中org.apache.gobblin.util.WriterUtils类的一些代码示例,展示了WriterUtils类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WriterUtils类的具体详情如下:
包路径:org.apache.gobblin.util.WriterUtils
类名称:WriterUtils

WriterUtils介绍

[英]Utility class for use with the org.apache.gobblin.writer.DataWriter class.
[中]配合 org.apache.gobblin.writer.DataWriter 类使用的工具类。

代码示例

代码示例来源:origin: apache/incubator-gobblin

/**
 * Removes the writer staging and output directories of a Gobblin task, one fork branch at a time.
 *
 * <p>For each branch the writer file system is resolved through {@code getFsWithProxy}; the staging
 * directory is handled first, then the output directory. A directory that does not exist is skipped.</p>
 *
 * @param state a {@link State} instance storing task configuration properties
 * @param logger a {@link Logger} used for logging
 * @throws IOException if an existing staging or output directory could not be deleted
 */
public static void cleanTaskStagingData(State state, Logger logger) throws IOException {
  final int branchCount = state.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);

  for (int branch = 0; branch < branchCount; branch++) {
    // The writer file system URI may be configured per branch; default to the local file system.
    String fsUriKey =
        ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, branchCount, branch);
    String fsUri = state.getProp(fsUriKey, ConfigurationKeys.LOCAL_FS_URI);
    FileSystem fs = getFsWithProxy(state, fsUri, WriterUtils.getFsConfiguration(state));

    Path stagingDir = WriterUtils.getWriterStagingDir(state, branchCount, branch);
    if (fs.exists(stagingDir)) {
      String stagingLocation = stagingDir.toUri().getPath();
      logger.info("Cleaning up staging directory " + stagingLocation);
      boolean stagingRemoved = fs.delete(stagingDir, true);
      if (!stagingRemoved) {
        throw new IOException("Clean up staging directory " + stagingLocation + " failed");
      }
    }

    // The output directory is resolved only after the staging directory has been dealt with.
    Path outputDir = WriterUtils.getWriterOutputDir(state, branchCount, branch);
    if (fs.exists(outputDir)) {
      String outputLocation = outputDir.toUri().getPath();
      logger.info("Cleaning up output directory " + outputLocation);
      boolean outputRemoved = fs.delete(outputDir, true);
      if (!outputRemoved) {
        throw new IOException("Clean up output directory " + outputLocation + " failed");
      }
    }
  }
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Gets the writer {@link FileSystem} for a state, delegating to
 * {@link #getWriterFS(State, int, int)} with one branch and branch id zero.
 *
 * @param state the {@link State} holding the writer configuration
 * @return the {@link FileSystem} returned by {@link #getWriterFS(State, int, int)}
 * @throws IOException if the delegate fails to obtain the file system
 */
public static FileSystem getWriterFs(State state)
    throws IOException {
  final int singleBranch = 1;
  final int firstBranchId = 0;
  return getWriterFS(state, singleBranch, firstBranchId);
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Moves {@code sourceDir} to {@code targetDir} on this instance's file system.
 *
 * <p>An existing target is deleted first, then the target's parent directories are created, and finally
 * the source is renamed to the target.</p>
 *
 * @param sourceDir the directory to move
 * @param targetDir the destination directory
 * @throws IOException if the rename reports failure, or an underlying file system call fails
 */
private void moveDirectory(String sourceDir, String targetDir) throws IOException {
  final Path target = new Path(targetDir);

  // Get rid of a pre-existing target so the rename below does not collide with it.
  if (this.fs.exists(target)) {
    deleteDirectory(targetDir);
  }

  // Make sure the parent chain of the target exists.
  WriterUtils.mkdirsWithRecursivePermission(this.fs, target.getParent(), FsPermission.getCachePoolDefault());

  log.info("Moving directory: " + sourceDir + " to: " + targetDir);
  boolean moved = this.fs.rename(new Path(sourceDir), target);
  if (!moved) {
    throw new IOException(String.format("Unable to move %s to %s", sourceDir, targetDir));
  }
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Gets the writer {@link FileSystem} for a given fork branch.
 *
 * <p>The file system URI is read from the branch-specific {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI}
 * property, defaulting to {@link ConfigurationKeys#LOCAL_FS_URI}. When proxying as a user is enabled, the
 * file system is created with token or keytab authentication, depending on the configured auth method.
 * In every other case it is created for the current user.</p>
 *
 * @param state the {@link State} holding the writer configuration
 * @param numBranches the total number of fork branches
 * @param branchId the id of the branch the file system is needed for
 * @return the {@link FileSystem} to write to
 * @throws IOException if the file system cannot be obtained
 */
public static FileSystem getWriterFS(State state, int numBranches, int branchId)
    throws IOException {
  String fsUriKey =
      ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, numBranches, branchId);
  URI fsUri = URI.create(state.getProp(fsUriKey, ConfigurationKeys.LOCAL_FS_URI));
  Configuration conf = getFsConfiguration(state);

  boolean proxyAsUser = state.getPropAsBoolean(ConfigurationKeys.SHOULD_FS_PROXY_AS_USER,
      ConfigurationKeys.DEFAULT_SHOULD_FS_PROXY_AS_USER);
  if (proxyAsUser) {
    // Pick the proxy-user initialization that matches the configured auth method.
    String proxyAuthMethod =
        state.getProp(ConfigurationKeys.FS_PROXY_AUTH_METHOD, ConfigurationKeys.DEFAULT_FS_PROXY_AUTH_METHOD);
    if (proxyAuthMethod.equalsIgnoreCase(ConfigurationKeys.TOKEN_AUTH)) {
      return getWriterFsUsingToken(state, fsUri);
    }
    if (proxyAuthMethod.equalsIgnoreCase(ConfigurationKeys.KERBEROS_AUTH)) {
      return getWriterFsUsingKeytab(state, fsUri);
    }
    // Any other auth method falls through to the current-user file system below.
  }

  return FileSystem.get(fsUri, conf);
}

代码示例来源:origin: apache/incubator-gobblin

Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
 WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
   this.permissions.get(branchId), retrierConfig);
 addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
 } else {
  WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
    publisherOutputDir.getParent(), this.permissions.get(branchId), retrierConfig);

代码示例来源:origin: apache/incubator-gobblin

this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);
this.fileContext = FileContext.getFileContext(conf);
  .getWriterStagingDir(properties, this.numBranches, this.branchId, this.writerAttemptIdOptional.get())
  : WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId);
this.stagingFile = new Path(writerStagingDir, this.fileName);
  new Path(WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId), this.fileName);
this.allOutputFilesPropName = ForkOperatorUtils
  .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, this.branchId);
WriterUtils.mkdirsWithRecursivePermission(this.fs, this.outputFile.getParent(), this.dirPermission);
this.bytesWritten = Optional.absent();

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates {@code path} together with any missing ancestor directories, giving every created directory
 * the supplied permission.
 *
 * <p>Use this in place of {@link FileSystem#mkdirs(Path, FsPermission)}: that method only sets the
 * permission on the given directory and not recursively on its ancestors. This overload delegates to
 * {@code mkdirsWithRecursivePermissionWithRetry}, passing {@code NO_RETRY_CONFIG}.</p>
 *
 * @param fs FileSystem
 * @param path The dir to be created
 * @param perm The permission to be set
 * @throws IOException if failing to create dir or set permission.
 */
public static void mkdirsWithRecursivePermission(
    final FileSystem fs,
    final Path path,
    FsPermission perm) throws IOException {
  mkdirsWithRecursivePermissionWithRetry(fs, path, perm, NO_RETRY_CONFIG);
}

代码示例来源:origin: apache/incubator-gobblin

ConfigurationKeys.LOCAL_FS_URI);
Configuration conf = WriterUtils.getFsConfiguration(state);
URI uri = URI.create(uriStr);
this.fs = FileSystem.get(uri, conf);
  .getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get())
  : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
this.outputDir = getOutputDir(state);
this.copyableDatasetMetadata =

代码示例来源:origin: apache/incubator-gobblin

/**
 * Get the staging {@link Path} for {@link org.apache.gobblin.writer.DataWriter} that has attemptId in the path.
 *
 * <p>The result is the branch's regular writer staging directory with {@code attemptId} appended as a
 * child path element.</p>
 *
 * @param state the {@link State} holding the writer configuration
 * @param numBranches the total number of fork branches
 * @param branchId the id of the branch
 * @param attemptId the attempt id; must be neither {@code null} nor empty
 * @return the attempt-specific staging {@link Path}
 */
public static Path getWriterStagingDir(State state, int numBranches, int branchId, String attemptId) {
  boolean attemptIdPresent = attemptId != null && !attemptId.isEmpty();
  Preconditions.checkArgument(attemptIdPresent, "AttemptId cannot be null or empty: " + attemptId);

  Path branchStagingDir = getWriterStagingDir(state, numBranches, branchId);
  return new Path(branchStagingDir, attemptId);
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Deletes the job-level directories derived from the configured writer staging and output directories.
 *
 * <p>For each of the two configured directories, the job directory is its parent when the directory was
 * provided explicitly, and its grandparent otherwise. The resulting paths are de-duplicated and then
 * deleted recursively through {@link HadoopUtils#deletePath}.</p>
 *
 * @param state the {@link State} holding the writer configuration
 * @param logger a {@link Logger} used for logging
 * @param stagingDirProvided whether the writer staging dir was provided explicitly
 * @param outputDirProvided whether the writer output dir was provided explicitly
 * @throws IOException if the file system cannot be obtained or a deletion fails
 */
public static void cleanUpOldJobData(State state, Logger logger, boolean stagingDirProvided, boolean outputDirProvided) throws IOException {
  Set<Path> dirsToDelete = new HashSet<>();

  String fsUri = state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
  FileSystem fs = FileSystem.get(URI.create(fsUri), WriterUtils.getFsConfiguration(state));

  // Parent when the dir was provided explicitly, grandparent otherwise.
  Path stagingParent = new Path(state.getProp(ConfigurationKeys.WRITER_STAGING_DIR)).getParent();
  dirsToDelete.add(stagingDirProvided ? stagingParent : stagingParent.getParent());

  Path outputParent = new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)).getParent();
  dirsToDelete.add(outputDirProvided ? outputParent : outputParent.getParent());

  for (Path oldJobDir : dirsToDelete) {
    logger.info("Cleaning up old job directory " + oldJobDir);
    HadoopUtils.deletePath(fs, oldJobDir, true);
  }
}
/**

代码示例来源:origin: apache/incubator-gobblin

/**
 * Resolve the directory into which a given {@link org.apache.gobblin.writer.DataWriter} should write its
 * output data. The directory is the branch-specific {@link ConfigurationKeys#WRITER_OUTPUT_DIR} value
 * combined with the writer file path returned by {@code WriterUtils.getWriterFilePath}.
 *
 * <p>The branch-specific output dir property must be present in {@code state}; otherwise the
 * precondition check fails.</p>
 *
 * @param state is the {@link State} corresponding to a specific {@link org.apache.gobblin.writer.DataWriter}.
 * @param numBranches is the total number of branches for the given {@link State}.
 * @param branchId is the id for the specific branch that the {@link org.apache.gobblin.writer.DataWriter} will write to.
 * @return a {@link Path} specifying the directory where the {@link org.apache.gobblin.writer.DataWriter} will write to.
 */
public static Path getWriterOutputDir(State state, int numBranches, int branchId) {
  final String outputDirKey =
      ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_OUTPUT_DIR, numBranches, branchId);
  Preconditions.checkArgument(state.contains(outputDirKey), "Missing required property " + outputDirKey);

  String outputRoot = state.getProp(outputDirKey);
  return new Path(outputRoot, WriterUtils.getWriterFilePath(state, numBranches, branchId));
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Creates a finder from the given state, delegating to the two-argument constructor with the
 * {@link FileSystem} returned by {@link WriterUtils#getWriterFs(State)}.
 *
 * <p>The file system lookup receives {@code new State(state)} rather than {@code state} itself
 * (presumably a copy, so the lookup works on a separate instance; confirm against {@link State}).</p>
 *
 * @param state the job/dataset configuration
 * @throws IOException if the writer file system cannot be obtained
 */
public RestorableHivePartitionDatasetFinder(State state)
  throws IOException {
 this(WriterUtils.getWriterFs(new State(state)), state);
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Resolve the directory this {@link BaseDataPublisher} publishes to for a given branch.
 *
 * <p>
 *   The default implementation returns the result of {@code WriterUtils.getDataPublisherFinalDir}.
 *   Subclasses of {@link BaseDataPublisher} may override this to write to a custom directory or to
 *   use a custom directory structure or naming pattern.
 * </p>
 *
 * @param workUnitState a {@link WorkUnitState} object
 * @param branchId the fork branch ID
 * @return the output directory path this {@link BaseDataPublisher} will write to
 */
protected Path getPublisherOutputDir(WorkUnitState workUnitState, int branchId) {
  Path publisherFinalDir = WriterUtils.getDataPublisherFinalDir(workUnitState, this.numBranches, branchId);
  return publisherFinalDir;
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Returns the default event bus id for this destination and branch.
 *
 * <p>If the destination properties contain the branch-specific event bus id key, its value is returned.
 * Otherwise the string form of the branch's writer output directory is used.</p>
 *
 * @return the configured event bus id, or the writer output directory as a string
 */
public String getDefaultEventBusId() {
  State destinationProps = getDestination().getProperties();
  String branchEventBusIdKey =
      ForkOperatorUtils.getPathForBranch(destinationProps, FULL_EVENTBUSID_KEY, getBranches(), getBranch());

  // No explicit id configured: fall back to the writer output directory.
  if (!destinationProps.contains(branchEventBusIdKey)) {
    return WriterUtils.getWriterOutputDir(destinationProps, getBranches(), getBranch()).toString();
  }
  return destinationProps.getProp(branchEventBusIdKey);
}

代码示例来源:origin: apache/incubator-gobblin

@Test
 public void testGetCodecFactoryIgnoresCase() {
  // Upper- and lower-case codec names must both resolve to the snappy codec.
  for (String codecName : new String[] {"SNAPPY", "snappy"}) {
   CodecFactory resolved = WriterUtils.getCodecFactory(Optional.of(codecName), Optional.<String>absent());
   Assert.assertEquals(resolved.toString(), "snappy");
  }
 }
}

代码示例来源:origin: org.apache.gobblin/gobblin-core

this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId);
this.fileContext = FileContext.getFileContext(conf);
  .getWriterStagingDir(properties, this.numBranches, this.branchId, this.writerAttemptIdOptional.get())
  : WriterUtils.getWriterStagingDir(properties, this.numBranches, this.branchId);
this.stagingFile = new Path(writerStagingDir, this.fileName);
  new Path(WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId), this.fileName);
this.allOutputFilesPropName = ForkOperatorUtils
  .getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, this.numBranches, this.branchId);
WriterUtils.mkdirsWithRecursivePermission(this.fs, this.outputFile.getParent(), this.dirPermission);
this.bytesWritten = Optional.absent();

代码示例来源:origin: apache/incubator-gobblin

/**
 * Ensures the publisher output directory exists before delegating to
 * {@link BaseDataPublisher#publishData(WorkUnitState, int, boolean, Set)},
 * so that tables will be moved one at a time rather than all at once.
 *
 * @param state the {@link WorkUnitState} being published
 * @param branchId the fork branch ID
 * @param publishSingleTaskData passed through unchanged to the superclass implementation
 * @param writerOutputPathsMoved passed through unchanged to the superclass implementation
 * @throws IOException if the directory check, directory creation, or the superclass publish fails
 */
@Override
protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
    Set<Path> writerOutputPathsMoved) throws IOException {
  final Path targetDir = getPublisherOutputDir(state, branchId);

  boolean targetDirExists = this.publisherFileSystemByBranches.get(branchId).exists(targetDir);
  if (!targetDirExists) {
    WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
        targetDir, this.permissions.get(branchId), this.retrierConfig);
  }

  super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
}

代码示例来源:origin: org.apache.gobblin/gobblin-data-management

ConfigurationKeys.LOCAL_FS_URI);
Configuration conf = WriterUtils.getFsConfiguration(state);
URI uri = URI.create(uriStr);
this.fs = FileSystem.get(uri, conf);
  .getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get())
  : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
this.outputDir = getOutputDir(state);
this.copyableDatasetMetadata =

代码示例来源:origin: org.apache.gobblin/gobblin-core

Path writerOutputDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
 WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId), publisherOutputDir,
   this.permissions.get(branchId), retrierConfig);
 addSingleTaskWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner);
 } else {
  WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
    publisherOutputDir.getParent(), this.permissions.get(branchId), retrierConfig);

代码示例来源:origin: apache/incubator-gobblin

@Test
public void testGetWriterDir() {
  State state = new State();

  // Un-suffixed properties.
  state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TEST_WRITER_STAGING_DIR);
  state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TEST_WRITER_OUTPUT_DIR);
  state.setProp(ConfigurationKeys.WRITER_FILE_PATH, TEST_WRITER_FILE_PATH);
  Assert.assertEquals(WriterUtils.getWriterStagingDir(state, 0, 0),
      new Path(TEST_WRITER_STAGING_DIR, TEST_WRITER_FILE_PATH));

  // Branch-suffixed properties, one branch at a time, with two branches in total.
  final String[] branchSuffixes = {".0", ".1"};
  for (int branchId = 0; branchId < branchSuffixes.length; branchId++) {
    String suffix = branchSuffixes[branchId];
    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR + suffix, TEST_WRITER_STAGING_DIR);
    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + suffix, TEST_WRITER_OUTPUT_DIR);
    state.setProp(ConfigurationKeys.WRITER_FILE_PATH + suffix, TEST_WRITER_FILE_PATH);
    Assert.assertEquals(WriterUtils.getWriterStagingDir(state, branchSuffixes.length, branchId),
        new Path(TEST_WRITER_STAGING_DIR, TEST_WRITER_FILE_PATH));
  }
}

相关文章