ISO/IEC C++ China Unofficial

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
查看: 1904|回复: 5

【Flying with Boost.asio ②】协程与自制异步处理

[复制链接]

8

主题

64

帖子

269

积分

超级版主

Rank: 8Rank: 8

威望
12
经验
169
贡献
12
发表于 2015-12-9 21:04:09 | 显示全部楼层 |阅读模式
本帖最后由 nadesico19 于 2015-12-15 12:52 编辑

boost.asio 配合协程使用,其在异步处理上的表达能力可谓不俗。借用官方的 Echo Server 作为例子:

  1. boost::asio::spawn(my_strand, [&](boost::asio::yield_context yield) {
  2.     try {
  3.         char data[128];
  4.         for (;;) {
  5.             std::size_t length = my_socket.async_read_some(
  6.                 boost::asio::buffer(data), yield); // I know you hate callbacks
  7.             boost::asio::async_write(my_socket,
  8.                 boost::asio::buffer(data, length), yield);
  9.         }
  10.     } catch(...) {}
  11. });
复制代码


boost.asio 为用户提供了丰富的异步接口(async_* 系列)以满足多类异步IO需求,不过在以下一些方面还未能达到百分之百的需求覆盖。

1. 非平台依赖的文件IO
2. 数据库IO(这个需要客户端实现的支持,略无奈)
3. 计算密集型过程(虽然不是IO,但亦适合异步化)

本文的主要目的就是分享一个泛用的 AsyncExcute 接口,以支持将协程中的一些耗时较长的处理异步化。闲话少说,代码先行:


  1. template<class Dispatcher, class Handler>
  2. auto AsyncExcute(Dispatcher& dispatcher, // Can be io_service, io_service::strand, or other task queue
  3.                  Handler&& handler,      // Can be any callable type that meets the requirements
  4.                  boost::asio::yield_context& context) -> decltype(handler()) {
  5.     boost::asio::detail::async_result_init<decltype(context), void(decltype(handler()))> init(context);

  6.     dispatcher.dispatch([&, coro = init.handler]() mutable { // !! Must make a copy of init.handler here
  7.         // Return to the original io_service where the coroutine was spawned
  8.         context.handler_.dispatcher_.dispatch(
  9.             [&, coro = std::move(coro), result = handler()]() mutable { coro(std::move(result)); });
  10.     });

  11.     return init.result.get();
  12. }
  13. // Usage: auto result = AsyncExcute(some_task_queue, [] { /* Heavy work goes here */ return result; }, yield);
复制代码


boost.asio 的源码写得太绕容易看晕,其实自己做也就上面这么10行左右。

来看一个实际例子,首先是服务器端,代码要点如下:

1. 使用两个 io_service,coro_service 负责管理与客户端交互的协程,work_service 则作为任务队列使用。
2. 服务端提供两个远程命令,执行分别耗时 100ms 和 1s,根据客户端提供的命令编号进行调用,命令执行完毕后回写响应并断开连接。
3. 中间被注释掉的部分即是将耗时1s的命令交由 work_service 执行的处理方式。


  1. using boost::asio::ip::tcp;
  2. boost::asio::io_service coro_service; // For non-blocking coroutines
  3. boost::asio::io_service work_service; // For long time blocking I/O or heavy processing

  4. boost::asio::spawn(coro_service, [&](boost::asio::yield_context yield) {
  5.     tcp::acceptor acceptor(coro_service, tcp::endpoint(tcp::v4(), 12306)); // SB 12306
  6.     while (true) { // Keep waiting for new connetions
  7.         boost::system::error_code ec;
  8.         tcp::socket socket(coro_service);
  9.         acceptor.async_accept(socket, yield[ec]);
  10.         if (!ec) { // Spawn a new coroutine for user session
  11.             boost::asio::spawn(coro_service,
  12.                 [&, socket = std::move(socket)](boost::asio::yield_context yield) mutable {
  13.                 try {
  14.                     using namespace std::literals;
  15.                     uint8_t buf[1];
  16.                     std::function<int()> commands[] = {
  17.                         [] { std::this_thread::sleep_for(100ms); return 0; }, // Light processing
  18.                         [] { std::this_thread::sleep_for(1s); return 0; } }; // Heavy processing
  19.                     const char* results[] = { "OK", "NG" };
  20.                     boost::asio::async_read(socket, boost::asio::buffer(buf), yield);
  21.                     if (buf[0] < 2) {
  22.                         //if (buf[0] == 1) {
  23.                         //    AsyncExcute(work_service, commands[1], yield);
  24.                         //} else {
  25.                         //    commands[0]();
  26.                         //}
  27.                         commands[buf[0]]();
  28.                         boost::asio::async_write(socket, boost::asio::buffer(results[0], 2), yield);
  29.                     } else {
  30.                         boost::asio::async_write(socket, boost::asio::buffer(results[1], 2), yield);
  31.                     }
  32.                 } catch(...) {}
  33.             });
  34.         }
  35.     }
  36. });

  37. std::thread work_thread([&] {
  38.     boost::asio::io_service::work keep_working(work_service);
  39.     work_service.run();
  40. });
  41. coro_service.run();
  42. work_service.stop();
  43. work_thread.join();
复制代码


接下来用 Py蛇 + tornado 写一个异步客户端测试一下,代码要点如下:

1. 同时建立 100 个客户端连接进行异步读写。
2. 其中 90% 的用户调用耗时较短的远程命令,剩余 10% 调用耗时较长者。
3. 记录从第一个请求发出到最后一个响应返回为止的耗时。(感谢 GIL 让我写这么烂的代码wwww)


  1. import socket, time
  2. import tornado.ioloop as io
  3. import tornado.iostream as ios

  4. total_clients, total_count, start_time, end_time = 100, 0, 0, 0

  5. clients = [ios.IOStream(socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0))
  6.            for _ in range(total_clients)]

  7. for i in range(total_clients):
  8.     clients[i].connect(("127.0.0.1", 12306), lambda i=i: send(i))

  9. def send(i):
  10.     if total_count == 0:
  11.         start_time = time.clock()
  12.     clients[i].write(b'\1' if i // int(total_clients * 0.9) else b'\0')
  13.     clients[i].read_bytes(2, readed)

  14. def readed(data):
  15.     assert(data == b'OK')
  16.     global total_count
  17.     total_count += 1
  18.     if total_count == total_clients:
  19.         end_time = time.clock()
  20.         print('Total time:', end_time - start_time)

  21. io.IOLoop.current().start()
复制代码


经过测试,使用 work_service 与不使用的版本相比,耗时约减少一半,结果在预期内。

虽然有点仓促。。。不过要下班了,本回至此收笔

评分

参与人数 3威望 +3 贡献 +3 收起 理由
萧の十三郎 + 1 + 1 (楼主头像)温婉好看
岩川黑鬼 + 1 + 1 温婉好看。
LH_Mouse + 1 + 1 拜!

查看全部评分

回复

使用道具 举报

3

主题

20

帖子

202

积分

超级版主

mikonmikonmi

Rank: 8Rank: 8

威望
4
经验
170
贡献
4
发表于 2015-12-10 15:50:54 | 显示全部楼层
本帖最后由 岩川黑鬼 于 2015-12-10 15:52 编辑

确实不错,果然C艹的接口还是好看……

说起来,数据库和SSL这两个方面抚子有什么好办法么?用coro做这里我一直觉得没有特别满意的。

点评

很多数据库引擎都不支持异步,所以确实没辙。  发表于 2015-12-10 17:14
关于数据库,鉴于需求复杂自己造一个也不大现实(redis这类协议简单的除外),也就连接池+任务队列去搞了。至于SSL,boost已有方案于是没有更多研究  发表于 2015-12-10 16:29
沿海征收头GAY骨
回复 支持 反对

使用道具 举报

10

主题

108

帖子

507

积分

超级版主

RA2DIY 特别行政区行政长官

Rank: 8Rank: 8

威望
4
经验
388
贡献
3
发表于 2015-12-10 17:08:28 | 显示全部楼层
本帖最后由 LH_Mouse 于 2015-12-10 17:19 编辑
岩川黑鬼 发表于 2015-12-10 15:50
确实不错,果然C艹的接口还是好看……

说起来,数据库和SSL这两个方面抚子有什么好办法么?用coro做这里我 ...

ssl 丢两个一等函数进去,缺省绑到 ::send 和 ::recv,允许重设,即可。
由于保持 C++98 兼容性的原因,我这里采用了下面这个解决方案:
https://github.com/lhmouse/posei ... _base.cpp?ts=4#L174


======================
广义来讲这里的 filter 也是个一等函数,只是有两种使用姿势。
而且在这个函数不存在的时候就是用默认的 ::send 和 ::recv,如果该函数(使用 scoped_ptr / unique_ptr 管理)存在,则允许使用被重写的收发函数。

点评

看了看OpenSSL那些狗〇似乎也可以直接关联fd和SSL信息,还能拯救一下。  发表于 2015-12-10 17:19
We will prevail!! www.lhmouse.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|ISO/IEC C++ China Unofficial

GMT+8, 2017-12-15 18:20 , Processed in 0.074483 second(s), 23 queries , XCache On.

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表