elastic search [done]

pycook 2019-11-19 18:16:31 +08:00
parent 7ae58611c0
commit 2ce8a6bdbf
8 changed files with 163 additions and 39 deletions

View File

@@ -36,3 +36,9 @@ RUN pip install --no-cache-dir -r docs/requirements.txt \
     && sed -i "s/127.0.0.1/redis/g" api/settings.py
 
 CMD ["bash", "-c", "flask run"]
+
+# ================================= Search ================================
+FROM docker.elastic.co/elasticsearch/elasticsearch:7.4.2 AS cmdb-search
+
+RUN yes | ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
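
The new build stage bundles Elasticsearch 7.4.2 with the IK Chinese analyzer, which the mappings below rely on for ik_max_word / ik_smart. A minimal smoke test of the plugin, assuming the container is reachable on localhost:9200 and the requests package is available:

import requests

# Ask ES to tokenize a sample string with the IK analyzer installed above.
res = requests.post(
    "http://localhost:9200/_analyze",
    json={"analyzer": "ik_max_word", "text": "配置管理数据库"},
)
print([t["token"] for t in res.json()["tokens"]])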

View File

@@ -9,7 +9,6 @@ from flask.cli import with_appcontext
 import api.lib.cmdb.ci
 from api.extensions import db
-from api.extensions import es
 from api.extensions import rd
 from api.models.cmdb import CI
@@ -19,6 +18,29 @@ from api.models.cmdb import CI
 def init_cache():
     db.session.remove()
 
+    if current_app.config.get("USE_ES"):
+        from api.extensions import es
+        from api.models.cmdb import Attribute
+        from api.lib.cmdb.const import type_map
+
+        attributes = Attribute.get_by(to_dict=False)
+        for attr in attributes:
+            other = dict()
+            other['index'] = True if attr.is_index else False
+            if attr.value_type == Attribute.TEXT:
+                other['analyzer'] = 'ik_max_word'
+                other['search_analyzer'] = 'ik_smart'
+                if attr.is_index:
+                    other["fields"] = {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+            try:
+                es.update_mapping(attr.name, type_map['es_type'][attr.value_type], other)
+            except Exception as e:
+                print(e)
+
     cis = CI.get_by(to_dict=False)
     for ci in cis:
         if current_app.config.get("USE_ES"):
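
With USE_ES on, init-cache now registers a field mapping for every attribute before re-indexing the CIs. Illustratively, for an indexed TEXT attribute named "hostname" (the field name is just an example), the body that es.update_mapping() ends up sending looks roughly like:

mapping_body = {
    "properties": {
        "hostname": {
            "type": "text",
            "index": True,
            "analyzer": "ik_max_word",       # fine-grained splitting at index time
            "search_analyzer": "ik_smart",   # coarser splitting at query time
            "fields": {
                # keyword sub-field keeps the raw value for exact match / aggregations
                "keyword": {"type": "keyword", "ignore_above": 256}
            }
        }
    }
}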

View File

@@ -96,9 +96,8 @@ class AttributeManager(object):
         name = kwargs.pop("name")
         alias = kwargs.pop("alias", "")
         alias = name if not alias else alias
-        Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is already existed".format(name))
-        Attribute.get_by(alias=alias, first=True) and abort(400,
-                                                            "attribute alias <{0}> is already existed".format(name))
+        Attribute.get_by(name=name, first=True) and abort(400, "attribute name <{0}> is duplicated".format(name))
+        Attribute.get_by(alias=alias, first=True) and abort(400, "attribute alias <{0}> is duplicated".format(name))
 
         attr = Attribute.create(flush=True,
                                 name=name,
@@ -118,6 +117,22 @@ class AttributeManager(object):
         AttributeCache.clean(attr)
 
+        if current_app.config.get("USE_ES"):
+            from api.extensions import es
+            other = dict()
+            other['index'] = True if attr.is_index else False
+            if attr.value_type == Attribute.TEXT:
+                other['analyzer'] = 'ik_max_word'
+                other['search_analyzer'] = 'ik_smart'
+                if attr.is_index:
+                    other["fields"] = {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+            es.update_mapping(name, type_map['es_type'][attr.value_type], other)
+
         return attr.id
 
     def update(self, _id, **kwargs):
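
Creating an attribute now registers its mapping immediately, so the keyword sub-field supports exact matches and aggregations alongside analyzed full-text search. Hypothetical query shapes against a TEXT attribute named "hostname" (name and values made up):

match_query = {"query": {"match": {"hostname": "web 服务器"}}}             # analyzed with ik_smart
exact_query = {"query": {"term": {"hostname.keyword": "web-server-01"}}}  # raw, unanalyzed value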

View File

@@ -346,8 +346,8 @@ class CIManager(object):
             if ci_id not in ci_set:
                 ci_dict = dict()
                 ci_type = CITypeCache.get(type_id)
-                ci_dict["_id"] = ci_id
-                ci_dict["_type"] = type_id
+                ci_dict["ci_id"] = ci_id
+                ci_dict["ci_type"] = type_id
                 ci_dict["ci_type"] = ci_type.name
                 ci_dict["ci_type_alias"] = ci_type.alias
                 ci_set.add(ci_id)
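
The rename matters because in Elasticsearch 7, _id and _type are reserved metadata fields that cannot appear inside a document body. An indexed CI's _source now looks roughly like this (values made up; note the ci_type key is assigned twice above, so the type name wins):

ci_doc = {
    "ci_id": 42,
    "ci_type": "server",
    "ci_type_alias": "Server",
}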

View File

@@ -93,6 +93,14 @@ type_map = {
         'index_{0}'.format(Attribute.DATE): 'c_value_index_datetime',
         'index_{0}'.format(Attribute.TIME): 'c_value_index_texts',
         'index_{0}'.format(Attribute.FLOAT): 'c_value_index_floats',
-    }
+    },
+    'es_type': {
+        Attribute.INT: 'long',
+        Attribute.TEXT: 'text',
+        Attribute.DATETIME: 'text',
+        Attribute.DATE: 'text',
+        Attribute.TIME: 'text',
+        Attribute.FLOAT: 'float'
+    }
 }
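
Note that date, datetime and time values are all mapped to plain text here rather than ES date types. The other changes in this commit resolve an attribute's ES field type with a single dict lookup:

# Attribute comes from api.models.cmdb, type_map from api.lib.cmdb.const.
assert type_map['es_type'][Attribute.INT] == 'long'
assert type_map['es_type'][Attribute.DATETIME] == 'text'   # dates are indexed as text too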

View File

@@ -28,9 +28,9 @@ class Search(object):
         self.page = page
         self.ret_key = ret_key
         self.count = count or current_app.config.get("DEFAULT_PAGE_COUNT")
-        self.sort = sort
+        self.sort = sort or "ci_id"
 
-        self.query = dict(filter=dict(bool=dict(should=[], must=[], must_not=[])))
+        self.query = dict(query=dict(bool=dict(should=[], must=[], must_not=[])))
 
     @staticmethod
     def _operator_proc(key):
@@ -91,21 +91,22 @@ class Search(object):
         self._operator2query(operator).append({
             "range": {
                 attr: {
-                    "to": self._digit(right),
-                    "from": self._digit(left),
+                    "lte": self._digit(right),
+                    "gte": self._digit(left),
+                    "boost": 2.0
                 }
             }
         })
 
     def _comparison_query_handle(self, attr, v, operator):
         if v.startswith(">="):
-            _query = dict(gte=self._digit(v[2:]))
+            _query = dict(gte=self._digit(v[2:]), boost=2.0)
         elif v.startswith("<="):
-            _query = dict(lte=self._digit(v[2:]))
+            _query = dict(lte=self._digit(v[2:]), boost=2.0)
         elif v.startswith(">"):
-            _query = dict(gt=self._digit(v[1:]))
+            _query = dict(gt=self._digit(v[1:]), boost=2.0)
         elif v.startswith("<"):
-            _query = dict(lt=self._digit(v[1:]))
+            _query = dict(lt=self._digit(v[1:]), boost=2.0)
         else:
             return
@@ -123,6 +124,8 @@ class Search(object):
                 }
             })
         else:
+            if attr == "ci_type" and v.isdigit():
+                attr = "type_id"
             self._operator2query(operator).append({
                 "term": {
                     attr: v
@@ -164,23 +167,32 @@ class Search(object):
         filter_path = self._fl_build()
 
-        sort = self._sort_build()
+        self._sort_build()
 
-        return es.read(self.query, filter_path=filter_path, sort=sort)
+        self._facet_build()
+
+        return es.read(self.query, filter_path=filter_path)
 
     def _facet_build(self):
-        return {
-            "aggs": {
-                self.facet_field: {
-                    "cardinality": {
-                        "field": self.facet_field
-                    }
-                }
-            }
-        }
+        aggregations = dict(aggs={})
+        for field in self.facet_field:
+            attr = AttributeCache.get(field)
+            if not attr:
+                raise SearchError("Facet by <{0}> does not exist".format(field))
+            aggregations['aggs'].update({
+                field: {
+                    "terms": {
+                        "field": "{0}.keyword".format(field)
+                        if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field
+                    }
+                }
+            })
+
+        if aggregations['aggs']:
+            self.query.update(aggregations)
 
     def _sort_build(self):
-        fields = list(filter(lambda x: x != "", self.sort or ""))
+        fields = list(filter(lambda x: x != "", (self.sort or "").split(",")))
         sorts = []
         for field in fields:
             sort_type = "asc"
@@ -190,10 +202,20 @@ class Search(object):
                 field = field[1:]
                 sort_type = "desc"
             else:
-                continue
+                field = field
+
+            if field == "ci_id":
+                sorts.append({field: {"order": sort_type}})
+                continue
 
-            sorts.append({field: {"order": sort_type}})
+            attr = AttributeCache.get(field)
+            if not attr:
+                raise SearchError("Sort by <{0}> does not exist".format(field))
+
+            sort_by = "{0}.keyword".format(field) \
+                if attr.value_type not in (Attribute.INT, Attribute.FLOAT) else field
+            sorts.append({sort_by: {"order": sort_type}})
 
-        return sorts
+        self.query.update(dict(sort=sorts))
 
     def _paginate_build(self):
         self.query.update({"from": (self.page - 1) * self.count,
@@ -204,16 +226,22 @@ class Search(object):
     def search(self):
         try:
-            numfound, cis = self._query_build_raw()
+            numfound, cis, facet = self._query_build_raw()
         except Exception as e:
             current_app.logger.error(str(e))
             raise SearchError("unknown search error")
 
-        if self.facet_field and numfound:
-            facet = self._facet_build()
-        else:
-            facet = dict()
-
         total = len(cis)
 
-        return cis, {}, total, self.page, numfound, facet
+        counter = dict()
+        for ci in cis:
+            ci_type = ci.get("ci_type")
+            if ci_type not in counter.keys():
+                counter[ci_type] = 0
+            counter[ci_type] += 1
+
+        facet_ = dict()
+        for k in facet:
+            facet_[k] = [[i['key'], i['doc_count'], k] for i in facet[k]["buckets"]]
+
+        return cis, counter, total, self.page, numfound, facet_
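
After the build steps, self.query is a single self-contained request body. Roughly what it holds for a term filter sorted by ci_id descending with a facet on an attribute named "idc" (attribute name and values are made-up examples):

query_body = {
    "query": {"bool": {"must": [{"term": {"idc": "shanghai"}}],
                       "should": [], "must_not": []}},
    "sort": [{"ci_id": {"order": "desc"}}],              # added by _sort_build()
    "aggs": {"idc": {"terms": {"field": "idc.keyword"}}},  # added by _facet_build()
    "from": 0,    # (page - 1) * count, added by _paginate_build()
    "size": 25,   # count
}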

View File

@@ -88,6 +88,18 @@ class ESHandler(object):
         if not self.es.indices.exists(index=self.index):
             self.es.indices.create(index=self.index)
 
+    def update_mapping(self, field, value_type, other):
+        body = {
+            "properties": {
+                field: {"type": value_type},
+            }}
+        body['properties'][field].update(other)
+
+        self.es.indices.put_mapping(
+            index=self.index,
+            body=body
+        )
+
     def get_index_id(self, ci_id):
         query = {
             'query': {
@@ -111,13 +123,15 @@ class ESHandler(object):
         self.es.delete(index=self.index, id=_id)
 
-    def read(self, query, filter_path=None, sort=None):
+    def read(self, query, filter_path=None):
         filter_path = filter_path or []
         if filter_path:
             filter_path.append('hits.total')
 
-        if sort:
-            query.update(dict(sort=sort))
-
         res = self.es.search(index=self.index, body=query, filter_path=filter_path)
-        return res['hits']['total']['value'], [i['_source'] for i in res['hits']['hits']]
+        if res['hits'].get('hits'):
+            return res['hits']['total']['value'], \
+                   [i['_source'] for i in res['hits']['hits']], \
+                   res.get("aggregations", {})
+        else:
+            return 0, [], {}
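
read() now returns a (numfound, docs, aggregations) triple instead of a pair. A sketch of consuming it, assuming es is the module-level ESHandler instance and query_body is a request body like the one shown earlier:

numfound, docs, aggs = es.read(query_body)
for doc in docs:                       # each doc is a CI's stored _source dict
    print(doc.get("ci_id"), doc.get("ci_type"))
for field, agg in aggs.items():        # terms buckets added by _facet_build()
    print(field, [(b["key"], b["doc_count"]) for b in agg["buckets"]])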

View File

@@ -26,6 +26,33 @@ services:
         aliases:
           - redis
 
+  cmdb-search:
+    image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-search:1.2
+    # build:
+    #   context: .
+    #   target: cmdb-search
+    container_name: cmdb-search
+    environment:
+      - discovery.type=single-node
+      - cluster.name=docker-cluster
+      - bootstrap.memory_lock=true
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+    volumes:
+      - esdata:/usr/share/elasticsearch/data
+    ports:
+      - 9200:9200
+    networks:
+      new:
+        aliases:
+          - cmdb-search
+
   cmdb-api:
     image: registry.cn-qingdao.aliyuncs.com/pycook/cmdb-api:1.0
     container_name: cmdb-api
@ -35,12 +62,15 @@ services:
- /bin/sh - /bin/sh
- -c - -c
- | - |
sed -i "s#USE_ES = False#USE_ES = True#g" api/settings.py
sed -i "s#ES_HOST = '127.0.0.1'#ES_HOST = cmdb-search#g" api/settings.py
gunicorn --workers=3 autoapp:app -b 0.0.0.0:5000 -D gunicorn --workers=3 autoapp:app -b 0.0.0.0:5000 -D
flask init-cache flask init-cache
celery worker -A celery_worker.celery -E -Q cmdb_async --concurrency=1 celery worker -A celery_worker.celery -E -Q cmdb_async --concurrency=1
depends_on: depends_on:
- cmdb-db - cmdb-db
- cmdb-cache - cmdb-cache
- cmdb-search
networks: networks:
new: new:
aliases: aliases:
@@ -70,6 +100,7 @@ services:
 
 volumes:
   db-data:
+  esdata:
 
 networks:
   new:
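
With the search service wired into the compose stack, a quick smoke test after docker-compose up -d, assuming the 9200:9200 port mapping above and the requests package:

import time
import requests

# Poll until the single-node cluster reports yellow or green health.
for _ in range(30):
    try:
        status = requests.get("http://localhost:9200/_cluster/health").json()["status"]
        if status in ("yellow", "green"):
            print("cmdb-search is up:", status)
            break
    except requests.ConnectionError:
        pass
    time.sleep(2)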