mirror of https://github.com/veops/cmdb.git
feat(api): encrypting webhook configurations (#255)
This commit is contained in:
parent
07412169b3
commit
a50f6d4fe4
|
@ -10,14 +10,18 @@ from api.lib.exception import CommitException
|
|||
|
||||
class FormatMixin(object):
|
||||
def to_dict(self):
|
||||
res = dict([(k, getattr(self, k) if not isinstance(
|
||||
getattr(self, k), (datetime.datetime, datetime.date, datetime.time)) else str(
|
||||
getattr(self, k))) for k in getattr(self, "__mapper__").c.keys()])
|
||||
# FIXME: getattr(cls, "__table__").columns k.name
|
||||
res = dict()
|
||||
for k in getattr(self, "__mapper__").c.keys():
|
||||
if k in {'password', '_password', 'secret', '_secret'}:
|
||||
continue
|
||||
|
||||
res.pop('password', None)
|
||||
res.pop('_password', None)
|
||||
res.pop('secret', None)
|
||||
if k.startswith('_'):
|
||||
k = k[1:]
|
||||
|
||||
if not isinstance(getattr(self, k), (datetime.datetime, datetime.date, datetime.time)):
|
||||
res[k] = getattr(self, k)
|
||||
else:
|
||||
res[k] = str(getattr(self, k))
|
||||
|
||||
return res
|
||||
|
||||
|
|
|
@ -12,6 +12,9 @@ from Crypto.Cipher import AES
|
|||
from elasticsearch import Elasticsearch
|
||||
from flask import current_app
|
||||
|
||||
from api.lib.secrets.inner import InnerCrypt
|
||||
from api.lib.secrets.inner import KeyManage
|
||||
|
||||
|
||||
class BaseEnum(object):
|
||||
_ALL_ = set() # type: Set[str]
|
||||
|
@ -286,3 +289,33 @@ class AESCrypto(object):
|
|||
text_decrypted = cipher.decrypt(encode_bytes)
|
||||
|
||||
return cls.unpad(text_decrypted).decode('utf8')
|
||||
|
||||
|
||||
class Crypto(AESCrypto):
|
||||
@classmethod
|
||||
def encrypt(cls, data):
|
||||
from api.lib.secrets.secrets import InnerKVManger
|
||||
|
||||
if not KeyManage(backend=InnerKVManger()).is_seal():
|
||||
res, status = InnerCrypt().encrypt(data)
|
||||
if status:
|
||||
return res
|
||||
|
||||
return AESCrypto().encrypt(data)
|
||||
|
||||
@classmethod
|
||||
def decrypt(cls, data):
|
||||
from api.lib.secrets.secrets import InnerKVManger
|
||||
|
||||
if not KeyManage(backend=InnerKVManger()).is_seal():
|
||||
try:
|
||||
res, status = InnerCrypt().decrypt(data)
|
||||
if status:
|
||||
return res
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
return AESCrypto().decrypt(data)
|
||||
except:
|
||||
return data
|
||||
|
|
|
@ -12,7 +12,9 @@ from api.lib.cmdb.const import CITypeOperateType
|
|||
from api.lib.cmdb.const import ConstraintEnum
|
||||
from api.lib.cmdb.const import OperateType
|
||||
from api.lib.cmdb.const import ValueTypeEnum
|
||||
from api.lib.database import Model, Model2
|
||||
from api.lib.database import Model
|
||||
from api.lib.database import Model2
|
||||
from api.lib.utils import Crypto
|
||||
|
||||
|
||||
# template
|
||||
|
@ -89,13 +91,37 @@ class Attribute(Model):
|
|||
compute_expr = db.Column(db.Text)
|
||||
compute_script = db.Column(db.Text)
|
||||
|
||||
choice_web_hook = db.Column(db.JSON)
|
||||
_choice_web_hook = db.Column('choice_web_hook', db.JSON)
|
||||
choice_other = db.Column(db.JSON)
|
||||
|
||||
uid = db.Column(db.Integer, index=True)
|
||||
|
||||
option = db.Column(db.JSON)
|
||||
|
||||
def _get_webhook(self):
|
||||
if self._choice_web_hook:
|
||||
if self._choice_web_hook.get('headers') and "Cookie" in self._choice_web_hook['headers']:
|
||||
self._choice_web_hook['headers']['Cookie'] = Crypto.decrypt(self._choice_web_hook['headers']['Cookie'])
|
||||
|
||||
if self._choice_web_hook.get('authorization'):
|
||||
for k, v in self._choice_web_hook['authorization'].items():
|
||||
self._choice_web_hook['authorization'][k] = Crypto.decrypt(v)
|
||||
|
||||
return self._choice_web_hook
|
||||
|
||||
def _set_webhook(self, data):
|
||||
if data:
|
||||
if data.get('headers') and "Cookie" in data['headers']:
|
||||
data['headers']['Cookie'] = Crypto.encrypt(data['headers']['Cookie'])
|
||||
|
||||
if data.get('authorization'):
|
||||
for k, v in data['authorization'].items():
|
||||
data['authorization'][k] = Crypto.encrypt(v)
|
||||
|
||||
self._choice_web_hook = data
|
||||
|
||||
choice_web_hook = db.synonym("_choice_web_hook", descriptor=property(_get_webhook, _set_webhook))
|
||||
|
||||
|
||||
class CITypeAttribute(Model):
|
||||
__tablename__ = "c_ci_type_attributes"
|
||||
|
@ -130,7 +156,25 @@ class CITypeTrigger(Model):
|
|||
|
||||
type_id = db.Column(db.Integer, db.ForeignKey('c_ci_types.id'), nullable=False)
|
||||
attr_id = db.Column(db.Integer, db.ForeignKey("c_attributes.id"))
|
||||
option = db.Column('notify', db.JSON)
|
||||
_option = db.Column('notify', db.JSON)
|
||||
|
||||
def _get_option(self):
|
||||
if self._option and self._option.get('webhooks'):
|
||||
if self._option['webhooks'].get('authorization'):
|
||||
for k, v in self._option['webhooks']['authorization'].items():
|
||||
self._option['webhooks']['authorization'][k] = Crypto.decrypt(v)
|
||||
|
||||
return self._option
|
||||
|
||||
def _set_option(self, data):
|
||||
if data and data.get('webhooks'):
|
||||
if data['webhooks'].get('authorization'):
|
||||
for k, v in data['webhooks']['authorization'].items():
|
||||
data['webhooks']['authorization'][k] = Crypto.encrypt(v)
|
||||
|
||||
self._option = data
|
||||
|
||||
option = db.synonym("_option", descriptor=property(_get_option, _set_option))
|
||||
|
||||
|
||||
class CITriggerHistory(Model):
|
||||
|
|
Loading…
Reference in New Issue