Usage and code examples of the cascading.flow.Flow.getFlowSteps() method


This article collects a number of code examples for the Java method cascading.flow.Flow.getFlowSteps() and shows how Flow.getFlowSteps() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from a selection of notable projects, so they should serve as useful references. Details of the Flow.getFlowSteps() method:
Package: cascading.flow
Class: Flow
Method: getFlowSteps

Flow.getFlowSteps overview

Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
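
Before turning to the collected examples, here is a minimal, self-contained sketch (not taken from any of the projects below) of a typical call to getFlowSteps(): plan a simple flow, then print each planned step in topological order. The paths, the pipe assembly, the FlowStepsExample class name, and the Cascading 2.x Hadoop-platform imports are illustrative assumptions, not part of the quoted examples.

import java.util.List;
import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class FlowStepsExample
  {
  public static void main( String[] args )
    {
    // illustrative input/output locations
    Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "input/path" );
    Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

    // a simple assembly; the GroupBy forces at least one grouping step into the plan
    Pipe pipe = new Pipe( "example" );
    pipe = new GroupBy( pipe, new Fields( "offset" ) );

    Flow flow = new HadoopFlowConnector( new Properties() ).connect( source, sink, pipe );

    // getFlowSteps() returns the planned steps in topological order
    List<FlowStep> steps = flow.getFlowSteps();

    for( FlowStep step : steps )
      System.out.println( step.getStepNum() + "/" + steps.size() + ": " + step.getName() );
    }
  }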

Code examples

Code example from: twitter/ambrose

/**
 * The onStarting event is fired when a Flow instance receives the start() message. A Flow is cut
 * down into executing units called stepFlow. A stepFlow contains a stepFlowJob which represents
 * the mapreduce job to be submitted to Hadoop. The ambrose graph is constructed from the step
 * graph found in flow object.
 *
 * @param flow the flow.
 */
@Override
@SuppressWarnings("unchecked")
public void onStarting(Flow flow) {
 // init flow
 List<FlowStep> steps = flow.getFlowSteps();
 totalNumberOfJobs = steps.size();
 currentFlowId = flow.getID();
 Properties props = new Properties();
 props.putAll(flow.getConfigAsProperties());
 try {
  statsWriteService.initWriteService(props);
 } catch (IOException e) {
  LOG.error("Failed to initialize statsWriteService", e);
 }
 // convert graph from cascading to ambrose
 AmbroseCascadingGraphConverter converter =
   new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), nodesByName);
 converter.convert();
 AmbroseUtils.sendDagNodeNameMap(statsWriteService, currentFlowId, nodesByName);
}

Code example from: twitter/ambrose

public void onStarting(Flow flow) {
 List<FlowStep> steps = flow.getFlowSteps();
 totalNumberOfJobs = steps.size();
 currentFlowId = flow.getID();
 // ... remainder of the method truncated in the original snippet
}

Code example from: LiveRamp/cascading_ext

private static String formatJobName(FlowStep<JobConf> flowStep) {
 // WordCount [(2/5) input_1, input_2] -> output_1232_ABCDEF123456789...
 String jobName = String.format(
   "%s [(%d/%d) %s] -> %s",
   flowStep.getFlowName(),
   flowStep.getStepNum(),
   flowStep.getFlow().getFlowSteps().size(),
   StringUtils.abbreviate(
     join(getPrettyNamesForTaps(flowStep.getSources(), true)),
     MAX_SOURCE_PATH_NAMES_LENGTH),
   join(getPrettyNamesForTaps(flowStep.getSinks(), false)));
 return StringUtils.abbreviate(jobName, MAX_JOB_NAME_LENGTH);
}

Code example from: cascading/cascading-hadoop2-mr1

@Test
public void testDupeSourceRepeat()
 {
 Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
 Tap sink = new Hfs( new TextLine(), "foo" );
 Pipe pipe = new Pipe( "pipe" );
 Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );
 Map sources = new HashMap();
 sources.put( "pipe", source1 );
 Map sinks = new HashMap();
 sinks.put( "cogroup", sink );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example from: cwensel/cascading

@Test
public void testDupeSourceRepeat()
 {
 Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
 Tap sink = new Hfs( new TextLine(), "foo" );
 Pipe pipe = new Pipe( "pipe" );
 Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );
 Map sources = new HashMap();
 sources.put( "pipe", source1 );
 Map sinks = new HashMap();
 sinks.put( "cogroup", sink );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example from: cwensel/cascading

@Test
public void testPipeAssemblySplit()
 {
 Pipe pipe = new TestAssembly( "test" );
 Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
 Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );
 Map sources = new HashMap();
 sources.put( "test", source );
 Map sinks = new HashMap();
 sinks.put( "left", sink1 );
 sinks.put( "right", sink2 );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();
 if( getPlatform().isMapReduce() )
  assertEquals( "not equal: steps.size()", 2, steps.size() );
 }

Code example from: cascading/cascading-platform

@Test
public void testPipeAssemblySplit()
 {
 Pipe pipe = new TestAssembly( "test" );
 Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
 Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );
 Map sources = new HashMap();
 sources.put( "test", source );
 Map sinks = new HashMap();
 sinks.put( "left", sink1 );
 sinks.put( "right", sink2 );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();
 if( getPlatform().isMapReduce() )
  assertEquals( "not equal: steps.size()", 2, steps.size() );
 }

Code example from: cwensel/cascading

@Test
public void testLocalModeSink() throws Exception
 {
 Tap source = new Hfs( new TextLine(), "input/path" );
 Tap sink = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 FlowStep step = steps.get( 0 );
 boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
 assertTrue( "is not local", isLocal );
 }

Code example from: cwensel/cascading

@Test
public void testLocalModeSource() throws Exception
 {
 Tap source = new Lfs( new TextLine(), "input/path" );
 Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 FlowStep step = steps.get( 0 );
 boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
 assertTrue( "is not local", isLocal );
 }

Code example from: cascading/cascading-hadoop2-common

@Test
public void testLocalModeSource() throws Exception
 {
 Tap source = new Lfs( new TextLine(), "input/path" );
 Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 FlowStep step = steps.get( 0 );
 boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
 assertTrue( "is not local", isLocal );
 }

Code example from: cascading/cascading-hadoop2-common

@Test
public void testLocalModeSink() throws Exception
 {
 Tap source = new Hfs( new TextLine(), "input/path" );
 Tap sink = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 FlowStep step = steps.get( 0 );
 boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );
 assertTrue( "is not local", isLocal );
 }

Code example from: cwensel/cascading

/**
 * Verifies the same tap instance can be shared between two logically different pipes.
 *
 * @throws IOException
 */
@Test
public void testSameTaps() throws IOException
 {
 Map sources = new HashMap();
 Map sinks = new HashMap();
 Hfs tap = new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" );
 sources.put( "a", tap );
 sources.put( "b", tap );
 Pipe pipeA = new Pipe( "a" );
 Pipe pipeB = new Pipe( "b" );
 Pipe group1 = new GroupBy( pipeA );
 Pipe group2 = new GroupBy( pipeB );
 Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );
 sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
 }

Code example from: cwensel/cascading

/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
 {
 Pipe pipe = new TestAssembly( "test" );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example from: cwensel/cascading

/**
 * This is an alternative to having two pipes with the same name, but uses one pipe that is split
 * across two branches.
 *
 * @throws IOException
 */
@Test
public void testSameSourceForBranch() throws IOException
 {
 Map sources = new HashMap();
 Map sinks = new HashMap();
 sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );
 Pipe pipeA = new Pipe( "a" );
 Pipe group1 = new GroupBy( "a1", pipeA, Fields.FIRST );
 Pipe group2 = new GroupBy( "a2", pipeA, Fields.FIRST );
 Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );
 sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
 }

Code example from: cascading/cascading-hadoop2-mr1

/**
 * This is an alternative to having two pipes with the same name, but uses one pipe that is split
 * across two branches.
 *
 * @throws IOException
 */
@Test
public void testSameSourceForBranch() throws IOException
 {
 Map sources = new HashMap();
 Map sinks = new HashMap();
 sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );
 Pipe pipeA = new Pipe( "a" );
 Pipe group1 = new GroupBy( "a1", pipeA, Fields.FIRST );
 Pipe group2 = new GroupBy( "a2", pipeA, Fields.FIRST );
 Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );
 sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );
 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );
 assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
 }

Code example from: cascading/cascading-platform

/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
 {
 Pipe pipe = new TestAssembly( "test" );
 pipe = new GroupBy( pipe, new Fields( "ip" ) );
 Tap source = getPlatform().getTextFile( "foo" );
 Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
 List<FlowStep> steps = getPlatform().getFlowConnector().connect( source, sink, pipe ).getFlowSteps();
 assertEquals( "not equal: steps.size()", 1, steps.size() );
 }

Code example from: cwensel/cascading

/**
 * Test a single piece Pipe, should not fail, inserts Identity pipe
 *
 * @throws IOException
 */
@Test
public void testIdentity() throws Exception
 {
 Tap source = new Hfs( new TextLine(), "input/path" );
 Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 HadoopFlowStep step = (HadoopFlowStep) steps.get( 0 );
 assertEquals( "not equal: step.sources.size()", 1, step.getSourceTaps().size() );
 assertNull( "not null: step.groupBy", step.getGroup() );
 assertNotNull( "null: step.sink", step.getSink() );
 }

Code example from: cascading/cascading-hadoop2-mr1

/**
 * Test a single piece Pipe, should not fail, inserts Identity pipe
 *
 * @throws IOException
 */
@Test
public void testIdentity() throws Exception
 {
 Tap source = new Hfs( new TextLine(), "input/path" );
 Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 HadoopFlowStep step = (HadoopFlowStep) steps.get( 0 );
 assertEquals( "not equal: step.sources.size()", 1, step.getSourceTaps().size() );
 assertNull( "not null: step.groupBy", step.getGroup() );
 assertNotNull( "null: step.sink", step.getSink() );
 }

Code example from: cwensel/cascading

@Test
public void testOneJob() throws IOException
 {
 Map sources = new HashMap();
 Map sinks = new HashMap();
 sources.put( "count", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path" ) );
 sinks.put( "count", new Hfs( new TextLine( new Fields( 0, 1 ) ), "output/path" ) );
 Pipe pipe = new Pipe( "count" );
 pipe = new GroupBy( pipe, new Fields( 1 ) );
 pipe = new Every( pipe, new Fields( 1 ), new Count(), new Fields( 0, 1 ) );
 List steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe ).getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 BaseFlowStep step = (BaseFlowStep) steps.get( 0 );
 assertEquals( "not equal: step.sources.size()", 1, step.getSourceTaps().size() );
 assertNotNull( "null: step.groupBy", step.getGroup() );
 assertNotNull( "null: step.sink", step.getSink() );
 int mapDist = ElementGraphs.shortestDistance( step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup() );
 assertEquals( "not equal: mapDist", 1, mapDist );
 int reduceDist = ElementGraphs.shortestDistance( step.getElementGraph(), step.getGroup(), step.getSink() );
 assertEquals( "not equal: reduceDist", 2, reduceDist );
 }

Code example from: cascading/cascading-hadoop2-common

@Test
public void testNotLocalMode() throws Exception
 {
 if( !getPlatform().isUseCluster() )
  return;
 Tap source = new Hfs( new TextLine(), "input/path" );
 Tap sink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );
 Pipe pipe = new Pipe( "test" );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 List<FlowStep> steps = flow.getFlowSteps();
 assertEquals( "wrong size", 1, steps.size() );
 FlowStep step = steps.get( 0 );
 boolean isLocal = HadoopUtil.isLocal( (Configuration) ( (BaseFlowStep) step ).createInitializedConfig( flow.getFlowProcess(), ( (BaseHadoopPlatform) getPlatform() ).getConfiguration() ) );
 assertTrue( "is local", !isLocal );
 }
