org.apache.flink.core.fs.Path.getFileSystem()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(73)

本文整理了Java中org.apache.flink.core.fs.Path.getFileSystem()方法的一些代码示例,展示了Path.getFileSystem()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Path.getFileSystem()方法的具体详情如下:
包路径:org.apache.flink.core.fs.Path
类名称:Path
方法名:getFileSystem

Path.getFileSystem介绍

[英]Returns the FileSystem that owns this Path.
[中]返回拥有此路径的文件系统。

代码示例

代码示例来源:origin: apache/flink

/**
 * Reports whether the file system that owns the given path is a distributed one.
 *
 * @param path the path whose owning file system is inspected
 * @return the result of {@code isDistributedFS()} on the path's file system
 * @throws IOException if the file system for the path cannot be obtained
 */
private static boolean isOnDistributedFS(final Path path) throws IOException {
  final boolean distributed = path.getFileSystem().isDistributedFS();
  return distributed;
}

代码示例来源:origin: apache/flink

/**
 * Writes the contents of {@code directory} into a zip archive at {@code target}.
 *
 * <p>The target is created with {@code NO_OVERWRITE}, and entries are added via
 * {@code addToZip} using the parent of {@code directory} as the root path.
 *
 * @param directory the directory to compress
 * @param target the path of the zip file to create
 * @return the {@code target} path that was passed in
 * @throws IOException if a file system cannot be obtained or writing the archive fails
 */
public static Path compressDirectory(Path directory, Path target) throws IOException {
  final FileSystem fsOfSource = directory.getFileSystem();
  final FileSystem fsOfTarget = target.getFileSystem();

  try (ZipOutputStream zip =
      new ZipOutputStream(fsOfTarget.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
    final Path zipRoot = directory.getParent();
    addToZip(directory, fsOfSource, zipRoot, zip);
  }

  return target;
}

代码示例来源:origin: apache/flink

/**
 * Returns the local file that was registered under the given name.
 *
 * <p>Waits for the copy task associated with the name to complete, qualifies the resulting
 * path against its own file system and converts the resulting URI into a {@link File}.
 *
 * @param name the name under which the file was registered
 * @return the file built from the qualified path of the finished copy task
 * @throws NullPointerException if {@code name} is {@code null}
 * @throws IllegalArgumentException if no copy task is registered under {@code name}
 * @throws RuntimeException if the copy task failed, the wait was interrupted, or the path
 *         could not be resolved to a file
 */
public File getFile(String name) {
  if (name == null) {
    throw new NullPointerException("name must not be null");
  }
  Future<Path> future = cacheCopyTasks.get(name);
  if (future == null) {
    throw new IllegalArgumentException("File with name '" + name + "' is not available." +
        " Did you forget to register the file?");
  }
  try {
    final Path path = future.get();
    URI tmp = path.makeQualified(path.getFileSystem()).toUri();
    return new File(tmp);
  }
  catch (ExecutionException e) {
    // Surface the failure of the copy task itself rather than the wrapper exception.
    throw new RuntimeException("An error occurred while copying the file.", e.getCause());
  }
  catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up can still observe the interruption;
    // the thrown exception is the same one the generic handler below would produce.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Error while getting the file registered under '" + name +
        "' from the distributed cache", e);
  }
  catch (Exception e) {
    throw new RuntimeException("Error while getting the file registered under '" + name +
        "' from the distributed cache", e);
  }
}

代码示例来源:origin: apache/flink

/**
 * Recursively deletes the given path if it exists on its owning file system.
 *
 * @param path the path to remove
 * @throws IOException if the file system cannot be obtained or the existence check or delete fails
 */
private static void deleteIfExists(Path path) throws IOException {
  final FileSystem owner = path.getFileSystem();
  if (!owner.exists(path)) {
    // Nothing to remove.
    return;
  }
  owner.delete(path, true);
}

代码示例来源:origin: apache/flink

/**
 * Removes {@code path} (recursively) from its file system, doing nothing when it is absent.
 *
 * @param path the path to delete
 * @throws IOException if obtaining the file system, checking existence, or deleting fails
 */
private static void deleteIfExists(Path path) throws IOException {
  final FileSystem fileSystem = path.getFileSystem();
  final boolean present = fileSystem.exists(path);
  if (present) {
    fileSystem.delete(path, true);
  }
}

代码示例来源:origin: apache/flink

/**
 * Removes the YARN application files (e.g., Flink binaries, libraries, etc.) from the remote
 * filesystem.
 *
 * <p>The directory is read from the {@code YarnConfigKeys.FLINK_YARN_FILES} entry of the
 * environment. Failures are logged and not propagated to the caller.
 *
 * @param env The environment variables.
 */
public static void deleteApplicationFiles(final Map<String, String> env) {
  final String applicationFilesDir = env.get(YarnConfigKeys.FLINK_YARN_FILES);

  if (StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
    LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
    return;
  }

  final org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(applicationFilesDir);
  try {
    final org.apache.flink.core.fs.FileSystem fileSystem = path.getFileSystem();
    final boolean deleted = fileSystem.delete(path, true);
    if (!deleted) {
      LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
    }
  } catch (final IOException e) {
    LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
  }
}

代码示例来源:origin: apache/flink

/**
 * Populates the working directory of the recovered RocksDB instance from a local state
 * directory: files ending in {@code SST_FILE_SUFFIX} are hard-linked, everything else is copied.
 *
 * @param source the directory whose entries are linked or copied into the instance path
 * @throws IOException if the directory cannot be listed or a link/copy operation fails
 */
private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
  final FileStatus[] entries = source.getFileSystem().listStatus(source);
  if (entries == null) {
    throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
  }

  for (int i = 0; i < entries.length; i++) {
    final String entryName = entries[i].getPath().getName();
    final File from = new File(source.getPath(), entryName);
    final File to = new File(stateBackend.instanceRocksDBPath.getPath(), entryName);

    if (!entryName.endsWith(SST_FILE_SUFFIX)) {
      // Mutable files get a real copy, replacing anything already at the target.
      Files.copy(from.toPath(), to.toPath(), StandardCopyOption.REPLACE_EXISTING);
      continue;
    }

    // The sst-files are immutable, so a hard link is sufficient.
    Files.createLink(to.toPath(), from.toPath());
  }
}

代码示例来源:origin: apache/flink

public VirtualFileServerHandler(Path path) throws IOException {
  this.path = path;
  if (!path.isAbsolute()) {
    throw new IllegalArgumentException("path must be absolute: " + path.toString());
  }
  this.fs = path.getFileSystem();
  if (!fs.exists(path) || fs.getFileStatus(path).isDir()) {
    throw new IllegalArgumentException("no such file: " + path.toString());
  }
}

代码示例来源:origin: apache/flink

/**
 * Collects the status of every file referenced by {@code getFilePaths()}.
 *
 * <p>A path that is a directory contributes its direct non-directory children; any other
 * path contributes its own status.
 *
 * @return the collected file statuses, in encounter order
 * @throws IOException if a file system cannot be obtained or queried
 */
protected List<FileStatus> getFiles() throws IOException {
  final List<FileStatus> collected = new ArrayList<>();

  for (Path input : getFilePaths()) {
    final FileSystem inputFs = input.getFileSystem();
    final FileStatus inputStatus = inputFs.getFileStatus(input);

    if (!inputStatus.isDir()) {
      // A plain file is taken as-is.
      collected.add(inputStatus);
      continue;
    }

    // A directory: take only the files directly inside it.
    for (FileStatus child : inputFs.listStatus(input)) {
      if (child.isDir()) {
        continue;
      }
      collected.add(child);
    }
  }

  return collected;
}

代码示例来源:origin: apache/flink

/**
 * Opens the output and wraps the stream in a block-based output view.
 *
 * <p>When the configured block size equals {@code NATIVE_BLOCK_SIZE}, the default block size
 * of the output path's file system is used instead.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
  super.open(taskNumber, numTasks);

  final long effectiveBlockSize;
  if (this.blockSize == NATIVE_BLOCK_SIZE) {
    effectiveBlockSize = this.outputFilePath.getFileSystem().getDefaultBlockSize();
  } else {
    effectiveBlockSize = this.blockSize;
  }

  this.blockBasedOutput = new BlockBasedOutput(this.stream, (int) effectiveBlockSize);
  this.outView = new DataOutputViewStreamWrapper(this.blockBasedOutput);
}

代码示例来源:origin: apache/flink

/**
 * Creates the Avro file reader for the given split and resets the per-split read state.
 *
 * <p>The datum reader is chosen from {@code avroValueType}: generic for {@code GenericRecord},
 * specific for subclasses of {@code SpecificRecordBase}, reflective otherwise.
 *
 * @param split the input split to open
 * @return the reader opened over the wrapped input stream
 * @throws IOException if the file status cannot be obtained or the reader cannot be opened
 */
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
  final DatumReader<E> datumReader;
  if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
    datumReader = new GenericDatumReader<E>();
  } else if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
    datumReader = new SpecificDatumReader<E>(avroValueType);
  } else {
    datumReader = new ReflectDatumReader<E>(avroValueType);
  }

  if (LOG.isInfoEnabled()) {
    LOG.info("Opening split {}", split);
  }

  // The wrapper needs the total file length, taken from the file status of the split's path.
  final long fileLength = split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen();
  final SeekableInput seekableInput = new FSDataInputStreamWrapper(stream, fileLength);
  final DataFileReader<E> reader = (DataFileReader) DataFileReader.openReader(seekableInput, datumReader);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Loaded SCHEMA: {}", reader.getSchema());
  }

  end = split.getStart() + split.getLength();
  recordsReadSinceLastSync = 0;
  return reader;
}

代码示例来源:origin: apache/flink

/**
 * Returns the length reported by the file status of the given path.
 *
 * @param path the path to query
 * @return the value of {@code getLen()} on the path's file status
 * @throws IOException if the file system or the file status cannot be obtained
 */
private long fileSize(Path path) throws IOException {
  final long length = path.getFileSystem().getFileStatus(path).getLen();
  return length;
}

代码示例来源:origin: apache/flink

/**
 * Checks that an {@code s3n://} path resolves to a file system of kind {@code OBJECT_STORE}.
 * The test returns early when the native S3 file system class cannot be loaded.
 */
@Test
public void testS3nKind() throws IOException {
  final String requiredClass = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
  try {
    Class.forName(requiredClass);
  } catch (ClassNotFoundException ignored) {
    // the class is missing from the classpath, so the test cannot run
    log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
    return;
  }

  final Path location = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object");
  final FileSystem s3n = location.getFileSystem();
  final FileSystemKind actualKind = s3n.getKind();
  assertEquals(FileSystemKind.OBJECT_STORE, actualKind);
}

代码示例来源:origin: apache/flink

/**
 * Checks that an {@code s3://} path resolves to a file system of kind {@code OBJECT_STORE}.
 * The test returns early when the S3 file system class cannot be loaded.
 */
@Test
public void testS3Kind() throws IOException {
  final String requiredClass = "org.apache.hadoop.fs.s3.S3FileSystem";
  try {
    Class.forName(requiredClass);
  } catch (ClassNotFoundException ignored) {
    // the class is missing from the classpath, so the test cannot run
    log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
    return;
  }

  final Path location = new Path("s3://myId:mySecret@bucket/some/bucket/some/object");
  final FileSystem s3 = location.getFileSystem();
  final FileSystemKind actualKind = s3.getKind();
  assertEquals(FileSystemKind.OBJECT_STORE, actualKind);
}

代码示例来源:origin: apache/flink

/**
 * Checks that an {@code s3a://} path resolves to a file system of kind {@code OBJECT_STORE}.
 * The test returns early when the S3A file system class cannot be loaded.
 */
@Test
public void testS3aKind() throws IOException {
  final String requiredClass = "org.apache.hadoop.fs.s3a.S3AFileSystem";
  try {
    Class.forName(requiredClass);
  } catch (ClassNotFoundException ignored) {
    // the class is missing from the classpath, so the test cannot run
    log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
    return;
  }

  final Path location = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object");
  final FileSystem s3a = location.getFileSystem();
  final FileSystemKind actualKind = s3a.getKind();
  assertEquals(FileSystemKind.OBJECT_STORE, actualKind);
}

代码示例来源:origin: apache/flink

/**
 * Recursively walks the directory {@code p} and appends one copy task per file found.
 *
 * @param p the directory to list
 * @param rel the relative prefix prepended to each file name; sub-directories extend it
 *        with their own name followed by {@code "/"}
 * @param tasks the list that receives the created copy tasks
 * @throws IOException if the file system cannot be obtained or listed
 */
private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
  final FileStatus[] children = p.getFileSystem().listStatus(p);
  if (children != null) {
    for (FileStatus child : children) {
      final boolean isDirectory = child.isDir();
      final Path childPath = child.getPath();
      if (isDirectory) {
        // Descend, extending the relative prefix with the directory name.
        getCopyTasks(childPath, rel + childPath.getName() + "/", tasks);
      } else {
        tasks.add(new FileCopyTask(childPath, rel + childPath.getName()));
      }
    }
  }
}
}

代码示例来源:origin: apache/flink

/** Checks that an {@code hdfs://} path resolves to a file system of kind {@code FILE_SYSTEM}. */
@Test
public void testHdfsKind() throws IOException {
  final Path location = new Path("hdfs://localhost:55445/my/file");
  final FileSystem hdfs = location.getFileSystem();
  final FileSystemKind actualKind = hdfs.getKind();
  assertEquals(FileSystemKind.FILE_SYSTEM, actualKind);
}

代码示例来源:origin: apache/flink

/**
 * Asserts that the cache entry carries the expected flags and that its file path points to
 * an existing non-directory.
 *
 * @param entry the distributed cache entry to check; must not be {@code null}
 * @param isExecutable the expected value of the entry's executable flag
 * @param isZipped the expected value of the entry's zipped flag
 * @throws IOException if the file system cannot be obtained or queried
 */
private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
  assertNotNull(entry);
  assertEquals(isExecutable, entry.isExecutable);
  assertEquals(isZipped, entry.isZipped);

  final org.apache.flink.core.fs.Path cachedFile = new org.apache.flink.core.fs.Path(entry.filePath);

  final boolean exists = cachedFile.getFileSystem().exists(cachedFile);
  assertTrue(exists);

  final boolean isDirectory = cachedFile.getFileSystem().getFileStatus(cachedFile).isDir();
  assertFalse(isDirectory);
}
}

代码示例来源:origin: apache/flink

/**
 * Checks that the base path resolves to a file system with the same URI as {@code fs}, and
 * that the base path's own URI uses the same scheme.
 */
@Test
public void testPathAndScheme() throws Exception {
  final java.net.URI expectedUri = fs.getUri();
  final java.net.URI resolvedUri = getBasePath().getFileSystem().getUri();
  assertEquals(expectedUri, resolvedUri);

  final String expectedScheme = fs.getUri().getScheme();
  final String basePathScheme = getBasePath().toUri().getScheme();
  assertEquals(expectedScheme, basePathScheme);
}

代码示例来源:origin: apache/flink

/**
 * Verifies that nested directories are properly copied to the given S3 path (using the
 * appropriate file system) during resource uploads for YARN.
 *
 * <p>The base test directory is deleted recursively afterwards, whether or not the copy
 * check succeeded.
 *
 * @param scheme
 *         file system scheme
 * @param pathSuffix
 *         test path suffix which will be the test's target path
 */
private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
  numRecursiveUploadTests++;

  final String baseUri = S3TestCredentials.getTestBucketUriWithScheme(scheme) + TEST_DATA_DIR;
  final Path basePath = new Path(baseUri);
  final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();

  // Only run when the base directory does not exist yet.
  final boolean baseAlreadyExists = fs.exists(basePath);
  assumeFalse(baseAlreadyExists);

  try {
    final Path directory = new Path(basePath, pathSuffix);
    final org.apache.hadoop.fs.Path hadoopTarget = new org.apache.hadoop.fs.Path(directory.toUri());
    YarnFileStageTest.testCopyFromLocalRecursive(
      fs.getHadoopFileSystem(), hadoopTarget, tempFolder, true);
  } finally {
    // clean up
    fs.delete(basePath, true);
  }
}

相关文章