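This article collects code examples for the Java class org.apache.flink.core.fs.Path and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Path class:
Package path: org.apache.flink.core.fs.Path
Class name: Path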
Names a file or directory in a FileSystem. Path strings use slash as the directory separator. A path string is absolute if it begins with a slash.
Trailing slashes are removed from the path.
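The rules in the description above can be illustrated with a small stand-alone sketch. It works on plain strings rather than Flink's Path, and the names PathStringSketch, stripTrailingSlashes, and isAbsolute are hypothetical. It ignores details such as Windows drive letters and URI schemes, so treat it as an approximation of the documented behavior, not the library's implementation.

```java
// Hypothetical stand-in for the documented rules; this is NOT Flink's Path class.
final class PathStringSketch {

    private PathStringSketch() {}

    /** Removes trailing slashes, but keeps a lone "/" so the root stays valid. */
    static String stripTrailingSlashes(String path) {
        int end = path.length();
        while (end > 1 && path.charAt(end - 1) == '/') {
            end--;
        }
        return path.substring(0, end);
    }

    /** A path string is treated as absolute if it begins with a slash. */
    static boolean isAbsolute(String path) {
        return path.startsWith("/");
    }
}
```

For example, stripTrailingSlashes("/my/path/") yields "/my/path", and isAbsolute("test/test") is false.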
Code example source: apache/flink
private static Path createTempFilePath(String contents) throws IOException {
    File tempFile = File.createTempFile("test_contents", "tmp");
    tempFile.deleteOnExit();
    try (OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile))) {
        wrt.write(contents);
    }
    // Path accepts the file: URI string produced by File.toURI()
    return new Path(tempFile.toURI().toString());
}
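Several examples on this page build a Path from tempFile.toURI().toString(). The following sketch, which uses only the JDK, shows what that URI looks like before it is handed to Path. The class name TempFileUriSketch and the method writeTempFile are hypothetical, and the IOException is wrapped in an UncheckedIOException purely to keep the example short.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical helper; it only demonstrates the File -> URI step, not Flink's Path.
final class TempFileUriSketch {

    private TempFileUriSketch() {}

    /** Writes the contents to a temp file and returns its file: URI. */
    static URI writeTempFile(String contents) {
        try {
            File tempFile = File.createTempFile("test_contents", "tmp");
            tempFile.deleteOnExit();
            Files.write(tempFile.toPath(), contents.getBytes(StandardCharsets.UTF_8));
            // File.toURI() always yields an absolute URI with the "file" scheme
            return tempFile.toURI();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned URI has the scheme "file" and an absolute path, which is why the resulting string can be passed to a Path constructor without further qualification.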
Code example source: apache/flink
@Internal
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
    checkNotNull(fsUri, "file system URI");
    final URI uri;
    if (fsUri.getScheme() != null) {
        uri = fsUri;
    // ... (lines omitted in the source excerpt)
        final URI defaultUri = getDefaultFsUri();
        URI rewrittenUri = null;
        // ... (lines omitted in the source excerpt)
            rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                    defaultUri.getPort(), fsUri.getPath(), null, null);
        // ... (lines omitted in the source excerpt)
            rewrittenUri = new URI(
                    "file", null,
                    new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                    null);
        } catch (URISyntaxException ignored) {
    // ... (lines omitted in the source excerpt)
    initialize(new Configuration());
    // ... (excerpt ends here; the rest of the method is not included in the source)
Code example source: apache/flink
@Override
public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
    LOGGER.info("Processing task: " + task);
    Path outPath = new Path(targetPath, task.getRelativePath());
    FileSystem targetFs = targetPath.getFileSystem();
    // creating parent folders in case of a local FS
    if (!targetFs.isDistributedFS()) {
        // dealing with cases like file:///tmp or just /tmp
        File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
        File parentFile = outFile.getParentFile();
        if (!parentFile.mkdirs() && !parentFile.exists()) {
            throw new RuntimeException("Cannot create local file system directories: " + parentFile);
        }
    }
    FSDataOutputStream outputStream = null;
    FSDataInputStream inputStream = null;
    try {
        outputStream = targetFs.create(outPath, FileSystem.WriteMode.OVERWRITE);
        inputStream = task.getPath().getFileSystem().open(task.getPath());
        int bytes = IOUtils.copy(inputStream, outputStream);
        bytesCounter.add(bytes);
    } finally {
        IOUtils.closeQuietly(inputStream);
        IOUtils.closeQuietly(outputStream);
    }
    fileCounter.add(1L);
}
Code example source: apache/flink
/**
 * Adds a suffix to the final name in the path.
 *
 * @param suffix The suffix to be added
 * @return the new path including the suffix
 */
public Path suffix(String suffix) {
    return new Path(getParent(), getName() + suffix);
}
Code example source: apache/flink
public CompletedFuture(Path entry) {
    try {
        LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri());
        // absolute entries keep their path; relative ones are resolved against the working directory
        result = entry.isAbsolute() ? new Path(entry.toUri().getPath()) : new Path(fs.getWorkingDirectory(), entry);
    } catch (Exception e) {
        throw new RuntimeException("DistributedCache supports only local files for Collection Environments");
    }
}
Code example source: apache/flink
public static Path compressDirectory(Path directory, Path target) throws IOException {
    FileSystem sourceFs = directory.getFileSystem();
    FileSystem targetFs = target.getFileSystem();
    try (ZipOutputStream out = new ZipOutputStream(targetFs.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
        addToZip(directory, sourceFs, directory.getParent(), out);
    }
    return target;
}
Code example source: apache/flink
@Test
public void testPojoType() throws Exception {
    File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
    tempFile.deleteOnExit();
    tempFile.setWritable(true);
    OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
    wrt.write("123,AAA,3.123,BBB\n");
    wrt.write("456,BBB,1.123,AAA\n");
    wrt.close();
    @SuppressWarnings("unchecked")
    PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
    CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
    inputFormat.configure(new Configuration());
    FileInputSplit[] splits = inputFormat.createInputSplits(1);
    inputFormat.open(splits[0]);
    validatePojoItem(inputFormat);
}
Code example source: apache/flink
@Test
public void testDeletePathIfEmpty() throws IOException {
    final FileSystem localFs = FileSystem.getLocalFileSystem();
    final File dir = tmp.newFolder();
    assertTrue(dir.exists());
    final Path dirPath = new Path(dir.toURI());
    // deleting an empty directory should work
    assertTrue(FileUtils.deletePathIfEmpty(localFs, dirPath));
    // deleting a non existing directory should work
    assertTrue(FileUtils.deletePathIfEmpty(localFs, dirPath));
    // create a non-empty dir
    final File nonEmptyDir = tmp.newFolder();
    final Path nonEmptyDirPath = new Path(nonEmptyDir.toURI());
    new FileOutputStream(new File(nonEmptyDir, "filename")).close();
    assertFalse(FileUtils.deletePathIfEmpty(localFs, nonEmptyDirPath));
}
Code example source: apache/flink
/**
 * Returns a qualified path object.
 *
 * @param fs
 *        the FileSystem that should be used to obtain the current working directory
 * @return the qualified path object
 */
public Path makeQualified(FileSystem fs) {
    Path path = this;
    if (!isAbsolute()) {
        path = new Path(fs.getWorkingDirectory(), this);
    }
    final URI pathUri = path.toUri();
    final URI fsUri = fs.getUri();
    String scheme = pathUri.getScheme();
    String authority = pathUri.getAuthority();
    // already qualified: nothing to fill in from the file system URI
    if (scheme != null && (authority != null || fsUri.getAuthority() == null)) {
        return path;
    }
    if (scheme == null) {
        scheme = fsUri.getScheme();
    }
    if (authority == null) {
        authority = fsUri.getAuthority();
        if (authority == null) {
            authority = "";
        }
    }
    return new Path(scheme + ":" + "//" + authority + pathUri.getPath());
}
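The qualification logic in makeQualified above can be tried out without Flink on the classpath. The sketch below mirrors the scheme and authority handling using only java.net.URI. The class QualifySketch and its qualify method are hypothetical, and the sketch assumes the path is already absolute (it skips the working-directory step).

```java
import java.net.URI;

// Hypothetical JDK-only mirror of the scheme/authority step; NOT Flink's implementation.
final class QualifySketch {

    private QualifySketch() {}

    /** Fills in a missing scheme and authority on pathUri from fsUri. */
    static URI qualify(URI pathUri, URI fsUri) {
        String scheme = pathUri.getScheme();
        String authority = pathUri.getAuthority();
        // Already qualified: nothing to take over from the file system URI
        if (scheme != null && (authority != null || fsUri.getAuthority() == null)) {
            return pathUri;
        }
        if (scheme == null) {
            scheme = fsUri.getScheme();
        }
        if (authority == null) {
            authority = fsUri.getAuthority();
            if (authority == null) {
                authority = "";
            }
        }
        return URI.create(scheme + "://" + authority + pathUri.getPath());
    }
}
```

Under these assumptions, qualify(URI.create("/data/in"), URI.create("hdfs://namenode:9000/")) produces hdfs://namenode:9000/data/in, while a URI that already carries a scheme and authority is returned unchanged. The host name namenode is made up for illustration.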
Code example source: apache/flink
/**
 * Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
 *
 * @param f
 *        the {@link File} object this <code>LocalFileStatus</code> refers to
 * @param fs
 *        the file system the corresponding file has been read from
 */
public LocalFileStatus(final File f, final FileSystem fs) {
    this.file = f;
    this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
}
Code example source: apache/flink
/**
 * Test with one nested directory and recursive.file.enumeration = true
 */
@Test
public void testOneNestedDirectoryTrue() {
    try {
        String firstLevelDir = TestFileUtils.randomFileName();
        String secondLevelDir = TestFileUtils.randomFileName();
        File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir);
        File nestedDir = insideNestedDir.getParentFile();
        // create a file in the first-level and two files in the nested dir
        TestFileUtils.createTempFileInDirectory(nestedDir.getAbsolutePath(), "paella");
        TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "kalamari");
        TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");
        this.format.setFilePath(new Path(nestedDir.toURI().toString()));
        this.config.setBoolean("recursive.file.enumeration", true);
        format.configure(this.config);
        FileInputSplit[] splits = format.createInputSplits(1);
        Assert.assertEquals(3, splits.length);
    } catch (Exception ex) {
        ex.printStackTrace();
        Assert.fail(ex.getMessage());
    }
}
Code example source: apache/flink
@Test
public void testMakeQualified() throws IOException {
    // make relative path qualified
    String path = "test/test";
    Path p = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
    URI u = p.toUri();
    assertEquals("file", u.getScheme());
    assertEquals(null, u.getAuthority());
    String q = new Path(FileSystem.getLocalFileSystem().getWorkingDirectory().getPath(), path).getPath();
    assertEquals(q, u.getPath());
    // make absolute path qualified
    path = "/test/test";
    p = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
    u = p.toUri();
    assertEquals("file", u.getScheme());
    assertEquals(null, u.getAuthority());
    assertEquals(path, u.getPath());
}
Code example source: apache/flink
@Override
public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
    // value.f0 is the file path, value.f1 the start offset, value.f2 the end offset (-1 means read to the end)
    FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
    stream.seek(value.f1);
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
    String line;
    try {
        while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
            out.collect(line);
        }
    } finally {
        reader.close();
    }
}
Code example source: apache/flink
@Override
public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
    FileSystem fileSystem = FileSystem.get(new URI(path));
    while (isRunning) {
        List<String> files = listNewFiles(fileSystem);
        for (String filePath : files) {
            if (watchType == WatchType.ONLY_NEW_FILES
                    || watchType == WatchType.REPROCESS_WITH_APPENDED) {
                ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
                offsetOfFiles.put(filePath, -1L);
            } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
                long offset = 0;
                long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
                if (offsetOfFiles.containsKey(filePath)) {
                    offset = offsetOfFiles.get(filePath);
                }
                ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
                offsetOfFiles.put(filePath, fileSize);
                LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
            }
        }
        Thread.sleep(interval);
    }
}
Code example source: apache/flink
@BeforeClass
public static void createHDFS() throws Exception {
    final File baseDir = TMP.newFolder();
    Configuration hdConf = new Configuration();
    hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
    hdfsCluster = builder.build();
    org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
    fs = new HadoopFileSystem(hdfs);
    basePath = new Path(hdfs.getUri().toString() + "/tests");
}
Code example source: apache/flink
@Test
public void testSetPathsSingleWithMulti() {
    final MultiDummyFileInputFormat format = new MultiDummyFileInputFormat();
    final String myPath = "/an/imaginary/path";
    format.setFilePaths(myPath);
    final Path[] filePaths = format.getFilePaths();
    Assert.assertEquals(1, filePaths.length);
    Assert.assertEquals(myPath, filePaths[0].toUri().toString());
    // ensure backwards compatibility
    Assert.assertEquals(myPath, format.filePath.toUri().toString());
}
Code example source: apache/flink
@Test
public void testSuffix() {
    Path p = new Path("/my/path");
    p = p.suffix("_123");
    assertEquals("/my/path_123", p.toUri().getPath());
    p = new Path("/my/path/");
    p = p.suffix("/abc");
    assertEquals("/my/path/abc", p.toUri().getPath());
    p = new Path("C:/my/windows/path");
    p = p.suffix("/abc");
    assertEquals("/C:/my/windows/path/abc", p.toUri().getPath());
}
Code example source: apache/flink
// ... (excerpt begins mid-signature; earlier lines are not included in the source)
        org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
    checkNotNull(config);
    checkNotNull(hadoopConf);
    // ... (lines omitted in the source excerpt)
    if (fsUri.getScheme() == null || !"hdfs".equals(fsUri.getScheme().toLowerCase())) {
        throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
                "Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
    }
    // ... (lines omitted in the source excerpt)
    this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
    this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
    flinkFileSystem.mkdirs(haDataDirectory);
    blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
    // ... (excerpt ends here; the rest of the constructor is not included in the source)
Code example source: apache/flink
@Test
public void testHomeAndWorkDir() {
    assertEquals(fs.getUri().getScheme(), fs.getWorkingDirectory().toUri().getScheme());
    assertEquals(fs.getUri().getScheme(), fs.getHomeDirectory().toUri().getScheme());
}
Code example source: apache/flink
@Test
public void testPathAndScheme() throws Exception {
    assertEquals(fs.getUri(), getBasePath().getFileSystem().getUri());
    assertEquals(fs.getUri().getScheme(), getBasePath().toUri().getScheme());
}