| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- /*
- This file is part of Telegram Desktop,
- the official desktop application for the Telegram messaging service.
- For license and copyright information please follow this link:
- https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
- */
- #include "mtproto/mtproto_concurrent_sender.h"
- #include "mtproto/mtp_instance.h"
- #include "mtproto/mtproto_response.h"
- #include "mtproto/facade.h"
- namespace MTP {
- class ConcurrentSender::HandlerMaker final {
- public:
- static DoneHandler MakeDone(
- not_null<ConcurrentSender*> sender,
- Fn<void(FnMut<void()>)> runner);
- static FailHandler MakeFail(
- not_null<ConcurrentSender*> sender,
- Fn<void(FnMut<void()>)> runner,
- FailSkipPolicy skipPolicy);
- };
- DoneHandler ConcurrentSender::HandlerMaker::MakeDone(
- not_null<ConcurrentSender*> sender,
- Fn<void(FnMut<void()>)> runner) {
- return [
- weak = base::make_weak(sender),
- runner = std::move(runner)
- ](const Response &response) mutable {
- runner([=]() mutable {
- if (const auto strong = weak.get()) {
- strong->senderRequestDone(
- response.requestId,
- bytes::make_span(response.reply));
- }
- });
- return true;
- };
- }
- FailHandler ConcurrentSender::HandlerMaker::MakeFail(
- not_null<ConcurrentSender*> sender,
- Fn<void(FnMut<void()>)> runner,
- FailSkipPolicy skipPolicy) {
- return [
- weak = base::make_weak(sender),
- runner = std::move(runner),
- skipPolicy
- ](const Error &error, const Response &response) mutable {
- if (skipPolicy == FailSkipPolicy::Simple) {
- if (IsDefaultHandledError(error)) {
- return false;
- }
- } else if (skipPolicy == FailSkipPolicy::HandleFlood) {
- if (IsDefaultHandledError(error) && !IsFloodError(error)) {
- return false;
- }
- }
- runner([=, requestId = response.requestId]() mutable {
- if (const auto strong = weak.get()) {
- strong->senderRequestFail(requestId, error);
- }
- });
- return true;
- };
- }
- template <typename Method>
- auto ConcurrentSender::with_instance(Method &&method)
- -> std::enable_if_t<is_callable_v<Method, not_null<Instance*>>> {
- crl::on_main([
- weak = _weak,
- method = std::forward<Method>(method)
- ]() mutable {
- if (const auto instance = weak.data()) {
- std::move(method)(instance);
- }
- });
- }
- ConcurrentSender::RequestBuilder::RequestBuilder(
- not_null<ConcurrentSender*> sender,
- details::SerializedRequest &&serialized) noexcept
- : _sender(sender)
- , _serialized(std::move(serialized)) {
- }
- void ConcurrentSender::RequestBuilder::setToDC(ShiftedDcId dcId) noexcept {
- _dcId = dcId;
- }
- void ConcurrentSender::RequestBuilder::setCanWait(crl::time ms) noexcept {
- _canWait = ms;
- }
- void ConcurrentSender::RequestBuilder::setFailSkipPolicy(
- FailSkipPolicy policy) noexcept {
- _failSkipPolicy = policy;
- }
- void ConcurrentSender::RequestBuilder::setAfter(
- mtpRequestId requestId) noexcept {
- _afterRequestId = requestId;
- }
- mtpRequestId ConcurrentSender::RequestBuilder::send() {
- const auto requestId = details::GetNextRequestId();
- const auto dcId = _dcId;
- const auto msCanWait = _canWait;
- const auto afterRequestId = _afterRequestId;
- _sender->senderRequestRegister(requestId, std::move(_handlers));
- _sender->with_instance([
- =,
- request = std::move(_serialized),
- done = HandlerMaker::MakeDone(_sender, _sender->_runner),
- fail = HandlerMaker::MakeFail(
- _sender,
- _sender->_runner,
- _failSkipPolicy)
- ](not_null<Instance*> instance) mutable {
- instance->sendSerialized(
- requestId,
- std::move(request),
- ResponseHandler{ std::move(done), std::move(fail) },
- dcId,
- msCanWait,
- afterRequestId);
- });
- return requestId;
- }
- ConcurrentSender::ConcurrentSender(
- QPointer<Instance> weak,
- Fn<void(FnMut<void()>)> runner)
- : _weak(weak)
- , _runner(runner) {
- }
- ConcurrentSender::~ConcurrentSender() {
- senderRequestCancelAll();
- }
- void ConcurrentSender::senderRequestRegister(
- mtpRequestId requestId,
- Handlers &&handlers) {
- _requests.emplace(requestId, std::move(handlers));
- }
- void ConcurrentSender::senderRequestDone(
- mtpRequestId requestId,
- bytes::const_span result) {
- if (auto handlers = _requests.take(requestId)) {
- if (!handlers->done(requestId, result)) {
- handlers->fail(
- requestId,
- Error::Local(
- "RESPONSE_PARSE_FAILED",
- "ConcurrentSender::senderRequestDone"));
- }
- }
- }
- void ConcurrentSender::senderRequestFail(
- mtpRequestId requestId,
- const Error &error) {
- if (auto handlers = _requests.take(requestId)) {
- handlers->fail(requestId, error);
- }
- }
- void ConcurrentSender::senderRequestCancel(mtpRequestId requestId) {
- senderRequestDetach(requestId);
- with_instance([=](not_null<Instance*> instance) {
- instance->cancel(requestId);
- });
- }
- void ConcurrentSender::senderRequestCancelAll() {
- auto list = std::vector<mtpRequestId>(_requests.size());
- for (const auto &pair : base::take(_requests)) {
- list.push_back(pair.first);
- }
- with_instance([list = std::move(list)](not_null<Instance*> instance) {
- for (const auto requestId : list) {
- instance->cancel(requestId);
- }
- });
- }
- void ConcurrentSender::senderRequestDetach(mtpRequestId requestId) {
- _requests.erase(requestId);
- }
- } // namespace MTP
|