mirror of
				https://github.com/veops/cmdb.git
				synced 2025-10-26 05:49:21 +08:00 
			
		
		
		
	* fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题 * fix: 解决在麒麟系统上使用docker安装时使用celery -D启动 celery 可能出现的问题 * fix: NoneType happened while unsealing the secret function, cancel the address check while unsealing and sealing * fix: unseal secret function * fix: remove depends_on in docker-compose * fix: support sealing and unsealing secret in multiple processes (more than one worker started by gunicorn)
		
			
				
	
	
		
			64 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			64 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import base64
 | |
| import json
 | |
| 
 | |
| from api.models.cmdb import InnerKV
 | |
| from api.extensions import rd
 | |
| 
 | |
| 
 | |
class InnerKVManger(object):
    """Small key/value facade over the ``InnerKV`` model and the ``rd`` handle.

    ``add``/``get``/``update`` read and write ``InnerKV`` rows.
    ``get_shares``/``set_shares`` go through ``rd.get_str``/``rd.set_str`` and
    keep a JSON list of ``[id, base64-text]`` pairs under the given key.
    """

    def __init__(self):
        # NOTE(review): ``rd.r`` is presumably the raw cache client -- confirm
        # against api.extensions. None of the classmethods below read it.
        self.cache = rd.r

    @classmethod
    def add(cls, key, value):
        """Create an ``InnerKV`` row holding ``value`` under ``key``.

        Returns:
            ``("success", True)`` when the created row reports the same key,
            otherwise ``("add failed", False)``.
        """
        res = InnerKV.create(key=key, value=value)
        if res.key == key:
            return "success", True

        return "add failed", False

    @classmethod
    def get(cls, key):
        """Return the value stored for ``key``, or ``None`` if no row matches."""
        res = InnerKV.get_by(first=True, to_dict=False, key=key)
        return res.value if res else None

    @classmethod
    def update(cls, key, value):
        """Set ``key`` to ``value``, creating the row when it does not exist.

        Returns:
            A ``(message, ok)`` tuple: ``("success", True)`` on success,
            ``("update failed", False)`` when the updated row does not report
            the expected key, or whatever ``add`` returns when the row had to
            be created.
        """
        res = InnerKV.get_by(first=True, to_dict=False, key=key)
        if not res:
            return cls.add(key, value)

        updated = res.update(value=value)
        if updated.key == key:
            return "success", True

        # The failure flag must be False, mirroring ``add``; returning True
        # here made a failed update indistinguishable from a successful one.
        return "update failed", False

    @staticmethod
    def _dump_shares(value):
        """Serialize ``(id, bytes)`` pairs to a JSON string with base64 payloads."""
        encoded = [
            (item[0], base64.b64encode(item[1]).decode("utf-8"))
            for item in value
        ]
        return json.dumps(encoded)

    @staticmethod
    def _load_shares(raw):
        """Parse a ``_dump_shares`` payload back into ``(id, bytes)`` pairs.

        Accepts the payload as ``bytes`` or ``str``. Returns ``[]`` for an
        empty payload or one that cannot be decoded; a single malformed entry
        discards the whole list rather than returning a partial result.
        """
        if not raw:
            return []
        try:
            text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
            return [
                (item[0], base64.b64decode(item[1]))
                for item in json.loads(text)
            ]
        except (ValueError, TypeError, LookupError):
            # ValueError covers invalid JSON, invalid UTF-8 and bad base64
            # padding; TypeError/LookupError cover entries of the wrong shape.
            return []

    @classmethod
    def get_shares(cls, key):
        """Return the ``(id, bytes)`` pairs cached under ``key``.

        Returns ``[]`` when nothing is stored or the stored payload is
        unreadable. Errors raised by ``rd.get_str`` itself are not caught.
        """
        return cls._load_shares(rd.get_str(key))

    @classmethod
    def set_shares(cls, key, value):
        """Store ``value``, an iterable of ``(id, bytes)`` pairs, under ``key``.

        The byte payloads are base64-encoded so the list can be written as
        JSON text via ``rd.set_str``.
        """
        rd.set_str(key, cls._dump_shares(value))
 | |
| 
 | |
| 
 |