From 5bede8ad73304ffa06c2fb82b78014d06744cd40 Mon Sep 17 00:00:00 2001 From: pycook Date: Thu, 26 Oct 2023 17:39:50 +0800 Subject: [PATCH] perf(vault): format code --- cmdb-api/api/lib/secrets/vault.py | 74 ++++++++++++++++--------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/cmdb-api/api/lib/secrets/vault.py b/cmdb-api/api/lib/secrets/vault.py index 66ce43d..85bc6db 100644 --- a/cmdb-api/api/lib/secrets/vault.py +++ b/cmdb-api/api/lib/secrets/vault.py @@ -1,25 +1,30 @@ +from base64 import b64decode +from base64 import b64encode + import hvac -from base64 import b64encode, b64decode -class VaultSDK: - def __init__(self, base_url, token, mount_path): +class VaultClient: + def __init__(self, base_url, token, mount_path='cmdb'): self.client = hvac.Client(url=base_url, token=token) self.mount_path = mount_path - def create_approle(self, role_name, policies): + def create_app_role(self, role_name, policies): resp = self.client.create_approle(role_name, policies=policies) + return resp == 200 - def delete_approle(self, role_name): + def delete_app_role(self, role_name): resp = self.client.delete_approle(role_name) + return resp == 204 - def update_approle_policies(self, role_name, policies): + def update_app_role_policies(self, role_name, policies): resp = self.client.update_approle_role(role_name, policies=policies) + return resp == 204 - def get_approle(self, role_name): + def get_app_role(self, role_name): resp = self.client.get_approle(role_name) resp.json() if resp.status_code == 200: @@ -36,21 +41,23 @@ class VaultSDK: else: return {} - def encrypt_data(self, plaintext): + def encrypt(self, plaintext): response = self.client.secrets.transit.encrypt_data(name='transit-key', plaintext=plaintext) ciphertext = response['data']['ciphertext'] + return ciphertext # decrypt data - def decrypt_data(self, ciphertext): + def decrypt(self, ciphertext): response = self.client.secrets.transit.decrypt_data(name='transit-key', ciphertext=ciphertext) plaintext = response['data']['plaintext'] + return plaintext - def write_data(self, path, data, encrypt=None): + def write(self, path, data, encrypt=None): if encrypt: for k, v in data.items(): - data[k] = self.encrypt_data(self.encode_base64(v)) + data[k] = self.encrypt(self.encode_base64(v)) response = self.client.secrets.kv.v2.create_or_update_secret( path=path, secret=data, @@ -60,7 +67,7 @@ class VaultSDK: return response # read data - def read_data(self, path, decrypt=None): + def read(self, path, decrypt=True): try: response = self.client.secrets.kv.v2.read_secret_version( path=path, raise_on_deleted_version=False, mount_point=self.mount_path @@ -71,16 +78,17 @@ class VaultSDK: if decrypt: try: for k, v in data.items(): - data[k] = self.decode_base64(self.decrypt_data(v)) - except Exception: + data[k] = self.decode_base64(self.decrypt(v)) + except: return data, True + return data, True # update data - def update_data(self, path, data, overwrite=None, encrypt=None): + def update(self, path, data, overwrite=True, encrypt=True): if encrypt: for k, v in data.items(): - data[k] = self.encrypt_data(self.encode_base64(v)) + data[k] = self.encrypt(self.encode_base64(v)) if overwrite: response = self.client.secrets.kv.v2.create_or_update_secret( path=path, @@ -89,14 +97,16 @@ class VaultSDK: ) else: response = self.client.secrets.kv.v2.patch(path=path, secret=data, mount_point=self.mount_path) + return response # delete data - def delete_data(self, path): + def delete(self, path): response = self.client.secrets.kv.v2.delete_metadata_and_all_versions( path=path, mount_point=self.mount_path ) + return response # Base64 encode @@ -104,6 +114,7 @@ class VaultSDK: def encode_base64(cls, data): encoded_bytes = b64encode(data.encode()) encoded_string = encoded_bytes.decode() + return encoded_string # Base64 decode @@ -111,29 +122,20 @@ class VaultSDK: def decode_base64(cls, encoded_string): decoded_bytes = b64decode(encoded_string) decoded_string = decoded_bytes.decode() + return decoded_string -class VaultAppRoleSDK(VaultSDK): - def __int__(self, base_url, role_id, secret_id): - self.base_url = base_url - self.role_id = role_id - self.secret_id = secret_id - self.token = self.get_token() - super(VaultAppRoleSDK, self).__int__(base_url, self.token) - - if __name__ == "__main__": - base_url = "http://localhost:8200" - token = "hvs.OALuhK2wToxsn2Z1WFnDaKRw" - mount_path = "cmdb" + _base_url = "http://localhost:8200" + _token = "your token" - path = "test001" + _path = "test001" # Example - sdk = VaultSDK(base_url, token, mount_path) + sdk = VaultClient(_base_url, _token) # sdk.enable_secrets_engine() - data = {"key1": "value1", "key2": "value2", "key3": "value3"} - data = sdk.update_data(path, data, overwrite=True, encrypt=True) - print(data.status_code) - data = sdk.read_data(path, decrypt=True) - print(data) + _data = {"key1": "value1", "key2": "value2", "key3": "value3"} + _data = sdk.update(_path, _data, overwrite=True, encrypt=True) + print(_data.status_code) + _data = sdk.read(_path, decrypt=True) + print(_data)