Only one connection receive subscriber allowed解决方案
前言
路由SpringCloud Gateway做为代理层请求数据 接口的时可能会遇到很多问题,如何有效的避免掉这些问题?需要一个踩坑人,然后把遇到的问题详细的汇聚成一篇篇的文档展现给开发者,大家好,我就是那个“踩坑人”,也立志于汇聚有效解决方案,帮助大家。
问题描述
spring cloud gateway接收前端请求,然后请求反向代理到服务器。
当请求 method 为 GET 时,可以顺利通过。但是请求 method 为 POST 时,路由则会报如下错误:
{timestamp: "2018-12-27T03:18:58.852+0000", path: "/service12/getUsers", 
status: 500,…} error: "Internal Server Error" message: "Only one connection 
receive subscriber allowed." path: "/service12/getUsers" status: 500 timestamp: 
"2018-12-27T03:18:58.852+0000" 
截图
排查问题
 当遇到接口调用不通过,仔细一想今天上午就是将路由稍微改了下,而且通过postman测试服务端接口是正确的,所以快速定位了问题所在:路由发送Post请求会遇到这个问题 
。
spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request 
body,而request body只能读取一次
如下是报错地方:
解决方案:
读取request body的时候,我们再封装一次request,转发出去
 
@Component public class AuthSignatureFilter implements GlobalFilter ,Ordered { 
@Autowired private RoutePlugService routePlugService; private Logger logger= 
LoggerFactory.getLogger(AuthSignatureFilter.class); @Override public Mono<Void> 
filter(ServerWebExchange exchange, GatewayFilterChain chain) { 
logger.info("/********route-plug获取详细信息***************/"); ServerHttpRequest 
request = exchange.getRequest(); RoutePlug routePlug=new RoutePlug(); String 
url = exchange.getRequest().getPath().pathWithinApplication().value(); URI 
requestUri = request.getURI(); String 
method=exchange.getRequest().getMethodValue(); //地址 
logger.info("请求URL:{}",url); logger.info("requestUri:{}",requestUri); 
logger.info("method:{}",method); routePlug.setUrl(url); 
routePlug.setUri(requestUri.toString()); routePlug.setMethod(method); //开始 时间 
exchange.getAttributes().put("startTime", System.currentTimeMillis()); //参数 
logger.info("QueryParams:{}",exchange.getRequest().getQueryParams()); 
logger.info("QueryParamsJSON:{}",JSON.toJSON(exchange.getRequest().getQueryParams())); 
routePlug.setQueryParams(JSON.toJSON(exchange.getRequest().getQueryParams()).toString()); 
HttpHeaders headers=exchange.getRequest().getHeaders(); String contentType = 
headers.getFirst("Content-Type"); logger.info("Host:{}",headers.getHost()); 
logger.info("contentType", contentType); logger.info("headersJson:{}", 
JSON.toJSON(headers)); routePlug.setHost(headers.getHost().toString()); 
routePlug.setQueryHeard(JSON.toJSON(headers).toString()); URI ex = 
UriComponentsBuilder.fromUri(requestUri).build(true).toUri(); ServerHttpRequest 
newRequest = request.mutate().uri(ex).build(); //记录发送的参数:获取requstBody体中信息 if 
("POST".equals(method) && !contentType.startsWith("multipart/form-data")){ 
String bodyStr = resolveBodyFromRequest(request); //下面将请求体再次封装写回到 request 
里,传到下一级. DataBuffer bodyDataBuffer = stringBuffer(bodyStr); Flux<DataBuffer> 
bodyFlux = Flux.just(bodyDataBuffer); newRequest = new 
ServerHttpRequestDecorator(newRequest) { @Override public Flux<DataBuffer> 
getBody() { return bodyFlux; } }; routePlug.setBody(formatStr(bodyStr)); 
logger.info("requesBody:{}",bodyStr); 
logger.info("requesBody:{}",formatStr(bodyStr)); } ....省略掉一些代码 //记录response的 
返回数据 ServerHttpResponse originalResponse = exchange.getResponse(); 
DataBufferFactory bufferFactory = originalResponse.bufferFactory(); 
ServerHttpResponseDecorator decoratedResponse = new 
ServerHttpResponseDecorator(originalResponse) { @Override public Mono<Void> 
writeWith(Publisher<? extends DataBuffer> body) { if (body instanceof Flux) { 
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body; return 
super.writeWith(fluxBody.map(dataBuffer -> { byte[] content = new 
byte[dataBuffer.readableByteCount()]; dataBuffer.read(content); //释放掉内存 
DataBufferUtils.release(dataBuffer); /* String s = new String(content, 
Charset.forName("UTF-8")); //TODO,s就是response的值,想修改、查看就随意而为了 byte[] 
uppedContent = new String(content, Charset.forName("UTF-8")).getBytes();*/ 
String responseData = null; try { //赋值给实体类 responseData = 
IOUtils.toString(content); routePlug.setSize(responseData.getBytes().length); 
routePlug.setResultdata(responseData); //请求用时 Long startTime = 
exchange.getAttribute("startTime"); if (startTime != null) { long executeTime = 
(System.currentTimeMillis() - startTime); 
routePlug.setUsetime(Integer.parseInt(executeTime+"")); } 
routePlugService.save(routePlug); } catch (IOException e) { 
e.printStackTrace(); } logger.debug("/*************返回content*******/"); return 
bufferFactory.wrap(content); })); } // if body is not a flux. never got there. 
return super.writeWith(body); } }; // return 
chain.filter(exchange.mutate().request(request).build()); return 
chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build()); 
} //补充方法 private DataBuffer stringBuffer(String value){ byte[] bytes = 
value.getBytes(StandardCharsets.UTF_8); NettyDataBufferFactory 
nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); 
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length); 
buffer.write(bytes); return buffer; } ..........省略掉n行代码 } 
 
如上代码,要说到是获取body内容之后,我们再如何处理,关键步骤在返回设置,开始只用了第一个return(如下)
总体解决方案:变成第二个return之后就ok了,需要封装后再转发到下一个filter
// return chain.filter(exchange.mutate().request(request).build()); return 
chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());
 
END
动态路由设置
获取请求体参数
记录访问时间、流量大小、IP限制、流量限制参考订阅SpringCloud专栏项目。
热门工具 换一换
