微服务中,不同的服务之间相互调用主要有两种方式,一种是RPC方式,另外一种是事件驱动方式,也就是发消息方式。
RPC的方式就是远程函数调用,像RESTFul,gRPC, DUBBO 都是这种方式。它一般是同步的,可以马上得到结果。在实际中,大多数应用都要求立刻得到结果,这时同步方式更有优势,代码也更简单。而feign自然也是基于这种方式来做的。
feign的github上的介绍:
Feign is a Java to HTTP client binder inspired by Retrofit, JAXRS-2.0, and WebSocket. Feign's first goal was reducing the complexity of binding Denominator uniformly to HTTP APIs regardless of ReSTfulness.
简单的来说,Feign是由Retrofit,JAXRS-2.0和WebSocket启发的一个java的 RESTFUL api请求工具。 Feign的主要目标是将Java Http Clients变得简单。
首先在启动类上添加注解:
@SpringCloudApplication
@EnableFeignClients // 开启feign客户端
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
然后编写一个接口:
@FeignClient(name = \"item-service\")
public interface UserClient{
@GetMapping(\"/{id}\")
User getUserById(@PathVariable(\"id\") Long id); //接口代码没有贴出来
@GetMapping(\"/list\")
List<User> getUserByIds(@RequestParam(\"ids\") List<Long> ids);
}
最后在controller中调用:
@Controller
@RequestMapping(\"/user\")
public class UserController {
@Autowired
private UserClient userClient;
@GetMapping
@ResponseBody
public User queryUserById(@RequestParam(\"id\") Long id){
User user = this.userClient.queryUserById(id);
return user;
}
}
上面就是feign的整个流程了,只需要创建一个接口来描述http请求,然后我们就可以像调用java方法一样完成api的请求,非常的优雅。
从上面的Feign的使用来看,Feign就是充当了一个适配器的角色:将一个java接口翻译成一个http请求,然后用HttpURLConnection
去发送这个请求。
首先通过扫包注入FeignClient的bean,这个源码在FeignClientsRegistrar
类中:
private void registerDefaultConfiguration(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
Map<String, Object> defaultAttrs = metadata
.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey(\"defaultConfiguration\")) {
String name;
if (metadata.hasEnclosingClass()) {
name = \"default.\" + metadata.getEnclosingClassName();
}
else {
name = \"default.\" + metadata.getClassName();
}
registerClientConfiguration(registry, name,
defaultAttrs.get(\"defaultConfiguration\"));
}
}
这段代码先在启动配置上检查是否有@EnableFeignClients
注解,如果有,则开启包扫描,扫描被@FeignClient注解的接口,扫描到FeignClient之后,将其注入到ioc容器中。
然后就是核心实现,通过jdk动态代理,在请求FeignClient的方法的时候,会拦截这个方法。具体代码在ReflectiveFeign类,如下:
@SuppressWarnings(\"unchecked\")
@Override
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if(Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);
for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
最终是在SynchronousMethodHandler
类进行拦截处理,通过参数生成一个request对象:
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);
//...省略代码
Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
//...省略代码
}
然后将这个request对象交个client处理,处理完成之后获取到response。client是一个接口,默认的实现为它的一个静态内部类Default
,在这个类中,使用HttpURLConnection完成的网络请求:
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection).toBuilder().request(request).build();
}
最终向容器中注入的是LoadBalancerFeignClient,即负载均衡客户端:
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
可以看到是调用execute方法,最终是通过executeWithLoadBalancer方法获取结果的:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
最终在submit方法上:
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
//...省略代码
});
通过selectServer方法,实现负载均衡:
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
最终交给loadBalancerContext
也就是Ribbon来做的。
回顾:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.lingxiaomz.top/articleContent/?id=104794115938045779
内容来源于网络,如有侵权,请联系作者删除!