我正在编写一个应用程序,它会建立大约4000个连接并发送数据。但不知为何,它消耗了超过700 GB的内存。理想的优化方法是什么?有没有垃圾收集器方面的建议可以提供帮助?即使在每次发送消息后强制执行GC也无济于事。
目标是测试大规模场景下基于WebSocket的持久长连接。
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.websocket.Session;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.adapter.NativeWebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import n1wsperf.server.ServerTelemetry.ServerMetricType;
@Component
public class TextSocketHandler extends TextWebSocketHandler {
private volatile static HashMap<String, WebSocketSession> activeSessions = new HashMap<>() ;
private static final int maxSizeInKB = 61440; // 60MB or 480Mb
private static final TextMessage _data = DataService.getData();
private static String modeStr = "Request-Response";
public static void Initialize() {
Timer timer = new Timer("ActiveSessionTimer");
TimerTask task = new TimerTask() {
@Override
public void run() {
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
System.out.println("Active Connections : " + activeSessions.size());
}
};
CompletableFuture.runAsync( () -> { timer.schedule(task, 1000, 60000L); });
}
public TextSocketHandler() {
modeStr = ServerConfig.getServerRunConfig().DuplexMode ? "Duplex" : "Request-Response";
}
@Async
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
try {
session.setTextMessageSizeLimit(maxSizeInKB * 1024); // 60MB or 480Mb
String sessionId = session.getId();
if (session instanceof NativeWebSocketSession) {
final Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
if (nativeSession != null ) {
nativeSession.getUserProperties()
.put("org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 60_000L);
}
}
session.sendMessage(new TextMessage(sessionId));
activeSessions.putIfAbsent(session.getId(), session);
ServerTelemetry.TrackMetric(ServerMetricType.NEWSESSION);
TraceLogger.Info("SocketHandler", String.format("Session(new): %s: Mode: %s: New Connection established with client %s", session.getId(), modeStr,session.getRemoteAddress()), true);
Run run = ServerConfig.getServerRunConfig();
if (run.DuplexMode){
startDuplexCommunicationAsync(sessionId, run); // non-blocking
}
TraceLogger.Info("SocketHandler", String.format("Session (Verbose: %s): %s: Mode: %s Communication is in progress with client %s", run.Verbose, session.getId(), modeStr,session.getRemoteAddress()), true);
} catch (Exception e) {
ServerTelemetry.TrackMetric(ServerMetricType.FAILEDSESSION);
ServerTelemetry.TrackException(e);
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
@Async
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
try{
var sessionId = session.getId();
var clientAdr = session.getRemoteAddress();
ServerTelemetry.TrackMetric(ServerMetricType.CLOSEDSESSION);
TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s : Closed Connection with client %s", sessionId, modeStr,clientAdr), true);
if (activeSessions.keySet().contains(sessionId)){
activeSessions.remove(sessionId);
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
}}
finally {
System.gc();
}
}
@Async
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
try {
String sessionId = session.getId();
WebSocketSession wsSession = activeSessions.get(sessionId);
long start = System.nanoTime();
//ServerTelemetry.TrackMetric(ServerMetricType.REQUEST);
String payload = message.getPayload();
//ServerTelemetry.TrackMetric(ServerMetricType.REQUESTSIZE, payload.length()); // Bytes
TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: Received %s bytes from client %s", sessionId, modeStr,payload.length(), wsSession.getRemoteAddress()));
if (!ServerConfig.getServerRunConfig().DuplexMode) {
CompletableFuture.runAsync( () -> {
try {
sendMessage(wsSession, start);
} catch (IOException | InterruptedException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}).get();
}
long end = System.nanoTime();
double reqDurationms = TimeUnit.NANOSECONDS.toMillis(end - start);
// ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
// ServerTelemetry.TrackMetric(ServerMetricType.DURATION, reqDurationms);
} catch (Exception e) {
ServerTelemetry.TrackMetric(ServerMetricType.FAILEDREQUEST);
ServerTelemetry.TrackException(e);
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
@Async
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
try{
WebSocketSession wsSession = activeSessions.get(session.getId());
ServerTelemetry.TrackMetric(ServerMetricType.TRANSPORTERROR);
TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s Error: %s", wsSession.getId(), modeStr,exception.getMessage()));
if (activeSessions.keySet().contains(wsSession.getId())){
activeSessions.remove(wsSession.getId());
ServerTelemetry.TrackMetric(ServerMetricType.ACTIVESESSION, activeSessions.size());
}}
finally {
System.gc();
}
}
private void startDuplexCommunicationAsync(String sessionId, Run run) throws IOException, InterruptedException {
CompletableFuture.runAsync(() -> {
long sessionStart = System.nanoTime();
WebSocketSession wsSession = null;
for (long current = System.nanoTime(); current < sessionStart + TimeUnit.SECONDS.toNanos(run.KeepAlive_Sec); ){
try {
wsSession = activeSessions.get(sessionId);
sendMessage(wsSession, System.nanoTime());
current = System.nanoTime();
} catch (IOException | InterruptedException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}
if (wsSession != null && wsSession.isOpen()){
try {
wsSession.close(CloseStatus.NORMAL);
TraceLogger.Info("SocketHandler", String.format("Session: %s Mode: %s: Closing session as the configured keepalive duration (%s seconds) has elapsed, sent session close to client %s", sessionId, modeStr,run.KeepAlive_Sec, wsSession.getRemoteAddress()), true);
} catch (IOException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s Mode: %s Exception: %s", sessionId, modeStr,e.getMessage()));
}
}
});
}
private void sendMessage(WebSocketSession session, long start) throws IOException, InterruptedException {
if (!session.isOpen()) {
throw new IOException("Session is not in open state");
}
var runCfg = ServerConfig.getServerRunConfig();
try {
session.sendMessage(_data);
//ServerTelemetry.TrackMetric(ServerMetricType.RESPONSESIZE, _data.getPayloadLength()); // Bytes
TraceLogger.Info("SocketHandler", String.format("Session: %s: Mode: %s: %s bytes sent to client %s", session.getId(), modeStr, _data.getPayloadLength(), session.getRemoteAddress()));
long end = System.nanoTime();
long remainMillis = runCfg.RequestInterval_ms - TimeUnit.NANOSECONDS.toMillis(end - start);
if (remainMillis > 0) {
Thread.sleep(remainMillis);
}
} catch (IOException e) {
TraceLogger.Error("SocketHandler", String.format("Session: %s: Mode: %s: Exception: %s", session.getId(), modeStr,e.getMessage()));
} finally {
System.gc();
}
}
}
字符串
1. 尝试了各种GC(包括并行GC):GC时间占比约50%,GC仍在持续执行,可用内存仍降为零
2. 删除了日志记录
1条答案
按热度按时间oyjwcjzk1#
快速浏览您的TextSocketHandler代码后,我想指出几点,它们可能有助于优化内存使用并避免潜在的内存泄漏:
要优化内存使用并避免潜在问题:
分析负载下的应用程序以识别内存热点和导致高内存消耗的区域非常重要。Java Flight Recorder、VisualVM或分析器等工具可以帮助识别WebSocket代码中的内存泄漏或低效内存使用模式。