data_history_messages.cpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "data/data_history_messages.h"
  8. #include "apiwrap.h"
  9. #include "data/data_chat.h"
  10. #include "data/data_peer.h"
  11. #include "data/data_session.h"
  12. #include "data/data_sparse_ids.h"
  13. #include "history/history.h"
  14. #include "main/main_session.h"
  15. namespace Data {
  16. void HistoryMessages::addNew(MsgId messageId) {
  17. _chat.addNew(messageId);
  18. }
  19. void HistoryMessages::addExisting(MsgId messageId, MsgRange noSkipRange) {
  20. _chat.addExisting(messageId, noSkipRange);
  21. }
  22. void HistoryMessages::addSlice(
  23. std::vector<MsgId> &&messageIds,
  24. MsgRange noSkipRange,
  25. std::optional<int> count) {
  26. _chat.addSlice(std::move(messageIds), noSkipRange, count);
  27. }
  28. void HistoryMessages::removeOne(MsgId messageId) {
  29. _chat.removeOne(messageId);
  30. _oneRemoved.fire_copy(messageId);
  31. }
  32. void HistoryMessages::removeAll() {
  33. _chat.removeAll();
  34. _allRemoved.fire({});
  35. }
  36. void HistoryMessages::invalidateBottom() {
  37. _chat.invalidateBottom();
  38. _bottomInvalidated.fire({});
  39. }
  40. Storage::SparseIdsListResult HistoryMessages::snapshot(
  41. const Storage::SparseIdsListQuery &query) const {
  42. return _chat.snapshot(query);
  43. }
  44. auto HistoryMessages::sliceUpdated() const
  45. -> rpl::producer<Storage::SparseIdsSliceUpdate> {
  46. return _chat.sliceUpdated();
  47. }
  48. rpl::producer<MsgId> HistoryMessages::oneRemoved() const {
  49. return _oneRemoved.events();
  50. }
  51. rpl::producer<> HistoryMessages::allRemoved() const {
  52. return _allRemoved.events();
  53. }
  54. rpl::producer<> HistoryMessages::bottomInvalidated() const {
  55. return _bottomInvalidated.events();
  56. }
  57. rpl::producer<SparseIdsSlice> HistoryViewer(
  58. not_null<History*> history,
  59. MsgId aroundId,
  60. int limitBefore,
  61. int limitAfter) {
  62. Expects(IsServerMsgId(aroundId) || (aroundId == 0));
  63. Expects((aroundId != 0) || (limitBefore == 0 && limitAfter == 0));
  64. return [=](auto consumer) {
  65. auto lifetime = rpl::lifetime();
  66. const auto messages = &history->messages();
  67. auto builder = lifetime.make_state<SparseIdsSliceBuilder>(
  68. aroundId,
  69. limitBefore,
  70. limitAfter);
  71. using RequestAroundInfo = SparseIdsSliceBuilder::AroundData;
  72. builder->insufficientAround(
  73. ) | rpl::start_with_next([=](const RequestAroundInfo &info) {
  74. if (!info.aroundId) {
  75. // Ignore messages-count-only requests, because we perform
  76. // them with non-zero limit of messages and end up adding
  77. // a broken slice with several last messages from the chat
  78. // with a non-skip range starting at zero.
  79. return;
  80. }
  81. history->session().api().requestHistory(
  82. history,
  83. info.aroundId,
  84. info.direction);
  85. }, lifetime);
  86. auto pushNextSnapshot = [=] {
  87. consumer.put_next(builder->snapshot());
  88. };
  89. using SliceUpdate = Storage::SparseIdsSliceUpdate;
  90. messages->sliceUpdated(
  91. ) | rpl::filter([=](const SliceUpdate &update) {
  92. return builder->applyUpdate(update);
  93. }) | rpl::start_with_next(pushNextSnapshot, lifetime);
  94. messages->oneRemoved(
  95. ) | rpl::filter([=](MsgId messageId) {
  96. return builder->removeOne(messageId);
  97. }) | rpl::start_with_next(pushNextSnapshot, lifetime);
  98. messages->allRemoved(
  99. ) | rpl::filter([=] {
  100. return builder->removeAll();
  101. }) | rpl::start_with_next(pushNextSnapshot, lifetime);
  102. messages->bottomInvalidated(
  103. ) | rpl::filter([=] {
  104. return builder->invalidateBottom();
  105. }) | rpl::start_with_next(pushNextSnapshot, lifetime);
  106. const auto snapshot = messages->snapshot({
  107. aroundId,
  108. limitBefore,
  109. limitAfter,
  110. });
  111. if (snapshot.count || !snapshot.messageIds.empty()) {
  112. if (builder->applyInitial(snapshot)) {
  113. pushNextSnapshot();
  114. }
  115. }
  116. builder->checkInsufficient();
  117. return lifetime;
  118. };
  119. }
  120. rpl::producer<SparseIdsMergedSlice> HistoryMergedViewer(
  121. not_null<History*> history,
  122. /*Universal*/MsgId universalAroundId,
  123. int limitBefore,
  124. int limitAfter) {
  125. const auto migrateFrom = history->peer->migrateFrom();
  126. auto createSimpleViewer = [=](
  127. PeerId peerId,
  128. MsgId topicRootId,
  129. SparseIdsSlice::Key simpleKey,
  130. int limitBefore,
  131. int limitAfter) {
  132. const auto chosen = (history->peer->id == peerId)
  133. ? history
  134. : history->owner().history(peerId);
  135. return HistoryViewer(chosen, simpleKey, limitBefore, limitAfter);
  136. };
  137. const auto peerId = history->peer->id;
  138. const auto topicRootId = MsgId();
  139. const auto migratedPeerId = migrateFrom ? migrateFrom->id : PeerId(0);
  140. using Key = SparseIdsMergedSlice::Key;
  141. return SparseIdsMergedSlice::CreateViewer(
  142. Key(peerId, topicRootId, migratedPeerId, universalAroundId),
  143. limitBefore,
  144. limitAfter,
  145. std::move(createSimpleViewer));
  146. }
  147. rpl::producer<MessagesSlice> HistoryMessagesViewer(
  148. not_null<History*> history,
  149. MessagePosition aroundId,
  150. int limitBefore,
  151. int limitAfter) {
  152. const auto computeUnreadAroundId = [&] {
  153. if (const auto migrated = history->migrateFrom()) {
  154. if (const auto around = migrated->loadAroundId()) {
  155. return MsgId(around - ServerMaxMsgId);
  156. }
  157. }
  158. if (const auto around = history->loadAroundId()) {
  159. return around;
  160. }
  161. return MsgId(ServerMaxMsgId - 1);
  162. };
  163. const auto messageId = (aroundId.fullId.msg == ShowAtUnreadMsgId)
  164. ? computeUnreadAroundId()
  165. : ((aroundId.fullId.msg == ShowAtTheEndMsgId)
  166. || (aroundId == MaxMessagePosition))
  167. ? (ServerMaxMsgId - 1)
  168. : (aroundId.fullId.peer == history->peer->id)
  169. ? aroundId.fullId.msg
  170. : (aroundId.fullId.msg - ServerMaxMsgId);
  171. return HistoryMergedViewer(
  172. history,
  173. messageId,
  174. limitBefore,
  175. limitAfter
  176. ) | rpl::map([=](SparseIdsMergedSlice &&slice) {
  177. auto result = Data::MessagesSlice();
  178. result.fullCount = slice.fullCount();
  179. result.skippedAfter = slice.skippedAfter();
  180. result.skippedBefore = slice.skippedBefore();
  181. const auto count = slice.size();
  182. result.ids.reserve(count);
  183. if (const auto msgId = slice.nearest(messageId)) {
  184. result.nearestToAround = *msgId;
  185. }
  186. for (auto i = 0; i != count; ++i) {
  187. result.ids.push_back(slice[i]);
  188. }
  189. return result;
  190. });
  191. }
  192. } // namespace Data