Merge branch 'master' into registry-nacos

This commit is contained in:
tanghc
2019-08-13 15:34:19 +08:00
40 changed files with 982 additions and 198 deletions

View File

@@ -25,7 +25,7 @@
<dependency>
<groupId>com.gitee.sop</groupId>
<artifactId>sop-registry-api</artifactId>
<version>1.14.0-SNAPSHOT</version>
<version>1.15.0-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -1,5 +1,6 @@
package com.gitee.sop.websiteserver.bean;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.List;
@@ -10,5 +11,7 @@ import java.util.List;
@Data
public class DocInfo {
private String title;
@JSONField(serialize = false)
private String serviceId;
private List<DocModule> docModuleList;
}

View File

@@ -8,4 +8,9 @@ public class WebsiteConstants {
* zookeeper存放接口路由信息的根目录
*/
public static final String SOP_SERVICE_ROUTE_PATH = "/com.gitee.sop.route";
/**
* 服务临时节点
*/
public static final String SOP_SERVICE_TEMP_PATH = "/com.gitee.sop.service.tmp";
}

View File

@@ -5,11 +5,18 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.function.Consumer;
/**
* @author tanghc
@@ -50,6 +57,10 @@ public class ZookeeperContext {
return WebsiteConstants.SOP_SERVICE_ROUTE_PATH;
}
public static String getServiceTempRootPath() {
return WebsiteConstants.SOP_SERVICE_TEMP_PATH;
}
public static CuratorFramework getClient() {
return client;
}
@@ -58,10 +69,99 @@ public class ZookeeperContext {
ZookeeperContext.client = client;
}
public static boolean isPathExist(String path) {
try {
return client.checkExists().forPath(path) != null;
} catch (Exception e) {
return false;
}
}
/**
* 是否有子节点
* @param parentPath
* @return
* @throws Exception
*/
public static boolean hasChildren(String parentPath) throws Exception {
List<String> children = client.getChildren().forPath(parentPath);
return !CollectionUtils.isEmpty(children);
}
/**
* 创建path如果path存在不报错静默返回path名称
*
* @param path
* @param data
* @return
* @throws Exception
*/
public static String createPath(String path, String data) throws Exception {
if (isPathExist(path)) {
return path;
}
return getClient().create()
// 如果指定节点的父节点不存在则Curator将会自动级联创建父节点
.creatingParentContainersIfNeeded()
.forPath(path, data.getBytes());
}
/**
* 获取子节点信息并监听子节点
*
* @param parentPath 父节点路径
* @param listConsumer 子节点数据
* @param listener 监听事件
* @throws Exception
*/
public static void getChildrenAndListen(String parentPath, Consumer<List<ChildData>> listConsumer, PathChildrenCacheListener listener) throws Exception {
// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
PathChildrenCache childrenCache = new PathChildrenCache(client, parentPath, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT异步初始化初始化之后会触发事件
* NORMAL异步初始化
* BUILD_INITIAL_CACHE同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// 列出子节点数据列表需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得异步是获取不到的
List<ChildData> childDataList = childrenCache.getCurrentData();
listConsumer.accept(childDataList);
log.info("监听子节点增删改,监听路径:{}", parentPath);
// 监听根节点下面的子节点
childrenCache.getListenable().addListener(listener);
}
/**
* 监听子节点的增删改
*
* @param parentPath 父节点路径
* @param listener
* @throws Exception
*/
public static void listenChildren(String parentPath, PathChildrenCacheListener listener) throws Exception {
// 为子节点添加watcher
// PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
PathChildrenCache childrenCache = new PathChildrenCache(client, parentPath, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT异步初始化初始化之后会触发事件
* NORMAL异步初始化
* BUILD_INITIAL_CACHE同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// 监听根节点下面的子节点
childrenCache.getListenable().addListener(listener);
}
/**
* 监听子节点,可以自定义层级
* @param parentPath 父节点路径
* @param maxDepth 层级从1开始。比如当前监听节点/t1目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全
* @param maxDepth 层级从1开始。比如当前监听节点/t1目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全部监听
* @param listener
* @throws Exception
*/
@@ -73,7 +173,6 @@ public class ZookeeperContext {
.build();
treeCache.getListenable().addListener(listener);
//没有开启模式作为入参的方法
treeCache.start();
}

View File

@@ -1,7 +1,6 @@
package com.gitee.sop.websiteserver.controller;
import com.gitee.sop.websiteserver.bean.DocInfo;
import com.gitee.sop.websiteserver.bean.DocItem;
import com.gitee.sop.websiteserver.manager.DocManager;
import com.gitee.sop.websiteserver.vo.DocBaseInfoVO;
import com.gitee.sop.websiteserver.vo.DocInfoVO;
@@ -59,11 +58,6 @@ public class DocController {
return docManager.getByTitle(title);
}
@GetMapping("/item/{method}/{version}/")
public DocItem getDocItem(@PathVariable("method") String method, @PathVariable("version") String version) {
return docManager.get(method, version);
}
// 后门地址,可手动更新文档内容,一般情况下用不到
@GetMapping("/reload")

View File

@@ -1,7 +1,6 @@
package com.gitee.sop.websiteserver.manager;
import com.gitee.sop.websiteserver.bean.DocInfo;
import com.gitee.sop.websiteserver.bean.DocItem;
import java.util.Collection;
@@ -12,8 +11,6 @@ public interface DocManager {
void load(String serviceId);
DocItem get(String method, String version);
DocInfo getByTitle(String title);
Collection<DocInfo> listAll();

View File

@@ -7,12 +7,14 @@ import com.gitee.sop.registryapi.bean.ServiceInfo;
import com.gitee.sop.registryapi.bean.ServiceInstance;
import com.gitee.sop.registryapi.service.RegistryService;
import com.gitee.sop.websiteserver.bean.DocInfo;
import com.gitee.sop.websiteserver.bean.DocItem;
import com.gitee.sop.websiteserver.bean.ZookeeperContext;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -41,21 +43,17 @@ import java.util.stream.Collectors;
public class DocManagerImpl implements DocManager {
// key:title
Map<String, DocInfo> docDefinitionMap = new HashMap<>();
private Map<String, DocInfo> docDefinitionMap = new HashMap<>();
// key: name+version
Map<String, DocItem> docItemMap = new HashMap<>();
private RestTemplate restTemplate = new RestTemplate();
private DocParser swaggerDocParser = new SwaggerDocParser();
RestTemplate restTemplate = new RestTemplate();
private DocParser easyopenDocParser = new EasyopenDocParser();
DocParser swaggerDocParser = new SwaggerDocParser();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
DocParser easyopenDocParser = new EasyopenDocParser();
ExecutorService executorService = Executors.newSingleThreadExecutor();
DelayQueue<Msg> queue = new DelayQueue<>();
private DelayQueue<Msg> queue = new DelayQueue<>();
private String secret = "b749a2ec000f4f29";
@@ -65,14 +63,13 @@ public class DocManagerImpl implements DocManager {
@Autowired
private RegistryService registryService;
private volatile boolean listenInited;
@Value("${doc.refresh-seconds:60}")
private String refreshSeconds;
@Override
public void load(String serviceId) {
try {
List<ServiceInfo> serviceInfoList = registryService.listAllService(1, 9999);
log.info("服务列表:{}", serviceInfoList);
serviceInfoList
.stream()
// 网关没有文档提供,需要排除
@@ -105,6 +102,7 @@ public class DocManagerImpl implements DocManager {
JSONObject docRoot = JSON.parseObject(docInfoJson, Feature.OrderedField, Feature.DisableCircularReferenceDetect);
DocParser docParser = this.buildDocParser(docRoot);
DocInfo docInfo = docParser.parseJson(docRoot);
docInfo.setServiceId(serviceInstance.getServiceId());
docDefinitionMap.put(docInfo.getTitle(), docInfo);
} catch (Exception e) {
// 这里报错可能是因为有些微服务没有配置swagger文档导致404访问不到
@@ -129,11 +127,6 @@ public class DocManagerImpl implements DocManager {
}
}
@Override
public DocItem get(String method, String version) {
return docItemMap.get(method + version);
}
@Override
public DocInfo getByTitle(String title) {
return docDefinitionMap.get(title);
@@ -159,28 +152,73 @@ public class DocManagerImpl implements DocManager {
executorService.execute(new Consumer(queue, this));
ZookeeperContext.setEnvironment(environment);
String routeRootPath = ZookeeperContext.getRouteRootPath();
String serviceTempRootPath = ZookeeperContext.getServiceTempRootPath();
ZookeeperContext.createPath(serviceTempRootPath, "{}");
// 如果节点内容有变化则自动更新文档
ZookeeperContext.listenChildren(routeRootPath, 1, (client, event) -> {
if (listenInited) {
ZookeeperContext.getChildrenAndListen(serviceTempRootPath, childDataList -> {
for (ChildData childData : childDataList) {
String serviceIdPath = childData.getPath();
try {
boolean hasChildren = ZookeeperContext.hasChildren(serviceIdPath);
if (hasChildren) {
log.info("加载文档服务器path:{}", serviceIdPath);
listenServiceIdPath(serviceIdPath);
}
} catch (Exception e) {
log.error("监听路径失败serviceIdPath{}", serviceIdPath);
}
}
}, (client, event) -> {
PathChildrenCacheEvent.Type type = event.getType();
if (type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
String serviceIdPath = event.getData().getPath();
log.info("新增文档服务器path:{}", serviceIdPath);
listenServiceIdPath(serviceIdPath);
}
});
}
private void listenServiceIdPath(String serviceIdPath) throws Exception {
ZookeeperContext.listenChildren(serviceIdPath, (client, event) -> {
String path = event.getData().getPath();
PathChildrenCacheEvent.Type type = event.getType();
log.info("服务节点变更path:{}, eventType:{}", path, event.getType().name());
if (type == PathChildrenCacheEvent.Type.CHILD_ADDED
|| type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
byte[] data = event.getData().getData();
String serviceInfoJson = new String(data);
if (StringUtils.isEmpty(serviceInfoJson)) {
return;
}
ZKServiceInfo serviceInfo = JSON.parseObject(serviceInfoJson, ZKServiceInfo.class);
String serviceId = serviceInfo.getServiceId();
int delaySeconds = NumberUtils.toInt(refreshSeconds, 60);
log.info("微服务[{}]推送更新,{}秒后加载文档内容", serviceId, delaySeconds);
long id = System.currentTimeMillis();
Msg msg = new Msg(id, delaySeconds * 1000);
msg.serviceId = serviceId;
// 延迟20秒执行
queue.offer(msg);
} else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
byte[] data = event.getData().getData();
String serviceInfoJson = new String(data);
ZKServiceInfo serviceInfo = JSON.parseObject(serviceInfoJson, ZKServiceInfo.class);
String serviceId = serviceInfo.getServiceId();
log.info("微服务[{}]推送更新", serviceId);
Msg msg = new Msg(id, 1000 * 20);
msg.serviceId = serviceId;
// 延迟20秒执行
queue.offer(msg);
}
TreeCacheEvent.Type type = event.getType();
if (type == TreeCacheEvent.Type.INITIALIZED) {
listenInited = true;
boolean hasChildren = ZookeeperContext.hasChildren(serviceIdPath);
// 如果没有子节点就删除
if (!hasChildren) {
log.info("服务节点已删除,删除对应文档信息,path:{}", event.getData().getPath());
removeDoc(serviceId);
}
}
});
}
public void removeDoc(String serviceId) {
docDefinitionMap.entrySet().removeIf(entry -> serviceId.equalsIgnoreCase(entry.getValue().getServiceId()));
}
static class Msg implements Delayed {
private long id;
private long delay;
@@ -197,8 +235,7 @@ public class DocManagerImpl implements DocManager {
@Override
public int compareTo(Delayed delayed) {
Msg msg = (Msg) delayed;
return Long.valueOf(this.id) > Long.valueOf(msg.id) ? 1
: (Long.valueOf(this.id) < Long.valueOf(msg.id) ? -1 : 0);
return Long.compare(this.id, msg.id);
}
// 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期