cascading.flow.Flow.getFlowStats()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(13.7k)|赞(0)|评价(0)|浏览(169)

本文整理了Java中cascading.flow.Flow.getFlowStats()方法的一些代码示例,展示了Flow.getFlowStats()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Flow.getFlowStats()方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:getFlowStats

Flow.getFlowStats介绍

[英]Method getFlowStats returns the flowStats of this Flow object.
[中]方法getFlowStats返回此Flow对象的flowStats(流统计信息)。

代码示例

代码示例来源:origin: LiveRamp/cascading_ext

/**
 * Returns the counters of one counter group for the given flow.
 *
 * <p>Convenience overload: it fetches the flow's stats object and hands it, together
 * with the group name, to the stats-based {@code getCountersForGroup} overload.
 *
 * @param flow  flow whose stats are queried
 * @param group name of the counter group to select
 * @return the list produced by the stats-based overload for {@code group}
 * @throws IOException declared by this method; presumably propagated from the
 *                     stats-based overload (not visible here — confirm)
 */
public static List<Counter> getCountersForGroup(Flow flow, String group) throws IOException {
 return getCountersForGroup(flow.getFlowStats(), group);
}

代码示例来源:origin: LiveRamp/cascading_ext

/**
 * Gathers the counters of every step of the given flow into a single list.
 *
 * @param flow flow whose step stats are walked
 * @return all counters collected from the flow's steps, sorted with
 *         {@link Collections#sort(List)}
 */
@Deprecated
public static List<Counter> getCounters(Flow flow) {
  List<Counter> allCounters = new ArrayList<>();

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    allCounters.addAll(getStatsFromStep(stepStats));
  }

  Collections.sort(allCounters);
  return allCounters;
}

代码示例来源:origin: LiveRamp/cascading_ext

/**
 * Groups the counters of a flow by the step that produced them.
 *
 * @param flow flow whose step stats are inspected
 * @return map keyed on each step's stats object, whose value is the list of that
 *         step's counters, sorted with {@link Collections#sort(List)}
 */
@Deprecated
public static Map<FlowStepStats, List<Counter>> getCountersByStep(Flow flow) {
  Map<FlowStepStats, List<Counter>> countersByStep = new HashMap<>();

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    // Only non-null lists are ever stored, so a null lookup means "no entry yet".
    List<Counter> stepCounters = countersByStep.get(stepStats);
    if (stepCounters == null) {
      stepCounters = Lists.<Counter>newArrayList();
      countersByStep.put(stepStats, stepCounters);
    }
    stepCounters.addAll(getStatsFromStep(stepStats));
  }

  for (List<Counter> stepCounters : countersByStep.values()) {
    Collections.sort(stepCounters);
  }

  return countersByStep;
}

代码示例来源:origin: LiveRamp/cascading_ext

/**
 * Sums one named counter over all steps of a flow.
 *
 * @param flow  flow whose step stats are walked
 * @param group counter group name passed to the per-step lookup
 * @param value counter name passed to the per-step lookup
 * @return the sum of the per-step values; {@code 0} when the flow reports no steps
 * @throws IOException declared by this method; presumably raised by the per-step
 *                     lookup (not visible here — confirm)
 */
public static Long get(Flow flow, String group, String value) throws IOException {
  long sum = 0;

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    sum += get(stepStats, group, value);
  }

  return Long.valueOf(sum);
}

代码示例来源:origin: LiveRamp/cascading_ext

/**
 * Sums one enum-identified counter over all steps of a flow.
 *
 * @param flow  flow whose step stats are walked
 * @param value enum constant identifying the counter, passed to the per-step lookup
 * @return the sum of the per-step values; {@code 0} when the flow reports no steps
 * @throws IOException declared by this method; presumably raised by the per-step
 *                     lookup (not visible here — confirm)
 */
public static Long get(Flow flow, Enum value) throws IOException {
  long sum = 0;

  for (FlowStepStats stepStats : flow.getFlowStats().getFlowStepStats()) {
    sum += get(stepStats, value);
  }

  return Long.valueOf(sum);
}

代码示例来源:origin: cwensel/cascading

/**
 * Marks the parent flow's stats as running when they are currently in the started
 * or submitted state. When the step has no parent flow, only a warning is logged.
 */
private synchronized void markFlowRunning()
  {
  Flow parentFlow = flowStep.getFlow();

  if( parentFlow == null )
    {
    LOG.warn( "no parent flow set" );
    }
  else
    {
    FlowStats parentStats = parentFlow.getFlowStats();

    // check-then-mark is done while holding the stats object's monitor
    synchronized( parentStats )
      {
      boolean awaitingRun = parentStats.isStarted() || parentStats.isSubmitted();

      if( awaitingRun )
        parentStats.markRunning();
      }
    }
  }

代码示例来源:origin: cwensel/cascading

/**
 * Tallies a completed flow: always bumps {@code completed}, then bumps the counter
 * matching the flow's final status (skipped, successful, stopped or failed).
 *
 * @param flow the flow that has completed
 */
@Override
public void onCompleted( Flow flow )
  {
  completed++;

  switch( flow.getFlowStats().getStatus() )
    {
    case SKIPPED:
      skipped++;
      break;

    case SUCCESSFUL:
      successful++;
      break;

    case STOPPED:
      stopped++;
      break;

    case FAILED:
      failed++;
      break;

    default:
      // PENDING, STARTED, SUBMITTED and RUNNING have no dedicated tally
      break;
    }
  }

代码示例来源:origin: cwensel/cascading

/**
 * Populates {@code jobsMap} with one {@code CascadeJob} per flow in the flow graph,
 * registers every flow's stats with {@code cascadeStats}, and initializes each job
 * with the jobs of its graph predecessors. Runs while holding the {@code jobsMap} monitor.
 */
private void initializeNewJobsMap()
  {
  synchronized( jobsMap )
    {
    // flows are visited in topological order; presumably so that predecessor jobs are
    // already present in jobsMap when they are looked up below — confirm
    for( TopologicalOrderIterator<Flow, Integer> ordered = flowGraph.getTopologicalIterator(); ordered.hasNext(); )
      {
      Flow current = ordered.next();

      cascadeStats.addFlowStats( current.getFlowStats() );

      CascadeJob currentJob = new CascadeJob( current );
      jobsMap.put( current.getName(), currentJob );

      List<CascadeJob> upstreamJobs = new ArrayList<CascadeJob>();

      for( Flow upstream : Graphs.predecessorListOf( flowGraph, current ) )
        upstreamJobs.add( (CascadeJob) jobsMap.get( upstream.getName() ) );

      currentJob.init( upstreamJobs );
      }
    }
  }

代码示例来源:origin: cwensel/cascading

/**
 * Moves the step stats from started to submitted, marking every child node's stats
 * as started, and then moves the parent flow's stats from started to submitted.
 * When the step has no parent flow, a warning is logged and the flow part is skipped.
 */
private synchronized void markSubmitted()
  {
  if( flowStepStats.isStarted() )
    {
    flowStepStats.markSubmitted();

    Collection<FlowNodeStats> nodeStatsList = flowStepStats.getChildren();

    for( FlowNodeStats nodeStats : nodeStatsList )
      nodeStats.markStarted();
    }

  Flow parentFlow = flowStep.getFlow();

  if( parentFlow == null )
    {
    LOG.warn( "no parent flow set" );
    }
  else
    {
    FlowStats parentStats = parentFlow.getFlowStats();

    // check-then-mark is done while holding the stats object's monitor
    synchronized( parentStats )
      {
      if( parentStats.isStarted() )
        parentStats.markSubmitted();
      }
    }
  }

代码示例来源:origin: cwensel/cascading

/**
 * Runs a process flow built with a single counter ("outer-key" / "inner-key" = 42) and
 * checks that the flow stats expose exactly one child whose counter groups, counter
 * names and counter value match what was supplied.
 */
@Test
public void testProcessFlowWithCounters() throws IOException
  {
  getPlatform().copyFromLocal( inputFileIps );

  Map<String, Long> expectedGroup = new HashMap<String, Long>();
  expectedGroup.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", expectedGroup );

  Flow process = flowWithCounters( "counter", expectedCounters );
  process.complete();

  FlowStats processStats = process.getFlowStats();
  assertNotNull( processStats );

  List stepChildren = new ArrayList( processStats.getChildren() );
  assertEquals( 1, stepChildren.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepChildren.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( expectedGroup.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Runs a process flow built via {@code flowWithChildren} with a single counter
 * ("outer-key" / "inner-key" = 42) and checks that the flow stats expose exactly one
 * child whose counter groups, counter names and counter value match what was supplied.
 */
@Test
public void testProcessFlowWithChildCounters() throws IOException
  {
  getPlatform().copyFromLocal( inputFileIps );

  Map<String, Long> expectedGroup = new HashMap<String, Long>();
  expectedGroup.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", expectedGroup );

  Flow process = flowWithChildren( "children", expectedCounters );
  process.complete();

  FlowStats processStats = process.getFlowStats();
  assertNotNull( processStats );

  List stepChildren = new ArrayList( processStats.getChildren() );
  assertEquals( 1, stepChildren.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepChildren.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( expectedGroup.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
  }

代码示例来源:origin: cascading/cascading-hadoop2-mr1

/**
 * Cleans up after a sink tap. A temporary sink is deleted when the flow was successful
 * or has no run ID; in every other case only the tap's metadata is cleaned.
 * A failed delete is logged as a warning and never propagated.
 *
 * @param config job configuration handed to the tap operations
 * @param sink   sink tap to clean up
 */
protected void cleanIntermediateData( JobConf config, Tap sink )
  {
  boolean removable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

  if( !removable )
    {
    cleanTapMetaData( config, sink );
    return;
    }

  try
    {
    sink.deleteResource( config );
    }
  catch( Exception exception )
    {
    // deliberately best-effort: a cleanup failure must not fail the application
    logWarn( "unable to remove temporary file: " + sink, exception );
    }
  }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a process flow built with a single counter ("outer-key" / "inner-key" = 42) and
 * checks that the flow stats expose exactly one child whose counter groups, counter
 * names and counter value match what was supplied.
 */
@Test
public void testProcessFlowWithCounters() throws IOException
  {
  getPlatform().copyFromLocal( inputFileIps );

  Map<String, Long> expectedGroup = new HashMap<String, Long>();
  expectedGroup.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", expectedGroup );

  Flow process = flowWithCounters( "counter", expectedCounters );
  process.complete();

  FlowStats processStats = process.getFlowStats();
  assertNotNull( processStats );

  List stepChildren = new ArrayList( processStats.getChildren() );
  assertEquals( 1, stepChildren.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepChildren.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( expectedGroup.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
  }

代码示例来源:origin: cascading/cascading-platform

/**
 * Runs a process flow built via {@code flowWithChildren} with a single counter
 * ("outer-key" / "inner-key" = 42) and checks that the flow stats expose exactly one
 * child whose counter groups, counter names and counter value match what was supplied.
 */
@Test
public void testProcessFlowWithChildCounters() throws IOException
  {
  getPlatform().copyFromLocal( inputFileIps );

  Map<String, Long> expectedGroup = new HashMap<String, Long>();
  expectedGroup.put( "inner-key", 42L );

  Map<String, Map<String, Long>> expectedCounters = new HashMap<String, Map<String, Long>>();
  expectedCounters.put( "outer-key", expectedGroup );

  Flow process = flowWithChildren( "children", expectedCounters );
  process.complete();

  FlowStats processStats = process.getFlowStats();
  assertNotNull( processStats );

  List stepChildren = new ArrayList( processStats.getChildren() );
  assertEquals( 1, stepChildren.size() );

  ProcessStepStats onlyStep = (ProcessStepStats) stepChildren.get( 0 );

  assertEquals( expectedCounters.keySet(), onlyStep.getCounterGroups() );
  assertEquals( expectedGroup.keySet(), onlyStep.getCountersFor( "outer-key" ) );
  assertEquals( 42L, onlyStep.getCounterValue( "outer-key", "inner-key" ) );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Cleans up after a sink tap. A temporary sink is deleted when the flow was successful
 * or has no run ID; in every other case only the tap's metadata is cleaned.
 * A failed delete is logged as a warning and never propagated.
 *
 * @param config job configuration handed to the tap operations
 * @param sink   sink tap to clean up
 */
protected void cleanIntermediateData( JobConf config, Tap sink )
  {
  boolean removable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

  if( !removable )
    {
    cleanTapMetaData( config, sink );
    return;
    }

  try
    {
    sink.deleteResource( config );
    }
  catch( Exception exception )
    {
    // deliberately best-effort: a cleanup failure must not fail the application
    logWarn( "unable to remove temporary file: " + sink, exception );
    }
  }

代码示例来源:origin: cwensel/cascading

/**
 * Cleans up all sink and trap taps. A temporary sink is deleted when the flow was
 * successful or has no run ID; any other sink only has its metadata cleaned.
 * Every trap has its metadata cleaned. A failed delete is logged as a warning and
 * never propagated.
 *
 * @param config configuration handed to the tap operations
 */
@Override
public void clean( TezConfiguration config )
  {
  for( Tap sink : getSinkTaps() )
    {
    boolean removable = sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null );

    if( !removable )
      {
      cleanTapMetaData( config, sink );
      continue;
      }

    try
      {
      sink.deleteResource( config );
      }
    catch( Exception exception )
      {
      // deliberately best-effort: a cleanup failure must not fail the application
      logWarn( "unable to remove temporary file: " + sink, exception );
      }
    }

  for( Tap trap : getTraps() )
    cleanTapMetaData( config, trap );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Parses the leading token of each input line into an "ip" field, stops the stream
 * through a {@code Stop( Limit( 100 ) )} filter, and checks that the sink holds 100
 * entries and that the flow stats report 100 tuples written and 101 tuples read.
 */
@Test
public void testSimple() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  Pipe head = new Pipe( "test" );
  Pipe parsed = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe limited = new Each( parsed, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, limited );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 101, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Parses the leading token of each input line into an "ip" field, groups and counts by
 * "ip", stops the stream through a {@code Stop( Limit( 100 ) )} filter, and checks that
 * the sink holds 100 entries and that the flow stats report 100 tuples written and
 * 200 tuples read.
 */
@Test
public void testSimpleGroup() throws Exception
  {
  getPlatform().copyFromLocal( inputFileApache200 );

  Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache200 );

  Pipe head = new Pipe( "test" );
  Pipe parsed = new Each( head, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) );
  Pipe grouped = new GroupBy( parsed, new Fields( "ip" ) );
  Pipe counted = new Every( grouped, new Count(), new Fields( "ip", "count" ) );
  Pipe limited = new Each( counted, new Fields( "ip" ), new Stop( new Limit( 100 ) ) );

  Tap sink = getPlatform().getTextFile( getOutputPath(), SinkMode.REPLACE );

  Flow flow = getPlatform().getFlowConnector().connect( source, sink, limited );
  flow.complete();

  validateLength( flow.openSink(), 100 );

  assertEquals( 100, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Written ) );
  assertEquals( 200, flow.getFlowStats().getCounterValue( StepCounters.Tuples_Read ) );
  }

代码示例来源:origin: cwensel/cascading

/**
 * Joins the "lower" and "upper" inputs with a {@code HashJoin}, counting tuples on each
 * side with a custom counter, and checks the counters reported by the flow stats:
 * the FIRST counter must be 5, the SECOND counter must be non-zero, and their sum must
 * equal the {@code SliceCounters.Tuples_Read} value.
 */
@Test
public void testStatsOnJoin() throws Exception
  {
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );

  Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
  Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  pipeLower = new Each( pipeLower, new Counter( TestEnum.FIRST ) );

  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  pipeUpper = new Each( pipeUpper, new Counter( TestEnum.SECOND ) );

  Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

  Map<Object, Object> properties = getProperties();
  Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
  flow.complete();

  validateLength( flow, 5 );

  FlowStats flowStats = flow.getFlowStats();
  assertNotNull( flowStats.getID() );

  long firstCounter = flowStats.getCounterValue( TestEnum.FIRST );
  long secondCounter = flowStats.getCounterValue( TestEnum.SECOND );

  assertEquals( 5, firstCounter );

  // verifies accumulated side counters fired; compared by value, because assertNotSame
  // checks reference identity of the boxed arguments and therefore could never fail here
  assertTrue( "expected SECOND counter to be non-zero", secondCounter != 0 );

  assertEquals( firstCounter + secondCounter, flowStats.getCounterValue( SliceCounters.Tuples_Read ) );
  }
}

代码示例来源:origin: cascading/cascading-platform

/**
 * Joins the "lower" and "upper" inputs with a {@code HashJoin}, counting tuples on each
 * side with a custom counter, and checks the counters reported by the flow stats:
 * the FIRST counter must be 5, the SECOND counter must be non-zero, and their sum must
 * equal the {@code SliceCounters.Tuples_Read} value.
 */
@Test
public void testStatsOnJoin() throws Exception
  {
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );

  Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
  Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

  Map<String, Tap> sources = new HashMap<String, Tap>();
  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  pipeLower = new Each( pipeLower, new Counter( TestEnum.FIRST ) );

  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  pipeUpper = new Each( pipeUpper, new Counter( TestEnum.SECOND ) );

  Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

  Map<Object, Object> properties = getProperties();
  Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );
  flow.complete();

  validateLength( flow, 5 );

  FlowStats flowStats = flow.getFlowStats();
  assertNotNull( flowStats.getID() );

  long firstCounter = flowStats.getCounterValue( TestEnum.FIRST );
  long secondCounter = flowStats.getCounterValue( TestEnum.SECOND );

  assertEquals( 5, firstCounter );

  // verifies accumulated side counters fired; compared by value, because assertNotSame
  // checks reference identity of the boxed arguments and therefore could never fail here
  assertTrue( "expected SECOND counter to be non-zero", secondCounter != 0 );

  assertEquals( firstCounter + secondCounter, flowStats.getCounterValue( SliceCounters.Tuples_Read ) );
  }
}

相关文章