com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker类的使用及代码示例

x33g5p2x  于2022-02-03 转载在 其他  
字(8.6k)|赞(0)|评价(0)|浏览(144)

本文整理了Java中com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker类的一些代码示例,展示了Worker类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Worker类的具体详情如下:
包路径:com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
类名称:Worker

Worker介绍

[英]Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from the shards).
[中]Worker是Kinesis应用程序用来开始处理数据的高层类。它初始化并监督不同的组件(例如同步分片和租约信息、跟踪分片分配以及处理分片中的数据)。

代码示例

代码示例来源:origin: harishreedharan/usingflumecode

@Override
 public void run() {
  // Keep creating and running fresh workers until this thread is interrupted.
  for (;;) {
   if (Thread.currentThread().isInterrupted()) {
    break;
   }
   worker = new Worker(processorFactory, clientConfig);
   // Blocks here; control comes back once the worker has been shut down.
   worker.run();
  }
 }
});

代码示例来源:origin: io.zipkin.aws/zipkin-collector-kinesis

/**
 * Closes this component by delegating to {@code worker.shutdown()}.
 */
@Override
public void close() {
 // The executor is a single thread that is tied to this worker. Once the worker shuts down
 // the executor will stop.
 worker.shutdown();
}

代码示例来源:origin: com.amazonaws/amazon-kinesis-client

/**
 * Entry point of the worker: initializes, then repeatedly runs the process loop until a
 * shutdown is requested, and finally performs the final shutdown step.
 *
 * <p>Does nothing if the {@code shutdown} flag is already set when invoked. If initialization
 * throws a {@link RuntimeException}, the error is logged and {@code shutdown()} is called; the
 * method then still proceeds to the loop check and the final shutdown.
 */
public void run() {
  if (!shutdown) {
    try {
      initialize();
      LOG.info("Initialization complete. Starting worker loop.");
    } catch (RuntimeException initFailure) {
      LOG.error("Unable to initialize after " + MAX_INITIALIZATION_ATTEMPTS + " attempts. Shutting down.",
          initFailure);
      shutdown();
    }

    // Keep processing until shouldShutdown() reports true.
    while (true) {
      if (shouldShutdown()) {
        break;
      }
      runProcessLoop();
    }

    finalShutdown();
    LOG.info("Worker loop is complete. Exiting from worker.");
  }
}

代码示例来源:origin: aws-samples/aws-dynamodb-examples

worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
System.out.println("Starting worker...");
// The worker is handed to the thread as its Runnable; the thread must be started,
// otherwise worker.run() never executes and join() below returns immediately.
Thread t = new Thread(worker);
t.start();

// Give the worker a window to process records before asking it to stop.
// The duration is a demo value; adjust to the workload.
Thread.sleep(25000);
worker.shutdown();
// Wait for the worker thread to finish its shutdown.
t.join();

代码示例来源:origin: com.amazonaws/amazon-kinesis-client

/**
 * Runs the worker and reports the outcome as an exit code.
 *
 * @return {@code 0} if {@code worker.run()} returned normally, {@code 1} if it threw
 */
@Override
public Integer call() throws Exception {
  try {
    worker.run();
    return 0;
  } catch (Throwable failure) {
    // Any throwable is logged and converted into a non-zero exit code.
    LOG.error("Caught throwable while processing data.", failure);
    return 1;
  }
}

代码示例来源:origin: awslabs/amazon-kinesis-connectors

new Worker(getKinesisConnectorRecordProcessorFactory(),
          kinesisClientLibConfiguration,
          metricFactory);
} else {
  worker = new Worker(getKinesisConnectorRecordProcessorFactory(), kinesisClientLibConfiguration);

代码示例来源:origin: com.amazonaws/amazon-kinesis-client

boolean foundCompletedShard = false;
Set<ShardInfo> assignedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
  ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
  if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) {
    foundCompletedShard = true;
cleanupShardConsumers(assignedShards);

代码示例来源:origin: com.amazonaws/amazon-kinesis-client

/**
 * Returns the consumer registered for the given shard, building and registering a new one
 * when none exists or when the existing one was shut down with reason {@code ZOMBIE}.
 *
 * NOTE: This method is internal/private to the Worker class. It has package access solely for testing.
 *
 * @param shardInfo
 *            Kinesis shard info
 * @param processorFactory
 *            RecordProcessor factory
 * @return ShardConsumer for the shard
 */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
  final ShardConsumer existing = shardInfoShardConsumerMap.get(shardInfo);

  // Reuse the current consumer unless it is missing or is a leftover from an earlier
  // lease instance (shut down as ZOMBIE). A consumer whose shard was completely
  // processed (shutdown reason terminate) is reused as-is.
  if (existing != null
      && !(existing.isShutdown() && existing.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
    return existing;
  }

  final ShardConsumer replacement = buildConsumer(shardInfo, processorFactory);
  shardInfoShardConsumerMap.put(shardInfo, replacement);
  wlog.infoForce("Created new shardConsumer for : " + shardInfo);
  return replacement;
}

代码示例来源:origin: awslabs/amazon-kinesis-connectors

/**
 * Runs the configured worker on the calling thread.
 *
 * <p>Throws a {@link RuntimeException} if {@code worker} is {@code null}. Any throwable raised
 * by the worker is logged and rethrown; a "not running" message is logged whenever
 * {@code worker.run()} exits, normally or not.
 */
@Override
public void run() {
  if (worker == null) {
    throw new RuntimeException("Initialize must be called before run.");
  }

  // Start Amazon Kinesis Client Library worker to process records
  final String ownerName = getClass().getSimpleName();
  LOG.info("Starting worker in " + ownerName);
  try {
    worker.run();
  } catch (Throwable failure) {
    LOG.error(failure);
    throw failure;
  } finally {
    LOG.error("Worker " + getClass().getSimpleName() + " is not running.");
  }
}

代码示例来源:origin: com.amazonaws/amazon-kinesis-connectors

new Worker(getKinesisConnectorRecordProcessorFactory(),
          kinesisClientLibConfiguration,
          metricFactory);
} else {
  worker = new Worker(getKinesisConnectorRecordProcessorFactory(), kinesisClientLibConfiguration);

代码示例来源:origin: awslabs/amazon-kinesis-aggregators

/**
 * Configures the application, creates the worker and runs it, restarting after failures up to
 * the configured tolerance.
 *
 * <p>{@code failuresToTolerate == -1} means failures are tolerated without limit. The loop
 * ends as soon as {@code worker.run()} returns normally, or once the failure budget is
 * exhausted (in which case {@code shutdown()} is invoked).
 *
 * @return {@code 0} if the worker never failed, {@code 1} if at least one failure was caught
 * @throws Exception if configuration fails
 */
public int run() throws Exception {
  configure();
  System.out.println(String.format("Starting %s", appName));
  LOG.info(String.format("Running %s to process stream %s", appName,
      streamName));
  IRecordProcessorFactory recordProcessorFactory = new AggregatorProcessorFactory(
      aggGroup);
  worker = new Worker(recordProcessorFactory, this.config);
  int exitCode = 0;
  int failures = 0;
  // run the worker, tolerating as many failures as is configured
  while (failuresToTolerate == -1 || failures < failuresToTolerate) {
    try {
      worker.run();
      // A normal return means the worker has stopped; looping again would only
      // re-invoke run() on a finished worker forever.
      break;
    } catch (Throwable t) {
      LOG.error("Caught throwable while processing data.", t);
      failures++;
      exitCode = 1;
      // Use the same predicate as the loop so the unlimited (-1) mode restarts
      // instead of shutting down and then spinning.
      if (failuresToTolerate == -1 || failures < failuresToTolerate) {
        LOG.error("Restarting...");
      } else {
        shutdown();
      }
    }
  }
  return exitCode;
}

代码示例来源:origin: io.macgyver.rx-aws/rx-aws

/** Stops processing by delegating to {@code worker.shutdown()}. */
public void stop() {
  worker.shutdown();
}

代码示例来源:origin: com.amazonaws/amazon-kinesis-connectors

/**
 * Executes the worker on the current thread.
 *
 * <p>A {@link RuntimeException} is thrown when no worker has been set. Throwables coming out of
 * the worker are logged and propagated, and a "not running" message is always logged once
 * {@code worker.run()} has exited.
 */
@Override
public void run() {
  if (worker == null) {
    throw new RuntimeException("Initialize must be called before run.");
  }

  // Start Amazon Kinesis Client Library worker to process records
  LOG.info("Starting worker in " + getClass().getSimpleName());
  try {
    worker.run();
  } catch (Throwable problem) {
    LOG.error(problem);
    throw problem;
  } finally {
    final String stoppedMessage = "Worker " + getClass().getSimpleName() + " is not running.";
    LOG.error(stoppedMessage);
  }
}

代码示例来源:origin: awslabs/dynamodb-cross-region-library

return new Worker(factory, kclConfig, streamsAdapterClient, kclDynamoDBClient, kclCloudWatchClient);

代码示例来源:origin: aws/amazon-kinesis-video-streams-parser-library

/**
 * Builds a KCL configuration and worker for {@code kdsStreamName}, runs the worker on the
 * calling thread and prints the resulting exit code.
 *
 * <p>Failures of the worker itself are printed and reflected as exit code 1; exceptions raised
 * while setting up are printed via their stack trace.
 */
@Override
public void run() {
  try {
    final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
    final String workerId = hostName + ":" + UUID.randomUUID();

    final KinesisClientLibConfiguration kclConfig =
        new KinesisClientLibConfiguration(APPLICATION_NAME, kdsStreamName, credentialsProvider, workerId);
    kclConfig
        .withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM)
        .withRegionName(region.getName());

    final IRecordProcessorFactory processorFactory =
        () -> new KinesisRecordProcessor(rekognizedFragmentsIndex, credentialsProvider);
    final Worker kclWorker = new Worker(processorFactory, kclConfig);

    System.out.printf("Running %s to process stream %s as worker %s...", APPLICATION_NAME, kdsStreamName, workerId);

    int exitCode;
    try {
      kclWorker.run();
      exitCode = 0;
    } catch (Throwable failure) {
      System.err.println("Caught throwable while processing data.");
      failure.printStackTrace();
      exitCode = 1;
    }
    System.out.println("Exit code : " + exitCode);
  } catch (Exception setupFailure) {
    setupFailure.printStackTrace();
  }
}

代码示例来源:origin: scopely/kinesis-vcr

/** Requests that the worker stop, by calling {@code worker.shutdown()}. */
public void stop() {
  worker.shutdown();
}

代码示例来源:origin: awslabs/dynamodb-cross-region-library

/**
 * Command line entry point. Obtains a worker from {@code mainUnsafe} and, if one is present,
 * runs it on the calling thread. Any exception is logged, echoed to the console and turned
 * into a process exit with {@code StatusCodes.EINVAL}.
 *
 * @param args
 *            command line arguments
 */
public static void main(String[] args) {
  try {
    final Optional<Worker> maybeWorker = mainUnsafe(args);
    if (maybeWorker.isPresent()) {
      System.out.println("Starting replication now, check logs for more details.");
      maybeWorker.get().run();
    }
  } catch (ParameterException badArgs) {
    // Invalid command line parameters.
    log.error(badArgs);
    JCommander.getConsole().println(badArgs.toString());
    System.exit(StatusCodes.EINVAL);
  } catch (Exception unexpected) {
    // Anything else is logged at fatal level.
    log.fatal(unexpected);
    JCommander.getConsole().println(unexpected.toString());
    System.exit(StatusCodes.EINVAL);
  }
}

代码示例来源:origin: com.amazonaws/amazon-kinesis-client

return new Worker(config.getApplicationName(),
    recordProcessorFactory,
    config,

代码示例来源:origin: harishreedharan/usingflumecode

/**
 * Stops this component: first requests worker shutdown, then calls
 * {@code shutdownNow()} on the executor.
 */
@Override
protected void doStop() throws FlumeException {
 // Ask the worker to shut down before tearing down the executor.
 worker.shutdown();
 executor.shutdownNow();
}

代码示例来源:origin: com.sonymobile/lumbermill-aws-kcl

/**
 * Logs the JVM's maximum memory, creates the Kinesis, DynamoDB and CloudWatch clients from
 * {@code kinesisCfg}, builds a {@link Worker} with them and runs it on the calling thread.
 */
public void start()  {
  final long bytesPerMb = 1024L * 1024L;
  final long maxMemoryMb = Runtime.getRuntime().maxMemory() / bytesPerMb;
  LOG.info("Max memory:           {} mb", maxMemoryMb);
  LOG.info("Starting up Kinesis Consumer... (may take a few seconds)");

  // One client per AWS service, each with its own credentials provider and client configuration.
  final AmazonKinesisClient kinesis = new AmazonKinesisClient(
      kinesisCfg.getKinesisCredentialsProvider(),
      kinesisCfg.getKinesisClientConfiguration());
  final AmazonDynamoDBClient dynamoDb = new AmazonDynamoDBClient(
      kinesisCfg.getDynamoDBCredentialsProvider(),
      kinesisCfg.getDynamoDBClientConfiguration());
  final AmazonCloudWatch cloudWatch = new AmazonCloudWatchClient(
      kinesisCfg.getCloudWatchCredentialsProvider(),
      kinesisCfg.getCloudWatchClientConfiguration());

  new Worker.Builder()
      .recordProcessorFactory(() -> new RecordProcessor(unitOfWorkListener, exceptionStrategy, metricsCallback, dry))
      .config(kinesisCfg)
      .kinesisClient(kinesis)
      .dynamoDBClient(dynamoDb)
      .cloudWatchClient(cloudWatch)
      .build()
      .run();
}

相关文章

微信公众号

最新文章

更多