fix: support sealing and unsealing secret in multiple process(more than one workers started by gunicorn) (#469)

* fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题

* fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题

* fix: NoneType happend while unsealing the secret funtion, cancel the address check while unseal and seal

* fix: unseal secret function

* fix: remove depens_on in docker-compose

* fix: support sealing and unsealing secret in multiple process(more than one workers started by gunicorn)
This commit is contained in:
loveiwei 2024-04-15 18:08:47 +08:00 committed by GitHub
parent 5008fe0491
commit 0d7101c9f8
7 changed files with 186 additions and 62 deletions

View File

@ -87,7 +87,7 @@ docker compose up -d
- 第一步: 先安装 Docker 环境, 以及Docker Compose (v2) - 第一步: 先安装 Docker 环境, 以及Docker Compose (v2)
- 第二步: 直接使用项目根目录下的install.sh 文件进行 `安装`、`启动`、`暂停`、`查状态`、`删除`、`卸载` - 第二步: 直接使用项目根目录下的install.sh 文件进行 `安装`、`启动`、`暂停`、`查状态`、`删除`、`卸载`
```shell ```shell
curl -so install.sh https://raw.githubusercontent.com/veops/cmdb/master/install.sh curl -so install.sh https://raw.githubusercontent.com/veops/cmdb/deploy_on_kylin_docker/install.sh
sh install.sh install sh install.sh install
``` ```

View File

@ -32,7 +32,7 @@ from api.lib.perm.acl.resource import ResourceCRUD
from api.lib.perm.acl.resource import ResourceTypeCRUD from api.lib.perm.acl.resource import ResourceTypeCRUD
from api.lib.perm.acl.role import RoleCRUD from api.lib.perm.acl.role import RoleCRUD
from api.lib.secrets.inner import KeyManage from api.lib.secrets.inner import KeyManage
from api.lib.secrets.inner import global_key_threshold from api.lib.secrets.inner import global_key_threshold, secrets_shares
from api.lib.secrets.secrets import InnerKVManger from api.lib.secrets.secrets import InnerKVManger
from api.models.acl import App from api.models.acl import App
from api.models.acl import ResourceType from api.models.acl import ResourceType
@ -357,13 +357,13 @@ def cmdb_inner_secrets_unseal(address):
""" """
unseal the secrets feature unseal the secrets feature
""" """
if not valid_address(address): # if not valid_address(address):
return # return
address = "{}/api/v0.1/secrets/unseal".format(address.strip("/")) address = "{}/api/v0.1/secrets/unseal".format(address.strip("/"))
for i in range(global_key_threshold): for i in range(global_key_threshold):
token = click.prompt(f'Enter unseal token {i + 1}', hide_input=True, confirmation_prompt=False) token = click.prompt(f'Enter unseal token {i + 1}', hide_input=True, confirmation_prompt=False)
assert token is not None assert token is not None
resp = requests.post(address, headers={"Unseal-Token": token}) resp = requests.post(address, headers={"Unseal-Token": token}, timeout=5)
if resp.status_code == 200: if resp.status_code == 200:
KeyManage.print_response(resp.json()) KeyManage.print_response(resp.json())
if resp.json().get("status") in ["success", "skip"]: if resp.json().get("status") in ["success", "skip"]:

View File

@ -1,19 +1,15 @@
import json
import os import os
import secrets import secrets
import sys import sys
from base64 import b64decode, b64encode import threading
from base64 import b64decode, b64encode
from Cryptodome.Protocol.SecretSharing import Shamir from Cryptodome.Protocol.SecretSharing import Shamir
from colorama import Back from colorama import Back, Fore, Style, init as colorama_init
from colorama import Fore
from colorama import Style
from colorama import init as colorama_init
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import modes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from flask import current_app from flask import current_app
@ -27,11 +23,16 @@ backend_encrypt_key_name = "encrypt_key"
backend_root_key_salt_name = "root_key_salt" backend_root_key_salt_name = "root_key_salt"
backend_encrypt_key_salt_name = "encrypt_key_salt" backend_encrypt_key_salt_name = "encrypt_key_salt"
backend_seal_key = "seal_status" backend_seal_key = "seal_status"
success = "success" success = "success"
seal_status = True seal_status = True
secrets_encrypt_key = ""
secrets_root_key = ""
def string_to_bytes(value): def string_to_bytes(value):
if not value:
return ""
if isinstance(value, bytes): if isinstance(value, bytes):
return value return value
if sys.version_info.major == 2: if sys.version_info.major == 2:
@ -44,6 +45,8 @@ def string_to_bytes(value):
class Backend: class Backend:
def __init__(self, backend=None): def __init__(self, backend=None):
self.backend = backend self.backend = backend
# cache is a redis object
self.cache = backend.cache
def get(self, key): def get(self, key):
return self.backend.get(key) return self.backend.get(key)
@ -54,23 +57,33 @@ class Backend:
def update(self, key, value): def update(self, key, value):
return self.backend.update(key, value) return self.backend.update(key, value)
def get_shares(self, key):
return self.backend.get_shares(key)
def set_shares(self, key, value):
return self.backend.set_shares(key, value)
class KeyManage: class KeyManage:
def __init__(self, trigger=None, backend=None): def __init__(self, trigger=None, backend=None):
self.trigger = trigger self.trigger = trigger
self.backend = backend self.backend = backend
self.share_key = "cmdb::secret::secrets_share"
if backend: if backend:
self.backend = Backend(backend) self.backend = Backend(backend)
def init_app(self, app, backend=None): def init_app(self, app, backend=None):
if (sys.argv[0].endswith("gunicorn") or if (sys.argv[0].endswith("gunicorn") or
(len(sys.argv) > 1 and sys.argv[1] in ("run", "cmdb-password-data-migrate"))): (len(sys.argv) > 1 and sys.argv[1] in ("run", "cmdb-password-data-migrate"))):
self.backend = backend
threading.Thread(target=self.watch_root_key, args=(app,)).start()
self.trigger = app.config.get("INNER_TRIGGER_TOKEN") self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
if not self.trigger: if not self.trigger:
return return
self.backend = backend
resp = self.auto_unseal() resp = self.auto_unseal()
self.print_response(resp) self.print_response(resp)
@ -124,6 +137,8 @@ class KeyManage:
return new_shares return new_shares
def is_valid_root_key(self, root_key): def is_valid_root_key(self, root_key):
if not root_key:
return False
root_key_hash, ok = self.hash_root_key(root_key) root_key_hash, ok = self.hash_root_key(root_key)
if not ok: if not ok:
return root_key_hash, ok return root_key_hash, ok
@ -135,35 +150,42 @@ class KeyManage:
else: else:
return "", True return "", True
def auth_root_secret(self, root_key): def auth_root_secret(self, root_key, app):
msg, ok = self.is_valid_root_key(root_key) with app.app_context():
if not ok: msg, ok = self.is_valid_root_key(root_key)
return { if not ok:
"message": msg, return {
"status": "failed" "message": msg,
} "status": "failed"
}
encrypt_key_aes = self.backend.get(backend_encrypt_key_name) encrypt_key_aes = self.backend.get(backend_encrypt_key_name)
if not encrypt_key_aes: if not encrypt_key_aes:
return { return {
"message": "encrypt key is empty", "message": "encrypt key is empty",
"status": "failed" "status": "failed"
} }
secrets_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes) secret_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
if ok:
msg, ok = self.backend.update(backend_seal_key, "open")
if ok: if ok:
current_app.config["secrets_encrypt_key"] = secrets_encrypt_key msg, ok = self.backend.update(backend_seal_key, "open")
current_app.config["secrets_root_key"] = root_key if ok:
current_app.config["secrets_shares"] = [] global secrets_encrypt_key, secrets_root_key
return {"message": success, "status": success} secrets_encrypt_key = secret_encrypt_key
return {"message": msg, "status": "failed"} secrets_root_key = root_key
else: self.backend.cache.set(self.share_key, json.dumps([]))
return { return {"message": success, "status": success}
"message": secrets_encrypt_key, return {"message": msg, "status": "failed"}
"status": "failed" else:
} return {
"message": secret_encrypt_key,
"status": "failed"
}
def parse_shares(self, shares, app):
if len(shares) >= global_key_threshold:
recovered_secret = Shamir.combine(shares[:global_key_threshold], False)
return self.auth_root_secret(b64encode(recovered_secret), app)
def unseal(self, key): def unseal(self, key):
if not self.is_seal(): if not self.is_seal():
@ -175,14 +197,12 @@ class KeyManage:
try: try:
t = [i for i in b64decode(key)] t = [i for i in b64decode(key)]
v = (int("".join([chr(i) for i in t[-2:]])), bytes(t[:-2])) v = (int("".join([chr(i) for i in t[-2:]])), bytes(t[:-2]))
shares = current_app.config.get("secrets_shares", []) shares = self.backend.get_shares(self.share_key)
if v not in shares: if v not in shares:
shares.append(v) shares.append(v)
current_app.config["secrets_shares"] = shares self.set_shares(shares)
if len(shares) >= global_key_threshold: if len(shares) >= global_key_threshold:
recovered_secret = Shamir.combine(shares[:global_key_threshold], False) return self.parse_shares(shares, current_app)
return self.auth_root_secret(b64encode(recovered_secret))
else: else:
return { return {
"message": "waiting for inputting other unseal key {0}/{1}".format(len(shares), "message": "waiting for inputting other unseal key {0}/{1}".format(len(shares),
@ -242,8 +262,11 @@ class KeyManage:
msg, ok = self.backend.add(backend_seal_key, "open") msg, ok = self.backend.add(backend_seal_key, "open")
if not ok: if not ok:
return {"message": msg, "status": "failed"}, False return {"message": msg, "status": "failed"}, False
current_app.config["secrets_root_key"] = root_key
current_app.config["secrets_encrypt_key"] = encrypt_key global secrets_encrypt_key, secrets_root_key
secrets_encrypt_key = encrypt_key
secrets_root_key = root_key
self.print_token(shares, root_token=root_key) self.print_token(shares, root_token=root_key)
return {"message": "OK", return {"message": "OK",
@ -266,7 +289,7 @@ class KeyManage:
} }
# TODO # TODO
elif len(self.trigger.strip()) == 24: elif len(self.trigger.strip()) == 24:
res = self.auth_root_secret(self.trigger.encode()) res = self.auth_root_secret(self.trigger.encode(), current_app)
if res.get("status") == success: if res.get("status") == success:
return { return {
"message": success, "message": success,
@ -298,22 +321,31 @@ class KeyManage:
"message": msg, "message": msg,
"status": "failed", "status": "failed",
} }
current_app.config["secrets_root_key"] = '' self.clear()
current_app.config["secrets_encrypt_key"] = '' self.backend.cache.publish(self.share_key, "clear")
return { return {
"message": success, "message": success,
"status": success "status": success
} }
@staticmethod
def clear():
global secrets_encrypt_key, secrets_root_key
secrets_encrypt_key = ''
secrets_root_key = ''
def is_seal(self): def is_seal(self):
""" """
If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state. If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state..
:return: :return:
""" """
secrets_root_key = current_app.config.get("secrets_root_key") # secrets_root_key = current_app.config.get("secrets_root_key")
if not secrets_root_key:
return True
msg, ok = self.is_valid_root_key(secrets_root_key) msg, ok = self.is_valid_root_key(secrets_root_key)
if not ok: if not ok:
return true return True
status = self.backend.get(backend_seal_key) status = self.backend.get(backend_seal_key)
return status == "block" return status == "block"
@ -349,22 +381,53 @@ class KeyManage:
} }
print(status_colors.get(status, Fore.GREEN), message, Style.RESET_ALL) print(status_colors.get(status, Fore.GREEN), message, Style.RESET_ALL)
def set_shares(self, values):
new_value = list()
for v in values:
new_value.append((v[0], b64encode(v[1]).decode("utf-8")))
self.backend.cache.publish(self.share_key, json.dumps(new_value))
self.backend.cache.set(self.share_key, json.dumps(new_value))
def watch_root_key(self, app):
pubsub = self.backend.cache.pubsub()
pubsub.subscribe(self.share_key)
new_value = set()
for message in pubsub.listen():
if message["type"] == "message":
if message["data"] == b"clear":
self.clear()
continue
try:
value = json.loads(message["data"].decode("utf-8"))
for v in value:
new_value.add((v[0], b64decode(v[1])))
except Exception as e:
return []
if len(new_value) >= global_key_threshold:
self.parse_shares(list(new_value), app)
new_value = set()
class InnerCrypt: class InnerCrypt:
def __init__(self): def __init__(self):
secrets_encrypt_key = current_app.config.get("secrets_encrypt_key", "") self.encrypt_key = b64decode(secrets_encrypt_key)
self.encrypt_key = b64decode(secrets_encrypt_key.encode("utf-8")) #self.encrypt_key = b64decode(secrets_encrypt_key, "".encode("utf-8"))
def encrypt(self, plaintext): def encrypt(self, plaintext):
""" """
encrypt method contain aes currently encrypt method contain aes currently
""" """
if not self.encrypt_key:
return ValueError("secret is disabled, please seal firstly"), False
return self.aes_encrypt(self.encrypt_key, plaintext) return self.aes_encrypt(self.encrypt_key, plaintext)
def decrypt(self, ciphertext): def decrypt(self, ciphertext):
""" """
decrypt method contain aes currently decrypt method contain aes currently
""" """
if not self.encrypt_key:
return ValueError("secret is disabled, please seal firstly"), False
return self.aes_decrypt(self.encrypt_key, ciphertext) return self.aes_decrypt(self.encrypt_key, ciphertext)
@classmethod @classmethod
@ -381,6 +444,7 @@ class InnerCrypt:
return b64encode(iv + ciphertext).decode("utf-8"), True return b64encode(iv + ciphertext).decode("utf-8"), True
except Exception as e: except Exception as e:
return str(e), False return str(e), False
@classmethod @classmethod
@ -426,4 +490,4 @@ if __name__ == "__main__":
t_ciphertext, status1 = c.encrypt(t_plaintext) t_ciphertext, status1 = c.encrypt(t_plaintext)
print("Ciphertext:", t_ciphertext) print("Ciphertext:", t_ciphertext)
decrypted_plaintext, status2 = c.decrypt(t_ciphertext) decrypted_plaintext, status2 = c.decrypt(t_ciphertext)
print("Decrypted plaintext:", decrypted_plaintext) print("Decrypted plaintext:", decrypted_plaintext)

View File

@ -1,8 +1,13 @@
import base64
import json
from api.models.cmdb import InnerKV from api.models.cmdb import InnerKV
from api.extensions import rd
class InnerKVManger(object): class InnerKVManger(object):
def __init__(self): def __init__(self):
self.cache = rd.r
pass pass
@classmethod @classmethod
@ -33,3 +38,26 @@ class InnerKVManger(object):
return "success", True return "success", True
return "update failed", True return "update failed", True
@classmethod
def get_shares(cls, key):
new_value = list()
v = rd.get_str(key)
if not v:
return new_value
try:
value = json.loads(v.decode("utf-8"))
for v in value:
new_value.append((v[0], base64.b64decode(v[1])))
except Exception as e:
return []
return new_value
@classmethod
def set_shares(cls, key, value):
new_value = list()
for v in value:
new_value.append((v[0], base64.b64encode(v[1]).decode("utf-8")))
rd.set_str(key, json.dumps(new_value))

View File

@ -117,6 +117,23 @@ class RedisHandler(object):
except Exception as e: except Exception as e:
current_app.logger.error("delete redis key error, {0}".format(str(e))) current_app.logger.error("delete redis key error, {0}".format(str(e)))
def set_str(self, key, value, expired=None):
try:
if expired:
self.r.setex(key, expired, value)
else:
self.r.set(key, value)
except Exception as e:
current_app.logger.error("set redis error, {0}".format(str(e)))
def get_str(self, key):
try:
value = self.r.get(key)
except Exception as e:
current_app.logger.error("get redis error, {0}".format(str(e)))
return
return value
class ESHandler(object): class ESHandler(object):
def __init__(self, flask_app=None): def __init__(self, flask_app=None):

View File

@ -14,6 +14,11 @@ services:
- db-data:/var/lib/mysql - db-data:/var/lib/mysql
- ./docs/mysqld.cnf:/etc/mysql/conf.d/mysqld.cnf - ./docs/mysqld.cnf:/etc/mysql/conf.d/mysqld.cnf
- ./docs/cmdb.sql:/docker-entrypoint-initdb.d/cmdb.sql - ./docs/cmdb.sql:/docker-entrypoint-initdb.d/cmdb.sql
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
interval: 10s
timeout: 5s
retries: 5
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
networks: networks:
new: new:
@ -27,6 +32,11 @@ services:
container_name: cmdb-cache container_name: cmdb-cache
environment: environment:
TZ: Asia/Shanghai TZ: Asia/Shanghai
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
networks: networks:
new: new:
aliases: aliases:
@ -41,6 +51,11 @@ services:
environment: environment:
TZ: Asia/Shanghai TZ: Asia/Shanghai
WAIT_HOSTS: cmdb-db:3306, cmdb-cache:6379 WAIT_HOSTS: cmdb-db:3306, cmdb-cache:6379
depends_on:
cmdb-db:
condition: service_healthy
cmdb-cache:
condition: service_healthy
command: command:
- /bin/sh - /bin/sh
- -c - -c
@ -51,6 +66,9 @@ services:
flask common-check-new-columns flask common-check-new-columns
gunicorn --workers=4 autoapp:app -b 0.0.0.0:5000 -D gunicorn --workers=4 autoapp:app -b 0.0.0.0:5000 -D
#nohup celery -A celery_worker.celery worker -E -Q one_cmdb_async --autoscale=2,5 > one_cmdb_async.log 2>&1 &
#nohup celery -A celery_worker.celery worker -E -Q acl_async --concurrency=2 > one_acl_async.log 2>&1 &
#
celery -A celery_worker.celery worker -E -Q one_cmdb_async --autoscale=4,1 --logfile=one_cmdb_async.log -D celery -A celery_worker.celery worker -E -Q one_cmdb_async --autoscale=4,1 --logfile=one_cmdb_async.log -D
celery -A celery_worker.celery worker -E -Q acl_async --logfile=one_acl_async.log --autoscale=2,1 -D celery -A celery_worker.celery worker -E -Q acl_async --logfile=one_acl_async.log --autoscale=2,1 -D
@ -61,9 +79,6 @@ services:
flask init-department flask init-department
flask cmdb-counter > counter.log 2>&1 flask cmdb-counter > counter.log 2>&1
depends_on:
- cmdb-db
- cmdb-cache
networks: networks:
new: new:
aliases: aliases:

View File

@ -21,7 +21,7 @@ check_docker_compose() {
clone_repo() { clone_repo() {
local repo_url=$1 local repo_url=$1
git clone $repo_url || { git clone -b deploy_on_kylin_docker --single-branch $repo_url || {
echo "error: failed to clone $repo_url" echo "error: failed to clone $repo_url"
exit 1 exit 1
} }