mtproto_concurrent_sender.cpp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "mtproto/mtproto_concurrent_sender.h"
  8. #include "mtproto/mtp_instance.h"
  9. #include "mtproto/mtproto_response.h"
  10. #include "mtproto/facade.h"
  11. namespace MTP {
  12. class ConcurrentSender::HandlerMaker final {
  13. public:
  14. static DoneHandler MakeDone(
  15. not_null<ConcurrentSender*> sender,
  16. Fn<void(FnMut<void()>)> runner);
  17. static FailHandler MakeFail(
  18. not_null<ConcurrentSender*> sender,
  19. Fn<void(FnMut<void()>)> runner,
  20. FailSkipPolicy skipPolicy);
  21. };
  22. DoneHandler ConcurrentSender::HandlerMaker::MakeDone(
  23. not_null<ConcurrentSender*> sender,
  24. Fn<void(FnMut<void()>)> runner) {
  25. return [
  26. weak = base::make_weak(sender),
  27. runner = std::move(runner)
  28. ](const Response &response) mutable {
  29. runner([=]() mutable {
  30. if (const auto strong = weak.get()) {
  31. strong->senderRequestDone(
  32. response.requestId,
  33. bytes::make_span(response.reply));
  34. }
  35. });
  36. return true;
  37. };
  38. }
  39. FailHandler ConcurrentSender::HandlerMaker::MakeFail(
  40. not_null<ConcurrentSender*> sender,
  41. Fn<void(FnMut<void()>)> runner,
  42. FailSkipPolicy skipPolicy) {
  43. return [
  44. weak = base::make_weak(sender),
  45. runner = std::move(runner),
  46. skipPolicy
  47. ](const Error &error, const Response &response) mutable {
  48. if (skipPolicy == FailSkipPolicy::Simple) {
  49. if (IsDefaultHandledError(error)) {
  50. return false;
  51. }
  52. } else if (skipPolicy == FailSkipPolicy::HandleFlood) {
  53. if (IsDefaultHandledError(error) && !IsFloodError(error)) {
  54. return false;
  55. }
  56. }
  57. runner([=, requestId = response.requestId]() mutable {
  58. if (const auto strong = weak.get()) {
  59. strong->senderRequestFail(requestId, error);
  60. }
  61. });
  62. return true;
  63. };
  64. }
  65. template <typename Method>
  66. auto ConcurrentSender::with_instance(Method &&method)
  67. -> std::enable_if_t<is_callable_v<Method, not_null<Instance*>>> {
  68. crl::on_main([
  69. weak = _weak,
  70. method = std::forward<Method>(method)
  71. ]() mutable {
  72. if (const auto instance = weak.data()) {
  73. std::move(method)(instance);
  74. }
  75. });
  76. }
  77. ConcurrentSender::RequestBuilder::RequestBuilder(
  78. not_null<ConcurrentSender*> sender,
  79. details::SerializedRequest &&serialized) noexcept
  80. : _sender(sender)
  81. , _serialized(std::move(serialized)) {
  82. }
  83. void ConcurrentSender::RequestBuilder::setToDC(ShiftedDcId dcId) noexcept {
  84. _dcId = dcId;
  85. }
  86. void ConcurrentSender::RequestBuilder::setCanWait(crl::time ms) noexcept {
  87. _canWait = ms;
  88. }
  89. void ConcurrentSender::RequestBuilder::setFailSkipPolicy(
  90. FailSkipPolicy policy) noexcept {
  91. _failSkipPolicy = policy;
  92. }
  93. void ConcurrentSender::RequestBuilder::setAfter(
  94. mtpRequestId requestId) noexcept {
  95. _afterRequestId = requestId;
  96. }
  97. mtpRequestId ConcurrentSender::RequestBuilder::send() {
  98. const auto requestId = details::GetNextRequestId();
  99. const auto dcId = _dcId;
  100. const auto msCanWait = _canWait;
  101. const auto afterRequestId = _afterRequestId;
  102. _sender->senderRequestRegister(requestId, std::move(_handlers));
  103. _sender->with_instance([
  104. =,
  105. request = std::move(_serialized),
  106. done = HandlerMaker::MakeDone(_sender, _sender->_runner),
  107. fail = HandlerMaker::MakeFail(
  108. _sender,
  109. _sender->_runner,
  110. _failSkipPolicy)
  111. ](not_null<Instance*> instance) mutable {
  112. instance->sendSerialized(
  113. requestId,
  114. std::move(request),
  115. ResponseHandler{ std::move(done), std::move(fail) },
  116. dcId,
  117. msCanWait,
  118. afterRequestId);
  119. });
  120. return requestId;
  121. }
  122. ConcurrentSender::ConcurrentSender(
  123. QPointer<Instance> weak,
  124. Fn<void(FnMut<void()>)> runner)
  125. : _weak(weak)
  126. , _runner(runner) {
  127. }
  128. ConcurrentSender::~ConcurrentSender() {
  129. senderRequestCancelAll();
  130. }
  131. void ConcurrentSender::senderRequestRegister(
  132. mtpRequestId requestId,
  133. Handlers &&handlers) {
  134. _requests.emplace(requestId, std::move(handlers));
  135. }
  136. void ConcurrentSender::senderRequestDone(
  137. mtpRequestId requestId,
  138. bytes::const_span result) {
  139. if (auto handlers = _requests.take(requestId)) {
  140. if (!handlers->done(requestId, result)) {
  141. handlers->fail(
  142. requestId,
  143. Error::Local(
  144. "RESPONSE_PARSE_FAILED",
  145. "ConcurrentSender::senderRequestDone"));
  146. }
  147. }
  148. }
  149. void ConcurrentSender::senderRequestFail(
  150. mtpRequestId requestId,
  151. const Error &error) {
  152. if (auto handlers = _requests.take(requestId)) {
  153. handlers->fail(requestId, error);
  154. }
  155. }
  156. void ConcurrentSender::senderRequestCancel(mtpRequestId requestId) {
  157. senderRequestDetach(requestId);
  158. with_instance([=](not_null<Instance*> instance) {
  159. instance->cancel(requestId);
  160. });
  161. }
  162. void ConcurrentSender::senderRequestCancelAll() {
  163. auto list = std::vector<mtpRequestId>(_requests.size());
  164. for (const auto &pair : base::take(_requests)) {
  165. list.push_back(pair.first);
  166. }
  167. with_instance([list = std::move(list)](not_null<Instance*> instance) {
  168. for (const auto requestId : list) {
  169. instance->cancel(requestId);
  170. }
  171. });
  172. }
  173. void ConcurrentSender::senderRequestDetach(mtpRequestId requestId) {
  174. _requests.erase(requestId);
  175. }
  176. } // namespace MTP