重构路由监控

This commit is contained in:
tanghc
2020-10-31 18:02:02 +08:00
parent 4ce2fc826d
commit 80003c44d3
58 changed files with 1966 additions and 215 deletions

View File

@@ -0,0 +1,43 @@
package com.gitee.sop.gatewaycommon.bean;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
/**
* LRU缓存
* LinkedHashMap 本身内部有一个触发条件则自动执行的方法:删除最老元素(最近最少使用的元素)
* 由于最近最少使用元素是 LinkedHashMap 内部处理
* 故我们不再需要维护 最近访问元素放在链尾get 时直接访问/ put 时直接存储
* created by Ethan-Walker on 2019/2/16
*/
public class LRUCache<K, V> {
private final Map<K, V> map;
public LRUCache(int capacity) {
map = new LinkedHashMap<K, V>(capacity, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
// 容量大于capacity 时就删除
return size() > capacity;
}
};
}
public V get(K key) {
return map.get(key);
}
public V computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
return map.computeIfAbsent(key, mappingFunction);
}
public V put(K key, V value) {
return map.put(key, value);
}
public Collection<V> values() {
return map.values();
}
}

View File

@@ -5,7 +5,6 @@ import com.gitee.sop.gatewaycommon.bean.ApiContext;
import com.gitee.sop.gatewaycommon.bean.BeanInitializer;
import com.gitee.sop.gatewaycommon.bean.SpringContext;
import com.gitee.sop.gatewaycommon.gateway.loadbalancer.NacosServerIntrospector;
import com.gitee.sop.gatewaycommon.interceptor.MonitorRouteInterceptor;
import com.gitee.sop.gatewaycommon.interceptor.RouteInterceptor;
import com.gitee.sop.gatewaycommon.limit.LimitManager;
import com.gitee.sop.gatewaycommon.manager.EnvGrayManager;
@@ -17,17 +16,20 @@ import com.gitee.sop.gatewaycommon.manager.LimitConfigManager;
import com.gitee.sop.gatewaycommon.manager.RouteConfigManager;
import com.gitee.sop.gatewaycommon.manager.RouteRepositoryContext;
import com.gitee.sop.gatewaycommon.message.ErrorFactory;
import com.gitee.sop.gatewaycommon.monitor.MonitorManager;
import com.gitee.sop.gatewaycommon.param.ParameterFormatter;
import com.gitee.sop.gatewaycommon.route.RegistryListener;
import com.gitee.sop.gatewaycommon.route.ServiceListener;
import com.gitee.sop.gatewaycommon.route.ServiceRouteListener;
import com.gitee.sop.gatewaycommon.secret.IsvManager;
import com.gitee.sop.gatewaycommon.sync.SopAsyncConfigurer;
import com.gitee.sop.gatewaycommon.util.RouteInterceptorUtil;
import com.gitee.sop.gatewaycommon.validate.SignConfig;
import com.gitee.sop.gatewaycommon.validate.Validator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -156,6 +158,17 @@ public class AbstractConfiguration implements ApplicationContextAware, Applicati
return ApiConfig.getInstance().getParameterFormatter();
}
@Bean
public SopAsyncConfigurer sopAsyncConfigurer(@Value("${sop.monitor-route-interceptor.thread-pool-size:4}")
int threadPoolSize) {
return new SopAsyncConfigurer("gatewayAsync", threadPoolSize);
}
@Bean
@ConditionalOnMissingBean
public MonitorManager monitorManager() {
return new MonitorManager();
}
/**
* 跨域过滤器gateway采用react形式需要使用reactive包下的UrlBasedCorsConfigurationSource
@@ -231,7 +244,6 @@ public class AbstractConfiguration implements ApplicationContextAware, Applicati
protected void initRouteInterceptor() {
Map<String, RouteInterceptor> routeInterceptorMap = applicationContext.getBeansOfType(RouteInterceptor.class);
Collection<RouteInterceptor> routeInterceptors = new ArrayList<>(routeInterceptorMap.values());
routeInterceptors.add(new MonitorRouteInterceptor());
RouteInterceptorUtil.addInterceptors(routeInterceptors);
}

View File

@@ -39,7 +39,7 @@ public class GatewayResultExecutor extends BaseExecutorAdapter<ServerWebExchange
List<String> errorCodeList = exchange.getResponse().getHeaders().get(SopConstants.X_SERVICE_ERROR_CODE);
if (!CollectionUtils.isEmpty(errorCodeList)) {
String errorCode = errorCodeList.get(0);
responseStatus = Integer.valueOf(errorCode);
responseStatus = Integer.parseInt(errorCode);
}
return responseStatus;
}

View File

@@ -1,96 +0,0 @@
package com.gitee.sop.gatewaycommon.interceptor;
import com.gitee.sop.gatewaycommon.bean.ApiConfig;
import com.gitee.sop.gatewaycommon.monitor.MonitorInfo;
import com.gitee.sop.gatewaycommon.monitor.MonitorManager;
import com.gitee.sop.gatewaycommon.param.ApiParam;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* SOP默认的拦截器用于收集监控数据
*
* @author tanghc
*/
public class MonitorRouteInterceptor implements RouteInterceptor {
private ThreadPoolExecutor threadPoolExecutor;
public MonitorRouteInterceptor(int threadPoolSize) {
threadPoolExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
}
public MonitorRouteInterceptor() {
this(4);
}
public MonitorRouteInterceptor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}
@Override
public void preRoute(RouteInterceptorContext context) {
}
@Override
public void afterRoute(RouteInterceptorContext context) {
threadPoolExecutor.execute(() -> this.storeRequestInfo(context));
}
/**
* 记录接口调用流量,最大时间,最小时间,总时长,平均时长,调用次数,成功次数,失败次数.
* 需要考虑并发情况。
*/
protected void storeRequestInfo(RouteInterceptorContext context) {
MonitorManager monitorManager = ApiConfig.getInstance().getMonitorManager();
ApiParam apiParam = context.getApiParam();
String routeId = apiParam.fetchNameVersion();
long spendTime = context.getFinishTimeMillis() - context.getBeginTimeMillis();
// 这步操作是线程安全的底层调用了ConcurrentHashMap.computeIfAbsent
MonitorInfo monitorInfo = monitorManager.getMonitorInfo(routeId, (k) -> this.createMonitorInfo(apiParam));
monitorInfo.storeMaxTime(spendTime);
monitorInfo.storeMinTime(spendTime);
monitorInfo.getTotalCount().incrementAndGet();
monitorInfo.getTotalTime().addAndGet(spendTime);
monitorInfo.getTotalRequestDataSize().addAndGet(context.getRequestDataSize());
monitorInfo.getTotalResponseDataSize().addAndGet(context.getResponseDataSize());
if (context.isSuccessRequest()) {
monitorInfo.getSuccessCount().incrementAndGet();
} else {
monitorInfo.getErrorCount().incrementAndGet();
String errorMsg = context.getServiceErrorMsg();
monitorInfo.addErrorMsg(errorMsg);
}
}
private MonitorInfo createMonitorInfo(ApiParam apiParam) {
MonitorInfo monitorInfo = new MonitorInfo();
monitorInfo.setName(apiParam.fetchName());
monitorInfo.setVersion(apiParam.fetchVersion());
monitorInfo.setServiceId(apiParam.fetchServiceId());
monitorInfo.setTotalRequestDataSize(new AtomicLong());
monitorInfo.setTotalResponseDataSize(new AtomicLong());
monitorInfo.setTotalTime(new AtomicLong());
monitorInfo.setMaxTime(0L);
monitorInfo.setMinTime(0L);
monitorInfo.setSuccessCount(new AtomicLong());
monitorInfo.setTotalCount(new AtomicLong());
monitorInfo.setErrorCount(new AtomicLong());
monitorInfo.setErrorMsgList(new ArrayList<>(10));
return monitorInfo;
}
@Override
public int getOrder() {
return -1000;
}
}

View File

@@ -88,6 +88,6 @@ public interface RouteInterceptorContext {
*/
default boolean isSuccessRequest() {
int responseStatus = getResponseStatus();
return responseStatus == HttpStatus.OK.value() || responseStatus == SopConstants.BIZ_ERROR_STATUS;
return responseStatus == HttpStatus.OK.value();
}
}

View File

@@ -33,8 +33,7 @@ public enum EnvironmentKeys {
/**
* post请求body缓存大小
*/
MAX_IN_MEMORY_SIZE("spring.codec.max-in-memory-size", "262144")
MAX_IN_MEMORY_SIZE("spring.codec.max-in-memory-size", "262144"),
;

View File

@@ -0,0 +1,63 @@
package com.gitee.sop.gatewaycommon.monitor;
import lombok.Data;
import java.util.Collection;
/**
* 每个接口 总调用流量,最大时间,最小时间,总时长,平均时长,调用次数,成功次数,失败次数,错误查看。
*
* @author tanghc
*/
@Data
public class MonitorDTO {
/**
* 路由id
*/
private String routeId;
/**
* 接口名
*/
private String name;
/**
* 版本号
*/
private String version;
/**
* serviceId
*/
private String serviceId;
/**
* 实例id
*/
private String instanceId;
/** 请求耗时最长时间, 数据库字段max_time */
private Integer maxTime;
/** 请求耗时最小时间, 数据库字段min_time */
private Integer minTime;
/**
* 总时长
*/
private Long totalTime;
/** 总调用次数, 数据库字段total_request_count */
private Long totalRequestCount;
/** 成功次数, 数据库字段success_count */
private Long successCount;
/** 失败次数(业务主动抛出的异常算作成功,如参数校验,未知的错误算失败), 数据库字段error_count */
private Long errorCount;
/**
* 错误信息
*/
private Collection<MonitorErrorMsg> errorMsgList;
}

View File

@@ -0,0 +1,106 @@
package com.gitee.sop.gatewaycommon.monitor;
import com.gitee.sop.gatewaycommon.bean.LRUCache;
import lombok.Data;
import org.apache.commons.codec.digest.DigestUtils;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 每个接口 总调用流量,最大时间,最小时间,总时长,平均时长,调用次数,成功次数,失败次数,错误查看。
*
* @author tanghc
*/
@Data
public class MonitorData {
public static final int LIMIT_SIZE = 50;
private String routeId;
/**
* 接口名
*/
private String name;
/**
* 版本号
*/
private String version;
/**
* serviceId
*/
private String serviceId;
private String instanceId;
/**
* 请求耗时最长时间
*/
private Integer maxTime;
/**
* 请求耗时最小时间
*/
private Integer minTime;
/**
* 总时长
*/
private AtomicInteger totalTime;
/**
* 总调用次数
*/
private AtomicInteger totalRequestCount;
/**
* 成功次数
*/
private AtomicInteger successCount;
/**
* 失败次数(业务主动抛出的异常算作成功,如参数校验,未知的错误算失败)
*/
private AtomicInteger errorCount;
/**
* 错误信息,key: errorId
*/
private LRUCache<String, MonitorErrorMsg> monitorErrorMsgMap;
/**
* 下一次刷新到数据库的时间
*/
private LocalDateTime flushTime;
public synchronized void storeMaxTime(int spendTime) {
if (spendTime > maxTime) {
maxTime = spendTime;
}
}
public synchronized void storeMinTime(int spendTime) {
if (minTime == 0 || spendTime < minTime) {
minTime = spendTime;
}
}
public void addErrorMsg(String errorMsg, int httpStatus) {
if (errorMsg == null || "".equals(errorMsg)) {
return;
}
synchronized (this) {
String errorId = DigestUtils.md5Hex(instanceId + routeId + errorMsg);
MonitorErrorMsg monitorErrorMsg = monitorErrorMsgMap.computeIfAbsent(errorId, (k) -> {
MonitorErrorMsg value = new MonitorErrorMsg();
value.setErrorId(errorId);
value.setInstanceId(instanceId);
value.setRouteId(routeId);
value.setErrorMsg(errorMsg);
value.setErrorStatus(httpStatus);
value.setCount(0);
return value;
});
monitorErrorMsg.setCount(monitorErrorMsg.getCount() + 1);
}
}
}

View File

@@ -0,0 +1,30 @@
package com.gitee.sop.gatewaycommon.monitor;
import lombok.Data;
/**
* @author thc
*/
@Data
public class MonitorErrorMsg {
/** 错误id,md5(error_msg), 数据库字段error_id */
private String errorId;
/** 实例id, 数据库字段instance_id */
private String instanceId;
/** 数据库字段route_id */
private String routeId;
/** 数据库字段isp_id */
private Long ispId;
/** 数据库字段error_msg */
private String errorMsg;
/** http status非200错误 */
private Integer errorStatus;
/** 错误次数, 数据库字段count */
private Integer count;
}

View File

@@ -1,88 +0,0 @@
package com.gitee.sop.gatewaycommon.monitor;
import lombok.Data;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* 每个接口 总调用流量,最大时间,最小时间,总时长,平均时长,调用次数,成功次数,失败次数,错误查看。
*
* @author tanghc
*/
@Data
public class MonitorInfo {
/**
* 接口名
*/
private String name;
/**
* 版本号
*/
private String version;
/**
* serviceId
*/
private String serviceId;
/**
* 总请求数据量
*/
private AtomicLong totalRequestDataSize;
/**
* 总返回数据量
*/
private AtomicLong totalResponseDataSize;
/**
* 请求耗时最长时间
*/
private Long maxTime;
/**
* 请求耗时最小时间
*/
private Long minTime;
/**
* 总时长
*/
private AtomicLong totalTime;
/**
* 总调用次数
*/
private AtomicLong totalCount;
/**
* 成功次数
*/
private AtomicLong successCount;
/**
* 失败次数(业务主动抛出的异常算作成功,如参数校验,未知的错误算失败)
*/
private AtomicLong errorCount;
/**
* 错误信息
*/
private List<String> errorMsgList;
public synchronized void storeMaxTime(long spendTime) {
if (spendTime > maxTime) {
maxTime = spendTime;
}
}
public synchronized void storeMinTime(long spendTime) {
if (minTime == 0 || spendTime < minTime) {
minTime = spendTime;
}
}
public void addErrorMsg(String errorMsg) {
if (errorMsg == null || "".equals(errorMsg)) {
return;
}
synchronized (this) {
if (errorMsgList != null && errorMsgList.size() < 10) {
errorMsgList.add(errorMsg);
}
}
}
}

View File

@@ -9,13 +9,13 @@ import java.util.function.Function;
*/
public class MonitorManager {
private static Map<String, MonitorInfo> monitorMap = new ConcurrentHashMap<>(128);
private static Map<String, MonitorData> monitorMap = new ConcurrentHashMap<>(128);
public Map<String, MonitorInfo> getMonitorData() {
public Map<String, MonitorData> getMonitorData() {
return monitorMap;
}
public MonitorInfo getMonitorInfo(String routeId, Function<String, MonitorInfo> createFun) {
public MonitorData getMonitorInfo(String routeId, Function<String, MonitorData> createFun) {
return monitorMap.computeIfAbsent(routeId, createFun);
}

View File

@@ -0,0 +1,13 @@
package com.gitee.sop.gatewaycommon.monitor;
import lombok.Data;
/**
* @author tanghc
*/
@Data
public class RouteErrorCount {
private String routeId;
private Integer count;
}

View File

@@ -119,7 +119,7 @@ public abstract class BaseExecutorAdapter<T, R> implements ResultExecutor<T, R>
defaultRouteInterceptorContext.setServiceResult(serviceResult);
defaultRouteInterceptorContext.setFinishTimeMillis(System.currentTimeMillis());
defaultRouteInterceptorContext.setResponseDataSize(serviceResult.length());
if (responseStatus != HttpStatus.OK.value() && responseStatus != SopConstants.BIZ_ERROR_STATUS) {
if (responseStatus != HttpStatus.OK.value()) {
String responseErrorMessage = getResponseErrorMessage(requestContext);
if (StringUtils.isEmpty(responseErrorMessage)) {
responseErrorMessage = serviceResult;

View File

@@ -0,0 +1,37 @@
package com.gitee.sop.gatewaycommon.sync;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author thc
*/
public class MyNamedThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
public MyNamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
if (null == name || name.isEmpty()) {
name = "pool";
}
namePrefix = name + "-" + POOL_NUMBER.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}

View File

@@ -0,0 +1,41 @@
package com.gitee.sop.gatewaycommon.sync;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 异步执行配置
*
* @author tanghc
*/
@Slf4j
public class SopAsyncConfigurer implements AsyncConfigurer {
private final ThreadPoolExecutor threadPoolExecutor;
public SopAsyncConfigurer(String threadName, int poolSize) {
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new MyNamedThreadFactory(threadName));
}
@Override
public Executor getAsyncExecutor() {
return threadPoolExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (Throwable e, Method method, Object... args) -> {
log.error("异步运行方法出错, method:{}, args:{}, message:{}", method, Arrays.deepToString(args), e.getMessage(), e);
};
}
}

View File

@@ -0,0 +1,79 @@
package com.gitee.sop.gatewaycommon.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* @author tanghc
*/
@Slf4j
public class CopyUtil {
public static void copyPropertiesIgnoreNull(Object from, Object to) {
MyBeanUtil.copyPropertiesIgnoreNull(from, to);
}
public static void copyProperties(Object from, Object to) {
BeanUtils.copyProperties(from, to);
}
public static <T> T copyBean(Object from, Supplier<T> supplier) {
Objects.requireNonNull(from);
T to = supplier.get();
BeanUtils.copyProperties(from, to);
return to;
}
public static <T> T copyBeanNullable(Object from, Supplier<T> supplier) {
if (from == null) {
return supplier.get();
}
T to = supplier.get();
BeanUtils.copyProperties(from, to);
return to;
}
public static <T> T copyBean(Object from, Supplier<T> supplier, Consumer<T> after) {
if (from == null) {
return null;
}
T to = supplier.get();
BeanUtils.copyProperties(from, to);
after.accept(to);
return to;
}
public static <T> List<T> copyList(List<?> fromList, Supplier<T> toElement) {
if (fromList == null) {
return Collections.emptyList();
}
return fromList.stream()
.map(source -> {
T target = toElement.get();
BeanUtils.copyProperties(source, target);
return target;
})
.collect(Collectors.toList());
}
public static <T> List<T> copyList(List<?> fromList, Supplier<T> toElement, Consumer<T> after) {
if (fromList == null) {
return Collections.emptyList();
}
return fromList.stream()
.map(source -> {
T target = toElement.get();
BeanUtils.copyProperties(source, target);
after.accept(target);
return target;
})
.collect(Collectors.toList());
}
}

View File

@@ -72,7 +72,7 @@ public class GlobalExceptionHandler {
int lineCount = 5;
for (int i = 0; i < stackTrace.length && i < lineCount; i++) {
StackTraceElement stackTraceElement = stackTrace[i];
msg.append("<br> at ").append(stackTraceElement.toString());
msg.append("\n at ").append(stackTraceElement.toString());
}
response.setHeader("x-service-error-message", UriUtils.encode(msg.toString(), StandardCharsets.UTF_8));
return this.processError(request, response, new ServiceException("系统繁忙"));