feat(api): Add auto subscription config

This commit is contained in:
pycook
2025-08-14 20:19:49 +08:00
parent 5048f2a788
commit a809933a5f
3 changed files with 226 additions and 30 deletions

View File

@@ -1,9 +1,8 @@
# -*- coding:utf-8 -*-
import copy
from collections import defaultdict
import copy
import six
import toposort
from flask import abort
from flask import current_app
@@ -16,6 +15,7 @@ from api.lib.cmdb.cache import CITypeAttributesCache
from api.lib.cmdb.cache import CITypeCache
from api.lib.cmdb.cache import CMDBCounterCache
from api.lib.cmdb.ci_type import CITypeAttributeManager
from api.lib.cmdb.ci_type import CITypeManager
from api.lib.cmdb.const import BUILTIN_ATTRIBUTES
from api.lib.cmdb.const import ConstraintEnum
from api.lib.cmdb.const import PermEnum
@@ -29,6 +29,7 @@ from api.lib.perm.acl.acl import ACLManager
from api.models.cmdb import CITypeGroup
from api.models.cmdb import CITypeGroupItem
from api.models.cmdb import CITypeRelation
from api.models.cmdb import PreferenceAutoSubscriptionConfig
from api.models.cmdb import PreferenceCITypeOrder
from api.models.cmdb import PreferenceRelationView
from api.models.cmdb import PreferenceSearchOption
@@ -49,15 +50,27 @@ class PreferenceManager(object):
type2group = {}
for i in db.session.query(CITypeGroupItem, CITypeGroup).join(
CITypeGroup, CITypeGroup.id == CITypeGroupItem.group_id).filter(
CITypeGroup.deleted.is_(False)).filter(CITypeGroupItem.deleted.is_(False)):
CITypeGroup.deleted.is_(False)).filter(CITypeGroupItem.deleted.is_(False)):
type2group[i.CITypeGroupItem.type_id] = i.CITypeGroup.to_dict()
types = db.session.query(PreferenceShowAttributes.type_id).filter(
PreferenceShowAttributes.uid == current_user.uid).filter(
PreferenceShowAttributes.deleted.is_(False)).group_by(
PreferenceShowAttributes.type_id).all() if instance else []
types = sorted(types, key=lambda x: {i.type_id: idx for idx, i in enumerate(
ci_type_order) if not i.is_tree}.get(x.type_id, 1))
if instance:
auto_types = PreferenceManager.get_auto_subscription_types(current_user.uid)
if auto_types is not None:
class TypeIdObj:
def __init__(self, type_id):
self.type_id = type_id
types = [TypeIdObj(t) for t in auto_types]
else:
types = db.session.query(PreferenceShowAttributes.type_id).filter(
PreferenceShowAttributes.uid == current_user.uid).filter(
PreferenceShowAttributes.deleted.is_(False)).group_by(
PreferenceShowAttributes.type_id).all()
else:
types = []
types = sorted(types, key=lambda x: {i.type_id: idx for idx, i in enumerate(ci_type_order)
if not i.is_tree}.get(x.type_id, 1))
group_types = []
other_types = []
group2idx = {}
@@ -104,19 +117,26 @@ class PreferenceManager(object):
ci_type_order = sorted(PreferenceCITypeOrder.get_by(uid=current_user.uid, to_dict=False), key=lambda x: x.order)
if instance:
types = db.session.query(PreferenceShowAttributes.type_id,
PreferenceShowAttributes.uid, PreferenceShowAttributes.created_at).filter(
PreferenceShowAttributes.deleted.is_(False)).filter(
PreferenceShowAttributes.uid == current_user.uid).group_by(
PreferenceShowAttributes.uid, PreferenceShowAttributes.type_id)
for i in types:
result['self']['instance'].append(i.type_id)
if str(i.created_at) > str(result['self']['type_id2subs_time'].get(i.type_id, "")):
result['self']['type_id2subs_time'][i.type_id] = i.created_at
# Try auto subscription first, fallback to manual if not configured
auto_types = PreferenceManager.get_auto_subscription_types(current_user.uid)
if auto_types is not None:
result['self']['instance'] = auto_types
for type_id in auto_types:
result['self']['type_id2subs_time'][type_id] = ""
else:
types = db.session.query(PreferenceShowAttributes.type_id,
PreferenceShowAttributes.uid, PreferenceShowAttributes.created_at).filter(
PreferenceShowAttributes.deleted.is_(False)).filter(
PreferenceShowAttributes.uid == current_user.uid).group_by(
PreferenceShowAttributes.uid, PreferenceShowAttributes.type_id)
for i in types:
result['self']['instance'].append(i.type_id)
if str(i.created_at) > str(result['self']['type_id2subs_time'].get(i.type_id, "")):
result['self']['type_id2subs_time'][i.type_id] = i.created_at
instance_order = [i.type_id for i in ci_type_order if not i.is_tree]
if len(instance_order) == len(result['self']['instance']):
result['self']['instance'] = instance_order
instance_order = [i.type_id for i in ci_type_order if not i.is_tree]
if len(instance_order) == len(result['self']['instance']):
result['self']['instance'] = instance_order
if tree:
types = PreferenceTreeView.get_by(uid=current_user.uid, to_dict=False)
@@ -131,15 +151,73 @@ class PreferenceManager(object):
return result
@staticmethod
def get_auto_subscription_types(uid):
"""Get user's auto-subscribed CI types based on config rules"""
config = PreferenceAutoSubscriptionConfig.get_by(
uid=uid, enabled=True, first=True, to_dict=False
)
if not config:
return None
all_permitted_types = PreferenceManager._get_permitted_ci_types()
result_types = PreferenceManager._apply_subscription_config(config, all_permitted_types)
return result_types
@staticmethod
def _get_permitted_ci_types():
"""Get CI types that user has read permission for"""
from api.lib.perm.acl.acl import is_app_admin
if not current_app.config.get('USE_ACL') or is_app_admin('cmdb'):
return [t['id'] for t in CITypeManager.get_ci_types()]
# Regular user: filter by permissions
permitted_resources = ACLManager('cmdb').get_resources(ResourceTypeEnum.CI)
permitted_names = {r.get('name') for r in permitted_resources}
return [ci_type_dict['id'] for ci_type_dict in CITypeManager.get_ci_types()
if ci_type_dict['name'] in permitted_names]
@staticmethod
def _apply_subscription_config(config, all_permitted_types):
"""Apply subscription rules: 'all' mode excludes, 'none' mode includes"""
result_types = set()
if config.base_strategy == 'all':
# Start with all types, then exclude
result_types = set(all_permitted_types)
if config.group_ids:
exclude_group_type_ids = PreferenceManager._get_types_by_group_ids(config.group_ids)
result_types.difference_update(exclude_group_type_ids)
if config.type_ids:
result_types.difference_update(config.type_ids)
else: # base_strategy == 'none'
# Start empty, then include
if config.group_ids:
include_group_type_ids = PreferenceManager._get_types_by_group_ids(config.group_ids)
result_types.update(t for t in include_group_type_ids if t in all_permitted_types)
if config.type_ids:
result_types.update(t for t in config.type_ids if t in all_permitted_types)
return list(result_types)
@staticmethod
def _get_types_by_group_ids(group_ids):
return [i.type_id for i in CITypeGroupItem.get_by(
__func_in___key_group_id=group_ids, to_dict=False, fl=['type_id'])]
@staticmethod
def get_show_attributes(type_id):
_type = CITypeCache.get(type_id) or abort(404, ErrFormat.ci_type_not_found)
type_id = _type and _type.id
if not isinstance(type_id, six.integer_types):
_type = CITypeCache.get(type_id)
type_id = _type and _type.id
attrs = PreferenceShowAttributes.get_by(uid=current_user.uid, type_id=type_id, to_dict=False)
result = []
@@ -170,11 +248,11 @@ class PreferenceManager(object):
i.update(dict(choice_value=AttributeManager.get_choice_values(
i["id"], i["value_type"], i.get("choice_web_hook"), i.get("choice_other"))))
if (_type.name in SysComputedAttributes.type2attr and
i['name'] in SysComputedAttributes.type2attr[_type.name]):
i['sys_computed'] = True
else:
i['sys_computed'] = False
if (_type.name in SysComputedAttributes.type2attr and
i['name'] in SysComputedAttributes.type2attr[_type.name]):
i['sys_computed'] = True
else:
i['sys_computed'] = False
return is_subscribed, result
@@ -523,3 +601,54 @@ class PreferenceManager(object):
db.session.rollback()
current_app.logger.error("upsert citype order failed: {}".format(e))
return abort(400, ErrFormat.unknown_error)
@staticmethod
def get_auto_subscription_config():
"""Get user's auto subscription configuration"""
config = PreferenceAutoSubscriptionConfig.get_by(
uid=current_user.uid, first=True, to_dict=True
)
return config
@staticmethod
def create_or_update_auto_subscription_config(base_strategy, group_ids=None, type_ids=None,
enabled=True, description=None):
"""Create or update user's auto subscription config"""
config = PreferenceAutoSubscriptionConfig.get_by(
uid=current_user.uid, first=True, to_dict=False
)
data = {
'base_strategy': base_strategy,
'group_ids': group_ids or [],
'type_ids': type_ids or [],
'enabled': enabled,
'description': description
}
if config:
return config.update(**data)
else:
data['uid'] = current_user.uid
return PreferenceAutoSubscriptionConfig.create(**data)
@staticmethod
def delete_auto_subscription_config():
"""Delete user's auto subscription configuration"""
config = PreferenceAutoSubscriptionConfig.get_by(
uid=current_user.uid, first=True, to_dict=False
)
if config:
config.soft_delete()
return True
@staticmethod
def toggle_auto_subscription_config(enabled):
"""Enable or disable user's auto subscription config"""
config = PreferenceAutoSubscriptionConfig.get_by(
uid=current_user.uid, first=True, to_dict=False
)
if not config:
return abort(404, "Auto subscription config not found")
return config.update(enabled=enabled)