Usage and code examples of the com.amazonaws.services.sqs.model.ReceiveMessageRequest.withMaxNumberOfMessages() method

x33g5p2x, reposted on 2022-01-29 under Other

This article collects code examples of the Java method com.amazonaws.services.sqs.model.ReceiveMessageRequest.withMaxNumberOfMessages and shows how ReceiveMessageRequest.withMaxNumberOfMessages is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the method are as follows:
Package path: com.amazonaws.services.sqs.model.ReceiveMessageRequest
Class name: ReceiveMessageRequest
Method name: withMaxNumberOfMessages

Introduction to ReceiveMessageRequest.withMaxNumberOfMessages

The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.
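The 1 to 10 range described above can be enforced on the caller's side before the value reaches the request. The following is a minimal sketch under that assumption; `MaxMessages`, `clamp`, and `isValid` are hypothetical names introduced here for illustration and are not part of the AWS SDK.

```java
/** Hypothetical helper (not part of the AWS SDK) for the documented 1..10 range. */
final class MaxMessages {
    /** Smallest value documented as valid for MaxNumberOfMessages. */
    static final int MIN = 1;
    /** Largest value documented as valid for MaxNumberOfMessages. */
    static final int MAX = 10;

    private MaxMessages() {
    }

    /** Clamps a requested batch size into the documented 1..10 range. */
    static int clamp(int requested) {
        return Math.max(MIN, Math.min(MAX, requested));
    }

    /** Returns true if the value already lies inside the documented range. */
    static boolean isValid(int value) {
        return value >= MIN && value <= MAX;
    }
}
```

A caller could then write `new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(MaxMessages.clamp(batchSize))`, so that an oversized configuration value is reduced to 10 instead of being sent to the service unchanged.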

Code examples

Code example source: origin: Netflix/conductor

@VisibleForTesting
List<Message> receiveMessages() {
  try {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
        .withQueueUrl(queueURL)
        .withVisibilityTimeout(visibilityTimeoutInSeconds)
        .withMaxNumberOfMessages(batchSize);
    ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
    List<Message> messages = result.getMessages().stream()
        .map(msg -> new Message(msg.getMessageId(), msg.getBody(), msg.getReceiptHandle()))
        .collect(Collectors.toList());
    Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, this.queueName, messages.size());
    return messages;
  } catch (Exception e) {
    logger.error("Exception while getting messages from SQS", e);
    Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
  }
  return new ArrayList<>();
}

Code example source: origin: aws/aws-sdk-java

/**
   * Attempts to retrieve messages from SQS and upon completion (successful or unsuccessful)
   * reports the batch as complete and open
   */
  public void run() {
    try {
      visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
      ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl).withMaxNumberOfMessages(config
          .getMaxBatchSize());
      ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
      if (config.getVisibilityTimeoutSeconds() > 0) {
        request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
        visibilityDeadlineNano = System.nanoTime()
            + TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(), TimeUnit.SECONDS);
      }
      if (config.isLongPoll()) {
        request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
      }
      messages = sqsClient.receiveMessage(request).getMessages();
    } catch (AmazonClientException e) {
      exception = e;
    } finally {
      // whatever happened, we are done and can be considered open
      open = true;
      parentBuffer.reportBatchFinished(this);
    }
  }
}

Code example source: origin: aws-amplify/aws-sdk-android

/**
   * Attempts to retrieve messages from SQS and upon completion
   * (successful or unsuccessful) reports the batch as complete and open
   */
  @Override
  public void run() {
    try {
      visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
      ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl)
          .withMaxNumberOfMessages(config.getMaxBatchSize());
      ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
      if (config.getVisibilityTimeoutSeconds() > 0) {
        request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
        visibilityDeadlineNano = System.nanoTime()
            + TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(),
                TimeUnit.SECONDS);
      }
      if (config.isLongPoll()) {
        request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
      }
      messages = sqsClient.receiveMessage(request).getMessages();
    } catch (AmazonClientException e) {
      exception = e;
    } finally {
      // whatever happened, we are done and can be considered open
      open = true;
      parentBuffer.reportBatchFinished(this);
    }
  }
}

Code example source: origin: com.github.davidmoten/rxjava2-aws

private static ReceiveMessageRequest request(String queueUrl, int waitTimeSeconds) {
  // Valid values for MaxNumberOfMessages are 1 to 10, so request the maximum of 10.
  return new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10).withWaitTimeSeconds(waitTimeSeconds);
}

Code example source: origin: gofore/aws-training

@Get("json")
public JsonNode logs() {
  ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
      .withWaitTimeSeconds(20)
      .withMaxNumberOfMessages(10);
  return sqsClient.receiveMessage(request)
      .thenApply(ReceiveMessageResult::getMessages)
      .whenComplete(Consumers.onSuccess(msgs -> msgs.forEach(deleteMessage())))
      .thenApply(msgs -> msgs.stream().map(readNode()))
      .thenApply(toArray())
      .join();
}

Code example source: origin: com.github.davidmoten/rxjava2-aws

@Override
public State call() {
  queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
  request = new ReceiveMessageRequest(queueUrl) //
      .withWaitTimeSeconds(20) //
      .withMaxNumberOfMessages(10);
  return new State(new LinkedList<>());
}

Code example source: origin: org.springframework.cloud/spring-cloud-aws-messaging

public ReceiveMessageRequest getReceiveMessageRequest() {
  ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(this.destinationUrl).
      withAttributeNames(RECEIVING_ATTRIBUTES).
      withMessageAttributeNames(RECEIVING_MESSAGE_ATTRIBUTES);
  if (this.maxNumberOfMessages != null) {
    receiveMessageRequest.withMaxNumberOfMessages(this.maxNumberOfMessages);
  } else {
    receiveMessageRequest.withMaxNumberOfMessages(DEFAULT_MAX_NUMBER_OF_MESSAGES);
  }
  if (this.visibilityTimeout != null) {
    receiveMessageRequest.withVisibilityTimeout(this.visibilityTimeout);
  }
  if (this.waitTimeOut != null) {
    receiveMessageRequest.setWaitTimeSeconds(this.waitTimeOut);
  }
  return receiveMessageRequest;
}

Code example source: origin: spring-cloud/spring-cloud-aws

public ReceiveMessageRequest getReceiveMessageRequest() {
  ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(this.destinationUrl).
      withAttributeNames(RECEIVING_ATTRIBUTES).
      withMessageAttributeNames(RECEIVING_MESSAGE_ATTRIBUTES);
  if (this.maxNumberOfMessages != null) {
    receiveMessageRequest.withMaxNumberOfMessages(this.maxNumberOfMessages);
  } else {
    receiveMessageRequest.withMaxNumberOfMessages(DEFAULT_MAX_NUMBER_OF_MESSAGES);
  }
  if (this.visibilityTimeout != null) {
    receiveMessageRequest.withVisibilityTimeout(this.visibilityTimeout);
  }
  if (this.waitTimeOut != null) {
    receiveMessageRequest.setWaitTimeSeconds(this.waitTimeOut);
  }
  return receiveMessageRequest;
}

Code example source: origin: org.symphonyoss.s2.fugue/aws-fugue

/* package */ SqsSubscriber(SqsSubscriberManager manager, AmazonSQS sqsClient, String queueUrl,
  String subscriptionName, ITraceContextTransactionFactory traceFactory,
  IThreadSafeRetryableConsumer<String> consumer, ICounter counter, IBusyCounter busyCounter, String tenantId)
{
 super(manager, subscriptionName, counter, busyCounter, EXTENSION_FREQUENCY_MILLIS);
 
 if(Fugue.isDebugSingleThread())
 {
  messageBatchSize_ = 1;
 }
 
 manager_ = manager;
 sqsClient_ = sqsClient;
 queueUrl_ = queueUrl;
 traceFactory_ = traceFactory;
 consumer_ = consumer;
 nonIdleSubscriber_ = new NonIdleSubscriber();
 tenantId_ = tenantId;
 blockingPullRequest_ = new ReceiveMessageRequest(queueUrl_)
   .withMaxNumberOfMessages(messageBatchSize_ )
   .withWaitTimeSeconds(20);
 
 nonBlockingPullRequest_ = new ReceiveMessageRequest(queueUrl_)
   .withMaxNumberOfMessages(messageBatchSize_ );
}

Code example source: origin: bazaarvoice/emodb

@Override
public List<ScanRangeTask> claimScanRangeTasks(int max, Duration ttl) {
  if (max == 0) {
    return ImmutableList.of();
  }
  List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest()
      .withQueueUrl(getQueueUrl(_pendingScanRangeQueue))
      .withMaxNumberOfMessages(Math.min(max, 10))           // SQS cannot claim more than 10 messages
      .withVisibilityTimeout(toSeconds(ttl))
  ).getMessages();
  return FluentIterable.from(messages)
      .transform(new Function<Message, ScanRangeTask>() {
        @Override
        public ScanRangeTask apply(Message message) {
          QueueScanRangeTask task = JsonHelper.fromJson(message.getBody(), QueueScanRangeTask.class);
          task.setMessageId(message.getReceiptHandle());
          return task;
        }
      })
      .toList();
}
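Because a single receive call returns at most 10 messages, and may return fewer, a caller that wants more than 10 has to loop. The sketch below shows one way to structure such a loop. It is an illustration only: `BatchedReceiver`, `receiveUpTo`, and the `receiveBatch` callback are hypothetical names, and the callback stands in for a real call such as `sqs.receiveMessage(request.withMaxNumberOfMessages(n)).getMessages()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical helper (not part of the AWS SDK) that batches receive calls. */
final class BatchedReceiver {
    /** Documented upper bound for MaxNumberOfMessages. */
    static final int SQS_MAX_PER_CALL = 10;

    private BatchedReceiver() {
    }

    /**
     * Collects up to {@code wanted} items by calling {@code receiveBatch} repeatedly.
     * Each call asks for at most 10 items; the loop stops early when a call
     * returns an empty list, which is treated as "nothing available right now".
     */
    static <T> List<T> receiveUpTo(int wanted, IntFunction<List<T>> receiveBatch) {
        List<T> collected = new ArrayList<>();
        while (collected.size() < wanted) {
            // Never ask for more than the per-call limit or more than is still needed.
            int batchSize = Math.min(SQS_MAX_PER_CALL, wanted - collected.size());
            List<T> batch = receiveBatch.apply(batchSize);
            if (batch.isEmpty()) {
                break;
            }
            collected.addAll(batch);
        }
        return collected;
    }
}
```

Counting the messages actually received, instead of the number of calls made, matters here because the service is allowed to return fewer messages than requested.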

Code example source: origin: com.netflix.conductor/conductor-contribs

@VisibleForTesting
List<Message> receiveMessages() {
  try {
    ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
        .withQueueUrl(queueURL)
        .withVisibilityTimeout(visibilityTimeoutInSeconds)
        .withMaxNumberOfMessages(batchSize);
    ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
    List<Message> messages = result.getMessages().stream()
        .map(msg -> new Message(msg.getMessageId(), msg.getBody(), msg.getReceiptHandle()))
        .collect(Collectors.toList());
    Monitors.recordEventQueueMessagesProcessed(QUEUE_TYPE, this.queueName, messages.size());
    return messages;
  } catch (Exception e) {
    logger.error("Exception while getting messages from SQS", e);
    Monitors.recordObservableQMessageReceivedErrors(QUEUE_TYPE);
  }
  return new ArrayList<>();
}

Code example source: origin: com.netflix.spinnaker.echo/echo-pubsub-aws

private void listenForMessages() {
 while (isEnabled.get()) {
  ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(
   new ReceiveMessageRequest(queueId)
    .withMaxNumberOfMessages(AWS_MAX_NUMBER_OF_MESSAGES)
    .withVisibilityTimeout(subscription.getVisibilityTimeout())
    .withWaitTimeSeconds(subscription.getWaitTimeSeconds())
    .withMessageAttributeNames("All")
  );
  if (receiveMessageResult.getMessages().isEmpty()) {
   log.debug("Received no messages for queue: {}", queueARN);
   continue;
  }
  receiveMessageResult.getMessages().forEach(this::handleMessage);
 }
}

Code example source: origin: com.netflix.suro/suro-server

@Override
public String recv() {
  ReceiveMessageRequest request = new ReceiveMessageRequest()
      .withQueueUrl(queueUrls.get(0))
      .withMaxNumberOfMessages(1);
  try {
    ReceiveMessageResult result = sqsClient.receiveMessage(request);
    if (!result.getMessages().isEmpty()) {
      Message msg = result.getMessages().get(0);
      recvMessageCount.incrementAndGet();
      DeleteMessageRequest deleteReq = new DeleteMessageRequest()
          .withQueueUrl(queueUrls.get(0))
          .withReceiptHandle(msg.getReceiptHandle());
      sqsClient.deleteMessage(deleteReq);
      if (enableBase64Encoding) {
        return new String(
            Base64.decodeBase64(msg.getBody().getBytes()),
            Charsets.UTF_8);
      } else {
        return msg.getBody();
      }
    } else {
      return "";
    }
  } catch (Exception e) {
    log.error("Exception while recving SQS notice: " + e.getMessage(), e);
    return "";
  }
}

Code example source: origin: bazaarvoice/emodb

@Override
public List<ScanRangeComplete> claimCompleteScanRanges(Duration ttl) {
  List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest()
      .withQueueUrl(getQueueUrl(_completeScanRangeQueue))
      .withMaxNumberOfMessages(10)
      .withVisibilityTimeout(toSeconds(ttl))
  ).getMessages();
  return FluentIterable.from(messages)
      .transform(new Function<Message, ScanRangeComplete>() {
        @Override
        public ScanRangeComplete apply(Message message) {
          QueueScanRangeComplete completion = JsonHelper.fromJson(message.getBody(), QueueScanRangeComplete.class);
          completion.setMessageId(message.getReceiptHandle());
          return completion;
        }
      })
      .toList();
}

Code example source: origin: org.springframework.cloud/spring-cloud-aws-messaging

@Override
public Message<String> receive(long timeout) {
  ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
      new ReceiveMessageRequest(this.queueUrl).
          withMaxNumberOfMessages(1).
          withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
          withAttributeNames(ATTRIBUTE_NAMES).
          withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
  if (receiveMessageResult.getMessages().isEmpty()) {
    return null;
  }
  com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
  Message<String> message = createMessage(amazonMessage);
  this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
  return message;
}

Code example source: origin: spring-cloud/spring-cloud-aws

@Override
public Message<String> receive(long timeout) {
  ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(
      new ReceiveMessageRequest(this.queueUrl).
          withMaxNumberOfMessages(1).
          withWaitTimeSeconds(Long.valueOf(timeout).intValue()).
          withAttributeNames(ATTRIBUTE_NAMES).
          withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
  if (receiveMessageResult.getMessages().isEmpty()) {
    return null;
  }
  com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
  Message<String> message = createMessage(amazonMessage);
  this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
  return message;
}

Code example source: origin: classmethod/gradle-aws-plugin

while (counter < maxNumberOfMessages) {
  ReceiveMessageRequest request = new ReceiveMessageRequest().withQueueUrl(queueUrl)
    .withMaxNumberOfMessages(MAX_MESSAGE_CONSUME_BATCH_SIZE).withVisibilityTimeout(30);
  List<DeleteMessageBatchRequestEntry> messagesToDelete =
      sqs.receiveMessage(request).getMessages().stream().map(message -> {

Code example source: origin: com.amazonaws/aws-java-sdk-sqs

/**
   * Attempts to retrieve messages from SQS and upon completion (successful or unsuccessful)
   * reports the batch as complete and open
   */
  public void run() {
    try {
      visibilityDeadlineNano = System.nanoTime() + visibilityTimeoutNanos;
      ReceiveMessageRequest request = new ReceiveMessageRequest(qUrl).withMaxNumberOfMessages(config
          .getMaxBatchSize());
      ResultConverter.appendUserAgent(request, AmazonSQSBufferedAsyncClient.USER_AGENT);
      if (config.getVisibilityTimeoutSeconds() > 0) {
        request.setVisibilityTimeout(config.getVisibilityTimeoutSeconds());
        visibilityDeadlineNano = System.nanoTime()
            + TimeUnit.NANOSECONDS.convert(config.getVisibilityTimeoutSeconds(), TimeUnit.SECONDS);
      }
      if (config.isLongPoll()) {
        request.withWaitTimeSeconds(config.getLongPollWaitTimeoutSeconds());
      }
      messages = sqsClient.receiveMessage(request).getMessages();
    } catch (AmazonClientException e) {
      exception = e;
    } finally {
      // whatever happened, we are done and can be considered open
      open = true;
      parentBuffer.reportBatchFinished(this);
    }
  }
}

Code example source: origin: com.amazonaws/amazon-sqs-java-messaging-lib

.withMaxNumberOfMessages(prefetchBatchSize)
.withAttributeNames(ALL)
.withMessageAttributeNames(ALL)

Code example source: origin: spring-projects/spring-integration-aws

@Bean
public AmazonSQSAsync amazonSqs() {
  AmazonSQSAsync sqs = mock(AmazonSQSAsync.class);
  given(sqs.getQueueUrl(new GetQueueUrlRequest("testQueue")))
      .willReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
  given(sqs.receiveMessage(new ReceiveMessageRequest("http://testQueue.amazonaws.com")
      .withAttributeNames("All")
      .withMessageAttributeNames("All")
      .withMaxNumberOfMessages(10)
      .withWaitTimeSeconds(20)))
      .willReturn(new ReceiveMessageResult()
          .withMessages(new Message().withBody("messageContent"),
              new Message().withBody("messageContent2")))
      .willReturn(new ReceiveMessageResult());
  given(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
      .willReturn(new GetQueueAttributesResult());
  return sqs;
}
