refactor: CI triggers

This commit is contained in:
pycook
2023-09-22 17:39:54 +08:00
parent 6e94c72031
commit d8399f8723
12 changed files with 431 additions and 128 deletions

View File

@@ -4,6 +4,7 @@
import copy
import datetime
import json
import threading
from flask import abort
from flask import current_app
@@ -24,27 +25,33 @@ from api.lib.cmdb.const import CMDB_QUEUE
from api.lib.cmdb.const import ConstraintEnum
from api.lib.cmdb.const import ExistPolicy
from api.lib.cmdb.const import OperateType
from api.lib.cmdb.const import PermEnum, ResourceTypeEnum
from api.lib.cmdb.const import PermEnum
from api.lib.cmdb.const import REDIS_PREFIX_CI
from api.lib.cmdb.const import ResourceTypeEnum
from api.lib.cmdb.const import RetKey
from api.lib.cmdb.history import AttributeHistoryManger
from api.lib.cmdb.history import CIRelationHistoryManager
from api.lib.cmdb.history import CITriggerHistoryManager
from api.lib.cmdb.perms import CIFilterPermsCRUD
from api.lib.cmdb.resp_format import ErrFormat
from api.lib.cmdb.utils import TableMap
from api.lib.cmdb.utils import ValueTypeMap
from api.lib.cmdb.value import AttributeValueManager
from api.lib.decorator import kwargs_required
from api.lib.notify import notify_send
from api.lib.perm.acl.acl import ACLManager
from api.lib.perm.acl.acl import is_app_admin
from api.lib.perm.acl.acl import validate_permission
from api.lib.utils import Lock
from api.lib.utils import handle_arg_list
from api.lib.webhook import webhook_request
from api.models.cmdb import AttributeHistory
from api.models.cmdb import AutoDiscoveryCI
from api.models.cmdb import CI
from api.models.cmdb import CIRelation
from api.models.cmdb import CITypeAttribute
from api.models.cmdb import CITypeRelation
from api.models.cmdb import CITypeTrigger
from api.tasks.cmdb import ci_cache
from api.tasks.cmdb import ci_delete
from api.tasks.cmdb import ci_relation_add
@@ -378,16 +385,17 @@ class CIManager(object):
key2attr = value_manager.valid_attr_value(ci_dict, ci_type.id, ci and ci.id,
ci_type_attrs_name, ci_type_attrs_alias, ci_attr2type_attr)
operate_type = OperateType.UPDATE if ci is not None else OperateType.ADD
try:
ci = ci or CI.create(type_id=ci_type.id, is_auto_discovery=is_auto_discovery)
record_id = value_manager.create_or_update_attr_value2(ci, ci_dict, key2attr)
record_id = value_manager.create_or_update_attr_value(ci, ci_dict, key2attr)
except BadRequest as e:
if existed is None:
cls.delete(ci.id)
raise e
if record_id: # has change
ci_cache.apply_async([ci.id], queue=CMDB_QUEUE)
ci_cache.apply_async(args=(ci.id, operate_type, record_id), queue=CMDB_QUEUE)
if ref_ci_dict: # add relations
ci_relation_add.apply_async(args=(ref_ci_dict, ci.id, current_user.uid), queue=CMDB_QUEUE)
@@ -427,12 +435,12 @@ class CIManager(object):
return abort(403, ErrFormat.ci_filter_perm_attr_no_permission.format(k))
try:
record_id = value_manager.create_or_update_attr_value2(ci, ci_dict, key2attr)
record_id = value_manager.create_or_update_attr_value(ci, ci_dict, key2attr)
except BadRequest as e:
raise e
if record_id: # has change
ci_cache.apply_async([ci_id], queue=CMDB_QUEUE)
ci_cache.apply_async(args=(ci_id, OperateType.UPDATE, record_id), queue=CMDB_QUEUE)
ref_ci_dict = {k: v for k, v in ci_dict.items() if k.startswith("$") and "." in k}
if ref_ci_dict:
@@ -442,9 +450,10 @@ class CIManager(object):
def update_unique_value(ci_id, unique_name, unique_value):
ci = CI.get_by_id(ci_id) or abort(404, ErrFormat.ci_not_found.format("id={}".format(ci_id)))
AttributeValueManager().create_or_update_attr_value(unique_name, unique_value, ci)
key2attr = {unique_name: AttributeCache.get(unique_name)}
record_id = AttributeValueManager().create_or_update_attr_value(ci, {unique_name: unique_value}, key2attr)
ci_cache.apply_async([ci_id], queue=CMDB_QUEUE)
ci_cache.apply_async(args=(ci_id, OperateType.UPDATE, record_id), queue=CMDB_QUEUE)
@classmethod
def delete(cls, ci_id):
@@ -477,9 +486,9 @@ class CIManager(object):
db.session.commit()
AttributeHistoryManger.add(None, ci_id, [(None, OperateType.DELETE, ci_dict, None)], ci.type_id)
record_id = AttributeHistoryManger.add(None, ci_id, [(None, OperateType.DELETE, ci_dict, None)], ci.type_id)
ci_delete.apply_async([ci.id], queue=CMDB_QUEUE)
ci_delete.apply_async(args=(ci_dict, OperateType.DELETE, record_id), queue=CMDB_QUEUE)
return ci_id
@@ -896,3 +905,128 @@ class CIRelationManager(object):
for parent_id in parents:
for ci_id in ci_ids:
cls.delete_2(parent_id, ci_id)
class CITriggerManager(object):
@staticmethod
def get(type_id):
return CITypeTrigger.get_by(type_id=type_id, to_dict=False)
@staticmethod
def _exec_webhook(operate_type, webhook, ci_dict, trigger_id, record_id):
try:
response = webhook_request(webhook, ci_dict).text
is_ok = True
except Exception as e:
current_app.logger.warning("exec webhook failed: {}".format(e))
response = e
is_ok = False
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
is_ok=is_ok,
webhook=response)
return is_ok
@staticmethod
def _exec_notify(operate_type, notify, ci_dict, trigger_id, record_id, ci_id=None):
if ci_id is not None:
ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
try:
response = notify_send(notify.get('subject'), notify.get('body'), notify.get('tos'), ci_dict)
is_ok = True
except Exception as e:
current_app.logger.warning("send notify failed: {}".format(e))
response = e
is_ok = False
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
is_ok=is_ok,
notify=response)
return is_ok
@staticmethod
def ci_filter(ci_id, other_filter):
from api.lib.cmdb.search import SearchError
from api.lib.cmdb.search.ci import search
query = "_id:{},{}".format(ci_id, other_filter)
try:
_, _, _, _, numfound, _ = search(query).search()
return numfound
except SearchError as e:
current_app.logger.warning("ci search failed: {}".format(e))
@classmethod
def fire(cls, operate_type, ci_dict, record_id):
type_id = ci_dict.get('_type')
triggers = cls.get(type_id) or []
for trigger in triggers:
if not trigger.option.get('enable'):
continue
if trigger.option.get('filter') and not cls.ci_filter(ci_dict.get('_id'), trigger.option['filter']):
continue
if trigger.option.get('attr_ids') and isinstance(trigger.option['attr_ids'], list):
if not (set(trigger.option['attr_ids']) &
set([i.attr_id for i in AttributeHistory.get_by(record_id=record_id, to_dict=False)])):
continue
if trigger.option.get('action') == operate_type:
if trigger.option.get('webhooks'):
cls._exec_webhook(operate_type, trigger.option['webhooks'], ci_dict, trigger.id, record_id)
elif trigger.option.get('notifies'):
cls._exec_notify(operate_type, trigger.option['notifies'], ci_dict, trigger.id, record_id)
@classmethod
def waiting_cis(cls, trigger):
now = datetime.datetime.today()
delta_time = datetime.timedelta(days=(trigger.option.get('before_days', 0) or 0))
attr = AttributeCache.get(trigger.attr_id)
value_table = TableMap(attr=attr).table
values = value_table.get_by(attr_id=attr.id, to_dict=False)
result = []
for v in values:
if (isinstance(v.value, (datetime.date, datetime.datetime)) and
(v.value - delta_time).strftime('%Y%m%d') == now.strftime("%Y%m%d")):
if trigger.option.get('filter') and not cls.ci_filter(v.ci_id, trigger.option['filter']):
continue
result.append(v)
return result
@classmethod
def trigger_notify(cls, trigger, ci):
"""
only for date attribute
:param trigger:
:param ci:
:return:
"""
if (trigger.notify.get('notify_at') == datetime.datetime.now().strftime("%H:%M") or
not trigger.option.get('notify_at')):
threading.Thread(target=cls._exec_notify, args=(
None, trigger.option['notifies'], None, trigger.id, None, ci.id)).start()
return True
return False

View File

@@ -1166,16 +1166,18 @@ class CITypeUniqueConstraintManager(object):
class CITypeTriggerManager(object):
@staticmethod
def get(type_id):
return CITypeTrigger.get_by(type_id=type_id, to_dict=True)
def get(type_id, to_dict=True):
return CITypeTrigger.get_by(type_id=type_id, to_dict=to_dict)
@staticmethod
def add(type_id, attr_id, notify):
CITypeTrigger.get_by(type_id=type_id, attr_id=attr_id) and abort(400, ErrFormat.ci_type_trigger_duplicate)
def add(type_id, attr_id, option):
for i in CITypeTrigger.get_by(type_id=type_id, attr_id=attr_id, to_dict=False):
if i.option == option:
return abort(400, ErrFormat.ci_type_trigger_duplicate)
not isinstance(notify, dict) and abort(400, ErrFormat.argument_invalid.format("notify"))
not isinstance(option, dict) and abort(400, ErrFormat.argument_invalid.format("option"))
trigger = CITypeTrigger.create(type_id=type_id, attr_id=attr_id, notify=notify)
trigger = CITypeTrigger.create(type_id=type_id, attr_id=attr_id, option=option)
CITypeHistoryManager.add(CITypeOperateType.ADD_TRIGGER,
type_id,
@@ -1185,12 +1187,12 @@ class CITypeTriggerManager(object):
return trigger.to_dict()
@staticmethod
def update(_id, notify):
def update(_id, option):
existed = (CITypeTrigger.get_by_id(_id) or
abort(404, ErrFormat.ci_type_trigger_not_found.format("id={}".format(_id))))
existed2 = existed.to_dict()
new = existed.update(notify=notify)
new = existed.update(option=option)
CITypeHistoryManager.add(CITypeOperateType.UPDATE_TRIGGER,
existed.type_id,
@@ -1210,35 +1212,3 @@ class CITypeTriggerManager(object):
existed.type_id,
trigger_id=_id,
change=existed.to_dict())
@staticmethod
def waiting_cis(trigger):
now = datetime.datetime.today()
delta_time = datetime.timedelta(days=(trigger.notify.get('before_days', 0) or 0))
attr = AttributeCache.get(trigger.attr_id)
value_table = TableMap(attr=attr).table
values = value_table.get_by(attr_id=attr.id, to_dict=False)
result = []
for v in values:
if (isinstance(v.value, (datetime.date, datetime.datetime)) and
(v.value - delta_time).strftime('%Y%m%d') == now.strftime("%Y%m%d")):
result.append(v)
return result
@staticmethod
def trigger_notify(trigger, ci):
if (trigger.notify.get('notify_at') == datetime.datetime.now().strftime("%H:%M") or
not trigger.notify.get('notify_at')):
from api.tasks.cmdb import trigger_notify
trigger_notify.apply_async(args=(trigger.notify, ci.ci_id), queue=CMDB_QUEUE)
return True
return False

View File

@@ -16,6 +16,7 @@ from api.lib.perm.acl.cache import UserCache
from api.models.cmdb import Attribute
from api.models.cmdb import AttributeHistory
from api.models.cmdb import CIRelationHistory
from api.models.cmdb import CITriggerHistory
from api.models.cmdb import CITypeHistory
from api.models.cmdb import CITypeTrigger
from api.models.cmdb import CITypeUniqueConstraint
@@ -286,3 +287,67 @@ class CITypeHistoryManager(object):
change=change)
CITypeHistory.create(**payload)
class CITriggerHistoryManager(object):
@staticmethod
def get(page, page_size, type_id=None, trigger_id=None, operate_type=None):
query = CITriggerHistory.get_by(only_query=True)
if type_id is not None:
query = query.filter(CITriggerHistory.type_id == type_id)
if trigger_id:
query = query.filter(CITriggerHistory.trigger_id == trigger_id)
if operate_type is not None:
query = query.filter(CITriggerHistory.operate_type == operate_type)
numfound = query.count()
query = query.order_by(CITriggerHistory.id.desc())
result = query.offset((page - 1) * page_size).limit(page_size)
result = [i.to_dict() for i in result]
for res in result:
if res.get('trigger_id'):
trigger = CITypeTrigger.get_by_id(res['trigger_id'])
res['trigger'] = trigger and trigger.to_dict()
return numfound, result
@staticmethod
def get_by_ci_id(ci_id):
res = db.session.query(CITriggerHistory, CITypeTrigger, OperationRecord).join(
CITypeTrigger, CITypeTrigger.id == CITriggerHistory.trigger_id).join(
OperationRecord, OperationRecord.id == CITriggerHistory.record_id).filter(
CITriggerHistory.ci_id == ci_id).order_by(CITriggerHistory.id.desc())
result = []
id2trigger = dict()
for i in res:
hist = i.CITriggerHistory
record = i.OperationRecord
item = dict(is_ok=hist.is_ok,
operate_type=hist.operate_type,
notify=hist.notify,
webhook=hist.webhook,
created_at=record.created_at.strftime('%Y-%m-%d %H:%M:%S'),
record_id=record.id,
hid=hist.id
)
if i.CITypeTrigger.id not in id2trigger:
id2trigger[i.CITypeTrigger.id] = i.CITypeTrigger.to_dict()
result.append(item)
return dict(items=result, id2trigger=id2trigger)
@staticmethod
def add(operate_type, record_id, ci_id, trigger_id, is_ok=False, notify=None, webhook=None):
CITriggerHistory.create(operate_type=operate_type,
record_id=record_id,
ci_id=ci_id,
trigger_id=trigger_id,
is_ok=is_ok,
notify=notify,
webhook=webhook)

View File

@@ -18,7 +18,6 @@ from api.extensions import db
from api.lib.cmdb.attribute import AttributeManager
from api.lib.cmdb.cache import AttributeCache
from api.lib.cmdb.cache import CITypeAttributeCache
from api.lib.cmdb.const import ExistPolicy
from api.lib.cmdb.const import OperateType
from api.lib.cmdb.const import ValueTypeEnum
from api.lib.cmdb.history import AttributeHistoryManger
@@ -140,6 +139,7 @@ class AttributeValueManager(object):
try:
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("write change failed: {}".format(str(e)))
return record_id
@@ -235,7 +235,7 @@ class AttributeValueManager(object):
return key2attr
def create_or_update_attr_value2(self, ci, ci_dict, key2attr):
def create_or_update_attr_value(self, ci, ci_dict, key2attr):
"""
add or update attribute value, then write history
:param ci: instance object
@@ -288,66 +288,6 @@ class AttributeValueManager(object):
return self._write_change2(changed)
def create_or_update_attr_value(self, key, value, ci, _no_attribute_policy=ExistPolicy.IGNORE, record_id=None):
"""
add or update attribute value, then write history
:param key: id, name or alias
:param value:
:param ci: instance object
:param _no_attribute_policy: ignore or reject
:param record_id: op record
:return:
"""
attr = self._get_attr(key)
if attr is None:
if _no_attribute_policy == ExistPolicy.IGNORE:
return
if _no_attribute_policy == ExistPolicy.REJECT:
return abort(400, ErrFormat.attribute_not_found.format(key))
value_table = TableMap(attr=attr).table
try:
if attr.is_list:
value_list = [self._validate(attr, i, value_table, ci) for i in handle_arg_list(value)]
if not value_list:
self._check_is_required(ci.type_id, attr, '')
existed_attrs = value_table.get_by(attr_id=attr.id, ci_id=ci.id, to_dict=False)
existed_values = [i.value for i in existed_attrs]
added = set(value_list) - set(existed_values)
deleted = set(existed_values) - set(value_list)
for v in added:
value_table.create(ci_id=ci.id, attr_id=attr.id, value=v)
record_id = self._write_change(ci.id, attr.id, OperateType.ADD, None, v, record_id, ci.type_id)
for v in deleted:
existed_attr = existed_attrs[existed_values.index(v)]
existed_attr.delete()
record_id = self._write_change(ci.id, attr.id, OperateType.DELETE, v, None, record_id, ci.type_id)
else:
value = self._validate(attr, value, value_table, ci)
existed_attr = value_table.get_by(attr_id=attr.id, ci_id=ci.id, first=True, to_dict=False)
existed_value = existed_attr and existed_attr.value
if existed_value is None and value is not None:
value_table.create(ci_id=ci.id, attr_id=attr.id, value=value)
record_id = self._write_change(ci.id, attr.id, OperateType.ADD, None, value, record_id, ci.type_id)
else:
if existed_value != value:
if value is None:
existed_attr.delete()
else:
existed_attr.update(value=value)
record_id = self._write_change(ci.id, attr.id, OperateType.UPDATE,
existed_value, value, record_id, ci.type_id)
return record_id
except Exception as e:
current_app.logger.warning(str(e))
return abort(400, ErrFormat.attribute_value_invalid2.format("{}({})".format(attr.alias, attr.name), value))
@staticmethod
def delete_attr_value(attr_id, ci_id):
attr = AttributeCache.get(attr_id)