我有一个从apachekafka读取消息并将数据发送到rest端点的处理器。
服务器只有4个内核和4 gb ram,其中最大2gb分配给java进程
消息以4k/秒的速率生成和使用。
运行几分钟后,程序的内存就用完了。
异步调用httprest端点而不等待响应的最佳方法是什么
如何管理httpclient连接?我的印象是,我需要启动客户端,而不是关闭它,这样我就可以重用连接
你看到下面的代码有什么问题吗
公共类someprocesor实现bprocessor{
private ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
private CompletionService<Boolean> pool = new ExecutorCompletionService<Boolean>(exec);
CloseableHttpAsyncClient httpclient = null ;
@Override
public void begin() {
httpclient = HttpAsyncClients.createDefault();
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(5000).setConnectTimeout(5000).setSocketTimeout(5000).build();
HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build();
// Start the client
httpclient.start();
}
@Override
public void process(MessageAndMetadata<?, ?> mMData, List<Map> events) {
List<Map<String, Object>> listMap = new ArrayList<>();
// loop and extract the data from events into the above List
//..
//..
// submit to seperate thread to post to HTTP
pool.submit(new HttpThread(listMap);
}
private class HttpThread implements Callable<Boolean> {
List<Map<String, Object>> listMap = null;
public HttpThread(List<Map<String, Object>> listMap) {
this.listMap = listMap;
}
@Override
public Boolean call() throws Exception {
return postToHttp(listMap);
}
}
private Boolean postToHttp(List<Map<String, Object>> listMap) throws UnsupportedEncodingException {
for (Map<String, Object> map : listMap) {
try {
HttpPost postRequest = new HttpPost("https://myserver:8080/services/collector");
postRequest.addHeader(HttpHeaders.ACCEPT, "application/json");
postRequest.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
postRequest.addHeader(HttpHeaders.CONNECTION, "keep-alive");
StringEntity input = new StringEntity(methodToConvertMapToJSON(map));
input.setContentType("application/json");
postRequest.setEntity(input);
httpclient.execute(postRequest, null);
} catch (Exception e) {
return false;
} catch (Throwable th) {
return false;
}
}
return true;
}
}
1条答案
按热度按时间wb1gzix01#
需要使用http响应或释放连接,否则连接将消耗资源。改变
到