fix: remove useless

This commit is contained in:
hu.sima 2023-08-10 18:55:32 +08:00
parent 28dea81036
commit 4cecdb10fb
2 changed files with 0 additions and 179 deletions

View File

@ -212,159 +212,6 @@ class EmployeeCRUD(object):
*criterion
).count()
@staticmethod
def import_employee(employee_list):
return CreateEmployee().batch_create(employee_list)
@staticmethod
def get_export_employee_list(block_status):
if block_status >= 0:
employees = Employee.get_by(block=block_status, to_dict=False)
else:
employees = Employee.get_by(to_dict=False)
all_departments = Department.get_by(to_dict=False)
d_id_map = {d.department_id: d.department_name for d in all_departments}
e_id_map = {e.employee_id: e.nickname for e in employees}
export_columns_map = {
'username': "用户名",
'nickname': '姓名',
'email': '邮箱',
'department_name': '部门',
'sex': '性别',
'mobile': '手机号',
'position_name': '岗位',
'nickname_direct_supervisor': '直属上级',
'last_login': '上次登录时间',
}
data_list = []
for e in employees:
department_name = d_id_map.get(e.department_id, '')
nickname_direct_supervisor = e_id_map.get(e.direct_supervisor_id, '')
try:
last_login = str(e.last_login) if e.last_login else ''
except:
last_login = ''
data = e.to_dict()
data['last_login'] = last_login
data['department_name'] = department_name
data['nickname_direct_supervisor'] = nickname_direct_supervisor
tmp = {export_columns_map[k]: data[k] for k in export_columns_map.keys()}
data_list.append(tmp)
return data_list
@staticmethod
def batch_employee(column_name, column_value, employee_id_list):
if not column_value:
abort(400, ErrFormat.value_is_required)
if column_name in ['password', 'block']:
return EmployeeCRUD.batch_edit_password_or_block_column(column_name, employee_id_list, column_value, True)
elif column_name in ['department_id']:
return EmployeeCRUD.batch_edit_employee_department(employee_id_list, column_value)
elif column_name in [
'direct_supervisor_id', 'position_name'
]:
return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, False)
else:
abort(400, ErrFormat.column_name_not_support)
@staticmethod
def batch_edit_employee_department(employee_id_list, column_value):
err_list = []
employee_list = []
for _id in employee_id_list:
try:
existed = EmployeeCRUD.get_employee_by_id(_id)
employee = dict(
e_acl_rid=existed.acl_rid,
department_id=existed.department_id
)
employee_list.append(employee)
existed.update(department_id=column_value)
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
from api.tasks.common_setting import edit_employee_department_in_acl
edit_employee_department_in_acl.apply_async(
args=(employee_list, column_value, current_user.uid),
queue=COMMON_SETTING_QUEUE
)
return err_list
@staticmethod
def batch_edit_password_or_block_column(column_name, employee_id_list, column_value, is_acl=False):
if column_name == 'block':
err_list = []
success_list = []
for _id in employee_id_list:
try:
employee = EmployeeCRUD.edit_employee_block_column(
_id, is_acl, **{column_name: column_value})
success_list.append(employee)
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
return err_list
else:
return EmployeeCRUD.batch_edit_column(column_name, employee_id_list, column_value, is_acl)
@staticmethod
def batch_edit_column(column_name, employee_id_list, column_value, is_acl=False):
err_list = []
for _id in employee_id_list:
try:
EmployeeCRUD.edit_employee_single_column(
_id, is_acl, **{column_name: column_value})
except Exception as e:
err_list.append({
'employee_id': _id,
'err': str(e),
})
return err_list
@staticmethod
def edit_employee_single_column(_id, is_acl=False, **kwargs):
existed = EmployeeCRUD.get_employee_by_id(_id)
if is_acl:
return edit_acl_user(existed.acl_uid, **kwargs)
try:
for column in employee_pop_columns:
if kwargs.get(column):
kwargs.pop(column)
return existed.update(**kwargs)
except Exception as e:
return abort(400, str(e))
@staticmethod
def edit_employee_block_column(_id, is_acl=False, **kwargs):
existed = EmployeeCRUD.get_employee_by_id(_id)
value = get_block_value(kwargs.get('block'))
if value is True:
check_department_director_id_or_direct_supervisor_id(_id)
if is_acl:
kwargs['block'] = value
edit_acl_user(existed.acl_uid, **kwargs)
data = existed.to_dict()
return data
@staticmethod
def check_email_unique(email, _id=0):
criterion = [

View File

@ -145,29 +145,3 @@ class EmployeePositionView(APIView):
result = EmployeeCRUD.get_all_position()
return self.jsonify(result)
class EmployeeViewExportExcel(APIView):
url_prefix = (f'{prefix}/export_all',)
def get(self):
excel_filename = 'all_employee_info.xlsx'
excel_path = current_app.config['UPLOAD_DIRECTORY_FULL']
excel_path_with_filename = os.path.join(excel_path, excel_filename)
block_status = int(request.args.get('block_status', -1))
data_list = EmployeeCRUD.get_export_employee_list(block_status)
headers = data_list[0].keys()
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
# insert header
for col_num, col_data in enumerate(headers, start=1):
ws.cell(row=1, column=col_num, value=col_data)
for row_num, row_data in enumerate(data_list, start=2):
for col_num, col_data in enumerate(row_data.values(), start=1):
ws.cell(row=row_num, column=col_num, value=col_data)
wb.save(excel_path_with_filename)
return send_from_directory(excel_path, excel_filename, as_attachment=True)