Usage and Code Examples of the org.apache.flink.api.java.operators.MapOperator.withParameters() Method


This article collects Java code examples of the org.apache.flink.api.java.operators.MapOperator.withParameters() method and shows how it is used in practice. The examples are drawn from selected open-source projects on GitHub/Stack Overflow/Maven and should serve as a useful reference. The details of MapOperator.withParameters() are as follows:
Package path: org.apache.flink.api.java.operators.MapOperator
Class name: MapOperator
Method name: withParameters

About MapOperator.withParameters

In the Flink DataSet API, withParameters(Configuration) attaches a Configuration object to an operator. At runtime, that configuration is passed to the open(Configuration parameters) method of the user function, provided the function extends a rich function class such as RichMapFunction; it is the standard way to hand per-operator parameters to a map function.
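
As a minimal, self-contained sketch of that pattern (the class name WithParametersExample, the key "limit", and the mapper CapMapper below are illustrative, not Flink or project code):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class WithParametersExample {

  // Illustrative rich function: reads "limit" from the Configuration
  // handed over by withParameters() inside open().
  public static class CapMapper extends RichMapFunction<Integer, Integer> {
    private int limit;

    @Override
    public void open(Configuration parameters) {
      this.limit = parameters.getInteger("limit", 0);
    }

    @Override
    public Integer map(Integer value) {
      return Math.min(value, limit);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Configuration conf = new Configuration();
    conf.setInteger("limit", 2);
    env.fromElements(1, 2, 3)
        .map(new CapMapper())
        .withParameters(conf) // conf reaches CapMapper.open()
        .print();             // prints 1, 2, 2
  }
}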

Code Examples

Code example source: apache/flink

@Test
public void testPassingConfigurationObject() throws Exception {
  /*
   * Test passing configuration object.
   */
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
  Configuration conf = new Configuration();
  conf.setInteger(TEST_KEY, TEST_VALUE);
  DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds
      .map(new RichMapper2()).withParameters(conf);
  List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
  String expected = "1,1,Hi\n"
      + "2,2,Hello\n"
      + "3,2,Hello world";
  compareResultAsTuples(result, expected);
}
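
RichMapper2 itself is not shown in the excerpt. Since the expected output equals the input, its map() is the identity, and presumably its open() checks that TEST_KEY arrived with TEST_VALUE. A hedged reconstruction along those lines (only the names come from the test; the bodies are assumptions):

public static class RichMapper2 extends RichMapFunction<Tuple3<Integer, Long, String>,
    Tuple3<Integer, Long, String>> {

  @Override
  public void open(Configuration parameters) {
    // Assumed: verify the value set via conf.setInteger(TEST_KEY, TEST_VALUE)
    int val = parameters.getInteger(TEST_KEY, -1);
    Assert.assertEquals(TEST_VALUE, val);
  }

  @Override
  public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
    return value; // identity: tuples pass through unchanged
  }
}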

Code example source: amidst/toolbox

private DataSet<DataInstance> loadDataSet(ExecutionEnvironment env){
  if (attributes==null)
    this.loadHeader(env);
  DataSet<Attributes> attsDataSet = env.fromElements(attributes);
  DataSource<String> data = env.readTextFile(pathFileData);
  Configuration config = new Configuration();
  config.setString(DataFlinkLoader.RELATION_NAME, this.relationName);
  return  data
      .filter(w -> !w.isEmpty())
      .filter(w -> !w.startsWith("%"))
      .filter(line -> !line.startsWith("@attribute"))
      .filter(line -> !line.startsWith("@relation"))
      .filter(line -> !line.startsWith("@data"))
      .map(new DataInstanceBuilder(isNormalize()))
      .withParameters(config)
      .withBroadcastSet(attsDataSet, DataFlinkLoader.ATTRIBUTES_NAME + "_" + this.relationName);
}
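
Here withParameters() is combined with withBroadcastSet(): inside the rich function, the configuration arrives through the open() argument, while the broadcast set is fetched via getRuntimeContext().getBroadcastVariable(). A minimal self-contained sketch of that combination (all names below are illustrative, not amidst/toolbox code):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ParamsPlusBroadcast {

  public static class PrefixMapper extends RichMapFunction<String, String> {
    private String prefix;       // delivered via withParameters()
    private List<String> header; // delivered via withBroadcastSet()

    @Override
    public void open(Configuration parameters) {
      this.prefix = parameters.getString("prefix", "");
      this.header = getRuntimeContext().getBroadcastVariable("header");
    }

    @Override
    public String map(String line) {
      return prefix + header.get(0) + ":" + line;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> header = env.fromElements("col1");
    Configuration conf = new Configuration();
    conf.setString("prefix", ">> ");
    env.fromElements("a", "b")
        .map(new PrefixMapper())
        .withParameters(conf)
        .withBroadcastSet(header, "header")
        .print(); // ">> col1:a" and ">> col1:b"
  }
}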

Code example source: amidst/toolbox

public static double computeELBO(DataFlink<DataInstance> dataFlink, SVB svb, Function2<DataFlink<DataInstance>,Integer,DataSet<DataOnMemory<DataInstance>>> batchConverter){
  svb.setOutput(false);
  double elbo =  svb.getPlateuStructure().getNonReplictedNodes().mapToDouble(node -> svb.getPlateuStructure().getVMP().computeELBO(node)).sum();
  try {
    Configuration config = new Configuration();
    config.setBytes(SVB, Serialization.serializeObject(svb));
    config.setBytes(PRIOR, Serialization.serializeObject(svb.getPlateuStructure().getPlateauNaturalParameterPosterior()));
    DataSet<DataOnMemory<DataInstance>> batches;
    if (batchConverter!=null)
      batches= dataFlink.getBatchedDataSet(svb.getWindowsSize(),batchConverter);
    else
      batches= dataFlink.getBatchedDataSet(svb.getWindowsSize());
    elbo += batches.map(new ParallelVBMapELBO())
        .withParameters(config)
        .reduce(new ReduceFunction<Double>() {
          @Override
          public Double reduce(Double aDouble, Double t1) throws Exception {
            return aDouble + t1;
          }
        }).collect().get(0);
  } catch (Exception e) {
    e.printStackTrace();
  }
  svb.setOutput(true);
  return elbo;
}
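
This example ships whole serialized objects through the Configuration via setBytes(); the mapper can then recover them with getBytes() in its open() method (the amidst Serialization helper presumably wraps standard Java serialization). A minimal sketch of that round trip using plain Java serialization (the key and payload are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.flink.configuration.Configuration;

public class BytesRoundTrip {

  static byte[] serialize(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(obj);
    }
    return bos.toByteArray();
  }

  static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    // analogous to config.setBytes(SVB, Serialization.serializeObject(svb)) above
    config.setBytes("svb", serialize("model-state"));
    String restored = (String) deserialize(config.getBytes("svb", null));
    System.out.println(restored); // model-state
  }
}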

Code example source: amidst/toolbox

unionData
.map(new ParallelVBMap(randomStart, idenitifableModelling))
.withParameters(config)
.withBroadcastSet(loop, "VB_PARAMS_" + this.getName())
.reduce(new ParallelVBReduce());

Code example source: amidst/toolbox

unionData
.map(new ParallelVBMap(randomStart, idenitifableModelling))
.withParameters(config)
.withBroadcastSet(loop, "VB_PARAMS_" + this.dag.getName())
.reduce(new ParallelVBReduce());
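
The loop variable in these fragments suggests a Flink bulk iteration: the evolving parameter set is broadcast into the mapper of each superstep, and the reduced result closes the loop. A minimal self-contained sketch of that pattern (all names are illustrative, not the amidst/toolbox implementation):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.configuration.Configuration;

public class IterationBroadcastSketch {

  // Adds the current broadcast parameter and a configured step to each element.
  public static class StepMapper extends RichMapFunction<Long, Long> {
    private long step;
    private long param;

    @Override
    public void open(Configuration parameters) {
      this.step = parameters.getLong("step", 1L);
      this.param = getRuntimeContext().<Long>getBroadcastVariable("params").get(0);
    }

    @Override
    public Long map(Long value) {
      return value + param + step;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Configuration config = new Configuration();
    config.setLong("step", 1L);

    DataSet<Long> data = env.fromElements(1L, 2L, 3L);
    IterativeDataSet<Long> loop = env.fromElements(0L).iterate(3); // evolving parameter

    DataSet<Long> newParams = data
        .map(new StepMapper())
        .withParameters(config)
        .withBroadcastSet(loop, "params")
        .reduce((a, b) -> a + b); // aggregate, as ParallelVBReduce aggregates posteriors

    loop.closeWith(newParams).print();
  }
}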

Code example source: amidst/toolbox

/**
 * {@inheritDoc}
 */
@Override
public double updateModel(DataFlink<DataInstance> dataUpdate) {
  try {
    Configuration config = new Configuration();
    config.setString(BN_NAME, this.dag.getName());
    config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
    DataSet<DataInstance> dataset = dataUpdate.getDataSet();
    this.sumSS = dataset.map(new SufficientSatisticsMAP())
        .withParameters(config)
        .reduce(new SufficientSatisticsReduce())
        .collect().get(0);
    //Add the prior
    sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
    JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();
    numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
    numInstances++;//Initial counts
  }catch(Exception ex){
    throw new UndeclaredThrowableException(ex);
  }
  return this.getLogMarginalProbability();
}
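
The instance count here is read from an accumulator after the job has run: collect() triggers execution, and the environment retains the resulting JobExecutionResult. A minimal self-contained sketch of that pattern (the accumulator name "instances" and the classes below are illustrative, not the ParallelMaximumLikelihood code):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class AccumulatorSketch {

  // Counts processed records in a distributed accumulator.
  public static class CountingMapper extends RichMapFunction<String, String> {
    private final LongCounter counter = new LongCounter();

    @Override
    public void open(Configuration parameters) {
      getRuntimeContext().addAccumulator("instances", counter);
    }

    @Override
    public String map(String value) {
      counter.add(1L);
      return value;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromElements("a", "b", "c").map(new CountingMapper()).collect();
    JobExecutionResult result = env.getLastJobExecutionResult();
    long n = result.<Long>getAccumulatorResult("instances");
    System.out.println(n); // 3
  }
}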

Code example source: amidst/toolbox

.withParameters(config)
.withBroadcastSet(loop, "VB_PARAMS_" + this.dagTimeT.getName())
.reduce(new eu.amidst.flinklink.core.learning.parametric.ParallelVB.ParallelVBReduce());
