mirror of
https://github.com/veops/cmdb.git
synced 2025-08-09 22:26:09 +08:00
feat: add secrets feature
This commit is contained in:
@@ -60,8 +60,13 @@ class KeyManage:
|
||||
if backend:
|
||||
self.backend = Backend(backend)
|
||||
|
||||
def init_app(self, app):
|
||||
self.auto_unseal()
|
||||
def init_app(self, app, backend=None):
|
||||
self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
|
||||
if not self.trigger:
|
||||
return
|
||||
self.backend = backend
|
||||
# resp = self.auto_unseal()
|
||||
# self.print_response(resp)
|
||||
|
||||
def hash_root_key(self, value):
|
||||
algorithm = hashes.SHA256()
|
||||
@@ -133,15 +138,17 @@ class KeyManage:
|
||||
"message": "encrypt key is empty",
|
||||
"status": "failed"
|
||||
}
|
||||
secrets_encrypt_key = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
setattr(current_app, 'secrets_encrypt_key', secrets_encrypt_key)
|
||||
setattr(current_app, 'secrets_root_key', root_key)
|
||||
setattr(current_app, 'secrets_shares', [])
|
||||
|
||||
return {
|
||||
"message": success,
|
||||
"status": success
|
||||
}
|
||||
secrets_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
if ok:
|
||||
current_app.config["secrets_encrypt_key"] = secrets_encrypt_key
|
||||
current_app.config["secrets_root_key"] = root_key
|
||||
current_app.config["secrets_shares"] = []
|
||||
return {"message": success, "status": success}
|
||||
else:
|
||||
return {
|
||||
"message": secrets_encrypt_key,
|
||||
"status": "failed"
|
||||
}
|
||||
|
||||
def unseal(self, key):
|
||||
if not self.is_seal():
|
||||
@@ -152,10 +159,14 @@ class KeyManage:
|
||||
try:
|
||||
t = [i for i in b64decode(key)]
|
||||
v = (int("".join([chr(i) for i in t[-2:]])), bytes(t[:-2]))
|
||||
shares = getattr(current_app, "secrets_shares", [])
|
||||
print("............")
|
||||
# shares = getattr(current_app.config, "secrets_shares", [])
|
||||
shares = current_app.config.get("secrets_shares", [])
|
||||
print("222222222222")
|
||||
if v not in shares:
|
||||
shares.append(v)
|
||||
setattr(current_app, "secrets_shares", shares)
|
||||
current_app.config["secrets_shares"] = shares
|
||||
print("shares:", shares)
|
||||
if len(shares) >= global_key_threshold:
|
||||
recovered_secret = Shamir.combine(shares[:global_key_threshold])
|
||||
return self.auth_root_secret(b64encode(recovered_secret))
|
||||
@@ -209,10 +220,10 @@ class KeyManage:
|
||||
msg, ok = self.backend.add(backend_encrypt_key_name, encrypt_key_aes)
|
||||
if not ok:
|
||||
return {"message": msg}, False
|
||||
#
|
||||
setattr(current_app, 'secrets_root_key', root_key)
|
||||
setattr(current_app, 'secrets_encrypt_key', encrypt_key)
|
||||
|
||||
current_app.config["secrets_root_key"] = root_key
|
||||
current_app.config["secrets_encrypt_key"] = encrypt_key
|
||||
print(".....", current_app.config["secrets_root_key"], current_app.config["secrets_encrypt_key"])
|
||||
self.print_token(shares, root_token=root_key)
|
||||
|
||||
return {"message": "OK",
|
||||
@@ -271,8 +282,8 @@ class KeyManage:
|
||||
"status": "failed"
|
||||
}
|
||||
else:
|
||||
setattr(current_app, 'secrets_root_key', '')
|
||||
setattr(current_app, 'secrets_encrypt_key', '')
|
||||
current_app.config["secrets_root_key"] = ''
|
||||
current_app.config["secrets_encrypt_key"] = ''
|
||||
|
||||
return {
|
||||
"message": success,
|
||||
@@ -284,7 +295,7 @@ class KeyManage:
|
||||
If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state.
|
||||
:return:
|
||||
"""
|
||||
secrets_root_key = getattr(current_app, 'secrets_root_key')
|
||||
secrets_root_key = current_app.config.get("secrets_root_key")
|
||||
root_key = self.backend.get(backend_root_key_name)
|
||||
if root_key == "" or root_key != secrets_root_key:
|
||||
return "invalid root key", True
|
||||
@@ -326,7 +337,8 @@ class KeyManage:
|
||||
|
||||
class InnerCrypt:
|
||||
def __init__(self):
|
||||
secrets_encrypt_key = getattr(current_app, 'secrets_encrypt_key', '')
|
||||
secrets_encrypt_key = current_app.config.get("secrets_encrypt_key", "")
|
||||
print("secrets_encrypt_key:", secrets_encrypt_key)
|
||||
self.encrypt_key = b64decode(secrets_encrypt_key.encode("utf-8"))
|
||||
|
||||
def encrypt(self, plaintext):
|
||||
|
Reference in New Issue
Block a user