org.apache.flink.api.java.operators.MapOperator.returns()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.5k)|赞(0)|评价(0)|浏览(117)

本文整理了Java中org.apache.flink.api.java.operators.MapOperator.returns()方法的一些代码示例,展示了MapOperator.returns()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。MapOperator.returns()方法的具体详情如下:
包路径:org.apache.flink.api.java.operators.MapOperator
类名称:MapOperator
方法名:returns

MapOperator.returns介绍

暂无

代码示例

代码示例来源:origin: apache/flink

/**
 * Builds a new graph in which the value of every vertex is replaced by the result of
 * {@code mapper}.
 *
 * <p>Field {@code f0} of each vertex is copied over unchanged and is declared as a forwarded
 * field. The edges and the context of this graph are handed to the new graph as they are.
 *
 * @param mapper function that computes the new value from a vertex
 * @param returnType explicit type information for the mapped vertices
 * @param <NV> type of the new vertex values
 * @return a graph made of the mapped vertices and this graph's edges and context
 */
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
  // One Vertex instance per function object; it is overwritten and returned on every call.
  final MapFunction<Vertex<K, VV>, Vertex<K, NV>> valueReplacer =
      new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
        private final Vertex<K, NV> reused = new Vertex<>();

        @Override
        public Vertex<K, NV> map(Vertex<K, VV> vertex) throws Exception {
          reused.f0 = vertex.f0;
          reused.f1 = mapper.map(vertex);
          return reused;
        }
      };

  DataSet<Vertex<K, NV>> mappedVertices = vertices
      .map(valueReplacer)
      .returns(returnType)
      .withForwardedFields("f0")
      .name("Map vertices");

  return new Graph<>(mappedVertices, this.edges, this.context);
}

代码示例来源:origin: apache/flink

/**
 * Builds a new graph in which the value of every edge is replaced by the result of
 * {@code mapper}.
 *
 * <p>Fields {@code f0} and {@code f1} of each edge are copied over unchanged and are declared as
 * forwarded fields. The vertices and the context of this graph are handed to the new graph as
 * they are.
 *
 * @param mapper function that computes the new value from an edge
 * @param returnType explicit type information for the mapped edges
 * @param <NV> type of the new edge values
 * @return a graph made of this graph's vertices, the mapped edges and this graph's context
 */
public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K, NV>> returnType) {
  // One Edge instance per function object; it is overwritten and returned on every call.
  final MapFunction<Edge<K, EV>, Edge<K, NV>> valueReplacer =
    new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
      private final Edge<K, NV> reused = new Edge<>();

      @Override
      public Edge<K, NV> map(Edge<K, EV> edge) throws Exception {
        reused.f0 = edge.f0;
        reused.f1 = edge.f1;
        reused.f2 = mapper.map(edge);
        return reused;
      }
    };

  DataSet<Edge<K, NV>> mappedEdges = edges
    .map(valueReplacer)
    .returns(returnType)
    .withForwardedFields("f0; f1")
    .name("Map edges");

  return new Graph<>(this.vertices, mappedEdges, this.context);
}

代码示例来源:origin: apache/flink

/**
 * Counts the elements of a DataSet.
 *
 * <p>The input is first passed through a {@code MapTo} function constructed with a
 * {@code LongValue} of 1 (operator "Emit 1"); the mapped values are then reduced with
 * {@code AddLongValue} (operator "Sum").
 *
 * @param input DataSet whose elements are counted
 * @param <T> type of the input elements
 * @return DataSet holding the reduced {@code LongValue}
 */
public static <T> DataSet<LongValue> count(DataSet<T> input) {
  DataSet<LongValue> ones = input
    .map(new MapTo<>(new LongValue(1)))
    .returns(LONG_VALUE_TYPE_INFO)
    .name("Emit 1");

  return ones
    .reduce(new AddLongValue())
    .name("Sum");
}

代码示例来源:origin: apache/flink

/**
 * Checks that an explicit {@code returns(Types.STRING)} hint determines the result type of a
 * map operator whose function is declared without generic parameters.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testFunctionWithMissingGenericsAndReturns() {
  // Raw on purpose: the function itself carries no generic type arguments.
  RichMapFunction rawMapper = new RichMapFunction() {
    private static final long serialVersionUID = 1L;

    @Override
    public Object map(Object value) throws Exception {
      return null;
    }
  };

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  TypeInformation<?> resultType = env
      .fromElements("arbitrary", "data")
      .map(rawMapper)
      .returns(Types.STRING)
      .getResultType();

  assertEquals(Types.STRING, resultType);
}

代码示例来源:origin: apache/flink

return output;
}).returns(returnType).withForwardedFields("f0").name("Initialize vertex values");

代码示例来源:origin: apache/flink

/**
 * Appends {@code branches} rounds of operators to {@code input} and unions the result of every
 * round into {@code stats}.
 *
 * <p>In each round:
 * <ul>
 *   <li>if {@code stats} is already set, {@code input} is replaced by an identity map over itself
 *       that receives {@code stats} (each element wrapped as {@code "(" + s + ").map"}) as the
 *       broadcast set named "stats";</li>
 *   <li>a branch is built from {@code input} by mapping each element {@code s} to the tuple
 *       {@code (0, s + round)}, grouping on field 0, taking {@code minBy} on field 1 and
 *       projecting field 1 back out;</li>
 *   <li>the branch becomes {@code stats} if none exists yet, otherwise it is unioned into it.</li>
 * </ul>
 *
 * <p>Note: if {@code stats} is {@code null} and {@code branches} is not positive, the loop body
 * never runs and the final {@code stats.map(...)} call throws a {@code NullPointerException}.
 *
 * @param input data set the branches are derived from
 * @param stats result accumulated so far, or {@code null} if there is none yet
 * @param branches number of rounds to append
 * @return the accumulated result with each element wrapped as {@code "(" + s + ").stats"}
 */
private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
  for (int i = 0; i < branches; i++) {
    // Lambdas may only capture effectively final locals, so copy the loop counter.
    final int ii = i;

    if (stats != null) {
      input = input.map(
        new RichMapFunction<String, String>() {
          @Override
          public String map(String value) {
            return value;
          }
        }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
    }

    // The lambda produces Tuple2<Integer, String>: field 0 is the constant 0, field 1 the
    // decorated string. The type hint must list the field types in that same order.
    DataSet<String> branch = input
      .map(s -> new Tuple2<>(0, s + ii)).returns(Types.TUPLE(Types.INT, Types.STRING))
      .groupBy(0)
      .minBy(1)
      .map(kv -> kv.f1).returns(Types.STRING);

    if (stats == null) {
      stats = branch;
    } else {
      stats = stats.union(branch);
    }
  }
  return stats.map(s -> "(" + s + ").stats");
}
}

代码示例来源:origin: apache/flink

.returns(Types.TUPLE(Types.INT, Types.DOUBLE)); // for lambda with generics

代码示例来源:origin: apache/flink

.returns(returnType)
  .setParallelism(parallelism)
  .name("Translate edge IDs");

代码示例来源:origin: apache/flink

.returns(returnType)
  .setParallelism(parallelism)
  .name("Translate edge values");

代码示例来源:origin: apache/flink

.returns(returnType)
  .setParallelism(parallelism)
  .name("Translate vertex IDs");

代码示例来源:origin: apache/flink

.returns(returnType)
  .setParallelism(parallelism)
  .name("Translate vertex values");

代码示例来源:origin: apache/flink

/**
 * Runs {@code Mapper} over the small 3-tuple data set, with the result type supplied through a
 * {@code TypeHint}, and compares the collected result with the expected text.
 */
@Test
public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  DataSet<Tuple3<Integer, Long, String>> source = CollectionDataSets.getSmall3TupleDataSet(env);
  TypeHint<Tuple3<Integer, Long, String>> tupleHint = new TypeHint<Tuple3<Integer, Long, String>>(){};

  DataSet<Tuple3<Integer, Long, String>> mapped = source
    .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
    .returns(tupleHint);
  List<Tuple3<Integer, Long, String>> collected = mapped.collect();

  String expected = "(2,2,Hello)\n"
    + "(3,2,Hello world)\n"
    + "(1,1,Hi)\n";
  compareResultAsText(collected, expected);
}

代码示例来源:origin: apache/flink

/**
 * Runs {@code Mapper} over the small 3-tuple data set, with the result type supplied as an
 * explicit {@code TupleTypeInfo}, and compares the collected result with the expected text.
 */
@Test
public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableSysoutLogging();

  DataSet<Tuple3<Integer, Long, String>> source = CollectionDataSets.getSmall3TupleDataSet(env);
  TupleTypeInfo<Tuple3<Integer, Long, String>> tupleInfo =
    new TupleTypeInfo<Tuple3<Integer, Long, String>>(
      BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

  DataSet<Tuple3<Integer, Long, String>> mapped = source
    // the generic arguments given to Mapper are erased during compilation
    .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
    .returns(tupleInfo);
  List<Tuple3<Integer, Long, String>> collected = mapped.collect();

  String expected = "(2,2,Hello)\n"
    + "(3,2,Hello world)\n"
    + "(1,1,Hi)\n";
  compareResultAsText(collected, expected);
}

代码示例来源:origin: apache/flink

.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT, Types.STRING))
DataSet<Tuple2<String, Integer>> iteration = initial
  .map(x -> Tuple2.of(x.f0, x.f1 * 2))
    .returns(Types.TUPLE(Types.STRING, Types.INT));
DataSet<Boolean> termination = iteration
    .returns(Types.TUPLE(Types.INT, Types.INT));

代码示例来源:origin: apache/flink

new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);

代码示例来源:origin: org.gradoop/gradoop-flink

/**
 * Runs {@code SourceLabelAndEdgeLabelDistribution} on the graph and flattens each result into a
 * three-field tuple.
 *
 * @param graph the logical graph the statistic is computed for
 * @return tuples of the form {@code (f0.f0, f0.f1, f1)} taken from each result element
 */
@Override
public MapOperator<WithCount<Tuple2<String, String>>, Tuple3<String, String, Long>>
execute(final LogicalGraph graph) {
 TypeHint<Tuple3<String, String, Long>> flatTupleHint =
   new TypeHint<Tuple3<String, String, Long>>() { };

 return new SourceLabelAndEdgeLabelDistribution()
   .execute(graph)
   .map(counted -> Tuple3.of(counted.f0.f0, counted.f0.f1, counted.f1))
   .returns(flatTupleHint);
}

代码示例来源:origin: org.gradoop/gradoop-flink

/**
 * Runs {@code DistinctEdgePropertiesByLabel} on the graph and flattens each result into a
 * three-field tuple.
 *
 * @param graph the logical graph the statistic is computed for
 * @return tuples of the form {@code (f0.f0, f0.f1, f1)} taken from each result element
 */
@Override
public MapOperator<WithCount<Tuple2<String, String>>, Tuple3<String, String, Long>>
execute(LogicalGraph graph) {
 TypeHint<Tuple3<String, String, Long>> flatTupleHint =
   new TypeHint<Tuple3<String, String, Long>>() { };

 return new DistinctEdgePropertiesByLabel()
   .execute(graph)
   .map(counted -> Tuple3.of(counted.f0.f0, counted.f0.f1, counted.f1))
   .returns(flatTupleHint);
}

代码示例来源:origin: seznam/euphoria

/**
 * Reads the file at {@code inputPath} as text, parses every line with a
 * {@code SearchEventsParser}, drops results that are {@code null} or have a {@code null} or
 * empty {@code query}, and emits {@code (timestamp, query)} pairs.
 *
 * @param env execution environment used to create the source
 * @param inputPath location of the input
 * @return data set of {@code (timestamp, query)} tuples
 * @throws IOException declared by the signature; the visible body does not show which call
 *     raises it
 */
static DataSet<Tuple2<Long, String>> getHdfsSource(ExecutionEnvironment env, URI inputPath)
throws IOException {
 final SearchEventsParser parser = new SearchEventsParser();
 final TextInputFormat textFormat = new TextInputFormat(new Path(inputPath));
 final TypeHint<Tuple2<Long, String>> pairHint = new TypeHint<Tuple2<Long, String>>() {};

 return env.readFile(textFormat, inputPath.toString())
     .map(parser::parse)
     .filter(event -> event != null && event.query != null && !event.query.isEmpty())
     .map(event -> Tuple2.of(event.timestamp, event.query))
     .returns(pairHint);
}

代码示例来源:origin: dbs-leipzig/gradoop

/**
 * Reads the text file at {@code path}, splits every line on
 * {@code CSVConstants.TOKEN_DELIMITER} with a limit of 3, and emits the first three tokens of
 * each line as a tuple.
 *
 * @param path location of the text file
 * @param config configuration providing the execution environment
 * @return data set with one three-field tuple per line
 */
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path,
 GradoopFlinkConfig config) {
 TypeHint<Tuple3<String, String, String>> tripleHint =
  new TypeHint<Tuple3<String, String, String>>() { };

 return config.getExecutionEnvironment()
  .readTextFile(path)
  .map(row -> StringEscaper.split(row, CSVConstants.TOKEN_DELIMITER, 3))
  .map(parts -> Tuple3.of(parts[0], parts[1], parts[2]))
  .returns(tripleHint);
}

代码示例来源:origin: org.gradoop/gradoop-flink

/**
 * Loads the text file at {@code path} and turns each line into a three-field tuple: the line is
 * split on {@code CSVConstants.TOKEN_DELIMITER} with a limit of 3 and tokens 0, 1 and 2 become
 * the tuple fields.
 *
 * @param path location of the text file
 * @param config configuration providing the execution environment
 * @return data set with one three-field tuple per line
 */
@Override
public DataSet<Tuple3<String, String, String>> readDistributed(String path,
 GradoopFlinkConfig config) {
 TypeHint<Tuple3<String, String, String>> resultHint =
  new TypeHint<Tuple3<String, String, String>>() { };

 return config.getExecutionEnvironment()
  .readTextFile(path)
  .map(textLine -> StringEscaper.split(textLine, CSVConstants.TOKEN_DELIMITER, 3))
  .map(fields -> Tuple3.of(fields[0], fields[1], fields[2]))
  .returns(resultHint);
}

相关文章