data_sparse_ids.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "data/data_sparse_ids.h"
  8. #include <rpl/combine.h>
  9. #include "storage/storage_sparse_ids_list.h"
  10. SparseIdsMergedSlice::SparseIdsMergedSlice(Key key)
  11. : SparseIdsMergedSlice(
  12. key,
  13. SparseIdsSlice(),
  14. MigratedSlice(key)) {
  15. }
  16. SparseIdsMergedSlice::SparseIdsMergedSlice(
  17. Key key,
  18. SparseIdsSlice part,
  19. std::optional<SparseIdsSlice> migrated)
  20. : _key(key)
  21. , _part(std::move(part))
  22. , _migrated(std::move(migrated)) {
  23. }
  24. SparseIdsMergedSlice::SparseIdsMergedSlice(
  25. Key key,
  26. SparseUnsortedIdsSlice unsorted)
  27. : _key(key)
  28. , _unsorted(std::move(unsorted)) {
  29. }
  30. std::optional<int> SparseIdsMergedSlice::fullCount() const {
  31. return _unsorted
  32. ? _unsorted->fullCount()
  33. : Add(
  34. _part.fullCount(),
  35. _migrated ? _migrated->fullCount() : 0);
  36. }
  37. std::optional<int> SparseIdsMergedSlice::skippedBefore() const {
  38. return _unsorted
  39. ? _unsorted->skippedBefore()
  40. : Add(
  41. isolatedInMigrated() ? 0 : _part.skippedBefore(),
  42. _migrated
  43. ? (isolatedInPart()
  44. ? _migrated->fullCount()
  45. : _migrated->skippedBefore())
  46. : 0
  47. );
  48. }
  49. std::optional<int> SparseIdsMergedSlice::skippedAfter() const {
  50. return _unsorted
  51. ? _unsorted->skippedAfter()
  52. : Add(
  53. isolatedInMigrated() ? _part.fullCount() : _part.skippedAfter(),
  54. isolatedInPart() ? 0 : _migrated->skippedAfter()
  55. );
  56. }
  57. std::optional<int> SparseIdsMergedSlice::indexOf(
  58. FullMsgId fullId) const {
  59. return _unsorted
  60. ? _unsorted->indexOf(fullId.msg)
  61. : isFromPart(fullId)
  62. ? (_part.indexOf(fullId.msg) | func::add(migratedSize()))
  63. : isolatedInPart()
  64. ? std::nullopt
  65. : isFromMigrated(fullId)
  66. ? _migrated->indexOf(fullId.msg)
  67. : std::nullopt;
  68. }
  69. int SparseIdsMergedSlice::size() const {
  70. return _unsorted
  71. ? _unsorted->size()
  72. : (isolatedInPart() ? 0 : migratedSize())
  73. + (isolatedInMigrated() ? 0 : _part.size());
  74. }
  75. FullMsgId SparseIdsMergedSlice::operator[](int index) const {
  76. Expects(index >= 0 && index < size());
  77. if (_unsorted) {
  78. return ComputeId(_key.peerId, (*_unsorted)[index]);
  79. }
  80. if (const auto size = migratedSize()) {
  81. if (index < size) {
  82. return ComputeId(_key.migratedPeerId, (*_migrated)[index]);
  83. }
  84. index -= size;
  85. }
  86. return ComputeId(_key.peerId, _part[index]);
  87. }
  88. std::optional<int> SparseIdsMergedSlice::distance(
  89. const Key &a,
  90. const Key &b) const {
  91. if (const auto i = indexOf(ComputeId(a))) {
  92. if (const auto j = indexOf(ComputeId(b))) {
  93. return *j - *i;
  94. }
  95. }
  96. return std::nullopt;
  97. }
  98. auto SparseIdsMergedSlice::nearest(
  99. UniversalMsgId id) const -> std::optional<FullMsgId> {
  100. if (_unsorted) {
  101. if (_unsorted->indexOf(id).has_value()) {
  102. return ComputeId(_key.peerId, id);
  103. } else if (const auto count = _unsorted->size()) {
  104. return ComputeId(_key.peerId, (*_unsorted)[count / 2]);
  105. }
  106. return std::nullopt;
  107. }
  108. const auto convertFromPartNearest = [&](MsgId result) {
  109. return ComputeId(_key.peerId, result);
  110. };
  111. const auto convertFromMigratedNearest = [&](MsgId result) {
  112. return ComputeId(_key.migratedPeerId, result);
  113. };
  114. if (IsServerMsgId(id)) {
  115. if (auto partNearestId = _part.nearest(id)) {
  116. return partNearestId
  117. | convertFromPartNearest;
  118. } else if (isolatedInPart()) {
  119. return std::nullopt;
  120. }
  121. return _migrated->nearest(ServerMaxMsgId - 1)
  122. | convertFromMigratedNearest;
  123. }
  124. if (auto migratedNearestId = _migrated
  125. ? _migrated->nearest(id + ServerMaxMsgId)
  126. : std::nullopt) {
  127. return migratedNearestId
  128. | convertFromMigratedNearest;
  129. } else if (isolatedInMigrated()) {
  130. return std::nullopt;
  131. }
  132. return _part.nearest(0)
  133. | convertFromPartNearest;
  134. }
  135. SparseIdsSliceBuilder::SparseIdsSliceBuilder(
  136. Key key,
  137. int limitBefore,
  138. int limitAfter)
  139. : _key(key)
  140. , _limitBefore(limitBefore)
  141. , _limitAfter(limitAfter) {
  142. }
  143. bool SparseIdsSliceBuilder::applyInitial(
  144. const Storage::SparseIdsListResult &result) {
  145. mergeSliceData(
  146. result.count,
  147. result.messageIds,
  148. result.skippedBefore,
  149. result.skippedAfter);
  150. return true;
  151. }
  152. bool SparseIdsSliceBuilder::applyUpdate(
  153. const Storage::SparseIdsSliceUpdate &update) {
  154. auto intersects = [](MsgRange range1, MsgRange range2) {
  155. return (range1.from <= range2.till)
  156. && (range2.from <= range1.till);
  157. };
  158. auto needMergeMessages = (update.messages != nullptr)
  159. && intersects(update.range, {
  160. _ids.empty() ? _key : _ids.front(),
  161. _ids.empty() ? _key : _ids.back()
  162. });
  163. if (!needMergeMessages && !update.count) {
  164. return false;
  165. }
  166. auto skippedBefore = (update.range.from == 0)
  167. ? 0
  168. : std::optional<int> {};
  169. auto skippedAfter = (update.range.till == ServerMaxMsgId)
  170. ? 0
  171. : std::optional<int> {};
  172. mergeSliceData(
  173. update.count,
  174. needMergeMessages
  175. ? *update.messages
  176. : base::flat_set<MsgId> {},
  177. skippedBefore,
  178. skippedAfter);
  179. return true;
  180. }
  181. bool SparseIdsSliceBuilder::removeOne(MsgId messageId) {
  182. auto changed = false;
  183. if (_fullCount && *_fullCount > 0) {
  184. --*_fullCount;
  185. changed = true;
  186. }
  187. if (_ids.contains(messageId)) {
  188. _ids.remove(messageId);
  189. changed = true;
  190. } else if (!_ids.empty()) {
  191. if (_ids.front() > messageId
  192. && _skippedBefore
  193. && *_skippedBefore > 0) {
  194. --*_skippedBefore;
  195. changed = true;
  196. } else if (_ids.back() < messageId
  197. && _skippedAfter
  198. && *_skippedAfter > 0) {
  199. --*_skippedAfter;
  200. changed = true;
  201. }
  202. }
  203. if (changed) {
  204. checkInsufficient();
  205. }
  206. return changed;
  207. }
  208. bool SparseIdsSliceBuilder::removeAll() {
  209. _ids = {};
  210. _fullCount = 0;
  211. _skippedBefore = 0;
  212. _skippedAfter = 0;
  213. return true;
  214. }
  215. bool SparseIdsSliceBuilder::invalidateBottom() {
  216. _fullCount = _skippedAfter = std::nullopt;
  217. checkInsufficient();
  218. return true;
  219. }
  220. void SparseIdsSliceBuilder::checkInsufficient() {
  221. sliceToLimits();
  222. }
  223. void SparseIdsSliceBuilder::mergeSliceData(
  224. std::optional<int> count,
  225. const base::flat_set<MsgId> &messageIds,
  226. std::optional<int> skippedBefore,
  227. std::optional<int> skippedAfter) {
  228. if (messageIds.empty()) {
  229. if (count && _fullCount != count) {
  230. _fullCount = count;
  231. if (*_fullCount <= _ids.size()) {
  232. _fullCount = _ids.size();
  233. _skippedBefore = _skippedAfter = 0;
  234. }
  235. }
  236. fillSkippedAndSliceToLimits();
  237. return;
  238. }
  239. if (count) {
  240. _fullCount = count;
  241. }
  242. auto wasMinId = _ids.empty() ? -1 : _ids.front();
  243. auto wasMaxId = _ids.empty() ? -1 : _ids.back();
  244. _ids.merge(messageIds.begin(), messageIds.end());
  245. auto adjustSkippedBefore = [&](MsgId oldId, int oldSkippedBefore) {
  246. auto it = _ids.find(oldId);
  247. Assert(it != _ids.end());
  248. _skippedBefore = oldSkippedBefore - (it - _ids.begin());
  249. accumulate_max(*_skippedBefore, 0);
  250. };
  251. if (skippedBefore) {
  252. adjustSkippedBefore(messageIds.front(), *skippedBefore);
  253. } else if (wasMinId >= 0 && _skippedBefore) {
  254. adjustSkippedBefore(wasMinId, *_skippedBefore);
  255. } else {
  256. _skippedBefore = std::nullopt;
  257. }
  258. auto adjustSkippedAfter = [&](MsgId oldId, int oldSkippedAfter) {
  259. auto it = _ids.find(oldId);
  260. Assert(it != _ids.end());
  261. _skippedAfter = oldSkippedAfter - (_ids.end() - it - 1);
  262. accumulate_max(*_skippedAfter, 0);
  263. };
  264. if (skippedAfter) {
  265. adjustSkippedAfter(messageIds.back(), *skippedAfter);
  266. } else if (wasMaxId >= 0 && _skippedAfter) {
  267. adjustSkippedAfter(wasMaxId, *_skippedAfter);
  268. } else {
  269. _skippedAfter = std::nullopt;
  270. }
  271. fillSkippedAndSliceToLimits();
  272. }
  273. void SparseIdsSliceBuilder::fillSkippedAndSliceToLimits() {
  274. if (_fullCount) {
  275. if (_skippedBefore && !_skippedAfter) {
  276. _skippedAfter = *_fullCount
  277. - *_skippedBefore
  278. - int(_ids.size());
  279. } else if (_skippedAfter && !_skippedBefore) {
  280. _skippedBefore = *_fullCount
  281. - *_skippedAfter
  282. - int(_ids.size());
  283. }
  284. }
  285. sliceToLimits();
  286. }
  287. void SparseIdsSliceBuilder::sliceToLimits() {
  288. if (!_key) {
  289. if (!_fullCount) {
  290. requestMessagesCount();
  291. }
  292. return;
  293. }
  294. auto requestedSomething = false;
  295. auto aroundIt = ranges::lower_bound(_ids, _key);
  296. auto removeFromBegin = (aroundIt - _ids.begin() - _limitBefore);
  297. auto removeFromEnd = (_ids.end() - aroundIt - _limitAfter - 1);
  298. if (removeFromBegin > 0) {
  299. _ids.erase(_ids.begin(), _ids.begin() + removeFromBegin);
  300. if (_skippedBefore) {
  301. *_skippedBefore += removeFromBegin;
  302. }
  303. } else if (removeFromBegin < 0
  304. && (!_skippedBefore || *_skippedBefore > 0)) {
  305. requestedSomething = true;
  306. requestMessages(RequestDirection::Before);
  307. }
  308. if (removeFromEnd > 0) {
  309. _ids.erase(_ids.end() - removeFromEnd, _ids.end());
  310. if (_skippedAfter) {
  311. *_skippedAfter += removeFromEnd;
  312. }
  313. } else if (removeFromEnd < 0
  314. && (!_skippedAfter || *_skippedAfter > 0)) {
  315. requestedSomething = true;
  316. requestMessages(RequestDirection::After);
  317. }
  318. if (!_fullCount && !requestedSomething) {
  319. requestMessagesCount();
  320. }
  321. }
  322. void SparseIdsSliceBuilder::requestMessages(
  323. RequestDirection direction) {
  324. auto requestAroundData = [&]() -> AroundData {
  325. if (_ids.empty()) {
  326. return { _key, Data::LoadDirection::Around };
  327. } else if (direction == RequestDirection::Before) {
  328. return { _ids.front(), Data::LoadDirection::Before };
  329. }
  330. return { _ids.back(), Data::LoadDirection::After };
  331. };
  332. _insufficientAround.fire(requestAroundData());
  333. }
  334. void SparseIdsSliceBuilder::requestMessagesCount() {
  335. _insufficientAround.fire({ 0, Data::LoadDirection::Around });
  336. }
  337. SparseIdsSlice SparseIdsSliceBuilder::snapshot() const {
  338. return SparseIdsSlice(
  339. _ids,
  340. _fullCount,
  341. _skippedBefore,
  342. _skippedAfter);
  343. }
  344. rpl::producer<SparseIdsMergedSlice> SparseIdsMergedSlice::CreateViewer(
  345. SparseIdsMergedSlice::Key key,
  346. int limitBefore,
  347. int limitAfter,
  348. Fn<SimpleViewerFunction> simpleViewer) {
  349. Expects(!key.topicRootId || !key.migratedPeerId);
  350. Expects(IsServerMsgId(key.universalId)
  351. || (key.universalId == 0)
  352. || (IsServerMsgId(ServerMaxMsgId + key.universalId) && key.migratedPeerId != 0));
  353. Expects((key.universalId != 0)
  354. || (limitBefore == 0 && limitAfter == 0));
  355. return [=](auto consumer) {
  356. auto partViewer = simpleViewer(
  357. key.peerId,
  358. key.topicRootId,
  359. SparseIdsMergedSlice::PartKey(key),
  360. limitBefore,
  361. limitAfter
  362. );
  363. if (!key.migratedPeerId) {
  364. return std::move(
  365. partViewer
  366. ) | rpl::start_with_next([=](SparseIdsSlice &&part) {
  367. consumer.put_next(SparseIdsMergedSlice(
  368. key,
  369. std::move(part),
  370. std::nullopt));
  371. });
  372. }
  373. auto migratedViewer = simpleViewer(
  374. key.migratedPeerId,
  375. MsgId(0), // topicRootId
  376. SparseIdsMergedSlice::MigratedKey(key),
  377. limitBefore,
  378. limitAfter);
  379. return rpl::combine(
  380. std::move(partViewer),
  381. std::move(migratedViewer)
  382. ) | rpl::start_with_next([=](
  383. SparseIdsSlice &&part,
  384. SparseIdsSlice &&migrated) {
  385. consumer.put_next(SparseIdsMergedSlice(
  386. key,
  387. std::move(part),
  388. std::move(migrated)));
  389. });
  390. };
  391. }