perf(vault): format code

This commit is contained in:
pycook 2023-10-26 17:39:50 +08:00
parent c4f2a89be1
commit 5bede8ad73
1 changed files with 38 additions and 36 deletions

View File

@ -1,25 +1,30 @@
from base64 import b64decode
from base64 import b64encode
import hvac
from base64 import b64encode, b64decode
class VaultSDK:
def __init__(self, base_url, token, mount_path):
class VaultClient:
def __init__(self, base_url, token, mount_path='cmdb'):
self.client = hvac.Client(url=base_url, token=token)
self.mount_path = mount_path
def create_approle(self, role_name, policies):
def create_app_role(self, role_name, policies):
resp = self.client.create_approle(role_name, policies=policies)
return resp == 200
def delete_approle(self, role_name):
def delete_app_role(self, role_name):
resp = self.client.delete_approle(role_name)
return resp == 204
def update_approle_policies(self, role_name, policies):
def update_app_role_policies(self, role_name, policies):
resp = self.client.update_approle_role(role_name, policies=policies)
return resp == 204
def get_approle(self, role_name):
def get_app_role(self, role_name):
resp = self.client.get_approle(role_name)
resp.json()
if resp.status_code == 200:
@ -36,21 +41,23 @@ class VaultSDK:
else:
return {}
def encrypt_data(self, plaintext):
def encrypt(self, plaintext):
response = self.client.secrets.transit.encrypt_data(name='transit-key', plaintext=plaintext)
ciphertext = response['data']['ciphertext']
return ciphertext
# decrypt data
def decrypt_data(self, ciphertext):
def decrypt(self, ciphertext):
response = self.client.secrets.transit.decrypt_data(name='transit-key', ciphertext=ciphertext)
plaintext = response['data']['plaintext']
return plaintext
def write_data(self, path, data, encrypt=None):
def write(self, path, data, encrypt=None):
if encrypt:
for k, v in data.items():
data[k] = self.encrypt_data(self.encode_base64(v))
data[k] = self.encrypt(self.encode_base64(v))
response = self.client.secrets.kv.v2.create_or_update_secret(
path=path,
secret=data,
@ -60,7 +67,7 @@ class VaultSDK:
return response
# read data
def read_data(self, path, decrypt=None):
def read(self, path, decrypt=True):
try:
response = self.client.secrets.kv.v2.read_secret_version(
path=path, raise_on_deleted_version=False, mount_point=self.mount_path
@ -71,16 +78,17 @@ class VaultSDK:
if decrypt:
try:
for k, v in data.items():
data[k] = self.decode_base64(self.decrypt_data(v))
except Exception:
data[k] = self.decode_base64(self.decrypt(v))
except:
return data, True
return data, True
# update data
def update_data(self, path, data, overwrite=None, encrypt=None):
def update(self, path, data, overwrite=True, encrypt=True):
if encrypt:
for k, v in data.items():
data[k] = self.encrypt_data(self.encode_base64(v))
data[k] = self.encrypt(self.encode_base64(v))
if overwrite:
response = self.client.secrets.kv.v2.create_or_update_secret(
path=path,
@ -89,14 +97,16 @@ class VaultSDK:
)
else:
response = self.client.secrets.kv.v2.patch(path=path, secret=data, mount_point=self.mount_path)
return response
# delete data
def delete_data(self, path):
def delete(self, path):
response = self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path,
mount_point=self.mount_path
)
return response
# Base64 encode
@ -104,6 +114,7 @@ class VaultSDK:
def encode_base64(cls, data):
encoded_bytes = b64encode(data.encode())
encoded_string = encoded_bytes.decode()
return encoded_string
# Base64 decode
@ -111,29 +122,20 @@ class VaultSDK:
def decode_base64(cls, encoded_string):
decoded_bytes = b64decode(encoded_string)
decoded_string = decoded_bytes.decode()
return decoded_string
class VaultAppRoleSDK(VaultSDK):
def __int__(self, base_url, role_id, secret_id):
self.base_url = base_url
self.role_id = role_id
self.secret_id = secret_id
self.token = self.get_token()
super(VaultAppRoleSDK, self).__int__(base_url, self.token)
if __name__ == "__main__":
base_url = "http://localhost:8200"
token = "hvs.OALuhK2wToxsn2Z1WFnDaKRw"
mount_path = "cmdb"
_base_url = "http://localhost:8200"
_token = "your token"
path = "test001"
_path = "test001"
# Example
sdk = VaultSDK(base_url, token, mount_path)
sdk = VaultClient(_base_url, _token)
# sdk.enable_secrets_engine()
data = {"key1": "value1", "key2": "value2", "key3": "value3"}
data = sdk.update_data(path, data, overwrite=True, encrypt=True)
print(data.status_code)
data = sdk.read_data(path, decrypt=True)
print(data)
_data = {"key1": "value1", "key2": "value2", "key3": "value3"}
_data = sdk.update(_path, _data, overwrite=True, encrypt=True)
print(_data.status_code)
_data = sdk.read(_path, decrypt=True)
print(_data)