Only one connection receive subscriber allowed解决方案
前言
路由SpringCloud Gateway做为代理层请求数据 接口的时可能会遇到很多问题,如何有效的避免掉这些问题?需要一个踩坑人,然后把遇到的问题详细的汇聚成一篇篇的文档展现给开发者,大家好,我就是那个“踩坑人”,也立志于汇聚有效解决方案,帮助大家。
问题描述
spring cloud gateway接收前端请求,然后请求反向代理到服务器。
当请求 method 为 GET 时,可以顺利通过。但是请求 method 为 POST 时,路由则会报如下错误:
{timestamp: "2018-12-27T03:18:58.852+0000", path: "/service12/getUsers",
status: 500,…} error: "Internal Server Error" message: "Only one connection
receive subscriber allowed." path: "/service12/getUsers" status: 500 timestamp:
"2018-12-27T03:18:58.852+0000"
截图
排查问题
当遇到接口调用不通过,仔细一想今天上午就是将路由稍微改了下,而且通过postman测试服务端接口是正确的,所以快速定位了问题所在:路由发送Post请求会遇到这个问题
。
spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request
body,而request body只能读取一次
如下是报错地方:
解决方案:
读取request body的时候,我们再封装一次request,转发出去
@Component public class AuthSignatureFilter implements GlobalFilter ,Ordered {
@Autowired private RoutePlugService routePlugService; private Logger logger=
LoggerFactory.getLogger(AuthSignatureFilter.class); @Override public Mono<Void>
filter(ServerWebExchange exchange, GatewayFilterChain chain) {
logger.info("/********route-plug获取详细信息***************/"); ServerHttpRequest
request = exchange.getRequest(); RoutePlug routePlug=new RoutePlug(); String
url = exchange.getRequest().getPath().pathWithinApplication().value(); URI
requestUri = request.getURI(); String
method=exchange.getRequest().getMethodValue(); //地址
logger.info("请求URL:{}",url); logger.info("requestUri:{}",requestUri);
logger.info("method:{}",method); routePlug.setUrl(url);
routePlug.setUri(requestUri.toString()); routePlug.setMethod(method); //开始 时间
exchange.getAttributes().put("startTime", System.currentTimeMillis()); //参数
logger.info("QueryParams:{}",exchange.getRequest().getQueryParams());
logger.info("QueryParamsJSON:{}",JSON.toJSON(exchange.getRequest().getQueryParams()));
routePlug.setQueryParams(JSON.toJSON(exchange.getRequest().getQueryParams()).toString());
HttpHeaders headers=exchange.getRequest().getHeaders(); String contentType =
headers.getFirst("Content-Type"); logger.info("Host:{}",headers.getHost());
logger.info("contentType", contentType); logger.info("headersJson:{}",
JSON.toJSON(headers)); routePlug.setHost(headers.getHost().toString());
routePlug.setQueryHeard(JSON.toJSON(headers).toString()); URI ex =
UriComponentsBuilder.fromUri(requestUri).build(true).toUri(); ServerHttpRequest
newRequest = request.mutate().uri(ex).build(); //记录发送的参数:获取requstBody体中信息 if
("POST".equals(method) && !contentType.startsWith("multipart/form-data")){
String bodyStr = resolveBodyFromRequest(request); //下面将请求体再次封装写回到 request
里,传到下一级. DataBuffer bodyDataBuffer = stringBuffer(bodyStr); Flux<DataBuffer>
bodyFlux = Flux.just(bodyDataBuffer); newRequest = new
ServerHttpRequestDecorator(newRequest) { @Override public Flux<DataBuffer>
getBody() { return bodyFlux; } }; routePlug.setBody(formatStr(bodyStr));
logger.info("requesBody:{}",bodyStr);
logger.info("requesBody:{}",formatStr(bodyStr)); } ....省略掉一些代码 //记录response的
返回数据 ServerHttpResponse originalResponse = exchange.getResponse();
DataBufferFactory bufferFactory = originalResponse.bufferFactory();
ServerHttpResponseDecorator decoratedResponse = new
ServerHttpResponseDecorator(originalResponse) { @Override public Mono<Void>
writeWith(Publisher<? extends DataBuffer> body) { if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body; return
super.writeWith(fluxBody.map(dataBuffer -> { byte[] content = new
byte[dataBuffer.readableByteCount()]; dataBuffer.read(content); //释放掉内存
DataBufferUtils.release(dataBuffer); /* String s = new String(content,
Charset.forName("UTF-8")); //TODO,s就是response的值,想修改、查看就随意而为了 byte[]
uppedContent = new String(content, Charset.forName("UTF-8")).getBytes();*/
String responseData = null; try { //赋值给实体类 responseData =
IOUtils.toString(content); routePlug.setSize(responseData.getBytes().length);
routePlug.setResultdata(responseData); //请求用时 Long startTime =
exchange.getAttribute("startTime"); if (startTime != null) { long executeTime =
(System.currentTimeMillis() - startTime);
routePlug.setUsetime(Integer.parseInt(executeTime+"")); }
routePlugService.save(routePlug); } catch (IOException e) {
e.printStackTrace(); } logger.debug("/*************返回content*******/"); return
bufferFactory.wrap(content); })); } // if body is not a flux. never got there.
return super.writeWith(body); } }; // return
chain.filter(exchange.mutate().request(request).build()); return
chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());
} //补充方法 private DataBuffer stringBuffer(String value){ byte[] bytes =
value.getBytes(StandardCharsets.UTF_8); NettyDataBufferFactory
nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes); return buffer; } ..........省略掉n行代码 }
如上代码,要说到是获取body内容之后,我们再如何处理,关键步骤在返回设置,开始只用了第一个return(如下)
总体解决方案:变成第二个return之后就ok了,需要封装后再转发到下一个filter
// return chain.filter(exchange.mutate().request(request).build()); return
chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());
END
动态路由设置
获取请求体参数
记录访问时间、流量大小、IP限制、流量限制参考订阅SpringCloud专栏项目。
热门工具 换一换