feat: ci triggers

This commit is contained in:
pycook
2023-09-26 21:18:34 +08:00
parent 983cb4041c
commit 373346e71e
13 changed files with 550 additions and 111 deletions

View File

@@ -54,6 +54,7 @@ from api.models.cmdb import CITypeRelation
from api.models.cmdb import CITypeTrigger
from api.tasks.cmdb import ci_cache
from api.tasks.cmdb import ci_delete
from api.tasks.cmdb import ci_delete_trigger
from api.tasks.cmdb import ci_relation_add
from api.tasks.cmdb import ci_relation_cache
from api.tasks.cmdb import ci_relation_delete
@@ -464,6 +465,17 @@ class CIManager(object):
ci_dict = cls.get_cis_by_ids([ci_id])
ci_dict = ci_dict and ci_dict[0]
triggers = CITriggerManager.get(ci_dict['_type'])
for trigger in triggers:
option = trigger['option']
if not option.get('enable') or option.get('action') != OperateType.DELETE:
continue
if option.get('filter') and not CITriggerManager.ci_filter(ci_dict.get('_id'), option['filter']):
continue
ci_delete_trigger.apply_async(args=(trigger, OperateType.DELETE, ci_dict), queue=CMDB_QUEUE)
attrs = CITypeAttribute.get_by(type_id=ci.type_id, to_dict=False)
attr_names = set([AttributeCache.get(attr.attr_id).name for attr in attrs])
for attr_name in attr_names:
@@ -486,9 +498,9 @@ class CIManager(object):
db.session.commit()
record_id = AttributeHistoryManger.add(None, ci_id, [(None, OperateType.DELETE, ci_dict, None)], ci.type_id)
AttributeHistoryManger.add(None, ci_id, [(None, OperateType.DELETE, ci_dict, None)], ci.type_id)
ci_delete.apply_async(args=(ci_dict, OperateType.DELETE, record_id), queue=CMDB_QUEUE)
ci_delete.apply_async(args=(ci_id,), queue=CMDB_QUEUE)
return ci_id
@@ -910,56 +922,89 @@ class CIRelationManager(object):
class CITriggerManager(object):
@staticmethod
def get(type_id):
return CITypeTrigger.get_by(type_id=type_id, to_dict=False)
db.session.remove()
return CITypeTrigger.get_by(type_id=type_id, to_dict=True)
@staticmethod
def _exec_webhook(operate_type, webhook, ci_dict, trigger_id, record_id):
try:
response = webhook_request(webhook, ci_dict).text
def _update_old_attr_value(record_id, ci_dict):
attr_history = AttributeHistory.get_by(record_id=record_id, to_dict=False)
attr_dict = dict()
for attr_h in attr_history:
attr_dict['old_{}'.format(AttributeCache.get(attr_h.attr_id).name)] = attr_h.old
ci_dict.update({'old_{}'.format(k): ci_dict[k] for k in ci_dict})
ci_dict.update(attr_dict)
@classmethod
def _exec_webhook(cls, operate_type, webhook, ci_dict, trigger_id, trigger_name, record_id, ci_id=None, app=None):
app = app or current_app
with app.app_context():
if operate_type == OperateType.UPDATE:
cls._update_old_attr_value(record_id, ci_dict)
if ci_id is not None:
ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
try:
response = webhook_request(webhook, ci_dict).text
is_ok = True
except Exception as e:
current_app.logger.warning("exec webhook failed: {}".format(e))
response = e
is_ok = False
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
trigger_name,
is_ok=is_ok,
webhook=response)
return is_ok
@classmethod
def _exec_notify(cls, operate_type, notify, ci_dict, trigger_id, trigger_name, record_id, ci_id=None, app=None):
app = app or current_app
with app.app_context():
if ci_id is not None:
ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
if operate_type == OperateType.UPDATE:
cls._update_old_attr_value(record_id, ci_dict)
is_ok = True
except Exception as e:
current_app.logger.warning("exec webhook failed: {}".format(e))
response = e
is_ok = False
response = ''
for method in (notify.get('method') or []):
try:
res = notify_send(notify.get('subject'), notify.get('body'), [method],
notify.get('tos'), ci_dict)
response = "{}\n{}".format(response, res)
except Exception as e:
current_app.logger.warning("send notify failed: {}".format(e))
response = "{}\n{}".format(response, e)
is_ok = False
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
is_ok=is_ok,
webhook=response)
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
trigger_name,
is_ok=is_ok,
notify=response.strip())
return is_ok
@staticmethod
def _exec_notify(operate_type, notify, ci_dict, trigger_id, record_id, ci_id=None):
if ci_id is not None:
ci_dict = CIManager().get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
try:
response = notify_send(notify.get('subject'), notify.get('body'), notify.get('tos'), ci_dict)
is_ok = True
except Exception as e:
current_app.logger.warning("send notify failed: {}".format(e))
response = e
is_ok = False
CITriggerHistoryManager.add(operate_type,
record_id,
ci_dict.get('_id'),
trigger_id,
is_ok=is_ok,
notify=response)
return is_ok
return is_ok
@staticmethod
def ci_filter(ci_id, other_filter):
from api.lib.cmdb.search import SearchError
from api.lib.cmdb.search.ci import search
query = "_id:{},{}".format(ci_id, other_filter)
query = "{},_id:{}".format(other_filter, ci_id)
try:
_, _, _, _, numfound, _ = search(query).search()
@@ -973,28 +1018,40 @@ class CITriggerManager(object):
triggers = cls.get(type_id) or []
for trigger in triggers:
if not trigger.option.get('enable'):
option = trigger['option']
if not option.get('enable'):
continue
if trigger.option.get('filter') and not cls.ci_filter(ci_dict.get('_id'), trigger.option['filter']):
if option.get('filter') and not cls.ci_filter(ci_dict.get('_id'), option['filter']):
continue
if trigger.option.get('attr_ids') and isinstance(trigger.option['attr_ids'], list):
if not (set(trigger.option['attr_ids']) &
if option.get('attr_ids') and isinstance(option['attr_ids'], list):
if not (set(option['attr_ids']) &
set([i.attr_id for i in AttributeHistory.get_by(record_id=record_id, to_dict=False)])):
continue
if trigger.option.get('action') == operate_type:
if trigger.option.get('webhooks'):
cls._exec_webhook(operate_type, trigger.option['webhooks'], ci_dict, trigger.id, record_id)
elif trigger.option.get('notifies'):
cls._exec_notify(operate_type, trigger.option['notifies'], ci_dict, trigger.id, record_id)
if option.get('action') == operate_type:
cls.fire_by_trigger(trigger, operate_type, ci_dict, record_id)
@classmethod
def fire_by_trigger(cls, trigger, operate_type, ci_dict, record_id=None):
option = trigger['option']
if option.get('webhooks'):
cls._exec_webhook(operate_type, option['webhooks'], ci_dict, trigger['id'],
option.get('name'), record_id)
elif option.get('notifies'):
cls._exec_notify(operate_type, option['notifies'], ci_dict, trigger['id'],
option.get('name'), record_id)
@classmethod
def waiting_cis(cls, trigger):
now = datetime.datetime.today()
delta_time = datetime.timedelta(days=(trigger.option.get('before_days', 0) or 0))
config = trigger.option.get('notifies') or {}
delta_time = datetime.timedelta(days=(config.get('before_days', 0) or 0))
attr = AttributeCache.get(trigger.attr_id)
@@ -1022,10 +1079,17 @@ class CITriggerManager(object):
:param ci:
:return:
"""
if (trigger.notify.get('notify_at') == datetime.datetime.now().strftime("%H:%M") or
not trigger.option.get('notify_at')):
threading.Thread(target=cls._exec_notify, args=(
None, trigger.option['notifies'], None, trigger.id, None, ci.id)).start()
if (trigger.option.get('notifies', {}).get('notify_at') == datetime.datetime.now().strftime("%H:%M") or
not trigger.option.get('notifies', {}).get('notify_at')):
if trigger.option.get('webhooks'):
threading.Thread(target=cls._exec_webhook, args=(
None, trigger.option['webhooks'], None, trigger.id, trigger.option.get('name'), None, ci.ci_id,
current_app._get_current_object())).start()
elif trigger.option.get('notifies'):
threading.Thread(target=cls._exec_notify, args=(
None, trigger.option['notifies'], None, trigger.id, trigger.option.get('name'), None, ci.ci_id,
current_app._get_current_object())).start()
return True

View File

@@ -1187,12 +1187,12 @@ class CITypeTriggerManager(object):
return trigger.to_dict()
@staticmethod
def update(_id, option):
def update(_id, attr_id, option):
existed = (CITypeTrigger.get_by_id(_id) or
abort(404, ErrFormat.ci_type_trigger_not_found.format("id={}".format(_id))))
existed2 = existed.to_dict()
new = existed.update(option=option)
new = existed.update(attr_id=attr_id or None, option=option, filter_none=False)
CITypeHistoryManager.add(CITypeOperateType.UPDATE_TRIGGER,
existed.type_id,

View File

@@ -293,13 +293,13 @@ class CITriggerHistoryManager(object):
@staticmethod
def get(page, page_size, type_id=None, trigger_id=None, operate_type=None):
query = CITriggerHistory.get_by(only_query=True)
if type_id is not None:
if type_id:
query = query.filter(CITriggerHistory.type_id == type_id)
if trigger_id:
query = query.filter(CITriggerHistory.trigger_id == trigger_id)
if operate_type is not None:
if operate_type:
query = query.filter(CITriggerHistory.operate_type == operate_type)
numfound = query.count()
@@ -316,22 +316,22 @@ class CITriggerHistoryManager(object):
@staticmethod
def get_by_ci_id(ci_id):
res = db.session.query(CITriggerHistory, CITypeTrigger, OperationRecord).join(
CITypeTrigger, CITypeTrigger.id == CITriggerHistory.trigger_id).join(
OperationRecord, OperationRecord.id == CITriggerHistory.record_id).filter(
res = db.session.query(CITriggerHistory, CITypeTrigger).join(
CITypeTrigger, CITypeTrigger.id == CITriggerHistory.trigger_id).filter(
CITriggerHistory.ci_id == ci_id).order_by(CITriggerHistory.id.desc())
result = []
id2trigger = dict()
for i in res:
hist = i.CITriggerHistory
record = i.OperationRecord
item = dict(is_ok=hist.is_ok,
operate_type=hist.operate_type,
notify=hist.notify,
trigger_id=hist.trigger_id,
trigger_name=hist.trigger_name,
webhook=hist.webhook,
created_at=record.created_at.strftime('%Y-%m-%d %H:%M:%S'),
record_id=record.id,
created_at=hist.created_at.strftime('%Y-%m-%d %H:%M:%S'),
record_id=hist.record_id,
hid=hist.id
)
if i.CITypeTrigger.id not in id2trigger:
@@ -342,12 +342,13 @@ class CITriggerHistoryManager(object):
return dict(items=result, id2trigger=id2trigger)
@staticmethod
def add(operate_type, record_id, ci_id, trigger_id, is_ok=False, notify=None, webhook=None):
def add(operate_type, record_id, ci_id, trigger_id, trigger_name, is_ok=False, notify=None, webhook=None):
CITriggerHistory.create(operate_type=operate_type,
record_id=record_id,
ci_id=ci_id,
trigger_id=trigger_id,
trigger_name=trigger_name,
is_ok=is_ok,
notify=notify,
webhook=webhook)

View File

@@ -5,23 +5,32 @@ import json
import requests
from flask import current_app
from jinja2 import Template
from markdownify import markdownify as md
from api.lib.mail import send_mail
def _request_messenger(subject, body, tos, sender):
def _request_messenger(subject, body, tos, sender, payload):
params = dict(sender=sender, title=subject,
tos=[to[sender] for to in tos if to.get(sender)])
if not params['tos']:
raise Exception("no receivers")
params['tos'] = [Template(i).render(payload) for i in params['tos'] if i.strip()]
if sender == "email":
params['msgtype'] = 'text/html'
params['content'] = body
else:
params['msgtype'] = 'text'
params['content'] = json.dumps(dict(content=subject or body))
params['msgtype'] = 'markdown'
try:
content = md("{}\n{}".format(subject or '', body or ''))
except Exception as e:
current_app.logger.warning("html2markdown failed: {}".format(e))
content = "{}\n{}".format(subject or '', body or '')
params['content'] = json.dumps(dict(content=content))
resp = requests.post(current_app.config.get('MESSENGER_URL'), json=params)
if resp.status_code != 200:
@@ -32,14 +41,15 @@ def _request_messenger(subject, body, tos, sender):
def notify_send(subject, body, methods, tos, payload=None):
payload = payload or {}
payload = {k: v or '' for k, v in payload.items()}
subject = Template(subject).render(payload)
body = Template(body).render(payload)
res = ''
for method in methods:
if method == "email" and not current_app.config.get('USE_MESSENGER', True):
send_mail(None, [to.get('email') for to in tos], subject, body)
send_mail(None, [Template(to.get('email')).render(payload) for to in tos], subject, body)
res += _request_messenger(subject, body, tos, method) + "\n"
res += (_request_messenger(subject, body, tos, method, payload) + "\n")
return res

View File

@@ -82,12 +82,16 @@ def webhook_request(webhook, payload):
"""
assert webhook.get('url') is not None
payload = {k: v or '' for k, v in payload.items()}
url = Template(webhook['url']).render(payload)
params = webhook.get('parameters') or None
if isinstance(params, dict):
params = json.loads(Template(json.dumps(params)).render(payload))
headers = json.loads(Template(json.dumps(webhook.get('headers') or {})).render(payload))
data = Template(json.dumps(webhook.get('body', ''))).render(payload)
auth = _wrap_auth(**webhook.get('authorization', {}))
@@ -99,7 +103,7 @@ def webhook_request(webhook, payload):
return request(
url,
params=params,
headers=webhook.get('headers') or None,
headers=headers or None,
data=data,
auth=auth
)