Common Elasticsearch Query Operations with Python Scripts

Overview: this article walks through common Elasticsearch query operations driven by Python scripts, first over plain HTTP with requests and then with the official elasticsearch client.

1. ES query operations in Python (HTTP / requests)

#!coding:utf-8

import json
import logging
import time

import requests

PAGE_RESULT_SCROLL_ID = 'scroll_id'
PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
PAGE_RESULT_TOTAL_SIZE = 'total_size'
PAGE_RESULT_HITS = 'hits'
PAGE_RESULT_DATA = 'data'
PAGE_RESULT_CONVERT_DATA = 'convert_data'

CONVERT_DEST_KEY = 'dest_key'
CONVERT_DEFAULT_VALUE = 'default_value'

current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
# Logging setup
log_file = "operate_es_" + current_time + ".log"
logging.basicConfig(filename=log_file, level=logging.INFO)


# Create an index
def create_index(es_url, index_name, es_mapping):
    logging.info("es_url:%s index_name:%s " % (es_url, index_name))

    es_index_url = es_url + index_name
    logging.info("es_index_url: %s" % (es_index_url))
    r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
    response_json = json.loads(r.text)

    if 200 == r.status_code:
        if response_json['acknowledged']:
            logging.info("index_name: %s 创建成功" % (index_name))
    else:
        logging.info("index_name: %s 创建失败" % (index_name))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1

    logging.info("index create done!")
    return 0


# Batch-create indexes with a mapping
def batch_create_index_with_mapping(es_url, es_index_prefix, es_mapping, batch_num=1, start_index=0):
    logging.info("es_url:" + es_url)
    new_index_array = []
    for i in range(start_index, batch_num):
        suffix = "%03d" % i
        index_name = es_index_prefix + suffix
        es_index_url = es_url + index_name
        logging.info("es_index_url: %s" % (es_index_url))
        r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
        response_json = json.loads(r.text)

        if 200 == r.status_code:
            if response_json['acknowledged']:
                logging.info("index_name: %s 创建成功" % (index_name))
                new_index_array.append(index_name)

        else:
            logging.info("index_name: %s 创建失败" % (index_name))
            logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
            break

    logging.info("index create done!")
    logging.info("new_index_array:%s" % (new_index_array))


# Delete an index
def delete_index(es_url, delete_index):
    delete_es_url = es_url + delete_index
    logging.info("es_url:%s delete_index:%s start..." % (delete_es_url, delete_index))

    r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("delete数据返回响应: %s " % r.text.encode(encoding="utf-8"))
    else:
        logging.info("es_url:%s delete_index: %s 删除失败" % (es_url, delete_index))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0


# Delete indexes by prefix
def del_index_by_prefix(es_url, index_prefix):
    delete_es_url = es_url + index_prefix + "*"
    logging.info("es_url:%s  start..." % (delete_es_url))
    r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("delete数据返回响应: %s " % r.text.encode(encoding="utf-8"))
    else:
        logging.info("delete_es_url:%s 删除失败" % (delete_es_url))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0


# Batch-delete bucketed indexes by prefix
def batch_es_del_index(es_url, index_prefix, bucket_num, start_index=0):
    logging.info("es_url:" + es_url)
    delete_index_array = []
    for i in range(start_index, bucket_num):
        suffix = "%03d" % i
        index_to_delete = index_prefix + suffix
        ret = delete_index(es_url, index_to_delete)
        if 0 == ret:
            delete_index_array.append(index_to_delete)
        else:
            logging.info("delete index: %s failed" % (index_to_delete))
            break

    logging.info("batch_es_del_index done!")
    logging.info("delete_index_array:%s" % (delete_index_array))


# Update an index mapping (add properties)
def add_properties_to_index(es_url, index_name, doc_type, es_mapping):
    logging.info("es_url:" + es_url)

    es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
    logging.info("es_index_url: %s" % (es_index_url))
    r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
    response_json = json.loads(r.text)

    if 200 == r.status_code:
        if response_json['acknowledged']:
            logging.info("index_name: %s 更新成功" % (index_name))
    else:
        logging.info("index_name: %s 更新失败" % (index_name))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1

    logging.info("index modify done!")
    return 0


# Batch-update index mappings
def batch_add_properties_to_index(es_url, index_prefix, doc_type, es_mapping, bucket_num, start_index=0):
    logging.info("es_url:" + es_url)
    new_index_array = []
    for i in range(start_index, bucket_num):
        suffix = "%03d" % i
        index_name = index_prefix + suffix
        es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
        logging.info("es_index_url: %s" % (es_index_url))
        r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
        response_json = json.loads(r.text)

        if 200 == r.status_code:
            if response_json['acknowledged']:
                logging.info("index_name: %s 更新成功" % (index_name))
                new_index_array.append(index_name)

        else:
            logging.info("index_name: %s 更新失败" % (index_name))
            logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
            logging.info("index modify done new_index_array:%s" % (new_index_array))
            return -1

    logging.info("index modify done!")
    logging.info("new_index_array:%s" % (new_index_array))
    return 0


# Back up index content (structure + data) via _reindex, setting routing from a source field
def es_reindex_with_routing(es_url, source_index, bak_index, query, routing_field):
    # copy the data
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": query
        },
        "dest": {
            "index": bak_index
        },
        "script": {
            "inline": "ctx._routing = ctx._source." + routing_filed,
            "lang": "painless"
        }
    }
    logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("备份数据返回响应: %s " % r.text.encode(encoding="utf-8"))
        logging.info("source_index:%s to bak_index: %s 复制成功" % (source_index, bak_index))
        logging.info("备份索引: %s , 备份数据量: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s 备份失败" % (source_index, bak_index))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0


# Back up an index (structure + data) via _reindex
def es_reindex(es_url, source_index, bak_index, query):
    # copy the data
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": query
        },
        "dest": {
            "index": bak_index
        }
    }
    logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("备份数据返回响应: %s " % r.text.encode(encoding="utf-8"))
        logging.info("source_index:%s to bak_index: %s 复制成功" % (source_index, bak_index))
        logging.info("备份索引: %s , 备份数据量: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s 备份失败" % (source_index, bak_index))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0


# Batch backup
def batch_es_reindex(es_url, source_index_prefix, bak_index_prefix, queryParam, bucket_num, start_index=0):
    '''
    :param es_url:
    :param source_index_prefix:
    :param bak_index_prefix:
    :param queryParam:
    queryParam = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "match_all": {}
                        }
                    ],
                    "must_not": [],
                    "should": []
                }
            }
        }
    :param bucket_num:
    :param start_index:
    :return:
    '''
    logging.info("es_url:" + es_url)
    new_index_array = []
    for i in range(start_index, bucket_num):
        suffix = "%03d" % i
        source_index = source_index_prefix + suffix
        bak_index = bak_index_prefix + suffix
        ret = es_reindex(es_url, source_index, bak_index, queryParam["query"])
        if 0 == ret:
            new_index_array.append(bak_index)
        else:
            logging.info("source_index:%s to bak_index: %s 复制失败" % (source_index, bak_index))
            logging.info("do new_index_array:%s over" % (new_index_array))
            return -1

    logging.info("batch_es_reindex done!")
    logging.info("new_index_array:%s" % (new_index_array))
    return 0


# Batch backup with routing
def batch_es_reindex_with_routing(es_url, source_index_prefix, bak_index_prefix, queryParam, routing_field, bucket_num,
                                  start_index=0):
    '''
    :param es_url:
    :param source_index_prefix:
    :param bak_index_prefix:
    :param queryParam:
    queryParam = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "match_all": {}
                        }
                    ],
                    "must_not": [],
                    "should": []
                }
            }
        }
    :param bucket_num:
    :param routing_field:
    :param start_index:
    :return:
    '''
    logging.info("es_url:" + es_url)
    new_index_array = []
    for i in range(start_index, bucket_num):
        suffix = "%03d" % i
        source_index = source_index_prefix + suffix
        bak_index = bak_index_prefix + suffix
        ret = es_reindex_with_routing(es_url, source_index, bak_index, queryParam["query"], routing_field)
        if 0 == ret:
            new_index_array.append(bak_index)
        else:
            logging.info("source_index:%s to bak_index: %s 复制失败" % (source_index, bak_index))
            logging.info("do new_index_array:%s over" % (new_index_array))
            return -1

    logging.info("batch_es_reindex done!")
    logging.info("new_index_array:%s" % (new_index_array))
    return 0


# Create indexes according to business rules
def create_index_by_business_code_rel_type_dict(es_url, es_index_prefix, es_mapping,
                                                business_codes_rel_type_dict,
                                                do_reindex=False,
                                                source_index='',
                                                routing_field=''):
    '''
    :param es_url:
    :param es_index_prefix:
    :param es_mapping:
    :param business_codes_rel_type_dict:
    business_codes_rel_type_dict = {"003": ["fk_tl"],"004": ["bq_sb"],"005": [ "jd_zh", "jd_zz"]}
    :param do_reindex:
    :param source_index:
    :param routing_field:
    :return:
    '''
    logging.info("es_url:" + es_url)
    new_index_array = []
    business_codes = business_codes_rel_type_dict.keys()

    for business_code in business_codes:
        relation_types = business_codes_rel_type_dict.get(business_code)
        for relation_type in relation_types:
            index_name = es_index_prefix + business_code + "_" + relation_type
            es_index_url = es_url + index_name
            logging.info("es_index_url: %s" % (es_index_url))
            r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
            response_json = json.loads(r.text)

            if 200 == r.status_code:
                if response_json['acknowledged']:
                    logging.info("index_name: %s 创建成功" % (index_name))
                    new_index_array.append(index_name)
                    if do_reindex:
                        result = es_reindex_by_rel_type(es_url, source_index, index_name, relation_type, routing_field)
                        if 0 != result:
                            logging.info("do new_index_array:%s over" % (new_index_array))
                            return -1
            else:
                logging.info("index_name: %s 创建失败" % (index_name))
                logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
                logging.info("do new_index_array:%s over" % (new_index_array))
                return -1
    logging.info("index create done!")
    logging.info("new_index_array:%s" % (new_index_array))
    return 0


# Back up the documents of one relation type (structure + data) via _reindex
def es_reindex_by_rel_type(es_url, source_index, bak_index, rel_type, routing_field):
    # copy the data
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": {
                "term": {
                    "rel_type": rel_type.swapcase()
                }
            }
        },
        "dest": {
            "index": bak_index

        },
        "script": {
            "inline": "ctx._routing = ctx._source." + routing_field,
            "lang": "painless"
        }

    }
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("备份数据返回响应: %s " % r.text.encode(encoding="utf-8"))
        logging.info("备份索引: %s , 备份数据量: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s 备份失败" % (source_index, bak_index))
        logging.error("错误信息: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0


# Paged query via scroll (fetch all matching documents)
def get_es_data_by_scroll(es_url, index, query={}, scroll_id=None, scroll="5m", batch_size=10000):
    data = []
    try:
        while True:
            if not scroll_id:
                # number of docs fetched per scroll batch
                query["size"] = batch_size
                curl_url = es_url + index + '/_search?scroll=' + scroll
                logging.info("curl_url:%s" % (curl_url))
                response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
            else:
                curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
                logging.info("curl_url:%s" % (curl_url))
                response = requests.get(curl_url)
            # handle the response
            if response:
                if 200 == response.status_code:
                    response_json = json.loads(response.text)
                    scroll_id = response_json['_scroll_id']
                    # Update the scroll ID
                    if scroll_id is None:
                        break
                    # Get the number of results that we returned in the last scroll
                    if not response_json['hits']['hits']:
                        break
                    response_data = [doc["_source"] for doc in response_json['hits']['hits']]
                    data.extend(response_data)
                else:
                    logging.info("curl_url:%s 查询失败" % (curl_url))
                    logging.error("错误信息: %s" % response.text.encode(encoding="utf-8"))
                    return None
            else:
                logging.info("curl_url:%s 查询失败" % (curl_url))
                return None
        logging.info("get data size:%s" % (len(data)))
        return data
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("exception!")
        return None


# Simple query (single scroll page)
def query(es_url, index, query, batch_size=1000, scroll="3m"):
    try:
        curl_url = es_url + index + '/_search?scroll=' + scroll
        # number of docs fetched per batch
        query["size"] = batch_size
        response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
        if response:
            response_json = json.loads(response.text)
            return response_json
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("exception!")


# Paged query via scroll (returns one page plus a scroll_id)
def query_by_scroll(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000):
    '''
    :param index:
    :param doc_type:
    :param query:
    queryParam = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "import_es_time": {
                                "lt": "2019-07-31 00:00:00"
                            }
                        }
                    },
                    {
                        "term": {
                            "list_type": "01"
                        }
                    },
                    {
                        "term": {
                            "delete_status": "0"
                        }
                    }
                ],
                "must_not": [],
                "should": []
            }
        }
    }
    :param scroll:
    :param batch_size:
    :return:
    '''
    try:
        logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
            index, doc_type, scroll, batch_size, query))
        # number of docs fetched per batch
        query["size"] = batch_size
        if doc_type:
            curl_url = es_url + index + '/' + doc_type + '/_search?scroll=' + scroll
        else:
            curl_url = es_url + index + '/_search?scroll=' + scroll

        response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})

        if response:
            if 200 == response.status_code:
                response_json = json.loads(response.text)
                return response_json
            else:
                logging.info("curl_url:%s query: %s 失败" % (curl_url, query))
                logging.error("错误信息: %s" % response.text.encode(encoding="utf-8"))
                return None
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("query_by_scroll exception!")
        return None


# Continue a paged query by scroll_id
def query_by_scroll_id(es_url, index, scroll_id, scroll='5m'):
    if scroll_id is None:
        return
    try:
        curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
        logging.info("curl_url:%s" % (curl_url))
        response = requests.get(curl_url)
        # handle the response
        if response:
            if 200 == response.status_code:
                response_json = json.loads(response.text)
                return response_json
            else:
                logging.info("curl_url:%s 查询失败" % (curl_url))
                logging.error("错误信息: %s" % response.text.encode(encoding="utf-8"))
                return None
        else:
            logging.info("curl_url:%s 查询失败" % (curl_url))
            return None
    except Exception as e:
        logging.error(e)
        logging.error("query fail! scroll_id:%s" % (scroll_id))
        print("query_by_scroll_id exception!")
        return None


# Fetch one page of scroll data and convert it
def get_and_parse_query_scroll_data(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
                                    convert_dict={}, add_date_time=False):
    page = query_by_scroll(es_url, index, doc_type=doc_type, query=query, scroll=scroll)
    return convert_es_page_data(page, convert_dict, add_date_time)


# Parse the returned page
def parse_es_page_data(page):
    result_data = {}
    if not page or not page['_scroll_id']:
        logging.warning("query_by_scroll return none")
        print("query_by_scroll return none")
        return result_data
    if page['_scroll_id']:
        scroll_id = page['_scroll_id']
        result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
        print("Scrolling scroll_id:%s" % (scroll_id))
    if page['hits']:
        total_size = page['hits']['total']
        print("Scrolling total_size:%s" % (total_size))
        result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
        hits = page['hits']['hits']
        scroll_size = len(hits)
        result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
        result_data[PAGE_RESULT_HITS] = hits
    return result_data


# Convert page data according to business needs
def convert_es_page_data(page, convert_dict={}, add_date_time=False):
    '''
    :param page:
    :param convert_dict:
    convert_dict example:
    {"key1": {"dest_key": ["key1","key2"], "default_value":""}}
    :param add_date_time:
    :return:
    '''
    result_data = parse_es_page_data(page)
    result = []
    if result_data and result_data.get(PAGE_RESULT_HITS):
        result = result_data[PAGE_RESULT_HITS]
    # parse data
    convert_data = []
    for item in result:
        if item['_source']:
            source_data = item['_source']
            convert_result = {}
            keys = convert_dict.keys()
            for source_key in keys:
                dest_dict = convert_dict.get(source_key, [])
                dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
                default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
                for dst_key in dst_keys:
                    convert_result[dst_key] = source_data.get(source_key, default_value)
            if add_date_time:
                date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                convert_result["date_time"] = date_time
            convert_str = json.dumps(convert_result, ensure_ascii=False)
            convert_data.append(convert_str.encode('utf-8'))
    result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
    return result_data


def main():
    # ES server address
    #  ## dev environment
    es_url = "http://192.168.3.63:9200/"

    #  ## test environment
    # es_url = "http://192.168.3.206:9200/"
    #  ## staging environment
    # es_url = "http://100.1.1.1:9200/"
    #  ## production environment
    # es_url = "http://10.1.1.1:9200/"

    BUCKET_NUM = 2
    INDEX_NAME = 'zyc_test'
    DOC_TYPE = 'relation'
    BAK_INDEX_NAME = 'backup' + INDEX_NAME
    INDEX_PREFIX = INDEX_NAME + '_'
    BAK_INDEX_PREFIX = BAK_INDEX_NAME + '_'
    ES_MAPPING = {
        "settings": {
            "index": {
                "search": {
                    "slowlog": {
                        "threshold": {
                            "fetch": {
                                "warn": "100ms"
                            },
                            "query": {
                                "warn": "100ms"
                            }
                        }
                    }
                },
                "refresh_interval": "1s",
                "indexing": {
                    "slowlog": {
                        "threshold": {
                            "index": {
                                "warn": "1s"
                            }
                        }
                    }
                },
                "number_of_shards": "6",
                "translog": {
                    "flush_threshold_size": "1gb",
                    "sync_interval": "120s",
                    "durability": "async"
                }
            }
        },
        "aliases": {
            "vii_relation": {

            }
        },
        "mappings": {
            "relation": {
                "dynamic_date_formats": [
                    "yyyy-MM-dd HH:mm:ss",
                    "yyyy-MM-dd"
                ],
                "dynamic_templates": [
                    {
                        "date_template": {
                            "match_pattern": "regex",
                            "mapping": {
                                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
                                "type": "date"
                            },
                            "match_mapping_type": "string",
                            "match": "(.*_date|.*_timestamp|.*_ts|.*_time)"
                        }
                    },
                    {
                        "keyword_template": {
                            "mapping": {
                                "type": "keyword"
                            },
                            "match_mapping_type": "string"
                        }
                    }
                ],
                "_all": {
                    "enabled": "false"
                },
                "properties": {
                    "pk": {
                        "type": "keyword"
                    },
                    "in_obj_type": {
                        "type": "keyword"
                    },
                    "in_obj_value": {
                        "type": "keyword"
                    },
                    "in_obj_type_value": {
                        "type": "keyword"
                    },
                    "in_obj_tag": {
                        "type": "keyword"
                    },
                    "in_obj_tag_desc": {
                        "type": "keyword"
                    },
                    "out_obj_type": {
                        "type": "keyword"
                    },
                    "out_obj_value": {
                        "type": "keyword"
                    },
                    "out_obj_type_value": {
                        "type": "keyword"
                    },
                    "out_obj_tag": {
                        "type": "keyword"
                    },
                    "out_obj_tag_desc": {
                        "type": "keyword"
                    },
                    "start_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    "end_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    "rel_type": {
                        "type": "keyword"
                    },
                    "rel_detail": {
                        "type": "keyword",
                        "index": "false"
                    },
                    "count": {
                        "type": "long"
                    },
                    "similarity": {
                        "type": "double"
                    },
                    "tag_codes": {
                        "type": "keyword"
                    },
                    "delete_status": {
                        "type": "integer"
                    },
                    "tenant_code": {
                        "type": "keyword"
                    },
                    "business_code": {
                        "type": "keyword"
                    },
                    "import_es_time": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    }

                }
            }
        }
    }

    ADD_ES_MAPPING = {
        "properties": {
            "delete_status": {
                "type": "integer"
            },
            "start_time": {
                "format": "yyyy-MM-dd HH:mm:ss",
                "type": "date"
            },
            "end_time": {
                "format": "yyyy-MM-dd HH:mm:ss",
                "type": "date"
            }
        }
    }
    queryParam = {
        "query": {
            "bool": {
                "must": [
                ],
                "must_not": [],
                "should": []
            }
        }
    }
    QUERY_INDEX_NAME = 'cysaas_object_basic_info'
    QUERY_DOC_TYPE = 'basic'
    logging.info("begin...")
    create_index(es_url, INDEX_NAME, ES_MAPPING)
    time.sleep(5)
    delete_index(es_url, INDEX_NAME)
    time.sleep(5)
    batch_create_index_with_mapping(es_url, INDEX_PREFIX, ES_MAPPING)
    time.sleep(5)
    add_properties_to_index(es_url, INDEX_NAME, DOC_TYPE, ADD_ES_MAPPING)
    time.sleep(5)
    batch_add_properties_to_index(es_url, INDEX_PREFIX, DOC_TYPE, ADD_ES_MAPPING, 2)
    time.sleep(5)
    result = query_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
    convert_dict = {"obj_value": {"dest_key": ["obj_value", "value"], "default_value": ""}}
    result_data = get_and_parse_query_scroll_data(es_url, index=QUERY_INDEX_NAME, query=queryParam,
                                                  convert_dict=convert_dict)
    get_data = get_es_data_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
    del_index_by_prefix(es_url, INDEX_NAME)
    time.sleep(5)
    batch_es_del_index(es_url, INDEX_PREFIX, 2)
    logging.info("done")
    print("done")


if __name__ == '__main__':
    main()

2. ES query operations in Python (elasticsearch client)

#!coding:utf-8

import json
import logging
import time

from elasticsearch import Elasticsearch, helpers

PAGE_RESULT_SCROLL_ID = 'scroll_id'
PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
PAGE_RESULT_TOTAL_SIZE = 'total_size'
PAGE_RESULT_HITS = 'hits'
PAGE_RESULT_DATA = 'data'
PAGE_RESULT_CONVERT_DATA = 'convert_data'

CONVERT_DEST_KEY = 'dest_key'
CONVERT_DEFAULT_VALUE = 'default_value'

current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
# Logging setup
log_file = "operate_es_client_" + current_time + ".log"
logging.basicConfig(filename=log_file, level=logging.INFO)


def query(es_client, index, query):
    try:
        return helpers.scan(es_client, index=index, scroll="3m", query=query)
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("exception!")


def query_by_scroll(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000, preserve_order=False,
                    **kwargs):
    '''
    :param index:
    :param doc_type:
    :param query:
    queryParam = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "import_es_time": {
                                "lt": "2019-07-31 00:00:00"
                            }
                        }
                    },
                    {
                        "term": {
                            "list_type": "01"
                        }
                    },
                    {
                        "term": {
                            "delete_status": "0"
                        }
                    }
                ],
                "must_not": [],
                "should": []
            }
        }
    }
    :param scroll:
    :param batch_size:
    :param preserve_order:
    :param kwargs:
    :return:
    '''
    if not preserve_order:  # use unordered scan-style mode
        kwargs['search_type'] = 'query_then_fetch'
    try:
        logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
            index, doc_type, scroll, batch_size, query))
        resp = es_client.search(index=index,
                                doc_type=doc_type,
                                scroll=scroll,
                                size=batch_size,
                                body=query,
                                **kwargs)
        return resp
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("exception!")
    return None


def query_by_scroll_id(es_client, scroll_id, scroll='5m'):
    if scroll_id is None:
        return
    try:
        resp = es_client.scroll(scroll_id, scroll=scroll)
        return resp
    except Exception as e:
        logging.error(e)
        logging.error("query fail! scroll_id:%s" % (scroll_id))
        print("exception!")
    return None


def get_and_parse_query_scroll_data(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
                                    convert_dict={}, add_date_time=False):
    page = query_by_scroll(es_client, index, doc_type=doc_type, query=query, scroll=scroll)
    return convert_es_page_data(page, convert_dict, add_date_time)


def parse_es_page_data(page):
    result_data = {}
    if not page or not page['_scroll_id']:
        logging.warning("query_by_scroll return none")
        print("query_by_scroll return none")
        return result_data
    if page['_scroll_id']:
        scroll_id = page['_scroll_id']
        result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
        print("Scrolling scroll_id:%s" % (scroll_id))
    if page['hits']:
        total_size = page['hits']['total']
        print("Scrolling total_size:%s" % (total_size))
        result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
        hits = page['hits']['hits']
        scroll_size = len(hits)
        result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
        result_data[PAGE_RESULT_HITS] = hits
    return result_data


def convert_es_page_data(page, convert_dict={}, add_date_time=False):
    '''
    :param page:
    :param convert_dict:
    convert_dict example:
    {"key1": {"dest_key": ["key1","key2"], "default_value":""}}
    :param add_date_time:
    :return:
    '''
    result_data = parse_es_page_data(page)
    result = []
    if result_data and result_data.get(PAGE_RESULT_HITS):
        result = result_data[PAGE_RESULT_HITS]
    # parse data
    convert_data = []
    for item in result:
        if item['_source']:
            source_data = item['_source']
            convert_result = {}
            keys = convert_dict.keys()
            for source_key in keys:
                dest_dict = convert_dict.get(source_key, [])
                dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
                default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
                for dst_key in dst_keys:
                    convert_result[dst_key] = source_data.get(source_key, default_value)
            if add_date_time:
                date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                convert_result["date_time"] = date_time
            convert_str = json.dumps(convert_result, ensure_ascii=False)
            convert_data.append(convert_str.encode('utf-8'))
    result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
    return result_data


def main():
    # ES server address
    #  ## dev environment
    es_host = "192.168.3.63"
    es_url = "http://192.168.3.63:9200/"

    INDEX_NAME = 'cysaas_object_basic_info'
    DOC_TYPE = 'basic'

    queryParam = {
        "query": {
            "bool": {
                "must": [],
                "must_not": [],
                "should": []
            }
        }
    }
    logging.info("begin...")
    # es_client = Elasticsearch([{'host': es_host, 'port': '9200'}])
    es_client = Elasticsearch([es_url], verify_certs=False)
    result = query_by_scroll(es_client, index=INDEX_NAME, doc_type=DOC_TYPE, query=queryParam)
    time.sleep(5)
    result_data = get_and_parse_query_scroll_data(es_client, index=INDEX_NAME, doc_type=DOC_TYPE,
                                                  query=queryParam)
    logging.info("done")
    print("done")


if __name__ == '__main__':
    main()

scan avoids the sorting overhead of scroll. The from/size pagination mode sorts the whole result set globally, which is very costly; if we switch sorting off, all documents can be returned with very little extra work. That is exactly what scan does: it skips the sort and simply returns data from every shard that has matching results. A minimal sketch contrasting the two approaches follows.
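The sketch below is illustrative only: it reuses the dev-environment URL and the cysaas_object_basic_info index from the scripts above and is not part of the original code.

# Minimal sketch: from/size paging vs. helpers.scan.
from elasticsearch import Elasticsearch, helpers

es_client = Elasticsearch(["http://192.168.3.63:9200/"])
index_name = "cysaas_object_basic_info"
query_body = {"query": {"match_all": {}}}

# from/size: every page forces a global sort across all shards,
# so deep pages (large "from") get progressively more expensive.
page = es_client.search(index=index_name,
                        body=dict(query_body, **{"from": 10000, "size": 100}))
print(len(page["hits"]["hits"]))

# helpers.scan: no global sort; each shard streams back its own matching
# documents through the scroll API, so the cost stays flat.
for hit in helpers.scan(es_client, index=index_name, query=query_body, scroll="5m"):
    print(hit["_source"])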

elasticsearch-py gathers its high-performance features in the helpers module: helpers.scan, helpers.reindex, helpers.streaming_bulk, helpers.bulk and helpers.parallel_bulk. One of them, helpers.bulk, is sketched briefly below; after that comes the source of the Python elasticsearch helpers.scan function, which is easy to follow once compared against basic Elasticsearch scroll/scan usage.
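A minimal helpers.bulk sketch, offered as a hedged illustration rather than part of the original scripts; the index, doc_type and field names are borrowed from the mapping example earlier, and the URL is the dev-environment one.

# Minimal helpers.bulk sketch (illustrative; index and field names are assumptions).
from elasticsearch import Elasticsearch, helpers

es_client = Elasticsearch(["http://192.168.3.63:9200/"])

actions = [
    {
        "_index": "zyc_test",
        "_type": "relation",           # doc_type; only needed on older ES versions
        "_source": {"pk": str(i), "rel_type": "fk_tl", "count": i},
    }
    for i in range(1000)
]

# bulk() chunks the actions into _bulk requests and returns
# (number of successfully indexed docs, list of errors).
success, errors = helpers.bulk(es_client, actions, chunk_size=500)
print(success, errors)

Now the helpers.scan internals: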

elasticsearch.helpers.scan(client, query=None, scroll=u'5m', raise_on_error=True, preserve_order=False, **kwargs)



Parameter overview:

client – the Elasticsearch connection object

query – the Elasticsearch DSL query body

scroll – how long the server should keep the scroll context alive

raise_on_error – whether to raise an exception when a shard fails during the scan

preserve_order – maps onto search_type: whether results must come back in sorted order

file: helpers/__init__.py

# scroll defaults to 5m; search_type defaults to scan mode
def scan(client, query=None, scroll='5m', preserve_order=False, **kwargs):

    if not preserve_order:   # use scan mode unless ordered results are required
        kwargs['search_type'] = 'scan'

    resp = client.search(body=query, scroll=scroll, **kwargs)

    scroll_id = resp.get('_scroll_id')  # the first request returns the _scroll_id token
    if scroll_id is None:
        return

    first_run = True
    while True:
        # if search_type is not scan, the first response already contains hits
        if preserve_order and first_run:
            first_run = False
        else:
            resp = client.scroll(scroll_id, scroll=scroll)

        if not resp['hits']['hits']:
            break

        for hit in resp['hits']['hits']:
            yield hit    # results are streamed back through this generator

        scroll_id = resp.get('_scroll_id')
        if scroll_id is None:
            break

file: client/__init__.py

@query_params('scroll')
def scroll(self, scroll_id, params=None):
    # subsequent scroll requests go straight to /_search/scroll with GET
    _, data = self.transport.perform_request('GET', '/_search/scroll', params=params, body=scroll_id)
    return data

When using Elasticsearch scan/scroll, also pay attention to error cases; a hedged error-handling variant follows the basic example below.

data = scan(es,
    query={"query": {"match": {"domain": "xiaorui.cc"}}},
    index="xiaorui_index",
    doc_type="blog"
)

for one in data:
    print(one)
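A hedged error-handling variant of the call above: scan() is a generator, so scroll failures only surface while iterating, and raise_on_error=True (the default) raises if any shard fails mid-scroll. The ElasticsearchException base class used here is an assumption about a pre-8.x client.

# Error-handling variant of the scan example above (illustrative).
import logging

from elasticsearch import Elasticsearch, exceptions
from elasticsearch.helpers import scan

es = Elasticsearch(["http://192.168.3.63:9200/"])

try:
    for one in scan(es,
                    query={"query": {"match": {"domain": "xiaorui.cc"}}},
                    index="xiaorui_index",
                    raise_on_error=True):
        print(one["_source"])
except exceptions.ElasticsearchException as e:
    logging.error("scan failed: %s", e)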

Elasticsearch is a deep topic. In my offline and online testing, scroll/scan performance held up well: results come back quickly, noticeably faster than from/size pagination, and it saves Elasticsearch search resources.
