This commit is contained in:
tanghc
2021-03-02 17:41:41 +08:00
parent 181b881bc5
commit 995d5ba623
16 changed files with 146 additions and 52 deletions

View File

@@ -4,11 +4,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.gitee.sop</groupId>
<artifactId>sop-parent</artifactId>
<artifactId>sop-common</artifactId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath> <!-- lookup parent from repository -->
<relativePath>../pom.xml</relativePath> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sop-bridge-eureka</artifactId>

View File

@@ -4,11 +4,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.gitee.sop</groupId>
<artifactId>sop-parent</artifactId>
<artifactId>sop-common</artifactId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath> <!-- lookup parent from repository -->
<relativePath>../pom.xml</relativePath> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sop-bridge-nacos</artifactId>

View File

@@ -4,9 +4,9 @@
<parent>
<groupId>com.gitee.sop</groupId>
<artifactId>sop-parent</artifactId>
<artifactId>sop-common</artifactId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath> <!-- lookup parent from repository -->
<relativePath>../pom.xml</relativePath> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -7,6 +7,7 @@ import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.util.unit.DataSize;
import java.io.IOException;
import java.util.Properties;
@@ -31,9 +32,17 @@ public class SopGatewayEnvironmentPostProcessor implements EnvironmentPostProces
}
try {
properties.load(resource.getInputStream());
this.initMemSize(properties);
return new PropertiesPropertySource(resource.getFilename(), properties);
} catch (IOException ex) {
throw new IllegalStateException("加载配置文件失败" + resource, ex);
}
}
private void initMemSize(Properties properties) {
String size = properties.getProperty("spring.servlet.multipart.max-file-size", "10M");
DataSize dataSize = DataSize.parse(size);
properties.putIfAbsent("spring.codec.max-in-memory-size", dataSize.toBytes());
}
}

View File

@@ -16,13 +16,12 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.MultiValueMap;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
@@ -47,8 +46,6 @@ public class ServerWebExchangeUtil {
private static final FormHttpMessageConverter formHttpMessageConverter = new FormHttpMessageConverter();
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
/**
* 重定向
*
@@ -70,10 +67,11 @@ public class ServerWebExchangeUtil {
* 构建一个接受请求体的request
*
* @param exchange exchange
* @param codecConfigurer codecConfigurer
* @return 返回ServerRequest
*/
public static ServerRequest createReadBodyRequest(ServerWebExchange exchange) {
return ServerRequest.create(exchange, messageReaders);
public static ServerRequest createReadBodyRequest(ServerWebExchange exchange, ServerCodecConfigurer codecConfigurer) {
return ServerRequest.create(exchange, codecConfigurer.getReaders());
}
public static String getRestfulPath(String path, String prefix) {

View File

@@ -14,7 +14,9 @@ import com.gitee.sop.gatewaycommon.manager.RouteConfigManager;
import com.gitee.sop.gatewaycommon.secret.IsvManager;
import com.gitee.sop.gatewaycommon.util.RequestUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.server.ServerRequest;
@@ -45,9 +47,12 @@ public class ConfigChannelController {
@Value("${sop.secret}")
private String secret;
@Autowired
private ServerCodecConfigurer codecConfigurer;
@PostMapping("/sop/configChannelMsg")
public Mono<String> configChannel(ServerWebExchange exchange) {
ServerRequest serverRequest = ServerWebExchangeUtil.createReadBodyRequest(exchange);
ServerRequest serverRequest = ServerWebExchangeUtil.createReadBodyRequest(exchange, codecConfigurer);
// 读取请求体中的内容
return serverRequest.bodyToMono(String.class)
.flatMap(requestJson -> {

View File

@@ -5,36 +5,58 @@ import com.gitee.sop.gatewaycommon.bean.SopConstants;
import com.gitee.sop.gatewaycommon.result.ResultExecutor;
import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyEncoder;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.cloud.gateway.support.DefaultClientResponse;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;
/**
* 修改返回结果
*
* @author tanghc
*/
public class GatewayModifyResponseGatewayFilter implements GlobalFilter, Ordered {
@Autowired
private ServerCodecConfigurer codecConfigurer;
@Autowired
private Set<MessageBodyEncoder> bodyEncoders;
private Map<String, MessageBodyEncoder> messageBodyEncoders;
@Override
@SuppressWarnings("unchecked")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
@@ -54,12 +76,14 @@ public class GatewayModifyResponseGatewayFilter implements GlobalFilter, Ordered
}
Class inClass = String.class;
Class outClass = String.class;
HttpHeaders httpHeaders = new HttpHeaders();
//explicitly add it in this way instead of 'httpHeaders.setContentType(originalResponseContentType)'
//this will prevent exception in case of using non-standard media types like "Content-Type: image"
// explicitly add it in this way instead of
// 'httpHeaders.setContentType(originalResponseContentType)'
// this will prevent exception in case of using non-standard media
// types like "Content-Type: image"
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);
DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());
ClientResponse clientResponse = prepareClientResponse(exchange, body, httpHeaders);
//TODO: flux or mono
Mono modifiedBody = clientResponse.bodyToMono(inClass)
@@ -72,16 +96,21 @@ public class GatewayModifyResponseGatewayFilter implements GlobalFilter, Ordered
return Mono.just(ret);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
outClass);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
Flux<DataBuffer> messageBody = outputMessage.getBody();
Mono<DataBuffer> messageBody = writeBody(getDelegate(),
outputMessage, outClass);
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers
.setContentLength(data.readableByteCount()));
}
//TODO: use isStreamingMediaType?
// TODO: fail if isStreamingMediaType?
return getDelegate().writeWith(messageBody);
}));
}
@@ -96,6 +125,42 @@ public class GatewayModifyResponseGatewayFilter implements GlobalFilter, Ordered
return chain.filter(exchange.mutate().response(responseDecorator).build());
}
private ClientResponse prepareClientResponse(
ServerWebExchange exchange,
Publisher<? extends DataBuffer> body,
HttpHeaders httpHeaders
) {
ClientResponse.Builder builder;
builder = ClientResponse.create(exchange.getResponse().getStatusCode(), codecConfigurer.getReaders());
return builder.headers(headers -> headers.putAll(httpHeaders))
.body(Flux.from(body)).build();
}
private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse,
CachedBodyOutputMessage message, Class<?> outClass) {
Mono<DataBuffer> response = DataBufferUtils.join(message.getBody());
if (byte[].class.isAssignableFrom(outClass)) {
return response;
}
List<String> encodingHeaders = httpResponse.getHeaders()
.getOrEmpty(HttpHeaders.CONTENT_ENCODING);
for (String encoding : encodingHeaders) {
MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
if (encoder != null) {
DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
response = response.publishOn(Schedulers.parallel()).map(buffer -> {
byte[] encodedResponse = encoder.encode(buffer);
DataBufferUtils.release(buffer);
return encodedResponse;
}).map(dataBufferFactory::wrap);
break;
}
}
return response;
}
@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
@@ -140,4 +205,10 @@ public class GatewayModifyResponseGatewayFilter implements GlobalFilter, Ordered
return null;
}
}
@PostConstruct
public void after() {
this.messageBodyEncoders = bodyEncoders == null ? Collections.emptyMap() : bodyEncoders.stream()
.collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity()));
}
}

View File

@@ -20,6 +20,7 @@ import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.reactive.function.BodyInserter;
@@ -45,6 +46,9 @@ import java.util.Objects;
@Order(Ordered.HIGHEST_PRECEDENCE)
public class IndexFilter implements WebFilter {
@Autowired
private ServerCodecConfigurer codecConfigurer;
/** 路径白名单 */
private static final List<String> PATH_WHITE_LIST = Arrays.asList(
"/sop", "/actuator"
@@ -78,7 +82,7 @@ public class IndexFilter implements WebFilter {
}
if (Objects.equals(path, indexPath) || "".equals(path)) {
if (request.getMethod() == HttpMethod.POST) {
ServerRequest serverRequest = ServerWebExchangeUtil.createReadBodyRequest(exchange);
ServerRequest serverRequest = ServerWebExchangeUtil.createReadBodyRequest(exchange, codecConfigurer);
// 读取请求体中的内容
Mono<?> modifiedBody = serverRequest.bodyToMono(byte[].class)
.switchIfEmpty(Mono.just("".getBytes()))

View File

@@ -93,7 +93,7 @@ public abstract class BaseExecutorAdapter<T, R> implements ResultExecutor<T, R>
serviceResult = formatResult(serviceResult);
boolean isMergeResult = this.isMergeResult(request);
int responseStatus = this.getResponseStatus(request);
// this.doAfterRoute(serviceResult, responseStatus, request);
this.doAfterRoute(serviceResult, responseStatus, request);
String finalResult;
if (isMergeResult) {
Map<String, Object> responseData = this.parseServiceResult(serviceResult, responseStatus, request);

View File

@@ -5,9 +5,9 @@
<parent>
<groupId>com.gitee.sop</groupId>
<artifactId>sop-parent</artifactId>
<artifactId>sop-common</artifactId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath> <!-- lookup parent from repository -->
<relativePath>../pom.xml</relativePath> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -1,6 +1,7 @@
package com.gitee.sop.servercommon.configuration;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.cloud.nacos.discovery.NacosWatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
@@ -8,7 +9,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Map;
@@ -21,7 +22,12 @@ public class ServiceConfiguration extends SpringmvcConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty("spring.cloud.nacos.discovery.server-addr")
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties, ObjectProvider<TaskScheduler> taskScheduler, Environment environment) {
public NacosWatch nacosWatch(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties,
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler,
Environment environment
) {
Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();
String contextPath = environment.getProperty(METADATA_SERVER_CONTEXT_PATH);
// 将context-path信息加入到metadata中
@@ -31,7 +37,7 @@ public class ServiceConfiguration extends SpringmvcConfiguration {
// 在元数据中新增启动时间,不能修改这个值,不然网关拉取接口会有问题
// 如果没有这个值,网关会忽略这个服务
metadata.put("server.startup-time", String.valueOf(System.currentTimeMillis()));
return new NacosWatch(nacosDiscoveryProperties, taskScheduler);
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties, taskScheduler);
}
}