Usage and code examples of the javax.ws.rs.container.AsyncResponse class

Reposted by x33g5p2x on 2022-01-15 in Other

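This article collects code examples for javax.ws.rs.container.AsyncResponse in Java and shows how the type is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should be useful as a reference. Details of AsyncResponse:
Fully qualified name: javax.ws.rs.container.AsyncResponse
Class name: AsyncResponse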

Introduction to AsyncResponse

An injectable JAX-RS asynchronous response that provides means for asynchronous server side response processing.

A new instance of AsyncResponse may be injected into a resource or sub-resource method parameter using the @Suspended annotation.

Each asynchronous response instance is bound to the running request and can be used to asynchronously provide the request processing result or otherwise manipulate the suspended client connection. The available operations include:

  • updating suspended state data (time-out value, response ...)
  • resuming the suspended request processing
  • canceling the suspended request processing

The following example demonstrates the use of AsyncResponse for asynchronous HTTP request processing:

@Path("/messages/next")
public class MessagingResource {
  private static final BlockingQueue<AsyncResponse> suspended =
      new ArrayBlockingQueue<AsyncResponse>(5);

  @GET
  public void readMessage(@Suspended AsyncResponse ar) throws InterruptedException {
    suspended.put(ar);
  }

  @POST
  public String postMessage(final String message) throws InterruptedException {
    final AsyncResponse ar = suspended.take();
    ar.resume(message); // resumes the processing of one GET request
    return "Message sent";
  }
}
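
The handoff in the example above can be tried without a JAX-RS container. The following is a minimal JDK-only sketch, not the JAX-RS API: `CompletableFuture<String>` stands in for `AsyncResponse`, and the class name `SuspendResumeModel` is hypothetical. It shows that the reader is parked until a message is posted, and that a second completion is rejected, much as `AsyncResponse.resume` returns `false` once the response is no longer suspended.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// JDK-only model of the example above; CompletableFuture stands in for AsyncResponse (assumption).
class SuspendResumeModel {
    // Parked readers, bounded to five like the queue in the original example.
    private final BlockingQueue<CompletableFuture<String>> suspended = new ArrayBlockingQueue<>(5);

    // Plays the role of readMessage(): park the reader and return immediately.
    CompletableFuture<String> readMessage() {
        CompletableFuture<String> parked = new CompletableFuture<>();
        try {
            suspended.put(parked); // blocks while five readers are already parked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while parking", e);
        }
        return parked;
    }

    // Plays the role of postMessage(): wake exactly one parked reader.
    String postMessage(String message) {
        try {
            CompletableFuture<String> parked = suspended.take(); // blocks until a reader is parked
            parked.complete(message); // analogous to ar.resume(message)
            return "Message sent";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reader", e);
        }
    }

    public static void main(String[] args) {
        SuspendResumeModel resource = new SuspendResumeModel();
        CompletableFuture<String> reader = resource.readMessage();
        System.out.println(reader.isDone());               // false: still parked
        System.out.println(resource.postMessage("hello")); // Message sent
        System.out.println(reader.join());                 // hello
        System.out.println(reader.complete("again"));      // false: already completed
    }
}
```

Note that, as in the original, a POST with no waiting GET blocks in `take()`, and a sixth concurrent GET blocks in `put()`.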

If the asynchronous response was suspended with a positive timeout value and has not been explicitly resumed before the timeout expires, processing is resumed once the specified timeout threshold is reached.

By default, a timed-out asynchronous response is resumed with a javax.ws.rs.WebApplicationException that has the javax.ws.rs.core.Response.Status#SERVICE_UNAVAILABLE error response status code set. This default behavior may be overridden by setting a custom TimeoutHandler via AsyncResponse#setTimeoutHandler(TimeoutHandler).
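
The timeout rule can be illustrated with a small JDK-only model (Java 9 or later, for `completeOnTimeout`). This is a simplified sketch, not the JAX-RS implementation: a `CompletableFuture<Integer>` holding an HTTP status stands in for the suspended response, the fallback value stands in for whatever the timeout handler would resume with, and the names `TimeoutModel` and `statusAfter` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Simplified model: the future holds the HTTP status the client would eventually see.
class TimeoutModel {
    static final int OK = 200;
    static final int SERVICE_UNAVAILABLE = 503;

    // Waits for `pending`; if it is still incomplete after `millis`, the fallback status wins.
    static int statusAfter(CompletableFuture<Integer> pending, long millis, int onTimeout) {
        return pending.completeOnTimeout(onTimeout, millis, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        // Never resumed, default behaviour: models the 503 described above.
        System.out.println(statusAfter(new CompletableFuture<>(), 50, SERVICE_UNAVAILABLE)); // 503

        // Never resumed, custom handler: the handler picks the status, here 204.
        System.out.println(statusAfter(new CompletableFuture<>(), 50, 204)); // 204

        // Resumed before the deadline: the timeout fallback is ignored.
        CompletableFuture<Integer> resumed = new CompletableFuture<>();
        resumed.complete(OK);
        System.out.println(statusAfter(resumed, 50, SERVICE_UNAVAILABLE)); // 200
    }
}
```

A real TimeoutHandler has more options than this model shows; for instance, it may extend the timeout instead of resuming.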

Code examples

Code example source: prestodb/presto

@GET
@Path("{queryId}/{token}")
@Produces(MediaType.APPLICATION_JSON)
public void getQueryResults(
    @PathParam("queryId") QueryId queryId,
    @PathParam("token") long token,
    @QueryParam("maxWait") Duration maxWait,
    @QueryParam("targetResultSize") DataSize targetResultSize,
    @HeaderParam(X_FORWARDED_PROTO) String proto,
    @Context UriInfo uriInfo,
    @Suspended AsyncResponse asyncResponse)
{
  Query query = queries.get(queryId);
  if (query == null) {
    asyncResponse.resume(Response.status(Status.NOT_FOUND).build());
    return;
  }
  if (isNullOrEmpty(proto)) {
    proto = uriInfo.getRequestUri().getScheme();
  }
  asyncQueryResults(query, OptionalLong.of(token), maxWait, targetResultSize, uriInfo, proto, asyncResponse);
}

Code example source: javaee-samples/javaee7-samples

@GET
public void getList(@Suspended final AsyncResponse ar) throws NamingException {
  // If nothing resumes the response within 4 s, answer with a plain message.
  ar.setTimeoutHandler(new TimeoutHandler() {
    @Override
    public void handleTimeout(AsyncResponse ar) {
      ar.resume("Operation timed out");
    }
  });
  ar.setTimeout(4000, TimeUnit.MILLISECONDS);
  // Callback classes defined elsewhere in the sample project.
  ar.register(new MyCompletionCallback());
  ar.register(new MyConnectionCallback());
  ManagedThreadFactory threadFactory = (ManagedThreadFactory) new InitialContext()
    .lookup("java:comp/DefaultManagedThreadFactory");
  Executors.newSingleThreadExecutor(threadFactory).submit(new Runnable() {
    @Override
    public void run() {
      try {
        Thread.sleep(3000);
        // `response` is not declared in this excerpt; presumably an array field of the resource class.
        ar.resume(response[0]);
      } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of silently swallowing the exception.
        Thread.currentThread().interrupt();
      }
    }
  });
}

Code example source: resteasy/Resteasy

@GET
@Path("test")
public void test(final @Suspended AsyncResponse response)
{
  // The response times out after 5 s; the worker below deliberately sleeps for 10 s.
  response.setTimeout(5000, TimeUnit.MILLISECONDS);
  Thread t = new Thread()
  {
    @Override
    public void run()
    {
      try
      {
        LOG.info("TestResource: async thread started");
        Thread.sleep(10000);
        Response jaxrs = Response.ok("test").type(MediaType.TEXT_PLAIN).build();
        // By now the timeout has normally fired, so resume() returns false here.
        response.resume(jaxrs);
        LOG.info("TestResource: async thread finished");
      }
      catch (Exception e)
      {
        LOG.error(e.getMessage(), e);
      }
    }
  };
  t.start();
}

Code example source: prestodb/presto

@GET
@Path("{taskId}/results/{bufferId}/{token}")
@Produces(PRESTO_PAGES)
public void getResults(
    @PathParam("taskId") TaskId taskId,
    @PathParam("bufferId") OutputBufferId bufferId,
    @PathParam("token") final long token,
    @HeaderParam(PRESTO_MAX_SIZE) DataSize maxSize,
    @Suspended AsyncResponse asyncResponse)
  // ... (lines omitted in this excerpt)
    return Response.status(status)
        .entity(entity)
        .header(PRESTO_TASK_INSTANCE_ID, result.getTaskInstanceId())
        .header(PRESTO_PAGE_TOKEN, result.getToken())
        .header(PRESTO_PAGE_NEXT_TOKEN, result.getNextToken())
        .header(PRESTO_BUFFER_COMPLETE, result.isBufferComplete())
  // ... (lines omitted in this excerpt)
  bindAsyncResponse(asyncResponse, responseFuture, responseExecutor)
      .withTimeout(timeout,
          Response.status(Status.NO_CONTENT)
              .header(PRESTO_TASK_INSTANCE_ID, taskManager.getTaskInstanceId(taskId))
              .header(PRESTO_PAGE_TOKEN, token)
  // ... (lines omitted in this excerpt)
  asyncResponse.register((CompletionCallback) throwable -> resultsRequestTime.add(Duration.nanosSince(start)));

Code example source: atomix/atomix

@GET
@Path("/{subject}/subscribers/{id}")
@Produces(MediaType.TEXT_PLAIN)
public void nextSession(@PathParam("subject") String subject, @PathParam("id") String id, @Context EventManager events, @Suspended AsyncResponse response) {
 EventLog<Consumer<String>, String> eventLog = events.getEventLog(ClusterEventService.class, getEventLogName(subject, id));
 if (eventLog == null) {
  LOGGER.warn("Unknown subscriber {}", id);
  response.resume(Response.status(Status.NOT_FOUND).build());
  return;
 }
 eventLog.nextEvent().whenComplete((event, error) -> {
  if (error == null) {
   response.resume(Response.ok(event).build());
  } else {
   LOGGER.warn("Subscriber {} closed", id);
   response.resume(Response.noContent().build());
  }
 });
}

Code example source: atomix/atomix

@POST
@Path("/{name}/clear")
public void clear(
  @PathParam("name") String name,
  @Suspended AsyncResponse response) {
 getPrimitive(name).thenCompose(map -> map.clear()).whenComplete((result, error) -> {
  if (error == null) {
   response.resume(Response.noContent().build());
  } else {
   LOGGER.warn("{}", error);
   response.resume(Response.serverError().build());
  }
 });
}
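
Both atomix examples follow the same pattern: a `CompletableFuture` does the work, and a `whenComplete` callback resumes the response with a success or an error status. A minimal JDK-only sketch of that bridge follows. The `IntConsumer` named `resume` is a hypothetical stand-in for `response.resume(Response...)`, and the class name `FutureBridge` and the 204/500 mapping are assumptions chosen to mirror the `clear` example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

class FutureBridge {
    // Resumes with 204 when the work succeeds and with 500 when it fails.
    // `resume` stands in for response.resume(...); it is not a JAX-RS type.
    static <T> void bind(CompletableFuture<T> work, IntConsumer resume) {
        work.whenComplete((result, error) -> {
            if (error == null) {
                resume.accept(204); // like Response.noContent()
            } else {
                resume.accept(500); // like Response.serverError()
            }
        });
    }

    public static void main(String[] args) {
        int[] seen = new int[2];

        // Work that has already succeeded: the callback runs immediately.
        bind(CompletableFuture.completedFuture("cleared"), status -> seen[0] = status);

        // Work that fails later: the callback runs when the failure is reported.
        CompletableFuture<String> failing = new CompletableFuture<>();
        bind(failing, status -> seen[1] = status);
        failing.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(seen[0] + " " + seen[1]); // 204 500
    }
}
```

Because the callback handles both outcomes, the response is resumed exactly once per future, whichever way the work ends.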

Code example source: apache/cxf

@GET
@Path("books/unmappedFromFilter")
@Produces("text/plain")
public void handleContinuationRequestUnmappedFromFilter(@Suspended AsyncResponse response) {
  response.register(new CallbackImpl());
  response.resume(Response.ok().build());
}

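Code example source: apache/cxf

@GET
@Path("/setTimeOut")
public String setTimeOut() {
  // `asyncResponse` is not declared in this excerpt; presumably a field set by another resource method.
  // setTimeout returns false if the response is no longer suspended.
  boolean setTimeout = asyncResponse.setTimeout(2, TimeUnit.SECONDS);
  return String.valueOf(setTimeout);
}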

Code example source: jersey/jersey

@GET
@ManagedAsync
public void longGet(@Suspended final AsyncResponse ar, @QueryParam("id") int requestId) {
  try {
    Thread.sleep(SLEEP_TIME_IN_MILLIS);
  } catch (InterruptedException ex) {
    LOGGER.log(Level.SEVERE, "Response processing interrupted", ex);
  }
  ar.resume(requestId + " - " + NOTIFICATION_RESPONSE);
}

Code example source: confluentinc/kafka-streams-examples

public static void setTimeout(long timeout, AsyncResponse asyncResponse) {
 asyncResponse.setTimeout(timeout, TimeUnit.MILLISECONDS);
 asyncResponse.setTimeoutHandler(resp -> resp.resume(
   Response.status(Response.Status.GATEWAY_TIMEOUT)
     .entity("HTTP GET timed out after " + timeout + " ms\n")
     .build()));
}

Code example source: labsai/EDDI

@Override
public void importBot(InputStream zippedBotConfigFiles, AsyncResponse response) {
  try {
    response.setTimeout(60, TimeUnit.SECONDS);
    File targetDir = new File(FileUtilities.buildPath(tmpPath.toString(), UUID.randomUUID().toString()));
    this.zipArchive.unzip(zippedBotConfigFiles, targetDir);
    String targetDirPath = targetDir.getPath();
    Files.newDirectoryStream(Paths.get(targetDirPath),
        path -> path.toString().endsWith(BOT_FILE_ENDING))
        .forEach(botFilePath -> {
          try {
            String botFileString = readFile(botFilePath);
            BotConfiguration botConfiguration =
                jsonSerialization.deserialize(botFileString, BotConfiguration.class);
            botConfiguration.getPackages().forEach(packageUri ->
                parsePackage(targetDirPath, packageUri, botConfiguration, response));
            URI newBotUri = createNewBot(botConfiguration);
            updateDocumentDescriptor(Paths.get(targetDirPath), buildOldBotUri(botFilePath), newBotUri);
            response.resume(Response.ok().location(newBotUri).build());
          } catch (IOException | RestInterfaceFactory.RestInterfaceFactoryException e) {
            log.error(e.getLocalizedMessage(), e);
            response.resume(new InternalServerErrorException());
          }
        });
  } catch (IOException e) {
    log.error(e.getLocalizedMessage(), e);
    response.resume(new InternalServerErrorException());
  }
}

Code example source: stackoverflow.com

@POST
@Path("bid/{exchangeId}/{key}")
public void handleBid(@PathParam("exchangeId") final String exchangeId, @PathParam("key") final String key,
      final OrtbBidRequest request, @Suspended final AsyncResponse asyncResponse) throws FilterException, IOException {

  // Timeout for clarity.
  asyncResponse.setTimeout(timeoutMs, TimeUnit.MILLISECONDS);
  asyncResponse.setTimeoutHandler(timeoutHandler);

  // Sending no data as an example
  asyncResponse.resume(Response.noContent().build());
}

Code example source: devicehive/devicehive-java-server

// ... (excerpt; lines omitted)
logger.warn("DeviceCommand wait request failed. BAD REQUEST: Command with id = {} was not sent for device with id = {}",
    commandId, deviceId);
asyncResponse.resume(ResponseFactory.response(BAD_REQUEST));
return;
// ...
if (!asyncResponse.isDone()) {
  asyncResponse.resume(ResponseFactory.response(
      OK,
      com,
// ...
future.thenAccept(pair -> {
  final DeviceCommand deviceCommand = pair.getRight();
  if (!asyncResponse.isDone() && deviceCommand.getIsUpdated()) {
    asyncResponse.resume(ResponseFactory.response(
        OK,
        deviceCommand,
// ...
    asyncResponse.setTimeout(1, TimeUnit.MILLISECONDS); // setting timeout to 0 would cause
// ...
    asyncResponse.setTimeout(timeout, TimeUnit.SECONDS);
// ...
asyncResponse.register((CompletionCallback) throwable -> {
  try {
    commandService.sendUnsubscribeRequest(Collections.singleton(future.get().getLeft()));
  } catch (InterruptedException | ExecutionException e) {
    if (!asyncResponse.isDone()) {
      asyncResponse.resume(ResponseFactory.response(INTERNAL_SERVER_ERROR));
// ...
if (!asyncResponse.isDone()) {
// ...

Code example source: stackoverflow.com

@GET
@Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
public void list(@Suspended final AsyncResponse asyncResponse) {
  asyncResponse.setTimeout(10, TimeUnit.SECONDS);
  executorService.submit(() -> {
    List<Product> res = super.listProducts();
    Product[] arr = res.toArray(new Product[res.size()]);
    asyncResponse.resume(arr);
  });
}

Code example source: jersey/jersey

// ... (opening lines of the enclosing anonymous class are omitted in this excerpt)
@Override
public void run() {
  try {
    Thread.sleep(SLEEP_TIME_IN_MILLIS);
  } catch (InterruptedException ex) {
    LOGGER.log(Level.SEVERE, "Response processing interrupted", ex);
  }
  ar.resume("Complex result for " + query);
}
});

Code example source: devicehive/devicehive-java-server

asyncResponse.setTimeoutHandler(asyncRes ->
    asyncRes.resume(ResponseFactory.response(Response.Status.NO_CONTENT)));
// ... (excerpt; lines omitted)
  asyncResponse.resume(ResponseFactory.response(BAD_REQUEST));
  return;
// ...
  asyncResponse.resume(ResponseFactory.response(NOT_FOUND));
  return;
// ...
        logger.warn("DeviceCommand wait request failed. NOT FOUND: No command found with id = {} for deviceId = {}",
            commandId, deviceId);
        asyncResponse.resume(ResponseFactory.response(Response.Status.NO_CONTENT));
      } else {
        waitForCommand(device, commandId, timeout, command.get(), asyncResponse);
// ...

Code example source: stackoverflow.com

@Path("/callback")
public class AsyncCallback {
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public void postWithAsync(@Suspended AsyncResponse asyncResponse,  
                     SomeObject object) {

    asyncResponse.register(new CompletionCallback() {
      @Override
      public void onComplete(Throwable error) {
        if (error == null) {
          System.out.println("Processing new Request");
        } else {
          System.out.println("Exception in Request handling");
        }
      }
    });
    Response response = Response.ok("Success").build();
    // Carry on like nothing happened
    asyncResponse.resume(response);
  }
}

Code example source: documents4j/documents4j

public static IInputStreamConsumer to(AsyncResponse asyncResponse, DocumentType targetType, long requestTimeout) {
  AsynchronousConversionResponse response = new AsynchronousConversionResponse(asyncResponse, targetType);
  asyncResponse.setTimeoutHandler(response);
  asyncResponse.setTimeout(requestTimeout, TimeUnit.MILLISECONDS);
  return response;
}

Code example source: stackoverflow.com

@Path("/poll")
@GET
public void poll(@Suspended final AsyncResponse asyncResponse)
    throws InterruptedException {
  asyncResponse.setTimeout(30, TimeUnit.SECONDS);
  this.asyncResponse = asyncResponse;
}

@POST
@Path("/printed")
public Response printCallback(String barcode) throws IOException {
  // ...

  this.asyncResponse.resume("MESSAGE");

  return Response.ok().build();
}
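
The snippet above keeps the suspended response in an instance field. That only works if both requests reach the same resource instance (for example a singleton), because JAX-RS resource classes are instantiated per request by default. One possible alternative is a shared, thread-safe single-slot holder. The sketch below is JDK-only and makes several assumptions: `CompletableFuture<String>` stands in for `AsyncResponse`, the names `SingleSlotHandoff`, `poll`, and `printed` are hypothetical, and a new poller is assumed to replace any earlier one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SingleSlotHandoff {
    // Shared across requests; holds at most one parked poller.
    private static final AtomicReference<CompletableFuture<String>> waiting = new AtomicReference<>();

    // Plays the role of poll(): park the caller, cancelling any previous poller.
    static CompletableFuture<String> poll() {
        CompletableFuture<String> parked = new CompletableFuture<>();
        CompletableFuture<String> previous = waiting.getAndSet(parked);
        if (previous != null) {
            previous.cancel(false); // analogous to AsyncResponse.cancel()
        }
        return parked;
    }

    // Plays the role of printCallback(): wake the poller, if any.
    // Returns false when nobody was waiting, so the caller can react instead of failing.
    static boolean printed(String message) {
        CompletableFuture<String> parked = waiting.getAndSet(null);
        return parked != null && parked.complete(message);
    }

    public static void main(String[] args) {
        System.out.println(printed("too early"));  // false: no poller yet
        CompletableFuture<String> poller = poll();
        System.out.println(printed("MESSAGE"));    // true
        System.out.println(poller.join());         // MESSAGE
        System.out.println(printed("again"));      // false: slot already emptied
    }
}
```

Taking the slot with `getAndSet(null)` means two concurrent callbacks cannot both resume the same poller.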

Code example source: SmartDataAnalytics/jena-sparql-api

.register(new ConnectionCallback() {
  @Override
  public void onDisconnect(AsyncResponse disconnect) {
// ... (lines omitted in this excerpt)
.register(new CompletionCallback() {
  @Override
  public void onComplete(Throwable t) {
// ... (lines omitted in this excerpt)
