cmdb/cmdb-api/api/lib/common_setting/employee.py

657 lines
20 KiB
Python

# -*- coding:utf-8 -*-
import traceback
from datetime import datetime
from flask import abort
from flask_login import current_user
from sqlalchemy import or_, literal_column, func, not_, and_
from werkzeug.datastructures import MultiDict
from wtforms import Form
from wtforms import IntegerField
from wtforms import StringField
from wtforms import validators
from api.extensions import db
from api.lib.common_setting.acl import ACLManager
from api.lib.common_setting.const import COMMON_SETTING_QUEUE, OperatorType
from api.lib.common_setting.resp_format import ErrFormat
from api.models.common_setting import Employee, Department
acl_user_columns = [
'email',
'mobile',
'nickname',
'username',
'password',
'block',
'avatar',
]
employee_pop_columns = ['password']
can_not_edit_columns = ['email']
def edit_acl_user(uid, **kwargs):
user_data = {column: kwargs.get(
column, '') for column in acl_user_columns if kwargs.get(column, '')}
if 'block' in kwargs:
user_data['block'] = kwargs.get('block')
try:
acl = ACLManager()
return acl.edit_user(uid, user_data)
except Exception as e:
abort(400, ErrFormat.acl_edit_user_failed.format(str(e)))
def get_block_value(value):
if value in ['False', 'false', '0', 0]:
value = False
else:
value = True
return value
def get_employee_list_by_direct_supervisor_id(direct_supervisor_id):
return Employee.get_by(direct_supervisor_id=direct_supervisor_id)
def get_department_list_by_director_id(director_id):
return Department.get_by(department_director_id=director_id)
def raise_exception(err):
raise Exception(err)
def check_department_director_id_or_direct_supervisor_id(_id):
get_employee_list_by_direct_supervisor_id(
_id) and raise_exception(ErrFormat.cannot_block_this_employee_is_other_direct_supervisor)
get_department_list_by_director_id(
_id) and raise_exception(ErrFormat.cannot_block_this_employee_is_department_manager)
class EmployeeCRUD(object):
@staticmethod
def get_employee_by_id(_id):
return Employee.get_by(
first=True, to_dict=False, deleted=0, employee_id=_id
) or abort(404, ErrFormat.employee_id_not_found.format(_id))
@staticmethod
def get_employee_by_uid_with_create(_uid):
try:
return EmployeeCRUD.get_employee_by_uid(_uid).to_dict()
except Exception as e:
if '不存在' not in str(e):
abort(400, str(e))
try:
acl = ACLManager('acl')
user_info = acl.get_user_info(_uid)
return EmployeeCRUD.check_acl_user_and_create(user_info)
except Exception as e:
abort(400, str(e))
@staticmethod
def get_employee_by_uid(_uid):
return Employee.get_by(
first=True, to_dict=False, deleted=0, acl_uid=_uid
) or abort(404, ErrFormat.acl_uid_not_found.format(_uid))
@staticmethod
def check_acl_user_and_create(user_info):
existed = Employee.get_by(
first=True, to_dict=False, username=user_info['username'])
if existed:
existed.update(
acl_uid=user_info['uid'],
)
return existed.to_dict()
if not user_info.get('nickname', None):
user_info['nickname'] = user_info['name']
form = EmployeeAddForm(MultiDict(user_info))
data = form.data
data['password'] = ''
data['acl_uid'] = user_info['uid']
employee = CreateEmployee().create_single(**data)
return employee.to_dict()
@staticmethod
def add(**kwargs):
try:
return CreateEmployee().create_single(**kwargs)
except Exception as e:
abort(400, str(e))
@staticmethod
def update(_id, **kwargs):
EmployeeCRUD.check_email_unique(kwargs['email'], _id)
existed = EmployeeCRUD.get_employee_by_id(_id)
try:
edit_acl_user(existed.acl_uid, **kwargs)
for column in employee_pop_columns:
kwargs.pop(column, None)
new_department_id = kwargs.get('department_id', None)
e_list = []
if new_department_id is not None and new_department_id != existed.department_id:
e_list = [dict(
e_acl_rid=existed.acl_rid,
department_id=existed.department_id
)]
existed.update(**kwargs)
if len(e_list) > 0:
from api.tasks.common_setting import edit_employee_department_in_acl
edit_employee_department_in_acl.apply_async(
args=(e_list, new_department_id, current_user.uid),
queue=COMMON_SETTING_QUEUE
)
return existed
except Exception as e:
return abort(400, str(e))
@staticmethod
def edit_employee_by_uid(_uid, **kwargs):
existed = EmployeeCRUD.get_employee_by_uid(_uid)
try:
user = edit_acl_user(_uid, **kwargs)
for column in employee_pop_columns:
if kwargs.get(column):
kwargs.pop(column)
return existed.update(**kwargs)
except Exception as e:
return abort(400, str(e))
@staticmethod
def change_password_by_uid(_uid, password):
existed = EmployeeCRUD.get_employee_by_uid(_uid)
try:
user = edit_acl_user(_uid, password=password)
except Exception as e:
return abort(400, str(e))
@staticmethod
def get_all_position():
criterion = [
Employee.deleted == 0,
]
results = Employee.query.with_entities(
Employee.position_name
).filter(*criterion).group_by(
Employee.position_name
).order_by(
func.CONVERT(literal_column('position_name using gbk'))
).all()
return [item[0] for item in results if (item[0] is not None and item[0] != '')]
@staticmethod
def get_employee_count(block_status):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
return Employee.query.filter(
*criterion
).count()
@staticmethod
def check_email_unique(email, _id=0):
criterion = [
Employee.email == email,
Employee.deleted == 0,
]
if _id > 0:
criterion.append(
Employee.employee_id != _id
)
res = Employee.query.filter(
*criterion
).all()
if res:
err = ErrFormat.email_already_exists.format(email)
raise Exception(err)
@staticmethod
def get_employee_list_by_body(department_id, block_status, search='', order='', conditions=[], page=1,
page_size=10):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
if len(search) > 0:
search_key = f"%{search}%"
criterion.append(
or_(
Employee.email.like(search_key),
Employee.username.like(search_key),
Employee.nickname.like(search_key)
)
)
if department_id > 0:
from api.lib.common_setting.department import DepartmentCRUD
department_id_list = DepartmentCRUD.get_department_id_list_by_root(
department_id)
criterion.append(
Employee.department_id.in_(department_id_list)
)
if conditions:
query = EmployeeCRUD.parse_condition_list_to_query(conditions).filter(
*criterion
)
else:
query = db.session.query(Employee, Department).outerjoin(Department).filter(
*criterion
)
if len(order) > 0:
query = EmployeeCRUD.format_query_sort(query, order)
pagination = query.paginate(page=page, per_page=page_size)
employees = []
for r in pagination.items:
d = r.Employee.to_dict()
d['department_name'] = r.Department.department_name
employees.append(d)
return {
'data_list': employees,
'page': page,
'page_size': page_size,
'total': pagination.total,
}
@staticmethod
def parse_condition_list_to_query(condition_list):
query = db.session.query(Employee, Department).outerjoin(Department)
query = EmployeeCRUD.get_query_by_conditions(query, condition_list)
return query
@staticmethod
def get_expr_by_condition(column, operator, value, relation):
"""
get expr: (and_list, or_list)
"""
attr = EmployeeCRUD.get_attr_by_column(column)
# 根据operator生成条件表达式
if operator == OperatorType.EQUAL:
expr = [attr == value]
elif operator == OperatorType.NOT_EQUAL:
expr = [attr != value]
elif operator == OperatorType.IN:
expr = [attr.like('%{}%'.format(value))]
elif operator == OperatorType.NOT_IN:
expr = [not_(attr.like('%{}%'.format(value)))]
elif operator == OperatorType.GREATER_THAN:
expr = [attr > value]
elif operator == OperatorType.LESS_THAN:
expr = [attr < value]
elif operator == OperatorType.IS_EMPTY:
if value:
abort(400, ErrFormat.query_column_none_keep_value_empty.format(column))
expr = [attr.is_(None)]
if column not in ["last_login"]:
expr += [attr == '']
expr = [or_(*expr)]
elif operator == OperatorType.IS_NOT_EMPTY:
if value:
abort(400, ErrFormat.query_column_none_keep_value_empty.format(column))
expr = [attr.isnot(None)]
if column not in ["last_login"]:
expr += [attr != '']
expr = [and_(*expr)]
else:
abort(400, ErrFormat.not_support_operator.format(operator))
if relation == "&":
return expr, []
elif relation == "|":
return [], expr
else:
return abort(400, ErrFormat.not_support_relation.format(relation))
@staticmethod
def check_condition(column, operator, value, relation):
if column is None or operator is None or relation is None:
return abort(400, ErrFormat.conditions_field_missing)
if value and column == "last_login":
try:
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
except Exception as e:
abort(400, ErrFormat.datetime_format_error.format(column))
@staticmethod
def get_attr_by_column(column):
if 'department' in column:
attr = Department.__dict__[column]
else:
attr = Employee.__dict__[column]
return attr
@staticmethod
def get_query_by_conditions(query, conditions):
and_list = []
or_list = []
for condition in conditions:
operator = condition.get("operator", None)
column = condition.get("column", None)
relation = condition.get("relation", None)
value = condition.get("value", None)
EmployeeCRUD.check_condition(column, operator, value, relation)
a, o = EmployeeCRUD.get_expr_by_condition(
column, operator, value, relation)
and_list += a
or_list += o
query = query.filter(
Employee.deleted == 0,
or_(and_(*and_list), *or_list)
)
return query
@staticmethod
def get_employee_list_by(department_id, block_status, search='', order='', page=1, page_size=10):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
if len(search) > 0:
search_key = f"%{search}%"
criterion.append(
or_(
Employee.email.like(search_key),
Employee.username.like(search_key),
Employee.nickname.like(search_key)
)
)
if department_id > 0:
from api.lib.common_setting.department import DepartmentCRUD
department_id_list = DepartmentCRUD.get_department_id_list_by_root(
department_id)
criterion.append(
Employee.department_id.in_(department_id_list)
)
query = db.session.query(Employee, Department).outerjoin(Department).filter(
*criterion
)
if len(order) > 0:
query = EmployeeCRUD.format_query_sort(query, order)
pagination = query.paginate(page=page, per_page=page_size)
employees = []
for r in pagination.items:
d = r.Employee.to_dict()
d['department_name'] = r.Department.department_name
employees.append(d)
return {
'data_list': employees,
'page': page,
'page_size': page_size,
'total': pagination.total,
}
@staticmethod
def format_query_sort(query, order):
order_list = order.split(',')
all_columns = Employee.get_columns()
for order_column in order_list:
if order_column.startswith('-'):
target_column = order_column[1:]
if target_column not in all_columns:
continue
query = query.order_by(getattr(Employee, target_column).desc())
else:
if order_column not in all_columns:
continue
query = query.order_by(getattr(Employee, order_column).asc())
return query
@staticmethod
def get_employees_by_department_id(department_id, block):
criterion = [
Employee.deleted == 0,
Employee.block == block,
]
if type(department_id) == list:
if len(department_id) == 0:
return []
else:
criterion.append(
Employee.department_id.in_(department_id)
)
else:
criterion.append(
Employee.department_id == department_id
)
results = Employee.query.filter(
*criterion
).all()
return [r.to_dict() for r in results]
def get_user_map(key='uid', acl=None):
"""
{
uid: userinfo
}
"""
if acl is None:
acl = ACLManager()
data = {user[key]: user for user in acl.get_all_users()}
return data
def format_params(params):
for k in ['_key', '_secret']:
params.pop(k, None)
return params
class CreateEmployee(object):
def __init__(self):
self.acl = ACLManager()
self.all_acl_users = self.acl.get_all_users()
def check_acl_user(self, user_data):
target_email = list(filter(lambda x: x['email'] == user_data['email'], self.all_acl_users))
if target_email:
return target_email[0]
target_username = list(filter(lambda x: x['username'] == user_data['username'], self.all_acl_users))
if target_username:
return target_username[0]
def add_acl_user(self, **kwargs):
user_data = {column: kwargs.get(
column, '') for column in acl_user_columns if kwargs.get(column, '')}
try:
existed = self.check_acl_user(user_data)
if not existed:
return self.acl.create_user(user_data)
return existed
except Exception as e:
abort(400, ErrFormat.acl_add_user_failed.format(str(e)))
def create_single(self, **kwargs):
EmployeeCRUD.check_email_unique(kwargs['email'])
user = self.add_acl_user(**kwargs)
kwargs['acl_uid'] = user['uid']
kwargs['last_login'] = user['last_login']
for column in employee_pop_columns:
kwargs.pop(column)
return Employee.create(
**kwargs
)
def create_single_with_import(self, **kwargs):
user = self.add_acl_user(**kwargs)
kwargs['acl_uid'] = user['uid']
kwargs['last_login'] = user['last_login']
for column in employee_pop_columns:
kwargs.pop(column)
existed = Employee.get_by(
first=True, to_dict=False, deleted=0, acl_uid=user['uid']
)
if existed:
return existed
return Employee.create(
**kwargs
)
def get_department_by_name(self, d_name):
return Department.get_by(first=True, department_name=d_name)
def get_end_department_id(self, department_name_list, department_name_map):
parent_id = 0
end_d_id = 0
for d_name in department_name_list:
tmp_d = self.get_department_by_name(d_name)
if not tmp_d:
tmp_d = Department.create(
department_name=d_name, department_parent_id=parent_id).to_dict()
else:
if tmp_d['department_parent_id'] != parent_id:
department_name_map[d_name] = tmp_d
raise Exception(ErrFormat.department_level_relation_error)
department_name_map[d_name] = tmp_d
end_d_id = tmp_d['department_id']
parent_id = tmp_d['department_id']
return end_d_id
def format_department_id(self, employee):
department_name_map = {}
try:
department_name = employee.get('department_name', '')
if len(department_name) == 0:
return employee
department_name_list = department_name.split('/')
employee['department_id'] = self.get_end_department_id(
department_name_list, department_name_map)
except Exception as e:
employee['err'] = str(e)
return employee
def batch_create(self, employee_list):
err_list = []
for employee in employee_list:
try:
username = employee.get('username', None)
if username is None:
employee['username'] = employee['email']
employee = self.format_department_id(employee)
err = employee.get('err', None)
if err:
raise Exception(err)
params = format_params(employee)
form = EmployeeAddForm(MultiDict(params))
if not form.validate():
raise Exception(
','.join(['{}: {}'.format(filed, ','.join(msg)) for filed, msg in form.errors.items()]))
self.create_single_with_import(**form.data)
except Exception as e:
err_list.append({
'email': employee.get('email', ''),
'nickname': employee.get('nickname', ''),
'err': str(e),
})
traceback.print_exc()
return err_list
class EmployeeAddForm(Form):
username = StringField(validators=[
validators.DataRequired(message=ErrFormat.username_is_required),
validators.Length(max=255),
])
email = StringField(validators=[
validators.DataRequired(message=ErrFormat.email_is_required),
validators.Email(message=ErrFormat.email_format_error),
validators.Length(max=255),
])
password = StringField(validators=[
validators.Length(max=255),
])
position_name = StringField(validators=[])
nickname = StringField(validators=[
validators.DataRequired(message=ErrFormat.nickname_is_required),
validators.Length(max=255),
])
sex = StringField(validators=[])
mobile = StringField(validators=[])
department_id = IntegerField(validators=[], default=0)
direct_supervisor_id = IntegerField(validators=[], default=0)
class EmployeeUpdateByUidForm(Form):
nickname = StringField(validators=[
validators.DataRequired(message=ErrFormat.nickname_is_required),
validators.Length(max=255),
])
avatar = StringField(validators=[])
sex = StringField(validators=[])
mobile = StringField(validators=[])