mirror of https://github.com/veops/cmdb.git
feat(api): encrypting webhook configurations (#255)
This commit is contained in:
parent
2ae4aeee67
commit
c711c3d3fd
|
@ -10,14 +10,18 @@ from api.lib.exception import CommitException
|
||||||
|
|
||||||
class FormatMixin(object):
|
class FormatMixin(object):
|
||||||
def to_dict(self):
|
def to_dict(self):
|
||||||
res = dict([(k, getattr(self, k) if not isinstance(
|
res = dict()
|
||||||
getattr(self, k), (datetime.datetime, datetime.date, datetime.time)) else str(
|
for k in getattr(self, "__mapper__").c.keys():
|
||||||
getattr(self, k))) for k in getattr(self, "__mapper__").c.keys()])
|
if k in {'password', '_password', 'secret', '_secret'}:
|
||||||
# FIXME: getattr(cls, "__table__").columns k.name
|
continue
|
||||||
|
|
||||||
res.pop('password', None)
|
if k.startswith('_'):
|
||||||
res.pop('_password', None)
|
k = k[1:]
|
||||||
res.pop('secret', None)
|
|
||||||
|
if not isinstance(getattr(self, k), (datetime.datetime, datetime.date, datetime.time)):
|
||||||
|
res[k] = getattr(self, k)
|
||||||
|
else:
|
||||||
|
res[k] = str(getattr(self, k))
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,9 @@ from Crypto.Cipher import AES
|
||||||
from elasticsearch import Elasticsearch
|
from elasticsearch import Elasticsearch
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
|
|
||||||
|
from api.lib.secrets.inner import InnerCrypt
|
||||||
|
from api.lib.secrets.inner import KeyManage
|
||||||
|
|
||||||
|
|
||||||
class BaseEnum(object):
|
class BaseEnum(object):
|
||||||
_ALL_ = set() # type: Set[str]
|
_ALL_ = set() # type: Set[str]
|
||||||
|
@ -286,3 +289,33 @@ class AESCrypto(object):
|
||||||
text_decrypted = cipher.decrypt(encode_bytes)
|
text_decrypted = cipher.decrypt(encode_bytes)
|
||||||
|
|
||||||
return cls.unpad(text_decrypted).decode('utf8')
|
return cls.unpad(text_decrypted).decode('utf8')
|
||||||
|
|
||||||
|
|
||||||
|
class Crypto(AESCrypto):
|
||||||
|
@classmethod
|
||||||
|
def encrypt(cls, data):
|
||||||
|
from api.lib.secrets.secrets import InnerKVManger
|
||||||
|
|
||||||
|
if not KeyManage(backend=InnerKVManger()).is_seal():
|
||||||
|
res, status = InnerCrypt().encrypt(data)
|
||||||
|
if status:
|
||||||
|
return res
|
||||||
|
|
||||||
|
return AESCrypto().encrypt(data)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def decrypt(cls, data):
|
||||||
|
from api.lib.secrets.secrets import InnerKVManger
|
||||||
|
|
||||||
|
if not KeyManage(backend=InnerKVManger()).is_seal():
|
||||||
|
try:
|
||||||
|
res, status = InnerCrypt().decrypt(data)
|
||||||
|
if status:
|
||||||
|
return res
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
return AESCrypto().decrypt(data)
|
||||||
|
except:
|
||||||
|
return data
|
||||||
|
|
|
@ -12,7 +12,9 @@ from api.lib.cmdb.const import CITypeOperateType
|
||||||
from api.lib.cmdb.const import ConstraintEnum
|
from api.lib.cmdb.const import ConstraintEnum
|
||||||
from api.lib.cmdb.const import OperateType
|
from api.lib.cmdb.const import OperateType
|
||||||
from api.lib.cmdb.const import ValueTypeEnum
|
from api.lib.cmdb.const import ValueTypeEnum
|
||||||
from api.lib.database import Model, Model2
|
from api.lib.database import Model
|
||||||
|
from api.lib.database import Model2
|
||||||
|
from api.lib.utils import Crypto
|
||||||
|
|
||||||
|
|
||||||
# template
|
# template
|
||||||
|
@ -89,13 +91,37 @@ class Attribute(Model):
|
||||||
compute_expr = db.Column(db.Text)
|
compute_expr = db.Column(db.Text)
|
||||||
compute_script = db.Column(db.Text)
|
compute_script = db.Column(db.Text)
|
||||||
|
|
||||||
choice_web_hook = db.Column(db.JSON)
|
_choice_web_hook = db.Column('choice_web_hook', db.JSON)
|
||||||
choice_other = db.Column(db.JSON)
|
choice_other = db.Column(db.JSON)
|
||||||
|
|
||||||
uid = db.Column(db.Integer, index=True)
|
uid = db.Column(db.Integer, index=True)
|
||||||
|
|
||||||
option = db.Column(db.JSON)
|
option = db.Column(db.JSON)
|
||||||
|
|
||||||
|
def _get_webhook(self):
|
||||||
|
if self._choice_web_hook:
|
||||||
|
if self._choice_web_hook.get('headers') and "Cookie" in self._choice_web_hook['headers']:
|
||||||
|
self._choice_web_hook['headers']['Cookie'] = Crypto.decrypt(self._choice_web_hook['headers']['Cookie'])
|
||||||
|
|
||||||
|
if self._choice_web_hook.get('authorization'):
|
||||||
|
for k, v in self._choice_web_hook['authorization'].items():
|
||||||
|
self._choice_web_hook['authorization'][k] = Crypto.decrypt(v)
|
||||||
|
|
||||||
|
return self._choice_web_hook
|
||||||
|
|
||||||
|
def _set_webhook(self, data):
|
||||||
|
if data:
|
||||||
|
if data.get('headers') and "Cookie" in data['headers']:
|
||||||
|
data['headers']['Cookie'] = Crypto.encrypt(data['headers']['Cookie'])
|
||||||
|
|
||||||
|
if data.get('authorization'):
|
||||||
|
for k, v in data['authorization'].items():
|
||||||
|
data['authorization'][k] = Crypto.encrypt(v)
|
||||||
|
|
||||||
|
self._choice_web_hook = data
|
||||||
|
|
||||||
|
choice_web_hook = db.synonym("_choice_web_hook", descriptor=property(_get_webhook, _set_webhook))
|
||||||
|
|
||||||
|
|
||||||
class CITypeAttribute(Model):
|
class CITypeAttribute(Model):
|
||||||
__tablename__ = "c_ci_type_attributes"
|
__tablename__ = "c_ci_type_attributes"
|
||||||
|
@ -130,7 +156,25 @@ class CITypeTrigger(Model):
|
||||||
|
|
||||||
type_id = db.Column(db.Integer, db.ForeignKey('c_ci_types.id'), nullable=False)
|
type_id = db.Column(db.Integer, db.ForeignKey('c_ci_types.id'), nullable=False)
|
||||||
attr_id = db.Column(db.Integer, db.ForeignKey("c_attributes.id"))
|
attr_id = db.Column(db.Integer, db.ForeignKey("c_attributes.id"))
|
||||||
option = db.Column('notify', db.JSON)
|
_option = db.Column('notify', db.JSON)
|
||||||
|
|
||||||
|
def _get_option(self):
|
||||||
|
if self._option and self._option.get('webhooks'):
|
||||||
|
if self._option['webhooks'].get('authorization'):
|
||||||
|
for k, v in self._option['webhooks']['authorization'].items():
|
||||||
|
self._option['webhooks']['authorization'][k] = Crypto.decrypt(v)
|
||||||
|
|
||||||
|
return self._option
|
||||||
|
|
||||||
|
def _set_option(self, data):
|
||||||
|
if data and data.get('webhooks'):
|
||||||
|
if data['webhooks'].get('authorization'):
|
||||||
|
for k, v in data['webhooks']['authorization'].items():
|
||||||
|
data['webhooks']['authorization'][k] = Crypto.encrypt(v)
|
||||||
|
|
||||||
|
self._option = data
|
||||||
|
|
||||||
|
option = db.synonym("_option", descriptor=property(_get_option, _set_option))
|
||||||
|
|
||||||
|
|
||||||
class CITriggerHistory(Model):
|
class CITriggerHistory(Model):
|
||||||
|
|
Loading…
Reference in New Issue