elasticsearch java客户端解析聚合json查询

tyky79it  于 5个月前  发布在  ElasticSearch
关注(0)|答案(3)|浏览(66)

我正在使用spring-data-elasticsearch,我只是想知道是否有可能以json字符串的形式发送聚合查询。
所以查询看起来像这样:

{
  "size": 0,
  "aggs": {
    "cspByState": {
      "terms": {
        "field": "properties.csp.keyword"
      },
      "aggregations": {
        "levelType": {
          "terms": {
            "field": "level.keyword"
          }
        }
      }
    }
  },
  "query": {
    "bool": {
      "must": [{
          "bool": {
            "should": [{
                "wildcard": {
                  "event": {
                    "value": "input.start.*"
                  }
                }
              },
              {
                "wildcard": {
                  "event": {
                    "value": "input.finish.*"
                  }
                }
              }
            ]
          }
        }
      ],
      "filter": [
        {
          "range": {
            "dateTime": {
              "gte": "2021-03-01T00:00Z",
              "lte": "2021-03-11T23:59:59.99Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ]
    }
  }
}

字符串
我试着做以下操作,但它总是在未知聚合上失败:

final WrapperQueryBuilder wrapper = new WrapperQueryBuilder(queryString);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);
final SearchRequest searchRequest = new SearchRequest("csp-index");
searchRequest.source(searchSourceBuilder);

Flux.from(elasticsearchTemplate.execute(client -> client.aggregate(searchRequest))).collectList().block();
Caused by: ElasticsearchException[Elasticsearch exception [type=named_object_not_found_exception, reason=[2:11] unknown field [aggs]]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:496)

的数据

ffvjumwh

ffvjumwh1#

以这些文字

final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(wrapper);
searchSourceBuilder.size(0);

字符串
您设置了搜索请求的query部分和size部分。您不能将整个搜索请求作为查询参数传入。
聚合必须设置为

searchSourceBuilder.aggregation(aggregationBuilder)


我不知道Elasticsearch是否提供了一个解析JSON字符串的AggregationBuilder,当我简短地查看AggregationBuilders类时,我没有看到一个。

vojdkbi0

vojdkbi02#

最后我用下面的代码完成了它。主要的事情是将typed_keys查询属性发送到弹性,然后从DefaultReactiveElasticsearchClient获取一些方法来进行响应解析。

@Service
@Log4j2
@RequiredArgsConstructor
public class ElasticSearchService {
  
  private final ReactiveElasticsearchOperations elasticsearchTemplate;
  private final DfElasticSearchProperties elasticSearchProperties;
  private final Mustache.Compiler compiler;
  
  public Flux<Aggregation> transactionStatistics(LocalDate date) {
    final Map<String, Object> parameters = new HashMap<>();
    parameters.put("dateFrom", date.atStartOfDay(ZoneOffset.UTC));
    parameters.put("dateTo", date.atTime(LocalTime.MAX).atZone(ZoneOffset.UTC));
    
    final String queryString = compiler.loadTemplate("transaction-statistics").execute(parameters);
    
    return aggregate(queryString);
  }
  
  private Flux<Aggregation> aggregate(String request) {
    return Flux.from(elasticsearchTemplate.execute(client -> sendRequest(client, request)));
  }
  
  private Flux<Aggregation> sendRequest(ReactiveElasticsearchClient client, String request) {
    return client.execute(c -> sendRequest(c, request))
             .flatMapMany(c -> c.body(BodyExtractors.toMono(byte[].class))
                                 .map(it -> new String(it, StandardCharsets.UTF_8))
                                 .flatMap(json -> getSearchResponseFromJson(json)))
             .flatMap(response -> Flux.fromIterable(response.getAggregations()));
  }
  
  private Mono<ClientResponse> sendRequest(WebClient client, String request) {
    return client.method(HttpMethod.GET).uri(
      builder -> builder.path("/").path(elasticSearchProperties.getCspEventsIndex()).path("/_search").queryParam(
        "typed_keys").build()).contentType(MediaType.APPLICATION_JSON).bodyValue(request).exchange();
  }
  
  private static Mono<SearchResponse> getSearchResponseFromJson(String jsonResponse) {
    try {
      XContentParser parser = createParser(jsonResponse);
      return Mono.just(SearchResponse.fromXContent(parser));
    } catch (Exception e) {
      return Mono.error(new ElasticsearchException(String.format("Unable to parse response [%s]", jsonResponse), e));
    }
  }
  
  private static XContentParser createParser(String content) throws IOException {
    return XContentType.fromMediaTypeOrFormat(MediaType.APPLICATION_JSON_VALUE) //
      .xContent() //
      .createParser(new NamedXContentRegistry(getDefaultNamedXContents()),
        DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
  }
}

字符串

ndasle7k

ndasle7k3#

仍然不可能使用Elasticsearch Java API Client 8以JSON形式发送聚合。我在代码中完成了聚合的创建,并仅使用JSON形式的查询:

NativeQuery query = NativeQuery.builder()
        // rewrite JSON aggregation into code
        .withAggregation("xxx",
            new Aggregation.Builder().terms(tb -> tb.field("field"))
                .build())
        .withQuery(new StringQuery(queryString))
        .withMaxResults(0)
        .build();

    Flux<Aggregation> aggregation = elasticsearchTemplate
        .aggregate(query, Object.class, IndexCoordinates.of("index-name"))
        .cast(ElasticsearchAggregation.class)
        .map(ElasticsearchAggregation::aggregation);

字符串
这个API有点奇怪,因为几乎在任何地方都可以使用构建器函数(例如Function<TermsAggregation.Builder, ObjectBuilder<TermsAggregation>> fn)。但是withAggregation只需要Aggregation而不是Function<Aggregation.Builder, Aggregation.Builder>,所以你必须调用build()

相关问题