From 2ce8a6bdbf139c46ccd5f9df113c12bddb08b242 Mon Sep 17 00:00:00 2001 From: pycook Date: Tue, 19 Nov 2019 18:16:31 +0800 Subject: [PATCH] elastic search [done] --- Dockerfile | 8 +++- api/commands/click_cmdb.py | 24 +++++++++- api/lib/cmdb/attribute.py | 21 ++++++-- api/lib/cmdb/ci.py | 4 +- api/lib/cmdb/const.py | 8 ++++ api/lib/cmdb/search/es/search.py | 82 +++++++++++++++++++++----------- api/lib/utils.py | 24 ++++++++-- docker-compose.yml | 31 ++++++++++++ 8 files changed, 163 insertions(+), 39 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2414e8f..842cacb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,4 +35,10 @@ RUN pip install --no-cache-dir -r docs/requirements.txt \ && sed -i "s#{user}:{password}@127.0.0.1:3306/{db}#cmdb:123456@mysql:3306/cmdb#g" api/settings.py \ && sed -i "s/127.0.0.1/redis/g" api/settings.py -CMD ["bash", "-c", "flask run"] \ No newline at end of file +CMD ["bash", "-c", "flask run"] + + +# ================================= Search ================================ +FROM docker.elastic.co/elasticsearch/elasticsearch:7.4.2 AS cmdb-search + +RUN yes | ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip diff --git a/api/commands/click_cmdb.py b/api/commands/click_cmdb.py index 3eb5be9..0bfab82 100644 --- a/api/commands/click_cmdb.py +++ b/api/commands/click_cmdb.py @@ -9,7 +9,6 @@ from flask.cli import with_appcontext import api.lib.cmdb.ci from api.extensions import db -from api.extensions import es from api.extensions import rd from api.models.cmdb import CI @@ -19,6 +18,29 @@ from api.models.cmdb import CI def init_cache(): db.session.remove() + if current_app.config.get("USE_ES"): + from api.extensions import es + from api.models.cmdb import Attribute + from api.lib.cmdb.const import type_map + attributes = Attribute.get_by(to_dict=False) + for attr in attributes: + other = dict() + other['index'] = True if attr.is_index else False + if attr.value_type == Attribute.TEXT: + other['analyzer'] = 'ik_max_word' + other['search_analyzer'] = 'ik_smart' + if attr.is_index: + other["fields"] = { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + try: + es.update_mapping(attr.name, type_map['es_type'][attr.value_type], other) + except Exception as e: + print(e) + cis = CI.get_by(to_dict=False) for ci in cis: if current_app.config.get("USE_ES"): diff --git a/api/lib/cmdb/attribute.py b/api/lib/cmdb/attribute.py index 82bce55..cfef2b4 100644 --- a/api/lib/cmdb/attribute.py +++ b/api/lib/cmdb/attribute.py @@ -96,9 +96,8 @@ class AttributeManager(object): name = kwargs.pop("name") alias = kwargs.pop("alias", "") alias = name if not alias else alias - Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is already existed".format(name)) - Attribute.get_by(alias=alias, first=True) and abort(400, - "attribute alias <{0}> is already existed".format(name)) + Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is duplicated".format(name)) + Attribute.get_by(alias=alias, first=True) and abort(400, "attribute alias <{0}> is duplicated".format(name)) attr = Attribute.create(flush=True, name=name, @@ -118,6 +117,22 @@ class AttributeManager(object): AttributeCache.clean(attr) + if current_app.config.get("USE_ES"): + from api.extensions import es + other = dict() + other['index'] = True if attr.is_index else False + if attr.value_type == Attribute.TEXT: + other['analyzer'] = 'ik_max_word' + other['search_analyzer'] = 'ik_smart' + if attr.is_index: + other["fields"] = { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + es.update_mapping(name, type_map['es_type'][attr.value_type], other) + return attr.id def update(self, _id, **kwargs): diff --git a/api/lib/cmdb/ci.py b/api/lib/cmdb/ci.py index 71cc93e..1c1ec37 100644 --- a/api/lib/cmdb/ci.py +++ b/api/lib/cmdb/ci.py @@ -346,8 +346,8 @@ class CIManager(object): if ci_id not in ci_set: ci_dict = dict() ci_type = CITypeCache.get(type_id) - ci_dict["_id"] = ci_id - ci_dict["_type"] = type_id + ci_dict["ci_id"] = ci_id + ci_dict["ci_type"] = type_id ci_dict["ci_type"] = ci_type.name ci_dict["ci_type_alias"] = ci_type.alias ci_set.add(ci_id) diff --git a/api/lib/cmdb/const.py b/api/lib/cmdb/const.py index d7e0b39..511c599 100644 --- a/api/lib/cmdb/const.py +++ b/api/lib/cmdb/const.py @@ -93,6 +93,14 @@ type_map = { 'index_{0}'.format(Attribute.DATE): 'c_value_index_datetime', 'index_{0}'.format(Attribute.TIME): 'c_value_index_texts', 'index_{0}'.format(Attribute.FLOAT): 'c_value_index_floats', + }, + 'es_type': { + Attribute.INT: 'long', + Attribute.TEXT: 'text', + Attribute.DATETIME: 'text', + Attribute.DATE: 'text', + Attribute.TIME: 'text', + Attribute.FLOAT: 'float' } } diff --git a/api/lib/cmdb/search/es/search.py b/api/lib/cmdb/search/es/search.py index 7752802..1650fd0 100644 --- a/api/lib/cmdb/search/es/search.py +++ b/api/lib/cmdb/search/es/search.py @@ -28,9 +28,9 @@ class Search(object): self.page = page self.ret_key = ret_key self.count = count or current_app.config.get("DEFAULT_PAGE_COUNT") - self.sort = sort + self.sort = sort or "ci_id" - self.query = dict(filter=dict(bool=dict(should=[], must=[], must_not=[]))) + self.query = dict(query=dict(bool=dict(should=[], must=[], must_not=[]))) @staticmethod def _operator_proc(key): @@ -91,21 +91,22 @@ class Search(object): self._operator2query(operator).append({ "range": { attr: { - "to": self._digit(right), - "from": self._digit(left), + "lte": self._digit(right), + "gte": self._digit(left), + "boost": 2.0 } } }) def _comparison_query_handle(self, attr, v, operator): if v.startswith(">="): - _query = dict(gte=self._digit(v[2:])) + _query = dict(gte=self._digit(v[2:]), boost=2.0) elif v.startswith("<="): - _query = dict(lte=self._digit(v[2:])) + _query = dict(lte=self._digit(v[2:]), boost=2.0) elif v.startswith(">"): - _query = dict(gt=self._digit(v[1:])) + _query = dict(gt=self._digit(v[1:]), boost=2.0) elif v.startswith("<"): - _query = dict(lt=self._digit(v[1:])) + _query = dict(lt=self._digit(v[1:]), boost=2.0) else: return @@ -123,6 +124,8 @@ class Search(object): } }) else: + if attr == "ci_type" and v.isdigit(): + attr = "type_id" self._operator2query(operator).append({ "term": { attr: v @@ -164,23 +167,32 @@ class Search(object): filter_path = self._fl_build() - sort = self._sort_build() + self._sort_build() - return es.read(self.query, filter_path=filter_path, sort=sort) + self._facet_build() + + return es.read(self.query, filter_path=filter_path) def _facet_build(self): - return { - "aggs": { - self.facet_field: { - "cardinality": { - "field": self.facet_field + aggregations = dict(aggs={}) + for field in self.facet_field: + attr = AttributeCache.get(field) + if not attr: + raise SearchError("Facet by <{0}> does not exist".format(field)) + aggregations['aggs'].update({ + field: { + "terms": { + "field": "{0}.keyword".format(field) + if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field } } - } - } + }) + + if aggregations['aggs']: + self.query.update(aggregations) def _sort_build(self): - fields = list(filter(lambda x: x != "", self.sort or "")) + fields = list(filter(lambda x: x != "", (self.sort or "").split(","))) sorts = [] for field in fields: sort_type = "asc" @@ -190,10 +202,20 @@ class Search(object): field = field[1:] sort_type = "desc" else: + field = field + if field == "ci_id": + sorts.append({field: {"order": sort_type}}) continue - sorts.append({field: {"order": sort_type}}) - return sorts + attr = AttributeCache.get(field) + if not attr: + raise SearchError("Sort by <{0}> does not exist".format(field)) + + sort_by = "{0}.keyword".format(field) \ + if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field + sorts.append({sort_by: {"order": sort_type}}) + + self.query.update(dict(sort=sorts)) def _paginate_build(self): self.query.update({"from": (self.page - 1) * self.count, @@ -204,16 +226,22 @@ class Search(object): def search(self): try: - numfound, cis = self._query_build_raw() + numfound, cis, facet = self._query_build_raw() except Exception as e: current_app.logger.error(str(e)) raise SearchError("unknown search error") - if self.facet_field and numfound: - facet = self._facet_build() - else: - facet = dict() - total = len(cis) - return cis, {}, total, self.page, numfound, facet + counter = dict() + for ci in cis: + ci_type = ci.get("ci_type") + if ci_type not in counter.keys(): + counter[ci_type] = 0 + counter[ci_type] += 1 + + facet_ = dict() + for k in facet: + facet_[k] = [[i['key'], i['doc_count'], k] for i in facet[k]["buckets"]] + + return cis, counter, total, self.page, numfound, facet_ diff --git a/api/lib/utils.py b/api/lib/utils.py index 1f70db4..b3bb7cf 100644 --- a/api/lib/utils.py +++ b/api/lib/utils.py @@ -88,6 +88,18 @@ class ESHandler(object): if not self.es.indices.exists(index=self.index): self.es.indices.create(index=self.index) + def update_mapping(self, field, value_type, other): + body = { + "properties": { + field: {"type": value_type}, + }} + body['properties'][field].update(other) + + self.es.indices.put_mapping( + index=self.index, + body=body + ) + def get_index_id(self, ci_id): query = { 'query': { @@ -111,13 +123,15 @@ class ESHandler(object): self.es.delete(index=self.index, id=_id) - def read(self, query, filter_path=None, sort=None): + def read(self, query, filter_path=None): filter_path = filter_path or [] if filter_path: filter_path.append('hits.total') - if sort: - query.update(dict(sort=sort)) - res = self.es.search(index=self.index, body=query, filter_path=filter_path) - return res['hits']['total']['value'], [i['_source'] for i in res['hits']['hits']] + if res['hits'].get('hits'): + return res['hits']['total']['value'], \ + [i['_source'] for i in res['hits']['hits']], \ + res.get("aggregations", {}) + else: + return 0, [], {} diff --git a/docker-compose.yml b/docker-compose.yml index b421b3e..8351480 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,6 +26,33 @@ services: aliases: - redis + cmdb-search: + image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-search:1.2 +# build: +# context: . +# target: cmdb-search + container_name: cmdb-search + environment: + - discovery.type=single-node + - cluster.name=docker-cluster + - bootstrap.memory_lock=true + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - esdata:/usr/share/elasticsearch/data + ports: + - 9200:9200 + networks: + new: + aliases: + - cmdb-search + cmdb-api: image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-api:1.0 container_name: cmdb-api @@ -35,12 +62,15 @@ services: - /bin/sh - -c - | + sed -i "s#USE_ES = False#USE_ES = True#g" api/settings.py + sed -i "s#ES_HOST = '127.0.0.1'#ES_HOST = cmdb-search#g" api/settings.py gunicorn --workers=3 autoapp:app -b 0.0.0.0:5000 -D flask init-cache celery worker -A celery_worker.celery -E -Q cmdb_async --concurrency=1 depends_on: - cmdb-db - cmdb-cache + - cmdb-search networks: new: aliases: @@ -70,6 +100,7 @@ services: volumes: db-data: + esdata: networks: new: \ No newline at end of file