本文整理了Java中com.amazonaws.services.sqs.model.ReceiveMessageRequest.withMaxNumberOfMessages
方法的一些代码示例,展示了ReceiveMessageRequest.withMaxNumberOfMessages
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ReceiveMessageRequest.withMaxNumberOfMessages
方法的具体详情如下:
包路径:com.amazonaws.services.sqs.model.ReceiveMessageRequest
类名称:ReceiveMessageRequest
方法名:withMaxNumberOfMessages
[英]The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.
[中]要返回的最大消息数。Amazon SQS不会返回超过此值的消息(但是,返回的消息可能会更少)。有效值:1到10。默认值:1。
代码示例来源:origin: Netflix/conductor
@VisibleForTesting
List<Message> receiveMessages() {
try {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(queueURL)
.withVisibilityTimeout(visibilityTimeoutInSeconds)
.withMaxNumberOfMessages(batchSize);
ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages().stream()
.map(msg -> new Message(msg.getMessageId(), msg.getBody(), msg.getReceiptHandle()))
.collect(Collectors.toList());
Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, this.queueName, messages.size());
return messages;
} catch (Exception e) {
logger.error("Exception while getting messages from SQS", e);
Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
}
return new ArrayList<>();
}
代码示例来源:origin: aws/aws-sdk-java
/**
* Attempts to retrieve messages from SQS and upon completion (successful or unsuccessful)
* reports the batch as complete and open
*/
public void run() {
try {
visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl).withMaxNumberOfMessages(config
.getMaxBatchSize());
ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
if (config.getVisibilityTimeoutSeconds() > 0) {
request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
visibilityDeadlineNano = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(), TimeUnit.SECONDS);
}
if (config.isLongPoll()) {
request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
}
messages = sqsClient.receiveMessage(request).getMessages();
} catch (AmazonClientException e) {
exception = e;
} finally {
// whatever happened, we are done and can be considered open
open = true;
parentBuffer.reportBatchFinished(this);
}
}
}
代码示例来源:origin: aws-amplify/aws-sdk-android
/**
* Attempts to retrieve messages from SQS and upon completion
* (successful or unsuccessful) reports the batch as complete and open
*/
@Override
public void run() {
try {
visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl)
.withMaxNumberOfMessages(config.getMaxBatchSize());
ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
if (config.getVisibilityTimeoutSeconds() > 0) {
request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
visibilityDeadlineNano = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(),
TimeUnit.SECONDS);
}
if (config.isLongPoll()) {
request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
}
messages = sqsClient.receiveMessage(request).getMessages();
} catch (AmazonClientException e) {
exception = e;
} finally {
// whatever happened, we are done and can be considered open
open = true;
parentBuffer.reportBatchFinished(this);
}
}
}
代码示例来源:origin: com.github.davidmoten/rxjava2-aws
private static ReceiveMessageRequest request(String queueUrl, int waitTimeSeconds) {
return new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(20).withWaitTimeSeconds(waitTimeSeconds);
}
代码示例来源:origin: gofore/aws-training
@Get("json")
public JsonNode logs() {
ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
.withWaitTimeSeconds(20)
.withMaxNumberOfMessages(10);
return sqsClient.receiveMessage(request)
.thenApply(ReceiveMessageResult::getMessages)
.whenComplete(Consumers.onSuccess(msgs -> msgs.forEach(deleteMessage())))
.thenApply(msgs -> msgs.stream().map(readNode()))
.thenApply(toArray())
.join();
}
代码示例来源:origin: com.github.davidmoten/rxjava2-aws
@Override
public State call() {
queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
request = new ReceiveMessageRequest(queueUrl) //
.withWaitTimeSeconds(20) //
.withMaxNumberOfMessages(10);
return new State(new LinkedList<>());
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-aws-messaging
public ReceiveMessageRequest getReceiveMessageRequest() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(this.destinationUrl).
withAttributeNames(RECEIVING_ATTRIBUTES).
withMessageAttributeNames(RECEIVING_MESSAGE_ATTRIBUTES);
if (this.maxNumberOfMessages != null) {
receiveMessageRequest.withMaxNumberOfMessages(this.maxNumberOfMessages);
} else {
receiveMessageRequest.withMaxNumberOfMessages(DEFAULT_MAX_NUMBER_OF_MESSAGES);
}
if (this.visibilityTimeout != null) {
receiveMessageRequest.withVisibilityTimeout(this.visibilityTimeout);
}
if (this.waitTimeOut != null) {
receiveMessageRequest.setWaitTimeSeconds(this.waitTimeOut);
}
return receiveMessageRequest;
}
代码示例来源:origin: spring-cloud/spring-cloud-aws
public ReceiveMessageRequest getReceiveMessageRequest() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(this.destinationUrl).
withAttributeNames(RECEIVING_ATTRIBUTES).
withMessageAttributeNames(RECEIVING_MESSAGE_ATTRIBUTES);
if (this.maxNumberOfMessages != null) {
receiveMessageRequest.withMaxNumberOfMessages(this.maxNumberOfMessages);
} else {
receiveMessageRequest.withMaxNumberOfMessages(DEFAULT_MAX_NUMBER_OF_MESSAGES);
}
if (this.visibilityTimeout != null) {
receiveMessageRequest.withVisibilityTimeout(this.visibilityTimeout);
}
if (this.waitTimeOut != null) {
receiveMessageRequest.setWaitTimeSeconds(this.waitTimeOut);
}
return receiveMessageRequest;
}
代码示例来源:origin: org.symphonyoss.s2.fugue/aws-fugue
/* package */ SqsSubscriber(SqsSubscriberManager manager, AmazonSQS sqsClient, String queueUrl,
String subscriptionName, ITraceContextTransactionFactory traceFactory,
IThreadSafeRetryableConsumer<String> consumer, ICounter counter, IBusyCounter busyCounter, String tenantId)
{
super(manager, subscriptionName, counter, busyCounter, EXTENSION_FREQUENCY_MILLIS);
if(Fugue.isDebugSingleThread())
{
messageBatchSize_ = 1;
}
manager_ = manager;
sqsClient_ = sqsClient;
queueUrl_ = queueUrl;
traceFactory_ = traceFactory;
consumer_ = consumer;
nonIdleSubscriber_ = new NonIdleSubscriber();
tenantId_ = tenantId;
blockingPullRequest_ = new ReceiveMessageRequest(queueUrl_)
.withMaxNumberOfMessages(messageBatchSize_ )
.withWaitTimeSeconds(20);
nonBlockingPullRequest_ = new ReceiveMessageRequest(queueUrl_)
.withMaxNumberOfMessages(messageBatchSize_ );
}
代码示例来源:origin: bazaarvoice/emodb
@Override
public List<ScanRangeTask> claimScanRangeTasks(int max, Duration ttl) {
if (max == 0) {
return ImmutableList.of();
}
List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest()
.withQueueUrl(getQueueUrl(_pendingScanRangeQueue))
.withMaxNumberOfMessages(Math.min(max, 10)) // SQS cannot claim more than 10 messages
.withVisibilityTimeout(toSeconds(ttl))
).getMessages();
return FluentIterable.from(messages)
.transform(new Function<Message, ScanRangeTask>() {
@Override
public ScanRangeTask apply(Message message) {
QueueScanRangeTask task = JsonHelper.fromJson(message.getBody(), QueueScanRangeTask.class);
task.setMessageId(message.getReceiptHandle());
return task;
}
})
.toList();
}
代码示例来源:origin: com.netflix.conductor/conductor-contribs
@VisibleForTesting
List<Message> receiveMessages() {
try {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(queueURL)
.withVisibilityTimeout(visibilityTimeoutInSeconds)
.withMaxNumberOfMessages(batchSize);
ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages().stream()
.map(msg -> new Message(msg.getMessageId(), msg.getBody(), msg.getReceiptHandle()))
.collect(Collectors.toList());
Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, this.queueName, messages.size());
return messages;
} catch (Exception e) {
logger.error("Exception while getting messages from SQS", e);
Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
}
return new ArrayList<>();
}
代码示例来源:origin: com.netflix.spinnaker.echo/echo-pubsub-aws
private void listenForMessages() {
while (isEnabled.get()) {
ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(
new ReceiveMessageRequest(queueId)
.withMaxNumberOfMessages(AWS_MAX_NUMBER_OF_MESSAGES)
.withVisibilityTimeout(subscription.getVisibilityTimeout())
.withWaitTimeSeconds(subscription.getWaitTimeSeconds())
.withMessageAttributeNames("All")
);
if (receiveMessageResult.getMessages().isEmpty()) {
log.debug("Received no messages for queue: {}", queueARN);
continue;
}
receiveMessageResult.getMessages().forEach(this::handleMessage);
}
}
代码示例来源:origin: com.netflix.suro/suro-server
@Override
public String recv() {
ReceiveMessageRequest request = new ReceiveMessageRequest()
.withQueueUrl(queueUrls.get(0))
.withMaxNumberOfMessages(1);
try {
ReceiveMessageResult result = sqsClient.receiveMessage(request);
if (!result.getMessages().isEmpty()) {
Message msg = result.getMessages().get(0);
recvMessageCount.incrementAndGet();
DeleteMessageRequest deleteReq = new DeleteMessageRequest()
.withQueueUrl(queueUrls.get(0))
.withReceiptHandle(msg.getReceiptHandle());
sqsClient.deleteMessage(deleteReq);
if (enableBase64Encoding) {
return new String(
Base64.decodeBase64(msg.getBody().getBytes()),
Charsets.UTF_8);
} else {
return msg.getBody();
}
} else {
return "";
}
} catch (Exception e) {
log.error("Exception while recving SQS notice: " + e.getMessage(), e);
return "";
}
}
代码示例来源:origin: bazaarvoice/emodb
@Override
public List<ScanRangeComplete> claimCompleteScanRanges(Duration ttl) {
List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest()
.withQueueUrl(getQueueUrl(_completeScanRangeQueue))
.withMaxNumberOfMessages(10)
.withVisibilityTimeout(toSeconds(ttl))
).getMessages();
return FluentIterable.from(messages)
.transform(new Function<Message, ScanRangeComplete>() {
@Override
public ScanRangeComplete apply(Message message) {
QueueScanRangeComplete completion = JsonHelper.fromJson(message.getBody(), QueueScanRangeComplete.class);
completion.setMessageId(message.getReceiptHandle());
return completion;
}
})
.toList();
}
代码示例来源:origin: org.springframework.cloud/spring-cloud-aws-messaging
@Override
public Message<String> receive(long timeout) {
ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
new ReceiveMessageRequest(this.queueUrl).
withMaxNumberOfMessages(1).
withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
withAttributeNames(ATTRIBUTE_NAMES).
withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
if (receiveMessageResult.getMessages().isEmpty()) {
return null;
}
com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
Message<String> message = createMessage(amazonMessage);
this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
return message;
}
代码示例来源:origin: spring-cloud/spring-cloud-aws
@Override
public Message<String> receive(long timeout) {
ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
new ReceiveMessageRequest(this.queueUrl).
withMaxNumberOfMessages(1).
withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
withAttributeNames(ATTRIBUTE_NAMES).
withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
if (receiveMessageResult.getMessages().isEmpty()) {
return null;
}
com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
Message<String> message = createMessage(amazonMessage);
this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
return message;
}
代码示例来源:origin: classmethod/gradle-aws-plugin
while (counter < maxNumberOfMessages) {
ReceiveMessageRequest request = new ReceiveMessageRequest().withQueueUrl(queueUrl)
.withMaxNumberOfMessages(MAX_MESSAGE_CONSUME_BATCH_SIZE).withVisibilityTimeout(30);
List<DeleteMessageBatchRequestEntry> messagesToDelete =
sqs.receiveMessage(request).getMessages().stream().map(message -> {
代码示例来源:origin: com.amazonaws/aws-java-sdk-sqs
/**
* Attempts to retrieve messages from SQS and upon completion (successful or unsuccessful)
* reports the batch as complete and open
*/
public void run() {
try {
visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl).withMaxNumberOfMessages(config
.getMaxBatchSize());
ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
if (config.getVisibilityTimeoutSeconds() > 0) {
request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
visibilityDeadlineNano = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(), TimeUnit.SECONDS);
}
if (config.isLongPoll()) {
request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
}
messages = sqsClient.receiveMessage(request).getMessages();
} catch (AmazonClientException e) {
exception = e;
} finally {
// whatever happened, we are done and can be considered open
open = true;
parentBuffer.reportBatchFinished(this);
}
}
}
代码示例来源:origin: com.amazonaws/amazon-sqs-java-messaging-lib
.withMaxNumberOfMessages(prefetchBatchSize)
.withAttributeNames(ALL)
.withMessageAttributeNames(ALL)
代码示例来源:origin: spring-projects/spring-integration-aws
@Bean
public AmazonSQSAsync amazonSqs() {
AmazonSQSAsync sqs = mock(AmazonSQSAsync.class);
given(sqs.getQueueUrl(new GetQueueUrlRequest("testQueue")))
.willReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
given(sqs.receiveMessage(new ReceiveMessageRequest("http://testQueue.amazonaws.com")
.withAttributeNames("All")
.withMessageAttributeNames("All")
.withMaxNumberOfMessages(10)
.withWaitTimeSeconds(20)))
.willReturn(new ReceiveMessageResult()
.withMessages(new Message().withBody("messageContent"),
new Message().withBody("messageContent2")))
.willReturn(new ReceiveMessageResult());
given(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
.willReturn(new GetQueueAttributesResult());
return sqs;
}
内容来源于网络,如有侵权,请联系作者删除!