本文章对应spring cloud的版本为(Dalston.SR4),具体内容如下:
首先如何使用spring cloud zuul完成路由转发的功能,这个问题很简单,只需要进行如下准备工作即可:
我们希望zuul和后端的应用服务同时都注册到Eureka Server上,当我们访问Zuul的某一个地址时,对应其实访问的是后端应用的某个地址,从而从这个地址返回一段内容,并展现到浏览器上。
创建一个Eureka Server只需要在主函数上添加@EnableEurekaServer
@EnableEurekaServer
@RestController
@SpringBootApplication
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
在properties
文件进行简单配置
server.port=8761
eureka.client.register-with-eureka=false
eureka.client.fetch-registry=false
主函数添加@EnableZuulProxy
注解(因为集成Eureka,需要另外添加@EnableDiscoveryClient
注解)。
@EnableZuulProxy
@EnableDiscoveryClient
@SpringBootApplication
public class ZuulDemoApplication {
/**
* 省略代码...
*/
}
配置properties
文件
server.port=8081
spring.application.name=ZUUL-CLIENT
zuul.routes.api-a.serviceId=EUREKA-CLIENT
zuul.routes.api-a.path=/api-a/**
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
@RestController
@EnableEurekaClient
@SpringBootApplication
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
@RequestMapping(value = "/hello")
public String index() {
return "hello spring...";
}
}
配置properties
文件
spring.application.name=EUREKA-CLIENT
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
三个工程全部启动,这时当我们访问localhost:8081/api-a/hello
时,你会看到浏览器输出的内容是hello spring...
接下来我们通过源码层面来了解下,一次转发内部都做了哪些事情。
首先我们查看Zuul的配置类ZuulProxyAutoConfiguration
在这个类中有一项工作是初始化Zuul默认自带的Filter,其中有一个Filter很重要,它就是RibbonRoutingFilter
。它主要是完成请求的路由转发。接下来我们看下他的run
方法
@Override
public Object run() {
RequestContext context = RequestContext.getCurrentContext();
try {
RibbonCommandContext commandContext = buildCommandContext(context);
ClientHttpResponse response = forward(commandContext);
setResponse(response);
return response;
}
catch (ZuulException ex) {
throw new ZuulRuntimeException(ex);
}
catch (Exception ex) {
throw new ZuulRuntimeException(ex);
}
}
可以看到进行转发的方法是forward
,我们进一步查看这个方法,具体内容如下:
省略部分代码
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = command.execute();
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}
}
ribbonCommandFactory
指的是HttpClientRibbonCommandFactory
这个类是在RibbonCommandFactoryConfiguration
完成初始化的(触发RibbonCommandFactoryConfiguration
的加载动作是利用ZuulProxyAutoConfiguration
类上面的@Import
标签),具体代码如下:
@Configuration
@ConditionalOnRibbonHttpClient
protected static class HttpClientRibbonConfiguration {
@Autowired(required = false)
private Set<ZuulFallbackProvider> zuulFallbackProviders = Collections.emptySet();
@Bean
@ConditionalOnMissingBean
public RibbonCommandFactory<?> ribbonCommandFactory(
SpringClientFactory clientFactory, ZuulProperties zuulProperties) {
return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, zuulFallbackProviders);
}
}
知道了这个ribbonCommandFactory
具体的实现类(HttpClientRibbonCommandFactory
),接下来我们看看它的create
方法具体做了那些事情
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
final String serviceId = context.getServiceId();
final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}
这个方法按照我的理解主要做了以下几件事情:
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
/**
*获取所有ZuulFallbackProvider,即当Zuul
*调用失败后的降级方法
*/
ZuulFallbackProvider = xxxxx
/**
*创建处理请求转发类,该类会利用
*Apache的Http client进行请求的转发
*/
RibbonLoadBalancingHttpClient = xxxxx
/**
*将降级方法、处理请求转发类、以及其他一些内容
*包装成HttpClientRibbonCommand(这个类继承了HystrixCommand)
*/
return new HttpClientRibbonCommand(xxxxx);
}
到这里我们很清楚的知道了RibbonRoutingFilter
类的forward
方法中RibbonCommand command = this.ribbonCommandFactory.create(context);
这一行代码都做了哪些内容.
接下来调用的是command.execute();
方法,通过刚刚的分析我们知道了command
其实指的是HttpClientRibbonCommand
,同时我们也知道HttpClientRibbonCommand
继承了HystrixCommand
所以当执行command.execute();
时其实执行的是HttpClientRibbonCommand
的run
方法。查看源码我们并没有发现run
方法,但是我们发现HttpClientRibbonCommand
直接继承了AbstractRibbonCommand
。所以其实执行的是AbstractRibbonCommand
的run方法,接下来我们看看run方法里面都做了哪些事情:
@Override
protected ClientHttpResponse run() throws Exception {
final RequestContext context = RequestContext.getCurrentContext();
RQ request = createRequest();
RS response = this.client.executeWithLoadBalancer(request, config);
context.set("ribbonResponse", response);
if (this.isResponseTimedOut()) {
if (response != null) {
response.close();
}
}
return new RibbonHttpResponse(response);
}
可以看到在run
方法中会调用client的executeWithLoadBalancer
方法,通过上面介绍我们知道client指的是RibbonLoadBalancingHttpClient
,而RibbonLoadBalancingHttpClient
里面并没有executeWithLoadBalancer
方法。(这里面会最终调用它的父类AbstractLoadBalancerAwareClient
的executeWithLoadBalancer
方法。)
具体代码如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
/**
* 创建一个RetryHandler,这个很重要它是用来
* 决定利用RxJava的Observable是否进行重试的标准。
*/
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
/**
* 创建一个LoadBalancerCommand,这个类用来创建Observable
* 以及根据RetryHandler来判断是否进行重试操作。
*/
LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
.withLoadBalancerContext(this)
.withRetryHandler(handler)
.withLoadBalancerURI(request.getUri())
.build();
try {
/**
*command.submit()方法主要是创建了一个Observable(RxJava)
*并且为这个Observable设置了重试次数,这个Observable最终
*会回调AbstractLoadBalancerAwareClient.this.execute()
*方法。
*/
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
下面针对于每一块内容做详细说明:
首先getRequestSpecificRetryHandler(request, requestConfig);
这个方法其实调用的是RibbonLoadBalancingHttpClient
的getRequestSpecificRetryHandler
方法,这个方法主要是返回一个RequestSpecificRetryHandler
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
/**
*这个很关键,请注意该类构造器中的前两个参数的值
*正因为一开始我也忽略了这两个值,所以后续给我造
*成一定的干扰。
*/
return new RequestSpecificRetryHandler(false, false,
RetryHandler.DEFAULT, requestConfig);
}
接下来创建LoadBalancerCommand
并将上一步获得的RequestSpecificRetryHandler
作为参数内容。
最后调用LoadBalancerCommand
的submit
方法。该方法内容太长具体代码细节就不在这里贴出了,按照我个人的理解,只贴出相应的伪代码:
public Observable<T> submit(final ServerOperation<T> operation) {
//相同server的重试次数(去除首次请求)
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
//集群内其他Server的重试个数
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
/**
*创建一个Observable(RxJava),selectServer()方法是
*利用Ribbon选择一个Server,并将其包装成Observable
*/
Observable<T> o = selectServer().concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
/**
*这里会回调submit方法入参ServerOperation类的call方法,
*/
return operation.call(server).doOnEach(new Observer<T>() {}
}
}
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
/**
*转发请求失败时,会进入此方法。通过此方法进行判断
*是否超过重试次数maxRetrysSame、maxRetrysNext。
*/
}
});
}
operation.call()
方法最终会调用RibbonLoadBalancingHttpClient
的execute
方法,该方法内容如下:
@Override
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request,
final IClientConfig configOverride) throws Exception {
/**
* 组装参数(RequestConfig)
*/
final RequestConfig.Builder builder = RequestConfig.custom();
IClientConfig config = configOverride != null ? configOverride : this.config;
builder.setConnectTimeout(config.get(
CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
builder.setSocketTimeout(config.get(
CommonClientConfigKey.ReadTimeout, this.readTimeout));
builder.setRedirectsEnabled(config.get(
CommonClientConfigKey.FollowRedirects, this.followRedirects));
final RequestConfig requestConfig = builder.build();
if (isSecure(configOverride)) {
final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
.scheme("https").build().toUri();
request = request.withNewUri(secureUri);
}
final HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
/**
* 发送转发请求
*/
final HttpResponse httpResponse = this.delegate.execute(httpUriRequest);
/**
* 返回结果
*/
return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}
可以看到上面方法主要做的就是组装请求参数(包括各种超时时间),然后发起转发请求,最终获取相应结果。
说到这里,zuul转发一次请求的基本原理就说完了。让我们再回顾下整个流程。
RibbonRoutingFilter
这个Filter进行操作的。事实真的是这样吗?当我看到源码中为Observable设置重试次数的时候,我以为这就是zuul的重试逻辑。遗憾的是我的想法是错误的。还记得上面我说的getRequestSpecificRetryHandler(request, requestConfig);
这个方法吗?(不记得的同学可以回过头来再看下),这个方法返回的是RequestSpecificRetryHandler
这个类,而且在创建该类时,构造器的前两个参数都为false。(这一点非常重要)。这两个参数分别是okToRetryOnConnectErrors
和okToRetryOnAllErrors
。
我原本的想法是这个请求被包装成Observable,如果这次请求因为超时出现异常或者其他异常,这样就会触发Observable的重试机制(RxJava),但是事实并非如此,为什么呢?原因就是上面的那两个参数,当出现了超时异常的时候,在触发重试机制之前会调用RequestSpecificRetryHandler
的isRetriableException()
方法,该方法的作用是用来判断是否执行重试动作,具体代码如下:
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
//此时该值为false
if (okToRetryOnAllErrors) {
return true;
}
else if (e instanceof ClientException) {
ClientException ce = (ClientException) e;
if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
return !sameServer;
} else {
return false;
}
}
else {
//此时该值为false
return okToRetryOnConnectErrors && isConnectionException(e);
}
}
说道这里zuul转发一次请求的基本原理大概了解了,同时也验证了一个事实就是实现zuul进行重试的逻辑并不是Observable的重试机制。那么问题来了?是什么使zuul具有重试功能的呢?
开启Zuul重试的功能在原有的配置基础上需要额外进行以下设置:
zuul.retryable=true
(该参数默认为false)具体properties文件内容如下:
server.port=8081
spring.application.name=ZUUL-CLIENT
#路由信息
zuul.routes.api-a.serviceId=EUREKA-CLIENT
zuul.routes.api-a.path=/api-a/**
#是否开启重试功能
zuul.retryable=true
#同一个Server重试的次数(除去首次)
ribbon.MaxAutoRetries=3
#切换相同Server的次数
ribbon.MaxAutoRetriesNextServer=0
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
为了模拟出Zuul重试的功能,需要对后端应用服务进行改造,改造后的内容如下:
@RequestMapping(value = "/hello")
public String index() {
System.out.println("request is coming...");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
System.out.println("线程被打断... " + e.getMessage());
}
return "hello spring ...";
}
通过使用Thread.sleep(100000)
达到Zuul转发超时情况(Zuul默认连接超时未2s、read超时时间为5s),从而触发Zuul的重试功能。这时候在此访问localhost:8081/api-a/hello
时,查看应用服务后台,会发现最终打印三次"request is coming..."
通过现象看本质,接下来简单介绍下Zuul重试的原理。首先如果你工程classpath中存在spring-retry,那么zuul在初始化的时候就不会创建RibbonLoadBalancingHttpClient
而是创建RetryableRibbonLoadBalancingHttpClient
具体源代码如下:
@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
@Value("${ribbon.client.name}")
private String name = "client";
@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
@ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
IClientConfig config, ServerIntrospector serverIntrospector,
ILoadBalancer loadBalancer, RetryHandler retryHandler) {
RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
config, serverIntrospector);
client.setLoadBalancer(loadBalancer);
client.setRetryHandler(retryHandler);
Monitors.registerObject("Client_" + this.name, client);
return client;
}
@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
IClientConfig config, ServerIntrospector serverIntrospector,
ILoadBalancer loadBalancer, RetryHandler retryHandler,
LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory) {
RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
config, serverIntrospector, loadBalancedRetryPolicyFactory);
client.setLoadBalancer(loadBalancer);
client.setRetryHandler(retryHandler);
Monitors.registerObject("Client_" + this.name, client);
return client;
}
}
所以请求到来需要转发的时候(AbstractLoadBalancerAwareClient
类中executeWithLoadBalancer
方法会调用AbstractLoadBalancerAwareClient.this.execute()
)其实调用的是RetryableRibbonLoadBalancingHttpClient
的execute
方法(而不是没有重试时候RibbonLoadBalancingHttpClient
的execute
方法),源码内容如下:
@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
final RequestConfig.Builder builder = RequestConfig.custom();
IClientConfig config = configOverride != null ? configOverride : this.config;
builder.setConnectTimeout(config.get(
CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
builder.setSocketTimeout(config.get(
CommonClientConfigKey.ReadTimeout, this.readTimeout));
builder.setRedirectsEnabled(config.get(
CommonClientConfigKey.FollowRedirects, this.followRedirects));
final RequestConfig requestConfig = builder.build();
final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
RetryCallback retryCallback = new RetryCallback() {
@Override
public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
//on retries the policy will choose the server and set it in the context
//extract the server and update the request being made
RibbonApacheHttpRequest newRequest = request;
if(context instanceof LoadBalancedRetryContext) {
ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
if(service != null) {
//Reconstruct the request URI using the host and port set in the retry context
newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
newRequest.getURI().getFragment()));
}
}
if (isSecure(configOverride)) {
final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri())
.scheme("https").build().toUri();
newRequest = newRequest.withNewUri(secureUri);
}
HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
if(CloseableHttpResponse.class.isInstance(httpResponse)) {
((CloseableHttpResponse)httpResponse).close();
}
throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
httpResponse.getStatusLine().getStatusCode());
}
return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}
};
return this.executeWithRetry(request, retryPolicy, retryCallback);
}
executeWithRetry
方法内容如下:
private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
RetryTemplate retryTemplate = new RetryTemplate();
boolean retryable = request.getContext() == null ? true :
BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
: new RetryPolicy(request, retryPolicy, this, this.getClientName()));
return retryTemplate.execute(callback);
}
按照我的理解,主要逻辑如下:
@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
/**
*创建RequestConfig(请求信息)
*/
final RequestConfig requestConfig = builder.build();
final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
/**
* 创建RetryCallbck的实现类,用来完成重试逻辑
*/
RetryCallback retryCallback = new RetryCallback() {};
//创建Spring-retry的模板类,RetryTemplate。
RetryTemplate retryTemplate = new RetryTemplate();
/**
*设置重试规则,即在什么情况下进行重试
*什么情况下停止重试。源码中这部分存在
*一个判断,判断的根据就是在zuul工程
*的propertris中配置的zuul.retryable
*该参数内容为true才可以具有重试功能。
*/
retryTemplate.setRetryPolicy(xxx);
/**
*发起请求
*/
return retryTemplate.execute(callback);
}
到此为止我们不仅知道了zuul路由一次请求的整体过程,也明确了zuul因后端超时而触发重试的原理。可是似乎还存在着一个问题,就是超时问题。前面说过zuul把路由请求这个过程包装成一个HystrixCommnd,而在我的propertries文件中并没有设置Hystrix的超时时间(默认时间为1s),而read的超时时间是5s(前面源码部分介绍过)。这里就会有人问,因为最外层是采用Hystrix,而Hystrix此时已经超时了,为什么还允许它内部继续使用spring-retry进行重试呢?带着这个问题我查看了官方GitHub上的issues,发现有人对此问题提出过疑问。作者给出的回复是Hystrix超时的时候并不会打断内部重试的操作。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_42073629/article/details/107350797
内容来源于网络,如有侵权,请联系作者删除!