// gio-async.cpp
#define GI_INLINE 1
#include <gio/gio.hpp>

#include <boost/fiber/all.hpp>

#include <algorithm>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>

namespace GLib = gi::repository::GLib;
namespace GObject_ = gi::repository::GObject;
namespace Gio = gi::repository::Gio;
  8. class context_scheduler : public boost::fibers::algo::round_robin
  9. {
  10. typedef context_scheduler self;
  11. typedef boost::fibers::algo::round_robin super;
  12. struct src : GSource
  13. {
  14. self *scheduler;
  15. };
  16. GSourceFuncs funcs{};
  17. GLib::MainContext ctx_;
  18. src *source_;
  19. boost::fibers::condition_variable cond_;
  20. boost::fibers::mutex mtx_;
  21. using clock_type = std::chrono::steady_clock;
  22. bool dispatching_ = false;
  23. static gboolean src_dispatch(
  24. GSource *source, GSourceFunc /*callback*/, gpointer /*user_data*/)
  25. {
  26. auto s = (src *)(source);
  27. auto sched = s->scheduler;
  28. // wait here to give (other) fibers a chance
  29. sched->dispatching_ = true;
  30. std::unique_lock<boost::fibers::mutex> lk(sched->mtx_);
  31. sched->cond_.wait(lk);
  32. sched->dispatching_ = false;
  33. // all available work has been done while we were waiting above
  34. // no need to dispatch again until new work
  35. // which we accept as of now (due to mainloop activity)
  36. return G_SOURCE_CONTINUE;
  37. }
  38. public:
  39. context_scheduler(GLib::MainContext ctx) : ctx_(ctx)
  40. {
  41. // this is a bit too much for bindings, so handle the raw C way
  42. funcs.dispatch = src_dispatch;
  43. auto s = g_source_new(&funcs, sizeof(src));
  44. source_ = (src *)(s);
  45. source_->scheduler = this;
  46. g_source_attach(s, ctx.gobj_());
  47. }
  48. ~context_scheduler()
  49. {
  50. g_source_destroy(source_);
  51. g_source_unref(source_);
  52. }
  53. void awakened(boost::fibers::context *t) noexcept override
  54. {
  55. // delegate first
  56. super::awakened(t);
  57. // arrange for dispatch of work
  58. // discard awake of source dispatch
  59. if (!dispatching_)
  60. g_source_set_ready_time(source_, 0);
  61. }
  62. void suspend_until(
  63. std::chrono::steady_clock::time_point const &abs_time) noexcept override
  64. {
  65. // release dispatch
  66. // should only end up here while dispatching in source
  67. // (rather than inadvertently trying to block main loop,
  68. // which would then lead to busy loop)
  69. if (dispatching_) {
  70. // derive time of subsequent dispatch
  71. if (clock_type::time_point::max() != abs_time) {
  72. auto to = abs_time - std::chrono::steady_clock::now();
  73. int ms =
  74. std::chrono::duration_cast<std::chrono::milliseconds>(to).count();
  75. ms = std::max(ms, 0);
  76. g_source_set_ready_time(source_, g_get_monotonic_time() + ms);
  77. } else {
  78. g_source_set_ready_time(source_, -1);
  79. }
  80. // release source dispatching
  81. cond_.notify_one();
  82. } else {
  83. // suspend is requested, so there is nothing to do for a while
  84. // so in particular the main fiber is then also blocked (e.g. some sleep)
  85. // such main loop blocking is also/still not allowed
  86. // (if such is active)
  87. g_assert(g_main_depth() == 0);
  88. // no running loop (so also no dispatch)
  89. // so delegate to the usual scheduling
  90. // (which will really block the hard way, rather than poll)
  91. super::suspend_until(abs_time);
  92. }
  93. }
  94. // might be called from a different thread
  95. void notify() noexcept override
  96. {
  97. // discard our own notify above to resume source dispatch
  98. if (dispatching_)
  99. return;
  100. ctx_.wakeup();
  101. }
  102. };
  103. class async_future
  104. {
  105. boost::fibers::promise<Gio::AsyncResult> p_;
  106. boost::fibers::future<Gio::AsyncResult> f_;
  107. Gio::Cancellable cancel_;
  108. GLib::Source timeout_;
  109. public:
  110. ~async_future()
  111. {
  112. if (timeout_)
  113. timeout_.destroy();
  114. }
  115. operator Gio::AsyncReadyCallback()
  116. {
  117. // prepare a new promise
  118. p_ = decltype(p_)();
  119. f_ = p_.get_future();
  120. cancel_ = nullptr;
  121. return [&](GObject_::Object, Gio::AsyncResult result) {
  122. p_.set_value(result);
  123. };
  124. }
  125. Gio::AsyncResult get() { return f_.get(); }
  126. Gio::Cancellable cancellable()
  127. {
  128. if (!cancel_)
  129. cancel_ = Gio::Cancellable::new_();
  130. return cancel_;
  131. }
  132. Gio::Cancellable timeout(const std::chrono::milliseconds &to)
  133. {
  134. auto cancel = cancellable();
  135. if (to.count() > 0) {
  136. timeout_ = GLib::timeout_source_new(to.count());
  137. auto do_timeout = [cancel]() mutable {
  138. cancel.cancel();
  139. return GLib::SOURCE_REMOVE_;
  140. };
  141. timeout_.set_callback<GLib::SourceFunc>(do_timeout);
  142. timeout_.attach(GLib::MainContext::get_thread_default());
  143. }
  144. return cancel_;
  145. }
  146. };
  147. static void
  148. async_client(int port, int id, int &count)
  149. {
  150. async_future w;
  151. auto dest = Gio::NetworkAddress::new_loopback(port);
  152. std::string sid = "client ";
  153. sid += std::to_string(id);
  154. // connect a client
  155. std::cout << sid << ": connect" << std::endl;
  156. auto client = Gio::SocketClient::new_();
  157. client.connect_async(dest, w);
  158. auto conn = gi::expect(client.connect_finish(w.get()));
  159. // say something
  160. auto os = conn.get_output_stream();
  161. std::cout << sid << ": send: " << sid << std::endl;
  162. os.write_all_async(
  163. (guint8 *)sid.data(), sid.size(), GLib::PRIORITY_DEFAULT_, w);
  164. os.write_all_finish(w.get(), (gsize *)nullptr);
  165. // now hear what the other side has to say
  166. std::cout << sid << ": receive" << std::endl;
  167. auto is = conn.get_input_stream();
  168. while (1) {
  169. guint8 data[1024];
  170. is.read_async(data, sizeof(data), GLib::PRIORITY_DEFAULT_, w);
  171. auto size = gi::expect(is.read_finish(w.get()));
  172. if (!size)
  173. break;
  174. std::string msg(data, data + size);
  175. std::cout << sid << ": got data: " << msg << std::endl;
  176. }
  177. std::cout << sid << ": closing down" << std::endl;
  178. --count;
  179. }
  180. static void
  181. async_handle_client(Gio::SocketConnection conn)
  182. {
  183. async_future w;
  184. // say hello
  185. auto os = conn.get_output_stream();
  186. std::string msg = "hello ";
  187. os.write_all_async(
  188. (guint8 *)msg.data(), msg.size(), GLib::PRIORITY_DEFAULT_, w);
  189. os.write_all_finish(w.get(), (gsize *)nullptr);
  190. // now echo what the other side has to say
  191. auto is = conn.get_input_stream();
  192. while (1) {
  193. guint8 data[1024];
  194. // give up if timeout
  195. GLib::Error error;
  196. is.read_async(data, sizeof(data), GLib::PRIORITY_DEFAULT_,
  197. w.timeout(std::chrono::milliseconds(200)), w);
  198. auto size = gi::expect(is.read_finish(w.get(), &error));
  199. if (error) {
  200. if (error.matches(G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
  201. break;
  202. } else {
  203. throw error;
  204. }
  205. }
  206. std::string msg(data, data + size);
  207. std::cout << "server: got data: " << msg << std::endl;
  208. os.write_all_async(data, size, GLib::PRIORITY_DEFAULT_, w);
  209. os.write_all_finish(w.get(), (gsize *)nullptr);
  210. }
  211. std::cout << "server: closing down client" << std::endl;
  212. }
  213. static void
  214. async_server(int clients, int &port)
  215. {
  216. async_future w;
  217. auto listener = Gio::SocketListener::new_();
  218. port = gi::expect(listener.add_any_inet_port());
  219. int count = 0;
  220. while (count < clients) {
  221. // accept clients
  222. std::cout << "server: accepting" << std::endl;
  223. listener.accept_async(w);
  224. auto conn = gi::expect(
  225. listener.accept_finish(w.get(), (GObject_::Object *)nullptr));
  226. // spawn client handler
  227. std::cout << "server: new connection" << std::endl;
  228. boost::fibers::fiber c(async_handle_client, conn);
  229. c.detach();
  230. ++count;
  231. }
  232. // wait a bit and shutdown
  233. // wait long enough to test the out-of-loop join below
  234. boost::this_fiber::sleep_for(std::chrono::milliseconds(1000));
  235. std::cout << "server: shutdown" << std::endl;
  236. }
  237. static void
  238. async_demo(GLib::MainLoop loop, int clients)
  239. {
  240. // run server
  241. // dispatch at once to obtain port
  242. int port = 0;
  243. boost::fibers::fiber server(
  244. boost::fibers::launch::dispatch, async_server, clients, std::ref(port));
  245. // make clients
  246. int count = 0;
  247. for (int i = 0; i < clients; ++i) {
  248. ++count;
  249. auto c = boost::fibers::fiber(async_client, port, i, std::ref(count));
  250. c.detach();
  251. }
  252. // plain-and-simple; poll regularly and quit when all clients done
  253. auto check = [&]() {
  254. if (!count)
  255. loop.quit();
  256. return G_SOURCE_CONTINUE;
  257. };
  258. GLib::timeout_add(100, check);
  259. std::cout << "running loop" << std::endl;
  260. loop.run();
  261. std::cout << "ending loop" << std::endl;
  262. server.join();
  263. }
  264. int
  265. main(int argc, char **argv)
  266. {
  267. GLib::MainLoop loop = GLib::MainLoop::new_();
  268. auto ctx = GLib::MainContext::default_();
  269. boost::fibers::use_scheduling_algorithm<context_scheduler>(ctx);
  270. { // basic fiber demo
  271. int count = 0;
  272. auto work = [&](const std::string &msg) {
  273. std::cout << msg << std::endl;
  274. ++count;
  275. };
  276. auto quit = [&](int limit, const std::chrono::milliseconds &d) {
  277. while (count < limit)
  278. boost::this_fiber::sleep_for(d);
  279. loop.quit();
  280. };
  281. boost::fibers::fiber f1(work, "fiber 1");
  282. boost::fibers::fiber f2(work, "fiber 2");
  283. boost::fibers::fiber q(quit, 2, std::chrono::milliseconds(100));
  284. loop.run();
  285. f1.join();
  286. f2.join();
  287. q.join();
  288. }
  289. // now an optional async GIO demo
  290. int clients = argc > 1 ? std::stoi(argv[1]) : 0;
  291. std::cout << clients << " clients" << std::endl;
  292. if (clients > 0)
  293. async_demo(loop, clients);
  294. }