/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.

For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "media/streaming/media_streaming_reader.h"

#include "media/streaming/media_streaming_common.h"
#include "media/streaming/media_streaming_loader.h"
#include "storage/cache/storage_cache_database.h"

namespace Media {
namespace Streaming {
namespace {

constexpr auto kPartSize = Loader::kPartSize;
constexpr auto kPartsInSlice = 64;
constexpr auto kInSlice = uint32(kPartsInSlice * kPartSize);
constexpr auto kMaxPartsInHeader = 64;
constexpr auto kMaxOnlyInHeader = 80 * kPartSize;
constexpr auto kPartsOutsideFirstSliceGood = 8;
constexpr auto kSlicesInMemory = 2;

// 1 MB of parts are requested from cloud ahead of reading demand.
constexpr auto kPreloadPartsAhead = 8;

constexpr auto kDownloaderRequestsLimit = 4;
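
// Assuming Loader::kPartSize is 128 KB (consistent with the "1 MB" /
// 8-parts preload note above), each slice then covers
// kPartsInSlice * kPartSize == 64 * 128 KB == 8 MB, and a file can be
// stored entirely in the header slice only up to 80 parts, i.e. 10 MB.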

using PartsMap = base::flat_map<uint32, QByteArray>;

struct ParsedCacheEntry {
	PartsMap parts;
	std::optional<PartsMap> included;
};

bool IsContiguousSerialization(int serializedSize, int maxSliceSize) {
	return !(serializedSize % kPartSize) || (serializedSize == maxSliceSize);
}

bool IsFullInHeader(int64 size) {
	return (size <= kMaxOnlyInHeader);
}

bool ComputeIsGoodHeader(int64 size, const PartsMap &header) {
	if (IsFullInHeader(size)) {
		return false;
	}
	const auto outsideFirstSliceIt = ranges::lower_bound(
		header,
		kInSlice,
		ranges::less(),
		&PartsMap::value_type::first);
	const auto outsideFirstSlice = end(header) - outsideFirstSliceIt;
	return (outsideFirstSlice <= kPartsOutsideFirstSliceGood);
}

int SlicesCount(uint32 size) {
	const auto result = (size + kInSlice - 1) / kInSlice;

	Ensures(result < 0x1FFU);

	return result;
}

int MaxSliceSize(int sliceNumber, uint32 size) {
	return !sliceNumber
		? size
		: (sliceNumber == SlicesCount(size))
		? (size - (sliceNumber - 1) * kInSlice)
		: kInSlice;
}
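
// Worked example (still assuming 128 KB parts, so 8 MB slices): for a
// 20 MB file SlicesCount() returns 3. MaxSliceSize(0, size) is the whole
// file (sliceNumber 0 means the header), MaxSliceSize(1, size) and
// MaxSliceSize(2, size) are 8 MB each, MaxSliceSize(3, size) is the
// remaining 4 MB.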

bytes::const_span ParseComplexCachedMap(
		PartsMap &result,
		bytes::const_span data,
		int maxSize) {
	const auto takeInt = [&]() -> std::optional<uint32> {
		if (data.size() < sizeof(uint32)) {
			return std::nullopt;
		}
		const auto bytes = data.data();
		const auto result = *reinterpret_cast<const uint32*>(bytes);
		data = data.subspan(sizeof(uint32));
		return result;
	};
	const auto takeBytes = [&](int count) {
		if (count <= 0 || data.size() < count) {
			return bytes::const_span();
		}
		const auto result = data.subspan(0, count);
		data = data.subspan(count);
		return result;
	};
	const auto maybeCount = takeInt();
	if (!maybeCount) {
		return {};
	}
	const auto count = *maybeCount;
	if (!count || count > (kMaxOnlyInHeader / kPartSize)) {
		return data;
	}
	for (auto i = 0; i != count; ++i) {
		const auto offset = takeInt().value_or(0);
		const auto size = takeInt().value_or(0);
		const auto bytes = takeBytes(size);
		if (offset >= maxSize
			|| !size
			|| size > maxSize
			|| offset + size > maxSize
			|| bytes.size() != size) {
			return {};
		}
		result.try_emplace(
			offset,
			reinterpret_cast<const char*>(bytes.data()),
			bytes.size());
	}
	return data;
}
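
// The complex format parsed above is therefore:
//   uint32 count, then count x (uint32 offset, uint32 size, size bytes),
// optionally followed by a second map serialized the same way (used for
// header + first slice, see ParseCacheEntry() below), plus zero padding
// so the total size is never mistaken for a contiguous serialization.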

bytes::const_span ParseCachedMap(
		PartsMap &result,
		bytes::const_span data,
		int maxSize) {
	const auto size = int(data.size());
	if (IsContiguousSerialization(size, maxSize)) {
		if (size > maxSize) {
			return {};
		}
		for (auto offset = int64(); offset < size; offset += kPartSize) {
			const auto part = data.subspan(
				offset,
				std::min(kPartSize, size - offset));
			result.try_emplace(
				uint32(offset),
				reinterpret_cast<const char*>(part.data()),
				part.size());
		}
		return {};
	}
	return ParseComplexCachedMap(result, data, maxSize);
}

ParsedCacheEntry ParseCacheEntry(
		bytes::const_span data,
		int sliceNumber,
		int64 size) {
	auto result = ParsedCacheEntry();
	const auto remaining = ParseCachedMap(
		result.parts,
		data,
		MaxSliceSize(sliceNumber, size));
	if (!sliceNumber && ComputeIsGoodHeader(size, result.parts)) {
		result.included = PartsMap();
		ParseCachedMap(*result.included, remaining, MaxSliceSize(1, size));
	}
	return result;
}

template <typename Range> // Range::value_type is Pair<uint32, QByteArray>
uint32 FindNotLoadedStart(Range &&parts, uint32 offset) {
	auto result = offset;
	for (const auto &part : parts) {
		const auto partStart = part.first;
		const auto partEnd = partStart + part.second.size();
		if (partStart <= result && partEnd >= result) {
			result = partEnd;
		} else {
			break;
		}
	}
	return result;
}
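
// Illustrative example: with parts { 0: 131072 bytes, 131072: 131072
// bytes } and offset 0, FindNotLoadedStart() walks the contiguous run
// and returns 262144, the first byte that is not yet loaded.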

template <typename Range> // Range::value_type is Pair<uint32, QByteArray>
void CopyLoaded(
		bytes::span buffer,
		Range &&parts,
		uint32 offset,
		uint32 till) {
	auto filled = offset;
	for (const auto &part : parts) {
		const auto bytes = bytes::make_span(part.second);
		const auto partStart = part.first;
		const auto partEnd = uint32(partStart + bytes.size());
		const auto copyTill = std::min(partEnd, till);

		Assert(partStart <= filled && filled < copyTill);

		const auto from = filled - partStart;
		const auto copy = copyTill - filled;
		bytes::copy(buffer, bytes.subspan(from, copy));
		buffer = buffer.subspan(copy);
		filled += copy;
	}
}

} // namespace

template <int Size>
bool Reader::StackIntVector<Size>::add(uint32 value) {
	using namespace rpl::mappers;

	const auto i = ranges::find_if(_storage, _1 == uint32(-1));
	if (i == end(_storage)) {
		return false;
	}
	*i = value;
	const auto next = i + 1;
	if (next != end(_storage)) {
		*next = -1;
	}
	return true;
}

template <int Size>
auto Reader::StackIntVector<Size>::values() const {
	using namespace rpl::mappers;

	return ranges::views::all(
		_storage
	) | ranges::views::take_while(_1 != uint32(-1));
}
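
// StackIntVector is a fixed-capacity vector on the stack that uses
// uint32(-1) as an end sentinel: add() fails once Size values are stored
// and values() yields only the prefix before the sentinel.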

struct Reader::CacheHelper {
	explicit CacheHelper(Storage::Cache::Key baseKey);

	Storage::Cache::Key key(int sliceNumber) const;

	const Storage::Cache::Key baseKey;
	QMutex mutex;
	base::flat_map<uint32, PartsMap> results;
	std::vector<int> sizes;
	std::atomic<crl::semaphore*> waiting = nullptr;
};
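
// CacheHelper is shared (through a std::shared_ptr) with callbacks that
// run on other threads: readFromCache() fills `results` and `sizes`
// under `mutex` from a crl::async callback, processCacheResults() drains
// them on the reading side, and `waiting` releases a sleeping reader
// in between.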

Reader::CacheHelper::CacheHelper(Storage::Cache::Key baseKey)
: baseKey(baseKey) {
}

Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const {
	return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber };
}

void Reader::Slice::processCacheData(PartsMap &&data) {
	Expects((flags & Flag::LoadingFromCache) != 0);
	Expects(!(flags & Flag::LoadedFromCache));

	const auto guard = gsl::finally([&] {
		flags |= Flag::LoadedFromCache;
		flags &= ~Flag::LoadingFromCache;
	});
	if (parts.empty()) {
		parts = std::move(data);
	} else {
		for (auto &[offset, bytes] : data) {
			parts.emplace(offset, std::move(bytes));
		}
	}
}

void Reader::Slice::addPart(uint32 offset, QByteArray bytes) {
	Expects(!parts.contains(offset));

	parts.emplace(offset, std::move(bytes));
	if (flags & Flag::LoadedFromCache) {
		flags |= Flag::ChangedSinceCache;
	}
}

auto Reader::Slice::prepareFill(
		uint32 from,
		uint32 till) -> PrepareFillResult {
	auto result = PrepareFillResult();
	result.ready = false;

	const auto fromOffset = (from / kPartSize) * kPartSize;
	const auto tillPart = (till + kPartSize - 1) / kPartSize;
	const auto preloadTillOffset = (tillPart + kPreloadPartsAhead)
		* kPartSize;

	const auto after = ranges::upper_bound(
		parts,
		from,
		ranges::less(),
		&PartsMap::value_type::first);
	if (after == begin(parts)) {
		result.offsetsFromLoader = offsetsFromLoader(
			fromOffset,
			preloadTillOffset);
		return result;
	}

	const auto start = after - 1;
	const auto finish = ranges::lower_bound(
		start,
		end(parts),
		till,
		ranges::less(),
		&PartsMap::value_type::first);
	const auto haveTill = FindNotLoadedStart(
		ranges::make_subrange(start, finish),
		fromOffset);
	if (haveTill < till) {
		result.offsetsFromLoader = offsetsFromLoader(
			haveTill,
			preloadTillOffset);
		return result;
	}
	result.ready = true;
	result.start = start;
	result.finish = finish;
	result.offsetsFromLoader = offsetsFromLoader(
		tillPart * kPartSize,
		preloadTillOffset);
	return result;
}
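
// prepareFill() has three outcomes: no part covers `from` at all (ask
// the loader starting from the rounded-down offset), the contiguous run
// of parts breaks before `till` (ask the loader from the first gap), or
// [from, till) is fully covered (ready == true, and only preload offsets
// past the request are handed to the loader).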

auto Reader::Slice::offsetsFromLoader(uint32 from, uint32 till) const
-> StackIntVector<Reader::kLoadFromRemoteMax> {
	auto result = StackIntVector<kLoadFromRemoteMax>();

	const auto after = ranges::upper_bound(
		parts,
		from,
		ranges::less(),
		&PartsMap::value_type::first);
	auto check = (after == begin(parts)) ? after : (after - 1);
	const auto end = parts.end();
	for (auto offset = from; offset != till; offset += kPartSize) {
		while (check != end && check->first < offset) {
			++check;
		}
		if (check != end && check->first == offset) {
			continue;
		} else if (!result.add(offset)) {
			break;
		}
	}
	return result;
}

Reader::Slices::Slices(uint32 size, bool useCache)
: _size(size) {
	Expects(size > 0);

	if (useCache) {
		_header.flags |= Slice::Flag::LoadingFromCache;
	} else {
		_headerMode = HeaderMode::NoCache;
	}
	if (!isFullInHeader()) {
		_data.resize(SlicesCount(_size));
	}
}

bool Reader::Slices::headerModeUnknown() const {
	return (_headerMode == HeaderMode::Unknown);
}

bool Reader::Slices::isFullInHeader() const {
	return IsFullInHeader(_size);
}

bool Reader::Slices::isGoodHeader() const {
	return (_headerMode == HeaderMode::Good);
}

bool Reader::Slices::computeIsGoodHeader() const {
	return ComputeIsGoodHeader(_size, _header.parts);
}

void Reader::Slices::headerDone(bool fromCache) {
	if (_headerMode != HeaderMode::Unknown) {
		return;
	}
	_headerMode = isFullInHeader()
		? HeaderMode::Full
		: computeIsGoodHeader()
		? HeaderMode::Good
		: HeaderMode::Small;
	if (!fromCache) {
		for (auto &slice : _data) {
			using Flag = Slice::Flag;
			Assert(!(slice.flags
				& (Flag::LoadingFromCache | Flag::LoadedFromCache)));
			slice.flags |= Slice::Flag::LoadedFromCache;
		}
	}
}

int Reader::Slices::headerSize() const {
	return _header.parts.size() * kPartSize;
}

bool Reader::Slices::fullInCache() const {
	return _fullInCache;
}

int Reader::Slices::requestSliceSizesCount() const {
	if (!headerModeUnknown() || isFullInHeader()) {
		return 0;
	}
	return _data.size();
}

bool Reader::Slices::headerWontBeFilled() const {
	return headerModeUnknown()
		&& (_header.parts.size() >= kMaxPartsInHeader);
}

void Reader::Slices::applyHeaderCacheData() {
	using namespace rpl::mappers;

	const auto applyWhile = [&](auto &&predicate) {
		for (const auto &[offset, part] : _header.parts) {
			const auto index = int(offset / kInSlice);
			if (!predicate(index)) {
				break;
			}
			_data[index].addPart(
				offset - index * kInSlice,
				base::duplicate(part));
		}
	};
	if (_header.parts.empty()) {
		return;
	} else if (_headerMode == HeaderMode::Good) {
		// Always apply data to the first block if it is cached in the header.
		applyWhile(_1 == 0);
	} else if (_headerMode != HeaderMode::Unknown) {
		return;
	} else if (isFullInHeader()) {
		headerDone(true);
	} else {
		applyWhile(_1 < int(_data.size()));
		headerDone(true);
	}
}

void Reader::Slices::processCacheResult(int sliceNumber, PartsMap &&result) {
	Expects(sliceNumber >= 0 && sliceNumber <= _data.size());

	auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
	if (!sliceNumber && isGoodHeader()) {
		// We've loaded the header slice because we really wanted the
		// first slice.
		if (!(_data[0].flags & Slice::Flag::LoadingFromCache)) {
			// We could've already unloaded this slice using the LRU
			// _usedSlices.
			return;
		}
		// So just process the whole result even though we didn't really
		// want the header.
		slice.flags |= Slice::Flag::LoadingFromCache;
		slice.flags &= ~Slice::Flag::LoadedFromCache;
	}
	if (!(slice.flags & Slice::Flag::LoadingFromCache)) {
		// We could've already unloaded this slice using the LRU
		// _usedSlices.
		return;
	}
	slice.processCacheData(std::move(result));
	checkSliceFullLoaded(sliceNumber);
	if (!sliceNumber) {
		applyHeaderCacheData();
		if (isGoodHeader()) {
			// When we first read the header we don't request the first
			// slice, but we receive it anyway, so apply it.
			_data[0].flags |= Slice::Flag::LoadingFromCache;
		}
	}
}

void Reader::Slices::processCachedSizes(const std::vector<int> &sizes) {
	Expects(sizes.size() == _data.size());

	using Flag = Slice::Flag;
	const auto count = int(sizes.size());
	auto loadedCount = 0;
	for (auto i = 0; i != count; ++i) {
		const auto sliceNumber = (i + 1);
		const auto sliceSize = (sliceNumber < _data.size())
			? kInSlice
			: (_size - (sliceNumber - 1) * kInSlice);
		const auto loaded = (sizes[i] == sliceSize);
		if (_data[i].flags & Flag::FullInCache) {
			++loadedCount;
		} else if (loaded) {
			_data[i].flags |= Flag::FullInCache;
			++loadedCount;
		}
	}
	_fullInCache = (loadedCount == count);
}

void Reader::Slices::checkSliceFullLoaded(int sliceNumber) {
	if (!sliceNumber && !isFullInHeader()) {
		return;
	}
	const auto partsCount = [&] {
		if (!sliceNumber) {
			return (_size + kPartSize - 1) / kPartSize;
		}
		return (sliceNumber < _data.size())
			? kPartsInSlice
			: ((_size - (sliceNumber - 1) * kInSlice + kPartSize - 1)
				/ kPartSize);
	}();
	auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
	const auto loaded = (slice.parts.size() == partsCount);

	using Flag = Slice::Flag;
	if ((slice.flags & Flag::FullInCache) && !loaded) {
		slice.flags &= ~Flag::FullInCache;
		_fullInCache = false;
	} else if (!(slice.flags & Flag::FullInCache) && loaded) {
		slice.flags |= Flag::FullInCache;
		_fullInCache = checkFullInCache();
	}
}

bool Reader::Slices::checkFullInCache() const {
	using Flag = Slice::Flag;
	if (isFullInHeader()) {
		return (_header.flags & Flag::FullInCache);
	}
	return ranges::none_of(_data, [](const Slice &slice) {
		return !(slice.flags & Flag::FullInCache);
	});
}

void Reader::Slices::processPart(
		uint32 offset,
		QByteArray &&bytes) {
	Expects(isFullInHeader() || (offset / kInSlice < _data.size()));

	if (isFullInHeader()) {
		_header.addPart(offset, bytes);
		checkSliceFullLoaded(0);
		return;
	//} else if (_headerMode == HeaderMode::Unknown) {
	//	if (_header.parts.contains(offset)) {
	//		return;
	//	} else if (_header.parts.size() < kMaxPartsInHeader) {
	//		_header.addPart(offset, bytes);
	//	}
	}
	const auto index = offset / kInSlice;
	_data[index].addPart(offset - index * kInSlice, std::move(bytes));
	checkSliceFullLoaded(index + 1);
}

auto Reader::Slices::fill(uint32 offset, bytes::span buffer) -> FillResult {
	Expects(!buffer.empty());
	Expects(offset < _size);
	Expects(offset + buffer.size() <= _size);
	Expects(buffer.size() <= kInSlice);

	using Flag = Slice::Flag;

	if (_headerMode != HeaderMode::NoCache
		&& !(_header.flags & Flag::LoadedFromCache)) {
		// Waiting for the initial cache query.
		Assert(waitingForHeaderCache());
		return {};
	} else if (isFullInHeader()) {
		return fillFromHeader(offset, buffer);
	}

	auto result = FillResult();
	const auto till = uint32(offset + buffer.size());
	const auto fromSlice = offset / kInSlice;
	const auto tillSlice = (till + kInSlice - 1) / kInSlice;
	Assert((fromSlice + 1 == tillSlice || fromSlice + 2 == tillSlice)
		&& tillSlice <= _data.size());

	const auto cacheNotLoaded = [&](int sliceIndex) {
		return (_headerMode != HeaderMode::NoCache)
			&& (_headerMode != HeaderMode::Unknown)
			&& !(_data[sliceIndex].flags & Flag::LoadedFromCache);
	};
	const auto handlePrepareResult = [&](
			int sliceIndex,
			const Slice::PrepareFillResult &prepared) {
		if (cacheNotLoaded(sliceIndex)) {
			return;
		}
		for (const auto offset : prepared.offsetsFromLoader.values()) {
			const auto full = offset + sliceIndex * kInSlice;
			if (offset < kInSlice && full < _size) {
				result.offsetsFromLoader.add(full);
			}
		}
	};
	const auto handleReadFromCache = [&](int sliceIndex) {
		if (cacheNotLoaded(sliceIndex)) {
			if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) {
				_data[sliceIndex].flags |= Flag::LoadingFromCache;
				result.sliceNumbersFromCache.add(sliceIndex + 1);
			}
			result.state = FillState::WaitingCache;
		}
	};
	const auto addToHeader = [&](int slice, auto parts) {
		if (_headerMode == HeaderMode::Unknown) {
			for (const auto &part : parts) {
				const auto totalOffset = slice * kInSlice + part.first;
				if (!_header.parts.contains(totalOffset)
					&& _header.parts.size() < kMaxPartsInHeader) {
					_header.addPart(totalOffset, part.second);
				}
			}
		}
	};
	const auto firstFrom = offset - fromSlice * kInSlice;
	const auto firstTill = std::min(kInSlice, till - fromSlice * kInSlice);
	const auto secondFrom = 0;
	const auto secondTill = (till > (fromSlice + 1) * kInSlice)
		? (till - (fromSlice + 1) * kInSlice)
		: 0;
	const auto first = _data[fromSlice].prepareFill(firstFrom, firstTill);
	const auto second = (fromSlice + 1 < tillSlice)
		? _data[fromSlice + 1].prepareFill(secondFrom, secondTill)
		: Slice::PrepareFillResult();
	handlePrepareResult(fromSlice, first);
	if (fromSlice + 1 < tillSlice) {
		handlePrepareResult(fromSlice + 1, second);
	}
	if (first.ready && second.ready) {
		markSliceUsed(fromSlice);
		auto &&list = ranges::make_subrange(first.start, first.finish);
		CopyLoaded(buffer, list, firstFrom, firstTill);
		addToHeader(fromSlice, list);
		if (fromSlice + 1 < tillSlice) {
			markSliceUsed(fromSlice + 1);
			auto &&list = ranges::make_subrange(second.start, second.finish);
			CopyLoaded(
				buffer.subspan(firstTill - firstFrom),
				list,
				secondFrom,
				secondTill);
			addToHeader(fromSlice + 1, list);
		}
		result.toCache = serializeAndUnloadUnused();
		result.state = FillState::Success;
	} else {
		handleReadFromCache(fromSlice);
		if (fromSlice + 1 < tillSlice) {
			handleReadFromCache(fromSlice + 1);
		}
	}
	return result;
}
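
// A fill() request is at most kInSlice bytes, so it touches one slice
// or two adjacent ones: the first is filled over [firstFrom, firstTill),
// the second over [0, secondTill). Only when both are ready is the
// buffer copied; otherwise the missing slices are queued for cache reads
// and the missing parts for the loader.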

auto Reader::Slices::fillFromHeader(uint32 offset, bytes::span buffer)
-> FillResult {
	auto result = FillResult();
	const auto from = offset;
	const auto till = uint32(offset + buffer.size());

	const auto prepared = _header.prepareFill(from, till);
	for (const auto full : prepared.offsetsFromLoader.values()) {
		if (full < _size) {
			result.offsetsFromLoader.add(full);
		}
	}
	if (prepared.ready) {
		CopyLoaded(
			buffer,
			ranges::make_subrange(prepared.start, prepared.finish),
			from,
			till);
		result.state = FillState::Success;
	}
	return result;
}

QByteArray Reader::Slices::partForDownloader(uint32 offset) const {
	Expects(offset < _size);

	if (const auto i = _header.parts.find(offset); i != end(_header.parts)) {
		return i->second;
	} else if (isFullInHeader()) {
		return QByteArray();
	}
	const auto index = offset / kInSlice;
	const auto &slice = _data[index];
	const auto i = slice.parts.find(offset - index * kInSlice);
	return (i != end(slice.parts)) ? i->second : QByteArray();
}

bool Reader::Slices::waitingForHeaderCache() const {
	return (_header.flags & Slice::Flag::LoadingFromCache);
}

bool Reader::Slices::readCacheForDownloaderRequired(uint32 offset) {
	Expects(offset < _size);
	Expects(!waitingForHeaderCache());

	if (isFullInHeader()) {
		return false;
	}
	const auto index = offset / kInSlice;
	auto &slice = _data[index];
	return !(slice.flags & Slice::Flag::LoadedFromCache);
}

void Reader::Slices::markSliceUsed(int sliceIndex) {
	const auto i = ranges::find(_usedSlices, sliceIndex);
	const auto end = _usedSlices.end();
	if (i == end) {
		_usedSlices.push_back(sliceIndex);
	} else {
		const auto next = i + 1;
		if (next != end) {
			std::rotate(i, next, end);
		}
	}
}
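
// _usedSlices keeps least-recently-used order with the most recent slice
// last. For example, touching slice 5 in [2, 5, 7] rotates it to
// [2, 7, 5], so serializeAndUnloadUnused() below always evicts the
// front element.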

int Reader::Slices::maxSliceSize(int sliceNumber) const {
	return MaxSliceSize(sliceNumber, _size);
}

Reader::SerializedSlice Reader::Slices::serializeAndUnloadUnused() {
	using Flag = Slice::Flag;

	if (_headerMode == HeaderMode::Unknown
		|| _usedSlices.size() <= kSlicesInMemory) {
		return {};
	}
	const auto purgeSlice = _usedSlices.front();
	_usedSlices.pop_front();
	if (!(_data[purgeSlice].flags & Flag::LoadedFromCache)) {
		// If the only data in this slice was from _header, just leave it.
		return {};
	}
	const auto noNeedToSaveToCache = [&] {
		if (_headerMode == HeaderMode::NoCache) {
			// Cache is not used.
			return true;
		} else if (!(_data[purgeSlice].flags & Flag::ChangedSinceCache)) {
			// Even if no data in this slice was changed, we still must
			// save the first slice if the header data changed since it
			// was loaded from cache; otherwise the destructor won't be
			// able to unload the header.
			if (!isGoodHeader()
				|| (purgeSlice > 0)
				|| (!(_header.flags & Flag::ChangedSinceCache))) {
				return true;
			}
		}
		return false;
	}();
	if (noNeedToSaveToCache) {
		unloadSlice(_data[purgeSlice]);
		return {};
	}
	return serializeAndUnloadSlice(purgeSlice + 1);
}

Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice(
		int sliceNumber) {
	Expects(_headerMode != HeaderMode::Unknown);
	Expects(_headerMode != HeaderMode::NoCache);
	Expects(sliceNumber >= 0 && sliceNumber <= _data.size());

	if (isGoodHeader() && (sliceNumber == 1)) {
		return serializeAndUnloadSlice(0);
	}
	const auto writeHeaderAndSlice = isGoodHeader() && !sliceNumber;

	auto &slice = sliceNumber ? _data[sliceNumber - 1] : _header;
	const auto count = slice.parts.size();
	Assert(count > 0);

	auto result = SerializedSlice();
	result.number = sliceNumber;

	// We always use complex serialization for header + first slice.
	const auto continuousTill = writeHeaderAndSlice
		? 0
		: FindNotLoadedStart(slice.parts, 0);
	const auto continuous = (continuousTill > slice.parts.back().first);
	if (continuous) {
		// All data is continuous.
		result.data.reserve(count * kPartSize);
		for (const auto &[offset, part] : slice.parts) {
			result.data.append(part);
		}
	} else {
		result.data = serializeComplexSlice(slice);
		if (writeHeaderAndSlice) {
			result.data.append(serializeAndUnloadFirstSliceNoHeader());
		}

		// Make sure this data won't be taken for full continuous data.
		const auto maxSize = maxSliceSize(sliceNumber);
		while (IsContiguousSerialization(result.data.size(), maxSize)) {
			result.data.push_back(char(0));
		}
	}

	// We may serialize the header in the middle of streaming, if we use
	// HeaderMode::Good and unload the first slice. We still require the
	// header data to continue working, so don't really unload the header.
	if (sliceNumber) {
		unloadSlice(slice);
	} else {
		slice.flags &= ~Slice::Flag::ChangedSinceCache;
	}
	return result;
}

void Reader::Slices::unloadSlice(Slice &slice) const {
	const auto full = (slice.flags & Slice::Flag::FullInCache);
	slice = Slice();
	if (full) {
		slice.flags |= Slice::Flag::FullInCache;
	}
}

QByteArray Reader::Slices::serializeComplexSlice(const Slice &slice) const {
	return SerializeComplexPartsMap(slice.parts);
}

QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() {
	Expects(_data[0].flags & Slice::Flag::LoadedFromCache);

	auto &slice = _data[0];
	for (const auto &[offset, part] : _header.parts) {
		slice.parts.erase(offset);
	}
	auto result = serializeComplexSlice(slice);
	unloadSlice(slice);
	return result;
}

Reader::SerializedSlice Reader::Slices::unloadToCache() {
	if (_headerMode == HeaderMode::Unknown
		|| _headerMode == HeaderMode::NoCache) {
		return {};
	}
	if (_header.flags & Slice::Flag::ChangedSinceCache) {
		return serializeAndUnloadSlice(0);
	}
	for (auto i = 0, count = int(_data.size()); i != count; ++i) {
		if (_data[i].flags & Slice::Flag::ChangedSinceCache) {
			return serializeAndUnloadSlice(i + 1);
		}
	}
	return {};
}

Reader::Reader(
	std::unique_ptr<Loader> loader,
	Storage::Cache::Database *cache)
: _loader(std::move(loader))
, _cache(cache)
, _cacheHelper(cache ? InitCacheHelper(_loader->baseCacheKey()) : nullptr)
, _slices(_loader->size(), _cacheHelper != nullptr) {
	_loader->parts(
	) | rpl::start_with_next([=](LoadedPart &&part) {
		if (_attachedDownloader) {
			_partsForDownloader.fire_copy(part);
		}
		if (_streamingActive) {
			_loadedParts.emplace(std::move(part));
		}
		if (const auto waiting = _waiting.load(std::memory_order_acquire)) {
			_waiting.store(nullptr, std::memory_order_release);
			waiting->release();
		}
	}, _lifetime);

	if (_cacheHelper) {
		readFromCache(0);
	}
}

void Reader::startSleep(not_null<crl::semaphore*> wake) {
	_sleeping.store(wake, std::memory_order_release);
	processDownloaderRequests();
}

void Reader::wakeFromSleep() {
	if (const auto sleeping = _sleeping.load(std::memory_order_acquire)) {
		_sleeping.store(nullptr, std::memory_order_release);
		sleeping->release();
	}
}

void Reader::stopSleep() {
	_sleeping.store(nullptr, std::memory_order_release);
}

void Reader::stopStreamingAsync() {
	_stopStreamingAsync = true;
	crl::on_main(this, [=] {
		if (_stopStreamingAsync) {
			stopStreaming(false);
		}
	});
}

void Reader::tryRemoveLoaderAsync() {
	_loader->tryRemoveFromQueue();
}

void Reader::startStreaming() {
	_streamingActive = true;
	refreshLoaderPriority();
}

void Reader::stopStreaming(bool stillActive) {
	Expects(_sleeping == nullptr);

	_stopStreamingAsync = false;
	_waiting.store(nullptr, std::memory_order_release);
	if (_cacheHelper && _cacheHelper->waiting != nullptr) {
		QMutexLocker lock(&_cacheHelper->mutex);
		_cacheHelper->waiting.store(nullptr, std::memory_order_release);
	}
	if (!stillActive) {
		_streamingActive = false;
		refreshLoaderPriority();
		_loadingOffsets.clear();
		processDownloaderRequests();
	}
}

rpl::producer<LoadedPart> Reader::partsForDownloader() const {
	return _partsForDownloader.events();
}

void Reader::loadForDownloader(
		not_null<Storage::StreamedFileDownloader*> downloader,
		int64 offset) {
	Expects(offset >= 0 && offset <= std::numeric_limits<uint32>::max());

	if (_attachedDownloader != downloader) {
		if (_attachedDownloader) {
			cancelForDownloader(_attachedDownloader);
		}
		_attachedDownloader = downloader;
		_loader->attachDownloader(downloader);
	}
	_downloaderOffsetRequests.emplace(uint32(offset));
	// Will be processed in continueDownloaderFromMainThread()
	// from StreamedFileDownloader::requestParts().
}

void Reader::doneForDownloader(int64 offset) {
	Expects(offset >= 0 && offset <= std::numeric_limits<uint32>::max());

	_downloaderOffsetAcks.emplace(offset);
	// Will be processed in continueDownloaderFromMainThread()
	// from StreamedFileDownloader::requestParts().
}

void Reader::cancelForDownloader(
		not_null<Storage::StreamedFileDownloader*> downloader) {
	if (_attachedDownloader == downloader) {
		_downloaderOffsetRequests.take();
		_attachedDownloader = nullptr;
		_loader->clearAttachedDownloader();
	}
}

void Reader::enqueueDownloaderOffsets() {
	auto offsets = _downloaderOffsetRequests.take();
	if (!empty(offsets)) {
		if (!empty(_offsetsForDownloader)) {
			_offsetsForDownloader.insert(
				end(_offsetsForDownloader),
				std::make_move_iterator(begin(offsets)),
				std::make_move_iterator(end(offsets)));
			checkForDownloaderChange(offsets.size() + 1);
		} else {
			_offsetsForDownloader = std::move(offsets);
			checkForDownloaderChange(offsets.size());
		}
	}
}

void Reader::checkForDownloaderChange(int checkItemsCount) {
	Expects(checkItemsCount <= _offsetsForDownloader.size());

	// If a requested offset is less than or equal to some previously
	// requested offset, the downloader was changed, ignore old offsets.
	const auto end = _offsetsForDownloader.end();
	const auto changed = std::adjacent_find(
		end - checkItemsCount,
		end,
		[](uint32 first, uint32 second) { return (second <= first); });
	if (changed != end) {
		_offsetsForDownloader.erase(
			begin(_offsetsForDownloader),
			changed + 1);
		_downloaderReadCache.clear();
		_downloaderOffsetAcks.take();
	}
}
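
// Example: if the queue is [3, 5, 4], adjacent_find() stops at the
// (5, 4) pair because 4 <= 5, and everything up to and including 5 is
// erased, leaving [4, ...] from the newly attached downloader.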

void Reader::checkForDownloaderReadyOffsets() {
	// If a requested part is available right now we simply fire it on the
	// main thread, until the first not-available-right-now offset is found.
	const auto unavailableInBytes = [&](uint32 offset, QByteArray &&bytes) {
		if (bytes.isEmpty()) {
			return true;
		}
		crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
			_partsForDownloader.fire({ int64(offset), std::move(bytes) });
		});
		return false;
	};
	const auto unavailableInCache = [&](uint32 offset) {
		const auto index = (offset / kInSlice);
		const auto sliceNumber = index + 1;
		const auto i = _downloaderReadCache.find(sliceNumber);
		if (i == end(_downloaderReadCache) || !i->second) {
			return true;
		}
		const auto j = i->second->find(offset - index * kInSlice);
		if (j == end(*i->second)) {
			return true;
		}
		return unavailableInBytes(offset, std::move(j->second));
	};
	const auto unavailable = [&](uint32 offset) {
		return unavailableInBytes(offset, _slices.partForDownloader(offset))
			&& unavailableInCache(offset);
	};
	_offsetsForDownloader.erase(
		begin(_offsetsForDownloader),
		ranges::find_if(_offsetsForDownloader, unavailable));
}

void Reader::processDownloaderRequests() {
	processCacheResults();
	enqueueDownloaderOffsets();
	checkForDownloaderReadyOffsets();
	pruneDoneDownloaderRequests();
	if (!empty(_offsetsForDownloader)) {
		pruneDownloaderCache(_offsetsForDownloader.front());
		sendDownloaderRequests();
	}
}

void Reader::pruneDownloaderCache(uint32 minimalOffset) {
	const auto minimalSliceNumber = (minimalOffset / kInSlice) + 1;
	const auto removeTill = ranges::lower_bound(
		_downloaderReadCache,
		minimalSliceNumber,
		ranges::less(),
		&base::flat_map<uint32, std::optional<PartsMap>>::value_type::first);
	_downloaderReadCache.erase(_downloaderReadCache.begin(), removeTill);
}

void Reader::pruneDoneDownloaderRequests() {
	for (const auto done : _downloaderOffsetAcks.take()) {
		_downloaderOffsetsRequested.remove(done);
		const auto i = ranges::find(_offsetsForDownloader, done);
		if (i != end(_offsetsForDownloader)) {
			_offsetsForDownloader.erase(i);
		}
	}
}

void Reader::sendDownloaderRequests() {
	auto &&offsets = ranges::views::all(
		_offsetsForDownloader
	) | ranges::views::take(kDownloaderRequestsLimit);
	for (const auto offset : offsets) {
		if ((!_cacheHelper || !downloaderWaitForCachedSlice(offset))
			&& _downloaderOffsetsRequested.emplace(offset).second) {
			_loader->load(offset);
		}
	}
}

bool Reader::downloaderWaitForCachedSlice(uint32 offset) {
	if (_slices.waitingForHeaderCache()) {
		return true;
	}
	if (!_slices.readCacheForDownloaderRequired(offset)) {
		return false;
	}
	const auto sliceNumber = (offset / kInSlice) + 1;
	auto i = _downloaderReadCache.find(sliceNumber);
	if (i == _downloaderReadCache.end()) {
		// If we didn't request that slice yet, try requesting it.
		// If there is no need to (header mode is unknown), place an empty
		// map. Otherwise place std::nullopt and wait for the cache result.
		i = _downloaderReadCache.emplace(
			sliceNumber,
			(readFromCacheForDownloader(sliceNumber)
				? std::nullopt
				: std::make_optional(PartsMap()))).first;
	}
	return !i->second;
}

void Reader::checkCacheResultsForDownloader() {
	continueDownloaderFromMainThread();
}

void Reader::continueDownloaderFromMainThread() {
	if (_streamingActive) {
		wakeFromSleep();
	} else {
		processDownloaderRequests();
	}
}

rpl::producer<SpeedEstimate> Reader::speedEstimate() const {
	return _loader->speedEstimate();
}

void Reader::setLoaderPriority(int priority) {
	if (_realPriority == priority) {
		return;
	}
	_realPriority = priority;
	refreshLoaderPriority();
}

void Reader::refreshLoaderPriority() {
	_loader->setPriority(_streamingActive ? _realPriority : 0);
}

bool Reader::isRemoteLoader() const {
	return _loader->baseCacheKey().valid();
}

std::shared_ptr<Reader::CacheHelper> Reader::InitCacheHelper(
		Storage::Cache::Key baseKey) {
	if (!baseKey) {
		return nullptr;
	}
	return std::make_shared<Reader::CacheHelper>(baseKey);
}

// 0 is for headerData, slice index = sliceNumber - 1.
void Reader::readFromCache(int sliceNumber) {
	Expects(_cache != nullptr);
	Expects(_cacheHelper != nullptr);
	Expects(!sliceNumber || !_slices.headerModeUnknown());

	if (sliceNumber == 1 && _slices.isGoodHeader()) {
		return readFromCache(0);
	}
	const auto size = _loader->size();
	const auto key = _cacheHelper->key(sliceNumber);
	const auto cache = std::weak_ptr<CacheHelper>(_cacheHelper);
	const auto weak = base::make_weak(this);
	const auto ready = [=](
			QByteArray &&result,
			std::vector<int> &&sizes = {}) {
		crl::async([
			=,
			result = std::move(result),
			sizes = std::move(sizes)
		]() mutable {
			auto entry = ParseCacheEntry(
				bytes::make_span(result),
				sliceNumber,
				size);
			if (const auto strong = cache.lock()) {
				QMutexLocker lock(&strong->mutex);
				strong->results.emplace(sliceNumber, std::move(entry.parts));
				if (!sliceNumber && entry.included) {
					strong->results.emplace(1, std::move(*entry.included));
				}
				strong->sizes = std::move(sizes);
				if (const auto waiting = strong->waiting.load()) {
					strong->waiting.store(nullptr, std::memory_order_release);
					waiting->release();
				} else {
					crl::on_main(weak, [=] {
						checkCacheResultsForDownloader();
					});
				}
			}
		});
	};
	auto keys = std::vector<Storage::Cache::Key>();
	const auto count = _slices.requestSliceSizesCount();
	for (auto i = 0; i != count; ++i) {
		keys.push_back(_cacheHelper->key(i + 1));
	}
	_cache->getWithSizes(key, std::move(keys), ready);
}

bool Reader::readFromCacheForDownloader(int sliceNumber) {
	Expects(_cacheHelper != nullptr);
	Expects(sliceNumber > 0);

	if (_slices.headerModeUnknown()) {
		return false;
	}
	readFromCache(sliceNumber);
	return true;
}

void Reader::putToCache(SerializedSlice &&slice) {
	Expects(_cache != nullptr);
	Expects(_cacheHelper != nullptr);
	Expects(slice.number >= 0);

	_cache->put(_cacheHelper->key(slice.number), std::move(slice.data));
}

int64 Reader::size() const {
	return _loader->size();
}

std::optional<Error> Reader::streamingError() const {
	return _streamingError;
}

void Reader::headerDone() {
	_slices.headerDone(false);
}

int Reader::headerSize() const {
	return _slices.headerSize();
}

bool Reader::fullInCache() const {
	return _slices.fullInCache();
}

Reader::FillState Reader::fill(
		int64 offset,
		bytes::span buffer,
		not_null<crl::semaphore*> notify) {
	Expects(offset + buffer.size() <= size());
	Expects(offset >= 0 && size() <= std::numeric_limits<uint32>::max());

	const auto startWaiting = [&] {
		if (_cacheHelper) {
			_cacheHelper->waiting = notify.get();
		}
		_waiting.store(notify.get(), std::memory_order_release);
	};
	const auto clearWaiting = [&] {
		_waiting.store(nullptr, std::memory_order_release);
		if (_cacheHelper) {
			_cacheHelper->waiting.store(nullptr, std::memory_order_release);
		}
	};
	const auto done = [&] {
		clearWaiting();
		return FillState::Success;
	};
	const auto failed = [&] {
		clearWaiting();
		notify->release();
		return FillState::Failed;
	};

	checkForSomethingMoreReceived();
	if (_streamingError) {
		return FillState::Failed;
	}

	auto lastResult = FillState();
	do {
		lastResult = fillFromSlices(uint32(offset), buffer);
		if (lastResult == FillState::Success) {
			return done();
		}
		startWaiting();
	} while (checkForSomethingMoreReceived());

	return _streamingError ? failed() : lastResult;
}

Reader::FillState Reader::fillFromSlices(uint32 offset, bytes::span buffer) {
	using namespace rpl::mappers;

	auto result = _slices.fill(offset, buffer);
	if (result.state != FillState::Success && _slices.headerWontBeFilled()) {
		_streamingError = Error::NotStreamable;
		return FillState::Failed;
	}

	for (const auto sliceNumber : result.sliceNumbersFromCache.values()) {
		readFromCache(sliceNumber);
	}

	if (_cacheHelper && result.toCache.number >= 0) {
		// If we put the header to cache (number == 0) that means we're in
		// HeaderMode::Good and are really putting the first slice to cache.
		Assert(result.toCache.number > 0 || _slices.isGoodHeader());

		const auto index = std::max(result.toCache.number, 1) - 1;
		cancelLoadInRange(index * kInSlice, (index + 1) * kInSlice);
		putToCache(std::move(result.toCache));
	}
	auto checkPriority = true;
	for (const auto offset : result.offsetsFromLoader.values()) {
		if (checkPriority) {
			checkLoadWillBeFirst(offset);
			checkPriority = false;
		}
		loadAtOffset(offset);
	}
	return result.state;
}

void Reader::cancelLoadInRange(uint32 from, uint32 till) {
	Expects(from < till);

	for (const auto offset : _loadingOffsets.takeInRange(from, till)) {
		if (!_downloaderOffsetsRequested.contains(offset)) {
			_loader->cancel(offset);
		}
	}
}

void Reader::checkLoadWillBeFirst(uint32 offset) {
	if (_loadingOffsets.front().value_or(offset) != offset) {
		_loadingOffsets.resetPriorities();
		_loader->resetPriorities();
	}
}

bool Reader::processCacheResults() {
	if (!_cacheHelper) {
		return false;
	}

	QMutexLocker lock(&_cacheHelper->mutex);
	auto loaded = base::take(_cacheHelper->results);
	auto sizes = base::take(_cacheHelper->sizes);
	lock.unlock();

	for (auto &[sliceNumber, cachedParts] : _downloaderReadCache) {
		if (!cachedParts) {
			const auto i = loaded.find(sliceNumber);
			if (i != end(loaded)) {
				cachedParts = i->second;
			}
		}
	}

	if (_streamingError) {
		return false;
	}
	for (auto &[sliceNumber, result] : loaded) {
		_slices.processCacheResult(sliceNumber, std::move(result));
	}
	if (!sizes.empty()) {
		_slices.processCachedSizes(sizes);
	}
	if (!loaded.empty()
		&& (loaded.front().first == 0)
		&& _slices.isGoodHeader()) {
		Assert(loaded.size() > 1);
		Assert((loaded.begin() + 1)->first == 1);
	}
	return !loaded.empty();
}

bool Reader::processLoadedParts() {
	if (_streamingError) {
		return false;
	}

	auto loaded = _loadedParts.take();
	for (auto &part : loaded) {
		if (!part.valid(size())) {
			_streamingError = Error::LoadFailed;
			return false;
		} else if (!_loadingOffsets.remove(part.offset)) {
			continue;
		}
		_slices.processPart(
			part.offset,
			std::move(part.bytes));
	}
	return !loaded.empty();
}

bool Reader::checkForSomethingMoreReceived() {
	const auto result1 = processCacheResults();
	const auto result2 = processLoadedParts();
	return result1 || result2;
}

void Reader::loadAtOffset(uint32 offset) {
	if (_loadingOffsets.add(offset)) {
		_loader->load(offset);
	}
}

void Reader::finalizeCache() {
	if (!_cacheHelper) {
		return;
	}

	Assert(_cache != nullptr);
	auto toCache = _slices.unloadToCache();
	while (toCache.number >= 0) {
		putToCache(std::move(toCache));
		toCache = _slices.unloadToCache();
	}
	_cache->sync();
}

Reader::~Reader() {
	finalizeCache();
}

QByteArray SerializeComplexPartsMap(
		const base::flat_map<uint32, QByteArray> &parts) {
	auto result = QByteArray();
	const auto count = parts.size();
	const auto intSize = sizeof(int32);
	result.reserve(count * kPartSize + 2 * intSize * (count + 1));
	const auto appendInt = [&](int value) {
		auto serialized = int32(value);
		result.append(
			reinterpret_cast<const char*>(&serialized),
			intSize);
	};
	appendInt(count);
	for (const auto &[offset, part] : parts) {
		appendInt(offset);
		appendInt(part.size());
		result.append(part);
	}
	return result;
}
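
// Layout example: a map with the single part { 0 -> "abc" } serializes
// to 4 + 4 + 4 + 3 = 15 bytes: count = 1, offset = 0, size = 3, then the
// raw bytes; ParseComplexCachedMap() above reads exactly this format.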

} // namespace Streaming
} // namespace Media