本文整理了Java中cascading.flow.Flow.addListener()
方法的一些代码示例,展示了Flow.addListener()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flow.addListener()
方法的具体详情如下:
包路径:cascading.flow.Flow
类名称:Flow
方法名:addListener
[英]Method addListener registers the given flowListener with this instance.
[中]方法addListener将给定的flowListener注册到此实例。
代码示例来源:origin: LiveRamp/cascading_ext
@Override
public void complete(JobPersister persister, boolean failOnCounterFetch) {
AtomicBoolean isComplete = new AtomicBoolean(false);
flow.addListener(new StopListener(isComplete));
flow.addStepListener(new JobRecordListener(
persister,
failOnCounterFetch
));
flow.complete();
// TODO kill skipCompleteListener once we figure out the cascading internal NPE (upgrade past 2.5.1 maybe?)
if (!isComplete.get() && !skipCompleteListener) {
throw new RuntimeException("Flow terminated but did not complete! Possible shutdown hook invocation.");
}
}
代码示例来源:origin: com.twitter/scalding-core
public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
final Promise<T> result = Promise$.MODULE$.<T>apply();
flow.addListener(new FlowListener() {
public void onStarting(Flow f) { } // ignore
public void onStopping(Flow f) { } // ignore
public void onCompleted(Flow f) {
// This is always called, but onThrowable is called first
if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = (T) fn.apply(f);
result.success(toPut);
}
catch(Throwable t) {
result.failure(t);
}
}
}
public boolean onThrowable(Flow f, Throwable t) {
result.failure(t);
// The exception is handled by the owner of the promise and should not be rethrown
return true;
}
});
flow.start();
return result.future();
}
}
代码示例来源:origin: com.twitter/scalding-core_2.10
public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
final Promise<T> result = Promise$.MODULE$.<T>apply();
flow.addListener(new FlowListener() {
public void onStarting(Flow f) { } // ignore
public void onStopping(Flow f) { } // ignore
public void onCompleted(Flow f) {
// This is always called, but onThrowable is called first
if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = (T) fn.apply(f);
result.success(toPut);
}
catch(Throwable t) {
result.failure(t);
}
}
}
public boolean onThrowable(Flow f, Throwable t) {
result.failure(t);
// The exception is handled by the owner of the promise and should not be rethrown
return true;
}
});
flow.start();
return result.future();
}
}
代码示例来源:origin: org.springframework.data/spring-cascading
@Override
public void afterPropertiesSet() throws Exception {
flow = createFlow();
if (skipStrategy != null) {
flow.setFlowSkipStrategy(skipStrategy);
}
if (stepStrategy != null) {
flow.setFlowStepStrategy(stepStrategy);
}
if (listeners != null) {
for (FlowListener listener : listeners) {
flow.addListener(listener);
}
}
if (StringUtils.hasText(writeDOT)) {
flow.writeDOT(writeDOT);
}
if (StringUtils.hasText(writeStepsDOT)) {
flow.writeStepsDOT(writeStepsDOT);
}
if (priority != null) {
flow.setSubmitPriority(priority);
}
}
代码示例来源:origin: cascading/lingual-core
@Override
public Flow create()
{
FlowDef flowDef = FlowDef.flowDef()
.setName( getName() )
.addTails( tail )
.setDebugLevel( platformBroker.getDebugLevel() );
LOG.debug( "using log level: {}", platformBroker.getDebugLevel() );
for( String jar : jars )
{
LOG.debug( "adding jar to classpath: {}", jar );
flowDef.addToClassPath( jar );
}
flowDef.addDescription( FlowDescriptors.STATEMENTS, lingualConnection.getCurrentSQL() );
Flow flow = createFlowFrom( flowDef, tail );
flow.addListener( new LingualConnectionFlowListener( lingualConnection ) );
return flow;
}
}
代码示例来源:origin: cwensel/cascading
LockingFlowListener fourthFlowListener = new LockingFlowListener();
first.addListener( firstFlowListener );
second.addListener( secondFlowListener );
third.addListener( thirdFlowListener );
fourth.addListener( fourthFlowListener );
代码示例来源:origin: cwensel/cascading
LockingFlowListener flowListener = new LockingFlowListener();
first.addListener( flowListener );
代码示例来源:origin: cascading/cascading-platform
LockingFlowListener flowListener = new LockingFlowListener();
first.addListener( flowListener );
代码示例来源:origin: cascading/cascading-platform
@Test
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "completeException";
Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
process.addListener( listener );
try
{
process.complete();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "startException";
Flow process = flowWithException( path, FailingRiffle.Failing.START );
process.addListener( listener );
try
{
process.start();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInComplete() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "completeException";
Flow process = flowWithException( path, FailingRiffle.Failing.COMPLETE );
process.addListener( listener );
try
{
process.complete();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "startException";
Flow process = flowWithException( path, FailingRiffle.Failing.START );
process.addListener( listener );
try
{
process.start();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
process.start();
try
{
process.stop();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
{
ThrowableListener listener = new ThrowableListener();
getPlatform().copyFromLocal( inputFileIps );
String path = "stopException";
Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
process.addListener( listener );
process.start();
try
{
process.stop();
fail( "there should have been an exception" );
}
catch( CascadingException exception )
{
assertNotNull( listener.getThrowable() );
}
}
代码示例来源:origin: cwensel/cascading
@Test
public void testStartWithoutComplete() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
flow.start();
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
}
代码示例来源:origin: cascading/cascading-hadoop2-common
@Test
public void testStartWithoutComplete() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Map sources = new HashMap();
sources.put( "lower", sourceLower );
Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
LockingFlowListener listener = new LockingFlowListener();
flow.addListener( listener );
flow.start();
assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testFailOnMissingSuccessFlowListener() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
FlowListener listener = new FailOnMissingSuccessFlowListener();
Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
firstFlow.addListener( listener );
firstFlow.complete();
Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
secondFlow.addListener( listener );
secondFlow.complete();
Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
thirdFlow.addListener( listener );
try
{
thirdFlow.complete();
fail( "listener did not fail flow" );
}
catch( FlowException exception )
{
// do nothing
}
}
}
代码示例来源:origin: cascading/cascading-hadoop2-common
@Test
public void testFailOnMissingSuccessFlowListener() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );
FlowListener listener = new FailOnMissingSuccessFlowListener();
Hfs source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Hfs success = new Hfs( new TextLine(), getOutputPath( "withsuccess" ), SinkMode.REPLACE );
Hfs without = new Hfs( new TextLine(), getOutputPath( "withoutsuccess" ), SinkMode.REPLACE );
Hfs sink = new Hfs( new TextLine(), getOutputPath( "final" ), SinkMode.REPLACE );
Flow firstFlow = getPlatform().getFlowConnector( getProperties() ).connect( source, success, new Pipe( "lower" ) );
firstFlow.addListener( listener );
firstFlow.complete();
Flow secondFlow = getPlatform().getFlowConnector( getProperties() ).connect( success, without, new Pipe( "lower" ) );
secondFlow.addListener( listener );
secondFlow.complete();
Hfs successTap = new Hfs( new TextLine(), new Path( without.getPath(), "_SUCCESS" ).toString() );
assertTrue( successTap.deleteResource( getPlatform().getFlowProcess() ) );
Flow thirdFlow = getPlatform().getFlowConnector( getProperties() ).connect( without, sink, new Pipe( "lower" ) );
thirdFlow.addListener( listener );
try
{
thirdFlow.complete();
fail( "listener did not fail flow" );
}
catch( FlowException exception )
{
// do nothing
}
}
}
代码示例来源:origin: cascading/cascading-platform
@Test
public void testSkippedCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "skipped";
Flow first = firstFlow( path + "/first", false );
Flow second = secondFlow( first.getSink(), path + "/second" );
Flow third = thirdFlow( second.getSink(), path + "/third" );
Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
CountingFlowListener flowListener = new CountingFlowListener();
second.addListener( flowListener );
Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
cascade.setFlowSkipStrategy( new FlowSkipStrategy()
{
public boolean skipFlow( Flow flow ) throws IOException
{
return true;
}
} );
cascade.start();
cascade.complete();
assertEquals( 1, flowListener.skipped );
assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
}
代码示例来源:origin: cwensel/cascading
@Test
public void testSkippedCascade() throws IOException
{
getPlatform().copyFromLocal( inputFileIps );
String path = "skipped";
Flow first = firstFlow( path + "/first", false );
Flow second = secondFlow( first.getSink(), path + "/second" );
Flow third = thirdFlow( second.getSink(), path + "/third" );
Flow fourth = fourthFlow( third.getSink(), path + "/fourth" );
CountingFlowListener flowListener = new CountingFlowListener();
second.addListener( flowListener );
Cascade cascade = new CascadeConnector( getProperties() ).connect( first, second, third, fourth );
cascade.setFlowSkipStrategy( new FlowSkipStrategy()
{
public boolean skipFlow( Flow flow ) throws IOException
{
return true;
}
} );
cascade.start();
cascade.complete();
assertEquals( 1, flowListener.skipped );
assertFalse( "file exists", fourth.getSink().resourceExists( fourth.getConfig() ) );
}
内容来源于网络,如有侵权,请联系作者删除!