cmdb/cmdb-api/api/lib/common_setting/upload_file.py

95 lines
3.1 KiB
Python

import base64
import uuid
import os
from io import BytesIO
from flask import abort, current_app
import lz4.frame
from api.lib.common_setting.utils import get_cur_time_str
from api.models.common_setting import CommonFile
from api.lib.common_setting.resp_format import ErrFormat
def allowed_file(filename, allowed_extensions):
    """Return whether *filename* carries one of the permitted extensions.

    The extension is the text after the last ``.`` in *filename*. It is
    lower-cased before the membership test, so *allowed_extensions* should
    hold lower-case entries.

    Args:
        filename: File name to check. ``None`` or an empty string is
            reported as not allowed instead of raising.
        allowed_extensions: Container of extensions without the leading
            dot, e.g. ``{"png", "jpg"}``.

    Returns:
        bool: ``True`` when *filename* contains a dot and its lower-cased
        extension is in *allowed_extensions*; ``False`` otherwise.
    """
    # A missing name would make ``'.' in filename`` raise TypeError.
    if not filename:
        return False

    # Only the last dot separates the extension ("a.tar.gz" -> "gz").
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in allowed_extensions
def generate_new_file_name(name):
    """Build a unique file name from *name*, keeping its extension.

    The string returned by ``get_cur_time_str('_')`` and a random UUID4 are
    appended to the stem so that repeated uploads of the same name do not
    collide.

    Args:
        name: Original file name, e.g. ``"report.pdf"``.

    Returns:
        str: ``"<stem>_<time>_<uuid>.<ext>"``, where the extension is the
        text after the last dot. When *name* contains no dot the whole name
        is the stem and the result is ``"<name>_<time>_<uuid>"``.
    """
    # Split on the LAST dot only. Splitting on every ".<ext>" occurrence
    # removed repeated suffixes from the stem ("a.tar.b.tar" -> "a.b").
    stem, dot, ext = name.rpartition('.')
    if not dot:
        # rpartition leaves an undotted name in the last slot; it is the
        # stem, not an extension.
        stem, ext = name, ''

    uid = str(uuid.uuid4())
    cur_str = get_cur_time_str('_')

    return f"{stem}_{cur_str}_{uid}{dot}{ext}"
class CommonFileCRUD:
    """Store and retrieve files as lz4-frame-compressed ``CommonFile`` rows."""

    @staticmethod
    def add_file(**kwargs):
        """Create a ``CommonFile`` record.

        Args:
            **kwargs: Passed unchanged to ``CommonFile.create``.

        Returns:
            Whatever ``CommonFile.create`` returns.
        """
        return CommonFile.create(**kwargs)

    @staticmethod
    def _find_record(file_name):
        """Return the first ``CommonFile`` row named *file_name*, or a falsy value."""
        return CommonFile.get_by(file_name=file_name, first=True, to_dict=False)

    @staticmethod
    def get_file(file_name, to_str=False):
        """Load a stored file and return its decompressed content.

        Args:
            file_name: Value of the record's ``file_name`` field.
            to_str: When true, return the content base64-encoded as text.

        Returns:
            str | BytesIO: A base64 string when *to_str* is true, otherwise
            a ``BytesIO`` wrapping the decompressed bytes.

        Raises:
            Aborts with HTTP 400 (``ErrFormat.file_not_found``) when no
            record matches *file_name*.
        """
        existed = CommonFileCRUD._find_record(file_name)
        if not existed:
            abort(400, ErrFormat.file_not_found)

        uncompressed_data = lz4.frame.decompress(existed.binary)
        if to_str:
            return base64.b64encode(uncompressed_data).decode('utf-8')
        return BytesIO(uncompressed_data)

    @staticmethod
    def sync_file_to_db():
        """Copy files found under the configured upload directory into the database.

        Walks the directory named by the ``UPLOAD_DIRECTORY_FULL`` config
        key and, for every file whose bare name has no ``CommonFile``
        record yet, stores its lz4-compressed content. Insert failures are
        logged and do not stop the walk.
        """
        for config_key in ['UPLOAD_DIRECTORY_FULL']:
            upload_path = current_app.config.get(config_key, None)
            if not upload_path:
                continue

            for root, _dirs, files in os.walk(upload_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    if not os.path.isfile(file_path):
                        continue

                    # Records are keyed by bare file name, so a name that is
                    # already stored is skipped.
                    if CommonFileCRUD._find_record(file):
                        continue

                    # Read fully and close the handle before touching the DB.
                    with open(file_path, 'rb') as f:
                        data = f.read()
                    compressed_data = lz4.frame.compress(data)

                    try:
                        CommonFileCRUD.add_file(
                            origin_name=file,
                            file_name=file,
                            binary=compressed_data,
                        )
                        current_app.logger.info(f'sync file {file} to db')
                    except Exception as e:
                        current_app.logger.error(f'sync file {file} to db error: {e}')

    def get_file_binary_str(self, file_name):
        """Return the stored file *file_name* as a base64 string (see ``get_file``)."""
        return self.get_file(file_name, True)

    def save_str_to_file(self, file_name, str_data):
        """Store base64-encoded content under *file_name* unless it already exists.

        Lookup and insert failures are logged rather than raised.

        Args:
            file_name: Name used for both ``origin_name`` and ``file_name``.
            str_data: Base64-encoded file content.

        Returns:
            None.
        """
        # Query the record directly instead of calling get_file(): that would
        # decompress the whole blob just to discard it, and treating every
        # exception as "not found" could insert a duplicate row.
        try:
            existed = self._find_record(file_name)
        except Exception as e:
            current_app.logger.error(f"save_str_to_file error: {e}")
            return

        if existed:
            current_app.logger.info(f'file {file_name} already exists')
            return

        bytes_data = base64.b64decode(str_data)
        compressed_data = lz4.frame.compress(bytes_data)

        try:
            self.add_file(
                origin_name=file_name,
                file_name=file_name,
                binary=compressed_data,
            )
            current_app.logger.info(f'save_str_to_file {file_name} success')
        except Exception as e:
            current_app.logger.error(f"save_str_to_file error: {e}")