博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo的异步调用
阅读量:4185 次
发布时间:2019-05-26

本文共 6816 字,大约阅读时间需要 22 分钟。

文章目录

在微服务环境中,往往一个接口,是经过多个服务间的接口调用,最后封装成一个接口中返回。如果每个等待每个接口串并执行结果,会比较耗时,此时我们就需要异步处理。

dubbo异步调用

dubbo的异步调用,基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。

在这里插入图片描述
在userThread用户线程请求网关服务,调用某个方法,执行ioThread,再去调用服务端server,在服务端返回数据之前,将Future对象,复制到RpcContext中,在通过getFuture获取Future对象。
在服务端返回数据后,通知Future对象,通过Future的get方法,获取返回结果。

2.6版本中dubbo异步调用的实现

在客户端的consumer.xml 中配置如下:

调用代码。

// 此调用会立即返回nullfooService.findFoo(fooId)// 拿到调用的Future引用, 当结果返回后,会被通知和设置到此Future。Future
fooFuture = RpcContext.getContext().getFuture();// 此调用会立即返回nullbarService.findBar(barId)// 拿到调用的Future引用, 当结果返回后,会被通知和设置到此Future。Future
fooFuture = RpcContext.getContext().getFuture();// 此时findFoo 和 findBar 的请求同时在执行,客户端不需要启动多线程来支持并行,而是借助NIO的非阻塞完成。// 如果foo 已返回,直接拿到返回值,否则当前线程wait住, 等待foo返回后,线程会被notify唤醒。Foo foo = fooFuture.get().// 同理等待bar返回。Bar bar = barFuture.get();// 如果foo需要5秒返回,bar需要6秒返回,实际只需要6秒,即可获取到foo和bar,进行接下来的处理。

一些特殊场景下,客户端为了尽快调用返回值,可以设置是否等待消息发出

  • sent=“true” 等待消息发出,消息发送失败将抛出异常;
  • sent=“false” 不等待消息发出,将消息放入 IO 队列,即刻返回。
    默认为fase。配置方式如下:

如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本:

此时,RpcContext.getContext().getFuture()将返回null

2.7版本dubbo 客户端Consumer异步调用

从v2.7.0开始,Dubbo的所有异步编程接口开始以CompletableFuture为基础。

使用CompletableFuture签名的接口

需要服务提供者事先定义CompletableFuture签名的服务,具体参见服务端异步执行接口定义:

public interface AsyncService {    CompletableFuture
sayHello(String name);}

注意接口的返回类型是CompletableFuture<String>

XML引用服务:

 
1、调用远程服务:
// 调用直接返回CompletableFutureCompletableFuture
future = asyncService.sayHello("async call request");// 增加回调future.whenComplete((v, t) -> { if (t != null) { t.printStackTrace(); } else { System.out.println("Response: " + v); }});// 早于结果输出System.out.println("Executed before response return.");### Springboot
2、 使用RpcContext

在 consumer.xml 中配置:

调用代码:

// 此调用会立即返回nullasyncService.sayHello("world");// 拿到调用的Future引用,当结果返回后,会被通知和设置到此FutureCompletableFuture
helloFuture = RpcContext.getContext().getCompletableFuture();// 为Future添加回调helloFuture.whenComplete((retValue, exception) -> { if (exception == null) { System.out.println(retValue); } else { exception.printStackTrace(); }});

或者,你也可以这样做异步调用:

CompletableFuture
future = RpcContext.getContext().asyncCall( () -> { asyncService.sayHello("oneway call request1"); });future.get();

2.7 版本 服务提供者Provider异步执行

Provier端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池过度占用,有助于避免不同服务间的互相影响。异步执行无益于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。

注意:Provider端异步执行和Consumer端异步调用是相互独立的,你可以任意正交组合两端配置

  • Consumer同步 - Provider同步
  • Consumer异步 - Provider同步
  • Consumer同步 - Provider异步
  • Consumer异步 - Provider异步
1、定义CompletableFuture签名的接口

服务接口定义:

public interface AsyncService {    CompletableFuture
sayHello(String name);}

服务实现:

public class AsyncServiceImpl implements AsyncService {    @Override    public CompletableFuture
sayHello(String name) { RpcContext savedContext = RpcContext.getContext(); // 建议为supplyAsync提供自定义线程池,避免使用JDK公用线程池 return CompletableFuture.supplyAsync(() -> { System.out.println(savedContext.getAttachment("consumer-key1")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "async response from provider."; }); }}

通过return CompletableFuture.supplyAsync(),业务执行已从Dubbo线程切换到业务线程,避免了对Dubbo线程池的阻塞。

2、使用AsyncContext

Dubbo提供了一个类似Serverlet 3.0的异步接口AsyncContext,在没有CompletableFuture签名接口的情况下,也可以实现Provider端的异步执行。

服务接口定义:

public interface AsyncService {    String sayHello(String name);}

服务暴露,和普通服务完全一致:

服务实现:

public class AsyncServiceImpl implements AsyncService {    public String sayHello(String name) {        final AsyncContext asyncContext = RpcContext.startAsync();        new Thread(() -> {            // 如果要使用上下文,则必须要放在第一句执行            asyncContext.signalContextSwitch();            try {                Thread.sleep(500);            } catch (InterruptedException e) {                e.printStackTrace();            }            // 写回响应            asyncContext.write("Hello " + name + ", response from provider.");        }).start();        return null;    }}

springboot 项目集成异步调用

1、启动来添加注解@EnableAsync

@SpringBootApplication(scanBasePackages = {"com.stylefeng.guns"})@EnableDubboConfiguration@EnableAsyncpublic class GatewayApplication {    public static void main(String[] args) {        SpringApplication.run(GatewayApplication.class, args);    }}

2、 客户端注解@Reference中添加属性async = true

@Reference(interfaceClass = FilmAsyncServiceApi.class, validation = "1.0", async = true)    private FilmAsyncServiceApi filmAsyncServiceApi;

3、 接口调用

/**     *  影片详情接口     * @return     */    @RequestMapping(value = "films/{searchParam}", method = RequestMethod.GET)    public ResponseVO films(@PathVariable String searchParam,                            int searchType) throws ExecutionException, InterruptedException {        // 根据searchType,判断查询类型        FilmDetailVO filmDetail = filmServiceApi.getFilmDetail(searchType, searchParam);        // 查询影片的详细信息 -> Dubbo 的异步获取        // 获取影片描述信息        if (filmDetail == null ) {            return ResponseVO.serviceFail("没有可查询的影片");        } else if (filmDetail.getFilmId() == null || filmDetail.getFilmId().trim().length() == 0) {            return ResponseVO.serviceFail("没有可查询的影片");        }        String filmId = filmDetail.getFilmId();                filmServiceApi.getFilmDesc(filmId);        // 拿到调用的Future引用,当结果返回后,会被通知和设置到此future。        Future
filmDescVOFuture = RpcContext.getContext().getFuture(); // 获取图片信息 filmServiceApi.getImags(filmId); Future
imgVOFuture = RpcContext.getContext().getFuture(); // 获取导演信息 filmServiceApi.getDectInfo(filmId); Future
actorVOFuture = RpcContext.getContext().getFuture(); // 获取演员信息 filmServiceApi.getActors(filmId); Future
> actorsFuture = RpcContext.getContext().getFuture(); FilmInfoVO filmInfoVO = new FilmInfoVO(); BeanUtils.copyProperties(filmDetail, filmInfoVO); // 组织Actor属性 InfoRequestVO infoRequestVO = new InfoRequestVO(); ActorRequestVO actorRequestVO = new ActorRequestVO(); actorRequestVO.setActors(actorsFuture.get()); actorRequestVO.setDirector(actorVOFuture.get()); infoRequestVO.setActors(actorsFuture.get()); infoRequestVO.setBiography(filmDescVOFuture.get().getBiography()); infoRequestVO.setFilmId(filmId); infoRequestVO.setImgVO(imgVOFuture.get()); filmInfoVO.setInfo04(infoRequestVO); return ResponseVO.success(IMG_PRE, filmInfoVO); }

在每个服务调用后,都会拿到对应Future,拿到值之后,设置到Future中,通过get方法获取。

转载地址:http://vbfoi.baihongyu.com/

你可能感兴趣的文章
gp性能管理
查看>>
linux的清屏命令
查看>>
maven打jar包
查看>>
Win10系统ie浏览器打不开网页的2种解决方法
查看>>
自己搭建一套hadoop的运行环境
查看>>
本次装的hadoop版本是hadoop1.2的版本
查看>>
跳板机SecureCRT
查看>>
namenode的log时,散仙发现有如下的警告信息
查看>>
用过eclipse直接向hadoop提交MR作业
查看>>
在执行bin/hadoop checknative 命令时
查看>>
MapReduce作业
查看>>
去连接Linux系统上的HDFS
查看>>
在eclipse中远程连接并读取数据
查看>>
处理多个类似表的txt数据
查看>>
两种hadoop集群(CDH的和Apache的))在使用过程中遇到
查看>>
在hadoop的编程中输入输出参数路径的设定
查看>>
PV、UV、IP的区别
查看>>
Hadoop2.2内存参数模板
查看>>
hue的架构图
查看>>
把项目打成jar包,提交执行
查看>>