download_manager_mtproto.h 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #pragma once
  8. #include "data/data_file_origin.h"
  9. #include "base/timer.h"
  10. #include "base/weak_ptr.h"
  11. class ApiWrap;
  12. namespace MTP {
  13. class Error;
  14. } // namespace MTP
  15. namespace Storage {
  16. // Different part sizes are not supported for now :(
  17. // Because we start downloading with some part size
  18. // and then we get a CDN-redirect where we support only
  19. // fixed part size download for hash checking.
  20. constexpr auto kDownloadPartSize = 128 * 1024;
  21. class DownloadMtprotoTask;
  22. class DownloadManagerMtproto final : public base::has_weak_ptr {
  23. public:
  24. using Task = DownloadMtprotoTask;
  25. explicit DownloadManagerMtproto(not_null<ApiWrap*> api);
  26. ~DownloadManagerMtproto();
  27. [[nodiscard]] ApiWrap &api() const {
  28. return *_api;
  29. }
  30. void enqueue(not_null<Task*> task, int priority);
  31. void remove(not_null<Task*> task);
  32. void notifyTaskFinished() {
  33. _taskFinished.fire({});
  34. }
  35. [[nodiscard]] rpl::producer<> taskFinished() const {
  36. return _taskFinished.events();
  37. }
  38. int changeRequestedAmount(MTP::DcId dcId, int index, int delta);
  39. void requestSucceeded(
  40. MTP::DcId dcId,
  41. int index,
  42. int amountAtRequestStart,
  43. crl::time timeAtRequestStart);
  44. void checkSendNextAfterSuccess(MTP::DcId dcId);
  45. [[nodiscard]] int chooseSessionIndex(MTP::DcId dcId) const;
  46. void notifyNonPremiumDelay(DocumentId id) {
  47. _nonPremiumDelays.fire_copy(id);
  48. }
  49. [[nodiscard]] rpl::producer<DocumentId> nonPremiumDelays() const {
  50. return _nonPremiumDelays.events();
  51. }
  52. private:
  53. class Queue final {
  54. public:
  55. void enqueue(not_null<Task*> task, int priority);
  56. void remove(not_null<Task*> task);
  57. void resetGeneration();
  58. [[nodiscard]] bool empty() const;
  59. [[nodiscard]] Task *nextTask(bool onlyHighestPriority) const;
  60. void removeSession(int index);
  61. private:
  62. struct Enqueued {
  63. not_null<Task*> task;
  64. int priority = 0;
  65. };
  66. std::vector<Enqueued> _tasks;
  67. };
  68. struct DcSessionBalanceData {
  69. DcSessionBalanceData();
  70. int requested = 0;
  71. int successes = 0; // Since last timeout in this dc in any session.
  72. int maxWaitedAmount = 0;
  73. };
  74. struct DcBalanceData {
  75. DcBalanceData();
  76. std::vector<DcSessionBalanceData> sessions;
  77. crl::time lastSessionRemove = 0;
  78. int sessionRemoveIndex = 0;
  79. int sessionRemoveTimes = 0;
  80. int timeouts = 0; // Since all sessions had successes >= required.
  81. int totalRequested = 0;
  82. };
  83. void checkSendNext();
  84. void checkSendNext(MTP::DcId dcId, Queue &queue);
  85. bool trySendNextPart(MTP::DcId dcId, Queue &queue);
  86. void killSessionsSchedule(MTP::DcId dcId);
  87. void killSessionsCancel(MTP::DcId dcId);
  88. void killSessions();
  89. void killSessions(MTP::DcId dcId);
  90. void resetGeneration();
  91. void sessionTimedOut(MTP::DcId dcId, int index);
  92. void removeSession(MTP::DcId dcId);
  93. const not_null<ApiWrap*> _api;
  94. rpl::event_stream<> _taskFinished;
  95. rpl::event_stream<DocumentId> _nonPremiumDelays;
  96. base::flat_map<MTP::DcId, DcBalanceData> _balanceData;
  97. base::Timer _resetGenerationTimer;
  98. base::flat_map<MTP::DcId, crl::time> _killSessionsWhen;
  99. base::Timer _killSessionsTimer;
  100. base::flat_map<MTP::DcId, Queue> _queues;
  101. rpl::lifetime _lifetime;
  102. };
  103. class DownloadMtprotoTask : public base::has_weak_ptr {
  104. public:
  105. struct Location {
  106. std::variant<
  107. StorageFileLocation,
  108. WebFileLocation,
  109. GeoPointLocation,
  110. AudioAlbumThumbLocation> data;
  111. };
  112. DownloadMtprotoTask(
  113. not_null<DownloadManagerMtproto*> owner,
  114. const StorageFileLocation &location,
  115. Data::FileOrigin origin);
  116. DownloadMtprotoTask(
  117. not_null<DownloadManagerMtproto*> owner,
  118. MTP::DcId dcId,
  119. const Location &location);
  120. virtual ~DownloadMtprotoTask();
  121. [[nodiscard]] MTP::DcId dcId() const;
  122. [[nodiscard]] Data::FileOrigin fileOrigin() const;
  123. [[nodiscard]] uint64 objectId() const;
  124. [[nodiscard]] const Location &location() const;
  125. [[nodiscard]] virtual bool readyToRequest() const = 0;
  126. void loadPart(int sessionIndex);
  127. void removeSession(int sessionIndex);
  128. void refreshFileReferenceFrom(
  129. const Data::UpdatedFileReferences &updates,
  130. int requestId,
  131. const QByteArray &current);
  132. protected:
  133. [[nodiscard]] bool haveSentRequests() const;
  134. [[nodiscard]] bool haveSentRequestForOffset(int64 offset) const;
  135. void cancelAllRequests();
  136. void cancelRequestForOffset(int64 offset);
  137. void addToQueue(int priority = 0);
  138. void removeFromQueue();
  139. [[nodiscard]] ApiWrap &api() const {
  140. return _owner->api();
  141. }
  142. private:
  143. struct RequestData {
  144. int64 offset = 0;
  145. mutable int sessionIndex = 0;
  146. int requestedInSession = 0;
  147. crl::time sent = 0;
  148. inline bool operator<(const RequestData &other) const {
  149. return offset < other.offset;
  150. }
  151. };
  152. struct CdnFileHash {
  153. CdnFileHash(int limit, QByteArray hash) : limit(limit), hash(hash) {
  154. }
  155. int limit = 0;
  156. QByteArray hash;
  157. };
  158. enum class CheckCdnHashResult {
  159. NoHash,
  160. Invalid,
  161. Good,
  162. };
  163. enum class FinishRequestReason {
  164. Success,
  165. Redirect,
  166. Cancel,
  167. };
  168. // Called only if readyToRequest() == true.
  169. [[nodiscard]] virtual int64 takeNextRequestOffset() = 0;
  170. virtual bool feedPart(int64 offset, const QByteArray &bytes) = 0;
  171. virtual bool setWebFileSizeHook(int64 size);
  172. virtual void cancelOnFail() = 0;
  173. void cancelRequest(mtpRequestId requestId);
  174. void makeRequest(const RequestData &requestData);
  175. void normalPartLoaded(
  176. const MTPupload_File &result,
  177. mtpRequestId requestId);
  178. void webPartLoaded(
  179. const MTPupload_WebFile &result,
  180. mtpRequestId requestId);
  181. void cdnPartLoaded(
  182. const MTPupload_CdnFile &result,
  183. mtpRequestId requestId);
  184. void reuploadDone(
  185. const MTPVector<MTPFileHash> &result,
  186. mtpRequestId requestId);
  187. void requestMoreCdnFileHashes();
  188. void getCdnFileHashesDone(
  189. const MTPVector<MTPFileHash> &result,
  190. mtpRequestId requestId);
  191. void partLoaded(int64 offset, const QByteArray &bytes);
  192. bool partFailed(const MTP::Error &error, mtpRequestId requestId);
  193. bool normalPartFailed(
  194. QByteArray fileReference,
  195. const MTP::Error &error,
  196. mtpRequestId requestId);
  197. bool cdnPartFailed(const MTP::Error &error, mtpRequestId requestId);
  198. [[nodiscard]] mtpRequestId sendRequest(const RequestData &requestData);
  199. void placeSentRequest(
  200. mtpRequestId requestId,
  201. const RequestData &requestData);
  202. [[nodiscard]] RequestData finishSentRequest(
  203. mtpRequestId requestId,
  204. FinishRequestReason reason);
  205. void switchToCDN(
  206. const RequestData &requestData,
  207. const MTPDupload_fileCdnRedirect &redirect);
  208. void addCdnHashes(const QVector<MTPFileHash> &hashes);
  209. void changeCDNParams(
  210. const RequestData &requestData,
  211. MTP::DcId dcId,
  212. const QByteArray &token,
  213. const QByteArray &encryptionKey,
  214. const QByteArray &encryptionIV,
  215. const QVector<MTPFileHash> &hashes);
  216. [[nodiscard]] CheckCdnHashResult checkCdnFileHash(
  217. int64 offset,
  218. bytes::const_span buffer);
  219. void subscribeToNonPremiumLimit();
  220. const not_null<DownloadManagerMtproto*> _owner;
  221. const MTP::DcId _dcId = 0;
  222. // _location can be changed with an updated file_reference.
  223. Location _location;
  224. const Data::FileOrigin _origin;
  225. base::flat_map<mtpRequestId, RequestData> _sentRequests;
  226. base::flat_map<int64, mtpRequestId> _requestByOffset;
  227. MTP::DcId _cdnDcId = 0;
  228. QByteArray _cdnToken;
  229. QByteArray _cdnEncryptionKey;
  230. QByteArray _cdnEncryptionIV;
  231. base::flat_map<int64, CdnFileHash> _cdnFileHashes;
  232. base::flat_map<RequestData, QByteArray> _cdnUncheckedParts;
  233. mtpRequestId _cdnHashesRequestId = 0;
  234. rpl::lifetime _nonPremiumLimitSubscription;
  235. };
  236. } // namespace Storage