本文整理了Java中cascading.flow.Flow.getFlowStats()方法的一些代码示例,展示了Flow.getFlowStats()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.getFlowStats()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getFlowStats
[英]Method getFlowStats returns the flowStats of this Flow object.
[中]方法getFlowStats返回此Flow对象的flowStats(流统计信息)。
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Returns the counters of one group for the given flow.
 *
 * <p>Delegates to the overload that accepts the object returned by
 * {@code flow.getFlowStats()}.
 *
 * @param flow the flow whose stats are queried
 * @param group the counter group name passed through to the overload
 * @return whatever the stats-based overload returns for {@code group}
 * @throws IOException if the delegated call throws it
 */
public static List<Counter> getCountersForGroup(Flow flow, String group) throws IOException {
  final FlowStats statsOfFlow = flow.getFlowStats();
  return getCountersForGroup(statsOfFlow, group);
}
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Gathers the counters of every step of the given flow into one sorted list.
 *
 * @param flow the flow whose step stats are walked
 * @return a new list holding the counters of all steps, sorted with
 *     {@link Collections#sort(List)}
 */
@Deprecated
public static List<Counter> getCounters(Flow flow) {
  List<Counter> collected = new ArrayList<>();

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    List<Counter> countersOfStep = getStatsFromStep(stepStats);
    collected.addAll(countersOfStep);
  }

  Collections.sort(collected);
  return collected;
}
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Collects the counters of a flow, grouped by step.
 *
 * @param flow the flow whose step stats are walked
 * @return a map from each step stats object to the sorted list of counters
 *     obtained for that step
 */
@Deprecated
public static Map<FlowStepStats, List<Counter>> getCountersByStep(Flow flow) {
  Map<FlowStepStats, List<Counter>> countersByStep = new HashMap<>();

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    // Create the bucket on first sight of a step, then append to it.
    List<Counter> bucket = countersByStep.get(stepStats);
    if (bucket == null) {
      bucket = Lists.<Counter>newArrayList();
      countersByStep.put(stepStats, bucket);
    }
    bucket.addAll(getStatsFromStep(stepStats));
  }

  // Sort each step's counters in place once everything has been gathered.
  for (List<Counter> bucket : countersByStep.values()) {
    Collections.sort(bucket);
  }

  return countersByStep;
}
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Sums a named counter over every step of the given flow.
 *
 * @param flow the flow whose step stats are walked
 * @param group the counter group passed to the per-step lookup
 * @param value the counter name passed to the per-step lookup
 * @return the sum of the per-step values
 * @throws IOException if a per-step lookup throws it
 */
public static Long get(Flow flow, String group, String value) throws IOException {
  long sum = 0;

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    sum += get(stepStats, group, value);
  }

  return sum;
}
代码示例来源:origin: LiveRamp/cascading_ext
/**
 * Sums an enum-keyed counter over every step of the given flow.
 *
 * @param flow the flow whose step stats are walked
 * @param value the enum constant passed to the per-step lookup
 * @return the sum of the per-step values
 * @throws IOException if a per-step lookup throws it
 */
public static Long get(Flow flow, Enum value) throws IOException {
  long sum = 0;

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    sum += get(stepStats, value);
  }

  return sum;
}
代码示例来源:origin: cwensel/cascading
/**
 * Marks the parent flow's stats as running when they are currently started
 * or submitted. Logs a warning and does nothing if the step has no parent flow.
 */
private synchronized void markFlowRunning()
{
  Flow parentFlow = flowStep.getFlow();

  if( parentFlow == null )
  {
    LOG.warn( "no parent flow set" );
    return;
  }

  FlowStats parentStats = parentFlow.getFlowStats();

  // check and transition under the stats object's own monitor
  synchronized( parentStats )
  {
    boolean startedOrSubmitted = parentStats.isStarted() || parentStats.isSubmitted();

    if( startedOrSubmitted )
      parentStats.markRunning();
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Counts a completed flow and tallies it by its current stats status.
 *
 * @param flow the flow that completed
 */
@Override
public void onCompleted( Flow flow )
{
  completed++;

  switch( flow.getFlowStats().getStatus() )
  {
    case SKIPPED:
      skipped++;
      break;

    case SUCCESSFUL:
      successful++;
      break;

    case STOPPED:
      stopped++;
      break;

    case FAILED:
      failed++;
      break;

    default:
      // PENDING, STARTED, SUBMITTED and RUNNING have no dedicated tally
      break;
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Populates {@code jobsMap} with one {@code CascadeJob} per flow in the flow graph,
 * registers each flow's stats with {@code cascadeStats}, and initializes every job
 * with the jobs of its graph predecessors.
 */
private void initializeNewJobsMap()
{
  synchronized( jobsMap )
  {
    // keep topo order
    for( TopologicalOrderIterator<Flow, Integer> flows = flowGraph.getTopologicalIterator(); flows.hasNext(); )
    {
      Flow current = flows.next();

      cascadeStats.addFlowStats( current.getFlowStats() );

      CascadeJob currentJob = new CascadeJob( current );
      jobsMap.put( current.getName(), currentJob );

      // look up the jobs already registered for this flow's predecessors
      List<CascadeJob> upstreamJobs = new ArrayList<CascadeJob>();

      for( Flow upstream : Graphs.predecessorListOf( flowGraph, current ) )
      {
        CascadeJob upstreamJob = (CascadeJob) jobsMap.get( upstream.getName() );
        upstreamJobs.add( upstreamJob );
      }

      currentJob.init( upstreamJobs );
    }
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Moves the step stats from started to submitted and marks their child node stats
 * as started, then marks the parent flow's stats as submitted if they are started.
 * Logs a warning and skips the flow update when the step has no parent flow.
 */
private synchronized void markSubmitted()
{
  if( flowStepStats.isStarted() )
  {
    flowStepStats.markSubmitted();

    Collection<FlowNodeStats> nodeStatsOfStep = flowStepStats.getChildren();

    for( FlowNodeStats nodeStats : nodeStatsOfStep )
      nodeStats.markStarted();
  }

  Flow parentFlow = flowStep.getFlow();

  if( parentFlow == null )
  {
    LOG.warn( "no parent flow set" );
    return;
  }

  FlowStats parentStats = parentFlow.getFlowStats();

  // check and transition under the stats object's own monitor
  synchronized( parentStats )
  {
    if( parentStats.isStarted() )
      parentStats.markSubmitted();
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Completes a flow built by {@code flowWithCounters} and checks that its single
 * child step stats reports the counter group, counter name and value supplied.
 */
@Test
public void testProcessFlowWithCounters() throws IOException
{
  getPlatform().copyFromLocal( inputFileIps );

  // one group ("outer-key") holding one counter ("inner-key" -> 42)
  Map<String, Long> groupCounters = new HashMap<String, Long>();
  groupCounters.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", groupCounters );

  Flow processFlow = flowWithCounters( "counter", expectedCounters );
  processFlow.complete();

  FlowStats stats = processFlow.getFlowStats();
  assertNotNull( stats );

  List stepStatsList = new ArrayList( stats.getChildren() );
  assertEquals( 1, stepStatsList.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepStatsList.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( groupCounters.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Completes a flow built by {@code flowWithChildren} and checks that its single
 * child step stats reports the counter group, counter name and value supplied.
 */
@Test
public void testProcessFlowWithChildCounters() throws IOException
{
  getPlatform().copyFromLocal( inputFileIps );

  // one group ("outer-key") holding one counter ("inner-key" -> 42)
  Map<String, Long> groupCounters = new HashMap<String, Long>();
  groupCounters.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", groupCounters );

  Flow processFlow = flowWithChildren( "children", expectedCounters );
  processFlow.complete();

  FlowStats stats = processFlow.getFlowStats();
  assertNotNull( stats );

  List stepStatsList = new ArrayList( stats.getChildren() );
  assertEquals( 1, stepStatsList.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepStatsList.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( groupCounters.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
}
代码示例来源:origin: cascading/cascading-hadoop2-mr1
/**
 * Deletes the sink's resource when the sink is temporary and the flow either
 * reports success or has no run id; otherwise only cleans the tap's metadata.
 * A failed delete is logged as a warning and never rethrown.
 *
 * @param config the job configuration handed to the tap operations
 * @param sink the sink tap to clean up
 */
protected void cleanIntermediateData( JobConf config, Tap sink )
{
  boolean deletable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

  if( !deletable )
  {
    cleanTapMetaData( config, sink );
    return;
  }

  try
  {
    sink.deleteResource( config );
  }
  catch( Exception exception )
  {
    // sink all exceptions, don't fail app
    logWarn( "unable to remove temporary file: " + sink, exception );
  }
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Completes a flow built by {@code flowWithCounters} and checks that its single
 * child step stats reports the counter group, counter name and value supplied.
 */
@Test
public void testProcessFlowWithCounters() throws IOException
{
  getPlatform().copyFromLocal( inputFileIps );

  // one group ("outer-key") holding one counter ("inner-key" -> 42)
  Map<String, Long> groupCounters = new HashMap<String, Long>();
  groupCounters.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", groupCounters );

  Flow processFlow = flowWithCounters( "counter", expectedCounters );
  processFlow.complete();

  FlowStats stats = processFlow.getFlowStats();
  assertNotNull( stats );

  List stepStatsList = new ArrayList( stats.getChildren() );
  assertEquals( 1, stepStatsList.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepStatsList.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( groupCounters.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Completes a flow built by {@code flowWithChildren} and checks that its single
 * child step stats reports the counter group, counter name and value supplied.
 */
@Test
public void testProcessFlowWithChildCounters() throws IOException
{
  getPlatform().copyFromLocal( inputFileIps );

  // one group ("outer-key") holding one counter ("inner-key" -> 42)
  Map<String, Long> groupCounters = new HashMap<String, Long>();
  groupCounters.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", groupCounters );

  Flow processFlow = flowWithChildren( "children", expectedCounters );
  processFlow.complete();

  FlowStats stats = processFlow.getFlowStats();
  assertNotNull( stats );

  List stepStatsList = new ArrayList( stats.getChildren() );
  assertEquals( 1, stepStatsList.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepStatsList.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( groupCounters.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Deletes the sink's resource when the sink is temporary and the flow either
 * reports success or has no run id; otherwise only cleans the tap's metadata.
 * A failed delete is logged as a warning and never rethrown.
 *
 * @param config the job configuration handed to the tap operations
 * @param sink the sink tap to clean up
 */
protected void cleanIntermediateData( JobConf config, Tap sink )
{
  boolean deletable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

  if( !deletable )
  {
    cleanTapMetaData( config, sink );
    return;
  }

  try
  {
    sink.deleteResource( config );
  }
  catch( Exception exception )
  {
    // sink all exceptions, don't fail app
    logWarn( "unable to remove temporary file: " + sink, exception );
  }
}
代码示例来源:origin: cwensel/cascading
/**
 * Cleans up after the step: for each sink tap, deletes its resource when the tap
 * is temporary and the flow either reports success or has no run id, otherwise
 * cleans only the tap's metadata; then cleans the metadata of every trap tap.
 * A failed delete is logged as a warning and never rethrown.
 *
 * @param config the configuration handed to the tap operations
 */
@Override
public void clean( TezConfiguration config )
{
  for( Tap sink : getSinkTaps() )
  {
    boolean deletable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

    if( !deletable )
    {
      cleanTapMetaData( config, sink );
      continue;
    }

    try
    {
      sink.deleteResource( config );
    }
    catch( Exception exception )
    {
      // sink all exceptions, don't fail app
      logWarn( "unable to remove temporary file: " + sink, exception );
    }
  }

  for( Tap trap : getTraps() )
    cleanTapMetaData( config, trap );
}
代码示例来源:origin: cwensel/cascading
/**
 * Runs a parse-then-limit flow over the Apache log input and checks the sink
 * length and the flow-level {@code Tuples_Written} / {@code Tuples_Read} counters.
 */
@Test
public void testSimple() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  // extract the leading token of each line as "ip", then stop after 100 tuples
  Pipe head = new Pipe( "test" );
  Pipe parsed = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe limited = new Each( parsed, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, limited );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 101, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Runs a parse, group, count and limit flow over the Apache log input and checks
 * the sink length and the flow-level {@code Tuples_Written} / {@code Tuples_Read}
 * counters.
 */
@Test
public void testSimpleGroup() throws Exception
{
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  // extract "ip", group and count by it, then stop after 100 tuples
  Pipe head = new Pipe( "test" );
  Pipe parsed = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );
  Pipe limited = new Each( counted, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, limited );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 200, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
}
代码示例来源:origin: cwensel/cascading
/**
 * Runs a HashJoin of the "lower" and "upper" inputs, each instrumented with its own
 * custom counter, and checks the flow stats: an id is present, the first counter is 5,
 * the second counter is non-zero, and the two sum to {@code SliceCounters.Tuples_Read}.
 */
@Test
public void testStatsOnJoin() throws Exception
{
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );

  Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
  Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  // each branch increments its own counter per tuple so the two sides can be told apart
  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  pipeLower = new Each( pipeLower, new Counter( TestEnum.FIRST ) );

  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  pipeUpper = new Each( pipeUpper, new Counter( TestEnum.SECOND ) );

  Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

  Map<Object, Object> properties = getProperties();

  Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
  flow.complete();

  validateLength( flow, 5 );

  FlowStats flowStats = flow.getFlowStats();
  assertNotNull( flowStats.getID() );

  long firstCounter = flowStats.getCounterValue( TestEnum.FIRST );
  long secondCounter = flowStats.getCounterValue( TestEnum.SECOND );

  assertEquals( 5, firstCounter );

  // verifies accumulated side counters fired; a value comparison is required here because
  // assertNotSame( 0, secondCounter ) compares boxed Integer/Long references and never fails
  assertTrue( "accumulated side counter should be non-zero", secondCounter != 0 );

  assertEquals( firstCounter + secondCounter, flowStats.getCounterValue( SliceCounters.Tuples_Read ) );
}
}
代码示例来源:origin: cascading/cascading-platform
/**
 * Runs a HashJoin of the "lower" and "upper" inputs, each instrumented with its own
 * custom counter, and checks the flow stats: an id is present, the first counter is 5,
 * the second counter is non-zero, and the two sum to {@code SliceCounters.Tuples_Read}.
 */
@Test
public void testStatsOnJoin() throws Exception
{
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );

  Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
  Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  // each branch increments its own counter per tuple so the two sides can be told apart
  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  pipeLower = new Each( pipeLower, new Counter( TestEnum.FIRST ) );

  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  pipeUpper = new Each( pipeUpper, new Counter( TestEnum.SECOND ) );

  Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

  Map<Object, Object> properties = getProperties();

  Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
  flow.complete();

  validateLength( flow, 5 );

  FlowStats flowStats = flow.getFlowStats();
  assertNotNull( flowStats.getID() );

  long firstCounter = flowStats.getCounterValue( TestEnum.FIRST );
  long secondCounter = flowStats.getCounterValue( TestEnum.SECOND );

  assertEquals( 5, firstCounter );

  // verifies accumulated side counters fired; a value comparison is required here because
  // assertNotSame( 0, secondCounter ) compares boxed Integer/Long references and never fails
  assertTrue( "accumulated side counter should be non-zero", secondCounter != 0 );

  assertEquals( firstCounter + secondCounter, flowStats.getCounterValue( SliceCounters.Tuples_Read ) );
}
}
内容来源于网络,如有侵权,请联系作者删除!