# -*- coding:utf-8 -*- import copy import datetime import json import threading from flask import abort from flask import current_app from flask_login import current_user from werkzeug.exceptions import BadRequest from api.extensions import db from api.extensions import rd from api.lib.cmdb.cache import AttributeCache from api.lib.cmdb.cache import CITypeAttributesCache from api.lib.cmdb.cache import CITypeCache from api.lib.cmdb.ci_type import CITypeAttributeManager from api.lib.cmdb.ci_type import CITypeManager from api.lib.cmdb.ci_type import CITypeRelationManager from api.lib.cmdb.ci_type import CITypeUniqueConstraintManager from api.lib.cmdb.const import AttributeDefaultValueEnum from api.lib.cmdb.const import CMDB_QUEUE from api.lib.cmdb.const import ConstraintEnum from api.lib.cmdb.const import ExistPolicy from api.lib.cmdb.const import OperateType from api.lib.cmdb.const import PermEnum from api.lib.cmdb.const import REDIS_PREFIX_CI from api.lib.cmdb.const import ResourceTypeEnum from api.lib.cmdb.const import RetKey from api.lib.cmdb.const import ValueTypeEnum from api.lib.cmdb.history import AttributeHistoryManger from api.lib.cmdb.history import CIRelationHistoryManager from api.lib.cmdb.history import CITriggerHistoryManager from api.lib.cmdb.perms import CIFilterPermsCRUD from api.lib.cmdb.resp_format import ErrFormat from api.lib.cmdb.utils import TableMap from api.lib.cmdb.utils import ValueTypeMap from api.lib.cmdb.value import AttributeValueManager from api.lib.decorator import kwargs_required from api.lib.notify import notify_send from api.lib.perm.acl.acl import ACLManager from api.lib.perm.acl.acl import is_app_admin from api.lib.perm.acl.acl import validate_permission from api.lib.secrets.inner import InnerCrypt from api.lib.secrets.vault import VaultClient from api.lib.utils import Lock from api.lib.utils import handle_arg_list from api.lib.webhook import webhook_request from api.models.cmdb import AttributeHistory from api.models.cmdb import AutoDiscoveryCI from api.models.cmdb import CI from api.models.cmdb import CIRelation from api.models.cmdb import CITypeAttribute from api.models.cmdb import CITypeRelation from api.models.cmdb import CITypeTrigger from api.tasks.cmdb import ci_cache from api.tasks.cmdb import ci_delete from api.tasks.cmdb import ci_delete_trigger from api.tasks.cmdb import ci_relation_add from api.tasks.cmdb import ci_relation_cache from api.tasks.cmdb import ci_relation_delete PRIVILEGED_USERS = {"worker", "cmdb_agent", "agent"} PASSWORD_DEFAULT_SHOW = "******" class CIManager(object): """ manage CI interface """ def __init__(self): pass @staticmethod def get_by_id(ci_id): return CI.get_by_id(ci_id) @staticmethod def get_type_name(ci_id): ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) return CITypeCache.get(ci.type_id).name @staticmethod def get_type(ci_id): ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) return CITypeCache.get(ci.type_id) @staticmethod def confirm_ci_existed(ci_id): return CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) @classmethod def get_ci_by_id(cls, ci_id, ret_key=RetKey.NAME, fields=None, need_children=True): """ :param ci_id: :param ret_key: name, id, or alias :param fields: attribute list :param need_children: :return: """ ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) res = dict() need_children and res.update(CIRelationManager.get_children(ci_id, ret_key=ret_key)) # one floor ci_type = CITypeCache.get(ci.type_id) res["ci_type"] = ci_type.name res.update(cls.get_cis_by_ids([str(ci_id)], fields=fields, ret_key=ret_key)) res['_type'] = ci_type.id res['_id'] = ci_id return res @classmethod def valid_ci_only_read(cls, ci): if is_app_admin("cmdb"): return validate_permission(CIManager.get_type_name(ci.id), ResourceTypeEnum.CI, PermEnum.READ, "cmdb") acl = ACLManager('cmdb') res = acl.get_resources(ResourceTypeEnum.CI_FILTER) if res and ci.type_id in CIFilterPermsCRUD().get_by_ids(list(map(int, [i['name'] for i in res]))): return abort(403, ErrFormat.no_permission2) @classmethod def _valid_ci_for_no_read(cls, ci, ci_type=None): type_id = ci.type_id if ci else ci_type.id acl = ACLManager('cmdb') res = acl.get_resources(ResourceTypeEnum.CI_FILTER) type2filters = CIFilterPermsCRUD().get_by_ids(list(map(int, [i['name'] for i in res])), type_id=type_id) if res and type_id in type2filters: if type2filters[type_id].get('ci_filter') and ci: from api.lib.cmdb.search import SearchError from api.lib.cmdb.search.ci import search query = "_id:{},{}".format(ci.id, type2filters[type_id].get('ci_filter')) s = search(query) try: response, _, _, _, _, _ = s.search() except SearchError as e: current_app.logger.warning(e) return abort(400, str(e)) if not response: return abort(403, ErrFormat.ci_filter_perm_ci_no_permission) return type2filters[type_id].get('attr_filter') or [] @classmethod def get_ci_by_id_from_db(cls, ci_id, ret_key=RetKey.NAME, fields=None, need_children=True, use_master=False, valid=False): """ :param ci_id: :param ret_key: name, id or alias :param fields: list :param need_children: :param use_master: whether to use master db :param valid: :return: """ ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) valid and cls.valid_ci_only_read(ci) res = dict() need_children and res.update(CIRelationManager.get_children(ci_id, ret_key=ret_key)) # one floor ci_type = CITypeCache.get(ci.type_id) res["ci_type"] = ci_type.name fields = CITypeAttributeManager.get_attr_names_by_type_id(ci.type_id) if not fields else fields unique_key = AttributeCache.get(ci_type.unique_id) _res = AttributeValueManager().get_attr_values(fields, ci_id, ret_key=ret_key, unique_key=unique_key, use_master=use_master) res.update(_res) res['_type'] = ci_type.id res['ci_type_alias'] = ci_type.alias res['_id'] = ci_id return res def get_ci_by_ids(self, ci_id_list, ret_key=RetKey.NAME, fields=None): return [self.get_ci_by_id(ci_id, ret_key=ret_key, fields=fields) for ci_id in ci_id_list] @classmethod def get_cis_by_type(cls, type_id, ret_key=RetKey.NAME, fields="", page=1, per_page=None): cis = db.session.query(CI.id).filter(CI.type_id == type_id).filter(CI.deleted.is_(False)) numfound = cis.count() cis = cis.offset((page - 1) * per_page).limit(per_page) ci_ids = [str(ci.id) for ci in cis] res = cls.get_cis_by_ids(ci_ids, ret_key, fields) return numfound, page, res @classmethod def get_ad_statistics(cls): res = CI.get_by(to_dict=False) result = dict() for i in res: result.setdefault(i.type_id, dict(total=0, auto_discovery=0)) result[i.type_id]['total'] += 1 if i.is_auto_discovery: result[i.type_id]['auto_discovery'] += 1 return result @staticmethod def ci_is_exist(unique_key, unique_value, type_id): """ :param unique_key: is a attribute :param unique_value: :param type_id: :return: """ value_table = TableMap(attr=unique_key).table unique = db.session.query(value_table).join(CI, CI.id == value_table.ci_id).filter( value_table.attr_id == unique_key.id).filter(value_table.value == unique_value).filter( CI.type_id == type_id).filter(CI.deleted.is_(False)).filter(value_table.deleted.is_(False)).first() if unique: return CI.get_by_id(unique.ci_id) @staticmethod def _delete_ci_by_id(ci_id): ci = CI.get_by_id(ci_id) ci.delete() # TODO: soft delete @staticmethod def _valid_unique_constraint(type_id, ci_dict, ci_id=None): unique_constraints = CITypeUniqueConstraintManager.get_by_type_id(type_id) if not unique_constraints: return attr_ids = [] for i in unique_constraints: attr_ids.extend(i.attr_ids) attrs = [AttributeCache.get(i) for i in set(attr_ids)] id2name = {i.id: i.name for i in attrs if i} not_existed_fields = list(set(id2name.values()) - set(ci_dict.keys())) if not_existed_fields and ci_id is not None: ci_dict = copy.deepcopy(ci_dict) ci_dict.update(AttributeValueManager().get_attr_values(not_existed_fields, ci_id)) for constraint in unique_constraints: ci_ids = None for attr_id in constraint.attr_ids: value_table = TableMap(attr_name=id2name[attr_id]).table _ci_ids = set([i.ci_id for i in value_table.get_by(attr_id=attr_id, to_dict=False, value=ci_dict.get(id2name[attr_id]) or None)]) if ci_ids is None: ci_ids = _ci_ids else: ci_ids &= _ci_ids if ci_ids - (ci_id and {ci_id} or set()): return abort(400, ErrFormat.unique_constraint.format( " - ".join([id2name[i] for i in constraint.attr_ids]))) @staticmethod def _auto_inc_id(attr): db.session.remove() value_table = TableMap(attr_name=attr.name).table with Lock("auto_inc_id_{}".format(attr.name), need_lock=True): max_v = value_table.get_by(attr_id=attr.id, only_query=True).order_by( getattr(value_table, 'value').desc()).first() if max_v is not None: return int(max_v.value) + 1 return 1 @classmethod def add(cls, ci_type_name, exist_policy=ExistPolicy.REPLACE, _no_attribute_policy=ExistPolicy.IGNORE, is_auto_discovery=False, _is_admin=False, **ci_dict): """ add ci :param ci_type_name: :param exist_policy: replace or reject or need :param _no_attribute_policy: ignore or reject :param is_auto_discovery: default is False :param _is_admin: default is False :param ci_dict: :return: """ now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') ci_type = CITypeManager.check_is_existed(ci_type_name) unique_key = AttributeCache.get(ci_type.unique_id) or abort( 400, ErrFormat.unique_value_not_found.format("unique_id={}".format(ci_type.unique_id))) unique_value = ci_dict.get(unique_key.name) or ci_dict.get(unique_key.alias) or ci_dict.get(unique_key.id) unique_value = unique_value or abort(400, ErrFormat.unique_key_required.format(unique_key.name)) attrs = CITypeAttributesCache.get2(ci_type_name) ci_type_attrs_name = {attr.name: attr for _, attr in attrs} ci_type_attrs_alias = {attr.alias: attr for _, attr in attrs} ci_attr2type_attr = {type_attr.attr_id: type_attr for type_attr, _ in attrs} ci = None record_id = None password_dict = {} need_lock = current_user.username not in current_app.config.get('PRIVILEGED_USERS', PRIVILEGED_USERS) with Lock(ci_type_name, need_lock=need_lock): existed = cls.ci_is_exist(unique_key, unique_value, ci_type.id) if existed is not None: if exist_policy == ExistPolicy.REJECT: return abort(400, ErrFormat.ci_is_already_existed) if existed.type_id != ci_type.id: existed.update(type_id=ci_type.id) ci = existed else: if exist_policy == ExistPolicy.NEED: return abort(404, ErrFormat.ci_not_found.format("{}={}".format(unique_key.name, unique_value))) limit_attrs = cls._valid_ci_for_no_read(ci, ci_type) if not _is_admin else {} if existed is None: # set default for type_attr, attr in attrs: if attr.default and attr.default.get('default') is not None: if attr.default.get('default') and attr.default.get('default') in ( AttributeDefaultValueEnum.CREATED_AT, AttributeDefaultValueEnum.UPDATED_AT): ci_dict[attr.name] = now elif attr.default.get('default') == AttributeDefaultValueEnum.AUTO_INC_ID: ci_dict[attr.name] = cls._auto_inc_id(attr) elif ((attr.name not in ci_dict and attr.alias not in ci_dict) or ( ci_dict.get(attr.name) is None and ci_dict.get(attr.alias) is None)): ci_dict[attr.name] = attr.default.get('default') if (type_attr.is_required and not attr.is_computed and (attr.name not in ci_dict and attr.alias not in ci_dict)): return abort(400, ErrFormat.attribute_value_required.format(attr.name)) else: for type_attr, attr in attrs: if attr.default and attr.default.get('default') == AttributeDefaultValueEnum.UPDATED_AT: ci_dict[attr.name] = now computed_attrs = [] for _, attr in attrs: if attr.is_computed: computed_attrs.append(attr.to_dict()) elif attr.is_password: if attr.name in ci_dict: password_dict[attr.id] = ci_dict.pop(attr.name) elif attr.alias in ci_dict: password_dict[attr.id] = ci_dict.pop(attr.alias) value_manager = AttributeValueManager() if computed_attrs: value_manager.handle_ci_compute_attributes(ci_dict, computed_attrs, ci) cls._valid_unique_constraint(ci_type.id, ci_dict, ci and ci.id) ref_ci_dict = dict() for k in ci_dict: if k.startswith("$") and "." in k: ref_ci_dict[k] = ci_dict[k] continue if k not in ci_type_attrs_name and ( k not in ci_type_attrs_alias and _no_attribute_policy == ExistPolicy.REJECT): return abort(400, ErrFormat.attribute_not_found.format(k)) if limit_attrs and ci_type_attrs_name.get(k) not in limit_attrs and ( ci_type_attrs_alias.get(k) not in limit_attrs): return abort(403, ErrFormat.ci_filter_perm_attr_no_permission.format(k)) ci_dict = {k: v for k, v in ci_dict.items() if k in ci_type_attrs_name or k in ci_type_attrs_alias} key2attr = value_manager.valid_attr_value(ci_dict, ci_type.id, ci and ci.id, ci_type_attrs_name, ci_type_attrs_alias, ci_attr2type_attr) operate_type = OperateType.UPDATE if ci is not None else OperateType.ADD try: ci = ci or CI.create(type_id=ci_type.id, is_auto_discovery=is_auto_discovery) record_id = value_manager.create_or_update_attr_value(ci, ci_dict, key2attr) except BadRequest as e: if existed is None: cls.delete(ci.id) raise e if password_dict: for attr_id in password_dict: record_id = cls.save_password(ci.id, attr_id, password_dict[attr_id], record_id, ci_type.id) if record_id: # has change ci_cache.apply_async(args=(ci.id, operate_type, record_id), queue=CMDB_QUEUE) if ref_ci_dict: # add relations ci_relation_add.apply_async(args=(ref_ci_dict, ci.id, current_user.uid), queue=CMDB_QUEUE) return ci.id def update(self, ci_id, _is_admin=False, **ci_dict): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') ci = self.confirm_ci_existed(ci_id) attrs = CITypeAttributesCache.get2(ci.type_id) ci_type_attrs_name = {attr.name: attr for _, attr in attrs} ci_attr2type_attr = {type_attr.attr_id: type_attr for type_attr, _ in attrs} for _, attr in attrs: if attr.default and attr.default.get('default') == AttributeDefaultValueEnum.UPDATED_AT: ci_dict[attr.name] = now password_dict = dict() computed_attrs = list() for _, attr in attrs: if attr.is_computed: computed_attrs.append(attr.to_dict()) elif attr.is_password: if attr.name in ci_dict: password_dict[attr.id] = ci_dict.pop(attr.name) elif attr.alias in ci_dict: password_dict[attr.id] = ci_dict.pop(attr.alias) value_manager = AttributeValueManager() if computed_attrs: value_manager.handle_ci_compute_attributes(ci_dict, computed_attrs, ci) limit_attrs = self._valid_ci_for_no_read(ci) if not _is_admin else {} record_id = None need_lock = current_user.username not in current_app.config.get('PRIVILEGED_USERS', PRIVILEGED_USERS) with Lock(ci.ci_type.name, need_lock=need_lock): self._valid_unique_constraint(ci.type_id, ci_dict, ci_id) ci_dict = {k: v for k, v in ci_dict.items() if k in ci_type_attrs_name} key2attr = value_manager.valid_attr_value(ci_dict, ci.type_id, ci.id, ci_type_attrs_name, ci_attr2type_attr=ci_attr2type_attr) if limit_attrs: for k in ci_dict: if k not in limit_attrs: return abort(403, ErrFormat.ci_filter_perm_attr_no_permission.format(k)) try: record_id = value_manager.create_or_update_attr_value(ci, ci_dict, key2attr) except BadRequest as e: raise e if password_dict: for attr_id in password_dict: record_id = self.save_password(ci.id, attr_id, password_dict[attr_id], record_id, ci.type_id) if record_id: # has change ci_cache.apply_async(args=(ci_id, OperateType.UPDATE, record_id), queue=CMDB_QUEUE) ref_ci_dict = {k: v for k, v in ci_dict.items() if k.startswith("$") and "." in k} if ref_ci_dict: ci_relation_add.apply_async(args=(ref_ci_dict, ci.id), queue=CMDB_QUEUE) @staticmethod def update_unique_value(ci_id, unique_name, unique_value): ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) key2attr = {unique_name: AttributeCache.get(unique_name)} record_id = AttributeValueManager().create_or_update_attr_value(ci, {unique_name: unique_value}, key2attr) ci_cache.apply_async(args=(ci_id, OperateType.UPDATE, record_id), queue=CMDB_QUEUE) @classmethod def delete(cls, ci_id): ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id))) cls._valid_ci_for_no_read(ci) ci_dict = cls.get_cis_by_ids([ci_id]) ci_dict = ci_dict and ci_dict[0] if ci_dict: triggers = CITriggerManager.get(ci_dict['_type']) for trigger in triggers: option = trigger['option'] if not option.get('enable') or option.get('action') != OperateType.DELETE: continue if option.get('filter') and not CITriggerManager.ci_filter(ci_dict.get('_id'), option['filter']): continue ci_delete_trigger.apply_async(args=(trigger, OperateType.DELETE, ci_dict), queue=CMDB_QUEUE) attrs = CITypeAttribute.get_by(type_id=ci.type_id, to_dict=False) attr_names = set([AttributeCache.get(attr.attr_id).name for attr in attrs]) for attr_name in attr_names: value_table = TableMap(attr_name=attr_name).table for item in value_table.get_by(ci_id=ci_id, to_dict=False): item.delete(commit=False) for item in CIRelation.get_by(first_ci_id=ci_id, to_dict=False): ci_relation_delete.apply_async(args=(item.first_ci_id, item.second_ci_id), queue=CMDB_QUEUE) item.delete(commit=False) for item in CIRelation.get_by(second_ci_id=ci_id, to_dict=False): ci_relation_delete.apply_async(args=(item.first_ci_id, item.second_ci_id), queue=CMDB_QUEUE) item.delete(commit=False) ad_ci = AutoDiscoveryCI.get_by(ci_id=ci_id, to_dict=False, first=True) ad_ci and ad_ci.update(is_accept=False, accept_by=None, accept_time=None, filter_none=False, commit=False) ci.delete(commit=False) # TODO: soft delete db.session.commit() if ci_dict: AttributeHistoryManger.add(None, ci_id, [(None, OperateType.DELETE, ci_dict, None)], ci.type_id) ci_delete.apply_async(args=(ci_id,), queue=CMDB_QUEUE) return ci_id @staticmethod def add_heartbeat(ci_type, unique_value): ci_type = CITypeManager().check_is_existed(ci_type) unique_key = AttributeCache.get(ci_type.unique_id) value_table = TableMap(attr=unique_key).table v = (value_table.get_by(attr_id=unique_key.id, value=unique_value, to_dict=False, first=True) or abort(404, ErrFormat.not_found)) ci = CI.get_by_id(v.ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(v.ci_id))) ci.update(heartbeat=datetime.datetime.now()) @classmethod @kwargs_required("type_id", "page") def get_heartbeat(cls, **kwargs): query = db.session.query(CI.id, CI.heartbeat).filter(CI.deleted.is_(False)) expire = datetime.datetime.now() - datetime.timedelta(minutes=72) type_ids = handle_arg_list(kwargs["type_id"]) query = query.filter(CI.type_id.in_(type_ids)) page = kwargs.get("page") agent_status = kwargs.get("agent_status") if agent_status == -1: query = query.filter(CI.heartbeat.is_(None)) elif agent_status == 0: query = query.filter(CI.heartbeat <= expire) elif agent_status == 1: query = query.filter(CI.heartbeat > expire) numfound = query.count() per_page_count = current_app.config.get("DEFAULT_PAGE_COUNT") cis = query.offset((page - 1) * per_page_count).limit(per_page_count).all() ci_ids = [ci.id for ci in cis] heartbeat_dict = {} for ci in cis: if agent_status is not None: heartbeat_dict[ci.id] = agent_status else: if ci.heartbeat is None: heartbeat_dict[ci.id] = -1 elif ci.heartbeat <= expire: heartbeat_dict[ci.id] = 0 else: heartbeat_dict[ci.id] = 1 current_app.logger.debug(heartbeat_dict) ci_ids = list(map(str, ci_ids)) res = cls.get_cis_by_ids(ci_ids, fields=["hostname", "private_ip"]) result = [(i.get("hostname"), i.get("private_ip")[0], i.get("ci_type"), heartbeat_dict.get(i.get("_id"))) for i in res if i.get("private_ip")] return numfound, result @staticmethod def _get_cis_from_cache(ci_ids, ret_key=RetKey.NAME, fields=None, unique_required=False, excludes=None): res = rd.get(ci_ids, REDIS_PREFIX_CI) if res is not None and None not in res and ret_key == RetKey.NAME: res = list(map(json.loads, res)) if not fields and not excludes: return res elif fields: _res = [] for d in res: _d = dict() _d["_id"], _d["_type"] = d.get("_id"), d.get("_type") _d["ci_type"] = d.get("ci_type") if unique_required: _d[d.get('unique')] = d.get(d.get('unique')) for field in fields + ['ci_type_alias', 'unique', 'unique_alias']: _d[field] = d.get(field) _res.append(_d) return _res else: excludes = set(excludes) for i in res: for e in excludes: i.pop(e, None) return res @staticmethod def _get_cis_from_db(ci_ids, ret_key=RetKey.NAME, fields=None, value_tables=None, excludes=None): from api.lib.cmdb.search.ci.db.query_sql import QUERY_CIS_BY_IDS from api.lib.cmdb.search.ci.db.query_sql import QUERY_CIS_BY_VALUE_TABLE if not fields: filter_fields_sql = "" else: _fields = list() for field in fields: attr = AttributeCache.get(field) if attr is not None and not attr.is_password: _fields.append(str(attr.id)) filter_fields_sql = "WHERE A.attr_id in ({0})".format(",".join(_fields)) ci2pos = {int(_id): _pos for _pos, _id in enumerate(ci_ids)} res = [None] * len(ci_ids) ci_ids = ",".join(map(str, ci_ids)) if value_tables is None: value_tables = ValueTypeMap.table_name.values() value_sql = " UNION ".join([QUERY_CIS_BY_VALUE_TABLE.format(value_table, ci_ids) for value_table in value_tables]) query_sql = QUERY_CIS_BY_IDS.format(filter_fields_sql, value_sql) # current_app.logger.debug(query_sql) cis = db.session.execute(query_sql).fetchall() ci_set = set() ci_dict = dict() unique_id2obj = dict() excludes = excludes and set(excludes) for ci_id, type_id, attr_id, attr_name, attr_alias, value, value_type, is_list, is_password in cis: if not fields and excludes and (attr_name in excludes or attr_alias in excludes): continue if ci_id not in ci_set: ci_dict = dict() ci_type = CITypeCache.get(type_id) ci_dict["_id"] = ci_id ci_dict["_type"] = type_id ci_dict["ci_type"] = ci_type.name ci_dict["ci_type_alias"] = ci_type.alias if ci_type.unique_id not in unique_id2obj: unique_id2obj[ci_type.unique_id] = AttributeCache.get(ci_type.unique_id) ci_dict["unique"] = unique_id2obj[ci_type.unique_id] and unique_id2obj[ci_type.unique_id].name ci_dict["unique_alias"] = unique_id2obj[ci_type.unique_id] and unique_id2obj[ci_type.unique_id].alias ci_set.add(ci_id) res[ci2pos[ci_id]] = ci_dict if ret_key == RetKey.NAME: attr_key = attr_name elif ret_key == RetKey.ALIAS: attr_key = attr_alias elif ret_key == RetKey.ID: attr_key = attr_id else: return abort(400, ErrFormat.argument_invalid.format("ret_key")) if is_password and value: ci_dict[attr_key] = PASSWORD_DEFAULT_SHOW else: value = ValueTypeMap.serialize2[value_type](value) if is_list: ci_dict.setdefault(attr_key, []).append(value) else: ci_dict[attr_key] = value return res @classmethod def get_cis_by_ids(cls, ci_ids, ret_key=RetKey.NAME, fields=None, value_tables=None, unique_required=False, excludes=None): """ :param ci_ids: list of CI instance ID, eg. ['1', '2'] :param ret_key: name, id or alias :param fields: :param value_tables: :param unique_required: :param excludes: exclude field list :return: """ if not ci_ids: return [] fields = [] if fields is None or not isinstance(fields, list) else fields ci_id_tuple = tuple(map(int, ci_ids)) res = cls._get_cis_from_cache(ci_id_tuple, ret_key, fields, unique_required, excludes=excludes) if res is not None: return res current_app.logger.warning("cache not hit...............") return cls._get_cis_from_db(ci_ids, ret_key, fields, value_tables, excludes=excludes) @classmethod def save_password(cls, ci_id, attr_id, value, record_id, type_id): changed = None encrypt_value = None value_table = ValueTypeMap.table[ValueTypeEnum.PASSWORD] if current_app.config.get('SECRETS_ENGINE') == 'inner': if value: encrypt_value, status = InnerCrypt().encrypt(value) if not status: current_app.logger.error('save password failed: {}'.format(encrypt_value)) return abort(400, ErrFormat.password_save_failed.format(encrypt_value)) else: encrypt_value = PASSWORD_DEFAULT_SHOW existed = value_table.get_by(ci_id=ci_id, attr_id=attr_id, first=True, to_dict=False) if existed is None: if value: value_table.create(ci_id=ci_id, attr_id=attr_id, value=encrypt_value) changed = [(ci_id, attr_id, OperateType.ADD, '', PASSWORD_DEFAULT_SHOW, type_id)] elif existed.value != encrypt_value: if value: existed.update(ci_id=ci_id, attr_id=attr_id, value=encrypt_value) changed = [(ci_id, attr_id, OperateType.UPDATE, PASSWORD_DEFAULT_SHOW, PASSWORD_DEFAULT_SHOW, type_id)] else: existed.delete() changed = [(ci_id, attr_id, OperateType.DELETE, PASSWORD_DEFAULT_SHOW, '', type_id)] if current_app.config.get('SECRETS_ENGINE') == 'vault': vault = VaultClient(current_app.config.get('VAULT_URL'), current_app.config.get('VAULT_TOKEN')) if value: try: vault.update("/{}/{}".format(ci_id, attr_id), dict(v=value)) except Exception as e: current_app.logger.error('save password to vault failed: {}'.format(e)) return abort(400, ErrFormat.password_save_failed.format('write vault failed')) else: try: vault.delete("/{}/{}".format(ci_id, attr_id)) except Exception as e: current_app.logger.warning('delete password to vault failed: {}'.format(e)) if changed is not None: return AttributeValueManager.write_change2(changed, record_id) @classmethod def load_password(cls, ci_id, attr_id): ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format(ci_id)) limit_attrs = cls._valid_ci_for_no_read(ci, ci.ci_type) if limit_attrs: attr = AttributeCache.get(attr_id) if attr and attr.name not in limit_attrs: return abort(403, ErrFormat.no_permission2) if current_app.config.get('SECRETS_ENGINE', 'inner') == 'inner': value_table = ValueTypeMap.table[ValueTypeEnum.PASSWORD] v = value_table.get_by(ci_id=ci_id, attr_id=attr_id, first=True, to_dict=False) v = v and v.value if not v: return decrypt_value, status = InnerCrypt().decrypt(v) if not status: current_app.logger.error('load password failed: {}'.format(decrypt_value)) return abort(400, ErrFormat.password_load_failed.format(decrypt_value)) return decrypt_value elif current_app.config.get('SECRETS_ENGINE') == 'vault': vault = VaultClient(current_app.config.get('VAULT_URL'), current_app.config.get('VAULT_TOKEN')) data, status = vault.read("/{}/{}".format(ci_id, attr_id)) if not status: current_app.logger.error('read password from vault failed: {}'.format(data)) return abort(400, ErrFormat.password_load_failed.format(data)) return data.get('v') class CIRelationManager(object): """ Manage relation between CIs """ def __init__(self): pass @classmethod def get_children(cls, ci_id, ret_key=RetKey.NAME): second_cis = CIRelation.get_by(first_ci_id=ci_id, to_dict=False) second_ci_ids = (second_ci.second_ci_id for second_ci in second_cis) ci_type2ci_ids = dict() for ci_id in second_ci_ids: type_id = CI.get_by_id(ci_id).type_id ci_type2ci_ids.setdefault(type_id, []).append(ci_id) res = {} for type_id in ci_type2ci_ids: ci_type = CITypeCache.get(type_id) children = CIManager.get_cis_by_ids(list(map(str, ci_type2ci_ids[type_id])), ret_key=ret_key) res[ci_type.name] = children return res @staticmethod def get_second_cis(first_ci_id, relation_type_id=None, page=1, per_page=None): second_cis = db.session.query(CI.id).filter(CI.deleted.is_(False)).join( CIRelation, CIRelation.second_ci_id == CI.id).filter( CIRelation.first_ci_id == first_ci_id).filter(CIRelation.deleted.is_(False)) if relation_type_id is not None: second_cis = second_cis.filter(CIRelation.relation_type_id == relation_type_id) numfound = second_cis.count() if per_page != "all": second_cis = second_cis.offset((page - 1) * per_page).limit(per_page).all() ci_ids = [str(son.id) for son in second_cis] result = CIManager.get_cis_by_ids(ci_ids) return numfound, len(ci_ids), result @staticmethod def _sort_handler(sort_by, query_sql): if sort_by.startswith("+"): sort_type = "asc" sort_by = sort_by[1:] elif sort_by.startswith("-"): sort_type = "desc" sort_by = sort_by[1:] else: sort_type = "asc" attr = AttributeCache.get(sort_by) if attr is None: return query_sql attr_id = attr.id value_table = TableMap(attr_name=sort_by).table ci_table = query_sql.subquery() query_sql = db.session.query(ci_table.c.id, value_table.value).join( value_table, value_table.ci_id == ci_table.c.id).filter( value_table.attr_id == attr_id).filter(ci_table.deleted.is_(False)).order_by( getattr(value_table.value, sort_type)()) return query_sql @classmethod def get_first_cis(cls, second_ci, relation_type_id=None, page=1, per_page=None): first_cis = db.session.query(CIRelation.first_ci_id).filter( CIRelation.second_ci_id == second_ci).filter(CIRelation.deleted.is_(False)) if relation_type_id is not None: first_cis = first_cis.filter(CIRelation.relation_type_id == relation_type_id) numfound = first_cis.count() if per_page != "all": first_cis = first_cis.offset((page - 1) * per_page).limit(per_page).all() first_ci_ids = [str(first_ci.first_ci_id) for first_ci in first_cis] result = CIManager.get_cis_by_ids(first_ci_ids) return numfound, len(first_ci_ids), result @classmethod def get_ancestor_ids(cls, ci_ids, level=1): for _ in range(level): cis = db.session.query(CIRelation.first_ci_id).filter( CIRelation.second_ci_id.in_(ci_ids)).filter(CIRelation.deleted.is_(False)) ci_ids = [i.first_ci_id for i in cis] return ci_ids @staticmethod def _check_constraint(first_ci_id, first_type_id, second_ci_id, second_type_id, type_relation): db.session.remove() if type_relation.constraint == ConstraintEnum.Many2Many: return first_existed = CIRelation.get_by(first_ci_id=first_ci_id, relation_type_id=type_relation.relation_type_id, to_dict=False) second_existed = CIRelation.get_by(second_ci_id=second_ci_id, relation_type_id=type_relation.relation_type_id, to_dict=False) if type_relation.constraint == ConstraintEnum.One2One: for i in first_existed: if i.second_ci.type_id == second_type_id: return abort(400, ErrFormat.relation_constraint.format("1-1")) for i in second_existed: if i.first_ci.type_id == first_type_id: return abort(400, ErrFormat.relation_constraint.format("1-1")) if type_relation.constraint == ConstraintEnum.One2Many: for i in second_existed: if i.first_ci.type_id == first_type_id: return abort(400, ErrFormat.relation_constraint.format("1-N")) @classmethod def add(cls, first_ci_id, second_ci_id, more=None, relation_type_id=None): first_ci = CIManager.confirm_ci_existed(first_ci_id) second_ci = CIManager.confirm_ci_existed(second_ci_id) existed = CIRelation.get_by(first_ci_id=first_ci_id, second_ci_id=second_ci_id, to_dict=False, first=True) if existed is not None: if existed.relation_type_id != relation_type_id and relation_type_id is not None: existed.update(relation_type_id=relation_type_id) CIRelationHistoryManager().add(existed, OperateType.UPDATE) else: if relation_type_id is None: type_relation = CITypeRelation.get_by(parent_id=first_ci.type_id, child_id=second_ci.type_id, first=True, to_dict=False) relation_type_id = type_relation and type_relation.relation_type_id relation_type_id or abort(404, ErrFormat.relation_not_found.format("{} -> {}".format( first_ci.ci_type.name, second_ci.ci_type.name))) if current_app.config.get('USE_ACL'): resource_name = CITypeRelationManager.acl_resource_name(first_ci.ci_type.name, second_ci.ci_type.name) if not ACLManager().has_permission( resource_name, ResourceTypeEnum.CI_TYPE_RELATION, PermEnum.ADD): return abort(403, ErrFormat.no_permission.format(resource_name, PermEnum.ADD)) else: type_relation = CITypeRelation.get_by_id(relation_type_id) with Lock("ci_relation_add_{}_{}".format(first_ci.type_id, second_ci.type_id), need_lock=True): cls._check_constraint(first_ci_id, first_ci.type_id, second_ci_id, second_ci.type_id, type_relation) existed = CIRelation.create(first_ci_id=first_ci_id, second_ci_id=second_ci_id, relation_type_id=relation_type_id) CIRelationHistoryManager().add(existed, OperateType.ADD) ci_relation_cache.apply_async(args=(first_ci_id, second_ci_id), queue=CMDB_QUEUE) if more is not None: existed.upadte(more=more) return existed.id @staticmethod def delete(cr_id): cr = CIRelation.get_by_id(cr_id) or abort(404, ErrFormat.relation_not_found.format("id={}".format(cr_id))) if current_app.config.get('USE_ACL'): resource_name = CITypeRelationManager.acl_resource_name(cr.first_ci.ci_type.name, cr.second_ci.ci_type.name) if not ACLManager().has_permission( resource_name, ResourceTypeEnum.CI_TYPE_RELATION, PermEnum.DELETE): return abort(403, ErrFormat.no_permission.format(resource_name, PermEnum.DELETE)) cr.delete() his_manager = CIRelationHistoryManager() his_manager.add(cr, operate_type=OperateType.DELETE) ci_relation_delete.apply_async(args=(cr.first_ci_id, cr.second_ci_id), queue=CMDB_QUEUE) return cr_id @classmethod def delete_2(cls, first_ci_id, second_ci_id): cr = CIRelation.get_by(first_ci_id=first_ci_id, second_ci_id=second_ci_id, to_dict=False, first=True) ci_relation_delete.apply_async(args=(first_ci_id, second_ci_id), queue=CMDB_QUEUE) return cls.delete(cr.id) @classmethod def batch_update(cls, ci_ids, parents, children): """ only for many to one :param ci_ids: :param parents: :param children: :return: """ if isinstance(parents, list): for parent_id in parents: for ci_id in ci_ids: cls.add(parent_id, ci_id) if isinstance(children, list): for child_id in children: for ci_id in ci_ids: cls.add(ci_id, child_id) @classmethod def batch_delete(cls, ci_ids, parents): """ only for many to one :param ci_ids: :param parents: :return: """ if isinstance(parents, list): for parent_id in parents: for ci_id in ci_ids: cls.delete_2(parent_id, ci_id) class CITriggerManager(object): @staticmethod def get(type_id): db.session.remove() return CITypeTrigger.get_by(type_id=type_id, to_dict=True) @staticmethod def _update_old_attr_value(record_id, ci_dict): attr_history = AttributeHistory.get_by(record_id=record_id, to_dict=False) attr_dict = dict() for attr_h in attr_history: attr_dict['old_{}'.format(AttributeCache.get(attr_h.attr_id).name)] = attr_h.old ci_dict.update({'old_{}'.format(k): ci_dict[k] for k in ci_dict}) ci_dict.update(attr_dict) @classmethod def _exec_webhook(cls, operate_type, webhook, ci_dict, trigger_id, trigger_name, record_id, ci_id=None, app=None): app = app or current_app with app.app_context(): if operate_type == OperateType.UPDATE: cls._update_old_attr_value(record_id, ci_dict) if ci_id is not None: ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False) try: response = webhook_request(webhook, ci_dict).text is_ok = True except Exception as e: current_app.logger.warning("exec webhook failed: {}".format(e)) response = e is_ok = False CITriggerHistoryManager.add(operate_type, record_id, ci_dict.get('_id'), trigger_id, trigger_name, is_ok=is_ok, webhook=response) return is_ok @classmethod def _exec_notify(cls, operate_type, notify, ci_dict, trigger_id, trigger_name, record_id, ci_id=None, app=None): app = app or current_app with app.app_context(): if ci_id is not None: ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False) if operate_type == OperateType.UPDATE: cls._update_old_attr_value(record_id, ci_dict) is_ok = True response = '' for method in (notify.get('method') or []): try: res = notify_send(notify.get('subject'), notify.get('body'), [method], notify.get('tos'), ci_dict) response = "{}\n{}".format(response, res) except Exception as e: current_app.logger.warning("send notify failed: {}".format(e)) response = "{}\n{}".format(response, e) is_ok = False CITriggerHistoryManager.add(operate_type, record_id, ci_dict.get('_id'), trigger_id, trigger_name, is_ok=is_ok, notify=response.strip()) return is_ok @staticmethod def ci_filter(ci_id, other_filter): from api.lib.cmdb.search import SearchError from api.lib.cmdb.search.ci import search query = "{},_id:{}".format(other_filter, ci_id) try: _, _, _, _, numfound, _ = search(query).search() return numfound except SearchError as e: current_app.logger.warning("ci search failed: {}".format(e)) @classmethod def fire(cls, operate_type, ci_dict, record_id): type_id = ci_dict.get('_type') triggers = cls.get(type_id) or [] for trigger in triggers: option = trigger['option'] if not option.get('enable'): continue if option.get('filter') and not cls.ci_filter(ci_dict.get('_id'), option['filter']): continue if option.get('attr_ids') and isinstance(option['attr_ids'], list): if not (set(option['attr_ids']) & set([i.attr_id for i in AttributeHistory.get_by(record_id=record_id, to_dict=False)])): continue if option.get('action') == operate_type: cls.fire_by_trigger(trigger, operate_type, ci_dict, record_id) @classmethod def fire_by_trigger(cls, trigger, operate_type, ci_dict, record_id=None): option = trigger['option'] if option.get('webhooks'): cls._exec_webhook(operate_type, option['webhooks'], ci_dict, trigger['id'], option.get('name'), record_id) elif option.get('notifies'): cls._exec_notify(operate_type, option['notifies'], ci_dict, trigger['id'], option.get('name'), record_id) @classmethod def waiting_cis(cls, trigger): now = datetime.datetime.today() config = trigger.option.get('notifies') or {} delta_time = datetime.timedelta(days=(config.get('before_days', 0) or 0)) attr = AttributeCache.get(trigger.attr_id) value_table = TableMap(attr=attr).table values = value_table.get_by(attr_id=attr.id, to_dict=False) result = [] for v in values: if (isinstance(v.value, (datetime.date, datetime.datetime)) and (v.value - delta_time).strftime('%Y%m%d') == now.strftime("%Y%m%d")): if trigger.option.get('filter') and not cls.ci_filter(v.ci_id, trigger.option['filter']): continue result.append(v) return result @classmethod def trigger_notify(cls, trigger, ci): """ only for date attribute :param trigger: :param ci: :return: """ if (trigger.option.get('notifies', {}).get('notify_at') == datetime.datetime.now().strftime("%H:%M") or not trigger.option.get('notifies', {}).get('notify_at')): if trigger.option.get('webhooks'): threading.Thread(target=cls._exec_webhook, args=( None, trigger.option['webhooks'], None, trigger.id, trigger.option.get('name'), None, ci.ci_id, current_app._get_current_object())).start() elif trigger.option.get('notifies'): threading.Thread(target=cls._exec_notify, args=( None, trigger.option['notifies'], None, trigger.id, trigger.option.get('name'), None, ci.ci_id, current_app._get_current_object())).start() return True return False