mirror of
https://github.com/veops/cmdb.git
synced 2025-08-09 22:26:09 +08:00
feat: secrets
This commit is contained in:
@@ -1,25 +1,24 @@
|
||||
import os
|
||||
import secrets
|
||||
import sys
|
||||
from base64 import b64encode, b64decode
|
||||
|
||||
from Cryptodome.Protocol.SecretSharing import Shamir
|
||||
from colorama import Back
|
||||
from colorama import Fore
|
||||
from colorama import Style
|
||||
from colorama import init as colorama_init
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import padding
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||
from cryptography.hazmat.primitives.ciphers import modes
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import padding
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
import os
|
||||
import secrets
|
||||
import sys
|
||||
from Cryptodome.Protocol.SecretSharing import Shamir
|
||||
from flask import current_app
|
||||
|
||||
# global_root_key just for test here
|
||||
global_root_key = ""
|
||||
global_encrypt_key = ""
|
||||
global_iv_length = 16
|
||||
global_key_shares = 5 # Number of generated key shares
|
||||
global_key_threshold = 3 # Minimum number of shares required to rebuild the key
|
||||
@@ -44,7 +43,7 @@ def string_to_bytes(value):
|
||||
return byte_string
|
||||
|
||||
|
||||
class cache_backend:
|
||||
class CacheBackend:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@@ -62,7 +61,7 @@ class cache_backend:
|
||||
class Backend:
|
||||
def __init__(self, backend=None):
|
||||
if not backend:
|
||||
self.backend = cache_backend
|
||||
self.backend = CacheBackend
|
||||
else:
|
||||
self.backend = backend
|
||||
|
||||
@@ -73,14 +72,13 @@ class Backend:
|
||||
return self.backend.add(key, value)
|
||||
|
||||
|
||||
class KeyMange:
|
||||
class KeyManage:
|
||||
|
||||
def __init__(self, trigger=None, backend=None):
|
||||
self.trigger = trigger
|
||||
self.backend = backend
|
||||
if backend:
|
||||
self.backend = Backend(backend)
|
||||
pass
|
||||
|
||||
def hash_root_key(self, value):
|
||||
algorithm = hashes.SHA256()
|
||||
@@ -136,7 +134,6 @@ class KeyMange:
|
||||
"status": "failed"
|
||||
}
|
||||
backend_root_key_hash = self.backend.get(backend_root_key_name)
|
||||
print(root_key, root_key_hash, backend_root_key_hash)
|
||||
if not backend_root_key_hash:
|
||||
return {
|
||||
"message": "should init firstly",
|
||||
@@ -153,12 +150,11 @@ class KeyMange:
|
||||
"message": "encrypt key is empty",
|
||||
"status": "failed"
|
||||
}
|
||||
global global_encrypt_key
|
||||
global global_root_key
|
||||
global global_shares
|
||||
global_encrypt_key = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
global_root_key = root_key
|
||||
global_shares = []
|
||||
secrets_encrypt_key = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
setattr(current_app, 'secrets_encrypt_key', secrets_encrypt_key)
|
||||
setattr(current_app, 'secrets_root_key', root_key)
|
||||
setattr(current_app, 'secrets_shares', [])
|
||||
|
||||
return {
|
||||
"message": success,
|
||||
"status": success
|
||||
@@ -170,7 +166,7 @@ class KeyMange:
|
||||
"message": "current status is unseal, skip",
|
||||
"status": "skip"
|
||||
}
|
||||
global global_shares, global_root_key, global_encrypt_key
|
||||
global global_shares
|
||||
try:
|
||||
t = [i for i in b64decode(key)]
|
||||
v = (int("".join([chr(i) for i in t[-2:]])), bytes(t[:-2]))
|
||||
@@ -197,6 +193,7 @@ class KeyMange:
|
||||
return "already exist", [], False
|
||||
secret = AESGCM.generate_key(128)
|
||||
shares = self.generate_keys(secret)
|
||||
|
||||
return b64encode(secret), shares, True
|
||||
|
||||
def init(self):
|
||||
@@ -229,10 +226,11 @@ class KeyMange:
|
||||
if not ok:
|
||||
return {"message": msg}, False
|
||||
#
|
||||
global global_root_key, global_encrypt_key
|
||||
global_root_key = root_key
|
||||
global_encrypt_key = encrypt_key
|
||||
setattr(current_app, 'secrets_root_key', root_key)
|
||||
setattr(current_app, 'secrets_encrypt_key', encrypt_key)
|
||||
|
||||
self.print_token(shares, root_token=root_key)
|
||||
|
||||
return {"message": "OK",
|
||||
"details": {
|
||||
"root_token": root_key,
|
||||
@@ -289,10 +287,9 @@ class KeyMange:
|
||||
"status": "failed"
|
||||
}
|
||||
else:
|
||||
global global_root_key
|
||||
global global_encrypt_key
|
||||
global_root_key = ""
|
||||
global_encrypt_key = ""
|
||||
setattr(current_app, 'secrets_root_key', '')
|
||||
setattr(current_app, 'secrets_encrypt_key', '')
|
||||
|
||||
return {
|
||||
"message": success,
|
||||
"status": success
|
||||
@@ -303,9 +300,11 @@ class KeyMange:
|
||||
If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state.
|
||||
:return:
|
||||
"""
|
||||
secrets_root_key = getattr(current_app, 'secrets_root_key')
|
||||
root_key = self.backend.get(backend_root_key_name)
|
||||
if root_key == "" or root_key != global_root_key:
|
||||
if root_key == "" or root_key != secrets_root_key:
|
||||
return "invalid root key", True
|
||||
|
||||
return "", False
|
||||
|
||||
@classmethod
|
||||
@@ -340,9 +339,11 @@ class KeyMange:
|
||||
else:
|
||||
print(Fore.GREEN, message)
|
||||
|
||||
|
||||
class InnerCrypt:
|
||||
def __init__(self, trigger=None):
|
||||
self.encrypt_key = b64decode(global_encrypt_key.encode("utf-8"))
|
||||
def __init__(self):
|
||||
secrets_encrypt_key = getattr(current_app, 'secrets_encrypt_key', '')
|
||||
self.encrypt_key = b64decode(secrets_encrypt_key.encode("utf-8"))
|
||||
|
||||
def encrypt(self, plaintext):
|
||||
"""
|
||||
@@ -389,8 +390,7 @@ class InnerCrypt:
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
print(global_encrypt_key)
|
||||
km = KeyMange()
|
||||
km = KeyManage()
|
||||
# info, shares, status = km.generate_unseal_keys()
|
||||
# print(info, shares, status)
|
||||
# print("..................")
|
||||
@@ -415,4 +415,3 @@ if __name__ == "__main__":
|
||||
print("Ciphertext:", t_ciphertext)
|
||||
decrypted_plaintext, status2 = c.decrypt(t_ciphertext)
|
||||
print("Decrypted plaintext:", decrypted_plaintext)
|
||||
print(global_encrypt_key)
|
||||
|
@@ -136,6 +136,6 @@ if __name__ == "__main__":
|
||||
# sdk.enable_secrets_engine()
|
||||
_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
||||
_data = sdk.update(_path, _data, overwrite=True, encrypt=True)
|
||||
print(_data.status_code)
|
||||
print(_data)
|
||||
_data = sdk.read(_path, decrypt=True)
|
||||
print(_data)
|
||||
|
Reference in New Issue
Block a user