Tomcat中的Java Web套接字导致非常高的内存使用和OOO

gdx19jrr  于 5个月前  发布在  Java
关注(0)|答案(1)|浏览(45)

我正在为一个应用程序编码,该应用程序形成了大约4000个连接并发送数据。但不知何故,它消耗了超过700 GB的内存范围。优化的理想方法是什么?任何垃圾收集器的建议可以帮助?甚至强制GC对每个消息发送都没有帮助。
目标是测试大规模使用Web套接字的持久长连接。

import java.io.IOException;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.websocket.Session;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import n1wsperf.server.ServerTelemetry.ServerMetricType;

@Component
public class TextSocketHandler extends TextWebSocketHandler {

    private volatile static HashMap<String, WebSocketSession> activeSessions = new HashMap<>() ;

    private static final int maxSizeInKB = 61440; // 60MB or 480Mb

    private static final TextMessage _data = DataService.getData();

    private static  String modeStr = "Request-Response";

    public static void Initialize() {
        Timer timer = new Timer("ActiveSessionTimer");
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
                System.out.println("Active Connections : " + activeSessions.size());
            }
        };
        CompletableFuture.runAsync( () -> { timer.schedule(task, 1000, 60000L); });
    }

    public TextSocketHandler() {
        modeStr = ServerConfig.getServerRunConfig().DuplexMode ? "Duplex" : "Request-Response";
    }

    @Async
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        try {
            session.setTextMessageSizeLimit(maxSizeInKB * 1024); // 60MB or 480Mb
            String sessionId = session.getId();
            if (session instanceof NativeWebSocketSession) {
                final Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); 
                if (nativeSession != null ) {
                    nativeSession.getUserProperties()
                         .put("org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 60_000L);
                }
            }
            session.sendMessage(new TextMessage(sessionId));
            activeSessions.putIfAbsent(session.getId(), session);
            ServerTelemetry.TrackMetric(ServerMetricType.NEWSESSION);
            TraceLogger.Info("SocketHandler", String.format("Session(new): %s: Mode: %s: New Connection established with client %s", session.getId(), modeStr,session.getRemoteAddress()), true);

            Run run = ServerConfig.getServerRunConfig();
            if (run.DuplexMode){
                startDuplexCommunicationAsync(sessionId, run);          // non-blocking
            }
            TraceLogger.Info("SocketHandler", String.format("Session (Verbose: %s): %s: Mode: %s Communication is in progress with client %s", run.Verbose, session.getId(), modeStr,session.getRemoteAddress()), true);

        } catch (Exception e) {
            ServerTelemetry.TrackMetric(ServerMetricType.FAILEDSESSION);
            ServerTelemetry.TrackException(e);
            TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
        } finally {
            System.gc();
        }
    }
    
    @Async
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        try{
        var sessionId = session.getId();
        var clientAdr = session.getRemoteAddress();

        ServerTelemetry.TrackMetric(ServerMetricType.CLOSEDSESSION);
        TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s : Closed Connection with client %s", sessionId, modeStr,clientAdr), true);
        if (activeSessions.keySet().contains(sessionId)){
            activeSessions.remove(sessionId);
            ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
        }}
        finally {
            System.gc();
        }
    }

    @Async
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        try {
            String sessionId = session.getId();
            WebSocketSession wsSession = activeSessions.get(sessionId);

            long start = System.nanoTime();
            //ServerTelemetry.TrackMetric(ServerMetricType.REQUEST);
            String payload = message.getPayload();
            //ServerTelemetry.TrackMetric(ServerMetricType.REQUESTSIZE, payload.length());  // Bytes
            TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: Received %s bytes from client %s", sessionId, modeStr,payload.length(), wsSession.getRemoteAddress()));
            
            if (!ServerConfig.getServerRunConfig().DuplexMode) {
                CompletableFuture.runAsync( () -> {
                    try {
                        sendMessage(wsSession, start);
                    } catch (IOException | InterruptedException e) {
                        TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
                    } 
                }).get();
            }

            long end = System.nanoTime();
            double reqDurationms = TimeUnit.NANOSECONDS.toMillis(end - start);

           // ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
           // ServerTelemetry.TrackMetric(ServerMetricType.DURATION, reqDurationms);

            
        } catch (Exception e) {
            ServerTelemetry.TrackMetric(ServerMetricType.FAILEDREQUEST);
            ServerTelemetry.TrackException(e);
            TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
        } finally {
            System.gc();
        }

    }
    
    @Async
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        try{
        WebSocketSession wsSession = activeSessions.get(session.getId());

        ServerTelemetry.TrackMetric(ServerMetricType.TRANSPORTERROR);
        TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s Error: %s", wsSession.getId(), modeStr,exception.getMessage()));
        if (activeSessions.keySet().contains(wsSession.getId())){
            activeSessions.remove(wsSession.getId());
            ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
        }}
        finally {
            System.gc();
        }
    }

    private void startDuplexCommunicationAsync(String sessionId, Run run) throws IOException, InterruptedException {
        CompletableFuture.runAsync(() -> {
            long sessionStart = System.nanoTime();
            WebSocketSession wsSession = null;
            for (long current = System.nanoTime(); current < sessionStart + TimeUnit.SECONDS.toNanos(run.KeepAlive_Sec);  ){
                try {
                    wsSession = activeSessions.get(sessionId);
                    sendMessage(wsSession, System.nanoTime());
                    current = System.nanoTime();
                } catch (IOException | InterruptedException e) {
                    TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
                }
            }
            if (wsSession != null && wsSession.isOpen()){
                try {
                    wsSession.close(CloseStatus.NORMAL);
                    TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s: Closing session as the configured keepalive duration (%s seconds) has elapsed, sent session close to client %s", sessionId, modeStr,run.KeepAlive_Sec, wsSession.getRemoteAddress()), true);
                } catch (IOException e) {
                    TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
                }
            }
        });
        
    }

    private void sendMessage(WebSocketSession session, long start) throws IOException, InterruptedException {
        if (!session.isOpen()) {
            throw new IOException("Session is not in open state");
        }

        var runCfg = ServerConfig.getServerRunConfig();

        try {
            session.sendMessage(_data);
            
            //ServerTelemetry.TrackMetric(ServerMetricType.RESPONSESIZE, _data.getPayloadLength());  // Bytes
            TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: %s bytes sent to client %s", session.getId(), modeStr, _data.getPayloadLength(), session.getRemoteAddress()));

            long end = System.nanoTime();
            long remainMillis = runCfg.RequestInterval_ms - TimeUnit.NANOSECONDS.toMillis(end - start);
            if (remainMillis > 0) {
                Thread.sleep(remainMillis);
            }

        } catch (IOException e) {
            TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s: Exception: %s", session.getId(), modeStr,e.getMessage()));
        } finally {
            System.gc();
        }

    }

}

字符串
1.各种GC,并行50%的GC时间,仍在执行,可用内存变为零
1.删除日志记录

oyjwcjzk

oyjwcjzk1#

通过快速概述您的TextSocketPath代码,我可以强调几点,这可能有助于优化内存使用并避免潜在的内存泄漏:

  • 活动会话的HashMap:如果在关闭时未正确删除会话,则使用activeSessions存储WebSocket会话可能会导致内存问题。请确保在关闭时从activeSessionsMap中删除会话,以防止内存泄漏。
  • 异步操作和错误处理:在WebSocket方法(afterConnectionEstablished、afterConnectionClosed、DataTextMessage、DataTransportError)中使用CompletableFuture.runAsync()时要小心。如果不进行有效管理,这些DataSocket操作可能会引入争用和潜在的内存开销。
  • 垃圾收集:在WebSocket方法中显式地删除System.gc()可能没有必要,而且可能会适得其反,因为Java的垃圾收集器旨在自动管理内存。
  • 资源处理:始终确保正确关闭资源(如WebSocket会话),并在不再需要它们时进行清理。这包括关闭会话、正确处理异常以及释放与会话关联的任何资源。
  • 数据处理和缓冲:考虑优化传入和传出消息(SMS TextMessage、sendMessage)的处理,以最大限度地减少内存使用。评估是否在内存中不必要地保留了大量有效负载或数据结构。
    要优化内存使用并避免潜在问题:
  • 验证关闭时会话是否正确地从activeSessions中删除。
  • 检查异步操作并确保正确的错误处理和资源清理。
  • 避免不必要的显式垃圾收集调用(System.gc())。
  • 评估如何在WebSocket方法中处理和处理数据,以最大限度地减少内存占用。

分析负载下的应用程序以识别内存热点和导致高内存消耗的区域非常重要。Java Flight Recorder、VisualVM或分析器等工具可以帮助识别WebSocket代码中的内存泄漏或低效内存使用模式。

相关问题