Unable to receive Jersey server-sent events in an Angular 10 application

j9per5c4  posted on 2021-10-10  in Java

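I want to use server-sent events (SSE) to monitor a batch process running on the server.
The client is an Angular 10 application that sends requests to an IIS 10 web server, which uses ARR to forward them to Tomcat 9, where Jersey 2.32 handles the SSE requests.
Because authentication is required, I also use sse.js to add the Authorization header.
Sending the requests works fine. Jersey receives them, and I can see in the Tomcat console that the messages are written correctly (see the Java code below).
Unfortunately, I never receive any message in the Angular client application; the only thing I can confirm in the browser console is that the event source is set up.
The SSE GET request stays pending and never receives a response.
I have searched a lot to find out what is wrong in my code, or what I need to configure in IIS, ARR, or Tomcat, but I am stuck, so any help would be much appreciated.
Here is the code used in the Angular application.
Monitoring service: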

export class BatchProcessMonitoringService {

  constructor(private zone: NgZone, private sseService: ServerSideEventsService) { }

  public getMonitoringData(url: string, authorizationString: string, formData: FormData): Observable<string> {
    return this.getServerSentEvent(url, authorizationString, formData);
  }

  // get event source (SSE)
  private getServerSentEvent(url: string, authorizationString: string, data: FormData): Observable<string> {

    return new Observable((observer: Observer<string>) => {
      const eventSource = this.sseService.getEventSourceWithGet(url, authorizationString, data);
      console.log('batch process event source set');
      console.log(eventSource);
      // register the handlers before opening the stream
      eventSource.onmessage = (event: any) => {
        this.zone.run(() => {
          console.log('batch process event received in angular');
          console.log(event);
          // emit the payload so the value matches Observable<string>
          observer.next(event.data);
        });
      };
      eventSource.onerror = (error: any) => {
        this.zone.run(() => {
          console.log('batch process event error in angular');
          console.log(error);
          observer.error(error);
        });
      };
      eventSource.stream();
      // close the connection when the subscriber unsubscribes
      return () => eventSource.close();
    });
  }

  public closeConnection(): void {
    this.sseService.closeEventSource();
  }

}

SSE service:

export class ServerSideEventsService {
  eventSource: SSE;

  // no dependencies: a service must not inject itself (circular dependency)
  constructor() { }

  // create an event source of POST request
  public getEventSourceWithPost(url: string, authorizationString: string, formData: FormData): SSE {
    return this.buildEventSource(url, authorizationString, 'POST', formData);
  }

  // create an event source of GET request
  public getEventSourceWithGet(url: string, authorizationString: string, formData: FormData): SSE {
    return this.buildEventSource(url, authorizationString, 'GET', formData);
  }

  // build the event source
  private buildEventSource(url: string, authorizationString: string, meth: string, formData: FormData): SSE {
    const options = this.buildOptions(meth, authorizationString, formData);
    this.eventSource = new SSE(url, options);
    console.log('sse service this.eventSource');
    console.log(this.eventSource);
    // add listener
    this.eventSource.addEventListener('message', (e: any) => {
      console.log('sse service message received');
      console.log(e);
      return e.data;
    });
    return this.eventSource;
  }

  // build query options
  private buildOptions(
    meth: string,
    authorizationString: string,
    formData: FormData
  ): {
    payload: FormData;
    method: string;
    headers: { Authorization: string };
  } {
    const headerDict = {
      'Content-Type': 'application/json',
      Authorization: authorizationString
    };
    return {
      payload: formData,
      method: meth,
      headers: headerDict
    };
  }

  public closeEventSource(): void {
    if (!!this.eventSource) {
      this.eventSource.close();
    }
  }
}

Here is the code of the Java GET method that gets called:

    @GET
    @Path("/{userId_timestamp}")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public void getUserImportProcessEvent(
            @Context SseEventSink eventSink,
            @Context Sse sse,
            @PathParam("userId_timestamp") String userId_timestamp) {
        final EventOutput eventOutput = new EventOutput();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // ... code that waits 1 second
                    final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
                    eventBuilder.name("message");
                    eventBuilder.data(String.class, "Hello world " + i + "!");
                    final OutboundEvent event = eventBuilder.build();
                    eventOutput.write(event);
                    System.out.println("write event: " + i);
                }
            } catch (IOException e) {
                throw new RuntimeException("Error when writing the event.", e);
            } finally {
                try {
                    eventOutput.close();
                } catch (IOException ioClose) {
                    throw new RuntimeException("Error when closing the event output.", ioClose);
                }
            }
        }).start();
        return;
    }

fdx2calv1#

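Fixed. My Jersey Java code was incorrect: the resource method returned void, so the EventOutput it wrote to was never handed to Jersey and nothing reached the response. Returning the EventOutput from the method fixes it. Here is the correct code.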

// Server Side Events with User Import process monitoring data
    @GET
    @Path("/{userId_timestamp}")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput getUserImportProcessEvent(
            @PathParam("userId_timestamp") String userId_timestamp) {
        final EventOutput eventOutput = new EventOutput();
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // ... code that waits 1 second
                    final OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder();
                    eventBuilder.name("message");
                    eventBuilder.data(String.class, "Hello world " + i + "!");
                    final OutboundEvent event = eventBuilder.build();
                    eventOutput.write(event);
                    System.out.println("write event: " + i);
                }
            } catch (IOException e) {
                throw new RuntimeException("Error when writing the event.", e);
            } finally {
                try {
                    eventOutput.close();
                } catch (IOException ioClose) {
                    throw new RuntimeException("Error when closing the event output.", ioClose);
                }
            }
        }).start();
        return eventOutput;
    }
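
To check the endpoint independently of Angular and sse.js, it can help to look at the raw response body. With the event name and data used above, each event should arrive roughly as an `event: message` line, a `data: Hello world 0!` line, and a blank line. The following is only a minimal sketch of how such a body splits into events; `parseSseStream` and `ParsedSseEvent` are hypothetical names made up for this illustration and are not part of sse.js, Angular, or Jersey.

```typescript
// Hypothetical helper for illustration only; not part of sse.js, Angular, or Jersey.
interface ParsedSseEvent {
  event: string; // event name; "message" when no "event:" field is present
  data: string;  // payload; several "data:" lines are joined with "\n"
}

function parseSseStream(raw: string): ParsedSseEvent[] {
  const events: ParsedSseEvent[] = [];
  // Events are separated by a blank line; normalise CRLF to LF first.
  // Simplification: a final block without a terminating blank line is also accepted.
  const blocks = raw.replace(/\r\n/g, "\n").split("\n\n");
  for (const block of blocks) {
    let name = "message";
    const dataLines: string[] = [];
    for (const line of block.split("\n")) {
      if (line === "" || line.charAt(0) === ":") {
        continue; // empty line or comment line (often used as keep-alive)
      }
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.charAt(0) === " ") {
        value = value.slice(1); // one leading space after the colon is dropped
      }
      if (field === "event") {
        name = value;
      } else if (field === "data") {
        dataLines.push(value);
      }
      // "id" and "retry" fields are ignored in this sketch
    }
    if (dataLines.length > 0) {
      events.push({ event: name, data: dataLines.join("\n") });
    }
  }
  return events;
}

// Sample body shaped like the first two events of the loop above (assumed format).
const sample =
  "event: message\ndata: Hello world 0!\n\n" +
  "event: message\ndata: Hello world 1!\n\n";
const parsed = parseSseStream(sample);
console.log(parsed.map((e) => `${e.event}: ${e.data}`));
```

If the raw body looks like this when the endpoint is called directly on Tomcat but nothing arrives through IIS/ARR, the problem is more likely in the proxy layer than in the Jersey code.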
