本文整理了Java中cascading.flow.Flow.getFlowSteps()
方法的一些代码示例,展示了Flow.getFlowSteps()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.getFlowSteps()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getFlowSteps
[英]Method getFlowSteps returns the flowSteps of this Flow object. They will be in topological order.
[中]方法getFlowSteps返回此流对象的flowSteps。它们将按拓扑顺序排列。
代码示例来源:origin: twitter/ambrose
/**
 * Fired when a Flow instance receives the start() message. Cascading cuts a Flow
 * into executing units (flow steps), each of which wraps the mapreduce job that is
 * submitted to Hadoop. The Ambrose graph is built from the step graph found in the
 * flow object.
 *
 * @param flow the flow.
 */
@Override
@SuppressWarnings("unchecked")
public void onStarting(Flow flow) {
  // Capture flow-level bookkeeping before any job launches.
  List<FlowStep> flowSteps = flow.getFlowSteps();
  totalNumberOfJobs = flowSteps.size();
  currentFlowId = flow.getID();

  Properties flowProperties = new Properties();
  flowProperties.putAll(flow.getConfigAsProperties());
  try {
    statsWriteService.initWriteService(flowProperties);
  } catch (IOException e) {
    // Best-effort init: log and continue rather than aborting the flow.
    LOG.error("Failed to initialize statsWriteService", e);
  }

  // Translate the Cascading step graph into the Ambrose node map and publish it.
  AmbroseCascadingGraphConverter graphConverter =
      new AmbroseCascadingGraphConverter(Flows.getStepGraphFrom(flow), nodesByName);
  graphConverter.convert();
  AmbroseUtils.sendDagNodeNameMap(statsWriteService, currentFlowId, nodesByName);
}
代码示例来源:origin: twitter/ambrose
public void onStarting(Flow flow) {
List<FlowStep> steps = flow.getFlowSteps();
totalNumberOfJobs = steps.size();
currentFlowId = flow.getID();
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Builds a human-readable job name of the form
 * "WordCount [(2/5) input_1, input_2] -> output_1232_ABCDEF123456789...",
 * abbreviating the source list and the final name to their configured maximums.
 */
private static String formatJobName(FlowStep<JobConf> flowStep) {
  // Abbreviate the comma-joined source tap names so they cannot dominate the label.
  String sourceNames = StringUtils.abbreviate(
      join(getPrettyNamesForTaps(flowStep.getSources(), true)),
      MAX_SOURCE_PATH_NAMES_LENGTH);
  String sinkNames = join(getPrettyNamesForTaps(flowStep.getSinks(), false));

  String fullName = String.format(
      "%s [(%d/%d) %s] -> %s",
      flowStep.getFlowName(),
      flowStep.getStepNum(),
      flowStep.getFlow().getFlowSteps().size(),
      sourceNames,
      sinkNames);

  // Clamp the assembled name to the overall job-name limit.
  return StringUtils.abbreviate(fullName, MAX_JOB_NAME_LENGTH);
}
代码示例来源:origin: cascading/cascading-hadoop2-mr1
/**
 * Verifies that feeding the same pipe twice into a self-CoGroup over a single
 * source still plans into exactly one flow step.
 */
@Test
public void testDupeSourceRepeat()
  {
  Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
  Tap sink = new Hfs( new TextLine(), "foo" );

  Pipe pipe = new Pipe( "pipe" );
  Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );

  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  sources.put( "pipe", source1 );

  Map<String, Tap> sinks = new HashMap<>();
  sinks.put( "cogroup", sink );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );

  List<FlowStep> steps = flow.getFlowSteps();

  assertEquals( "not equal: steps.size()", 1, steps.size() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Verifies that feeding the same pipe twice into a self-CoGroup over a single
 * source still plans into exactly one flow step.
 */
@Test
public void testDupeSourceRepeat()
  {
  Tap source1 = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
  Tap sink = new Hfs( new TextLine(), "foo" );

  Pipe pipe = new Pipe( "pipe" );
  Pipe merge = new CoGroup( "cogroup", pipe, new Fields( "offset" ), 1, Fields.size( 4 ) );

  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  sources.put( "pipe", source1 );

  Map<String, Tap> sinks = new HashMap<>();
  sinks.put( "cogroup", sink );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );

  List<FlowStep> steps = flow.getFlowSteps();

  assertEquals( "not equal: steps.size()", 1, steps.size() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Splits one assembly into two GroupBy branches ("left"/"right") with separate
 * sinks and, on MapReduce platforms, expects the planner to emit two steps.
 */
@Test
public void testPipeAssemblySplit()
  {
  Pipe pipe = new TestAssembly( "test" );
  Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
  Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );

  Tap source = getPlatform().getTextFile( "foo" );
  Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
  Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );

  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  sources.put( "test", source );

  Map<String, Tap> sinks = new HashMap<>();
  sinks.put( "left", sink1 );
  sinks.put( "right", sink2 );

  List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();

  // step count is only deterministic on MapReduce platforms
  if( getPlatform().isMapReduce() )
    assertEquals( "not equal: steps.size()", 2, steps.size() );
  }
代码示例来源:origin: cascading/cascading-platform
/**
 * Splits one assembly into two GroupBy branches ("left"/"right") with separate
 * sinks and, on MapReduce platforms, expects the planner to emit two steps.
 */
@Test
public void testPipeAssemblySplit()
  {
  Pipe pipe = new TestAssembly( "test" );
  Pipe pipe1 = new GroupBy( "left", pipe, new Fields( "ip" ) );
  Pipe pipe2 = new GroupBy( "right", pipe, new Fields( "ip" ) );

  Tap source = getPlatform().getTextFile( "foo" );
  Tap sink1 = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );
  Tap sink2 = getPlatform().getTextFile( "foo/split2", SinkMode.REPLACE );

  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  sources.put( "test", source );

  Map<String, Tap> sinks = new HashMap<>();
  sinks.put( "left", sink1 );
  sinks.put( "right", sink2 );

  List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe1, pipe2 ).getFlowSteps();

  // step count is only deterministic on MapReduce platforms
  if( getPlatform().isMapReduce() )
    assertEquals( "not equal: steps.size()", 2, steps.size() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Plans a flow whose sink is an Lfs tap and verifies the single resulting step
 * is configured to run in Hadoop local mode.
 */
@Test
public void testLocalModeSink() throws Exception
  {
  Tap input = new Hfs( new TextLine(), "input/path" );
  Tap output = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, new Pipe( "test" ) );

  List<FlowStep> flowSteps = flow.getFlowSteps();

  assertEquals( "wrong size", 1, flowSteps.size() );

  // the Lfs sink should force the planner into local mode
  FlowStep onlyStep = flowSteps.get( 0 );

  boolean isLocal = HadoopUtil.isLocal( (Configuration) onlyStep.getConfig() );

  assertTrue( "is not local", isLocal );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Plans a flow whose source is an Lfs tap and verifies the single resulting step
 * is configured to run in Hadoop local mode.
 */
@Test
public void testLocalModeSource() throws Exception
  {
  Tap input = new Lfs( new TextLine(), "input/path" );
  Tap output = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, new Pipe( "test" ) );

  List<FlowStep> flowSteps = flow.getFlowSteps();

  assertEquals( "wrong size", 1, flowSteps.size() );

  // the Lfs source should force the planner into local mode
  FlowStep onlyStep = flowSteps.get( 0 );

  boolean isLocal = HadoopUtil.isLocal( (Configuration) onlyStep.getConfig() );

  assertTrue( "is not local", isLocal );
  }
代码示例来源:origin: cascading/cascading-hadoop2-common
/**
 * Plans a flow reading from an Lfs source tap and checks that the one planned
 * step carries a local-mode Hadoop configuration.
 */
@Test
public void testLocalModeSource() throws Exception
  {
  Tap localSource = new Lfs( new TextLine(), "input/path" );
  Tap hdfsSink = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Pipe identity = new Pipe( "test" );

  Flow flow = getPlatform().getFlowConnector().connect( localSource, hdfsSink, identity );

  List<FlowStep> planned = flow.getFlowSteps();
  assertEquals( "wrong size", 1, planned.size() );

  // inspect the step's config to confirm the planner selected local mode
  FlowStep step = planned.get( 0 );
  boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );

  assertTrue( "is not local", isLocal );
  }
代码示例来源:origin: cascading/cascading-hadoop2-common
/**
 * Plans a flow writing to an Lfs sink tap and checks that the one planned step
 * carries a local-mode Hadoop configuration.
 */
@Test
public void testLocalModeSink() throws Exception
  {
  Tap hdfsSource = new Hfs( new TextLine(), "input/path" );
  Tap localSink = new Lfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Pipe identity = new Pipe( "test" );

  Flow flow = getPlatform().getFlowConnector().connect( hdfsSource, localSink, identity );

  List<FlowStep> planned = flow.getFlowSteps();
  assertEquals( "wrong size", 1, planned.size() );

  // inspect the step's config to confirm the planner selected local mode
  FlowStep step = planned.get( 0 );
  boolean isLocal = HadoopUtil.isLocal( (Configuration) step.getConfig() );

  assertTrue( "is not local", isLocal );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Verifies the same tap instance can be shared between two logically different pipes.
 *
 * @throws IOException
 */
@Test
public void testSameTaps() throws IOException
  {
  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  Map<String, Tap> sinks = new HashMap<>();

  // one tap instance registered under two source names
  Hfs tap = new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" );

  sources.put( "a", tap );
  sources.put( "b", tap );

  Pipe pipeA = new Pipe( "a" );
  Pipe pipeB = new Pipe( "b" );

  Pipe group1 = new GroupBy( pipeA );
  Pipe group2 = new GroupBy( pipeB );

  Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );

  sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );

  // two branch groupings plus the merging tail = three steps
  assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
  }
代码示例来源:origin: cwensel/cascading
/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
  {
  Tap source = getPlatform().getTextFile( "foo" );
  Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );

  // wrap the custom assembly in a grouping before planning
  Pipe assembly = new GroupBy( new TestAssembly( "test" ), new Fields( "ip" ) );

  List<FlowStep> planned = getPlatform().getFlowConnector().connect( source, sink, assembly ).getFlowSteps();

  assertEquals( "not equal: steps.size()", 1, planned.size() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * This is an alternative to having two pipes with the same name, but uses one pipe that is split
 * across two branches.
 *
 * @throws IOException
 */
@Test
public void testSameSourceForBranch() throws IOException
  {
  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  Map<String, Tap> sinks = new HashMap<>();

  sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );

  // a single head pipe split into two grouped branches
  Pipe pipeA = new Pipe( "a" );

  Pipe group1 = new GroupBy( "a1", pipeA, Fields.FIRST );
  Pipe group2 = new GroupBy( "a2", pipeA, Fields.FIRST );

  Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );

  sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );

  // two branch groupings plus the merging tail = three steps
  assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
  }
代码示例来源:origin: cascading/cascading-hadoop2-mr1
/**
 * This is an alternative to having two pipes with the same name, but uses one pipe that is split
 * across two branches.
 *
 * @throws IOException
 */
@Test
public void testSameSourceForBranch() throws IOException
  {
  // typed maps instead of raw HashMap/Map: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  Map<String, Tap> sinks = new HashMap<>();

  sources.put( "a", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path/a" ) );

  // a single head pipe split into two grouped branches
  Pipe pipeA = new Pipe( "a" );

  Pipe group1 = new GroupBy( "a1", pipeA, Fields.FIRST );
  Pipe group2 = new GroupBy( "a2", pipeA, Fields.FIRST );

  Pipe merge = new GroupBy( "tail", Pipe.pipes( group1, group2 ), new Fields( "first", "second" ) );

  sinks.put( merge.getName(), new Hfs( new TextLine(), "output/path" ) );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, merge );

  // two branch groupings plus the merging tail = three steps
  assertEquals( "not equal: steps.size()", 3, flow.getFlowSteps().size() );
  }
代码示例来源:origin: cascading/cascading-platform
/** Tests that proper pipe graph is assembled without throwing an internal error */
@Test
public void testPipeAssembly()
  {
  Tap source = getPlatform().getTextFile( "foo" );
  Tap sink = getPlatform().getTextFile( "foo/split1", SinkMode.REPLACE );

  // wrap the custom assembly in a grouping before planning
  Pipe assembly = new GroupBy( new TestAssembly( "test" ), new Fields( "ip" ) );

  List<FlowStep> planned = getPlatform().getFlowConnector().connect( source, sink, assembly ).getFlowSteps();

  assertEquals( "not equal: steps.size()", 1, planned.size() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Test a single piece Pipe, should not fail, inserts Identity pipe
 *
 * @throws Exception
 */
@Test
public void testIdentity() throws Exception
  {
  Tap input = new Hfs( new TextLine(), "input/path" );
  Tap output = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, new Pipe( "test" ) );

  List<FlowStep> planned = flow.getFlowSteps();
  assertEquals( "wrong size", 1, planned.size() );

  // the single step should read one source, have no grouping, and write the sink
  HadoopFlowStep onlyStep = (HadoopFlowStep) planned.get( 0 );

  assertEquals( "not equal: step.sources.size()", 1, onlyStep.getSourceTaps().size() );
  assertNull( "not null: step.groupBy", onlyStep.getGroup() );
  assertNotNull( "null: step.sink", onlyStep.getSink() );
  }
代码示例来源:origin: cascading/cascading-hadoop2-mr1
/**
 * Test a single piece Pipe, should not fail, inserts Identity pipe
 *
 * @throws Exception
 */
@Test
public void testIdentity() throws Exception
  {
  Tap input = new Hfs( new TextLine(), "input/path" );
  Tap output = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, new Pipe( "test" ) );

  List<FlowStep> planned = flow.getFlowSteps();
  assertEquals( "wrong size", 1, planned.size() );

  // the single step should read one source, have no grouping, and write the sink
  HadoopFlowStep onlyStep = (HadoopFlowStep) planned.get( 0 );

  assertEquals( "not equal: step.sources.size()", 1, onlyStep.getSourceTaps().size() );
  assertNull( "not null: step.groupBy", onlyStep.getGroup() );
  assertNotNull( "null: step.sink", onlyStep.getSink() );
  }
代码示例来源:origin: cwensel/cascading
/**
 * Plans a simple count assembly (GroupBy + Every/Count) and verifies it yields a
 * single step with the expected element distances on the map and reduce sides.
 *
 * @throws IOException
 */
@Test
public void testOneJob() throws IOException
  {
  // typed collections instead of raw Map/List: removes unchecked warnings
  Map<String, Tap> sources = new HashMap<>();
  Map<String, Tap> sinks = new HashMap<>();

  sources.put( "count", new Hfs( new TextLine( new Fields( "first", "second" ) ), "input/path" ) );
  sinks.put( "count", new Hfs( new TextLine( new Fields( 0, 1 ) ), "output/path" ) );

  Pipe pipe = new Pipe( "count" );
  pipe = new GroupBy( pipe, new Fields( 1 ) );
  pipe = new Every( pipe, new Fields( 1 ), new Count(), new Fields( 0, 1 ) );

  List<FlowStep> steps = getPlatform().getFlowConnector().connect( sources, sinks, pipe ).getFlowSteps();

  assertEquals( "wrong size", 1, steps.size() );

  BaseFlowStep step = (BaseFlowStep) steps.get( 0 );

  assertEquals( "not equal: step.sources.size()", 1, step.getSourceTaps().size() );
  assertNotNull( "null: step.groupBy", step.getGroup() );
  assertNotNull( "null: step.sink", step.getSink() );

  // distance from the source tap to the grouping element (map side)
  int mapDist = ElementGraphs.shortestDistance( step.getElementGraph(), (FlowElement) step.getSourceTaps().iterator().next(), step.getGroup() );
  assertEquals( "not equal: mapDist", 1, mapDist );

  // distance from the grouping element to the sink (reduce side)
  int reduceDist = ElementGraphs.shortestDistance( step.getElementGraph(), step.getGroup(), step.getSink() );
  assertEquals( "not equal: reduceDist", 2, reduceDist );
  }
代码示例来源:origin: cascading/cascading-hadoop2-common
/**
 * When running against a cluster, verifies the planned step's initialized config
 * is NOT in Hadoop local mode. Skipped on non-cluster platforms.
 */
@Test
public void testNotLocalMode() throws Exception
  {
  // only meaningful when a cluster is in use
  if( !getPlatform().isUseCluster() )
    return;

  Tap input = new Hfs( new TextLine(), "input/path" );
  Tap output = new Hfs( new TextLine(), "output/path", SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( input, output, new Pipe( "test" ) );

  List<FlowStep> planned = flow.getFlowSteps();
  assertEquals( "wrong size", 1, planned.size() );

  FlowStep onlyStep = planned.get( 0 );

  // build the step's fully initialized config and confirm it targets the cluster
  boolean isLocal = HadoopUtil.isLocal( (Configuration) ( (BaseFlowStep) onlyStep ).createInitializedConfig( flow.getFlowProcess(), ( (BaseHadoopPlatform) getPlatform() ).getConfiguration() ) );

  assertTrue( "is local", !isLocal );
  }
内容来源于网络,如有侵权,请联系作者删除!