Apache HttpClient - out-of-memory problem

vjhs03f7  posted on 2021-06-07  in Kafka

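I have a processor that reads messages from Apache Kafka and sends the data to a REST endpoint.
The server has only 4 cores and 4 GB of RAM, of which at most 2 GB is allocated to the Java process.
Messages are produced and consumed at a rate of 4k/sec.
After running for a few minutes, the program runs out of memory.
What is the best way to call the HTTP REST endpoint asynchronously without waiting for the response?
How should the HttpClient connections be managed? My impression is that I need to start the client once and never close it, so that connections can be reused.
Do you see anything wrong with the code below?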
public class SomeProcesor implements BProcessor {

private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
CloseableHttpAsyncClient httpclient = null ; 

@Override
public void begin() {
    httpclient = HttpAsyncClients.createDefault();
    RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
    HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
    // Start the client
    httpclient.start();

}

@Override
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {

    List<Map<String, Object>> listMap = new ArrayList<>();  

    // loop and extract the data from events into the above List
    //..
    //..

    // submit to a separate thread to post to HTTP
    pool.submit(new HttpThread(listMap));
}

private class HttpThread implements Callable<Boolean> {
    List<Map<String, Object>> listMap = null;
    public HttpThread(List<Map<String, Object>> listMap) {
        this.listMap = listMap;
    }
    @Override
    public Boolean call() throws Exception {
        return postToHttp(listMap);
    }
}

private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
    for (Map<String, Object> map : listMap) {

        try {
            HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
            postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
            postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
            postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");

            StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
            input.setContentType("application/json");
            postRequest.setEntity(input);

            httpclient.execute(postRequest, null);

        } catch (Exception e) {
            return false;
        } catch (Throwable th) {
            return false;
        }
    }
    return true;
}

}
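
Separately from the HTTP client, the executor setup in the code above may also retain memory. An ExecutorCompletionService parks every finished Future in its completion queue until take() or poll() removes it, and the posted code never calls either. The sketch below uses only JDK classes to show that effect; the class name CompletionQueueDemo and the helper names are hypothetical, and the trivial task stands in for the real HTTP post. Whether this contributes noticeably to the out-of-memory error in the original program is an assumption that would need a heap dump to confirm.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original processor.
final class CompletionQueueDemo {

    /**
     * Submits trivial tasks, waits for all of them to finish, and returns how
     * many completed futures are still parked in the completion queue.
     */
    static int retainedAfter(int tasks, boolean drain) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        // Pass the queue explicitly so its size can be inspected afterwards.
        BlockingQueue<Future<Boolean>> completed = new LinkedBlockingQueue<>();
        CompletionService<Boolean> pool = new ExecutorCompletionService<>(exec, completed);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> Boolean.TRUE); // stand-in for the HTTP post
            if (drain) {
                drainFinished(pool);
            }
        }

        exec.shutdown();
        try {
            if (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (drain) {
            drainFinished(pool);
        }
        return completed.size();
    }

    /** poll() does not block: it removes whatever has already finished. */
    static void drainFinished(CompletionService<Boolean> pool) {
        while (pool.poll() != null) {
            // discard the result; a real program might count failures here
        }
    }

    public static void main(String[] args) {
        System.out.println(retainedAfter(1000, false)); // prints 1000
        System.out.println(retainedAfter(1000, true));  // prints 0
    }
}
```

If the Boolean results are never inspected, submitting to the plain executor instead of a completion service avoids the queue altogether. Note that Executors.newFixedThreadPool also uses an unbounded work queue, so pending tasks can pile up whenever posting is slower than consumption.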

wb1gzix0


You need to consume the HTTP response or release the connection; otherwise the connection keeps consuming resources. Change

httpclient.execute(postRequest, null);

to

HttpResponse response = httpclient.execute(postRequest, null).get();
if(response.getStatusLine().getStatusCode() != 200) {
    // do something
}
// release the connection, better add to a finally clause
postRequest.releaseConnection();
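
Note that calling get() on the returned Future blocks the calling thread until the response arrives. If the goal is to keep the calls asynchronous, one alternative is to cap the number of requests in flight, so that a slow endpoint pushes back on the consumer instead of letting pending requests accumulate on the heap. The following is a minimal sketch of that idea using only JDK classes; BoundedAsyncSender, fakePost and demo are hypothetical names, and the simulated call stands in for the real client. With CloseableHttpAsyncClient, the permit would presumably be released from the completed, failed and cancelled methods of a FutureCallback passed to execute instead of null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: limits how many async calls may be outstanding at once.
final class BoundedAsyncSender {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    BoundedAsyncSender(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Blocks the caller only while maxConcurrent calls are already outstanding. */
    <T> CompletableFuture<T> send(Supplier<CompletableFuture<T>> asyncCall) {
        permits.acquireUninterruptibly();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<T> call;
        try {
            call = asyncCall.get();
        } catch (RuntimeException e) {
            // The call never started, so give the permit back immediately.
            inFlight.decrementAndGet();
            permits.release();
            throw e;
        }
        // Release the permit on success and on failure alike.
        return call.whenComplete((result, error) -> {
            inFlight.decrementAndGet();
            permits.release();
        });
    }

    int maxObserved() {
        return maxInFlight.get();
    }

    /** Simulated endpoint: answers with status 200 after a short delay. */
    static CompletableFuture<Integer> fakePost(ExecutorService server) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 200;
        }, server);
    }

    /** Sends the given number of simulated requests; returns the peak in-flight count. */
    static int demo(int requests, int limit) {
        ExecutorService server = Executors.newFixedThreadPool(16);
        BoundedAsyncSender sender = new BoundedAsyncSender(limit);
        CompletableFuture<?>[] all = new CompletableFuture<?>[requests];
        for (int i = 0; i < requests; i++) {
            all[i] = sender.send(() -> fakePost(server));
        }
        CompletableFuture.allOf(all).join(); // wait once, at the end
        server.shutdown();
        return sender.maxObserved();
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight: " + demo(200, 8));
    }
}
```

The limit of 8 is arbitrary here; a suitable value depends on the endpoint's latency and on the client's connection pool size.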
