feat: common notice config (#180)

This commit is contained in:
simontigers 2023-09-26 19:44:20 +08:00 committed by GitHub
parent b8d93bc9eb
commit 8e6ce05c04
5 changed files with 231 additions and 0 deletions

View File

@ -230,3 +230,59 @@ def init_department():
cli.init_wide_company()
cli.create_acl_role_with_department()
cli.init_backend_resource()
@click.command()
@with_appcontext
def common_check_new_columns():
"""
add new columns to tables
"""
from api.extensions import db
from sqlalchemy import inspect, text
def get_model_by_table_name(table_name):
for model in db.Model.registry._class_registry.values():
if hasattr(model, '__tablename__') and model.__tablename__ == table_name:
return model
return None
def add_new_column(table_name, new_column):
column_type = new_column.type.compile(engine.dialect)
default_value = new_column.default.arg if new_column.default else None
sql = f"ALTER TABLE {table_name} ADD COLUMN {new_column.name} {column_type} "
if new_column.comment:
sql += f" comment '{new_column.comment}'"
if column_type == 'JSON':
pass
elif default_value:
if column_type.startswith('VAR') or column_type.startswith('Text'):
if default_value is None or len(default_value) == 0:
pass
else:
sql += f" DEFAULT {default_value}"
sql = text(sql)
db.session.execute(sql)
engine = db.get_engine()
inspector = inspect(engine)
table_names = inspector.get_table_names()
for table_name in table_names:
existed_columns = inspector.get_columns(table_name)
existed_column_name_list = [c['name'] for c in existed_columns]
model = get_model_by_table_name(table_name)
if model is None:
continue
model_columns = model.__table__.columns._all_columns
for column in model_columns:
if column.name not in existed_column_name_list:
try:
add_new_column(table_name, column)
current_app.logger.info(f"add new column [{column.name}] in table [{table_name}] success.")
except Exception as e:
current_app.logger.error(f"add new column [{column.name}] in table [{table_name}] err:")
current_app.logger.error(e)

View File

@ -0,0 +1,94 @@
from api.models.common_setting import NoticeConfig
from wtforms import Form
from wtforms import StringField
from wtforms import validators
from flask import abort
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
class NoticeConfigCRUD(object):
@staticmethod
def add_notice_config(**kwargs):
NoticeConfigCRUD.check_platform(kwargs.get('platform'))
try:
return NoticeConfig.create(
**kwargs
)
except Exception as e:
return abort(400, str(e))
@staticmethod
def check_platform(platform):
NoticeConfig.get_by(first=True, to_dict=False, platform=platform) and abort(400, f"{platform} 已存在!")
@staticmethod
def edit_notice_config(_id, **kwargs):
existed = NoticeConfigCRUD.get_notice_config_by_id(_id)
try:
return existed.update(**kwargs)
except Exception as e:
return abort(400, str(e))
@staticmethod
def get_notice_config_by_id(_id):
return NoticeConfig.get_by(first=True, to_dict=False, id=_id) or abort(400, f"{_id} 配置项不存在!")
@staticmethod
def get_all():
return NoticeConfig.get_by(to_dict=True)
@staticmethod
def test_send_email(receive_address, **kwargs):
# 设置发送方和接收方的电子邮件地址
sender_email = 'test@test.com'
sender_name = 'Test Sender'
recipient_email = receive_address
recipient_name = receive_address
subject = 'Test Email'
body = 'This is a test email'
message = MIMEText(body, 'plain', 'utf-8')
message['From'] = formataddr((sender_name, sender_email))
message['To'] = formataddr((recipient_name, recipient_email))
message['Subject'] = subject
smtp_server = kwargs.get('server')
smtp_port = kwargs.get('port')
smtp_username = kwargs.get('username')
smtp_password = kwargs.get('password')
if kwargs.get('mail_type') == 'SMTP':
smtp_connection = smtplib.SMTP(smtp_server, smtp_port)
else:
smtp_connection = smtplib.SMTP_SSL(smtp_server, smtp_port)
if kwargs.get('is_login'):
smtp_connection.login(smtp_username, smtp_password)
smtp_connection.sendmail(sender_email, recipient_email, message.as_string())
smtp_connection.quit()
return 1
class NoticeConfigForm(Form):
platform = StringField(validators=[
validators.DataRequired(message="平台 不能为空"),
validators.Length(max=255),
])
info = StringField(validators=[
validators.DataRequired(message="信息 不能为空"),
validators.Length(max=255),
])
class NoticeConfigUpdateForm(Form):
info = StringField(validators=[
validators.DataRequired(message="信息 不能为空"),
validators.Length(max=255),
])

View File

@ -53,5 +53,6 @@ class ErrFormat(CommonErrFormat):
username_is_required = "username不能为空"
email_is_required = "邮箱不能为空"
email_format_error = "邮箱格式错误"
email_send_timeout = "邮件发送超时"
common_data_not_found = "ID {} 找不到记录"

View File

@ -47,6 +47,8 @@ class Employee(ModelWithoutPK):
last_login = db.Column(db.TIMESTAMP, nullable=True)
block = db.Column(db.Integer, default=0)
notice_info = db.Column(db.JSON, default={})
_department = db.relationship(
'Department', backref='common_employee.department_id',
lazy='joined'
@ -87,3 +89,10 @@ class CommonData(Model):
data_type = db.Column(db.VARCHAR(255), default='')
data = db.Column(db.JSON)
class NoticeConfig(Model):
__tablename__ = "common_notice_config"
platform = db.Column(db.VARCHAR(255), nullable=False)
info = db.Column(db.JSON)

View File

@ -0,0 +1,71 @@
from flask import request, abort, current_app
from werkzeug.datastructures import MultiDict
from api.lib.perm.auth import auth_with_app_token
from api.models.common_setting import NoticeConfig
from api.resource import APIView
from api.lib.common_setting.notice_config import NoticeConfigForm, NoticeConfigUpdateForm, NoticeConfigCRUD
from api.lib.decorator import args_required
from api.lib.common_setting.resp_format import ErrFormat
prefix = '/notice_config'
class NoticeConfigView(APIView):
url_prefix = (f'{prefix}',)
@args_required('platform')
@auth_with_app_token
def get(self):
platform = request.args.get('platform')
res = NoticeConfig.get_by(first=True, to_dict=True, platform=platform) or {}
return self.jsonify(res)
def post(self):
form = NoticeConfigForm(MultiDict(request.json))
if not form.validate():
abort(400, ','.join(['{}: {}'.format(filed, ','.join(msg)) for filed, msg in form.errors.items()]))
data = NoticeConfigCRUD.add_notice_config(**form.data)
return self.jsonify(data.to_dict())
class NoticeConfigUpdateView(APIView):
url_prefix = (f'{prefix}/<int:_id>',)
def put(self, _id):
form = NoticeConfigUpdateForm(MultiDict(request.json))
if not form.validate():
abort(400, ','.join(['{}: {}'.format(filed, ','.join(msg)) for filed, msg in form.errors.items()]))
data = NoticeConfigCRUD.edit_notice_config(_id, **form.data)
return self.jsonify(data.to_dict())
class CheckEmailServer(APIView):
url_prefix = (f'{prefix}/send_test_email',)
def post(self):
receive_address = request.args.get('receive_address')
info = request.values.get('info')
try:
result = NoticeConfigCRUD.test_send_email(receive_address, **info)
return self.jsonify(result=result)
except Exception as e:
current_app.logger.error('test_send_email err:')
current_app.logger.error(e)
if 'Timed Out' in str(e):
abort(400, ErrFormat.email_send_timeout)
abort(400, f"{str(e)}")
class NoticeConfigGetView(APIView):
method_decorators = []
url_prefix = (f'{prefix}/all',)
@auth_with_app_token
def get(self):
res = NoticeConfigCRUD.get_all()
return self.jsonify(res)