# cmdb/cmdb-api/api/lib/secrets/inner.py
#
# 430 lines
# 15 KiB
# Python

import os
import secrets
import sys
from base64 import b64decode, b64encode
from Cryptodome.Protocol.SecretSharing import Shamir
from colorama import Back
from colorama import Fore
from colorama import Style
from colorama import init as colorama_init
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from flask import current_app
# Length in bytes of the random IV that InnerCrypt.aes_encrypt prepends to the
# ciphertext and InnerCrypt.aes_decrypt slices back off.
global_iv_length = 16
global_key_shares = 5 # Number of generated key shares
global_key_threshold = 3 # Minimum number of shares required to rebuild the key
# Backend key under which the PBKDF2 hash of the root key is stored.
backend_root_key_name = "root_key"
# Backend key under which the AES-encrypted data-encryption key is stored.
backend_encrypt_key_name = "encrypt_key"
# Backend keys for the PBKDF2 salts of the root-key hash and the encrypt key.
backend_root_key_salt_name = "root_key_salt"
backend_encrypt_key_salt_name = "encrypt_key_salt"
# Backend key holding the seal state; this module writes "open" or "block" to it.
backend_seal_key = "seal_status"
# Value used for both "status" and "message" in successful response dicts.
success = "success"
# NOTE(review): not referenced anywhere in the visible part of this file.
seal_status = True
def string_to_bytes(value):
    """Return ``value`` as bytes.

    ``bytes`` input is handed back untouched.  On Python 2 any other value is
    also returned as-is; on Python 3 it is encoded as UTF-8.

    :param value: ``bytes`` or text.
    :return: the byte representation of ``value``.
    """
    # Nothing to convert: already bytes, or running on the legacy interpreter.
    if isinstance(value, bytes) or sys.version_info.major == 2:
        return value
    return value.encode("utf-8")
class Backend:
    """Thin pass-through wrapper around a key/value storage object.

    The wrapped object must itself provide ``get``, ``add`` and ``update``;
    every call is forwarded unchanged and its result returned as-is.
    """

    def __init__(self, backend=None):
        """Remember the storage object that calls are delegated to."""
        self.backend = backend

    def get(self, key):
        """Forward to ``backend.get(key)``."""
        store = self.backend
        return store.get(key)

    def add(self, key, value):
        """Forward to ``backend.add(key, value)``."""
        store = self.backend
        return store.add(key, value)

    def update(self, key, value):
        """Forward to ``backend.update(key, value)``."""
        store = self.backend
        return store.update(key, value)
class KeyManage:
    """Manage the root key, its Shamir unseal shares and the encrypt key.

    Persistent material (root-key hash, salts, the AES-wrapped encrypt key and
    the seal state) goes through ``self.backend``, an object exposing
    ``get``/``add``/``update`` where ``add``/``update`` return ``(msg, ok)``.
    The unsealed keys and the shares collected so far are cached in
    ``current_app.config`` under ``secrets_root_key``, ``secrets_encrypt_key``
    and ``secrets_shares``.
    """

    def __init__(self, trigger=None, backend=None):
        """Store the auto-unseal trigger and wrap a given backend.

        :param trigger: root token (or URL) consumed by ``auto_unseal``.
        :param backend: storage object; wrapped in ``Backend`` when truthy.
        """
        self.trigger = trigger
        self.backend = Backend(backend) if backend else backend

    def init_app(self, app, backend=None):
        """Try to auto-unseal when the process is a server or a known CLI command.

        Nothing happens unless ``sys.argv[0]`` ends with ``gunicorn`` or the
        first CLI argument is ``run`` / ``cmdb-password-data-migrate``, and
        ``INNER_TRIGGER_TOKEN`` is set in ``app.config``.

        :param app: application object exposing ``config.get``.
        :param backend: storage object, assigned to ``self.backend`` as-is.
        """
        under_gunicorn = sys.argv[0].endswith("gunicorn")
        known_command = len(sys.argv) > 1 and sys.argv[1] in ("run", "cmdb-password-data-migrate")
        if not (under_gunicorn or known_command):
            return

        self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
        if not self.trigger:
            return

        self.backend = backend
        # NOTE(review): auto_unseal() goes through flask.current_app, so this
        # presumably has to run inside an application context -- confirm at
        # the call site.
        self.print_response(self.auto_unseal())

    @staticmethod
    def _derive_key(value, salt):
        """Return the base64 text of a 32-byte PBKDF2-HMAC-SHA256 key."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=string_to_bytes(salt),
            iterations=100000,
            backend=default_backend(),
        )
        return b64encode(kdf.derive(string_to_bytes(value))).decode("utf-8")

    def hash_root_key(self, value):
        """Hash the root key with the salt stored in the backend.

        A salt is generated and persisted on first use.

        :param value: root key (text or bytes).
        :return: ``(hash, True)`` or ``(error message, ok)`` when the salt
            could not be stored.
        """
        salt = self.backend.get(backend_root_key_salt_name)
        if not salt:
            salt = secrets.token_hex(16)
            msg, ok = self.backend.add(backend_root_key_salt_name, salt)
            if not ok:
                return msg, ok

        return self._derive_key(value, salt), True

    def generate_encrypt_key(self, key):
        """Derive the data-encryption key from the root key.

        :param key: root key (text or bytes).
        :return: ``(base64 encrypt key, True)`` or ``(error message, ok)``
            when the salt could not be stored.
        """
        salt = self.backend.get(backend_encrypt_key_salt_name)
        is_new_salt = not salt
        if is_new_salt:
            salt = secrets.token_hex(32)

        encrypt_key = self._derive_key(key, salt)

        # Persist the salt only when it was generated here, mirroring
        # hash_root_key; a salt read from the backend is already stored.
        if is_new_salt:
            msg, ok = self.backend.add(backend_encrypt_key_salt_name, salt)
            if not ok:
                return msg, ok

        return encrypt_key, True

    @classmethod
    def generate_keys(cls, secret):
        """Split ``secret`` into base64-encoded Shamir shares.

        Each token is the share bytes followed by the share index written as
        two zero-padded ASCII digits, then base64-encoded.

        :param secret: bytes to split (passed straight to ``Shamir.split``).
        :return: list of ``global_key_shares`` base64 ``bytes`` tokens.
        """
        shares = Shamir.split(global_key_threshold, global_key_shares, secret, False)
        return [
            b64encode(bytes(data) + "{:0>2}".format(index).encode("ascii"))
            for index, data in shares
        ]

    def is_valid_root_key(self, root_key):
        """Compare the hash of ``root_key`` with the one stored in the backend.

        :return: ``("", True)`` on match, otherwise ``(reason, False)``.
        """
        root_key_hash, ok = self.hash_root_key(root_key)
        if not ok:
            return root_key_hash, ok

        stored_hash = self.backend.get(backend_root_key_name)
        if not stored_hash:
            return "should init firstly", False
        if stored_hash != root_key_hash:
            return "invalid root key", False
        return "", True

    def auth_root_secret(self, root_key):
        """Validate ``root_key``, unwrap the encrypt key and mark the store open.

        On success the root key and encrypt key are cached in
        ``current_app.config`` and the collected shares are cleared.

        :return: response dict with ``message`` and ``status``.
        """
        msg, ok = self.is_valid_root_key(root_key)
        if not ok:
            return {"message": msg, "status": "failed"}

        encrypt_key_aes = self.backend.get(backend_encrypt_key_name)
        if not encrypt_key_aes:
            return {"message": "encrypt key is empty", "status": "failed"}

        secrets_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
        if not ok:
            # On failure aes_decrypt returns the error text in the first slot.
            return {"message": secrets_encrypt_key, "status": "failed"}

        msg, ok = self.backend.update(backend_seal_key, "open")
        if not ok:
            return {"message": msg, "status": "failed"}

        current_app.config["secrets_encrypt_key"] = secrets_encrypt_key
        current_app.config["secrets_root_key"] = root_key
        current_app.config["secrets_shares"] = []
        return {"message": success, "status": success}

    def unseal(self, key):
        """Register one unseal token and unseal once enough were collected.

        :param key: base64 token as produced by ``generate_keys``.
        :return: response dict; ``status`` is ``skip``, ``waiting``,
            ``failed`` or ``success``.
        """
        if not self.is_seal():
            return {"message": "current status is unseal, skip", "status": "skip"}

        try:
            raw = b64decode(key)
            # Last two bytes are the ASCII share index, the rest is the share.
            share = (int(raw[-2:].decode("ascii")), raw[:-2])

            shares = current_app.config.get("secrets_shares", [])
            if share not in shares:
                shares.append(share)
                current_app.config["secrets_shares"] = shares

            if len(shares) < global_key_threshold:
                return {
                    "message": "waiting for inputting other unseal key {0}/{1}".format(
                        len(shares), global_key_threshold),
                    "status": "waiting",
                }

            recovered_secret = Shamir.combine(shares[:global_key_threshold], False)
            result = self.auth_root_secret(b64encode(recovered_secret))
            if result.get("status") != success:
                # Only the first `threshold` shares are ever combined, so a
                # wrong share would block every later attempt; start over.
                current_app.config["secrets_shares"] = []
            return result
        except Exception as e:
            return {"message": "invalid token: " + str(e), "status": "failed"}

    def generate_unseal_keys(self):
        """Create a fresh root secret and its unseal shares.

        :return: ``(base64 root key, shares, True)``, or
            ``("already exist", [], False)`` when a root key hash is stored.
        """
        if self.backend.get(backend_root_key_name):
            return "already exist", [], False

        # 128-bit secret: 16 bytes, i.e. 24 characters once base64-encoded.
        secret = AESGCM.generate_key(128)
        return b64encode(secret), self.generate_keys(secret), True

    def init(self):
        """
        init the master key, unseal key and store in backend

        :return: ``(response dict, ok)``; on success the dict carries the
            root token and the unseal tokens under ``details``.
        """
        if self.backend.get(backend_root_key_name):
            return {"message": "already init, skip", "status": "skip"}, False

        root_key, shares, status = self.generate_unseal_keys()
        if not status:
            return {"message": root_key, "status": "failed"}, False

        # hash root key and store in backend
        root_key_hash, ok = self.hash_root_key(root_key)
        if not ok:
            return {"message": root_key_hash, "status": "failed"}, False
        msg, ok = self.backend.add(backend_root_key_name, root_key_hash)
        if not ok:
            return {"message": msg, "status": "failed"}, False

        # generate encrypt key from root_key and store in backend
        encrypt_key, ok = self.generate_encrypt_key(root_key)
        if not ok:
            return {"message": encrypt_key, "status": "failed"}, False
        encrypt_key_aes, status = InnerCrypt.aes_encrypt(root_key, encrypt_key)
        if not status:
            return {"message": encrypt_key_aes, "status": "failed"}, False
        msg, ok = self.backend.add(backend_encrypt_key_name, encrypt_key_aes)
        if not ok:
            return {"message": msg, "status": "failed"}, False

        msg, ok = self.backend.add(backend_seal_key, "open")
        if not ok:
            return {"message": msg, "status": "failed"}, False

        current_app.config["secrets_root_key"] = root_key
        current_app.config["secrets_encrypt_key"] = encrypt_key
        self.print_token(shares, root_token=root_key)

        return {
            "message": "OK",
            "details": {
                "root_token": root_key,
                "seal_tokens": shares,
            },
        }, True

    def auto_unseal(self):
        """Unseal with the configured trigger token, if it looks like a root key.

        :return: response dict; ``status`` is ``skip``, ``failed`` or
            ``success``.
        """
        if not self.trigger:
            return {"message": "trigger config is empty, skip", "status": "skip"}

        if self.trigger.startswith("http"):
            # TODO: fetching the token from a URL is not implemented yet.
            return {"message": "todo in next step, skip", "status": "skip"}

        # Validate and authenticate with the same, stripped value.
        token = self.trigger.strip()
        if len(token) != 24:
            return {"message": "trigger config is invalid, skip", "status": "skip"}

        res = self.auth_root_secret(token.encode())
        if res.get("status") == success:
            return {"message": success, "status": success}
        return {"message": res.get("message"), "status": "failed"}

    def seal(self, root_key):
        """Seal the store after verifying ``root_key`` and drop the cached keys.

        :param root_key: root key as text or bytes.
        :return: response dict with ``message`` and ``status``.
        """
        root_key = string_to_bytes(root_key)
        msg, ok = self.is_valid_root_key(root_key)
        if not ok:
            return {"message": msg, "status": "failed"}

        msg, ok = self.backend.update(backend_seal_key, "block")
        if not ok:
            return {"message": msg, "status": "failed"}

        current_app.config["secrets_root_key"] = ''
        current_app.config["secrets_encrypt_key"] = ''
        return {"message": success, "status": success}

    def is_seal(self):
        """
        If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state.

        :return: ``True`` when sealed, ``False`` otherwise.
        """
        secrets_root_key = current_app.config.get("secrets_root_key")
        if not secrets_root_key:
            # No root key cached in this process: nothing was unsealed here.
            return True

        msg, ok = self.is_valid_root_key(secrets_root_key)
        if not ok:
            return True

        return self.backend.get(backend_seal_key) == "block"

    @classmethod
    def print_token(cls, shares, root_token):
        """Print the unseal tokens and the root token to the console in colour.

        :param shares: iterable of ``bytes`` unseal tokens.
        :param root_token: root token as ``bytes``.
        """
        colorama_init()
        print(Style.BRIGHT, "Please be sure to store the Unseal Key in a secure location and avoid losing it."
                            " The Unseal Key is required to unseal the system every time when it restarts."
                            " Successful unsealing is necessary to enable the password feature." + Style.RESET_ALL)

        for position, token in enumerate(shares, start=1):
            print(
                "unseal token " + str(position) + ": " + Fore.RED + Back.BLACK + token.decode("utf-8") + Style.RESET_ALL)
            print()

        print(Fore.GREEN + "root token: " + root_token.decode("utf-8") + Style.RESET_ALL)

    @classmethod
    def print_response(cls, data):
        """Print a response dict's message, coloured by its status.

        :param data: dict with optional ``status`` and ``message`` keys.
        """
        status = data.get("status", "")
        message = data.get("message", "")
        status_colors = {
            "skip": Style.BRIGHT,
            "failed": Fore.RED,
            "waiting": Fore.YELLOW,
        }
        # Any other status (including success) is shown in green.
        print(status_colors.get(status, Fore.GREEN), message, Style.RESET_ALL)
class InnerCrypt:
    """AES-CBC helper keyed by the encrypt key cached in ``current_app.config``.

    Ciphertexts are base64 text of ``IV || AES-CBC(PKCS7(plaintext))`` with a
    random ``global_iv_length``-byte IV.  All operations return a
    ``(value, ok)`` pair; on failure ``value`` is the error text.
    """

    def __init__(self):
        """Load and base64-decode ``secrets_encrypt_key`` from the app config."""
        # A missing or None entry decodes to an empty key instead of raising
        # here; encrypt/decrypt then report the bad key via their result.
        secrets_encrypt_key = current_app.config.get("secrets_encrypt_key", "") or ""
        self.encrypt_key = b64decode(string_to_bytes(secrets_encrypt_key))

    def encrypt(self, plaintext):
        """
        encrypt method contain aes currently

        :param plaintext: text or bytes to encrypt.
        :return: ``(base64 ciphertext, True)`` or ``(error, False)``.
        """
        return self.aes_encrypt(self.encrypt_key, plaintext)

    def decrypt(self, ciphertext):
        """
        decrypt method contain aes currently

        :param ciphertext: base64 ciphertext produced by ``encrypt``.
        :return: ``(plaintext str, True)`` or ``(error, False)``.
        """
        return self.aes_decrypt(self.encrypt_key, ciphertext)

    @classmethod
    def aes_encrypt(cls, key, plaintext):
        """Encrypt ``plaintext`` with AES-CBC under ``key``.

        :param key: AES key bytes.
        :param plaintext: text (encoded as UTF-8) or bytes.
        :return: ``(base64 text of IV + ciphertext, True)`` or
            ``(error message, False)``.
        """
        if isinstance(plaintext, str):
            plaintext = string_to_bytes(plaintext)
        iv = os.urandom(global_iv_length)
        try:
            cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
            encryptor = cipher.encryptor()
            v_padder = padding.PKCS7(algorithms.AES.block_size).padder()
            padded_plaintext = v_padder.update(plaintext) + v_padder.finalize()
            ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
            return b64encode(iv + ciphertext).decode("utf-8"), True
        except Exception as e:
            return str(e), False

    @classmethod
    def aes_decrypt(cls, key, ciphertext):
        """Decrypt a value produced by ``aes_encrypt``.

        :param key: AES key bytes.
        :param ciphertext: base64 text or bytes of IV + ciphertext.
        :return: ``(UTF-8 decoded plaintext, True)`` or
            ``(error message, False)``.
        """
        try:
            # Accept bytes as well as text, matching aes_encrypt's input handling.
            blob = b64decode(string_to_bytes(ciphertext))
            iv, body = blob[:global_iv_length], blob[global_iv_length:]
            cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
            decrypter = cipher.decryptor()
            decrypted_padded_plaintext = decrypter.update(body) + decrypter.finalize()
            unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
            plaintext = unpadder.update(decrypted_padded_plaintext) + unpadder.finalize()
            return plaintext.decode('utf-8'), True
        except Exception as e:
            return str(e), False
if __name__ == "__main__":
    # Manual smoke test: initialise the key store, then round-trip a message
    # through InnerCrypt.
    # NOTE(review): KeyManage() is built without a backend, so self.backend is
    # None and init() will fail on its first backend.get call; InnerCrypt()
    # also reads flask.current_app.  Supply both before running this.
    km = KeyManage()

    # Disabled experiment: inspect the freshly generated shares.
    # info, shares, status = km.generate_unseal_keys()
    # print(info, shares, status)
    # print("..................")
    # for i in shares:
    #     print(b64encode(i[1]).decode())

    res1, ok1 = km.init()
    if not ok1:
        print(res1)

    # Disabled experiment: feed the unseal tokens back one by one.
    # for j in res["details"]["seal_tokens"]:
    #     r = km.unseal(j)
    #     if r["status"] != "waiting":
    #         if r["status"] != "success":
    #             print("r........", r)
    #         else:
    #             print(r)
    #             break

    t_plaintext = b"Hello, World!"  # The plaintext to encrypt
    c = InnerCrypt()

    t_ciphertext, status1 = c.encrypt(t_plaintext)
    print("Ciphertext:", t_ciphertext)

    decrypted_plaintext, status2 = c.decrypt(t_ciphertext)
    print("Decrypted plaintext:", decrypted_plaintext)