| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- // This file is part of Desktop App Toolkit,
- // a set of libraries for developing nice desktop applications.
- //
- // For license and copyright information please follow this link:
- // https://github.com/desktop-app/legal/blob/master/LEGAL
- //
- #pragma once
- #include "base/optional.h"
- #include "base/variant.h"
- #include <rpl/map.h>
- #include <rpl/producer.h>
- #include <rpl/details/type_list.h>
- #include <rpl/details/callable.h>
- #include <rpl/mappers.h>
- #include <rpl/complete.h>
- namespace rpl {
- namespace details {
// Bookkeeping shared by every subscription of a fixed-arity combine().
//
// `accumulated` is engaged on construction and holds one (initially empty)
// optional slot per source; `latest` starts disengaged. Both counters start
// at the number of sources.
template <typename ...Values>
struct combine_state {
	combine_state() : accumulated(std::in_place) {
	}

	// One optional slot per source, engaged from construction.
	std::optional<std::tuple<std::optional<Values>...>> accumulated;

	// A complete tuple with one value per source; empty on construction.
	std::optional<std::tuple<Values...>> latest;

	// Decremented by the subscribers: `invalid` when a source delivers its
	// first value, `working` when a source reports that it is done.
	int invalid = static_cast<int>(sizeof...(Values));
	int working = static_cast<int>(sizeof...(Values));
};
// Turns a tuple of optionals into a tuple of plain values by moving the
// content out of every slot.
//
// Every slot is dereferenced without a check, so the caller must pass a
// tuple in which all optionals are engaged. The index sequence argument
// only carries the slot indices.
template <typename ...Values, std::size_t ...I>
inline auto combine_make_first(
		std::tuple<std::optional<Values>...> &&accumulated,
		std::index_sequence<I...>) -> std::tuple<Values...> {
	return std::make_tuple(std::move(*std::get<I>(accumulated))...);
}
- template <size_t Index, typename consumer_type, typename ...Values>
- class combine_subscribe_one {
- public:
- combine_subscribe_one(
- const consumer_type &consumer,
- combine_state<Values...> *state)
- : _consumer(consumer)
- , _state(state) {
- }
- template <typename Value, typename Error, typename Generator>
- void subscribe(producer<Value, Error, Generator> &&producer) {
- _consumer.add_lifetime(std::move(producer).start(
- [consumer = _consumer, state = _state](Value &&value) {
- if (!state->accumulated) {
- std::get<Index>(*state->latest) = std::move(value);
- consumer.put_next_copy(*state->latest);
- } else {
- auto &accumulated = std::get<Index>(
- *state->accumulated);
- if (accumulated) {
- accumulated = std::move(value);
- } else {
- accumulated = std::move(value);
- if (!--state->invalid) {
- constexpr auto kArity = sizeof...(Values);
- state->latest = combine_make_first(
- std::move(*state->accumulated),
- std::make_index_sequence<kArity>());
- state->accumulated = std::nullopt;
- consumer.put_next_copy(*state->latest);
- }
- }
- }
- }, [consumer = _consumer](auto &&error) {
- consumer.put_error_forward(
- std::forward<decltype(error)>(error));
- }, [consumer = _consumer, state = _state] {
- if (!--state->working) {
- consumer.put_done();
- }
- }));
- }
- private:
- const consumer_type &_consumer;
- combine_state<Values...> *_state = nullptr;
- };
- template <
- typename consumer_type,
- typename ...Values,
- typename ...Errors,
- typename ...Generators,
- std::size_t ...I>
- inline void combine_subscribe(
- const consumer_type &consumer,
- combine_state<Values...> *state,
- std::index_sequence<I...>,
- std::tuple<producer<Values, Errors, Generators>...> &&saved) {
- auto consume = { (
- combine_subscribe_one<I, consumer_type, Values...>(
- consumer,
- state
- ).subscribe(std::get<I>(std::move(saved))), 0)... };
- (void)consume;
- }
// Generator object behind combine(); only specializations are defined.
template <typename ...Producers>
class combine_implementation_helper;

// Deduces the (decayed) producer types and forwards the producers to the
// matching combine_implementation_helper constructor.
template <typename ...Producers>
auto make_combine_implementation_helper(Producers &&...producers)
-> combine_implementation_helper<std::decay_t<Producers>...> {
	using helper_type = combine_implementation_helper<
		std::decay_t<Producers>...>;
	return helper_type(std::forward<Producers>(producers)...);
}
- template <
- typename ...Values,
- typename ...Errors,
- typename ...Generators>
- class combine_implementation_helper<producer<Values, Errors, Generators>...> {
- public:
- using CombinedValue = std::tuple<Values...>;
- using CombinedError = v::normalized_variant_t<Errors...>;
- combine_implementation_helper(
- producer<Values, Errors, Generators> &&...producers)
- : _saved(std::make_tuple(std::move(producers)...)) {
- }
- template <typename Handlers>
- lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
- auto state = consumer.template make_state<
- combine_state<Values...>>();
- constexpr auto kArity = sizeof...(Values);
- combine_subscribe(
- consumer,
- state,
- std::make_index_sequence<kArity>(),
- std::move(_saved));
- return lifetime();
- }
- private:
- std::tuple<producer<Values, Errors, Generators>...> _saved;
- };
- template <
- typename ...Values,
- typename ...Errors,
- typename ...Generators>
- inline auto combine_implementation(
- producer<Values, Errors, Generators> &&...producers) {
- using CombinedValue = std::tuple<Values...>;
- using CombinedError = v::normalized_variant_t<Errors...>;
- return make_producer<CombinedValue, CombinedError>(
- make_combine_implementation_helper(std::move(producers)...));
- }
- template <typename ...Args>
- struct combine_just_producers : std::false_type {
- };
- template <typename ...Args>
- constexpr bool combine_just_producers_v
- = combine_just_producers<Args...>::value;
- template <
- typename ...Values,
- typename ...Errors,
- typename ...Generators>
- struct combine_just_producers<
- producer<Values, Errors, Generators>...>
- : std::true_type {
- };
- template <typename ArgsList>
- struct combine_just_producers_list
- : type_list::extract_to_t<ArgsList, combine_just_producers> {
- };
- template <typename ...Args>
- struct combine_result_type;
- template <typename ...Args>
- using combine_result_type_t
- = typename combine_result_type<Args...>::type;
- template <
- typename ...Values,
- typename ...Errors,
- typename ...Generators>
- struct combine_result_type<producer<Values, Errors, Generators>...> {
- using type = std::tuple<Values...>;
- };
- template <typename ArgsList>
- struct combine_result_type_list
- : type_list::extract_to_t<ArgsList, combine_result_type> {
- };
- template <typename ArgsList>
- using combine_result_type_list_t
- = typename combine_result_type_list<ArgsList>::type;
- template <typename ArgsList>
- using combine_producers_no_mapper_t
- = type_list::chop_last_t<ArgsList>;
- template <typename ArgsList>
- constexpr bool combine_is_good_mapper(std::true_type) {
- return is_callable_v<
- type_list::last_t<ArgsList>,
- combine_result_type_list_t<
- combine_producers_no_mapper_t<ArgsList>
- >>;
- }
- template <typename ArgsList>
- constexpr bool combine_is_good_mapper(std::false_type) {
- return false;
- }
- template <typename ArgsList>
- struct combine_producers_with_mapper_list : std::bool_constant<
- combine_is_good_mapper<ArgsList>(
- combine_just_producers_list<
- combine_producers_no_mapper_t<ArgsList>
- >())> {
- };
- template <typename ...Args>
- struct combine_producers_with_mapper
- : combine_producers_with_mapper_list<type_list::list<Args...>> {
- };
- template <typename ...Args>
- constexpr bool combine_producers_with_mapper_v
- = combine_producers_with_mapper<Args...>::value;
- template <typename ...Producers, std::size_t ...I>
- inline decltype(auto) combine_call(
- std::index_sequence<I...>,
- Producers &&...producers) {
- return combine_implementation(
- argument_mapper<I>::call(std::move(producers)...)...);
- }
- } // namespace details
- template <
- typename ...Args,
- typename = std::enable_if_t<
- details::combine_just_producers_v<Args...>
- || details::combine_producers_with_mapper_v<Args...>>>
- inline decltype(auto) combine(Args &&...args) {
- if constexpr (details::combine_just_producers_v<Args...>) {
- return details::combine_implementation(std::move(args)...);
- } else if constexpr (details::combine_producers_with_mapper_v<Args...>) {
- constexpr auto kProducersCount = sizeof...(Args) - 1;
- return details::combine_call(
- std::make_index_sequence<kProducersCount>(),
- std::forward<Args>(args)...)
- | map(details::argument_mapper<kProducersCount>::call(
- std::forward<Args>(args)...));
- } else {
- static_assert(false_(args...), "Bad combine() call.");
- }
- }
- namespace details {
// Bookkeeping for combine() over a std::vector of producers.
// Everything starts empty / zero; the vector overload of combine() sizes
// `accumulated` and sets both counters when a consumer subscribes.
template <typename Value>
struct combine_vector_state {
	// One optional slot per source while first values are being collected.
	std::vector<std::optional<Value>> accumulated;

	// The values that get emitted, one per source.
	std::vector<Value> latest;

	// Counters maintained by the subscribers.
	int invalid{ 0 };
	int working{ 0 };
};
- } // namespace details
- template <typename Value, typename Error, typename Generator>
- inline auto combine(
- std::vector<producer<Value, Error, Generator>> &&producers) {
- using state_type = details::combine_vector_state<Value>;
- return make_producer<std::vector<Value>, Error>([
- producers = std::move(producers)
- ](const auto &consumer) mutable {
- auto count = producers.size();
- auto state = consumer.template make_state<state_type>();
- state->accumulated.resize(count);
- state->invalid = count;
- state->working = count;
- for (auto index = 0; index != count; ++index) {
- auto &producer = producers[index];
- consumer.add_lifetime(std::move(producer).start(
- [consumer, state, index](Value &&value) {
- if (state->accumulated.empty()) {
- state->latest[index] = std::move(value);
- consumer.put_next_copy(state->latest);
- } else if (state->accumulated[index]) {
- state->accumulated[index] = std::move(value);
- } else {
- state->accumulated[index] = std::move(value);
- if (!--state->invalid) {
- state->latest.reserve(
- state->accumulated.size());
- for (auto &&value : state->accumulated) {
- state->latest.push_back(
- std::move(*value));
- }
- details::take(state->accumulated);
- consumer.put_next_copy(state->latest);
- }
- }
- }, [consumer](auto &&error) {
- consumer.put_error_forward(
- std::forward<decltype(error)>(error));
- }, [consumer, state] {
- if (!--state->working) {
- consumer.put_done();
- }
- }));
- }
- if (!count) {
- consumer.put_done();
- }
- return lifetime();
- });
- }
- template <
- typename Value,
- typename Error,
- typename Generator,
- typename Mapper>
- inline auto combine(
- std::vector<producer<Value, Error, Generator>> &&producers,
- Mapper &&mapper) {
- return combine(std::move(producers))
- | map(std::forward<Mapper>(mapper));
- }
- } // namespace rpl
|