mirror of https://github.com/veops/cmdb.git
fix: support sealing and unsealing secret in multiple process(more than one workers started by gunicorn) (#469)
* fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题 * fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题 * fix: NoneType happend while unsealing the secret funtion, cancel the address check while unseal and seal * fix: unseal secret function * fix: remove depens_on in docker-compose * fix: support sealing and unsealing secret in multiple process(more than one workers started by gunicorn)
This commit is contained in:
parent
a042b4fe39
commit
32529fba9b
|
@ -87,7 +87,7 @@ docker compose up -d
|
|||
- 第一步: 先安装 Docker 环境, 以及Docker Compose (v2)
|
||||
- 第二步: 直接使用项目根目录下的install.sh 文件进行 `安装`、`启动`、`暂停`、`查状态`、`删除`、`卸载`
|
||||
```shell
|
||||
curl -so install.sh https://raw.githubusercontent.com/veops/cmdb/master/install.sh
|
||||
curl -so install.sh https://raw.githubusercontent.com/veops/cmdb/deploy_on_kylin_docker/install.sh
|
||||
sh install.sh install
|
||||
```
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ from api.lib.perm.acl.resource import ResourceCRUD
|
|||
from api.lib.perm.acl.resource import ResourceTypeCRUD
|
||||
from api.lib.perm.acl.role import RoleCRUD
|
||||
from api.lib.secrets.inner import KeyManage
|
||||
from api.lib.secrets.inner import global_key_threshold
|
||||
from api.lib.secrets.inner import global_key_threshold, secrets_shares
|
||||
from api.lib.secrets.secrets import InnerKVManger
|
||||
from api.models.acl import App
|
||||
from api.models.acl import ResourceType
|
||||
|
@ -357,13 +357,13 @@ def cmdb_inner_secrets_unseal(address):
|
|||
"""
|
||||
unseal the secrets feature
|
||||
"""
|
||||
if not valid_address(address):
|
||||
return
|
||||
# if not valid_address(address):
|
||||
# return
|
||||
address = "{}/api/v0.1/secrets/unseal".format(address.strip("/"))
|
||||
for i in range(global_key_threshold):
|
||||
token = click.prompt(f'Enter unseal token {i + 1}', hide_input=True, confirmation_prompt=False)
|
||||
assert token is not None
|
||||
resp = requests.post(address, headers={"Unseal-Token": token})
|
||||
resp = requests.post(address, headers={"Unseal-Token": token}, timeout=5)
|
||||
if resp.status_code == 200:
|
||||
KeyManage.print_response(resp.json())
|
||||
if resp.json().get("status") in ["success", "skip"]:
|
||||
|
|
|
@ -1,19 +1,15 @@
|
|||
import json
|
||||
import os
|
||||
import secrets
|
||||
import sys
|
||||
from base64 import b64decode, b64encode
|
||||
import threading
|
||||
|
||||
from base64 import b64decode, b64encode
|
||||
from Cryptodome.Protocol.SecretSharing import Shamir
|
||||
from colorama import Back
|
||||
from colorama import Fore
|
||||
from colorama import Style
|
||||
from colorama import init as colorama_init
|
||||
from colorama import Back, Fore, Style, init as colorama_init
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import padding
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||
from cryptography.hazmat.primitives.ciphers import modes
|
||||
from cryptography.hazmat.primitives import hashes, padding
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from flask import current_app
|
||||
|
@ -27,11 +23,16 @@ backend_encrypt_key_name = "encrypt_key"
|
|||
backend_root_key_salt_name = "root_key_salt"
|
||||
backend_encrypt_key_salt_name = "encrypt_key_salt"
|
||||
backend_seal_key = "seal_status"
|
||||
|
||||
success = "success"
|
||||
seal_status = True
|
||||
|
||||
secrets_encrypt_key = ""
|
||||
secrets_root_key = ""
|
||||
|
||||
def string_to_bytes(value):
|
||||
if not value:
|
||||
return ""
|
||||
if isinstance(value, bytes):
|
||||
return value
|
||||
if sys.version_info.major == 2:
|
||||
|
@ -44,6 +45,8 @@ def string_to_bytes(value):
|
|||
class Backend:
|
||||
def __init__(self, backend=None):
|
||||
self.backend = backend
|
||||
# cache is a redis object
|
||||
self.cache = backend.cache
|
||||
|
||||
def get(self, key):
|
||||
return self.backend.get(key)
|
||||
|
@ -54,23 +57,33 @@ class Backend:
|
|||
def update(self, key, value):
|
||||
return self.backend.update(key, value)
|
||||
|
||||
def get_shares(self, key):
|
||||
return self.backend.get_shares(key)
|
||||
|
||||
def set_shares(self, key, value):
|
||||
return self.backend.set_shares(key, value)
|
||||
|
||||
|
||||
class KeyManage:
|
||||
|
||||
def __init__(self, trigger=None, backend=None):
|
||||
self.trigger = trigger
|
||||
self.backend = backend
|
||||
self.share_key = "cmdb::secret::secrets_share"
|
||||
if backend:
|
||||
self.backend = Backend(backend)
|
||||
|
||||
def init_app(self, app, backend=None):
|
||||
if (sys.argv[0].endswith("gunicorn") or
|
||||
(len(sys.argv) > 1 and sys.argv[1] in ("run", "cmdb-password-data-migrate"))):
|
||||
|
||||
self.backend = backend
|
||||
threading.Thread(target=self.watch_root_key, args=(app,)).start()
|
||||
|
||||
self.trigger = app.config.get("INNER_TRIGGER_TOKEN")
|
||||
if not self.trigger:
|
||||
return
|
||||
|
||||
self.backend = backend
|
||||
resp = self.auto_unseal()
|
||||
self.print_response(resp)
|
||||
|
||||
|
@ -124,6 +137,8 @@ class KeyManage:
|
|||
return new_shares
|
||||
|
||||
def is_valid_root_key(self, root_key):
|
||||
if not root_key:
|
||||
return False
|
||||
root_key_hash, ok = self.hash_root_key(root_key)
|
||||
if not ok:
|
||||
return root_key_hash, ok
|
||||
|
@ -135,35 +150,42 @@ class KeyManage:
|
|||
else:
|
||||
return "", True
|
||||
|
||||
def auth_root_secret(self, root_key):
|
||||
msg, ok = self.is_valid_root_key(root_key)
|
||||
if not ok:
|
||||
return {
|
||||
"message": msg,
|
||||
"status": "failed"
|
||||
}
|
||||
def auth_root_secret(self, root_key, app):
|
||||
with app.app_context():
|
||||
msg, ok = self.is_valid_root_key(root_key)
|
||||
if not ok:
|
||||
return {
|
||||
"message": msg,
|
||||
"status": "failed"
|
||||
}
|
||||
|
||||
encrypt_key_aes = self.backend.get(backend_encrypt_key_name)
|
||||
if not encrypt_key_aes:
|
||||
return {
|
||||
"message": "encrypt key is empty",
|
||||
"status": "failed"
|
||||
}
|
||||
encrypt_key_aes = self.backend.get(backend_encrypt_key_name)
|
||||
if not encrypt_key_aes:
|
||||
return {
|
||||
"message": "encrypt key is empty",
|
||||
"status": "failed"
|
||||
}
|
||||
|
||||
secrets_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
if ok:
|
||||
msg, ok = self.backend.update(backend_seal_key, "open")
|
||||
secret_encrypt_key, ok = InnerCrypt.aes_decrypt(string_to_bytes(root_key), encrypt_key_aes)
|
||||
if ok:
|
||||
current_app.config["secrets_encrypt_key"] = secrets_encrypt_key
|
||||
current_app.config["secrets_root_key"] = root_key
|
||||
current_app.config["secrets_shares"] = []
|
||||
return {"message": success, "status": success}
|
||||
return {"message": msg, "status": "failed"}
|
||||
else:
|
||||
return {
|
||||
"message": secrets_encrypt_key,
|
||||
"status": "failed"
|
||||
}
|
||||
msg, ok = self.backend.update(backend_seal_key, "open")
|
||||
if ok:
|
||||
global secrets_encrypt_key, secrets_root_key
|
||||
secrets_encrypt_key = secret_encrypt_key
|
||||
secrets_root_key = root_key
|
||||
self.backend.cache.set(self.share_key, json.dumps([]))
|
||||
return {"message": success, "status": success}
|
||||
return {"message": msg, "status": "failed"}
|
||||
else:
|
||||
return {
|
||||
"message": secret_encrypt_key,
|
||||
"status": "failed"
|
||||
}
|
||||
|
||||
def parse_shares(self, shares, app):
|
||||
if len(shares) >= global_key_threshold:
|
||||
recovered_secret = Shamir.combine(shares[:global_key_threshold], False)
|
||||
return self.auth_root_secret(b64encode(recovered_secret), app)
|
||||
|
||||
def unseal(self, key):
|
||||
if not self.is_seal():
|
||||
|
@ -175,14 +197,12 @@ class KeyManage:
|
|||
try:
|
||||
t = [i for i in b64decode(key)]
|
||||
v = (int("".join([chr(i) for i in t[-2:]])), bytes(t[:-2]))
|
||||
shares = current_app.config.get("secrets_shares", [])
|
||||
shares = self.backend.get_shares(self.share_key)
|
||||
if v not in shares:
|
||||
shares.append(v)
|
||||
current_app.config["secrets_shares"] = shares
|
||||
|
||||
self.set_shares(shares)
|
||||
if len(shares) >= global_key_threshold:
|
||||
recovered_secret = Shamir.combine(shares[:global_key_threshold], False)
|
||||
return self.auth_root_secret(b64encode(recovered_secret))
|
||||
return self.parse_shares(shares, current_app)
|
||||
else:
|
||||
return {
|
||||
"message": "waiting for inputting other unseal key {0}/{1}".format(len(shares),
|
||||
|
@ -242,8 +262,11 @@ class KeyManage:
|
|||
msg, ok = self.backend.add(backend_seal_key, "open")
|
||||
if not ok:
|
||||
return {"message": msg, "status": "failed"}, False
|
||||
current_app.config["secrets_root_key"] = root_key
|
||||
current_app.config["secrets_encrypt_key"] = encrypt_key
|
||||
|
||||
global secrets_encrypt_key, secrets_root_key
|
||||
secrets_encrypt_key = encrypt_key
|
||||
secrets_root_key = root_key
|
||||
|
||||
self.print_token(shares, root_token=root_key)
|
||||
|
||||
return {"message": "OK",
|
||||
|
@ -266,7 +289,7 @@ class KeyManage:
|
|||
}
|
||||
# TODO
|
||||
elif len(self.trigger.strip()) == 24:
|
||||
res = self.auth_root_secret(self.trigger.encode())
|
||||
res = self.auth_root_secret(self.trigger.encode(), current_app)
|
||||
if res.get("status") == success:
|
||||
return {
|
||||
"message": success,
|
||||
|
@ -298,22 +321,31 @@ class KeyManage:
|
|||
"message": msg,
|
||||
"status": "failed",
|
||||
}
|
||||
current_app.config["secrets_root_key"] = ''
|
||||
current_app.config["secrets_encrypt_key"] = ''
|
||||
self.clear()
|
||||
self.backend.cache.publish(self.share_key, "clear")
|
||||
|
||||
return {
|
||||
"message": success,
|
||||
"status": success
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def clear():
|
||||
global secrets_encrypt_key, secrets_root_key
|
||||
secrets_encrypt_key = ''
|
||||
secrets_root_key = ''
|
||||
|
||||
def is_seal(self):
|
||||
"""
|
||||
If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state.
|
||||
If there is no initialization or the root key is inconsistent, it is considered to be in a sealed state..
|
||||
:return:
|
||||
"""
|
||||
secrets_root_key = current_app.config.get("secrets_root_key")
|
||||
# secrets_root_key = current_app.config.get("secrets_root_key")
|
||||
if not secrets_root_key:
|
||||
return True
|
||||
msg, ok = self.is_valid_root_key(secrets_root_key)
|
||||
if not ok:
|
||||
return true
|
||||
return True
|
||||
status = self.backend.get(backend_seal_key)
|
||||
return status == "block"
|
||||
|
||||
|
@ -349,22 +381,53 @@ class KeyManage:
|
|||
}
|
||||
print(status_colors.get(status, Fore.GREEN), message, Style.RESET_ALL)
|
||||
|
||||
def set_shares(self, values):
|
||||
new_value = list()
|
||||
for v in values:
|
||||
new_value.append((v[0], b64encode(v[1]).decode("utf-8")))
|
||||
self.backend.cache.publish(self.share_key, json.dumps(new_value))
|
||||
self.backend.cache.set(self.share_key, json.dumps(new_value))
|
||||
|
||||
def watch_root_key(self, app):
|
||||
pubsub = self.backend.cache.pubsub()
|
||||
pubsub.subscribe(self.share_key)
|
||||
|
||||
new_value = set()
|
||||
for message in pubsub.listen():
|
||||
if message["type"] == "message":
|
||||
if message["data"] == b"clear":
|
||||
self.clear()
|
||||
continue
|
||||
try:
|
||||
value = json.loads(message["data"].decode("utf-8"))
|
||||
for v in value:
|
||||
new_value.add((v[0], b64decode(v[1])))
|
||||
except Exception as e:
|
||||
return []
|
||||
if len(new_value) >= global_key_threshold:
|
||||
self.parse_shares(list(new_value), app)
|
||||
new_value = set()
|
||||
|
||||
|
||||
class InnerCrypt:
|
||||
def __init__(self):
|
||||
secrets_encrypt_key = current_app.config.get("secrets_encrypt_key", "")
|
||||
self.encrypt_key = b64decode(secrets_encrypt_key.encode("utf-8"))
|
||||
self.encrypt_key = b64decode(secrets_encrypt_key)
|
||||
#self.encrypt_key = b64decode(secrets_encrypt_key, "".encode("utf-8"))
|
||||
|
||||
def encrypt(self, plaintext):
|
||||
"""
|
||||
encrypt method contain aes currently
|
||||
"""
|
||||
if not self.encrypt_key:
|
||||
return ValueError("secret is disabled, please seal firstly"), False
|
||||
return self.aes_encrypt(self.encrypt_key, plaintext)
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
"""
|
||||
decrypt method contain aes currently
|
||||
"""
|
||||
if not self.encrypt_key:
|
||||
return ValueError("secret is disabled, please seal firstly"), False
|
||||
return self.aes_decrypt(self.encrypt_key, ciphertext)
|
||||
|
||||
@classmethod
|
||||
|
@ -381,6 +444,7 @@ class InnerCrypt:
|
|||
|
||||
return b64encode(iv + ciphertext).decode("utf-8"), True
|
||||
except Exception as e:
|
||||
|
||||
return str(e), False
|
||||
|
||||
@classmethod
|
||||
|
@ -426,4 +490,4 @@ if __name__ == "__main__":
|
|||
t_ciphertext, status1 = c.encrypt(t_plaintext)
|
||||
print("Ciphertext:", t_ciphertext)
|
||||
decrypted_plaintext, status2 = c.decrypt(t_ciphertext)
|
||||
print("Decrypted plaintext:", decrypted_plaintext)
|
||||
print("Decrypted plaintext:", decrypted_plaintext)
|
|
@ -1,8 +1,13 @@
|
|||
import base64
|
||||
import json
|
||||
|
||||
from api.models.cmdb import InnerKV
|
||||
from api.extensions import rd
|
||||
|
||||
|
||||
class InnerKVManger(object):
|
||||
def __init__(self):
|
||||
self.cache = rd.r
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
|
@ -33,3 +38,26 @@ class InnerKVManger(object):
|
|||
return "success", True
|
||||
|
||||
return "update failed", True
|
||||
|
||||
@classmethod
|
||||
def get_shares(cls, key):
|
||||
new_value = list()
|
||||
v = rd.get_str(key)
|
||||
if not v:
|
||||
return new_value
|
||||
try:
|
||||
value = json.loads(v.decode("utf-8"))
|
||||
for v in value:
|
||||
new_value.append((v[0], base64.b64decode(v[1])))
|
||||
except Exception as e:
|
||||
return []
|
||||
return new_value
|
||||
|
||||
@classmethod
|
||||
def set_shares(cls, key, value):
|
||||
new_value = list()
|
||||
for v in value:
|
||||
new_value.append((v[0], base64.b64encode(v[1]).decode("utf-8")))
|
||||
rd.set_str(key, json.dumps(new_value))
|
||||
|
||||
|
||||
|
|
|
@ -117,6 +117,23 @@ class RedisHandler(object):
|
|||
except Exception as e:
|
||||
current_app.logger.error("delete redis key error, {0}".format(str(e)))
|
||||
|
||||
def set_str(self, key, value, expired=None):
|
||||
try:
|
||||
if expired:
|
||||
self.r.setex(key, expired, value)
|
||||
else:
|
||||
self.r.set(key, value)
|
||||
except Exception as e:
|
||||
current_app.logger.error("set redis error, {0}".format(str(e)))
|
||||
|
||||
def get_str(self, key):
|
||||
try:
|
||||
value = self.r.get(key)
|
||||
except Exception as e:
|
||||
current_app.logger.error("get redis error, {0}".format(str(e)))
|
||||
return
|
||||
return value
|
||||
|
||||
|
||||
class ESHandler(object):
|
||||
def __init__(self, flask_app=None):
|
||||
|
|
|
@ -14,6 +14,11 @@ services:
|
|||
- db-data:/var/lib/mysql
|
||||
- ./docs/mysqld.cnf:/etc/mysql/conf.d/mysqld.cnf
|
||||
- ./docs/cmdb.sql:/docker-entrypoint-initdb.d/cmdb.sql
|
||||
healthcheck:
|
||||
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
|
||||
networks:
|
||||
new:
|
||||
|
@ -27,6 +32,11 @@ services:
|
|||
container_name: cmdb-cache
|
||||
environment:
|
||||
TZ: Asia/Shanghai
|
||||
healthcheck:
|
||||
test: ["CMD", "redis-cli", "ping"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
networks:
|
||||
new:
|
||||
aliases:
|
||||
|
@ -41,6 +51,11 @@ services:
|
|||
environment:
|
||||
TZ: Asia/Shanghai
|
||||
WAIT_HOSTS: cmdb-db:3306, cmdb-cache:6379
|
||||
depends_on:
|
||||
cmdb-db:
|
||||
condition: service_healthy
|
||||
cmdb-cache:
|
||||
condition: service_healthy
|
||||
command:
|
||||
- /bin/sh
|
||||
- -c
|
||||
|
@ -51,6 +66,9 @@ services:
|
|||
flask common-check-new-columns
|
||||
gunicorn --workers=4 autoapp:app -b 0.0.0.0:5000 -D
|
||||
|
||||
#nohup celery -A celery_worker.celery worker -E -Q one_cmdb_async --autoscale=2,5 > one_cmdb_async.log 2>&1 &
|
||||
#nohup celery -A celery_worker.celery worker -E -Q acl_async --concurrency=2 > one_acl_async.log 2>&1 &
|
||||
#
|
||||
celery -A celery_worker.celery worker -E -Q one_cmdb_async --autoscale=4,1 --logfile=one_cmdb_async.log -D
|
||||
celery -A celery_worker.celery worker -E -Q acl_async --logfile=one_acl_async.log --autoscale=2,1 -D
|
||||
|
||||
|
@ -61,9 +79,6 @@ services:
|
|||
flask init-department
|
||||
flask cmdb-counter > counter.log 2>&1
|
||||
|
||||
depends_on:
|
||||
- cmdb-db
|
||||
- cmdb-cache
|
||||
networks:
|
||||
new:
|
||||
aliases:
|
||||
|
|
|
@ -21,7 +21,7 @@ check_docker_compose() {
|
|||
|
||||
clone_repo() {
|
||||
local repo_url=$1
|
||||
git clone $repo_url || {
|
||||
git clone -b deploy_on_kylin_docker --single-branch $repo_url || {
|
||||
echo "error: failed to clone $repo_url"
|
||||
exit 1
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue