data_pts_waiter.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "data/data_pts_waiter.h"
  8. #include "api/api_updates.h"
  9. PtsWaiter::PtsWaiter(not_null<Api::Updates*> owner) : _owner(owner) {
  10. }
  11. uint64 PtsWaiter::ptsKey(PtsSkippedQueue queue, int32 pts) {
  12. return _queue.emplace(
  13. uint64(uint32(pts)) << 32 | (++_skippedKey),
  14. queue
  15. ).first->first;
  16. }
  17. void PtsWaiter::setWaitingForSkipped(ChannelData *channel, crl::time ms) {
  18. if (ms >= 0) {
  19. _owner->ptsWaiterStartTimerFor(channel, ms);
  20. _waitingForSkipped = true;
  21. } else {
  22. _waitingForSkipped = false;
  23. checkForWaiting(channel);
  24. }
  25. }
  26. void PtsWaiter::setWaitingForShortPoll(ChannelData *channel, crl::time ms) {
  27. if (ms >= 0) {
  28. _owner->ptsWaiterStartTimerFor(channel, ms);
  29. _waitingForShortPoll = true;
  30. } else {
  31. _waitingForShortPoll = false;
  32. checkForWaiting(channel);
  33. }
  34. }
  35. void PtsWaiter::checkForWaiting(ChannelData *channel) {
  36. if (!_waitingForSkipped && !_waitingForShortPoll) {
  37. _owner->ptsWaiterStartTimerFor(channel, -1);
  38. }
  39. }
  40. void PtsWaiter::applySkippedUpdates(ChannelData *channel) {
  41. if (!_waitingForSkipped) {
  42. return;
  43. }
  44. setWaitingForSkipped(channel, -1);
  45. if (_queue.empty()) {
  46. return;
  47. }
  48. ++_applySkippedLevel;
  49. for (auto i = _queue.cbegin(), e = _queue.cend(); i != e; ++i) {
  50. switch (i->second) {
  51. case SkippedUpdate: {
  52. _owner->applyUpdateNoPtsCheck(_updateQueue[i->first]);
  53. } break;
  54. case SkippedUpdates: {
  55. _owner->applyUpdatesNoPtsCheck(_updatesQueue[i->first]);
  56. } break;
  57. }
  58. }
  59. --_applySkippedLevel;
  60. clearSkippedUpdates();
  61. }
  62. void PtsWaiter::clearSkippedUpdates() {
  63. _queue.clear();
  64. _updateQueue.clear();
  65. _updatesQueue.clear();
  66. _applySkippedLevel = 0;
  67. }
  68. bool PtsWaiter::updated(
  69. ChannelData *channel,
  70. int32 pts,
  71. int32 count,
  72. const MTPUpdates &updates) {
  73. if (_requesting || _applySkippedLevel) {
  74. return true;
  75. } else if (pts <= _good && count > 0) {
  76. return false;
  77. } else if (check(channel, pts, count)) {
  78. return true;
  79. }
  80. _updatesQueue.emplace(ptsKey(SkippedUpdates, pts), updates);
  81. return false;
  82. }
  83. bool PtsWaiter::updated(
  84. ChannelData *channel,
  85. int32 pts,
  86. int32 count,
  87. const MTPUpdate &update) {
  88. if (_requesting || _applySkippedLevel) {
  89. return true;
  90. } else if (pts <= _good && count > 0) {
  91. return false;
  92. } else if (check(channel, pts, count)) {
  93. return true;
  94. }
  95. _updateQueue.emplace(ptsKey(SkippedUpdate, pts), update);
  96. return false;
  97. }
  98. bool PtsWaiter::updated(ChannelData *channel, int32 pts, int32 count) {
  99. if (_requesting || _applySkippedLevel) {
  100. return true;
  101. } else if (pts <= _good && count > 0) {
  102. return false;
  103. }
  104. return check(channel, pts, count);
  105. }
  106. bool PtsWaiter::updateAndApply(
  107. ChannelData *channel,
  108. int32 pts,
  109. int32 count,
  110. const MTPUpdates &updates) {
  111. if (!updated(channel, pts, count, updates)) {
  112. return false;
  113. }
  114. if (!_waitingForSkipped || _queue.empty()) {
  115. // Optimization - no need to put in queue and back.
  116. _owner->applyUpdatesNoPtsCheck(updates);
  117. } else {
  118. _updatesQueue.emplace(ptsKey(SkippedUpdates, pts), updates);
  119. applySkippedUpdates(channel);
  120. }
  121. return true;
  122. }
  123. bool PtsWaiter::updateAndApply(
  124. ChannelData *channel,
  125. int32 pts,
  126. int32 count,
  127. const MTPUpdate &update) {
  128. if (!updated(channel, pts, count, update)) {
  129. return false;
  130. }
  131. if (!_waitingForSkipped || _queue.empty()) {
  132. // Optimization - no need to put in queue and back.
  133. _owner->applyUpdateNoPtsCheck(update);
  134. } else {
  135. _updateQueue.emplace(ptsKey(SkippedUpdate, pts), update);
  136. applySkippedUpdates(channel);
  137. }
  138. return true;
  139. }
  140. bool PtsWaiter::updateAndApply(
  141. ChannelData *channel,
  142. int32 pts,
  143. int32 count) {
  144. if (!updated(channel, pts, count)) {
  145. return false;
  146. }
  147. applySkippedUpdates(channel);
  148. return true;
  149. }
  150. // Return false if need to save that update and apply later.
  151. bool PtsWaiter::check(ChannelData *channel, int32 pts, int32 count) {
  152. if (!inited()) {
  153. init(pts);
  154. return true;
  155. }
  156. _last = qMax(_last, pts);
  157. _count += count;
  158. if (_last == _count) {
  159. _good = _last;
  160. return true;
  161. } else if (_last < _count) {
  162. setWaitingForSkipped(channel, 1);
  163. } else {
  164. setWaitingForSkipped(channel, kWaitForSkippedTimeout);
  165. }
  166. return !count;
  167. }