From 4ec3b310e40ed3f068190a8c3e63298ddb1560d3 Mon Sep 17 00:00:00 2001 From: pycook Date: Mon, 18 Nov 2019 20:02:25 +0800 Subject: [PATCH] search by elasticsearch [doing] --- Pipfile | 1 + api/app.py | 2 + api/commands/click_cmdb.py | 21 ++- api/extensions.py | 2 + api/lib/cmdb/ci.py | 8 +- api/lib/cmdb/search/__init__.py | 3 + api/lib/cmdb/search/db/__init__.py | 1 + api/lib/cmdb/{ => search/db}/query_sql.py | 0 api/lib/cmdb/{ => search/db}/search.py | 6 +- api/lib/cmdb/search/es/__init__.py | 1 + api/lib/cmdb/search/es/search.py | 152 ++++++++++++++++++++++ api/lib/utils.py | 45 +++++++ api/settings.py.example | 4 + api/tasks/cmdb.py | 17 ++- api/views/acl/resources.py | 1 + api/views/cmdb/ci.py | 13 +- docs/requirements.txt | 1 + 17 files changed, 256 insertions(+), 22 deletions(-) create mode 100644 api/lib/cmdb/search/__init__.py create mode 100644 api/lib/cmdb/search/db/__init__.py rename api/lib/cmdb/{ => search/db}/query_sql.py (100%) rename api/lib/cmdb/{ => search/db}/search.py (98%) create mode 100644 api/lib/cmdb/search/es/__init__.py create mode 100644 api/lib/cmdb/search/es/search.py diff --git a/Pipfile b/Pipfile index 182432c..c0a1723 100644 --- a/Pipfile +++ b/Pipfile @@ -40,6 +40,7 @@ bs4 = ">=0.0.1" toposort = ">=1.5" requests = ">=2.22.0" PyJWT = ">=1.7.1" +elasticsearch = "==7.0.4" [dev-packages] # Testing diff --git a/api/app.py b/api/app.py index cfd0e93..ab3ac6c 100644 --- a/api/app.py +++ b/api/app.py @@ -21,6 +21,7 @@ from api.extensions import ( migrate, celery, rd, + es ) from api.flask_cas import CAS from api.models.acl import User @@ -98,6 +99,7 @@ def register_extensions(app): login_manager.init_app(app) migrate.init_app(app, db) rd.init_app(app) + es.init_app(app) celery.conf.update(app.config) diff --git a/api/commands/click_cmdb.py b/api/commands/click_cmdb.py index 6b2634d..3eb5be9 100644 --- a/api/commands/click_cmdb.py +++ b/api/commands/click_cmdb.py @@ -4,10 +4,12 @@ import json import click +from flask import current_app from flask.cli import with_appcontext import api.lib.cmdb.ci from api.extensions import db +from api.extensions import es from api.extensions import rd from api.models.cmdb import CI @@ -19,13 +21,22 @@ def init_cache(): cis = CI.get_by(to_dict=False) for ci in cis: - res = rd.get([ci.id]) - if res and list(filter(lambda x: x, res)): - continue + if current_app.config.get("USE_ES"): + res = es.get_index_id(ci.id) + if res: + continue + else: + res = rd.get([ci.id]) + if res and list(filter(lambda x: x, res)): + continue m = api.lib.cmdb.ci.CIManager() ci_dict = m.get_ci_by_id_from_db(ci.id, need_children=False, use_master=False) - rd.delete(ci.id) - rd.add({ci.id: json.dumps(ci_dict)}) + + if current_app.config.get("USE_ES"): + es.create(ci_dict) + else: + rd.delete(ci.id) + rd.add({ci.id: json.dumps(ci_dict)}) db.session.remove() diff --git a/api/extensions.py b/api/extensions.py index bec8ea6..45cf662 100644 --- a/api/extensions.py +++ b/api/extensions.py @@ -9,6 +9,7 @@ from flask_login import LoginManager from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy +from api.lib.utils import ESHandler from api.lib.utils import RedisHandler bcrypt = Bcrypt() @@ -19,3 +20,4 @@ cache = Cache() celery = Celery() cors = CORS(supports_credentials=True) rd = RedisHandler(prefix="CMDB_CI") # TODO +es = ESHandler() diff --git a/api/lib/cmdb/ci.py b/api/lib/cmdb/ci.py index ffaabdb..71cc93e 100644 --- a/api/lib/cmdb/ci.py +++ b/api/lib/cmdb/ci.py @@ -23,8 +23,8 @@ from api.lib.cmdb.const import TableMap from api.lib.cmdb.const import type_map from api.lib.cmdb.history import AttributeHistoryManger from api.lib.cmdb.history import CIRelationHistoryManager -from api.lib.cmdb.query_sql import QUERY_CIS_BY_IDS -from api.lib.cmdb.query_sql import QUERY_CIS_BY_VALUE_TABLE +from api.lib.cmdb.search.db.query_sql import QUERY_CIS_BY_IDS +from api.lib.cmdb.search.db.query_sql import QUERY_CIS_BY_VALUE_TABLE from api.lib.cmdb.value import AttributeValueManager from api.lib.decorator import kwargs_required from api.lib.utils import handle_arg_list @@ -112,8 +112,8 @@ class CIManager(object): use_master=use_master) res.update(_res) - res['_type'] = ci_type.id - res['_id'] = ci_id + res['type_id'] = ci_type.id + res['ci_id'] = ci_id return res diff --git a/api/lib/cmdb/search/__init__.py b/api/lib/cmdb/search/__init__.py new file mode 100644 index 0000000..3c49dd7 --- /dev/null +++ b/api/lib/cmdb/search/__init__.py @@ -0,0 +1,3 @@ +# -*- coding:utf-8 -*- + +__all__ = ['db', 'es'] diff --git a/api/lib/cmdb/search/db/__init__.py b/api/lib/cmdb/search/db/__init__.py new file mode 100644 index 0000000..380474e --- /dev/null +++ b/api/lib/cmdb/search/db/__init__.py @@ -0,0 +1 @@ +# -*- coding:utf-8 -*- diff --git a/api/lib/cmdb/query_sql.py b/api/lib/cmdb/search/db/query_sql.py similarity index 100% rename from api/lib/cmdb/query_sql.py rename to api/lib/cmdb/search/db/query_sql.py diff --git a/api/lib/cmdb/search.py b/api/lib/cmdb/search/db/search.py similarity index 98% rename from api/lib/cmdb/search.py rename to api/lib/cmdb/search/db/search.py index 3f3a12c..eeff10f 100644 --- a/api/lib/cmdb/search.py +++ b/api/lib/cmdb/search/db/search.py @@ -13,9 +13,9 @@ from api.lib.cmdb.cache import CITypeCache from api.lib.cmdb.ci import CIManager from api.lib.cmdb.const import RetKey from api.lib.cmdb.const import TableMap -from api.lib.cmdb.query_sql import FACET_QUERY -from api.lib.cmdb.query_sql import QUERY_CI_BY_ATTR_NAME -from api.lib.cmdb.query_sql import QUERY_CI_BY_TYPE +from api.lib.cmdb.search.db.query_sql import FACET_QUERY +from api.lib.cmdb.search.db.query_sql import QUERY_CI_BY_ATTR_NAME +from api.lib.cmdb.search.db.query_sql import QUERY_CI_BY_TYPE from api.lib.utils import handle_arg_list from api.models.cmdb import Attribute from api.models.cmdb import CI diff --git a/api/lib/cmdb/search/es/__init__.py b/api/lib/cmdb/search/es/__init__.py new file mode 100644 index 0000000..380474e --- /dev/null +++ b/api/lib/cmdb/search/es/__init__.py @@ -0,0 +1 @@ +# -*- coding:utf-8 -*- diff --git a/api/lib/cmdb/search/es/search.py b/api/lib/cmdb/search/es/search.py new file mode 100644 index 0000000..e7904d3 --- /dev/null +++ b/api/lib/cmdb/search/es/search.py @@ -0,0 +1,152 @@ +# -*- coding:utf-8 -*- + + +from __future__ import unicode_literals + +from flask import current_app + +from api.extensions import es +from api.lib.cmdb.cache import AttributeCache +from api.lib.cmdb.const import RetKey +from api.lib.utils import handle_arg_list +from api.models.cmdb import Attribute + + +class SearchError(Exception): + def __init__(self, v): + self.v = v + + def __str__(self): + return self.v + + +class Search(object): + def __init__(self, query=None, fl=None, facet_field=None, page=1, ret_key=RetKey.NAME, count=1, sort=None): + self.orig_query = query + self.fl = fl + self.facet_field = facet_field + self.page = page + self.ret_key = ret_key + self.count = count or current_app.config.get("DEFAULT_PAGE_COUNT") + self.sort = sort + + self.query = dict(query=dict(bool=dict(should=[], must=[], must_not=[]))) + + @staticmethod + def _operator_proc(key): + operator = "&" + if key.startswith("+"): + key = key[1:].strip() + elif key.startswith("-"): + operator = "|" + key = key[1:].strip() + elif key.startswith("~"): + operator = "~" + key = key[1:].strip() + + return operator, key + + def _operator2query(self, operator): + if operator == "&": + return self.query['query']['bool']['must'] + elif operator == "|": + return self.query['query']['bool']['should'] + else: + return self.query['query']['bool']['must_not'] + + def _attr_name_proc(self, key): + operator, key = self._operator_proc(key) + + if key in ('ci_type', 'type', '_type'): + return 'ci_type', Attribute.TEXT, operator + + if key in ('id', 'ci_id', '_id'): + return 'ci_id', Attribute.TEXT, operator + + attr = AttributeCache.get(key) + if attr: + return attr.name, attr.value_type, operator + else: + raise SearchError("{0} is not existed".format(key)) + + def _in_query_handle(self, attr, v, operator): + self._operator2query(operator).append({}) + + def _range_query_handle(self, attr, v, operator): + self._operator2query(operator).append({ + "range": { + attr: { + "gte": 10, + "lte": 10, + } + } + }) + + def _comparison_query_handle(self, attr, v, operator): + pass + + def _match_query_handle(self, attr, v, operator): + pass + + def __query_build_by_field(self, queries): + + for q in queries: + if ":" in q: + k = q.split(":")[0].strip() + v = ":".join(q.split(":")[1:]).strip() + field_name, field_type, operator = self._attr_name_proc(k) + if field_name: + # in query + if v.startswith("(") and v.endswith(")"): + self._in_query_handle(field_name, v, operator) + # range query + elif v.startswith("[") and v.endswith("]") and "_TO_" in v: + self._range_query_handle(field_name, v, operator) + # comparison query + elif v.startswith(">=") or v.startswith("<=") or v.startswith(">") or v.startswith("<"): + self._comparison_query_handle(field_name, v, operator) + else: + self._match_query_handle(field_name, v, operator) + else: + raise SearchError("argument q format invalid: {0}".format(q)) + elif q: + raise SearchError("argument q format invalid: {0}".format(q)) + + def _query_build_raw(self): + + queries = handle_arg_list(self.orig_query) + current_app.logger.debug(queries) + + self.__query_build_by_field(queries) + + self.__paginate() + + filter_path = self._fl_build() + + return es.read(self.query, filter_path=filter_path) + + def _facet_build(self): + return {} + + def __paginate(self): + self.query.update({"from": (self.page - 1) * self.count, + "size": self.count}) + + def _fl_build(self): + return ['hits.hits._source.{0}'.format(i) for i in self.fl] + + def search(self): + try: + numfound, cis = self._query_build_raw() + except Exception as e: + current_app.logger.error(str(e)) + raise SearchError("unknown search error") + + if self.facet_field and numfound: + facet = self._facet_build() + else: + facet = dict() + + total = len(cis) + + return cis, {}, total, self.page, numfound, facet diff --git a/api/lib/utils.py b/api/lib/utils.py index b350a7d..a26dfec 100644 --- a/api/lib/utils.py +++ b/api/lib/utils.py @@ -2,6 +2,7 @@ import redis import six +from elasticsearch import Elasticsearch from flask import current_app @@ -72,3 +73,47 @@ class RedisHandler(object): current_app.logger.warn("[{0}] is not in redis".format(key_id)) except Exception as e: current_app.logger.error("delete redis key error, {0}".format(str(e))) + + +class ESHandler(object): + def __init__(self, flask_app=None): + self.flask_app = flask_app + self.es = None + self.index = "cmdb" + + def init_app(self, app): + self.flask_app = app + config = self.flask_app.config + self.es = Elasticsearch(config.get("ES_HOST")) + if not self.es.indices.exists(index=self.index): + self.es.indices.create(index=self.index) + + def get_index_id(self, ci_id): + query = { + 'query': { + 'match': {'ci_id': ci_id} + }, + } + res = self.es.search(index=self.index, body=query) + if res['hits']['hits']: + return res['hits']['hits'][-1].get('_id') + + def create(self, body): + return self.es.index(index=self.index, body=body).get("_id") + + def update(self, ci_id, body): + _id = self.get_index_id(ci_id) + + return self.es.index(index=self.index, id=_id, body=body).get("_id") + + def delete(self, ci_id): + _id = self.get_index_id(ci_id) + + self.es.delete(index=self.index, id=_id) + + def read(self, query, filter_path=None): + filter_path = filter_path or [] + if filter_path: + filter_path.append('hits.total') + res = self.es.search(index=self.index, body=query, filter_path=filter_path) + return res['hits']['total']['value'], [i['_source'] for i in res['hits']['hits']] diff --git a/api/settings.py.example b/api/settings.py.example index c9315fe..fec4f5b 100644 --- a/api/settings.py.example +++ b/api/settings.py.example @@ -75,3 +75,7 @@ DEFAULT_PAGE_COUNT = 50 # # permission WHITE_LIST = ["127.0.0.1"] USE_ACL = False + +# # elastic search +ES_HOST = '127.0.0.1' +USE_ES = False diff --git a/api/tasks/cmdb.py b/api/tasks/cmdb.py index d48943a..78fee55 100644 --- a/api/tasks/cmdb.py +++ b/api/tasks/cmdb.py @@ -10,6 +10,7 @@ import api.lib.cmdb.ci from api.extensions import celery from api.extensions import db from api.extensions import rd +from api.extensions import es from api.lib.cmdb.const import CMDB_QUEUE @@ -20,14 +21,22 @@ def ci_cache(ci_id): m = api.lib.cmdb.ci.CIManager() ci = m.get_ci_by_id_from_db(ci_id, need_children=False, use_master=False) - rd.delete(ci_id) - rd.add({ci_id: json.dumps(ci)}) + if current_app.config.get("USE_ES"): + es.update(ci_id, ci) + else: + rd.delete(ci_id) + rd.add({ci_id: json.dumps(ci)}) - current_app.logger.info("%d caching.........." % ci_id) + current_app.logger.info("%d flush.........." % ci_id) @celery.task(name="cmdb.ci_delete", queue=CMDB_QUEUE) def ci_delete(ci_id): current_app.logger.info(ci_id) - rd.delete(ci_id) + + if current_app.config.get("USE_ES"): + es.delete(ci_id) + else: + rd.delete(ci_id) + current_app.logger.info("%d delete.........." % ci_id) diff --git a/api/views/acl/resources.py b/api/views/acl/resources.py index 3b7bae8..94241cf 100644 --- a/api/views/acl/resources.py +++ b/api/views/acl/resources.py @@ -60,6 +60,7 @@ class ResourceView(APIView): class ResourceGroupView(APIView): url_prefix = ("/resource_groups", "/resource_groups/") + @args_required('app_id') @validate_app def get(self): page = get_page(request.values.get("page", 1)) diff --git a/api/views/cmdb/ci.py b/api/views/cmdb/ci.py index 210f86c..c2e839b 100644 --- a/api/views/cmdb/ci.py +++ b/api/views/cmdb/ci.py @@ -12,8 +12,9 @@ from api.lib.cmdb.ci import CIManager from api.lib.cmdb.const import ExistPolicy from api.lib.cmdb.const import ResourceType, PermEnum from api.lib.cmdb.const import RetKey -from api.lib.cmdb.search import Search -from api.lib.cmdb.search import SearchError +from api.lib.cmdb.search.db.search import Search as SearchFromDB +from api.lib.cmdb.search.es.search import Search as SearchFromES +from api.lib.cmdb.search.db.search import SearchError from api.lib.perm.acl.acl import has_perm_from_args from api.lib.perm.auth import auth_abandoned from api.lib.utils import get_page @@ -144,14 +145,14 @@ class CISearchView(APIView): sort = request.values.get("sort") start = time.time() - s = Search(query, fl, facet, page, ret_key, count, sort) + if current_app.config.get("USE_ES"): + s = SearchFromES(query, fl, facet, page, ret_key, count, sort) + else: + s = SearchFromDB(query, fl, facet, page, ret_key, count, sort) try: response, counter, total, page, numfound, facet = s.search() except SearchError as e: return abort(400, str(e)) - except Exception as e: - current_app.logger.error(str(e)) - return abort(500, "search unknown error") current_app.logger.debug("search time is :{0}".format(time.time() - start)) return self.jsonify(numfound=numfound, total=total, diff --git a/docs/requirements.txt b/docs/requirements.txt index 9ed3509..c9f59d0 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -35,3 +35,4 @@ bs4>=0.0.1 toposort>=1.5 requests>=2.22.0 PyJWT>=1.7.1 +elasticsearch ==7.0.4