feat: add project sync task

This commit is contained in:
vran 2022-04-16 12:20:20 +08:00
parent bbc5217310
commit fa9ff242ef
15 changed files with 277 additions and 73 deletions

View File

@ -1,11 +1,13 @@
package com.databasir.api;
import com.databasir.api.common.LoginUserContext;
import com.databasir.common.JsonData;
import com.databasir.core.diff.data.RootDiff;
import com.databasir.core.domain.document.data.*;
import com.databasir.core.domain.document.generator.DocumentFileType;
import com.databasir.core.domain.document.service.DocumentService;
import com.databasir.core.domain.log.annotation.Operation;
import com.databasir.core.domain.project.service.ProjectService;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@ -20,6 +22,7 @@ import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBo
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import static org.springframework.data.domain.Sort.Direction.DESC;
@ -30,11 +33,14 @@ public class DocumentController {
private final DocumentService documentService;
private final ProjectService projectService;
@PostMapping(Routes.Document.SYNC_ONE)
@Operation(module = Operation.Modules.PROJECT, name = "文档同步", involvedProjectId = "#projectId")
public JsonData<Void> sync(@PathVariable Integer projectId) {
documentService.syncByProjectId(projectId);
return JsonData.ok();
public JsonData<Integer> sync(@PathVariable Integer projectId) {
Integer userId = LoginUserContext.getLoginUserId();
Optional<Integer> taskIdOpt = projectService.createSyncTask(projectId, userId, false);
return JsonData.ok(taskIdOpt);
}
@GetMapping(Routes.Document.DIFF)
@ -56,14 +62,14 @@ public class DocumentController {
public JsonData<Page<DatabaseDocumentVersionResponse>> getVersionsByProjectId(@PathVariable Integer projectId,
@PageableDefault(sort = "id",
direction = DESC)
Pageable page) {
Pageable page) {
return JsonData.ok(documentService.getVersionsByProjectId(projectId, page));
}
@GetMapping(Routes.Document.EXPORT)
public ResponseEntity<StreamingResponseBody> getDocumentFiles(@PathVariable Integer projectId,
@RequestParam(required = false)
Long version,
Long version,
@RequestParam DocumentFileType fileType) {
HttpHeaders headers = new HttpHeaders();
String fileName = "project[" + projectId + "]." + fileType.getFileExtension();
@ -79,7 +85,7 @@ public class DocumentController {
@GetMapping(Routes.Document.GET_SIMPLE_ONE)
public JsonData<DatabaseDocumentSimpleResponse> getSimpleByProjectId(@PathVariable Integer projectId,
@RequestParam(required = false)
Long version) {
Long version) {
return JsonData.ok(documentService.getSimpleOneByProjectId(projectId, version));
}

View File

@ -5,6 +5,8 @@ import com.databasir.api.validator.CronExpressionValidator;
import com.databasir.common.JsonData;
import com.databasir.core.domain.log.annotation.Operation;
import com.databasir.core.domain.project.data.*;
import com.databasir.core.domain.project.data.task.ProjectSimpleTaskResponse;
import com.databasir.core.domain.project.data.task.ProjectTaskListCondition;
import com.databasir.core.domain.project.service.ProjectService;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
@ -17,6 +19,7 @@ import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.List;
@RestController
@RequiredArgsConstructor
@ -71,7 +74,7 @@ public class ProjectController {
@GetMapping(Routes.GroupProject.LIST)
public JsonData<Page<ProjectSimpleResponse>> list(@PageableDefault(sort = "id", direction = Sort.Direction.DESC)
Pageable page,
Pageable page,
ProjectListCondition condition) {
DatabasirUserDetails user = (DatabasirUserDetails) SecurityContextHolder.getContext()
.getAuthentication()
@ -85,4 +88,10 @@ public class ProjectController {
projectService.testConnection(request);
return JsonData.ok();
}
@PostMapping(Routes.GroupProject.LIST_MANUAL_TASKS)
public JsonData<List<ProjectSimpleTaskResponse>> listManualTasks(@PathVariable Integer projectId,
@RequestBody ProjectTaskListCondition condition) {
return JsonData.ok(projectService.listManualTasks(projectId, condition));
}
}

View File

@ -70,6 +70,8 @@ public interface Routes {
String DELETE = BASE + "/groups/{groupId}/projects/{projectId}";
String TEST_CONNECTION = BASE + "/projects/test_connection";
String LIST_MANUAL_TASKS = BASE + "/projects/{projectId}/list_manual_tasks";
}
interface Document {

View File

@ -25,11 +25,11 @@ public class DatabasirAuthenticationEntryPoint implements AuthenticationEntryPoi
public void commence(javax.servlet.http.HttpServletRequest request,
javax.servlet.http.HttpServletResponse response,
AuthenticationException authException) throws IOException, ServletException {
log.warn("验证未通过. 提示信息 - {} - {} {}", request.getRequestURI(),
log.warn("验证未通过. 提示信息 - {} - {} - {}", request.getRequestURI(),
authException.getClass().getName(),
authException.getMessage());
DomainErrors err = DomainErrors.INVALID_REFRESH_TOKEN_OPERATION;
DomainErrors err = DomainErrors.INVALID_ACCESS_TOKEN;
JsonData<Void> data = JsonData.error(err.getErrCode(), err.getErrMessage());
String jsonString = objectMapper.writeValueAsString(data);
response.setStatus(HttpStatus.UNAUTHORIZED.value());

View File

@ -1,61 +0,0 @@
package com.databasir.job;
import com.databasir.common.JsonData;
import com.databasir.core.domain.document.service.DocumentService;
import com.databasir.core.domain.log.data.OperationLogRequest;
import com.databasir.core.domain.log.service.OperationLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@RequiredArgsConstructor
@Slf4j
public class ProjectDocumentAutoSyncJob implements Job {
private final OperationLogService operationLogService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getMergedJobDataMap();
log.info("start sync project document: " + dataMap.toString());
DocumentService documentService = (DocumentService) dataMap.get("documentService");
Integer projectId = dataMap.getInt("projectId");
try {
documentService.syncByProjectId(projectId);
OperationLogRequest request = OperationLogRequest.builder()
.isSuccess(true)
.operatorNickname("system")
.operatorUsername("system")
.operatorUserId(-1)
.operationName("文档自动同步")
.operationCode("autoSyncDocumentation")
.operationModule("project")
.operationResponse(JsonData.ok())
.isSuccess(true)
.involvedProjectId(projectId)
.build();
operationLogService.save(request);
log.info("sync project document {} over....", projectId);
} catch (Exception e) {
OperationLogRequest request = OperationLogRequest.builder()
.isSuccess(true)
.operatorNickname("system")
.operatorUsername("system")
.operatorUserId(-1)
.operationName("文档自动同步")
.operationCode("autoSyncDocumentation")
.operationModule("project")
.operationResponse(JsonData.error("-1", e.getMessage()))
.isSuccess(false)
.involvedProjectId(projectId)
.build();
operationLogService.save(request);
throw e;
}
}
}

View File

@ -0,0 +1,24 @@
package com.databasir.job;
import com.databasir.core.domain.project.service.ProjectService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@RequiredArgsConstructor
@Slf4j
public class ProjectSyncJob implements Job {
private final ProjectService projectService;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getMergedJobDataMap();
Integer projectId = dataMap.getInt("projectId");
projectService.createSyncTask(projectId, -1, true);
}
}

View File

@ -16,10 +16,13 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 用于启用 / 停用项目同步任务的调度器
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class ProjectDocumentAutoSyncTriggerJob {
public class ProjectSyncJobScheduler {
private static final String JOB_IDENTITY_PATTERN = "JOB_PROJECT[%s]";
@ -141,7 +144,7 @@ public class ProjectDocumentAutoSyncTriggerJob {
dataMap.put("cron", rule.getAutoSyncCron());
dataMap.put("documentService", documentService);
JobDetail job = JobBuilder.newJob()
.ofType(ProjectDocumentAutoSyncJob.class)
.ofType(ProjectSyncJob.class)
.withIdentity(jobKey(projectId))
.withDescription("auto sync project document")
.usingJobData(dataMap)

View File

@ -0,0 +1,100 @@
package com.databasir.job;
import com.databasir.common.JsonData;
import com.databasir.core.domain.document.service.DocumentService;
import com.databasir.core.domain.log.data.OperationLogRequest;
import com.databasir.core.domain.log.service.OperationLogService;
import com.databasir.dao.enums.ProjectSyncTaskStatus;
import com.databasir.dao.impl.ProjectSyncTaskDao;
import com.databasir.dao.impl.UserDao;
import com.databasir.dao.tables.pojos.UserPojo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
@RequiredArgsConstructor
@Slf4j
public class ProjectSyncTaskScheduler {
private final DocumentService documentService;
private final OperationLogService operationLogService;
private final UserDao userDao;
private final ProjectSyncTaskDao projectSyncTaskDao;
private final ThreadPoolTaskExecutor projectSyncTaskThreadPoolTaskExecutor;
/**
* 每隔 5s 执行一次
*/
@Scheduled(fixedRate = 5000L)
public void startSyncTask() {
final int size = 10;
projectSyncTaskDao.listNewTasks(size).forEach(task -> {
projectSyncTaskThreadPoolTaskExecutor.execute(() -> {
Integer taskId = task.getId();
Integer projectId = task.getProjectId();
Integer userId = task.getUserId();
sync(taskId, projectId, userId);
});
});
}
private void sync(Integer taskId, Integer projectId, Integer userId) {
try {
updateSyncTaskStatus(taskId, ProjectSyncTaskStatus.RUNNING, "running");
documentService.syncByProjectId(projectId);
updateSyncTaskStatus(taskId, ProjectSyncTaskStatus.FINISHED, "ok");
saveOperationLog(projectId, userId, null);
} catch (Exception e) {
updateSyncTaskStatus(taskId, ProjectSyncTaskStatus.FAILED, e.getMessage());
saveOperationLog(projectId, userId, e);
throw e;
}
}
private void updateSyncTaskStatus(Integer taskId, ProjectSyncTaskStatus status, String result) {
projectSyncTaskDao.updateStatusAndResultById(taskId, status, result);
}
private void saveOperationLog(Integer projectId, Integer userId, Exception ex) {
String operatorNickName;
String operatorUsername;
String operationName;
if (Objects.equals(-1, userId)) {
operatorNickName = "system";
operatorUsername = "system";
operationName = "文档定时同步";
} else {
UserPojo user = userDao.selectById(userId);
operatorNickName = user.getNickname();
operatorUsername = user.getUsername();
operationName = "文档手动同步";
}
JsonData response;
if (ex == null) {
response = JsonData.ok();
} else {
response = JsonData.error("-1", ex.getMessage());
}
OperationLogRequest operationLog = OperationLogRequest.builder()
.operatorNickname(operatorNickName)
.operatorUsername(operatorUsername)
.operatorUserId(userId)
.operationName(operationName)
.operationCode("autoSyncDocumentation")
.operationModule("project")
.operationResponse(response)
.isSuccess(ex == null)
.involvedProjectId(projectId)
.build();
operationLogService.save(operationLog);
}
}

View File

@ -0,0 +1,27 @@
package com.databasir.job.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ProjectSyncTaskThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor projectSyncTaskThreadPoolTaskExecutor() {
final int maxCorePoolSize = 12;
final int maxPoolSize = 32;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int availableProcessorCount = Runtime.getRuntime().availableProcessors() + 2;
int corePoolSize = Math.min(maxCorePoolSize, availableProcessorCount);
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setAllowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return executor;
}
}

View File

@ -11,6 +11,7 @@ public enum DomainErrors implements DatabasirErrors {
REFRESH_TOKEN_EXPIRED("X_0001", "refresh token expired"),
INVALID_REFRESH_TOKEN_OPERATION("X_0002", "invalid refresh token operation"),
NETWORK_ERROR("X_0003", "网络似乎不稳定,请稍后再试"),
INVALID_ACCESS_TOKEN("X_0004", "无效的 access token"),
NOT_SUPPORT_DATABASE_TYPE("A_10000", "不支持的数据库类型, 请检查项目配置"),
PROJECT_NOT_FOUND("A_10001", "项目不存在"),

View File

@ -43,6 +43,7 @@ public class LoginService {
.orElseThrow(DomainErrors.INVALID_REFRESH_TOKEN_OPERATION::exception);
// refresh-token 已过期
if (login.getRefreshTokenExpireAt().isBefore(LocalDateTime.now())) {
log.warn("refresh token expired, {} ", request);
throw DomainErrors.REFRESH_TOKEN_EXPIRED.exception();
}
// access-token 未过期允许一分钟的误差就开始刷新有可能是 refresh-token 泄露了删除 refresh-token

View File

@ -0,0 +1,17 @@
package com.databasir.core.domain.project.converter;
import com.databasir.core.domain.project.data.task.ProjectSimpleTaskResponse;
import com.databasir.dao.tables.pojos.ProjectSyncTaskPojo;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import java.util.List;
@Mapper(componentModel = "spring")
public interface ProjectSimpleTaskResponseConverter {
List<ProjectSimpleTaskResponse> of(List<ProjectSyncTaskPojo> pojos);
@Mapping(target = "taskId", source = "id")
ProjectSimpleTaskResponse of(ProjectSyncTaskPojo pojo);
}

View File

@ -0,0 +1,14 @@
package com.databasir.core.domain.project.data.task;
import com.databasir.dao.enums.ProjectSyncTaskStatus;
import lombok.Data;
@Data
public class ProjectSimpleTaskResponse {
private Integer taskId;
private ProjectSyncTaskStatus status;
private String result;
}

View File

@ -0,0 +1,29 @@
package com.databasir.core.domain.project.data.task;
import com.databasir.dao.Tables;
import com.databasir.dao.enums.ProjectSyncTaskStatus;
import lombok.Data;
import org.jooq.Condition;
import java.util.Collection;
import java.util.Collections;
@Data
public class ProjectTaskListCondition {
private Collection<Integer> taskIdIn = Collections.emptyList();
private Collection<ProjectSyncTaskStatus> taskStatusIn = Collections.emptyList();
public Condition toCondition(Integer projectId) {
Condition condition = Tables.PROJECT_SYNC_TASK.PROJECT_ID.eq(projectId);
if (taskIdIn != null && !taskIdIn.isEmpty()) {
condition = condition.and(Tables.PROJECT_SYNC_TASK.ID.in(taskIdIn));
}
if (taskStatusIn != null && !taskStatusIn.isEmpty()) {
condition = condition.and(Tables.PROJECT_SYNC_TASK.STATUS.in(taskStatusIn));
}
// ignore system user task
return condition.and(Tables.PROJECT_SYNC_TASK.USER_ID.ne(-1));
}
}

View File

@ -5,11 +5,16 @@ import com.databasir.core.domain.DomainErrors;
import com.databasir.core.domain.project.converter.DataSourcePojoConverter;
import com.databasir.core.domain.project.converter.ProjectPojoConverter;
import com.databasir.core.domain.project.converter.ProjectResponseConverter;
import com.databasir.core.domain.project.converter.ProjectSimpleTaskResponseConverter;
import com.databasir.core.domain.project.data.*;
import com.databasir.core.domain.project.data.task.ProjectSimpleTaskResponse;
import com.databasir.core.domain.project.data.task.ProjectTaskListCondition;
import com.databasir.core.infrastructure.connection.DatabaseConnectionService;
import com.databasir.dao.enums.ProjectSyncTaskStatus;
import com.databasir.dao.impl.*;
import com.databasir.dao.tables.pojos.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@ -23,8 +28,11 @@ import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Slf4j
public class ProjectService {
private final DatabaseConnectionService databaseConnectionService;
private final ProjectDao projectDao;
private final ProjectSyncRuleDao projectSyncRuleDao;
@ -37,13 +45,15 @@ public class ProjectService {
private final UserFavoriteProjectDao userFavoriteProjectDao;
private final ProjectSyncTaskDao projectSyncTaskDao;
private final DataSourcePojoConverter dataSourcePojoConverter;
private final ProjectPojoConverter projectPojoConverter;
private final ProjectResponseConverter projectResponseConverter;
private final DatabaseConnectionService databaseConnectionService;
private final ProjectSimpleTaskResponseConverter projectSimpleTaskResponseConverter;
public ProjectDetailResponse getOne(Integer id) {
return projectDao.selectOptionalById(id)
@ -175,4 +185,26 @@ public class ProjectService {
properties);
}
@Transactional
public Optional<Integer> createSyncTask(Integer projectId, Integer userId, boolean ignoreIfExists) {
if (!projectDao.existsById(projectId)) {
log.info("create sync task failed, because project not exists, projectId={}", projectId);
return Optional.empty();
}
var validTaskStatus = List.of(ProjectSyncTaskStatus.NEW, ProjectSyncTaskStatus.RUNNING);
if (ignoreIfExists && projectSyncTaskDao.existsByProjectId(projectId, validTaskStatus)) {
log.info("create sync task failed, it's already exists, projectId={}", projectId);
return Optional.empty();
}
ProjectSyncTaskPojo projectSyncTask = new ProjectSyncTaskPojo();
projectSyncTask.setProjectId(projectId);
projectSyncTask.setStatus(ProjectSyncTaskStatus.NEW);
projectSyncTask.setUserId(userId);
return Optional.of(projectSyncTaskDao.insertAndReturnId(projectSyncTask));
}
public List<ProjectSimpleTaskResponse> listManualTasks(Integer projectId, ProjectTaskListCondition condition) {
var tasks = projectSyncTaskDao.selectList(condition.toCondition(projectId));
return projectSimpleTaskResponseConverter.of(tasks);
}
}