media_streaming_reader.h 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #pragma once
  8. #include "media/streaming/media_streaming_common.h"
  9. #include "media/streaming/media_streaming_loader.h"
  10. #include "base/bytes.h"
  11. #include "base/weak_ptr.h"
  12. #include "base/thread_safe_wrap.h"
  13. namespace Storage {
  14. class StreamedFileDownloader;
  15. } // namespace Storage
  16. namespace Storage {
  17. namespace Cache {
  18. struct Key;
  19. class Database;
  20. } // namespace Cache
  21. } // namespace Storage
  22. namespace Media {
  23. namespace Streaming {
  24. class Loader;
  25. struct LoadedPart;
  26. enum class Error;
  27. class Reader final : public base::has_weak_ptr {
  28. public:
  29. enum class FillState : uchar {
  30. Success,
  31. WaitingCache,
  32. WaitingRemote,
  33. Failed,
  34. };
  35. // Main thread.
  36. explicit Reader(
  37. std::unique_ptr<Loader> loader,
  38. Storage::Cache::Database *cache = nullptr);
  39. void setLoaderPriority(int priority);
  40. // Any thread.
  41. [[nodiscard]] int64 size() const;
  42. [[nodiscard]] bool isRemoteLoader() const;
  43. // Single thread.
  44. [[nodiscard]] FillState fill(
  45. int64 offset,
  46. bytes::span buffer,
  47. not_null<crl::semaphore*> notify);
  48. [[nodiscard]] std::optional<Error> streamingError() const;
  49. void headerDone();
  50. [[nodiscard]] int headerSize() const;
  51. [[nodiscard]] bool fullInCache() const;
  52. // Thread safe.
  53. void startSleep(not_null<crl::semaphore*> wake);
  54. void wakeFromSleep();
  55. void stopSleep();
  56. void stopStreamingAsync();
  57. void tryRemoveLoaderAsync();
  58. // Main thread.
  59. void startStreaming();
  60. void stopStreaming(bool stillActive = false);
  61. [[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
  62. void loadForDownloader(
  63. not_null<Storage::StreamedFileDownloader*> downloader,
  64. int64 offset);
  65. void doneForDownloader(int64 offset);
  66. void cancelForDownloader(
  67. not_null<Storage::StreamedFileDownloader*> downloader);
  68. void continueDownloaderFromMainThread();
  69. [[nodiscard]] rpl::producer<SpeedEstimate> speedEstimate() const;
  70. ~Reader();
  71. private:
  72. static constexpr auto kLoadFromRemoteMax = 8;
  73. struct CacheHelper;
  74. // FileSize: Right now any file size fits 32 bit.
  75. using PartsMap = base::flat_map<uint32, QByteArray>;
  76. template <int Size>
  77. class StackIntVector {
  78. public:
  79. bool add(uint32 value);
  80. auto values() const;
  81. private:
  82. std::array<uint32, Size> _storage = { uint32(-1) };
  83. };
  84. struct SerializedSlice {
  85. int number = -1;
  86. QByteArray data;
  87. };
  88. struct FillResult {
  89. static constexpr auto kReadFromCacheMax = 2;
  90. StackIntVector<kReadFromCacheMax> sliceNumbersFromCache;
  91. StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
  92. SerializedSlice toCache;
  93. FillState state = FillState::WaitingRemote;
  94. };
  95. struct Slice {
  96. enum class Flag : uchar {
  97. LoadingFromCache = 0x01,
  98. LoadedFromCache = 0x02,
  99. ChangedSinceCache = 0x04,
  100. FullInCache = 0x08,
  101. };
  102. friend constexpr inline bool is_flag_type(Flag) { return true; }
  103. using Flags = base::flags<Flag>;
  104. struct PrepareFillResult {
  105. StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
  106. PartsMap::const_iterator start;
  107. PartsMap::const_iterator finish;
  108. bool ready = true;
  109. };
  110. void processCacheData(PartsMap &&data);
  111. void addPart(uint32 offset, QByteArray bytes);
  112. PrepareFillResult prepareFill(uint32 from, uint32 till);
  113. // Get up to kLoadFromRemoteMax not loaded parts in from-till range.
  114. StackIntVector<kLoadFromRemoteMax> offsetsFromLoader(
  115. uint32 from,
  116. uint32 till) const;
  117. PartsMap parts;
  118. Flags flags;
  119. };
  120. class Slices {
  121. public:
  122. Slices(uint32 size, bool useCache);
  123. void headerDone(bool fromCache);
  124. [[nodiscard]] int headerSize() const;
  125. [[nodiscard]] bool fullInCache() const;
  126. [[nodiscard]] bool headerWontBeFilled() const;
  127. [[nodiscard]] bool headerModeUnknown() const;
  128. [[nodiscard]] bool isFullInHeader() const;
  129. [[nodiscard]] bool isGoodHeader() const;
  130. [[nodiscard]] bool waitingForHeaderCache() const;
  131. [[nodiscard]] int requestSliceSizesCount() const;
  132. void processCacheResult(int sliceNumber, PartsMap &&result);
  133. void processCachedSizes(const std::vector<int> &sizes);
  134. void processPart(uint32 offset, QByteArray &&bytes);
  135. [[nodiscard]] FillResult fill(uint32 offset, bytes::span buffer);
  136. [[nodiscard]] SerializedSlice unloadToCache();
  137. [[nodiscard]] QByteArray partForDownloader(uint32 offset) const;
  138. [[nodiscard]] bool readCacheForDownloaderRequired(uint32 offset);
  139. private:
  140. enum class HeaderMode {
  141. Unknown,
  142. Small,
  143. Good,
  144. Full,
  145. NoCache,
  146. };
  147. void applyHeaderCacheData();
  148. [[nodiscard]] int maxSliceSize(int sliceNumber) const;
  149. [[nodiscard]] SerializedSlice serializeAndUnloadSlice(
  150. int sliceNumber);
  151. [[nodiscard]] SerializedSlice serializeAndUnloadUnused();
  152. [[nodiscard]] QByteArray serializeComplexSlice(
  153. const Slice &slice) const;
  154. [[nodiscard]] QByteArray serializeAndUnloadFirstSliceNoHeader();
  155. void markSliceUsed(int sliceIndex);
  156. [[nodiscard]] bool computeIsGoodHeader() const;
  157. [[nodiscard]] FillResult fillFromHeader(
  158. uint32 offset,
  159. bytes::span buffer);
  160. void unloadSlice(Slice &slice) const;
  161. void checkSliceFullLoaded(int sliceNumber);
  162. [[nodiscard]] bool checkFullInCache() const;
  163. std::vector<Slice> _data;
  164. Slice _header;
  165. std::deque<int> _usedSlices;
  166. uint32 _size = 0;
  167. HeaderMode _headerMode = HeaderMode::Unknown;
  168. bool _fullInCache = false;
  169. };
  170. // 0 is for headerData, slice index = sliceNumber - 1.
  171. // returns false if asked for a known-empty downloader slice cache.
  172. void readFromCache(int sliceNumber);
  173. [[nodiscard]] bool readFromCacheForDownloader(int sliceNumber);
  174. bool processCacheResults();
  175. void putToCache(SerializedSlice &&data);
  176. void cancelLoadInRange(uint32 from, uint32 till);
  177. void loadAtOffset(uint32 offset);
  178. void checkLoadWillBeFirst(uint32 offset);
  179. bool processLoadedParts();
  180. bool checkForSomethingMoreReceived();
  181. FillState fillFromSlices(uint32 offset, bytes::span buffer);
  182. void finalizeCache();
  183. void processDownloaderRequests();
  184. void checkCacheResultsForDownloader();
  185. void pruneDownloaderCache(uint32 minimalOffset);
  186. void pruneDoneDownloaderRequests();
  187. void sendDownloaderRequests();
  188. [[nodiscard]] bool downloaderWaitForCachedSlice(uint32 offset);
  189. void enqueueDownloaderOffsets();
  190. void checkForDownloaderChange(int checkItemsCount);
  191. void checkForDownloaderReadyOffsets();
  192. void refreshLoaderPriority();
  193. static std::shared_ptr<CacheHelper> InitCacheHelper(
  194. Storage::Cache::Key baseKey);
  195. const std::unique_ptr<Loader> _loader;
  196. Storage::Cache::Database * const _cache = nullptr;
  197. // shared_ptr is used to be able to have weak_ptr.
  198. const std::shared_ptr<CacheHelper> _cacheHelper;
  199. base::thread_safe_queue<LoadedPart, std::vector> _loadedParts;
  200. std::atomic<crl::semaphore*> _waiting = nullptr;
  201. std::atomic<crl::semaphore*> _sleeping = nullptr;
  202. std::atomic<bool> _stopStreamingAsync = false;
  203. PriorityQueue _loadingOffsets;
  204. Slices _slices;
  205. // Even if streaming had failed, the Reader can work for the downloader.
  206. std::optional<Error> _streamingError;
  207. // In case streaming is active both main and streaming threads have work.
  208. // In case only downloader is active, all work is done on main thread.
  209. // Main thread.
  210. Storage::StreamedFileDownloader *_attachedDownloader = nullptr;
  211. rpl::event_stream<LoadedPart> _partsForDownloader;
  212. int _realPriority = 1;
  213. bool _streamingActive = false;
  214. // Streaming thread.
  215. std::deque<uint32> _offsetsForDownloader;
  216. base::flat_set<uint32> _downloaderOffsetsRequested;
  217. base::flat_map<uint32, std::optional<PartsMap>> _downloaderReadCache;
  218. // Communication from main thread to streaming thread.
  219. // Streaming thread to main thread communicates using crl::on_main.
  220. base::thread_safe_queue<uint32> _downloaderOffsetRequests;
  221. base::thread_safe_queue<uint32> _downloaderOffsetAcks;
  222. rpl::lifetime _lifetime;
  223. };
  224. [[nodiscard]] QByteArray SerializeComplexPartsMap(
  225. const base::flat_map<uint32, QByteArray> &parts);
  226. } // namespace Streaming
  227. } // namespace Media