本文整理了Java中org.apache.flink.core.fs.Path.getFileSystem()方法的一些代码示例,展示了Path.getFileSystem()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Path.getFileSystem()方法的具体详情如下:
包路径:org.apache.flink.core.fs.Path
类名称:Path
方法名:getFileSystem
[英]Returns the FileSystem that owns this Path.
[中]返回拥有此路径的文件系统。
代码示例来源:origin: apache/flink
/**
 * Checks whether the given path lives on a distributed file system.
 *
 * @param path the path to check
 * @return {@code true} if the file system owning the path reports itself as distributed
 * @throws IOException if the file system for the path cannot be resolved
 */
private static boolean isOnDistributedFS(final Path path) throws IOException {
	// Delegate the decision to the file system that owns this path.
	return path.getFileSystem().isDistributedFS();
}
代码示例来源:origin: apache/flink
/**
 * Compresses the contents of {@code directory} into a zip archive at {@code target}.
 *
 * @param directory the directory whose contents are zipped
 * @param target the path of the archive to create; must not already exist
 * @return the {@code target} path of the created archive
 * @throws IOException if the archive cannot be created or a file cannot be read
 */
public static Path compressDirectory(Path directory, Path target) throws IOException {
	final FileSystem srcFileSystem = directory.getFileSystem();
	final FileSystem dstFileSystem = target.getFileSystem();
	// NO_OVERWRITE: fail rather than clobber an existing archive.
	try (ZipOutputStream zipStream =
			new ZipOutputStream(dstFileSystem.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
		// Entries are named relative to the directory's parent.
		addToZip(directory, srcFileSystem, directory.getParent(), zipStream);
	}
	return target;
}
代码示例来源:origin: apache/flink
/**
 * Returns the local file backing the distributed-cache entry registered under {@code name},
 * blocking until the background copy task has finished.
 *
 * @param name the registration name of the cached file, must not be null
 * @return the local copy of the cached file
 * @throws NullPointerException if {@code name} is null
 * @throws IllegalArgumentException if no file was registered under {@code name}
 * @throws RuntimeException if copying the file failed or retrieval was interrupted
 */
public File getFile(String name) {
	// Same NPE type and message as before, via the standard utility.
	java.util.Objects.requireNonNull(name, "name must not be null");
	Future<Path> future = cacheCopyTasks.get(name);
	if (future == null) {
		throw new IllegalArgumentException("File with name '" + name + "' is not available." +
			" Did you forget to register the file?");
	}
	try {
		// Block until the copy task completes, then fully qualify the resulting path.
		final Path path = future.get();
		URI tmp = path.makeQualified(path.getFileSystem()).toUri();
		return new File(tmp);
	}
	catch (ExecutionException e) {
		// Unwrap the cause so callers see the actual copy failure.
		throw new RuntimeException("An error occurred while copying the file.", e.getCause());
	}
	catch (InterruptedException e) {
		// Restore the interrupt flag so callers can still observe the interruption.
		Thread.currentThread().interrupt();
		throw new RuntimeException("Error while getting the file registered under '" + name +
			"' from the distributed cache", e);
	}
	catch (Exception e) {
		throw new RuntimeException("Error while getting the file registered under '" + name +
			"' from the distributed cache", e);
	}
}
代码示例来源:origin: apache/flink
/**
 * Recursively deletes the given path if it is present on its file system.
 *
 * @param path the path to remove
 * @throws IOException if the existence check or the delete fails
 */
private static void deleteIfExists(Path path) throws IOException {
	final FileSystem fileSystem = path.getFileSystem();
	if (fileSystem.exists(path)) {
		// Recursive delete: directories are removed together with their contents.
		fileSystem.delete(path, true);
	}
}
代码示例来源:origin: apache/flink
/**
 * Removes the given path (and, for directories, everything beneath it) when it exists.
 *
 * @param path the path to delete
 * @throws IOException if the file system cannot be queried or the delete fails
 */
private static void deleteIfExists(Path path) throws IOException {
	final FileSystem owningFs = path.getFileSystem();
	if (owningFs.exists(path)) {
		// true = recursive, so non-empty directories are handled too.
		owningFs.delete(path, true);
	}
}
代码示例来源:origin: apache/flink
/**
 * Deletes the YARN application files, e.g., Flink binaries, libraries, etc., from the remote
 * filesystem.
 *
 * @param env The environment variables; the files directory is read from
 *            {@code YarnConfigKeys.FLINK_YARN_FILES}.
 */
public static void deleteApplicationFiles(final Map<String, String> env) {
	final String applicationFilesDir = env.get(YarnConfigKeys.FLINK_YARN_FILES);
	// Guard clause: nothing to clean up when no directory was configured.
	if (StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
		LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
		return;
	}
	final org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(applicationFilesDir);
	try {
		final org.apache.flink.core.fs.FileSystem fileSystem = path.getFileSystem();
		// delete() returning false means the recursive removal did not succeed.
		if (!fileSystem.delete(path, true)) {
			LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
		}
	} catch (final IOException e) {
		LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
	}
}
代码示例来源:origin: apache/flink
/**
 * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from
 * a local state.
 *
 * <p>Immutable sst files are hard-linked (cheap and safe since they never change);
 * every other file is physically copied.
 *
 * @param source directory holding the recovered local state
 * @throws IOException if the source directory cannot be listed or a link/copy fails
 */
private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
	FileSystem fileSystem = source.getFileSystem();
	final FileStatus[] fileStatuses = fileSystem.listStatus(source);
	if (fileStatuses == null) {
		throw new IOException("Cannot list file statuses. Directory " + source + " does not exist.");
	}
	for (FileStatus fileStatus : fileStatuses) {
		final Path filePath = fileStatus.getPath();
		final String fileName = filePath.getName();
		File restoreFile = new File(source.getPath(), fileName);
		File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName);
		if (fileName.endsWith(SST_FILE_SUFFIX)) {
			// hardlink'ing the immutable sst-files.
			Files.createLink(targetFile.toPath(), restoreFile.toPath());
		} else {
			// true copy for all other files.
			Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
		}
	}
}
代码示例来源:origin: apache/flink
/**
 * Creates a handler that serves the regular file at the given absolute path.
 *
 * @param path absolute path of the file to serve
 * @throws IOException if the file system cannot be accessed
 * @throws IllegalArgumentException if the path is relative, missing, or a directory
 */
public VirtualFileServerHandler(Path path) throws IOException {
	this.path = path;
	// Relative paths cannot be served reliably.
	if (!path.isAbsolute()) {
		throw new IllegalArgumentException("path must be absolute: " + path.toString());
	}
	this.fs = path.getFileSystem();
	// The target must exist ...
	if (!fs.exists(path)) {
		throw new IllegalArgumentException("no such file: " + path.toString());
	}
	// ... and must be a regular file, not a directory.
	if (fs.getFileStatus(path).isDir()) {
		throw new IllegalArgumentException("no such file: " + path.toString());
	}
}
代码示例来源:origin: apache/flink
/**
 * Collects the statuses of all files referenced by the configured input paths.
 * A path pointing at a directory is expanded one level: its directly contained
 * regular files are added (nested directories are skipped).
 *
 * @return the statuses of all involved files
 * @throws IOException if a path's file system cannot be queried
 */
protected List<FileStatus> getFiles() throws IOException {
	final List<FileStatus> collected = new ArrayList<>();
	for (Path filePath : getFilePaths()) {
		final FileSystem fs = filePath.getFileSystem();
		final FileStatus pathFile = fs.getFileStatus(filePath);
		if (!pathFile.isDir()) {
			// Plain file: take it as-is.
			collected.add(pathFile);
			continue;
		}
		// Directory: list it and keep only the regular files.
		for (FileStatus contained : fs.listStatus(filePath)) {
			if (!contained.isDir()) {
				collected.add(contained);
			}
		}
	}
	return collected;
}
代码示例来源:origin: apache/flink
/**
 * Opens the output format and sets up block-based writing on top of the parent's stream.
 * A block size of {@code NATIVE_BLOCK_SIZE} is resolved to the default block size of the
 * output path's file system.
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
	super.open(taskNumber, numTasks);
	long effectiveBlockSize = this.blockSize;
	if (effectiveBlockSize == NATIVE_BLOCK_SIZE) {
		// Only touch the file system when the caller asked for the native size.
		effectiveBlockSize = this.outputFilePath.getFileSystem().getDefaultBlockSize();
	}
	this.blockBasedOutput = new BlockBasedOutput(this.stream, (int) effectiveBlockSize);
	this.outView = new DataOutputViewStreamWrapper(this.blockBasedOutput);
}
代码示例来源:origin: apache/flink
/**
 * Creates an Avro {@code DataFileReader} for the given input split, choosing a datum reader
 * that matches the configured {@code avroValueType}.
 *
 * @param split the input split whose file is to be read
 * @return the opened data file reader positioned at the start of the file
 * @throws IOException if the file cannot be opened or its status cannot be read
 */
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
	DatumReader<E> datumReader;
	if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
		// Generic records carry their schema at runtime.
		datumReader = new GenericDatumReader<E>();
	} else {
		// Prefer the generated-class reader; fall back to reflection for POJOs.
		datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
			? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
	}
	if (LOG.isInfoEnabled()) {
		LOG.info("Opening split {}", split);
	}
	// Wrap the stream so Avro can seek; the length comes from the file status.
	SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
	// Typed cast instead of the raw (DataFileReader) cast to avoid the raw-type warning.
	DataFileReader<E> dataFileReader = (DataFileReader<E>) DataFileReader.openReader(in, datumReader);
	if (LOG.isDebugEnabled()) {
		LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
	}
	end = split.getStart() + split.getLength();
	recordsReadSinceLastSync = 0;
	return dataFileReader;
}
代码示例来源:origin: apache/flink
/**
 * Returns the size in bytes of the file at the given path.
 *
 * @param path the path of the file to measure
 * @return the file length in bytes, as reported by the owning file system
 * @throws IOException if the file status cannot be retrieved
 */
private long fileSize(Path path) throws IOException {
	return path.getFileSystem().getFileStatus(path).getLen();
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an {@code s3n://} URI resolves to a file system of kind
 * {@code OBJECT_STORE}, provided the Hadoop Native S3 implementation is available.
 */
@Test
public void testS3nKind() throws IOException {
	try {
		// Probe for the implementation before exercising it.
		Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
	} catch (ClassNotFoundException ignored) {
		// not in the classpath, cannot run this test
		log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
		return;
	}
	final Path s3nPath = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object");
	final FileSystem resolved = s3nPath.getFileSystem();
	assertEquals(FileSystemKind.OBJECT_STORE, resolved.getKind());
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an {@code s3://} URI resolves to a file system of kind
 * {@code OBJECT_STORE}, provided the Hadoop S3 implementation is available.
 */
@Test
public void testS3Kind() throws IOException {
	try {
		// Probe for the implementation before exercising it.
		Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
	} catch (ClassNotFoundException ignored) {
		// not in the classpath, cannot run this test
		log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
		return;
	}
	final Path s3Path = new Path("s3://myId:mySecret@bucket/some/bucket/some/object");
	final FileSystem resolved = s3Path.getFileSystem();
	assertEquals(FileSystemKind.OBJECT_STORE, resolved.getKind());
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an {@code s3a://} URI resolves to a file system of kind
 * {@code OBJECT_STORE}, provided the Hadoop S3A implementation is available.
 */
@Test
public void testS3aKind() throws IOException {
	try {
		// Probe for the implementation before exercising it.
		Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
	} catch (ClassNotFoundException ignored) {
		// not in the classpath, cannot run this test
		log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
		return;
	}
	final Path s3aPath = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object");
	final FileSystem resolved = s3aPath.getFileSystem();
	assertEquals(FileSystemKind.OBJECT_STORE, resolved.getKind());
}
代码示例来源:origin: apache/flink
/**
 * Recursively walks the tree under {@code p}, adding a {@code FileCopyTask} to
 * {@code tasks} for every regular file; {@code rel} accumulates the relative target path.
 *
 * @param p the path to scan
 * @param rel relative path prefix for entries found under {@code p}
 * @param tasks output list receiving one task per regular file
 * @throws IOException if a directory listing fails
 */
private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
	final FileStatus[] children = p.getFileSystem().listStatus(p);
	if (children == null) {
		// Path vanished or is not listable — nothing to copy.
		return;
	}
	for (FileStatus child : children) {
		final Path childPath = child.getPath();
		if (child.isDir()) {
			// Descend, extending the relative prefix with this directory's name.
			getCopyTasks(childPath, rel + childPath.getName() + "/", tasks);
		} else {
			tasks.add(new FileCopyTask(childPath, rel + childPath.getName()));
		}
	}
}
}
代码示例来源:origin: apache/flink
/**
 * Verifies that an {@code hdfs://} URI resolves to a file system of kind
 * {@code FILE_SYSTEM} (a real, consistent file system rather than an object store).
 */
@Test
public void testHdfsKind() throws IOException {
	final Path hdfsPath = new Path("hdfs://localhost:55445/my/file");
	final FileSystem resolved = hdfsPath.getFileSystem();
	assertEquals(FileSystemKind.FILE_SYSTEM, resolved.getKind());
}
代码示例来源:origin: apache/flink
/**
 * Asserts that the given distributed-cache entry carries the expected flags and that
 * its backing path exists and refers to a regular file (not a directory).
 *
 * @param entry the cache entry under test, must not be null
 * @param isExecutable expected value of the entry's executable flag
 * @param isZipped expected value of the entry's zipped flag
 * @throws IOException if the entry's file system cannot be queried
 */
private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
	assertNotNull(entry);
	assertEquals(isExecutable, entry.isExecutable);
	assertEquals(isZipped, entry.isZipped);
	org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(entry.filePath);
	// Resolve the file system once instead of once per assertion.
	final org.apache.flink.core.fs.FileSystem fileSystem = filePath.getFileSystem();
	assertTrue(fileSystem.exists(filePath));
	assertFalse(fileSystem.getFileStatus(filePath).isDir());
}
}
代码示例来源:origin: apache/flink
/**
 * Verifies that the file system resolved from the base path reports the same URI as the
 * file system under test, and that the base path's URI uses that file system's scheme.
 */
@Test
public void testPathAndScheme() throws Exception {
	assertEquals(fs.getUri(), getBasePath().getFileSystem().getUri());
	assertEquals(fs.getUri().getScheme(), getBasePath().toUri().getScheme());
}
代码示例来源:origin: apache/flink
/**
 * Verifies that nested directories are properly copied to the given S3 path (using the
 * appropriate file system) during resource uploads for YARN.
 *
 * @param scheme
 * 		file system scheme
 * @param pathSuffix
 * 		test path suffix which will be the test's target path
 */
private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
	++numRecursiveUploadTests;
	final Path testBasePath = new Path(S3TestCredentials.getTestBucketUriWithScheme(scheme) + TEST_DATA_DIR);
	final HadoopFileSystem hadoopFs = (HadoopFileSystem) testBasePath.getFileSystem();
	// The target must not pre-exist, otherwise the upload result would be ambiguous.
	assumeFalse(hadoopFs.exists(testBasePath));
	try {
		final Path uploadTarget = new Path(testBasePath, pathSuffix);
		YarnFileStageTest.testCopyFromLocalRecursive(hadoopFs.getHadoopFileSystem(),
			new org.apache.hadoop.fs.Path(uploadTarget.toUri()), tempFolder, true);
	} finally {
		// clean up
		hadoopFs.delete(testBasePath, true);
	}
}
内容来源于网络,如有侵权,请联系作者删除!