从1.0版本开始ElasticSearch
, 新的聚合 API http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-aggregations.html允许按多个字段分组,使用子聚合。假设你想按字段分组field1
, field2
and field3
:
{
"aggs": {
"agg1": {
"terms": {
"field": "field1"
},
"aggs": {
"agg2": {
"terms": {
"field": "field2"
},
"aggs": {
"agg3": {
"terms": {
"field": "field3"
}
}
}
}
}
}
}
}
当然,这可以适用于您想要的任意多个领域。
Update:
为了完整起见,以下是上述查询的输出。下面还有用于生成聚合查询并将结果展平到字典列表中的 python 代码。
{
"aggregations": {
"agg1": {
"buckets": [{
"doc_count": <count>,
"key": <value of field1>,
"agg2": {
"buckets": [{
"doc_count": <count>,
"key": <value of field2>,
"agg3": {
"buckets": [{
"doc_count": <count>,
"key": <value of field3>
},
{
"doc_count": <count>,
"key": <value of field3>
}, ...
]
},
{
"doc_count": <count>,
"key": <value of field2>,
"agg3": {
"buckets": [{
"doc_count": <count>,
"key": <value of field3>
},
{
"doc_count": <count>,
"key": <value of field3>
}, ...
]
}, ...
]
},
{
"doc_count": <count>,
"key": <value of field1>,
"agg2": {
"buckets": [{
"doc_count": <count>,
"key": <value of field2>,
"agg3": {
"buckets": [{
"doc_count": <count>,
"key": <value of field3>
},
{
"doc_count": <count>,
"key": <value of field3>
}, ...
]
},
{
"doc_count": <count>,
"key": <value of field2>,
"agg3": {
"buckets": [{
"doc_count": <count>,
"key": <value of field3>
},
{
"doc_count": <count>,
"key": <value of field3>
}, ...
]
}, ...
]
}, ...
]
}
}
}
以下 python 代码根据给定的字段列表执行分组。我你指定include_missing=True
,它还包括一些缺少某些字段的值的组合(如果您有 Elasticsearch 2.0 版本,则不需要它,这要归功于this https://github.com/elastic/elasticsearch/pull/11042)
def group_by(es, fields, include_missing):
current_level_terms = {'terms': {'field': fields[0]}}
agg_spec = {fields[0]: current_level_terms}
if include_missing:
current_level_missing = {'missing': {'field': fields[0]}}
agg_spec[fields[0] + '_missing'] = current_level_missing
for field in fields[1:]:
next_level_terms = {'terms': {'field': field}}
current_level_terms['aggs'] = {
field: next_level_terms,
}
if include_missing:
next_level_missing = {'missing': {'field': field}}
current_level_terms['aggs'][field + '_missing'] = next_level_missing
current_level_missing['aggs'] = {
field: next_level_terms,
field + '_missing': next_level_missing,
}
current_level_missing = next_level_missing
current_level_terms = next_level_terms
agg_result = es.search(body={'aggs': agg_spec})['aggregations']
return get_docs_from_agg_result(agg_result, fields, include_missing)
def get_docs_from_agg_result(agg_result, fields, include_missing):
current_field = fields[0]
buckets = agg_result[current_field]['buckets']
if include_missing:
buckets.append(agg_result[(current_field + '_missing')])
if len(fields) == 1:
return [
{
current_field: bucket.get('key'),
'doc_count': bucket['doc_count'],
}
for bucket in buckets if bucket['doc_count'] > 0
]
result = []
for bucket in buckets:
records = get_docs_from_agg_result(bucket, fields[1:], include_missing)
value = bucket.get('key')
for record in records:
record[current_field] = value
result.extend(records)
return result