Usage and code examples of the cascading.flow.Flow.addListener() method

x33g5p2x, reposted on 2022-01-19 under Other

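This article collects code examples for the Java method cascading.flow.Flow.addListener() and shows how Flow.addListener() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Flow.addListener() method:
Fully qualified class name: cascading.flow.Flow
Class name: Flow
Method name: addListener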

About Flow.addListener

Method addListener registers the given flowListener with this instance.
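To make the registration step concrete without depending on the Cascading jars, here is a minimal sketch of the listener pattern. `SketchFlow`, `SketchListener`, `RecordingListener`, and `ListenerSketch` are hypothetical stand-ins invented for illustration; they are not Cascading classes. They only mirror the four callbacks (`onStarting`, `onStopping`, `onCompleted`, `onThrowable`) that the FlowListener implementations in the examples on this page override, and the callback ordering is an assumption based on the comments in those examples.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a flow listener; NOT Cascading's FlowListener.
interface SketchListener {
    void onStarting(SketchFlow flow);
    void onStopping(SketchFlow flow);
    void onCompleted(SketchFlow flow);
    boolean onThrowable(SketchFlow flow, Throwable throwable); // true = failure was handled
}

// Hypothetical stand-in for a flow; NOT Cascading's Flow.
class SketchFlow {
    private final List<SketchListener> listeners = new ArrayList<>();
    private final Runnable work;

    SketchFlow(Runnable work) { this.work = work; }

    // Registers the given listener with this instance.
    void addListener(SketchListener listener) { listeners.add(listener); }

    // Runs the work, notifies every listener, and rethrows only unhandled failures.
    void complete() {
        for (SketchListener l : listeners) l.onStarting(this);
        RuntimeException unhandled = null;
        try {
            work.run();
        } catch (RuntimeException e) {
            boolean handled = false;
            for (SketchListener l : listeners) handled |= l.onThrowable(this, e);
            if (!handled) unhandled = e;
        } finally {
            // Assumed ordering: onCompleted always fires, after onThrowable.
            for (SketchListener l : listeners) l.onCompleted(this);
        }
        if (unhandled != null) throw unhandled;
    }
}

// Records the name of each callback in the order it was invoked.
class RecordingListener implements SketchListener {
    final List<String> events = new ArrayList<>();
    public void onStarting(SketchFlow flow) { events.add("starting"); }
    public void onStopping(SketchFlow flow) { events.add("stopping"); }
    public void onCompleted(SketchFlow flow) { events.add("completed"); }
    public boolean onThrowable(SketchFlow flow, Throwable t) { events.add("throwable"); return true; }
}

class ListenerSketch {
    // Registers a recording listener, runs the flow, and returns the observed events.
    static List<String> run(Runnable work) {
        RecordingListener listener = new RecordingListener();
        SketchFlow flow = new SketchFlow(work);
        flow.addListener(listener);
        flow.complete();
        return listener.events;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { }));                                          // prints [starting, completed]
        System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // prints [starting, throwable, completed]
    }
}
```

With the real library, the equivalent call would be `flow.addListener(listener)` on a connected Flow before `start()` or `complete()`, as the project examples on this page do.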

Code examples

Code example source: origin: LiveRamp/cascading_ext

@Override
public void complete(JobPersister persister, boolean failOnCounterFetch) {
 AtomicBoolean isComplete = new AtomicBoolean(false);
 flow.addListener(new StopListener(isComplete));
 flow.addStepListener(new JobRecordListener(
   persister,
   failOnCounterFetch
 ));
 flow.complete();
 //  TODO kill skipCompleteListener once we figure out the cascading internal NPE (upgrade past 2.5.1 maybe?)
 if (!isComplete.get() && !skipCompleteListener) {
  throw new RuntimeException("Flow terminated but did not complete!  Possible shutdown hook invocation.");
 }
}

Code example source: origin: com.twitter/scalding-core

public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
  final Promise<T> result = Promise$.MODULE$.<T>apply();
  flow.addListener(new FlowListener() {
   public void onStarting(Flow f) { } // ignore
   public void onStopping(Flow f) { } // ignore
   public void onCompleted(Flow f) {
    // This is always called, but onThrowable is called first
    if(!result.isCompleted()) {
     // we use the above rather than trySuccess to avoid calling fn twice
     try {
      T toPut = (T) fn.apply(f);
      result.success(toPut);
     }
     catch(Throwable t) {
      result.failure(t);
     }
    }
   }
   public boolean onThrowable(Flow f, Throwable t) {
    result.failure(t);
    // The exception is handled by the owner of the promise and should not be rethrown
    return true;
   }
  });
  flow.start();
  return result.future();
 }
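The example above turns listener callbacks into a Scala Promise. The same idea can be sketched in plain Java with `CompletableFuture`. This is an illustration under stated assumptions, not part of scalding or Cascading: `CompletionCallbacks` and `FutureBridge` are hypothetical names, and the interface keeps only the two callbacks that matter for the bridge.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical callback interface; NOT Cascading's FlowListener.
interface CompletionCallbacks<F> {
    void onCompleted(F flow);
    boolean onThrowable(F flow, Throwable throwable);
}

class FutureBridge {
    // Returns callbacks that complete `result` at most once:
    // with fn(flow) on normal completion, or with the throwable on failure.
    static <F, T> CompletionCallbacks<F> bridge(CompletableFuture<T> result, Function<F, T> fn) {
        return new CompletionCallbacks<F>() {
            public void onCompleted(F flow) {
                // Skip fn when onThrowable has already failed the future,
                // so that fn is never evaluated for a failed flow.
                if (!result.isDone()) {
                    try {
                        result.complete(fn.apply(flow));
                    } catch (Throwable t) {
                        result.completeExceptionally(t);
                    }
                }
            }

            public boolean onThrowable(F flow, Throwable throwable) {
                result.completeExceptionally(throwable);
                return true; // the owner of the future handles the failure
            }
        };
    }

    public static void main(String[] args) {
        // Success path: only onCompleted fires.
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        CompletionCallbacks<String> okCallbacks = bridge(ok, String::length);
        okCallbacks.onCompleted("flow");
        System.out.println(ok.join()); // prints 4

        // Failure path: onThrowable fires first, then onCompleted is a no-op.
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        CompletionCallbacks<String> failedCallbacks = bridge(failed, String::length);
        failedCallbacks.onThrowable("flow", new IllegalStateException("boom"));
        failedCallbacks.onCompleted("flow");
        System.out.println(failed.isCompletedExceptionally()); // prints true
    }
}
```

Checking `isDone()` before applying `fn` mirrors the `result.isCompleted()` guard in the original: the result function runs only when no failure has been recorded.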

Code example source: origin: com.twitter/scalding-core_2.10

public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
  final Promise<T> result = Promise$.MODULE$.<T>apply();
  flow.addListener(new FlowListener() {
   public void onStarting(Flow f) { } // ignore
   public void onStopping(Flow f) { } // ignore
   public void onCompleted(Flow f) {
    // This is always called, but onThrowable is called first
    if(!result.isCompleted()) {
     // we use the above rather than trySuccess to avoid calling fn twice
     try {
      T toPut = (T) fn.apply(f);
      result.success(toPut);
     }
     catch(Throwable t) {
      result.failure(t);
     }
    }
   }
   public boolean onThrowable(Flow f, Throwable t) {
    result.failure(t);
    // The exception is handled by the owner of the promise and should not be rethrown
    return true;
   }
  });
  flow.start();
  return result.future();
 }

Code example source: origin: org.springframework.data/spring-cascading

@Override
public void afterPropertiesSet() throws Exception {
  flow = createFlow();
  if (skipStrategy != null) {
    flow.setFlowSkipStrategy(skipStrategy);
  }
  if (stepStrategy != null) {
    flow.setFlowStepStrategy(stepStrategy);
  }
  if (listeners != null) {
    for (FlowListener listener : listeners) {
      flow.addListener(listener);
    }
  }
  if (StringUtils.hasText(writeDOT)) {
    flow.writeDOT(writeDOT);
  }
  if (StringUtils.hasText(writeStepsDOT)) {
    flow.writeStepsDOT(writeStepsDOT);
  }
  if (priority != null) {
    flow.setSubmitPriority(priority);
  }
}

Code example source: origin: cascading/lingual-core

@Override
public Flow create()
 {
 FlowDef flowDef = FlowDef.flowDef()
  .setName( getName() )
  .addTails( tail )
  .setDebugLevel( platformBroker.getDebugLevel() );
 LOG.debug( "using log level: {}", platformBroker.getDebugLevel() );
 for( String jar : jars )
  {
  LOG.debug( "adding jar to classpath: {}", jar );
  flowDef.addToClassPath( jar );
  }
 flowDef.addDescription( FlowDescriptors.STATEMENTS, lingualConnection.getCurrentSQL() );
 Flow flow = createFlowFrom( flowDef, tail );
 flow.addListener( new LingualConnectionFlowListener( lingualConnection ) );
 return flow;
 }

Code example source: origin: cwensel/cascading

LockingFlowListener fourthFlowListener = new LockingFlowListener();
first.addListener( firstFlowListener );
second.addListener( secondFlowListener );
third.addListener( thirdFlowListener );
fourth.addListener( fourthFlowListener );

Code example source: origin: cwensel/cascading

LockingFlowListener flowListener = new LockingFlowListener();
first.addListener( flowListener );

Code example source: origin: cascading/cascading-platform

LockingFlowListener flowListener = new LockingFlowListener();
first.addListener( flowListener );

Code example source: origin: cascading/cascading-platform

@Test
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "completeException";
 Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
 process.addListener( listener );
 try
  {
  process.complete();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cascading/cascading-platform

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "startException";
 Flow process = flowWithException( path, FailingRiffle.Failing.START );
 process.addListener( listener );
 try
  {
  process.start();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cwensel/cascading

@Test
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "completeException";
 Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
 process.addListener( listener );
 try
  {
  process.complete();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cwensel/cascading

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "startException";
 Flow process = flowWithException( path, FailingRiffle.Failing.START );
 process.addListener( listener );
 try
  {
  process.start();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cwensel/cascading

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "stopException";
 Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
 process.addListener( listener );
 process.start();
 try
  {
  process.stop();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cascading/cascading-platform

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "stopException";
 Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
 process.addListener( listener );
 process.start();
 try
  {
  process.stop();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: origin: cwensel/cascading

@Test
public void testStartWithoutComplete() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 // using null pos so all fields are written
 Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
 LockingFlowListener listener = new LockingFlowListener();
 flow.addListener( listener );
 flow.start();
 assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
 }
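The test above starts the flow without blocking and then waits on `listener.completed.tryAcquire( 90, TimeUnit.SECONDS )`, which suggests that `completed` behaves like a semaphore released from the completion callback. The following sketch shows that waiting technique with `java.util.concurrent.Semaphore` only. `CompletionLatchSketch` and its methods are hypothetical names, and the internals of LockingFlowListener are an assumption here, not something confirmed by this page.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of waiting for asynchronous completion; NOT Cascading code.
class CompletionLatchSketch {
    // Starts with zero permits; one permit is released when the background work ends.
    final Semaphore completed = new Semaphore(0);

    // Runs the work on another thread and returns immediately, like a non-blocking start().
    void start(Runnable work) {
        Thread worker = new Thread(() -> {
            try {
                work.run();
            } finally {
                completed.release(); // signal completion even if the work failed
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Starts the work and waits up to timeoutSeconds for the completion signal.
    static boolean runAndWait(Runnable work, long timeoutSeconds) {
        CompletionLatchSketch sketch = new CompletionLatchSketch();
        sketch.start(work);
        try {
            return sketch.completed.tryAcquire(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(() -> { }, 5)); // prints true
    }
}
```

Using `tryAcquire` with a timeout instead of `acquire` keeps a test from hanging forever when the completion callback never fires.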

Code example source: origin: cascading/cascading-hadoop2-common

@Test
public void testStartWithoutComplete() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 // using null pos so all fields are written
 Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
 LockingFlowListener listener = new LockingFlowListener();
 flow.addListener( listener );
 flow.start();
 assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
 }

Code example source: origin: cwensel/cascading

@Test
public void testFailOnMissingSuccessFlowListener() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 FlowListener listener = new FailOnMissingSuccessFlowListener();
 Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
 Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
 Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
 Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
 firstFlow.addListener( listener );
 firstFlow.complete();
 Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
 secondFlow.addListener( listener );
 secondFlow.complete();
 Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
 assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
 Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
 thirdFlow.addListener( listener );
 try
  {
  thirdFlow.complete();
  fail( "listener did not fail flow" );
  }
 catch( FlowException exception )
  {
  // do nothing
  }
 }

Code example source: origin: cascading/cascading-hadoop2-common

@Test
public void testFailOnMissingSuccessFlowListener() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 FlowListener listener = new FailOnMissingSuccessFlowListener();
 Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
 Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
 Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
 Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
 firstFlow.addListener( listener );
 firstFlow.complete();
 Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
 secondFlow.addListener( listener );
 secondFlow.complete();
 Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
 assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
 Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
 thirdFlow.addListener( listener );
 try
  {
  thirdFlow.complete();
  fail( "listener did not fail flow" );
  }
 catch( FlowException exception )
  {
  // do nothing
  }
 }

Code example source: origin: cascading/cascading-platform

@Test
public void testSkippedCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  {
  public boolean skipFlow( Flow flow ) throws IOException
   {
   return true;
   }
  } );
 cascade.start();
 cascade.complete();
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
 }

Code example source: origin: cwensel/cascading

@Test
public void testSkippedCascade() throws IOException
 {
 getPlatform().copyFromLocal( inputFileIps );
 String path = "skipped";
 Flow first = firstFlow( path + "/first", false );
 Flow second = secondFlow( first.getSink(), path + "/second" );
 Flow third = thirdFlow( second.getSink(), path + "/third" );
 Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
 CountingFlowListener flowListener = new CountingFlowListener();
 second.addListener( flowListener );
 Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
 cascade.setFlowSkipStrategy( new FlowSkipStrategy()
  {
  public boolean skipFlow( Flow flow ) throws IOException
   {
   return true;
   }
  } );
 cascade.start();
 cascade.complete();
 assertEquals( 1, flowListener.skipped );
 assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
 }
