mirror of https://github.com/veops/cmdb.git
Dev api 20231019 (#210)
* fix(acl): get resources * fix(celery worker): db server has gone away
This commit is contained in:
parent
e0e2ca6294
commit
bdd2adcfc2
|
@ -4,8 +4,13 @@
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
from flask import abort
|
from flask import abort
|
||||||
|
from flask import current_app
|
||||||
from flask import request
|
from flask import request
|
||||||
|
from sqlalchemy.exc import InvalidRequestError
|
||||||
|
from sqlalchemy.exc import OperationalError
|
||||||
|
from sqlalchemy.exc import StatementError
|
||||||
|
|
||||||
|
from api.extensions import db
|
||||||
from api.lib.resp_format import CommonErrFormat
|
from api.lib.resp_format import CommonErrFormat
|
||||||
|
|
||||||
|
|
||||||
|
@ -70,3 +75,40 @@ def args_validate(model_cls, exclude_args=None):
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
return decorate
|
return decorate
|
||||||
|
|
||||||
|
|
||||||
|
def reconnect_db(func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except (StatementError, OperationalError, InvalidRequestError) as e:
|
||||||
|
error_msg = str(e)
|
||||||
|
if 'Lost connection' in error_msg or 'reconnect until invalid transaction' in error_msg or \
|
||||||
|
'can be emitted within this transaction' in error_msg:
|
||||||
|
current_app.logger.info('[reconnect_db] lost connect rollback then retry')
|
||||||
|
db.session.rollback()
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
raise e
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def _flush_db():
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def flush_db(func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
_flush_db()
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def run_flush_db():
|
||||||
|
_flush_db()
|
||||||
|
|
|
@ -260,7 +260,8 @@ class ResourceCRUD(object):
|
||||||
numfound = query.count()
|
numfound = query.count()
|
||||||
res = [i.to_dict() for i in query.offset((page - 1) * page_size).limit(page_size)]
|
res = [i.to_dict() for i in query.offset((page - 1) * page_size).limit(page_size)]
|
||||||
for i in res:
|
for i in res:
|
||||||
i['user'] = UserCache.get(i['uid']).nickname if i['uid'] else ''
|
user = UserCache.get(i['uid']) if i['uid'] else ''
|
||||||
|
i['user'] = user and user.nickname
|
||||||
|
|
||||||
return numfound, res
|
return numfound, res
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,8 @@ from werkzeug.exceptions import BadRequest
|
||||||
from werkzeug.exceptions import NotFound
|
from werkzeug.exceptions import NotFound
|
||||||
|
|
||||||
from api.extensions import celery
|
from api.extensions import celery
|
||||||
from api.extensions import db
|
from api.lib.decorator import flush_db
|
||||||
|
from api.lib.decorator import reconnect_db
|
||||||
from api.lib.perm.acl.audit import AuditCRUD
|
from api.lib.perm.acl.audit import AuditCRUD
|
||||||
from api.lib.perm.acl.audit import AuditOperateSource
|
from api.lib.perm.acl.audit import AuditOperateSource
|
||||||
from api.lib.perm.acl.audit import AuditOperateType
|
from api.lib.perm.acl.audit import AuditOperateType
|
||||||
|
@ -28,6 +29,7 @@ from api.models.acl import Trigger
|
||||||
name="acl.role_rebuild",
|
name="acl.role_rebuild",
|
||||||
queue=ACL_QUEUE,
|
queue=ACL_QUEUE,
|
||||||
once={"graceful": True, "unlock_before_run": True})
|
once={"graceful": True, "unlock_before_run": True})
|
||||||
|
@reconnect_db
|
||||||
def role_rebuild(rids, app_id):
|
def role_rebuild(rids, app_id):
|
||||||
rids = rids if isinstance(rids, list) else [rids]
|
rids = rids if isinstance(rids, list) else [rids]
|
||||||
for rid in rids:
|
for rid in rids:
|
||||||
|
@ -37,6 +39,7 @@ def role_rebuild(rids, app_id):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="acl.update_resource_to_build_role", queue=ACL_QUEUE)
|
@celery.task(name="acl.update_resource_to_build_role", queue=ACL_QUEUE)
|
||||||
|
@reconnect_db
|
||||||
def update_resource_to_build_role(resource_id, app_id, group_id=None):
|
def update_resource_to_build_role(resource_id, app_id, group_id=None):
|
||||||
rids = [i.id for i in Role.get_by(__func_isnot__key_uid=None, fl='id', to_dict=False)]
|
rids = [i.id for i in Role.get_by(__func_isnot__key_uid=None, fl='id', to_dict=False)]
|
||||||
rids += [i.id for i in Role.get_by(app_id=app_id, fl='id', to_dict=False)]
|
rids += [i.id for i in Role.get_by(app_id=app_id, fl='id', to_dict=False)]
|
||||||
|
@ -52,9 +55,9 @@ def update_resource_to_build_role(resource_id, app_id, group_id=None):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="acl.apply_trigger", queue=ACL_QUEUE)
|
@celery.task(name="acl.apply_trigger", queue=ACL_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def apply_trigger(_id, resource_id=None, operator_uid=None):
|
def apply_trigger(_id, resource_id=None, operator_uid=None):
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
from api.lib.perm.acl.permission import PermissionCRUD
|
from api.lib.perm.acl.permission import PermissionCRUD
|
||||||
|
|
||||||
trigger = Trigger.get_by_id(_id)
|
trigger = Trigger.get_by_id(_id)
|
||||||
|
@ -118,9 +121,9 @@ def apply_trigger(_id, resource_id=None, operator_uid=None):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="acl.cancel_trigger", queue=ACL_QUEUE)
|
@celery.task(name="acl.cancel_trigger", queue=ACL_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def cancel_trigger(_id, resource_id=None, operator_uid=None):
|
def cancel_trigger(_id, resource_id=None, operator_uid=None):
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
from api.lib.perm.acl.permission import PermissionCRUD
|
from api.lib.perm.acl.permission import PermissionCRUD
|
||||||
|
|
||||||
trigger = Trigger.get_by_id(_id)
|
trigger = Trigger.get_by_id(_id)
|
||||||
|
@ -186,6 +189,7 @@ def cancel_trigger(_id, resource_id=None, operator_uid=None):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="acl.op_record", queue=ACL_QUEUE)
|
@celery.task(name="acl.op_record", queue=ACL_QUEUE)
|
||||||
|
@reconnect_db
|
||||||
def op_record(app, rolename, operate_type, obj):
|
def op_record(app, rolename, operate_type, obj):
|
||||||
if isinstance(app, int):
|
if isinstance(app, int):
|
||||||
app = AppCache.get(app)
|
app = AppCache.get(app)
|
||||||
|
|
|
@ -16,6 +16,8 @@ from api.lib.cmdb.cache import CITypeAttributesCache
|
||||||
from api.lib.cmdb.const import CMDB_QUEUE
|
from api.lib.cmdb.const import CMDB_QUEUE
|
||||||
from api.lib.cmdb.const import REDIS_PREFIX_CI
|
from api.lib.cmdb.const import REDIS_PREFIX_CI
|
||||||
from api.lib.cmdb.const import REDIS_PREFIX_CI_RELATION
|
from api.lib.cmdb.const import REDIS_PREFIX_CI_RELATION
|
||||||
|
from api.lib.decorator import flush_db
|
||||||
|
from api.lib.decorator import reconnect_db
|
||||||
from api.lib.perm.acl.cache import UserCache
|
from api.lib.perm.acl.cache import UserCache
|
||||||
from api.lib.utils import Lock
|
from api.lib.utils import Lock
|
||||||
from api.lib.utils import handle_arg_list
|
from api.lib.utils import handle_arg_list
|
||||||
|
@ -25,11 +27,12 @@ from api.models.cmdb import CITypeAttribute
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_cache", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_cache", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def ci_cache(ci_id, operate_type, record_id):
|
def ci_cache(ci_id, operate_type, record_id):
|
||||||
from api.lib.cmdb.ci import CITriggerManager
|
from api.lib.cmdb.ci import CITriggerManager
|
||||||
|
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
m = api.lib.cmdb.ci.CIManager()
|
m = api.lib.cmdb.ci.CIManager()
|
||||||
ci_dict = m.get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
|
ci_dict = m.get_ci_by_id_from_db(ci_id, need_children=False, use_master=False)
|
||||||
|
@ -49,9 +52,10 @@ def ci_cache(ci_id, operate_type, record_id):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.batch_ci_cache", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.batch_ci_cache", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def batch_ci_cache(ci_ids, ): # only for attribute change index
|
def batch_ci_cache(ci_ids, ): # only for attribute change index
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
for ci_id in ci_ids:
|
for ci_id in ci_ids:
|
||||||
m = api.lib.cmdb.ci.CIManager()
|
m = api.lib.cmdb.ci.CIManager()
|
||||||
|
@ -66,6 +70,7 @@ def batch_ci_cache(ci_ids, ): # only for attribute change index
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_delete", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_delete", queue=CMDB_QUEUE)
|
||||||
|
@reconnect_db
|
||||||
def ci_delete(ci_id):
|
def ci_delete(ci_id):
|
||||||
current_app.logger.info(ci_id)
|
current_app.logger.info(ci_id)
|
||||||
|
|
||||||
|
@ -78,6 +83,7 @@ def ci_delete(ci_id):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_delete_trigger", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_delete_trigger", queue=CMDB_QUEUE)
|
||||||
|
@reconnect_db
|
||||||
def ci_delete_trigger(trigger, operate_type, ci_dict):
|
def ci_delete_trigger(trigger, operate_type, ci_dict):
|
||||||
current_app.logger.info('delete ci {} trigger'.format(ci_dict['_id']))
|
current_app.logger.info('delete ci {} trigger'.format(ci_dict['_id']))
|
||||||
from api.lib.cmdb.ci import CITriggerManager
|
from api.lib.cmdb.ci import CITriggerManager
|
||||||
|
@ -89,9 +95,9 @@ def ci_delete_trigger(trigger, operate_type, ci_dict):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_relation_cache", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_relation_cache", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def ci_relation_cache(parent_id, child_id):
|
def ci_relation_cache(parent_id, child_id):
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
with Lock("CIRelation_{}".format(parent_id)):
|
with Lock("CIRelation_{}".format(parent_id)):
|
||||||
children = rd.get([parent_id], REDIS_PREFIX_CI_RELATION)[0]
|
children = rd.get([parent_id], REDIS_PREFIX_CI_RELATION)[0]
|
||||||
children = json.loads(children) if children is not None else {}
|
children = json.loads(children) if children is not None else {}
|
||||||
|
@ -106,6 +112,8 @@ def ci_relation_cache(parent_id, child_id):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_relation_add", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_relation_add", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def ci_relation_add(parent_dict, child_id, uid):
|
def ci_relation_add(parent_dict, child_id, uid):
|
||||||
"""
|
"""
|
||||||
:param parent_dict: key is '$parent_model.attr_name'
|
:param parent_dict: key is '$parent_model.attr_name'
|
||||||
|
@ -121,8 +129,6 @@ def ci_relation_add(parent_dict, child_id, uid):
|
||||||
current_app.test_request_context().push()
|
current_app.test_request_context().push()
|
||||||
login_user(UserCache.get(uid))
|
login_user(UserCache.get(uid))
|
||||||
|
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
for parent in parent_dict:
|
for parent in parent_dict:
|
||||||
parent_ci_type_name, _attr_name = parent.strip()[1:].split('.', 1)
|
parent_ci_type_name, _attr_name = parent.strip()[1:].split('.', 1)
|
||||||
attr_name = CITypeAttributeManager.get_attr_name(parent_ci_type_name, _attr_name)
|
attr_name = CITypeAttributeManager.get_attr_name(parent_ci_type_name, _attr_name)
|
||||||
|
@ -147,10 +153,14 @@ def ci_relation_add(parent_dict, child_id, uid):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
current_app.logger.warning(e)
|
current_app.logger.warning(e)
|
||||||
finally:
|
finally:
|
||||||
db.session.remove()
|
try:
|
||||||
|
db.session.commit()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_relation_delete", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_relation_delete", queue=CMDB_QUEUE)
|
||||||
|
@reconnect_db
|
||||||
def ci_relation_delete(parent_id, child_id):
|
def ci_relation_delete(parent_id, child_id):
|
||||||
with Lock("CIRelation_{}".format(parent_id)):
|
with Lock("CIRelation_{}".format(parent_id)):
|
||||||
children = rd.get([parent_id], REDIS_PREFIX_CI_RELATION)[0]
|
children = rd.get([parent_id], REDIS_PREFIX_CI_RELATION)[0]
|
||||||
|
@ -165,9 +175,10 @@ def ci_relation_delete(parent_id, child_id):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.ci_type_attribute_order_rebuild", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.ci_type_attribute_order_rebuild", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def ci_type_attribute_order_rebuild(type_id, uid):
|
def ci_type_attribute_order_rebuild(type_id, uid):
|
||||||
current_app.logger.info('rebuild attribute order')
|
current_app.logger.info('rebuild attribute order')
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
from api.lib.cmdb.ci_type import CITypeAttributeGroupManager
|
from api.lib.cmdb.ci_type import CITypeAttributeGroupManager
|
||||||
|
|
||||||
|
@ -188,11 +199,11 @@ def ci_type_attribute_order_rebuild(type_id, uid):
|
||||||
|
|
||||||
|
|
||||||
@celery.task(name="cmdb.calc_computed_attribute", queue=CMDB_QUEUE)
|
@celery.task(name="cmdb.calc_computed_attribute", queue=CMDB_QUEUE)
|
||||||
|
@flush_db
|
||||||
|
@reconnect_db
|
||||||
def calc_computed_attribute(attr_id, uid):
|
def calc_computed_attribute(attr_id, uid):
|
||||||
from api.lib.cmdb.ci import CIManager
|
from api.lib.cmdb.ci import CIManager
|
||||||
|
|
||||||
db.session.remove()
|
|
||||||
|
|
||||||
current_app.test_request_context().push()
|
current_app.test_request_context().push()
|
||||||
login_user(UserCache.get(uid))
|
login_user(UserCache.get(uid))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue