适配eureka

This commit is contained in:
tanghc
2019-09-26 20:01:35 +08:00
parent 51ca5ba319
commit be0adc0a8e
37 changed files with 1120 additions and 219 deletions

View File

@@ -0,0 +1,68 @@
package com.gitee.sop.gateway.controller;
import com.alibaba.fastjson.JSON;
import com.gitee.sop.gateway.manager.ChannelMsgProcessor;
import com.gitee.sop.gateway.manager.DbEnvGrayManager;
import com.gitee.sop.gateway.manager.DbIPBlacklistManager;
import com.gitee.sop.gateway.manager.DbIsvManager;
import com.gitee.sop.gateway.manager.DbIsvRoutePermissionManager;
import com.gitee.sop.gateway.manager.DbLimitConfigManager;
import com.gitee.sop.gateway.manager.DbRouteConfigManager;
import com.gitee.sop.gatewaycommon.bean.GatewayPushDTO;
import com.gitee.sop.gatewaycommon.bean.NacosConfigs;
import com.gitee.sop.gatewaycommon.bean.SpringContext;
import com.gitee.sop.gatewaycommon.util.RequestUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author tanghc
*/
@Slf4j
@RestController
public class ConfigChannelController {
private static Map<String, Class<? extends ChannelMsgProcessor>> processorMap = new HashMap<>(16);
static {
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_GRAY, DbEnvGrayManager.class);
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_IP_BLACKLIST, DbIPBlacklistManager.class);
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_ISV, DbIsvManager.class);
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_ROUTE_PERMISSION, DbIsvRoutePermissionManager.class);
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_LIMIT_CONFIG, DbLimitConfigManager.class);
processorMap.put(NacosConfigs.GROUP_CHANNEL + NacosConfigs.DATA_ID_ROUTE_CONFIG, DbRouteConfigManager.class);
}
@Value("${zuul.secret}")
private String secret;
@PostMapping("/configChannelMsg")
public String configChannel(HttpServletRequest request) throws IOException {
String requestJson = RequestUtil.getText(request);
String sign = request.getHeader("sign");
try {
RequestUtil.checkResponseBody(requestJson, sign, secret);
} catch (Exception e) {
log.error("configChannelMsg错误", e);
return e.getMessage();
}
GatewayPushDTO gatewayPushDTO = JSON.parseObject(requestJson, GatewayPushDTO.class);
ChannelMsgProcessor channelMsgProcessor = getChannelMsgProcessor(gatewayPushDTO);
channelMsgProcessor.process(gatewayPushDTO.getChannelMsg());
return "ok";
}
private ChannelMsgProcessor getChannelMsgProcessor(GatewayPushDTO gatewayPushDTO) {
String key = gatewayPushDTO.getGroupId() + gatewayPushDTO.getDataId();
Class<? extends ChannelMsgProcessor> aClass = processorMap.get(key);
return SpringContext.getBean(aClass);
}
}

View File

@@ -0,0 +1,71 @@
package com.gitee.sop.gateway.entity;
import com.gitee.fastmybatis.core.annotation.LogicDelete;
import lombok.Data;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
/**
* 表名config_service_route
* 备注:路由配置
*
* @author tanghc
*/
@Table(name = "config_service_route")
@Data
public class ConfigServiceRoute {
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.AUTO)
/** 数据库字段id */
private String id;
/** 数据库字段service_id */
private String serviceId;
/** 接口名, 数据库字段name */
private String name;
/** 版本号, 数据库字段version */
private String version;
/** 路由断言SpringCloudGateway专用, 数据库字段predicates */
private String predicates;
/** 路由过滤器SpringCloudGateway专用, 数据库字段filters */
private String filters;
/** 路由规则转发的目标uri, 数据库字段uri */
private String uri;
/** uri后面跟的path, 数据库字段path */
private String path;
/** 路由执行的顺序, 数据库字段order */
private Integer order;
/** 是否忽略验证,业务参数验证除外, 数据库字段ignore_validate */
private Byte ignoreValidate;
/** 状态0待审核1启用2禁用, 数据库字段status */
private Byte status;
/** 是否合并结果, 数据库字段merge_result */
private Byte mergeResult;
/** 是否需要授权才能访问, 数据库字段permission */
private Byte permission;
/** 数据库字段gmt_create */
private Date gmtCreate;
/** 数据库字段gmt_modified */
private Date gmtModified;
}

View File

@@ -0,0 +1,10 @@
package com.gitee.sop.gateway.manager;
import com.gitee.sop.gatewaycommon.bean.ChannelMsg;
/**
* @author tanghc
*/
public interface ChannelMsgProcessor {
void process(ChannelMsg channelMsg);
}

View File

@@ -36,7 +36,7 @@ import java.util.stream.Stream;
*/
@Slf4j
@Service
public class DbEnvGrayManager extends DefaultEnvGrayManager {
public class DbEnvGrayManager extends DefaultEnvGrayManager implements ChannelMsgProcessor {
private static final int STATUS_ENABLE = 1;
@@ -97,29 +97,32 @@ public class DbEnvGrayManager extends DefaultEnvGrayManager {
this.saveServiceGrayConfig(serviceGrayConfig);
}
@Override
public void process(ChannelMsg channelMsg) {
ServiceGrayDefinition userKeyDefinition = channelMsg.toObject(ServiceGrayDefinition.class);
String serviceId = userKeyDefinition.getServiceId();
switch (channelMsg.getOperation()) {
case "set":
ConfigGray configGray = configGrayMapper.getByColumn("service_id", serviceId);
setServiceGrayConfig(configGray);
break;
case "open":
openGray(userKeyDefinition.getInstanceId(), serviceId);
break;
case "close":
closeGray(userKeyDefinition.getInstanceId());
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
configService.addListener(NacosConfigs.DATA_ID_GRAY, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
String data = channelMsg.getData();
ServiceGrayDefinition userKeyDefinition = JSON.parseObject(data, ServiceGrayDefinition.class);
String serviceId = userKeyDefinition.getServiceId();
switch (channelMsg.getOperation()) {
case "set":
ConfigGray configGray = configGrayMapper.getByColumn("service_id", serviceId);
setServiceGrayConfig(configGray);
break;
case "open":
openGray(userKeyDefinition.getInstanceId(), serviceId);
break;
case "close":
closeGray(userKeyDefinition.getInstanceId());
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}

View File

@@ -23,7 +23,7 @@ import java.util.List;
*/
@Slf4j
@Service
public class DbIPBlacklistManager extends DefaultIPBlacklistManager {
public class DbIPBlacklistManager extends DefaultIPBlacklistManager implements ChannelMsgProcessor {
@Autowired
private IPBlacklistMapper ipBlacklistMapper;
@@ -35,29 +35,34 @@ public class DbIPBlacklistManager extends DefaultIPBlacklistManager {
public void load() {
List<String> ipList = ipBlacklistMapper.listAllIP();
log.info("加载IP黑名单, size:{}", ipList.size());
ipList.stream().forEach(this::add);
ipList.forEach(this::add);
}
@Override
public void process(ChannelMsg channelMsg) {
final IPDto ipDto = channelMsg.toObject(IPDto.class);
String ip = ipDto.getIp();
switch (channelMsg.getOperation()) {
case "add":
log.info("添加IP黑名单ip:{}", ip);
add(ip);
break;
case "delete":
log.info("移除IP黑名单ip:{}", ip);
remove(ip);
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
configService.addListener(NacosConfigs.DATA_ID_IP_BLACKLIST, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
final IPDto ipDto = JSON.parseObject(channelMsg.getData(), IPDto.class);
String ip = ipDto.getIp();
switch (channelMsg.getOperation()) {
case "add":
log.info("添加IP黑名单ip:{}", ip);
add(ip);
break;
case "delete":
log.info("移除IP黑名单ip:{}", ip);
remove(ip);
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}
@@ -66,5 +71,4 @@ public class DbIPBlacklistManager extends DefaultIPBlacklistManager {
private static class IPDto {
private String ip;
}
}

View File

@@ -24,7 +24,7 @@ import java.util.List;
*/
@Slf4j
@Service
public class DbIsvManager extends CacheIsvManager {
public class DbIsvManager extends CacheIsvManager implements ChannelMsgProcessor{
@Autowired
private IsvInfoMapper isvInfoMapper;
@@ -35,7 +35,7 @@ public class DbIsvManager extends CacheIsvManager {
@Override
public void load() {
List<IsvDetailDTO> isvInfoList = isvInfoMapper.listIsvDetail();
isvInfoList.stream()
isvInfoList
.forEach(isvInfo -> {
IsvDefinition isvDefinition = new IsvDefinition();
BeanUtils.copyProperties(isvInfo, isvDefinition);
@@ -43,6 +43,23 @@ public class DbIsvManager extends CacheIsvManager {
});
}
@Override
public void process(ChannelMsg channelMsg) {
final IsvDefinition isvDefinition = channelMsg.toObject(IsvDefinition.class);
switch (channelMsg.getOperation()) {
case "update":
log.info("更新ISV信息isvDefinition:{}", isvDefinition);
update(isvDefinition);
break;
case "remove":
log.info("删除ISVisvDefinition:{}", isvDefinition);
remove(isvDefinition.getAppKey());
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
ApiConfig.getInstance().setIsvManager(this);
@@ -50,20 +67,8 @@ public class DbIsvManager extends CacheIsvManager {
configService.addListener(NacosConfigs.DATA_ID_ISV, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
final IsvDefinition isvDefinition = JSON.parseObject(channelMsg.getData(), IsvDefinition.class);
switch (channelMsg.getOperation()) {
case "update":
log.info("更新ISV信息isvDefinition:{}", isvDefinition);
update(isvDefinition);
break;
case "remove":
log.info("删除ISVisvDefinition:{}", isvDefinition);
remove(isvDefinition.getAppKey());
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}

View File

@@ -40,7 +40,7 @@ import static java.util.stream.Collectors.toList;
*/
@Slf4j
@Service
public class DbIsvRoutePermissionManager extends DefaultIsvRoutePermissionManager {
public class DbIsvRoutePermissionManager extends DefaultIsvRoutePermissionManager implements ChannelMsgProcessor {
@Autowired
Environment environment;
@@ -122,41 +122,46 @@ public class DbIsvRoutePermissionManager extends DefaultIsvRoutePermissionManage
.collect(Collectors.toList());
}
@Data
static class IsvRole {
private String appKey;
private String roleCode;
}
@Override
public void process(ChannelMsg channelMsg) {
final IsvRoutePermission isvRoutePermission = channelMsg.toObject(IsvRoutePermission.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载路由权限信息isvRoutePermission:{}", isvRoutePermission);
try {
load();
} catch (Exception e) {
log.error("重新加载路由权限失败, channelMsg:{}", channelMsg, e);
}
break;
case "update":
log.info("更新ISV路由权限信息isvRoutePermission:{}", isvRoutePermission);
update(isvRoutePermission);
break;
case "remove":
log.info("删除ISV路由权限信息isvRoutePermission:{}", isvRoutePermission);
remove(isvRoutePermission.getAppKey());
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
configService.addListener(NacosConfigs.DATA_ID_ROUTE_PERMISSION, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
final IsvRoutePermission isvRoutePermission = JSON.parseObject(channelMsg.getData(), IsvRoutePermission.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载路由权限信息isvRoutePermission:{}", isvRoutePermission);
try {
load();
} catch (Exception e) {
log.error("重新加载路由权限失败, channelMsg:{}", channelMsg, e);
}
break;
case "update":
log.info("更新ISV路由权限信息isvRoutePermission:{}", isvRoutePermission);
update(isvRoutePermission);
break;
case "remove":
log.info("删除ISV路由权限信息isvRoutePermission:{}", isvRoutePermission);
remove(isvRoutePermission.getAppKey());
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}
@Data
static class IsvRole {
private String appKey;
private String roleCode;
}
}

View File

@@ -24,7 +24,7 @@ import javax.annotation.PostConstruct;
*/
@Slf4j
@Service
public class DbLimitConfigManager extends DefaultLimitConfigManager {
public class DbLimitConfigManager extends DefaultLimitConfigManager implements ChannelMsgProcessor {
@Autowired
ConfigLimitMapper configLimitMapper;
@@ -39,7 +39,6 @@ public class DbLimitConfigManager extends DefaultLimitConfigManager {
public void load() {
Query query = new Query();
configLimitMapper.list(query)
.stream()
.forEach(configLimit -> putVal(configLimit));
}
@@ -50,25 +49,29 @@ public class DbLimitConfigManager extends DefaultLimitConfigManager {
this.update(configLimitDto);
}
@Override
public void process(ChannelMsg channelMsg) {
final ConfigLimitDto configLimitDto = channelMsg.toObject(ConfigLimitDto.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载限流配置信息configLimitDto:{}", configLimitDto);
load();
break;
case "update":
log.info("更新限流配置信息configLimitDto:{}", configLimitDto);
update(configLimitDto);
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
configService.addListener(NacosConfigs.DATA_ID_LIMIT_CONFIG, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
final ConfigLimitDto configLimitDto = JSON.parseObject(channelMsg.getData(), ConfigLimitDto.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载限流配置信息configLimitDto:{}", configLimitDto);
load();
break;
case "update":
log.info("更新限流配置信息configLimitDto:{}", configLimitDto);
update(configLimitDto);
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}

View File

@@ -27,7 +27,7 @@ import java.util.Collection;
*/
@Slf4j
@Service
public class DbRouteConfigManager extends DefaultRouteConfigManager {
public class DbRouteConfigManager extends DefaultRouteConfigManager implements ChannelMsgProcessor {
@Autowired
ConfigRouteBaseMapper configRouteBaseMapper;
@@ -72,25 +72,29 @@ public class DbRouteConfigManager extends DefaultRouteConfigManager {
this.doUpdate(routeId, object);
}
@Override
public void process(ChannelMsg channelMsg) {
final RouteConfig routeConfig = channelMsg.toObject( RouteConfig.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载路由配置信息routeConfigDto:{}", routeConfig);
load();
break;
case "update":
log.info("更新路由配置信息routeConfigDto:{}", routeConfig);
update(routeConfig);
break;
default:
}
}
@PostConstruct
protected void after() throws Exception {
configService.addListener(NacosConfigs.DATA_ID_ROUTE_CONFIG, NacosConfigs.GROUP_CHANNEL, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
final RouteConfig routeConfig = JSON.parseObject(channelMsg.getData(), RouteConfig.class);
switch (channelMsg.getOperation()) {
case "reload":
log.info("重新加载路由配置信息routeConfigDto:{}", routeConfig);
load();
break;
case "update":
log.info("更新路由配置信息routeConfigDto:{}", routeConfig);
update(routeConfig);
break;
default:
}
ChannelMsg channelMsg = JSON.parseObject(configInfo, ChannelMsg.class);
process(channelMsg);
}
});
}

View File

@@ -0,0 +1,68 @@
package com.gitee.sop.gateway.manager;
import com.alibaba.fastjson.JSON;
import com.gitee.fastmybatis.core.query.Query;
import com.gitee.sop.gateway.entity.ConfigServiceRoute;
import com.gitee.sop.gateway.mapper.ConfigServiceRouteMapper;
import com.gitee.sop.gatewaycommon.bean.InstanceDefinition;
import com.gitee.sop.gatewaycommon.bean.ServiceRouteInfo;
import com.gitee.sop.gatewaycommon.route.RoutesProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author tanghc
*/
@Slf4j
@Component
public class DbRoutesProcessor implements RoutesProcessor {
@Autowired
private ConfigServiceRouteMapper configServiceRouteMapper;
@Override
public void removeAllRoutes(String serviceId) {
// 删除serviceId下所有的路由
Query delServiceQuery = new Query().eq("service_id", serviceId);
configServiceRouteMapper.deleteByQuery(delServiceQuery);
}
@Override
@Transactional
public void saveRoutes(ServiceRouteInfo serviceRouteInfo, InstanceDefinition instance) {
log.info("保存路由信息到数据库instance: {}", instance);
String serviceId = serviceRouteInfo.getServiceId();
List<ConfigServiceRoute> configServiceRoutes = serviceRouteInfo
.getRouteDefinitionList()
.parallelStream()
.map(routeDefinition -> {
ConfigServiceRoute configServiceRoute = new ConfigServiceRoute();
configServiceRoute.setId(routeDefinition.getId());
configServiceRoute.setName(routeDefinition.getName());
configServiceRoute.setVersion(routeDefinition.getVersion());
configServiceRoute.setUri(routeDefinition.getUri());
configServiceRoute.setPath(routeDefinition.getPath());
configServiceRoute.setFilters(JSON.toJSONString(routeDefinition.getFilters()));
configServiceRoute.setPredicates(JSON.toJSONString(routeDefinition.getPredicates()));
configServiceRoute.setIgnoreValidate((byte) routeDefinition.getIgnoreValidate());
configServiceRoute.setMergeResult((byte) routeDefinition.getMergeResult());
configServiceRoute.setStatus((byte) routeDefinition.getStatus());
configServiceRoute.setPermission((byte) routeDefinition.getPermission());
configServiceRoute.setOrder(routeDefinition.getOrder());
configServiceRoute.setServiceId(serviceId);
return configServiceRoute;
})
.collect(Collectors.toList());
// 删除serviceId下所有的路由
this.removeAllRoutes(serviceId);
// 批量保存
configServiceRouteMapper.saveBatch(configServiceRoutes);
}
}

View File

@@ -0,0 +1,11 @@
package com.gitee.sop.gateway.mapper;
import com.gitee.fastmybatis.core.mapper.CrudMapper;
import com.gitee.sop.gateway.entity.ConfigServiceRoute;
/**
* @author tanghc
*/
public interface ConfigServiceRouteMapper extends CrudMapper<ConfigServiceRoute, Long> {
}