org.apache.flink.core.fs.Path类的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(123)

本文整理了Java中org.apache.flink.core.fs.Path类的一些代码示例,展示了Path类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Path类的具体详情如下:
包路径:org.apache.flink.core.fs.Path
类名称:Path

Path介绍

[英]Names a file or directory in a FileSystem. Path strings use slash as the directory separator. A path string is absolute if it begins with a slash.

Tailing slashes are removed from the path.
[中]命名文件系统中的文件或目录。路径字符串使用斜杠作为目录分隔符。如果路径字符串以斜线开头,则它是绝对的。
从路径中删除尾部斜线。

代码示例

代码示例来源:origin: apache/flink

private static Path createTempFilePath(String contents) throws IOException {
    File tempFile = File.createTempFile("test_contents", "tmp");
    tempFile.deleteOnExit();

    try (OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile))) {
      wrt.write(contents);
    }
    return new Path(tempFile.toURI().toString());
  }
}

代码示例来源:origin: apache/flink

@Internal
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
  checkNotNull(fsUri, "file system URI");
    final URI uri;
    if (fsUri.getScheme() != null) {
      uri = fsUri;
      final URI defaultUri = getDefaultFsUri();
      URI rewrittenUri = null;
        rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
            defaultUri.getPort(), fsUri.getPath(), null, null);
            rewrittenUri = new URI(
                "file", null,
                new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(),
                null);
          } catch (URISyntaxException ignored) {
      initialize(new Configuration());

代码示例来源:origin: apache/flink

@Override
  public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
    LOGGER.info("Processing task: " + task);
    Path outPath = new Path(targetPath, task.getRelativePath());
    FileSystem targetFs = targetPath.getFileSystem();
    // creating parent folders in case of a local FS
    if (!targetFs.isDistributedFS()) {
      //dealing with cases like file:///tmp or just /tmp
      File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
      File parentFile = outFile.getParentFile();
      if (!parentFile.mkdirs() && !parentFile.exists()) {
        throw new RuntimeException("Cannot create local file system directories: " + parentFile);
      }
    }
    FSDataOutputStream outputStream = null;
    FSDataInputStream inputStream = null;
    try {
      outputStream = targetFs.create(outPath, FileSystem.WriteMode.OVERWRITE);
      inputStream = task.getPath().getFileSystem().open(task.getPath());
      int bytes = IOUtils.copy(inputStream, outputStream);
      bytesCounter.add(bytes);
    } finally {
      IOUtils.closeQuietly(inputStream);
      IOUtils.closeQuietly(outputStream);
    }
    fileCounter.add(1L);
  }
});

代码示例来源:origin: apache/flink

/**
 * Adds a suffix to the final name in the path.
 *
 * @param suffix The suffix to be added
 * @return the new path including the suffix
 */
public Path suffix(String suffix) {
  return new Path(getParent(), getName() + suffix);
}

代码示例来源:origin: apache/flink

public CompletedFuture(Path entry) {
  try{
    LocalFileSystem fs = (LocalFileSystem) FileSystem.getUnguardedFileSystem(entry.toUri());
    result = entry.isAbsolute() ? new Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
  } catch (Exception e){
    throw new RuntimeException("DistributedCache supports only local files for Collection Environments");
  }
}

代码示例来源:origin: apache/flink

public static Path compressDirectory(Path directory, Path target) throws IOException {
  FileSystem sourceFs = directory.getFileSystem();
  FileSystem targetFs = target.getFileSystem();
  try (ZipOutputStream out = new ZipOutputStream(targetFs.create(target, FileSystem.WriteMode.NO_OVERWRITE))) {
    addToZip(directory, sourceFs, directory.getParent(), out);
  }
  return target;
}

代码示例来源:origin: apache/flink

@Test
public void testPojoType() throws Exception {
  File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
  tempFile.deleteOnExit();
  tempFile.setWritable(true);
  OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
  wrt.write("123,AAA,3.123,BBB\n");
  wrt.write("456,BBB,1.123,AAA\n");
  wrt.close();
  @SuppressWarnings("unchecked")
  PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
  CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
  inputFormat.configure(new Configuration());
  FileInputSplit[] splits = inputFormat.createInputSplits(1);
  inputFormat.open(splits[0]);
  validatePojoItem(inputFormat);
}

代码示例来源:origin: apache/flink

@Test
public void testDeletePathIfEmpty() throws IOException {
  final FileSystem localFs = FileSystem.getLocalFileSystem();
  final File dir = tmp.newFolder();
  assertTrue(dir.exists());
  final Path dirPath = new Path(dir.toURI());
  // deleting an empty directory should work
  assertTrue(FileUtils.deletePathIfEmpty(localFs, dirPath));
  // deleting a non existing directory should work
  assertTrue(FileUtils.deletePathIfEmpty(localFs, dirPath));
  // create a non-empty dir
  final File nonEmptyDir = tmp.newFolder();
  final Path nonEmptyDirPath = new Path(nonEmptyDir.toURI());
  new FileOutputStream(new File(nonEmptyDir, "filename")).close();
  assertFalse(FileUtils.deletePathIfEmpty(localFs, nonEmptyDirPath));
}

代码示例来源:origin: apache/flink

/**
 * Returns a qualified path object.
 *
 * @param fs
 *        the FileSystem that should be used to obtain the current working directory
 * @return the qualified path object
 */
public Path makeQualified(FileSystem fs) {
  Path path = this;
  if (!isAbsolute()) {
    path = new Path(fs.getWorkingDirectory(), this);
  }
  final URI pathUri = path.toUri();
  final URI fsUri = fs.getUri();
  String scheme = pathUri.getScheme();
  String authority = pathUri.getAuthority();
  if (scheme != null && (authority != null || fsUri.getAuthority() == null)) {
    return path;
  }
  if (scheme == null) {
    scheme = fsUri.getScheme();
  }
  if (authority == null) {
    authority = fsUri.getAuthority();
    if (authority == null) {
      authority = "";
    }
  }
  return new Path(scheme + ":" + "//" + authority + pathUri.getPath());
}

代码示例来源:origin: apache/flink

/**
 * Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
 *
 * @param f
 *        the {@link File} object this <code>LocalFileStatus</code> refers to
 * @param fs
 *        the file system the corresponding file has been read from
 */
public LocalFileStatus(final File f, final FileSystem fs) {
  this.file = f;
  this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
}

代码示例来源:origin: apache/flink

/**
 * Test with one nested directory and recursive.file.enumeration = true
 */
@Test
public void testOneNestedDirectoryTrue() {
  try {
    String firstLevelDir = TestFileUtils.randomFileName();
    String secondLevelDir = TestFileUtils.randomFileName();
    File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir);
    File nestedDir = insideNestedDir.getParentFile();
    // create a file in the first-level and two files in the nested dir
    TestFileUtils.createTempFileInDirectory(nestedDir.getAbsolutePath(), "paella");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "kalamari");
    TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");
    this.format.setFilePath(new Path(nestedDir.toURI().toString()));
    this.config.setBoolean("recursive.file.enumeration", true);
    format.configure(this.config);
    FileInputSplit[] splits = format.createInputSplits(1);
    Assert.assertEquals(3, splits.length);
  } catch (Exception ex) {
    ex.printStackTrace();
    Assert.fail(ex.getMessage());
  }
}

代码示例来源:origin: apache/flink

@Test
  public void testMakeQualified() throws IOException {
    // make relative path qualified
    String path = "test/test";
    Path p  = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
    URI u = p.toUri();

    assertEquals("file", u.getScheme());
    assertEquals(null, u.getAuthority());

    String q = new Path(FileSystem.getLocalFileSystem().getWorkingDirectory().getPath(), path).getPath();
    assertEquals(q, u.getPath());

    // make absolute path qualified
    path = "/test/test";
    p = new Path(path).makeQualified(FileSystem.getLocalFileSystem());
    u = p.toUri();
    assertEquals("file", u.getScheme());
    assertEquals(null, u.getAuthority());
    assertEquals(path, u.getPath());
  }
}

代码示例来源:origin: apache/flink

@Override
  public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
    FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
    stream.seek(value.f1);

    BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
    String line;

    try {
      while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
        out.collect(line);
      }
    } finally {
      reader.close();
    }
  }
}

代码示例来源:origin: apache/flink

@Override
public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
  FileSystem fileSystem = FileSystem.get(new URI(path));
  while (isRunning) {
    List<String> files = listNewFiles(fileSystem);
    for (String filePath : files) {
      if (watchType == WatchType.ONLY_NEW_FILES
          || watchType == WatchType.REPROCESS_WITH_APPENDED) {
        ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
        offsetOfFiles.put(filePath, -1L);
      } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
        long offset = 0;
        long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
        if (offsetOfFiles.containsKey(filePath)) {
          offset = offsetOfFiles.get(filePath);
        }
        ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
        offsetOfFiles.put(filePath, fileSize);
        LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
      }
    }
    Thread.sleep(interval);
  }
}

代码示例来源:origin: apache/flink

@BeforeClass
public static void createHDFS() throws Exception {
  final File baseDir = TMP.newFolder();
  Configuration hdConf = new Configuration();
  hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
  MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
  hdfsCluster = builder.build();
  org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
  fs = new HadoopFileSystem(hdfs);
  basePath = new Path(hdfs.getUri().toString() + "/tests");
}

代码示例来源:origin: apache/flink

@Test
public void testSetPathsSingleWithMulti() {
  final MultiDummyFileInputFormat format = new MultiDummyFileInputFormat();
  final String myPath = "/an/imaginary/path";
  format.setFilePaths(myPath);
  final Path[] filePaths = format.getFilePaths();
  Assert.assertEquals(1, filePaths.length);
  Assert.assertEquals(myPath, filePaths[0].toUri().toString());
  // ensure backwards compatibility
  Assert.assertEquals(myPath, format.filePath.toUri().toString());
}

代码示例来源:origin: apache/flink

@Test
public void testSuffix() {
  Path p = new Path("/my/path");
  p = p.suffix("_123");
  assertEquals("/my/path_123", p.toUri().getPath());
  p = new Path("/my/path/");
  p = p.suffix("/abc");
  assertEquals("/my/path/abc", p.toUri().getPath());
  p = new Path("C:/my/windows/path");
  p = p.suffix("/abc");
  assertEquals("/C:/my/windows/path/abc", p.toUri().getPath());
}

代码示例来源:origin: apache/flink

org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
checkNotNull(config);
checkNotNull(hadoopConf);
if (fsUri.getScheme() == null || !"hdfs".equals(fsUri.getScheme().toLowerCase())) {
  throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
      "Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
  flinkFileSystem.mkdirs(haDataDirectory);
blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());

代码示例来源:origin: apache/flink

@Test
public void testHomeAndWorkDir() {
  assertEquals(fs.getUri().getScheme(), fs.getWorkingDirectory().toUri().getScheme());
  assertEquals(fs.getUri().getScheme(), fs.getHomeDirectory().toUri().getScheme());
}

代码示例来源:origin: apache/flink

@Test
public void testPathAndScheme() throws Exception {
  assertEquals(fs.getUri(), getBasePath().getFileSystem().getUri());
  assertEquals(fs.getUri().getScheme(), getBasePath().toUri().getScheme());
}

相关文章