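This article collects code examples for the Java method com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run() and shows how Worker.run() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, and are meant as a practical reference. Details of the Worker.run() method are as follows: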
Package path: com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
Class name: Worker
Method name: run
Description: Start consuming data from the stream, and pass it to the application record processors.
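Several of the excerpts below note that run() does not return until the worker is shut down, so callers usually give it a thread of its own. The following is a minimal sketch of that pattern, not code from any of the listed projects. FakeWorker is a hypothetical stand-in that only blocks until told to stop, so the example runs without the Kinesis Client Library on the classpath; the class and method names are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockingWorkerDemo {

    /** Hypothetical stand-in for a worker: run() blocks until shutdown() is called. */
    static final class FakeWorker implements Runnable {
        private final CountDownLatch stopped = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                // A real worker would read from the stream and dispatch records here.
                stopped.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }

        void shutdown() {
            stopped.countDown();
        }
    }

    /** Runs the worker on its own thread, stops it, and reports whether the thread ended in time. */
    static boolean runAndStop(FakeWorker worker, long timeoutMillis) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(worker); // run() blocks the executor thread, not the caller
        executor.shutdown();      // accept no further tasks; the running one continues
        worker.shutdown();        // ask the worker to leave run()
        return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped cleanly: " + runAndStop(new FakeWorker(), 1000));
    }
}
```

With a real Worker the same shape should apply, provided the worker is stopped through whatever shutdown mechanism the library version in use offers.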
Code example source: origin: com.amazonaws/amazon-kinesis-client
@Override
public Integer call() throws Exception {
    int exitCode = 0;
    try {
        // Blocks until the worker shuts down.
        worker.run();
    } catch (Throwable t) {
        // Any failure is logged and reported as a non-zero exit code.
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    return exitCode;
}
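The excerpt above turns a blocking run() call into a Callable that yields an exit code. As a rough sketch of how such a Callable might be consumed, the example below submits it to an ExecutorService and waits on the Future. The names ExitCodeDemo, asExitCode, and runToExitCode are hypothetical, and a plain Runnable stands in for the worker so the code is self-contained.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExitCodeDemo {

    /** Wraps a blocking task so that any Throwable becomes exit code 1. */
    static Callable<Integer> asExitCode(Runnable worker) {
        return () -> {
            try {
                worker.run();
                return 0;
            } catch (Throwable t) {
                System.err.println("Caught throwable while processing data: " + t);
                return 1;
            }
        };
    }

    /** Submits the wrapped task and waits for its exit code. */
    static int runToExitCode(Runnable worker) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> result = executor.submit(asExitCode(worker));
            return result.get(); // blocks until worker.run() has returned or thrown
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // A worker that returns normally, then one that fails.
        System.out.println(runToExitCode(() -> { }));
        System.out.println(runToExitCode(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

Returning the code through a Future keeps the decision about calling System.exit in the caller rather than in the worker thread.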
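Code example source: origin: harishreedharan/usingflumecode
@Override
public void run() {
    // Keep restarting until this thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        // A new Worker is created on every pass before run() is called.
        worker = new Worker(processorFactory, clientConfig);
        worker.run(); // Returns when worker is shutdown
    }
}
}); // closes an enclosing anonymous class and method call whose opening is not part of this excerpt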
Code example source: origin: awslabs/amazon-kinesis-connectors
@Override
public void run() {
    if (worker != null) {
        // Start Amazon Kinesis Client Library worker to process records
        LOG.info("Starting worker in " + getClass().getSimpleName());
        try {
            worker.run();
        } catch (Throwable t) {
            // Log, then rethrow so the caller also sees the failure.
            LOG.error(t);
            throw t;
        } finally {
            // Reached on both normal shutdown and failure.
            LOG.error("Worker " + getClass().getSimpleName() + " is not running.");
        }
    } else {
        throw new RuntimeException("Initialize must be called before run.");
    }
}
Code example source: origin: com.amazonaws/amazon-kinesis-connectors (same code as the awslabs/amazon-kinesis-connectors example above)
Code example source: origin: awslabs/amazon-kinesis-aggregators
public int run() throws Exception {
    configure();
    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName,
            streamName));
    IRecordProcessorFactory recordProcessorFactory = new AggregatorProcessorFactory(
            aggGroup);
    worker = new Worker(recordProcessorFactory, this.config);
    int exitCode = 0;
    int failures = 0;
    // run the worker, tolerating as many failures as is configured
    // (-1 means the loop condition never becomes false).
    // Note: as written, a normal return from worker.run() re-enters the loop,
    // and with failuresToTolerate == -1 the else branch below runs on every failure.
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            failures++;
            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            } else {
                shutdown();
            }
            exitCode = 1;
        }
    }
    return exitCode;
}
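The retry idea in the excerpt above can be sketched in isolation. The version below is an illustrative variant, not the aggregators project's code: it creates a fresh worker per attempt through a hypothetical Supplier, stops as soon as run() returns normally, and treats a budget of -1 as "retry forever". RestartLoopDemo, runWithRetries, and maxFailures are names assumed for this example, and it catches only RuntimeException where the excerpt catches Throwable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RestartLoopDemo {

    /**
     * Runs workers from the factory until one returns normally (exit code 0)
     * or maxFailures failures have occurred (exit code 1).
     * A maxFailures of -1 means there is no limit.
     */
    static int runWithRetries(Supplier<Runnable> workerFactory, int maxFailures) {
        int failures = 0;
        while (maxFailures == -1 || failures < maxFailures) {
            try {
                workerFactory.get().run(); // fresh worker for every attempt
                return 0;                  // normal return: the worker shut down
            } catch (RuntimeException e) {
                failures++;
                System.err.println("Attempt " + failures + " failed: " + e);
            }
        }
        return 1; // failure budget exhausted
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical worker that fails on its first two attempts.
        Supplier<Runnable> flaky = () -> () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
        };
        System.out.println("exit code: " + runWithRetries(flaky, 3));
    }
}
```

Returning from inside the loop on success is the main design difference from the excerpt: it prevents a cleanly stopped worker from being started again.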
Code example source: origin: awslabs/dynamodb-cross-region-library
/**
 * Command line main method entry point
 *
 * @param args
 *            command line arguments
 */
public static void main(String[] args) {
    try {
        final Optional<Worker> workerOption = mainUnsafe(args);
        if (!workerOption.isPresent()) {
            // No worker was created, so there is nothing to run.
            return;
        }
        System.out.println("Starting replication now, check logs for more details.");
        workerOption.get().run();
    } catch (ParameterException e) {
        log.error(e);
        JCommander.getConsole().println(e.toString());
        System.exit(StatusCodes.EINVAL);
    } catch (Exception e) {
        log.fatal(e);
        JCommander.getConsole().println(e.toString());
        System.exit(StatusCodes.EINVAL);
    }
}
Code example source: origin: aws/amazon-kinesis-video-streams-parser-library
@Override
public void run() {
    try {
        // Worker id: host name plus a random UUID.
        String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, kdsStreamName, credentialsProvider, workerId);
        kinesisClientLibConfiguration
                .withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM)
                .withRegionName(region.getName());
        final IRecordProcessorFactory recordProcessorFactory =
                () -> new KinesisRecordProcessor(rekognizedFragmentsIndex, credentialsProvider);
        final Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
        System.out.printf("Running %s to process stream %s as worker %s...", APPLICATION_NAME, kdsStreamName, workerId);
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            exitCode = 1;
        }
        System.out.println("Exit code : " + exitCode);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Code example source: origin: com.sonymobile/lumbermill-aws-kcl
public void start() {
    int mb = 1024 * 1024;
    LOG.info("Max memory: {} mb", Runtime.getRuntime().maxMemory() / mb);
    LOG.info("Starting up Kinesis Consumer... (may take a few seconds)");
    // Build the three AWS clients from the application's own configuration object.
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(kinesisCfg.getKinesisCredentialsProvider(),
            kinesisCfg.getKinesisClientConfiguration());
    AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(kinesisCfg.getDynamoDBCredentialsProvider(),
            kinesisCfg.getDynamoDBClientConfiguration());
    AmazonCloudWatch cloudWatchClient = new AmazonCloudWatchClient(kinesisCfg.getCloudWatchCredentialsProvider(),
            kinesisCfg.getCloudWatchClientConfiguration());
    // Assemble the worker with Worker.Builder instead of the constructor.
    Worker worker = new Worker.Builder()
            .recordProcessorFactory(() -> new RecordProcessor(unitOfWorkListener, exceptionStrategy, metricsCallback, dry))
            .config(kinesisCfg)
            .kinesisClient(kinesisClient)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .build();
    worker.run();
}