Elasticsearch throws a 503 error when storing logs

nnsrf1az  posted on 2021-06-10 in ElasticSearch

I am reading log messages from Kafka and storing them into Elasticsearch from my reactive Spring application, and I get the following two errors:

1. reactor.retry.RetryExhaustedException: 
org.springframework.web.client.HttpServerErrorException: 
503 POST request to /hotels-transactional-logs-local-2020.11.17/_doc 
returned error code 503.

2. reactor.retry.RetryExhaustedException: 
io.netty.handler.timeout.ReadTimeoutException

My index settings:

{
        "logs-dev-2020.11.17": {
            "settings": {
            "index": {
                "highlight": {
                    "max_analyzed_offset": "5000000"
                },
                "number_of_shards": "3",
                "provided_name": "logs-dev-2020.11.17",
                "creation_date": "1604558592095",
                "number_of_replicas": "2",
                "uuid": "wjIOSfZOSLyBFTt1cT-whQ",
                "version": {
                "created": "7020199"
                }
            }
        }
    }
}

Save method using the Spring WebClient-based reactive client:

public Mono<Log> saveDataIntoIndex(Log esLog, String index) {
    return reactiveElasticsearchOperations.save(esLog, index, "_doc")
        .publishOn(Schedulers.newElastic("es", 5))
        .retryWhen(
                Retry.anyOf(Exception.class)
                        .randomBackoff(Duration.ofSeconds(5), Duration.ofMinutes(1))
                        .retryMax(2)
        );
}
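For reference, the `randomBackoff(Duration.ofSeconds(5), Duration.ofMinutes(1))` policy above picks a randomized delay between a first and a maximum backoff. A minimal stand-alone sketch of that kind of jittered exponential backoff (the doubling growth factor and uniform jitter are my assumptions for illustration, not reactor-extra's exact internals):

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

public class JitteredBackoff {

    // Hypothetical helper: delay for the given retry attempt (1-based),
    // doubling from `first` and capped at `max`, with uniform random jitter.
    static Duration delayFor(int attempt, Duration first, Duration max) {
        long base = first.toMillis() * (1L << Math.min(attempt - 1, 20)); // exponential growth
        long capped = Math.min(base, max.toMillis());                      // never exceed max
        // jitter: pick uniformly between the first backoff and the capped value
        long jittered = ThreadLocalRandom.current().nextLong(first.toMillis(), capped + 1);
        return Duration.ofMillis(jittered);
    }

    public static void main(String[] args) {
        Duration first = Duration.ofSeconds(5), max = Duration.ofMinutes(1);
        for (int attempt = 1; attempt <= 3; attempt++) {
            Duration d = delayFor(attempt, first, max);
            System.out.println("attempt " + attempt + " -> " + d.toMillis() + " ms");
        }
    }
}
```

With `retryMax(2)`, a failed write therefore gives up after at most two retries, so a sustained 503 from the cluster still surfaces as `RetryExhaustedException`.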

Code that creates the ReactiveElasticsearchOperations:

@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elasticSearchEndpoints.split(","))
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1))
                        .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();

    return ReactiveRestClients.create(clientConfiguration);
}

@Bean
public ElasticsearchConverter elasticsearchConverter() {
    return new MappingElasticsearchConverter(elasticsearchMappingContext());
}

@Bean
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
    return new SimpleElasticsearchMappingContext();
}

@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
    return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
}

Additional information:
Elasticsearch version 7.2.1
Cluster health is good; there are 3 nodes in the cluster
Indices are created daily, each with 3 shards
It happens while running load tests.

How do we control this?
Do I need to slow down the reads and writes somewhere? If so, how can I do that?
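One way to slow down the writes (an assumption on my part, not a confirmed fix for the 503s) is to cap how many index requests are issued per second before they reach Elasticsearch. A minimal stand-alone fixed-window throttle sketch in plain Java; in the reactive pipeline the equivalent would be operators such as `limitRate` or buffering messages into bulk requests:

```java
import java.util.concurrent.TimeUnit;

public class WriteThrottle {
    private final long maxPerSecond;
    private long windowStart = System.nanoTime();
    private long usedInWindow = 0;

    public WriteThrottle(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
    }

    // Blocks until a permit is available: at most maxPerSecond permits per 1-second window.
    public synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        if (now - windowStart >= TimeUnit.SECONDS.toNanos(1)) {
            windowStart = now;      // window expired, start a fresh one
            usedInWindow = 0;
        }
        if (usedInWindow >= maxPerSecond) {
            // window exhausted: sleep out the remainder, then reset
            long sleepNanos = TimeUnit.SECONDS.toNanos(1) - (now - windowStart);
            TimeUnit.NANOSECONDS.sleep(Math.max(sleepNanos, 0));
            windowStart = System.nanoTime();
            usedInWindow = 0;
        }
        usedInWindow++;
    }
}
```

Each call to a save method like `saveDataIntoIndex` would first `acquire()` a permit, so the write rate stays bounded during load tests instead of overwhelming the cluster.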

I have already tried increasing the read timeout with withSocketTimeout(Duration.ofSeconds(100)), but the problem persists:

@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elasticSearchEndpoints.split(","))
            .withSocketTimeout(Duration.ofSeconds(100)) //the read timeout, max amount of time the client should wait while receiving no data from the server and the response is incomplete
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1))
                        .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();

    return ReactiveRestClients.create(clientConfiguration);
}

I have browsed many sites but found no solution. Please help.
