Usage of cascading.flow.Flow.start() with code examples

This article collects Java code examples of the cascading.flow.Flow.start() method and shows how Flow.start() is used in practice. The examples are drawn from selected open-source projects indexed on GitHub, Stack Overflow, Maven, and similar platforms, and should serve as a useful reference. Details of Flow.start() are as follows:
Package: cascading.flow
Class: Flow
Method: start

Flow.start overview

Method start begins the execution of this Flow instance. It will return immediately. Use the method #complete() to block until this Flow completes.
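A minimal usage sketch of that contract (not taken from the indexed examples below; it assumes flow was already built elsewhere with a FlowConnector, as in the test cases further down):

import cascading.flow.Flow;
import cascading.flow.FlowListener;

public class StartVersusComplete
  {
  public static void run( Flow flow )
    {
    // optional: observe lifecycle events of the asynchronously running flow
    flow.addListener( new FlowListener()
      {
      public void onStarting( Flow f ) { }
      public void onStopping( Flow f ) { }
      public void onCompleted( Flow f ) { System.out.println( "finished: " + f.getName() ); }
      public boolean onThrowable( Flow f, Throwable t ) { t.printStackTrace(); return true; }
      } );

    flow.start();    // returns immediately; the flow runs in the background
    // ... other work may happen here while the flow is running ...
    flow.complete(); // blocks until the already-started flow has finished
    }
  }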

Code examples

Code example source: com.twitter/scalding-core_2.10 (an identical copy is indexed from com.twitter/scalding-core)

public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
  final Promise<T> result = Promise$.MODULE$.<T>apply();
  flow.addListener(new FlowListener() {
   public void onStarting(Flow f) { } // ignore
   public void onStopping(Flow f) { } // ignore
   public void onCompleted(Flow f) {
    // This is always called, but onThrowable is called first
    if(!result.isCompleted()) {
     // we use the above rather than trySuccess to avoid calling fn twice
     try {
      T toPut = (T) fn.apply(f);
      result.success(toPut);
     }
     catch(Throwable t) {
      result.failure(t);
     }
    }
   }
   public boolean onThrowable(Flow f, Throwable t) {
    result.failure(t);
    // The exception is handled by the owner of the promise and should not be rethrown
    return true;
   }
  });
  flow.start();
  return result.future();
 }
}
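For illustration, a hypothetical Java caller of the helper above could look like the following. FlowFuture is an assumed placeholder name for the helper's enclosing class, which the snippet does not show, and AbstractFunction1 is Scala's standard bridge class for implementing Function1 from Java:

import cascading.flow.Flow;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;

public class FlowFutureUsage
  {
  // FlowFuture.start(...) is a hypothetical reference to the helper shown above.
  public static <Config> Future<String> startAndGetName( Flow<Config> flow )
    {
    return FlowFuture.start( flow, new AbstractFunction1<Flow<Config>, String>()
      {
      @Override
      public String apply( Flow<Config> finished )
        {
        // runs once the flow has completed successfully
        return finished.getName();
        }
      } );
    }
  }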

Code example source: stackoverflow.com (note: this snippet uses a different Flow class from a JavaFX UI framework, whose start() returns a node to place in the scene graph; it is not cascading.flow.Flow)

public class EventSystemDemo extends Application {

  @Override
  public void start(Stage primaryStage) throws Exception {

  HBox box = new HBox();
  box.setSpacing(12);
  box.setPadding(new Insets(12));
  box.setFillHeight(true);
  box.setAlignment(Pos.CENTER);

  Flow senderFlow = new Flow(ProducerController.class);
  box.getChildren().add(senderFlow.start());

  Flow receiverFlow = new Flow(ReceiverController.class);
  box.getChildren().add(receiverFlow.start());

  primaryStage.setScene(new Scene(box));
  primaryStage.show();

  }

  public static void main(String... args) {
    launch(args);
  }
}

Code example source: cascading/cascading-hadoop2-common (several other indexed snippets, e.g. from cwensel/cascading, are truncated to this same single call)

flow.start();

Code example source: cwensel/cascading (also in cascading/cascading-platform)

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStart() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "startException";
 Flow process = flowWithException( path, FailingRiffle.Failing.START );
 process.addListener( listener );
 try
  {
  process.start();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: cwensel/cascading (also in cascading/cascading-platform)

@Test
public void testProcessFlowFlowListenerExceptionHandlingInStop() throws IOException, InterruptedException
 {
 ThrowableListener listener = new ThrowableListener();
 getPlatform().copyFromLocal( inputFileIps );
 String path = "stopException";
 Flow process = flowWithException( path, FailingRiffle.Failing.STOP );
 process.addListener( listener );
 process.start();
 try
  {
  process.stop();
  fail( "there should have been an exception" );
  }
 catch( CascadingException exception )
  {
  assertNotNull( listener.getThrowable() );
  }
 }

Code example source: alexholmes/hadoop-book

parsedLogFlow.start();

Code example source: cwensel/cascading (also in cascading/cascading-hadoop2-common)

@Test
public void testStartWithoutComplete() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 // using null pos so all fields are written
 Tap sink = new Hfs( new TextLine(), getOutputPath( "withoutcomplete" ), SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
 LockingFlowListener listener = new LockingFlowListener();
 flow.addListener( listener );
 flow.start();
 assertTrue( listener.completed.tryAcquire( 90, TimeUnit.SECONDS ) );
 }

Code example source: cascading/cascading-platform (also in cwensel/cascading)

void runTestCount( String name, Fields argumentSelector, Fields fieldDeclaration, Fields outputSelector ) throws Exception
 {
 getPlatform().copyFromLocal( inputFileIps );
 Tap source = getPlatform().getTextFile( Fields.size( 2 ), inputFileIps );
 Tap sink = getPlatform().getTextFile( Fields.size( 1 ), getOutputPath( name ), SinkMode.REPLACE );
 Pipe pipe = new Pipe( "count" );
 pipe = new GroupBy( pipe, new Fields( 1 ) );
 pipe = new Every( pipe, argumentSelector, new Count( fieldDeclaration ), outputSelector );
 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe );
 flow.start(); // simple test for start
 flow.complete();
 validateLength( flow, 17 );
 assertTrue( getSinkAsList( flow ).contains( new Tuple( "63.123.238.8\t2" ) ) );
 }

Code example source: cwensel/cascading (also in cascading/cascading-hadoop2-common)

@Test
public void testStartStopRace() throws Exception
 {
 getPlatform().copyFromLocal( inputFileLower );
 Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
 Map sources = new HashMap();
 sources.put( "lower", sourceLower );
 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );
 // using null pos so all fields are written
 Tap sink = new Hfs( new TextLine(), getOutputPath( "startstop" ), SinkMode.REPLACE );
 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
 Flow flow = getPlatform().getFlowConnector( getProperties() ).connect( sources, sink, pipeLower );
 flow.start();
 flow.stop(); // should not fail
 }
