This commit is contained in:
kdyq007 2019-11-19 21:52:33 +08:00
commit c2e4090aea
22 changed files with 506 additions and 33 deletions

View File

@ -35,4 +35,10 @@ RUN pip install --no-cache-dir -r docs/requirements.txt \
&& sed -i "s#{user}:{password}@127.0.0.1:3306/{db}#cmdb:123456@mysql:3306/cmdb#g" api/settings.py \
&& sed -i "s/127.0.0.1/redis/g" api/settings.py
CMD ["bash", "-c", "flask run"]
CMD ["bash", "-c", "flask run"]
# ================================= Search ================================
FROM docker.elastic.co/elasticsearch/elasticsearch:7.4.2 AS cmdb-search
RUN yes | ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip

View File

@ -40,6 +40,7 @@ bs4 = ">=0.0.1"
toposort = ">=1.5"
requests = ">=2.22.0"
PyJWT = ">=1.7.1"
elasticsearch = "==7.0.4"
[dev-packages]
# Testing

View File

@ -21,6 +21,7 @@ from api.extensions import (
migrate,
celery,
rd,
es
)
from api.flask_cas import CAS
from api.models.acl import User
@ -33,7 +34,7 @@ API_PACKAGE = "api"
@login_manager.user_loader
def load_user(user_id):
"""Load user by ID."""
return User.get_by_id(int(user_id))
return User.get_by(uid=int(user_id), first=True, to_dict=False)
class ReverseProxy(object):
@ -98,6 +99,8 @@ def register_extensions(app):
login_manager.init_app(app)
migrate.init_app(app, db)
rd.init_app(app)
if app.config.get("USE_ES"):
es.init_app(app)
celery.conf.update(app.config)

View File

@ -4,6 +4,7 @@
import json
import click
from flask import current_app
from flask.cli import with_appcontext
import api.lib.cmdb.ci
@ -17,15 +18,47 @@ from api.models.cmdb import CI
def init_cache():
db.session.remove()
if current_app.config.get("USE_ES"):
from api.extensions import es
from api.models.cmdb import Attribute
from api.lib.cmdb.const import type_map
attributes = Attribute.get_by(to_dict=False)
for attr in attributes:
other = dict()
other['index'] = True if attr.is_index else False
if attr.value_type == Attribute.TEXT:
other['analyzer'] = 'ik_max_word'
other['search_analyzer'] = 'ik_smart'
if attr.is_index:
other["fields"] = {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
try:
es.update_mapping(attr.name, type_map['es_type'][attr.value_type], other)
except Exception as e:
print(e)
cis = CI.get_by(to_dict=False)
for ci in cis:
res = rd.get([ci.id])
if res and list(filter(lambda x: x, res)):
continue
if current_app.config.get("USE_ES"):
res = es.get_index_id(ci.id)
if res:
continue
else:
res = rd.get([ci.id])
if res and list(filter(lambda x: x, res)):
continue
m = api.lib.cmdb.ci.CIManager()
ci_dict = m.get_ci_by_id_from_db(ci.id, need_children=False, use_master=False)
rd.delete(ci.id)
rd.add({ci.id: json.dumps(ci_dict)})
if current_app.config.get("USE_ES"):
es.create(ci_dict)
else:
rd.delete(ci.id)
rd.add({ci.id: json.dumps(ci_dict)})
db.session.remove()

View File

@ -9,6 +9,7 @@ from flask_login import LoginManager
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from api.lib.utils import ESHandler
from api.lib.utils import RedisHandler
bcrypt = Bcrypt()
@ -19,3 +20,4 @@ cache = Cache()
celery = Celery()
cors = CORS(supports_credentials=True)
rd = RedisHandler(prefix="CMDB_CI") # TODO
es = ESHandler()

View File

@ -96,9 +96,8 @@ class AttributeManager(object):
name = kwargs.pop("name")
alias = kwargs.pop("alias", "")
alias = name if not alias else alias
Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is already existed".format(name))
Attribute.get_by(alias=alias, first=True) and abort(400,
"attribute alias <{0}> is already existed".format(name))
Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is duplicated".format(name))
Attribute.get_by(alias=alias, first=True) and abort(400, "attribute alias <{0}> is duplicated".format(name))
attr = Attribute.create(flush=True,
name=name,
@ -118,6 +117,22 @@ class AttributeManager(object):
AttributeCache.clean(attr)
if current_app.config.get("USE_ES"):
from api.extensions import es
other = dict()
other['index'] = True if attr.is_index else False
if attr.value_type == Attribute.TEXT:
other['analyzer'] = 'ik_max_word'
other['search_analyzer'] = 'ik_smart'
if attr.is_index:
other["fields"] = {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
es.update_mapping(name, type_map['es_type'][attr.value_type], other)
return attr.id
def update(self, _id, **kwargs):

View File

@ -23,8 +23,8 @@ from api.lib.cmdb.const import TableMap
from api.lib.cmdb.const import type_map
from api.lib.cmdb.history import AttributeHistoryManger
from api.lib.cmdb.history import CIRelationHistoryManager
from api.lib.cmdb.query_sql import QUERY_CIS_BY_IDS
from api.lib.cmdb.query_sql import QUERY_CIS_BY_VALUE_TABLE
from api.lib.cmdb.search.db.query_sql import QUERY_CIS_BY_IDS
from api.lib.cmdb.search.db.query_sql import QUERY_CIS_BY_VALUE_TABLE
from api.lib.cmdb.value import AttributeValueManager
from api.lib.decorator import kwargs_required
from api.lib.utils import handle_arg_list
@ -112,8 +112,8 @@ class CIManager(object):
use_master=use_master)
res.update(_res)
res['_type'] = ci_type.id
res['_id'] = ci_id
res['type_id'] = ci_type.id
res['ci_id'] = ci_id
return res
@ -346,8 +346,8 @@ class CIManager(object):
if ci_id not in ci_set:
ci_dict = dict()
ci_type = CITypeCache.get(type_id)
ci_dict["_id"] = ci_id
ci_dict["_type"] = type_id
ci_dict["ci_id"] = ci_id
ci_dict["ci_type"] = type_id
ci_dict["ci_type"] = ci_type.name
ci_dict["ci_type_alias"] = ci_type.alias
ci_set.add(ci_id)

View File

@ -93,6 +93,14 @@ type_map = {
'index_{0}'.format(Attribute.DATE): 'c_value_index_datetime',
'index_{0}'.format(Attribute.TIME): 'c_value_index_texts',
'index_{0}'.format(Attribute.FLOAT): 'c_value_index_floats',
},
'es_type': {
Attribute.INT: 'long',
Attribute.TEXT: 'text',
Attribute.DATETIME: 'text',
Attribute.DATE: 'text',
Attribute.TIME: 'text',
Attribute.FLOAT: 'float'
}
}

View File

@ -0,0 +1,3 @@
# -*- coding:utf-8 -*-
__all__ = ['db', 'es']

View File

@ -0,0 +1 @@
# -*- coding:utf-8 -*-

View File

@ -13,9 +13,9 @@ from api.lib.cmdb.cache import CITypeCache
from api.lib.cmdb.ci import CIManager
from api.lib.cmdb.const import RetKey
from api.lib.cmdb.const import TableMap
from api.lib.cmdb.query_sql import FACET_QUERY
from api.lib.cmdb.query_sql import QUERY_CI_BY_ATTR_NAME
from api.lib.cmdb.query_sql import QUERY_CI_BY_TYPE
from api.lib.cmdb.search.db.query_sql import FACET_QUERY
from api.lib.cmdb.search.db.query_sql import QUERY_CI_BY_ATTR_NAME
from api.lib.cmdb.search.db.query_sql import QUERY_CI_BY_TYPE
from api.lib.utils import handle_arg_list
from api.models.cmdb import Attribute
from api.models.cmdb import CI

View File

@ -0,0 +1 @@
# -*- coding:utf-8 -*-

View File

@ -0,0 +1,247 @@
# -*- coding:utf-8 -*-
from __future__ import unicode_literals
from flask import current_app
from api.extensions import es
from api.lib.cmdb.cache import AttributeCache
from api.lib.cmdb.const import RetKey
from api.lib.utils import handle_arg_list
from api.models.cmdb import Attribute
class SearchError(Exception):
    """Error raised by the Elasticsearch-backed search when a query is invalid or fails.

    NOTE(review): the CI search view imports ``SearchError`` from the db
    search module rather than from this module -- confirm that its
    ``except SearchError`` handler actually catches instances of this class.
    """

    def __init__(self, v):
        # ``v`` is the message; ``__str__`` returns it unchanged.
        self.v = v

    def __str__(self):
        return self.v
class Search(object):
    """Translate the CMDB query syntax into an Elasticsearch bool query and run it.

    The query is a list of ``key:value`` items (split by ``handle_arg_list``).
    A key may be prefixed with ``+`` (clause goes to ``must``, the default),
    ``-`` (clause goes to ``should``) or ``~`` (clause goes to ``must_not``).
    The value selects the clause type:

    * ``(a;b;c)``        -> one ``term`` clause per item
    * ``[x_TO_y]``       -> inclusive ``range`` clause
    * ``>=x <=x >x <x``  -> one-sided ``range`` clause
    * contains ``*``     -> ``wildcard`` clause
    * anything else      -> ``term`` clause
    """

    def __init__(self, query=None, fl=None, facet_field=None, page=1, ret_key=RetKey.NAME, count=1, sort=None):
        self.orig_query = query
        self.fl = fl
        self.facet_field = facet_field
        self.page = page
        self.ret_key = ret_key
        # A falsy count falls back to the configured default page size.
        self.count = count or current_app.config.get("DEFAULT_PAGE_COUNT")
        self.sort = sort or "ci_id"

        # Request body sent to Elasticsearch; the build helpers fill it in place.
        self.query = dict(query=dict(bool=dict(should=[], must=[], must_not=[])))

    @staticmethod
    def _operator_proc(key):
        """Split an optional ``+``/``-``/``~`` prefix off ``key``.

        :return: ``(operator, key)`` where operator is ``"&"``, ``"|"`` or ``"~"``
        """
        operator = "&"
        if key.startswith("+"):
            key = key[1:].strip()
        elif key.startswith("-"):
            operator = "|"
            key = key[1:].strip()
        elif key.startswith("~"):
            operator = "~"
            key = key[1:].strip()

        return operator, key

    def _operator2query(self, operator):
        """Return the bool-query clause list that ``operator`` maps to."""
        if operator == "&":
            return self.query['query']['bool']['must']
        elif operator == "|":
            return self.query['query']['bool']['should']
        else:
            return self.query['query']['bool']['must_not']

    def _attr_name_proc(self, key):
        """Resolve a query key to ``(field_name, value_type, operator)``.

        :raises SearchError: if the key is neither a built-in alias nor a known attribute
        """
        operator, key = self._operator_proc(key)

        if key in ('ci_type', 'type', '_type'):
            return 'ci_type', Attribute.TEXT, operator

        if key in ('id', 'ci_id', '_id'):
            return 'ci_id', Attribute.TEXT, operator

        attr = AttributeCache.get(key)
        if attr:
            return attr.name, attr.value_type, operator
        else:
            raise SearchError("{0} is not existed".format(key))

    def _in_query_handle(self, attr, v):
        """Add one ``term`` clause per ``;``-separated item of ``(a;b;c)``.

        NOTE(review): the clauses always go to ``should``; the operator
        prefix of the key is not consulted here -- confirm this is intended.
        """
        terms = v[1:-1].split(";")
        operator = "|"
        for term in terms:
            self._operator2query(operator).append({
                "term": {
                    attr: term
                }
            })

    @staticmethod
    def _digit(s):
        """Return ``s`` as an int when it consists only of digits, else unchanged."""
        if s.isdigit():
            # Convert directly: going through float() loses precision for
            # integers larger than 2**53.
            return int(s)
        return s

    def _range_query_handle(self, attr, v, operator):
        """Add an inclusive ``range`` clause for a ``[left_TO_right]`` value."""
        left, right = v.split("_TO_")
        # Drop the surrounding "[" and "]".
        left, right = left.strip()[1:], right.strip()[:-1]
        self._operator2query(operator).append({
            "range": {
                attr: {
                    "lte": self._digit(right),
                    "gte": self._digit(left),
                    "boost": 2.0
                }
            }
        })

    def _comparison_query_handle(self, attr, v, operator):
        """Add a one-sided ``range`` clause for ``>=``, ``<=``, ``>`` or ``<`` values."""
        # Two-character operators are tested first so ">=" is not read as ">".
        if v.startswith(">="):
            _query = dict(gte=self._digit(v[2:]), boost=2.0)
        elif v.startswith("<="):
            _query = dict(lte=self._digit(v[2:]), boost=2.0)
        elif v.startswith(">"):
            _query = dict(gt=self._digit(v[1:]), boost=2.0)
        elif v.startswith("<"):
            _query = dict(lt=self._digit(v[1:]), boost=2.0)
        else:
            return

        self._operator2query(operator).append({
            "range": {
                attr: _query
            }
        })

    def _match_query_handle(self, attr, v, operator):
        """Add a ``wildcard`` clause when ``v`` contains ``*``, otherwise a ``term`` clause."""
        if "*" in v:
            self._operator2query(operator).append({
                "wildcard": {
                    attr: v
                }
            })
        else:
            # A purely numeric ci_type is matched against the "type_id" field instead.
            if attr == "ci_type" and v.isdigit():
                attr = "type_id"
            self._operator2query(operator).append({
                "term": {
                    attr: v
                }
            })

    def __query_build_by_field(self, queries):
        """Turn every ``key:value`` item into a clause of ``self.query``.

        :raises SearchError: if an item is non-empty but not of the form ``key:value``
        """
        for q in queries:
            if ":" in q:
                # Only the first ":" separates key from value; the value may contain more.
                k, _, v = q.partition(":")
                k, v = k.strip(), v.strip()
                field_name, field_type, operator = self._attr_name_proc(k)
                if field_name:
                    # in query
                    if v.startswith("(") and v.endswith(")"):
                        self._in_query_handle(field_name, v)
                    # range query
                    elif v.startswith("[") and v.endswith("]") and "_TO_" in v:
                        self._range_query_handle(field_name, v, operator)
                    # comparison query
                    elif v.startswith((">", "<")):
                        self._comparison_query_handle(field_name, v, operator)
                    else:
                        self._match_query_handle(field_name, v, operator)
                else:
                    raise SearchError("argument q format invalid: {0}".format(q))
            elif q:
                raise SearchError("argument q format invalid: {0}".format(q))

    def _query_build_raw(self):
        """Build the full request body and execute it through ``es.read``."""
        queries = handle_arg_list(self.orig_query)
        current_app.logger.debug(queries)

        self.__query_build_by_field(queries)

        self._paginate_build()

        filter_path = self._fl_build()

        self._sort_build()

        self._facet_build()

        return es.read(self.query, filter_path=filter_path)

    def _facet_build(self):
        """Add a ``terms`` aggregation for every requested facet field.

        :raises SearchError: if a facet field is not a known attribute
        """
        aggregations = dict(aggs={})
        # facet_field defaults to None in __init__; treat that as "no facets".
        for field in self.facet_field or []:
            attr = AttributeCache.get(field)
            if not attr:
                raise SearchError("Facet by <{0}> does not exist".format(field))
            # Non-numeric attributes are aggregated on their ".keyword" sub-field.
            aggregations['aggs'].update({
                field: {
                    "terms": {
                        "field": "{0}.keyword".format(field)
                        if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field
                    }
                }
            })

        if aggregations['aggs']:
            self.query.update(aggregations)

    def _sort_build(self):
        """Add the ``sort`` section from the comma-separated ``self.sort``.

        A leading ``-`` means descending; a leading ``+`` or no prefix means ascending.

        :raises SearchError: if a sort field is not ``ci_id`` and not a known attribute
        """
        fields = [f for f in (self.sort or "").split(",") if f != ""]
        sorts = []
        for field in fields:
            sort_type = "asc"
            if field.startswith("+"):
                field = field[1:]
            elif field.startswith("-"):
                field = field[1:]
                sort_type = "desc"

            if field == "ci_id":
                sorts.append({field: {"order": sort_type}})
                continue

            attr = AttributeCache.get(field)
            if not attr:
                raise SearchError("Sort by <{0}> does not exist".format(field))

            # Non-numeric attributes are sorted on their ".keyword" sub-field.
            sort_by = "{0}.keyword".format(field) \
                if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field
            sorts.append({sort_by: {"order": sort_type}})

        self.query.update(dict(sort=sorts))

    def _paginate_build(self):
        """Add ``from``/``size`` for the requested page."""
        self.query.update({"from": (self.page - 1) * self.count,
                           "size": self.count})

    def _fl_build(self):
        """Return the response filter paths that restrict ``_source`` to ``self.fl``."""
        # fl defaults to None in __init__; treat that as "no field restriction".
        return ['hits.hits._source.{0}'.format(i) for i in self.fl or []]

    def search(self):
        """Run the search.

        :return: ``(cis, counter, total, page, numfound, facet)`` where
                 ``counter`` maps each ``ci_type`` on this page to its count,
                 ``total`` is the number of CIs on this page, and ``facet``
                 maps each facet field to ``[key, doc_count, field]`` rows
        :raises SearchError: for invalid queries (original message kept) and
                             for any other failure ("unknown search error")
        """
        try:
            numfound, cis, facet = self._query_build_raw()
        except SearchError:
            # Validation errors already carry a user-facing message; do not mask it.
            raise
        except Exception as e:
            current_app.logger.error(str(e))
            raise SearchError("unknown search error")

        total = len(cis)

        counter = dict()
        for ci in cis:
            ci_type = ci.get("ci_type")
            counter[ci_type] = counter.get(ci_type, 0) + 1

        facet_ = dict()
        for k in facet:
            facet_[k] = [[i['key'], i['doc_count'], k] for i in facet[k]["buckets"]]

        return cis, counter, total, self.page, numfound, facet_

View File

@ -43,10 +43,10 @@ class UserCRUD(object):
return User.create(**kwargs)
@staticmethod
def update(rid, **kwargs):
user = User.get_by_id(rid) or abort(404, "User <{0}> does not exist".format(rid))
def update(uid, **kwargs):
user = User.get_by(uid=uid, to_dict=False, first=True) or abort(404, "User <{0}> does not exist".format(uid))
UserCache.clean(rid)
UserCache.clean(uid)
return user.update(**kwargs)
@ -59,7 +59,7 @@ class UserCRUD(object):
@classmethod
def delete(cls, uid):
user = User.get_by_id(uid) or abort(404, "User <{0}> does not exist".format(uid))
user = User.get_by(uid=uid, to_dict=False, first=True) or abort(404, "User <{0}> does not exist".format(uid))
UserCache.clean(user)

View File

@ -2,6 +2,7 @@
import redis
import six
from elasticsearch import Elasticsearch
from flask import current_app
@ -72,3 +73,65 @@ class RedisHandler(object):
current_app.logger.warn("[{0}] is not in redis".format(key_id))
except Exception as e:
current_app.logger.error("delete redis key error, {0}".format(str(e)))
class ESHandler(object):
    """Small wrapper around an Elasticsearch client that works on the ``cmdb`` index."""

    def __init__(self, flask_app=None):
        self.flask_app = flask_app
        self.es = None  # set by init_app()
        self.index = "cmdb"

    def init_app(self, app):
        """Create the client from ``ES_HOST`` and make sure the index exists."""
        self.flask_app = app
        config = self.flask_app.config
        self.es = Elasticsearch(config.get("ES_HOST"))
        if not self.es.indices.exists(index=self.index):
            self.es.indices.create(index=self.index)

    def update_mapping(self, field, value_type, other):
        """Put a mapping for ``field``.

        :param field: field name
        :param value_type: Elasticsearch field type
        :param other: extra mapping options merged into the field definition
        """
        body = {
            "properties": {
                field: {"type": value_type},
            }}
        body['properties'][field].update(other)

        self.es.indices.put_mapping(
            index=self.index,
            body=body
        )

    def get_index_id(self, ci_id):
        """Return the ``_id`` of the last hit whose ``ci_id`` matches, or None if there is none."""
        query = {
            'query': {
                'match': {'ci_id': ci_id}
            },
        }
        res = self.es.search(index=self.index, body=query)
        if res['hits']['hits']:
            return res['hits']['hits'][-1].get('_id')
        return None

    def create(self, body):
        """Index ``body`` as a new document and return its ``_id``."""
        return self.es.index(index=self.index, body=body).get("_id")

    def update(self, ci_id, body):
        """Index ``body`` under the document id currently holding ``ci_id``.

        NOTE(review): when no document matches, ``id`` is None; presumably the
        client then creates a new document -- confirm against the client docs.
        """
        _id = self.get_index_id(ci_id)

        return self.es.index(index=self.index, id=_id, body=body).get("_id")

    def delete(self, ci_id):
        """Delete the document holding ``ci_id``; do nothing if it is not indexed."""
        _id = self.get_index_id(ci_id)
        if _id is None:
            # Nothing to delete; the client rejects an empty id.
            return

        self.es.delete(index=self.index, id=_id)

    def read(self, query, filter_path=None):
        """Run ``query`` and return ``(numfound, sources, aggregations)``.

        :param query: request body
        :param filter_path: optional list of response paths to keep
        :return: total hit count, list of ``_source`` dicts, and the
                 aggregations dict; ``(0, [], {})`` when there are no hits
        """
        # Work on a copy so the caller's list is not modified.
        filter_path = list(filter_path or [])
        if filter_path:
            # Response filtering drops everything not listed, so the total
            # and the aggregations must be requested explicitly.
            filter_path.extend(['hits.total', 'aggregations'])

        res = self.es.search(index=self.index, body=query, filter_path=filter_path)
        hits = res.get('hits', {})
        if hits.get('hits'):
            return hits['total']['value'], \
                   [i['_source'] for i in hits['hits']], \
                   res.get("aggregations", {})
        else:
            return 0, [], {}

View File

@ -75,3 +75,7 @@ DEFAULT_PAGE_COUNT = 50
# # permission
WHITE_LIST = ["127.0.0.1"]
USE_ACL = False
# # elastic search
ES_HOST = '127.0.0.1'
USE_ES = False

View File

@ -10,6 +10,7 @@ import api.lib.cmdb.ci
from api.extensions import celery
from api.extensions import db
from api.extensions import rd
from api.extensions import es
from api.lib.cmdb.const import CMDB_QUEUE
@ -20,14 +21,22 @@ def ci_cache(ci_id):
m = api.lib.cmdb.ci.CIManager()
ci = m.get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
rd.delete(ci_id)
rd.add({ci_id: json.dumps(ci)})
if current_app.config.get("USE_ES"):
es.update(ci_id, ci)
else:
rd.delete(ci_id)
rd.add({ci_id: json.dumps(ci)})
current_app.logger.info("%d caching.........." % ci_id)
current_app.logger.info("%d flush.........." % ci_id)
@celery.task(name="cmdb.ci_delete", queue=CMDB_QUEUE)
def ci_delete(ci_id):
current_app.logger.info(ci_id)
rd.delete(ci_id)
if current_app.config.get("USE_ES"):
es.delete(ci_id)
else:
rd.delete(ci_id)
current_app.logger.info("%d delete.........." % ci_id)

View File

@ -6,12 +6,55 @@ from api.lib.decorator import args_required
from api.lib.perm.acl import validate_app
from api.lib.perm.acl.resource import ResourceCRUD
from api.lib.perm.acl.resource import ResourceGroupCRUD
from api.lib.perm.acl.resource import ResourceTypeCRUD
from api.lib.utils import get_page
from api.lib.utils import get_page_size
from api.lib.utils import handle_arg_list
from api.resource import APIView
class ResourceTypeView(APIView):
    """REST endpoints for ACL resource types (list, create, update, delete)."""

    url_prefix = ("/resource_types", "/resource_types/<int:type_id>")

    @args_required('app_id')
    @validate_app
    def get(self):
        """Return one page of resource types for ``app_id``, optionally filtered by ``q``."""
        params = request.values
        page = get_page(params.get("page", 1))
        page_size = get_page_size(params.get("page_size"))
        keyword = params.get('q')
        app_id = params.get('app_id')

        numfound, resource_types = ResourceTypeCRUD.search(keyword, app_id, page, page_size)
        serialized = [item.to_dict() for item in resource_types]

        return self.jsonify(numfound=numfound,
                            page=page,
                            page_size=page_size,
                            groups=serialized)

    @args_required('name')
    @args_required('app_id')
    @args_required('perms')
    @validate_app
    def post(self):
        """Create a resource type from ``name``, ``app_id`` and ``perms``."""
        params = request.values
        created = ResourceTypeCRUD.add(params.get('name'),
                                       params.get('app_id'),
                                       params.get('perms'))

        return self.jsonify(created.to_dict())

    def put(self, type_id):
        """Update resource type ``type_id`` with the submitted request values."""
        return self.jsonify(ResourceTypeCRUD.update(type_id, **request.values).to_dict())

    def delete(self, type_id):
        """Delete resource type ``type_id`` and echo its id back."""
        ResourceTypeCRUD.delete(type_id)

        return self.jsonify(type_id=type_id)
class ResourceView(APIView):
url_prefix = ("/resources", "/resources/<int:resource_id>")
@ -60,6 +103,7 @@ class ResourceView(APIView):
class ResourceGroupView(APIView):
url_prefix = ("/resource_groups", "/resource_groups/<int:group_id>")
@args_required('app_id')
@validate_app
def get(self):
page = get_page(request.values.get("page", 1))

View File

@ -12,8 +12,9 @@ from api.lib.cmdb.ci import CIManager
from api.lib.cmdb.const import ExistPolicy
from api.lib.cmdb.const import ResourceType, PermEnum
from api.lib.cmdb.const import RetKey
from api.lib.cmdb.search import Search
from api.lib.cmdb.search import SearchError
from api.lib.cmdb.search.db.search import Search as SearchFromDB
from api.lib.cmdb.search.es.search import Search as SearchFromES
from api.lib.cmdb.search.db.search import SearchError
from api.lib.perm.acl.acl import has_perm_from_args
from api.lib.perm.auth import auth_abandoned
from api.lib.utils import get_page
@ -144,14 +145,14 @@ class CISearchView(APIView):
sort = request.values.get("sort")
start = time.time()
s = Search(query, fl, facet, page, ret_key, count, sort)
if current_app.config.get("USE_ES"):
s = SearchFromES(query, fl, facet, page, ret_key, count, sort)
else:
s = SearchFromDB(query, fl, facet, page, ret_key, count, sort)
try:
response, counter, total, page, numfound, facet = s.search()
except SearchError as e:
return abort(400, str(e))
except Exception as e:
current_app.logger.error(str(e))
return abort(500, "search unknown error")
current_app.logger.debug("search time is :{0}".format(time.time() - start))
return self.jsonify(numfound=numfound,
total=total,

View File

@ -26,6 +26,33 @@ services:
aliases:
- redis
cmdb-search:
image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-search:1.2
# build:
# context: .
# target: cmdb-search
container_name: cmdb-search
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- esdata:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
new:
aliases:
- cmdb-search
cmdb-api:
image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-api:1.0
container_name: cmdb-api
@ -35,12 +62,15 @@ services:
- /bin/sh
- -c
- |
sed -i "s#USE_ES = False#USE_ES = True#g" api/settings.py
# settings.py is Python, so the replacement host must stay a quoted string literal.
sed -i "s#ES_HOST = '127.0.0.1'#ES_HOST = 'cmdb-search'#g" api/settings.py
gunicorn --workers=3 autoapp:app -b 0.0.0.0:5000 -D
flask init-cache
celery worker -A celery_worker.celery -E -Q cmdb_async --concurrency=1
depends_on:
- cmdb-db
- cmdb-cache
- cmdb-search
networks:
new:
aliases:
@ -70,6 +100,7 @@ services:
volumes:
db-data:
esdata:
networks:
new:

View File

@ -35,3 +35,4 @@ bs4>=0.0.1
toposort>=1.5
requests>=2.22.0
PyJWT>=1.7.1
elasticsearch ==7.0.4