mirror of
				https://github.com/veops/cmdb.git
				synced 2025-11-04 05:36:17 +08:00 
			
		
		
		
	feat: add inner password storage and optimize flask command about inner cmdb (#248)
Co-authored-by: fxiang21 <fxiang21@126.com>
This commit is contained in:
		@@ -318,6 +318,17 @@ def cmdb_index_table_upgrade():
 | 
			
		||||
    db.session.commit()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def valid_address(address):
 | 
			
		||||
    if not address.startswith(("http://127.0.0.1", "https://127.0.0.1")):
 | 
			
		||||
        response = {
 | 
			
		||||
            "message": "Address should start with http://127.0.0.1 or https://127.0.0.1",
 | 
			
		||||
            "status": "failed"
 | 
			
		||||
        }
 | 
			
		||||
        KeyManage.print_response(response)
 | 
			
		||||
        return False
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@click.command()
 | 
			
		||||
@click.option(
 | 
			
		||||
    '-a',
 | 
			
		||||
@@ -329,15 +340,26 @@ def cmdb_inner_secrets_init(address):
 | 
			
		||||
    """
 | 
			
		||||
    init inner secrets for password feature
 | 
			
		||||
    """
 | 
			
		||||
    KeyManage(backend=InnerKVManger).init()
 | 
			
		||||
    res, ok = KeyManage(backend=InnerKVManger).init()
 | 
			
		||||
    if not ok:
 | 
			
		||||
        if res.get("status") == "failed":
 | 
			
		||||
            KeyManage.print_response(res)
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
    if address and address.startswith("http") and current_app.config.get("INNER_TRIGGER_TOKEN", "") != "":
 | 
			
		||||
    token = res.get("details", {}).get("root_token", "")
 | 
			
		||||
    if valid_address(address):
 | 
			
		||||
        token = current_app.config.get("INNER_TRIGGER_TOKEN", "") if not token else token
 | 
			
		||||
        if not token:
 | 
			
		||||
            token = click.prompt(f'Enter root token', hide_input=True, confirmation_prompt=False)
 | 
			
		||||
        assert token is not None
 | 
			
		||||
        resp = requests.post("{}/api/v0.1/secrets/auto_seal".format(address.strip("/")),
 | 
			
		||||
                             headers={"Inner-Token": current_app.config.get("INNER_TRIGGER_TOKEN", "")})
 | 
			
		||||
                             headers={"Inner-Token": token})
 | 
			
		||||
        if resp.status_code == 200:
 | 
			
		||||
            KeyManage.print_response(resp.json())
 | 
			
		||||
        else:
 | 
			
		||||
            KeyManage.print_response({"message": resp.text, "status": "failed"})
 | 
			
		||||
            KeyManage.print_response({"message": resp.text or resp.status_code, "status": "failed"})
 | 
			
		||||
    else:
 | 
			
		||||
        KeyManage.print_response(res)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@click.command()
 | 
			
		||||
@@ -352,18 +374,19 @@ def cmdb_inner_secrets_unseal(address):
 | 
			
		||||
    """
 | 
			
		||||
    unseal the secrets feature
 | 
			
		||||
    """
 | 
			
		||||
    address = "{}/api/v0.1/secrets/unseal".format(address.strip("/"))
 | 
			
		||||
    if not address.startswith("http"):
 | 
			
		||||
        KeyManage.print_response({"message": "invalid address, should start with http", "status": "failed"})
 | 
			
		||||
    if not valid_address(address):
 | 
			
		||||
        return
 | 
			
		||||
    address = "{}/api/v0.1/secrets/unseal".format(address.strip("/"))
 | 
			
		||||
    for i in range(global_key_threshold):
 | 
			
		||||
        token = click.prompt(f'Enter unseal token {i + 1}', hide_input=True, confirmation_prompt=False)
 | 
			
		||||
        assert token is not None
 | 
			
		||||
        resp = requests.post(address, headers={"Unseal-Token": token})
 | 
			
		||||
        if resp.status_code == 200:
 | 
			
		||||
            KeyManage.print_response(resp.json())
 | 
			
		||||
            if resp.json().get("status") in ["success", "skip"]:
 | 
			
		||||
                return
 | 
			
		||||
        else:
 | 
			
		||||
            KeyManage.print_response({"message": resp.text, "status": "failed"})
 | 
			
		||||
            KeyManage.print_response({"message": resp.status_code, "status": "failed"})
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -388,15 +411,16 @@ def cmdb_inner_secrets_seal(address, token):
 | 
			
		||||
    """
 | 
			
		||||
    assert address is not None
 | 
			
		||||
    assert token is not None
 | 
			
		||||
    if address.startswith("http"):
 | 
			
		||||
        address = "{}/api/v0.1/secrets/seal".format(address.strip("/"))
 | 
			
		||||
        resp = requests.post(address, headers={
 | 
			
		||||
            "Inner-Token": token,
 | 
			
		||||
        })
 | 
			
		||||
        if resp.status_code == 200:
 | 
			
		||||
            KeyManage.print_response(resp.json())
 | 
			
		||||
        else:
 | 
			
		||||
            KeyManage.print_response({"message": resp.text, "status": "failed"})
 | 
			
		||||
    if not valid_address(address):
 | 
			
		||||
        return
 | 
			
		||||
    address = "{}/api/v0.1/secrets/seal".format(address.strip("/"))
 | 
			
		||||
    resp = requests.post(address, headers={
 | 
			
		||||
        "Inner-Token": token,
 | 
			
		||||
    })
 | 
			
		||||
    if resp.status_code == 200:
 | 
			
		||||
        KeyManage.print_response(resp.json())
 | 
			
		||||
    else:
 | 
			
		||||
        KeyManage.print_response({"message": resp.status_code, "status": "failed"})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@click.command()
 | 
			
		||||
 
 | 
			
		||||
@@ -9,12 +9,10 @@ from flask_login import LoginManager
 | 
			
		||||
from flask_migrate import Migrate
 | 
			
		||||
from flask_sqlalchemy import SQLAlchemy
 | 
			
		||||
 | 
			
		||||
from api.lib.secrets.inner import KeyManage
 | 
			
		||||
from api.lib.utils import ESHandler
 | 
			
		||||
from api.lib.utils import RedisHandler
 | 
			
		||||
 | 
			
		||||
from api.lib.secrets.inner import KeyManage
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
bcrypt = Bcrypt()
 | 
			
		||||
login_manager = LoginManager()
 | 
			
		||||
db = SQLAlchemy(session_options={"autoflush": False})
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,4 @@
 | 
			
		||||
import os
 | 
			
		||||
import secrets
 | 
			
		||||
import sys
 | 
			
		||||
from base64 import b64decode, b64encode
 | 
			
		||||
 | 
			
		||||
from colorama import Back
 | 
			
		||||
from colorama import Fore
 | 
			
		||||
from colorama import init as colorama_init
 | 
			
		||||
@@ -17,6 +13,9 @@ from cryptography.hazmat.primitives.ciphers import modes
 | 
			
		||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
 | 
			
		||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
 | 
			
		||||
from flask import current_app
 | 
			
		||||
import os
 | 
			
		||||
import secrets
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
global_iv_length = 16
 | 
			
		||||
global_key_shares = 5  # Number of generated key shares
 | 
			
		||||
@@ -26,6 +25,7 @@ backend_root_key_name = "root_key"
 | 
			
		||||
backend_encrypt_key_name = "encrypt_key"
 | 
			
		||||
backend_root_key_salt_name = "root_key_salt"
 | 
			
		||||
backend_encrypt_key_salt_name = "encrypt_key_salt"
 | 
			
		||||
backend_seal_key = "seal_status"
 | 
			
		||||
success = "success"
 | 
			
		||||
seal_status = True
 | 
			
		||||
 | 
			
		||||
@@ -51,6 +51,9 @@ class Backend:
 | 
			
		||||
    def add(self, key, value):
 | 
			
		||||
        return self.backend.add(key, value)
 | 
			
		||||
 | 
			
		||||
    def update(self, key, value):
 | 
			
		||||
        return self.backend.update(key, value)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class KeyManage:
 | 
			
		||||
 | 
			
		||||
@@ -61,13 +64,13 @@ class KeyManage:
 | 
			
		||||
            self.backend = Backend(backend)
 | 
			
		||||
 | 
			
		||||
    def init_app(self, app, backend=None):
 | 
			
		||||
        self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
 | 
			
		||||
        if not self.trigger:
 | 
			
		||||
            return
 | 
			
		||||
        self.backend = backend
 | 
			
		||||
 | 
			
		||||
        resp = self.auto_unseal()
 | 
			
		||||
        self.print_response(resp)
 | 
			
		||||
        if sys.argv[0].endswith("gunicorn") or sys.argv[1] == "run":
 | 
			
		||||
            self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
 | 
			
		||||
            if not self.trigger:
 | 
			
		||||
                return
 | 
			
		||||
            self.backend = backend
 | 
			
		||||
            resp = self.auto_unseal()
 | 
			
		||||
            self.print_response(resp)
 | 
			
		||||
 | 
			
		||||
    def hash_root_key(self, value):
 | 
			
		||||
        algorithm = hashes.SHA256()
 | 
			
		||||
@@ -118,23 +121,23 @@ class KeyManage:
 | 
			
		||||
 | 
			
		||||
        return new_shares
 | 
			
		||||
 | 
			
		||||
    def auth_root_secret(self, root_key):
 | 
			
		||||
    def is_valid_root_key(self, root_key):
 | 
			
		||||
        root_key_hash, ok = self.hash_root_key(root_key)
 | 
			
		||||
        if not ok:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": root_key_hash,
 | 
			
		||||
                "status": "failed"
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            return root_key_hash, ok
 | 
			
		||||
        backend_root_key_hash = self.backend.get(backend_root_key_name)
 | 
			
		||||
        if not backend_root_key_hash:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": "should init firstly",
 | 
			
		||||
                "status": "failed"
 | 
			
		||||
            }
 | 
			
		||||
            return "should init firstly", False
 | 
			
		||||
        elif backend_root_key_hash != root_key_hash:
 | 
			
		||||
            return "invalid root key", False
 | 
			
		||||
        else:
 | 
			
		||||
            return "", True
 | 
			
		||||
 | 
			
		||||
    def auth_root_secret(self, root_key):
 | 
			
		||||
        msg, ok = self.is_valid_root_key(root_key)
 | 
			
		||||
        if not ok:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": "invalid root key",
 | 
			
		||||
                "message": msg,
 | 
			
		||||
                "status": "failed"
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@@ -147,10 +150,13 @@ class KeyManage:
 | 
			
		||||
 | 
			
		||||
        secrets_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
 | 
			
		||||
        if ok:
 | 
			
		||||
            current_app.config["secrets_encrypt_key"] = secrets_encrypt_key
 | 
			
		||||
            current_app.config["secrets_root_key"] = root_key
 | 
			
		||||
            current_app.config["secrets_shares"] = []
 | 
			
		||||
            return {"message": success, "status": success}
 | 
			
		||||
            msg, ok = self.backend.update(backend_seal_key, "open")
 | 
			
		||||
            if ok:
 | 
			
		||||
                current_app.config["secrets_encrypt_key"] = secrets_encrypt_key
 | 
			
		||||
                current_app.config["secrets_root_key"] = root_key
 | 
			
		||||
                current_app.config["secrets_shares"] = []
 | 
			
		||||
                return {"message": success, "status": success}
 | 
			
		||||
            return {"message": msg, "status": "failed"}
 | 
			
		||||
        else:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": secrets_encrypt_key,
 | 
			
		||||
@@ -204,34 +210,36 @@ class KeyManage:
 | 
			
		||||
        """
 | 
			
		||||
        root_key = self.backend.get(backend_root_key_name)
 | 
			
		||||
        if root_key:
 | 
			
		||||
            return {"message": "already init, skip"}, False
 | 
			
		||||
            return {"message": "already init, skip", "status": "skip"}, False
 | 
			
		||||
        else:
 | 
			
		||||
            root_key, shares, status = self.generate_unseal_keys()
 | 
			
		||||
            if not status:
 | 
			
		||||
                return {"message": root_key}, False
 | 
			
		||||
                return {"message": root_key, "status": "failed"}, False
 | 
			
		||||
 | 
			
		||||
            # hash root key and store in backend
 | 
			
		||||
            root_key_hash, ok = self.hash_root_key(root_key)
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {"message": root_key_hash}, False
 | 
			
		||||
                return {"message": root_key_hash, "status": "failed"}, False
 | 
			
		||||
 | 
			
		||||
            msg, ok = self.backend.add(backend_root_key_name, root_key_hash)
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {"message": msg}, False
 | 
			
		||||
                return {"message": msg, "status": "failed"}, False
 | 
			
		||||
 | 
			
		||||
            # generate encrypt key from root_key and store in backend
 | 
			
		||||
            encrypt_key, ok = self.generate_encrypt_key(root_key)
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {"message": encrypt_key}
 | 
			
		||||
                return {"message": encrypt_key, "status": "failed"}
 | 
			
		||||
 | 
			
		||||
            encrypt_key_aes, status = InnerCrypt.aes_encrypt(root_key, encrypt_key)
 | 
			
		||||
            if not status:
 | 
			
		||||
                return {"message": encrypt_key_aes}
 | 
			
		||||
                return {"message": encrypt_key_aes, "status": "failed"}
 | 
			
		||||
 | 
			
		||||
            msg, ok = self.backend.add(backend_encrypt_key_name, encrypt_key_aes)
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {"message": msg}, False
 | 
			
		||||
 | 
			
		||||
                return {"message": msg, "status": "failed"}, False
 | 
			
		||||
            msg, ok = self.backend.add(backend_seal_key, "open")
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {"message": msg, "status": "failed"}, False
 | 
			
		||||
            current_app.config["secrets_root_key"] = root_key
 | 
			
		||||
            current_app.config["secrets_encrypt_key"] = encrypt_key
 | 
			
		||||
            self.print_token(shares, root_token=root_key)
 | 
			
		||||
@@ -275,28 +283,21 @@ class KeyManage:
 | 
			
		||||
 | 
			
		||||
    def seal(self, root_key):
 | 
			
		||||
        root_key = root_key.encode()
 | 
			
		||||
        root_key_hash, ok = self.hash_root_key(root_key)
 | 
			
		||||
        msg, ok = self.is_valid_root_key(root_key)
 | 
			
		||||
        if not ok:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": root_key_hash,
 | 
			
		||||
                "status": "failed"
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        backend_root_key_hash = self.backend.get(backend_root_key_name)
 | 
			
		||||
        if not backend_root_key_hash:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": "not init, seal skip",
 | 
			
		||||
                "status": "skip"
 | 
			
		||||
            }
 | 
			
		||||
        elif root_key_hash != backend_root_key_hash:
 | 
			
		||||
            return {
 | 
			
		||||
                "message": "invalid root key",
 | 
			
		||||
                "message": msg,
 | 
			
		||||
                "status": "failed"
 | 
			
		||||
            }
 | 
			
		||||
        else:
 | 
			
		||||
            msg, ok = self.backend.update(backend_seal_key, "block")
 | 
			
		||||
            if not ok:
 | 
			
		||||
                return {
 | 
			
		||||
                    "message": msg,
 | 
			
		||||
                    "status": "failed",
 | 
			
		||||
                }
 | 
			
		||||
            current_app.config["secrets_root_key"] = ''
 | 
			
		||||
            current_app.config["secrets_encrypt_key"] = ''
 | 
			
		||||
 | 
			
		||||
            return {
 | 
			
		||||
                "message": success,
 | 
			
		||||
                "status": success
 | 
			
		||||
@@ -308,11 +309,11 @@ class KeyManage:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        secrets_root_key = current_app.config.get("secrets_root_key")
 | 
			
		||||
        root_key = self.backend.get(backend_root_key_name)
 | 
			
		||||
        if root_key == "" or root_key != secrets_root_key:
 | 
			
		||||
            return "invalid root key", True
 | 
			
		||||
 | 
			
		||||
        return "", False
 | 
			
		||||
        msg, ok = self.is_valid_root_key(secrets_root_key)
 | 
			
		||||
        if not ok:
 | 
			
		||||
            return {"message": msg, "status": "failed"}
 | 
			
		||||
        status = self.backend.get(backend_seal_key)
 | 
			
		||||
        return status == "block"
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def print_token(cls, shares, root_token):
 | 
			
		||||
@@ -330,7 +331,7 @@ class KeyManage:
 | 
			
		||||
 | 
			
		||||
        for i, v in enumerate(shares):
 | 
			
		||||
            print(
 | 
			
		||||
                "unseal token " + str(i + 1) + ": " + Fore.RED + Back.CYAN + v.decode("utf-8") + Style.RESET_ALL)
 | 
			
		||||
                "unseal token " + str(i + 1) + ": " + Fore.RED + Back.BLACK + v.decode("utf-8") + Style.RESET_ALL)
 | 
			
		||||
            print()
 | 
			
		||||
 | 
			
		||||
        print(Fore.GREEN + "root token:  " + root_token.decode("utf-8") + Style.RESET_ALL)
 | 
			
		||||
@@ -339,14 +340,12 @@ class KeyManage:
 | 
			
		||||
    def print_response(cls, data):
 | 
			
		||||
        status = data.get("status", "")
 | 
			
		||||
        message = data.get("message", "")
 | 
			
		||||
        if status == "skip":
 | 
			
		||||
            print(Style.BRIGHT, message)
 | 
			
		||||
        elif status == "failed":
 | 
			
		||||
            print(Fore.RED, message)
 | 
			
		||||
        elif status == "waiting":
 | 
			
		||||
            print(Fore.YELLOW, message)
 | 
			
		||||
        else:
 | 
			
		||||
            print(Fore.GREEN, message)
 | 
			
		||||
        status_colors = {
 | 
			
		||||
            "skip": Style.BRIGHT,
 | 
			
		||||
            "failed": Fore.RED,
 | 
			
		||||
            "waiting": Fore.YELLOW,
 | 
			
		||||
        }
 | 
			
		||||
        print(status_colors.get(status, Fore.GREEN), message, Style.RESET_ALL)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InnerCrypt:
 | 
			
		||||
 
 | 
			
		||||
@@ -11,7 +11,6 @@ class InnerKVManger(object):
 | 
			
		||||
        res = InnerKV.create(**data)
 | 
			
		||||
        if res.key == key:
 | 
			
		||||
            return "success", True
 | 
			
		||||
 | 
			
		||||
        return "add failed", False
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
@@ -21,3 +20,14 @@ class InnerKVManger(object):
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        return res.value
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def update(cls, key, value):
 | 
			
		||||
        res = InnerKV.get_by(first=True, to_dict=False, **{"key": key})
 | 
			
		||||
        if not res:
 | 
			
		||||
            return None
 | 
			
		||||
        res.value = value
 | 
			
		||||
        t = res.update()
 | 
			
		||||
        if t.key == key:
 | 
			
		||||
            return "success", True
 | 
			
		||||
        return "update failed", True
 | 
			
		||||
 
 | 
			
		||||
@@ -1,38 +1,38 @@
 | 
			
		||||
from api.lib.perm.auth import auth_abandoned
 | 
			
		||||
from api.resource import APIView
 | 
			
		||||
from api.lib.secrets.inner import KeyManage
 | 
			
		||||
from api.lib.secrets.secrets import InnerKVManger
 | 
			
		||||
 | 
			
		||||
from flask import request, abort
 | 
			
		||||
from flask import current_app
 | 
			
		||||
from flask import request
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InnerSecretUnSealView(APIView):
 | 
			
		||||
    url_prefix = "/secrets/unseal"
 | 
			
		||||
 | 
			
		||||
    @auth_abandoned
 | 
			
		||||
    def post(self):
 | 
			
		||||
        unseal_key = request.headers.get("Unseal-Token")
 | 
			
		||||
        res = KeyManage(backend=InnerKVManger()).unseal(unseal_key)
 | 
			
		||||
        # if res.get("status") == "failed":
 | 
			
		||||
        #     return abort(400, res.get("message"))
 | 
			
		||||
        return self.jsonify(**res)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InnerSecretSealView(APIView):
 | 
			
		||||
    url_prefix = "/secrets/seal"
 | 
			
		||||
 | 
			
		||||
    @auth_abandoned
 | 
			
		||||
    def post(self):
 | 
			
		||||
        unseal_key = request.headers.get("Inner-Token")
 | 
			
		||||
        res = KeyManage(backend=InnerKVManger()).seal(unseal_key)
 | 
			
		||||
        # if res.get("status") == "failed":
 | 
			
		||||
        #     return abort(400, res.get("message"))
 | 
			
		||||
        return self.jsonify(**res)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InnerSecretAutoSealView(APIView):
 | 
			
		||||
    url_prefix = "/secrets/auto_seal"
 | 
			
		||||
 | 
			
		||||
    @auth_abandoned
 | 
			
		||||
    def post(self):
 | 
			
		||||
        unseal_key = request.headers.get("Inner-Token")
 | 
			
		||||
        res = KeyManage(backend=InnerKVManger()).seal(unseal_key)
 | 
			
		||||
        # if res.get("status") == "failed":
 | 
			
		||||
        #     return abort(400, res.get("message"))
 | 
			
		||||
        root_key = request.headers.get("Inner-Token")
 | 
			
		||||
        res = KeyManage(trigger=root_key,
 | 
			
		||||
                        backend=InnerKVManger()).auto_unseal()
 | 
			
		||||
        return self.jsonify(**res)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user