combine.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // This file is part of Desktop App Toolkit,
  2. // a set of libraries for developing nice desktop applications.
  3. //
  4. // For license and copyright information please follow this link:
  5. // https://github.com/desktop-app/legal/blob/master/LEGAL
  6. //
  7. #pragma once
  8. #include "base/optional.h"
  9. #include "base/variant.h"
  10. #include <rpl/map.h>
  11. #include <rpl/producer.h>
  12. #include <rpl/details/type_list.h>
  13. #include <rpl/details/callable.h>
  14. #include <rpl/mappers.h>
  15. #include <rpl/complete.h>
  16. namespace rpl {
  17. namespace details {
  18. template <typename ...Values>
  19. struct combine_state {
  20. combine_state() : accumulated(std::tuple<std::optional<Values>...>()) {
  21. }
  22. std::optional<std::tuple<std::optional<Values>...>> accumulated;
  23. std::optional<std::tuple<Values...>> latest;
  24. int invalid = sizeof...(Values);
  25. int working = sizeof...(Values);
  26. };
  27. template <typename ...Values, std::size_t ...I>
  28. inline std::tuple<Values...> combine_make_first(
  29. std::tuple<std::optional<Values>...> &&accumulated,
  30. std::index_sequence<I...>) {
  31. return std::make_tuple(std::move(*std::get<I>(accumulated))...);
  32. }
  33. template <size_t Index, typename consumer_type, typename ...Values>
  34. class combine_subscribe_one {
  35. public:
  36. combine_subscribe_one(
  37. const consumer_type &consumer,
  38. combine_state<Values...> *state)
  39. : _consumer(consumer)
  40. , _state(state) {
  41. }
  42. template <typename Value, typename Error, typename Generator>
  43. void subscribe(producer<Value, Error, Generator> &&producer) {
  44. _consumer.add_lifetime(std::move(producer).start(
  45. [consumer = _consumer, state = _state](Value &&value) {
  46. if (!state->accumulated) {
  47. std::get<Index>(*state->latest) = std::move(value);
  48. consumer.put_next_copy(*state->latest);
  49. } else {
  50. auto &accumulated = std::get<Index>(
  51. *state->accumulated);
  52. if (accumulated) {
  53. accumulated = std::move(value);
  54. } else {
  55. accumulated = std::move(value);
  56. if (!--state->invalid) {
  57. constexpr auto kArity = sizeof...(Values);
  58. state->latest = combine_make_first(
  59. std::move(*state->accumulated),
  60. std::make_index_sequence<kArity>());
  61. state->accumulated = std::nullopt;
  62. consumer.put_next_copy(*state->latest);
  63. }
  64. }
  65. }
  66. }, [consumer = _consumer](auto &&error) {
  67. consumer.put_error_forward(
  68. std::forward<decltype(error)>(error));
  69. }, [consumer = _consumer, state = _state] {
  70. if (!--state->working) {
  71. consumer.put_done();
  72. }
  73. }));
  74. }
  75. private:
  76. const consumer_type &_consumer;
  77. combine_state<Values...> *_state = nullptr;
  78. };
  79. template <
  80. typename consumer_type,
  81. typename ...Values,
  82. typename ...Errors,
  83. typename ...Generators,
  84. std::size_t ...I>
  85. inline void combine_subscribe(
  86. const consumer_type &consumer,
  87. combine_state<Values...> *state,
  88. std::index_sequence<I...>,
  89. std::tuple<producer<Values, Errors, Generators>...> &&saved) {
  90. auto consume = { (
  91. combine_subscribe_one<I, consumer_type, Values...>(
  92. consumer,
  93. state
  94. ).subscribe(std::get<I>(std::move(saved))), 0)... };
  95. (void)consume;
  96. }
  97. template <typename ...Producers>
  98. class combine_implementation_helper;
  99. template <typename ...Producers>
  100. combine_implementation_helper<std::decay_t<Producers>...>
  101. make_combine_implementation_helper(Producers &&...producers) {
  102. return combine_implementation_helper<std::decay_t<Producers>...>(
  103. std::forward<Producers>(producers)...);
  104. }
  105. template <
  106. typename ...Values,
  107. typename ...Errors,
  108. typename ...Generators>
  109. class combine_implementation_helper<producer<Values, Errors, Generators>...> {
  110. public:
  111. using CombinedValue = std::tuple<Values...>;
  112. using CombinedError = v::normalized_variant_t<Errors...>;
  113. combine_implementation_helper(
  114. producer<Values, Errors, Generators> &&...producers)
  115. : _saved(std::make_tuple(std::move(producers)...)) {
  116. }
  117. template <typename Handlers>
  118. lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
  119. auto state = consumer.template make_state<
  120. combine_state<Values...>>();
  121. constexpr auto kArity = sizeof...(Values);
  122. combine_subscribe(
  123. consumer,
  124. state,
  125. std::make_index_sequence<kArity>(),
  126. std::move(_saved));
  127. return lifetime();
  128. }
  129. private:
  130. std::tuple<producer<Values, Errors, Generators>...> _saved;
  131. };
  132. template <
  133. typename ...Values,
  134. typename ...Errors,
  135. typename ...Generators>
  136. inline auto combine_implementation(
  137. producer<Values, Errors, Generators> &&...producers) {
  138. using CombinedValue = std::tuple<Values...>;
  139. using CombinedError = v::normalized_variant_t<Errors...>;
  140. return make_producer<CombinedValue, CombinedError>(
  141. make_combine_implementation_helper(std::move(producers)...));
  142. }
  143. template <typename ...Args>
  144. struct combine_just_producers : std::false_type {
  145. };
  146. template <typename ...Args>
  147. constexpr bool combine_just_producers_v
  148. = combine_just_producers<Args...>::value;
  149. template <
  150. typename ...Values,
  151. typename ...Errors,
  152. typename ...Generators>
  153. struct combine_just_producers<
  154. producer<Values, Errors, Generators>...>
  155. : std::true_type {
  156. };
  157. template <typename ArgsList>
  158. struct combine_just_producers_list
  159. : type_list::extract_to_t<ArgsList, combine_just_producers> {
  160. };
  161. template <typename ...Args>
  162. struct combine_result_type;
  163. template <typename ...Args>
  164. using combine_result_type_t
  165. = typename combine_result_type<Args...>::type;
  166. template <
  167. typename ...Values,
  168. typename ...Errors,
  169. typename ...Generators>
  170. struct combine_result_type<producer<Values, Errors, Generators>...> {
  171. using type = std::tuple<Values...>;
  172. };
  173. template <typename ArgsList>
  174. struct combine_result_type_list
  175. : type_list::extract_to_t<ArgsList, combine_result_type> {
  176. };
  177. template <typename ArgsList>
  178. using combine_result_type_list_t
  179. = typename combine_result_type_list<ArgsList>::type;
  180. template <typename ArgsList>
  181. using combine_producers_no_mapper_t
  182. = type_list::chop_last_t<ArgsList>;
  183. template <typename ArgsList>
  184. constexpr bool combine_is_good_mapper(std::true_type) {
  185. return is_callable_v<
  186. type_list::last_t<ArgsList>,
  187. combine_result_type_list_t<
  188. combine_producers_no_mapper_t<ArgsList>
  189. >>;
  190. }
  191. template <typename ArgsList>
  192. constexpr bool combine_is_good_mapper(std::false_type) {
  193. return false;
  194. }
  195. template <typename ArgsList>
  196. struct combine_producers_with_mapper_list : std::bool_constant<
  197. combine_is_good_mapper<ArgsList>(
  198. combine_just_producers_list<
  199. combine_producers_no_mapper_t<ArgsList>
  200. >())> {
  201. };
  202. template <typename ...Args>
  203. struct combine_producers_with_mapper
  204. : combine_producers_with_mapper_list<type_list::list<Args...>> {
  205. };
  206. template <typename ...Args>
  207. constexpr bool combine_producers_with_mapper_v
  208. = combine_producers_with_mapper<Args...>::value;
  209. template <typename ...Producers, std::size_t ...I>
  210. inline decltype(auto) combine_call(
  211. std::index_sequence<I...>,
  212. Producers &&...producers) {
  213. return combine_implementation(
  214. argument_mapper<I>::call(std::move(producers)...)...);
  215. }
  216. } // namespace details
  217. template <
  218. typename ...Args,
  219. typename = std::enable_if_t<
  220. details::combine_just_producers_v<Args...>
  221. || details::combine_producers_with_mapper_v<Args...>>>
  222. inline decltype(auto) combine(Args &&...args) {
  223. if constexpr (details::combine_just_producers_v<Args...>) {
  224. return details::combine_implementation(std::move(args)...);
  225. } else if constexpr (details::combine_producers_with_mapper_v<Args...>) {
  226. constexpr auto kProducersCount = sizeof...(Args) - 1;
  227. return details::combine_call(
  228. std::make_index_sequence<kProducersCount>(),
  229. std::forward<Args>(args)...)
  230. | map(details::argument_mapper<kProducersCount>::call(
  231. std::forward<Args>(args)...));
  232. } else {
  233. static_assert(false_(args...), "Bad combine() call.");
  234. }
  235. }
  236. namespace details {
  237. template <typename Value>
  238. struct combine_vector_state {
  239. std::vector<std::optional<Value>> accumulated;
  240. std::vector<Value> latest;
  241. int invalid = 0;
  242. int working = 0;
  243. };
  244. } // namespace details
  245. template <typename Value, typename Error, typename Generator>
  246. inline auto combine(
  247. std::vector<producer<Value, Error, Generator>> &&producers) {
  248. using state_type = details::combine_vector_state<Value>;
  249. return make_producer<std::vector<Value>, Error>([
  250. producers = std::move(producers)
  251. ](const auto &consumer) mutable {
  252. auto count = producers.size();
  253. auto state = consumer.template make_state<state_type>();
  254. state->accumulated.resize(count);
  255. state->invalid = count;
  256. state->working = count;
  257. for (auto index = 0; index != count; ++index) {
  258. auto &producer = producers[index];
  259. consumer.add_lifetime(std::move(producer).start(
  260. [consumer, state, index](Value &&value) {
  261. if (state->accumulated.empty()) {
  262. state->latest[index] = std::move(value);
  263. consumer.put_next_copy(state->latest);
  264. } else if (state->accumulated[index]) {
  265. state->accumulated[index] = std::move(value);
  266. } else {
  267. state->accumulated[index] = std::move(value);
  268. if (!--state->invalid) {
  269. state->latest.reserve(
  270. state->accumulated.size());
  271. for (auto &&value : state->accumulated) {
  272. state->latest.push_back(
  273. std::move(*value));
  274. }
  275. details::take(state->accumulated);
  276. consumer.put_next_copy(state->latest);
  277. }
  278. }
  279. }, [consumer](auto &&error) {
  280. consumer.put_error_forward(
  281. std::forward<decltype(error)>(error));
  282. }, [consumer, state] {
  283. if (!--state->working) {
  284. consumer.put_done();
  285. }
  286. }));
  287. }
  288. if (!count) {
  289. consumer.put_done();
  290. }
  291. return lifetime();
  292. });
  293. }
  294. template <
  295. typename Value,
  296. typename Error,
  297. typename Generator,
  298. typename Mapper>
  299. inline auto combine(
  300. std::vector<producer<Value, Error, Generator>> &&producers,
  301. Mapper &&mapper) {
  302. return combine(std::move(producers))
  303. | map(std::forward<Mapper>(mapper));
  304. }
  305. } // namespace rpl