cmdb/cmdb-api/api/lib/common_setting/employee.py

831 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
import traceback
from datetime import datetime
from flask import abort
from flask_login import current_user
from sqlalchemy import or_, literal_column, func, not_, and_
from werkzeug.datastructures import MultiDict
from wtforms import Form
from wtforms import IntegerField
from wtforms import StringField
from wtforms import validators
from api.extensions import db
from api.lib.common_setting.acl import ACLManager
from api.lib.common_setting.const import COMMON_SETTING_QUEUE, OperatorType
from api.lib.common_setting.resp_format import ErrFormat
from api.models.common_setting import Employee, Department
def edit_acl_user(uid, **kwargs):
user_data = {column: kwargs.get(
column, '') for column in acl_user_columns if kwargs.get(column, '')}
if 'block' in kwargs:
user_data['block'] = kwargs.get('block')
try:
acl = ACLManager()
return acl.edit_user(uid, user_data)
except Exception as e:
abort(400, ErrFormat.acl_edit_user_failed.format(str(e)))
def get_block_value(value):
if value in ['False', 'false', '0', 0]:
value = False
else:
value = True
return value
def get_employee_list_by_direct_supervisor_id(direct_supervisor_id):
return Employee.get_by(direct_supervisor_id=direct_supervisor_id)
def get_department_list_by_director_id(director_id):
return Department.get_by(department_director_id=director_id)
def raise_exception(err):
raise Exception(err)
def check_department_director_id_or_direct_supervisor_id(_id):
get_employee_list_by_direct_supervisor_id(
_id) and raise_exception(ErrFormat.cannot_block_this_employee_is_other_direct_supervisor)
get_department_list_by_director_id(
_id) and raise_exception(ErrFormat.cannot_block_this_employee_is_department_manager)
class EmployeeCRUD(object):
@staticmethod
def get_employee_by_id(_id):
return Employee.get_by(
first=True, to_dict=False, deleted=0, employee_id=_id
) or abort(404, ErrFormat.employee_id_not_found.format(_id))
@staticmethod
def get_employee_by_uid_with_create(_uid):
"""
根据 uid 获取员工信息,不存在则创建
"""
try:
return EmployeeCRUD.get_employee_by_uid(_uid).to_dict()
except Exception as e:
if '不存在' not in str(e):
abort(400, str(e))
try:
acl = ACLManager('acl')
user_info = acl.get_user_info(_uid)
return EmployeeCRUD.check_acl_user_and_create(user_info)
except Exception as e:
abort(400, str(e))
@staticmethod
def get_employee_by_uid(_uid):
return Employee.get_by(
first=True, to_dict=False, deleted=0, acl_uid=_uid
) or abort(404, ErrFormat.acl_uid_not_found.format(_uid))
@staticmethod
def check_acl_user_and_create(user_info):
existed = Employee.get_by(
first=True, to_dict=False, username=user_info['username'])
if existed:
existed.update(
acl_uid=user_info['uid'],
)
return existed.to_dict()
# 创建员工
if not user_info.get('nickname', None):
user_info['nickname'] = user_info['name']
form = EmployeeAddForm(MultiDict(user_info))
data = form.data
data['password'] = ''
data['acl_uid'] = user_info['uid']
employee = CreateEmployee().create_single(**data)
return employee.to_dict()
@staticmethod
def add(**kwargs):
try:
return CreateEmployee().create_single(**kwargs)
except Exception as e:
abort(400, str(e))
@staticmethod
def update(_id, **kwargs):
EmployeeCRUD.check_email_unique(kwargs['email'], _id)
existed = EmployeeCRUD.get_employee_by_id(_id)
try:
edit_acl_user(existed.acl_uid, **kwargs)
for column in employee_pop_columns:
kwargs.pop(column, None)
new_department_id = kwargs.get('department_id', None)
e_list = []
if new_department_id is not None and new_department_id != existed.department_id:
e_list = [dict(
e_acl_rid=existed.acl_rid,
department_id=existed.department_id
)]
existed.update(**kwargs)
if len(e_list) > 0:
from api.tasks.common_setting import edit_employee_department_in_acl
# fixme: comment next line
# edit_employee_department_in_acl(e_list, new_department_id, current_user.uid)
edit_employee_department_in_acl.apply_async(
args=(e_list, new_department_id, current_user.uid),
queue=COMMON_SETTING_QUEUE
)
return existed
except Exception as e:
return abort(400, str(e))
@staticmethod
def edit_employee_by_uid(_uid, **kwargs):
existed = EmployeeCRUD.get_employee_by_uid(_uid)
try:
user = edit_acl_user(_uid, **kwargs)
for column in employee_pop_columns:
if kwargs.get(column):
kwargs.pop(column)
return existed.update(**kwargs)
except Exception as e:
return abort(400, str(e))
@staticmethod
def change_password_by_uid(_uid, password):
existed = EmployeeCRUD.get_employee_by_uid(_uid)
try:
user = edit_acl_user(_uid, password=password)
except Exception as e:
return abort(400, str(e))
@staticmethod
def get_all_position():
criterion = [
Employee.deleted == 0,
]
results = Employee.query.with_entities(
Employee.position_name
).filter(*criterion).group_by(
Employee.position_name
).order_by(
func.CONVERT(literal_column('position_name using gbk'))
).all()
return [item[0] for item in results if (item[0] is not None and item[0] != '')]
@staticmethod
def get_employee_count(block_status):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
return Employee.query.filter(
*criterion
).count()
@staticmethod
def import_employee(employee_list):
return CreateEmployee().batch_create(employee_list)
@staticmethod
def get_export_employee_list(block_status):
if block_status >= 0:
employees = Employee.get_by(block=block_status, to_dict=False)
else:
employees = Employee.get_by(to_dict=False)
keep_cols = EmployeeCRUD.get_current_user_view_columns()
all_departments = Department.get_by(to_dict=False)
d_id_map = {d.department_id: d.department_name for d in all_departments}
e_id_map = {e.employee_id: e.nickname for e in employees}
export_columns_map = {
'username': "用户名",
'nickname': '姓名',
'email': '邮箱',
'department_name': '部门',
'sex': '性别',
'mobile': '手机号',
'position_name': '岗位',
'nickname_direct_supervisor': '直属上级',
'last_login': '上次登录时间',
}
data_list = []
for e in employees:
department_name = d_id_map.get(e.department_id, '')
nickname_direct_supervisor = e_id_map.get(e.direct_supervisor_id, '')
try:
last_login = str(e.last_login) if e.last_login else ''
except:
last_login = ''
data = e.to_dict()
data['last_login'] = last_login
data['department_name'] = department_name
data['nickname_direct_supervisor'] = nickname_direct_supervisor
tmp = {export_columns_map[k]: data[k] for k in export_columns_map.keys() if
k in keep_cols or k in sub_columns}
data_list.append(tmp)
return data_list
@staticmethod
def batch_employee(column_name, column_value, employee_id_list):
if not column_value:
abort(400, ErrFormat.value_is_required)
if column_name in ['password', 'block']:
return EmployeeCRUD.batch_edit_password_or_block_column(column_name, employee_id_list, column_value, True)
elif column_name in ['department_id']:
return EmployeeCRUD.batch_edit_employee_department(employee_id_list, column_value)
elif column_name in [
'direct_supervisor_id', 'position_name'
]:
return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, False)
else:
abort(400, ErrFormat.column_name_not_support)
@staticmethod
def batch_edit_employee_department(employee_id_list, column_value):
err_list = []
employee_list = []
for _id in employee_id_list:
try:
existed = EmployeeCRUD.get_employee_by_id(_id)
employee = dict(
e_acl_rid=existed.acl_rid,
department_id=existed.department_id
)
employee_list.append(employee)
existed.update(department_id=column_value)
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
from api.tasks.common_setting import edit_employee_department_in_acl
edit_employee_department_in_acl.apply_async(
args=(employee_list, column_value, current_user.uid),
queue=COMMON_SETTING_QUEUE
)
return err_list
@staticmethod
def batch_edit_password_or_block_column(column_name, employee_id_list, column_value, is_acl=False):
if column_name == 'block':
err_list = []
success_list = []
for _id in employee_id_list:
try:
employee = EmployeeCRUD.edit_employee_block_column(
_id, is_acl, **{column_name: column_value})
success_list.append(employee)
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
return err_list
else:
return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, is_acl)
@staticmethod
def batch_edit_column(column_name, employee_id_list, column_value, is_acl=False):
err_list = []
for _id in employee_id_list:
try:
EmployeeCRUD.edit_employee_single_column(
_id, is_acl, **{column_name: column_value})
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
return err_list
@staticmethod
def edit_employee_single_column(_id, is_acl=False, **kwargs):
existed = EmployeeCRUD.get_employee_by_id(_id)
if is_acl:
return edit_acl_user(existed.acl_uid, **kwargs)
try:
for column in employee_pop_columns:
if kwargs.get(column):
kwargs.pop(column)
return existed.update(**kwargs)
except Exception as e:
return abort(400, str(e))
@staticmethod
def edit_employee_block_column(_id, is_acl=False, **kwargs):
existed = EmployeeCRUD.get_employee_by_id(_id)
value = get_block_value(kwargs.get('block'))
if value is True:
# 判断该用户是否为 部门负责人,或者员工的直接上级
check_department_director_id_or_direct_supervisor_id(_id)
if is_acl:
kwargs['block'] = value
edit_acl_user(existed.acl_uid, **kwargs)
data = existed.to_dict()
return data
@staticmethod
def check_email_unique(email, _id=0):
criterion = [
Employee.email == email,
Employee.deleted == 0,
]
if _id > 0:
criterion.append(
Employee.employee_id != _id
)
res = Employee.query.filter(
*criterion
).all()
if res:
err = ErrFormat.email_already_exists.format(email)
raise Exception(err)
@staticmethod
def get_employee_list_by_body(department_id, block_status, search='', order='', conditions=[], page=1,
page_size=10):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
if len(search) > 0:
search_key = f"%{search}%"
criterion.append(
or_(
Employee.email.like(search_key),
Employee.username.like(search_key),
Employee.nickname.like(search_key)
)
)
if department_id > 0:
from api.lib.common_setting.department import DepartmentCRUD
department_id_list = DepartmentCRUD.get_department_id_list_by_root(
department_id)
criterion.append(
Employee.department_id.in_(department_id_list)
)
if conditions:
query = EmployeeCRUD.parse_condition_list_to_query(conditions).filter(
*criterion
)
else:
query = db.session.query(Employee, Department).outerjoin(Department).filter(
*criterion
)
if len(order) > 0:
query = EmployeeCRUD.format_query_sort(query, order)
pagination = query.paginate(page=page, per_page=page_size)
employees = []
for r in pagination.items:
d = r.Employee.to_dict()
d['department_name'] = r.Department.department_name
employees.append(d)
return {
'data_list': employees,
'page': page,
'page_size': page_size,
'total': pagination.total,
}
@staticmethod
def parse_condition_list_to_query(condition_list):
query = db.session.query(Employee, Department).outerjoin(Department)
query = EmployeeCRUD.get_query_by_conditions(query, condition_list)
return query
@staticmethod
def get_expr_by_condition(column, operator, value, relation):
"""
根据conditions返回expr: (and_list, or_list)
"""
attr = EmployeeCRUD.get_attr_by_column(column)
# 根据operator生成条件表达式
if operator == OperatorType.EQUAL:
expr = [attr == value]
elif operator == OperatorType.NOT_EQUAL:
expr = [attr != value]
elif operator == OperatorType.IN:
expr = [attr.like('%{}%'.format(value))]
elif operator == OperatorType.NOT_IN:
expr = [not_(attr.like('%{}%'.format(value)))]
elif operator == OperatorType.GREATER_THAN:
expr = [attr > value]
elif operator == OperatorType.LESS_THAN:
expr = [attr < value]
elif operator == OperatorType.IS_EMPTY:
if value:
abort(400, ErrFormat.query_column_none_keep_value_empty.format(column))
expr = [attr.is_(None)]
if column not in ["last_login"]:
expr += [attr == '']
expr = [or_(*expr)]
elif operator == OperatorType.IS_NOT_EMPTY:
if value:
abort(400, ErrFormat.query_column_none_keep_value_empty.format(column))
expr = [attr.isnot(None)]
if column not in ["last_login"]:
expr += [attr != '']
expr = [and_(*expr)]
else:
abort(400, ErrFormat.not_support_operator.format(operator))
# 根据relation生成复合条件
if relation == "&":
return expr, []
elif relation == "|":
return [], expr
else:
return abort(400, ErrFormat.not_support_relation.format(relation))
@staticmethod
def check_condition(column, operator, value, relation):
# 对于condition中column为空的报错
if column is None or operator is None or relation is None:
return abort(400, ErrFormat.conditions_field_missing)
if value and column == "last_login":
try:
value = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
except Exception as e:
abort(400, ErrFormat.datetime_format_error.format(column))
@staticmethod
def get_attr_by_column(column):
if 'department' in column:
attr = Department.__dict__[column]
else:
attr = Employee.__dict__[column]
return attr
@staticmethod
def get_query_by_conditions(query, conditions):
and_list = []
or_list = []
for condition in conditions:
operator = condition.get("operator", None)
column = condition.get("column", None)
relation = condition.get("relation", None)
value = condition.get("value", None)
EmployeeCRUD.check_condition(column, operator, value, relation)
a, o = EmployeeCRUD.get_expr_by_condition(
column, operator, value, relation)
and_list += a
or_list += o
query = query.filter(
Employee.deleted == 0,
or_(and_(*and_list), *or_list)
)
return query
@staticmethod
def get_employee_list_by(department_id, block_status, search='', order='', page=1, page_size=10):
criterion = [
Employee.deleted == 0
]
if block_status >= 0:
criterion.append(
Employee.block == block_status
)
if len(search) > 0:
search_key = f"%{search}%"
criterion.append(
or_(
Employee.email.like(search_key),
Employee.username.like(search_key),
Employee.nickname.like(search_key)
)
)
if department_id > 0:
from api.lib.common_setting.department import DepartmentCRUD
department_id_list = DepartmentCRUD.get_department_id_list_by_root(
department_id)
criterion.append(
Employee.department_id.in_(department_id_list)
)
query = db.session.query(Employee, Department).outerjoin(Department).filter(
*criterion
)
if len(order) > 0:
query = EmployeeCRUD.format_query_sort(query, order)
pagination = query.paginate(page=page, per_page=page_size)
employees = []
for r in pagination.items:
d = r.Employee.to_dict()
d['department_name'] = r.Department.department_name
employees.append(d)
return {
'data_list': employees,
'page': page,
'page_size': page_size,
'total': pagination.total,
}
@staticmethod
def format_query_sort(query, order):
order_list = order.split(',')
all_columns = Employee.get_columns()
for order_column in order_list:
if order_column.startswith('-'):
target_column = order_column[1:]
if target_column not in all_columns:
continue
query = query.order_by(getattr(Employee, target_column).desc())
else:
if order_column not in all_columns:
continue
query = query.order_by(getattr(Employee, order_column).asc())
return query
@staticmethod
def get_employees_by_department_id(department_id, block):
criterion = [
Employee.deleted == 0,
Employee.block == block,
]
if type(department_id) == list:
if len(department_id) == 0:
return []
else:
criterion.append(
Employee.department_id.in_(department_id)
)
else:
criterion.append(
Employee.department_id == department_id
)
results = Employee.query.filter(
*criterion
).all()
return [r.to_dict() for r in results]
def get_user_map(key='uid', acl=None):
"""
{
uid: userinfo
}
"""
if acl is None:
acl = ACLManager()
data = {user[key]: user for user in acl.get_all_users()}
return data
acl_user_columns = [
'email',
'mobile',
'nickname',
'username',
'password',
'block',
'avatar',
]
employee_pop_columns = ['password']
can_not_edit_columns = ['email']
def format_params(params):
for k in ['_key', '_secret']:
params.pop(k, None)
return params
class CreateEmployee(object):
def __init__(self):
self.acl = ACLManager()
self.useremail_map = {}
def check_acl_user(self, email):
user_info = self.useremail_map.get(email, None)
if user_info:
return user_info
return None
def add_acl_user(self, **kwargs):
user_data = {column: kwargs.get(
column, '') for column in acl_user_columns if kwargs.get(column, '')}
try:
existed = self.check_acl_user(user_data['email'])
if not existed:
return self.acl.create_user(user_data)
return existed
except Exception as e:
abort(400, ErrFormat.acl_add_user_failed.format(str(e)))
def create_single(self, **kwargs):
EmployeeCRUD.check_email_unique(kwargs['email'])
self.useremail_map = self.useremail_map if self.useremail_map else get_user_map(
'email', self.acl)
user = self.add_acl_user(**kwargs)
kwargs['acl_uid'] = user['uid']
kwargs['last_login'] = user['last_login']
for column in employee_pop_columns:
kwargs.pop(column)
return Employee.create(
**kwargs
)
def create_single_with_import(self, **kwargs):
self.useremail_map = self.useremail_map if self.useremail_map else get_user_map(
'email', self.acl)
user = self.add_acl_user(**kwargs)
kwargs['acl_uid'] = user['uid']
kwargs['last_login'] = user['last_login']
for column in employee_pop_columns:
kwargs.pop(column)
existed = Employee.get_by(
first=True, to_dict=False, deleted=0, acl_uid=user['uid']
)
if existed:
return existed
return Employee.create(
**kwargs
)
def get_department_by_name(self, d_name):
return Department.get_by(first=True, department_name=d_name)
def get_end_department_id(self, department_name_list, department_name_map):
parent_id = 0
end_d_id = 0
for d_name in department_name_list:
tmp_d = self.get_department_by_name(d_name)
if not tmp_d:
tmp_d = Department.create(
department_name=d_name, department_parent_id=parent_id).to_dict()
else:
if tmp_d['department_parent_id'] != parent_id:
department_name_map[d_name] = tmp_d
raise Exception(ErrFormat.department_level_relation_error)
department_name_map[d_name] = tmp_d
end_d_id = tmp_d['department_id']
parent_id = tmp_d['department_id']
return end_d_id
def format_department_id(self, employee):
"""
部门名称转化为ID不存在则创建
"""
department_name_map = {}
try:
department_name = employee.get('department_name', '')
if len(department_name) == 0:
return employee
department_name_list = department_name.split('/')
employee['department_id'] = self.get_end_department_id(
department_name_list, department_name_map)
except Exception as e:
employee['err'] = str(e)
return employee
def batch_create(self, employee_list):
err_list = []
self.useremail_map = get_user_map('email', self.acl)
for employee in employee_list:
try:
# 获取username
username = employee.get('username', None)
if username is None:
employee['username'] = employee['email']
# 校验通过后获取department_id
employee = self.format_department_id(employee)
err = employee.get('err', None)
if err:
raise Exception(err)
params = format_params(employee)
form = EmployeeAddForm(MultiDict(params))
if not form.validate():
raise Exception(
','.join(['{}: {}'.format(filed, ','.join(msg)) for filed, msg in form.errors.items()]))
data = self.create_single_with_import(**form.data)
except Exception as e:
err_list.append({
'email': employee.get('email', ''),
'nickname': employee.get('nickname', ''),
'err': str(e),
})
traceback.print_exc()
return err_list
class EmployeeAddForm(Form):
username = StringField(validators=[
validators.DataRequired(message="username不能为空"),
validators.Length(max=255),
])
email = StringField(validators=[
validators.DataRequired(message="邮箱不能为空"),
validators.Email(message="邮箱格式不正确"),
validators.Length(max=255),
])
password = StringField(validators=[
validators.Length(max=255),
])
position_name = StringField(validators=[])
nickname = StringField(validators=[
validators.DataRequired(message="用户名不能为空"),
validators.Length(max=255),
])
sex = StringField(validators=[])
mobile = StringField(validators=[])
department_id = IntegerField(validators=[], default=0)
direct_supervisor_id = IntegerField(validators=[], default=0)
class EmployeeUpdateByUidForm(Form):
nickname = StringField(validators=[
validators.DataRequired(message="用户名不能为空"),
validators.Length(max=255),
])
avatar = StringField(validators=[])
sex = StringField(validators=[])
mobile = StringField(validators=[])