scheduled_messages.cpp 18 KB


  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "data/components/scheduled_messages.h"
  8. #include "base/unixtime.h"
  9. #include "data/data_forum_topic.h"
  10. #include "data/data_peer.h"
  11. #include "data/data_session.h"
  12. #include "api/api_hash.h"
  13. #include "api/api_text_entities.h"
  14. #include "main/main_session.h"
  15. #include "history/history.h"
  16. #include "history/history_item_components.h"
  17. #include "history/history_item_helpers.h"
  18. #include "apiwrap.h"
  19. namespace Data {
  20. namespace {
  21. constexpr auto kRequestTimeLimit = 60 * crl::time(1000);
  22. [[nodiscard]] MsgId RemoteToLocalMsgId(MsgId id) {
  23. Expects(IsServerMsgId(id));
  24. return ServerMaxMsgId + id + 1;
  25. }
  26. [[nodiscard]] MsgId LocalToRemoteMsgId(MsgId id) {
  27. Expects(IsScheduledMsgId(id));
  28. return (id - ServerMaxMsgId - 1);
  29. }
  30. [[nodiscard]] bool TooEarlyForRequest(crl::time received) {
  31. return (received > 0) && (received + kRequestTimeLimit > crl::now());
  32. }
  33. [[nodiscard]] bool HasScheduledDate(not_null<HistoryItem*> item) {
  34. return (item->date() != Api::kScheduledUntilOnlineTimestamp)
  35. && (item->date() > base::unixtime::now());
  36. }
  37. [[nodiscard]] MTPMessage PrepareMessage(const MTPMessage &message) {
  38. return message.match([&](const MTPDmessageEmpty &data) {
  39. return MTP_messageEmpty(
  40. data.vflags(),
  41. data.vid(),
  42. data.vpeer_id() ? *data.vpeer_id() : MTPPeer());
  43. }, [&](const MTPDmessageService &data) {
  44. return MTP_messageService(
  45. MTP_flags(data.vflags().v
  46. | MTPDmessageService::Flag(
  47. MTPDmessage::Flag::f_from_scheduled)),
  48. data.vid(),
  49. data.vfrom_id() ? *data.vfrom_id() : MTPPeer(),
  50. data.vpeer_id(),
  51. data.vreply_to() ? *data.vreply_to() : MTPMessageReplyHeader(),
  52. data.vdate(),
  53. data.vaction(),
  54. data.vreactions() ? *data.vreactions() : MTPMessageReactions(),
  55. MTP_int(data.vttl_period().value_or_empty()));
  56. }, [&](const MTPDmessage &data) {
  57. return MTP_message(
  58. MTP_flags(data.vflags().v | MTPDmessage::Flag::f_from_scheduled),
  59. data.vid(),
  60. data.vfrom_id() ? *data.vfrom_id() : MTPPeer(),
  61. MTPint(), // from_boosts_applied
  62. data.vpeer_id(),
  63. data.vsaved_peer_id() ? *data.vsaved_peer_id() : MTPPeer(),
  64. data.vfwd_from() ? *data.vfwd_from() : MTPMessageFwdHeader(),
  65. MTP_long(data.vvia_bot_id().value_or_empty()),
  66. MTP_long(data.vvia_business_bot_id().value_or_empty()),
  67. data.vreply_to() ? *data.vreply_to() : MTPMessageReplyHeader(),
  68. data.vdate(),
  69. data.vmessage(),
  70. data.vmedia() ? *data.vmedia() : MTPMessageMedia(),
  71. data.vreply_markup() ? *data.vreply_markup() : MTPReplyMarkup(),
  72. (data.ventities()
  73. ? *data.ventities()
  74. : MTPVector<MTPMessageEntity>()),
  75. MTP_int(data.vviews().value_or_empty()),
  76. MTP_int(data.vforwards().value_or_empty()),
  77. data.vreplies() ? *data.vreplies() : MTPMessageReplies(),
  78. MTP_int(data.vedit_date().value_or_empty()),
  79. MTP_bytes(data.vpost_author().value_or_empty()),
  80. MTP_long(data.vgrouped_id().value_or_empty()),
  81. MTPMessageReactions(),
  82. MTPVector<MTPRestrictionReason>(),
  83. MTP_int(data.vttl_period().value_or_empty()),
  84. MTPint(), // quick_reply_shortcut_id
  85. MTP_long(data.veffect().value_or_empty()), // effect
  86. data.vfactcheck() ? *data.vfactcheck() : MTPFactCheck(),
  87. MTP_int(data.vreport_delivery_until_date().value_or_empty()),
  88. MTP_long(data.vpaid_message_stars().value_or_empty()));
  89. });
  90. }
  91. } // namespace
  92. bool IsScheduledMsgId(MsgId id) {
  93. return (id > ServerMaxMsgId) && (id < ScheduledMaxMsgId);
  94. }
  95. ScheduledMessages::ScheduledMessages(not_null<Main::Session*> session)
  96. : _session(session)
  97. , _clearTimer([=] { clearOldRequests(); }) {
  98. _session->data().itemRemoved(
  99. ) | rpl::filter([](not_null<const HistoryItem*> item) {
  100. return item->isScheduled();
  101. }) | rpl::start_with_next([=](not_null<const HistoryItem*> item) {
  102. remove(item);
  103. }, _lifetime);
  104. }
  105. ScheduledMessages::~ScheduledMessages() {
  106. Expects(_data.empty());
  107. Expects(_requests.empty());
  108. }
  109. void ScheduledMessages::clear() {
  110. _lifetime.destroy();
  111. for (const auto &request : base::take(_requests)) {
  112. _session->api().request(request.second.requestId).cancel();
  113. }
  114. base::take(_data);
  115. }
  116. void ScheduledMessages::clearOldRequests() {
  117. const auto now = crl::now();
  118. while (true) {
  119. const auto i = ranges::find_if(_requests, [&](const auto &value) {
  120. const auto &request = value.second;
  121. return !request.requestId
  122. && (request.lastReceived + kRequestTimeLimit <= now);
  123. });
  124. if (i == end(_requests)) {
  125. break;
  126. }
  127. _requests.erase(i);
  128. }
  129. }
  130. MsgId ScheduledMessages::localMessageId(MsgId remoteId) const {
  131. return RemoteToLocalMsgId(remoteId);
  132. }
  133. MsgId ScheduledMessages::lookupId(not_null<const HistoryItem*> item) const {
  134. Expects(item->isScheduled());
  135. Expects(!item->isSending());
  136. Expects(!item->hasFailed());
  137. return LocalToRemoteMsgId(item->id);
  138. }
  139. HistoryItem *ScheduledMessages::lookupItem(PeerId peer, MsgId msg) const {
  140. const auto history = _session->data().historyLoaded(peer);
  141. if (!history) {
  142. return nullptr;
  143. }
  144. const auto i = _data.find(history);
  145. if (i == end(_data)) {
  146. return nullptr;
  147. }
  148. const auto &items = i->second.items;
  149. const auto j = ranges::find_if(items, [&](auto &item) {
  150. return item->id == msg;
  151. });
  152. if (j == end(items)) {
  153. return nullptr;
  154. }
  155. return (*j).get();
  156. }
  157. HistoryItem *ScheduledMessages::lookupItem(FullMsgId itemId) const {
  158. return lookupItem(itemId.peer, itemId.msg);
  159. }
  160. int ScheduledMessages::count(not_null<History*> history) const {
  161. const auto i = _data.find(history);
  162. return (i != end(_data)) ? i->second.items.size() : 0;
  163. }
  164. bool ScheduledMessages::hasFor(not_null<Data::ForumTopic*> topic) const {
  165. const auto i = _data.find(topic->owningHistory());
  166. if (i == end(_data)) {
  167. return false;
  168. }
  169. return ranges::any_of(i->second.items, [&](const OwnedItem &item) {
  170. return item->topic() == topic;
  171. });
  172. }
  173. void ScheduledMessages::sendNowSimpleMessage(
  174. const MTPDupdateShortSentMessage &update,
  175. not_null<HistoryItem*> local) {
  176. Expects(local->isSending());
  177. Expects(local->isScheduled());
  178. if (HasScheduledDate(local)) {
  179. LOG(("Error: trying to put to history a new local message, "
  180. "that has scheduled date."));
  181. return;
  182. }
  183. // When the user sends a text message scheduled until online
  184. // while the recipient is already online, the server sends
  185. // updateShortSentMessage to the client and the client calls this method.
  186. // Since such messages can only be sent to recipients,
  187. // we know for sure that a message can't have fields such as the author,
  188. // views count, etc.
  189. const auto history = local->history();
  190. auto action = Api::SendAction(history);
  191. action.replyTo = local->replyTo();
  192. const auto replyHeader = NewMessageReplyHeader(action);
  193. const auto localFlags = NewMessageFlags(history->peer)
  194. & ~MessageFlag::BeingSent;
  195. const auto flags = MTPDmessage::Flag::f_entities
  196. | MTPDmessage::Flag::f_from_id
  197. | (action.replyTo
  198. ? MTPDmessage::Flag::f_reply_to
  199. : MTPDmessage::Flag(0))
  200. | (update.vttl_period()
  201. ? MTPDmessage::Flag::f_ttl_period
  202. : MTPDmessage::Flag(0))
  203. | ((localFlags & MessageFlag::Outgoing)
  204. ? MTPDmessage::Flag::f_out
  205. : MTPDmessage::Flag(0))
  206. | (local->effectId()
  207. ? MTPDmessage::Flag::f_effect
  208. : MTPDmessage::Flag(0));
  209. const auto views = 1;
  210. const auto forwards = 0;
  211. history->addNewMessage(
  212. update.vid().v,
  213. MTP_message(
  214. MTP_flags(flags),
  215. update.vid(),
  216. peerToMTP(local->from()->id),
  217. MTPint(), // from_boosts_applied
  218. peerToMTP(history->peer->id),
  219. MTPPeer(), // saved_peer_id
  220. MTPMessageFwdHeader(),
  221. MTPlong(), // via_bot_id
  222. MTPlong(), // via_business_bot_id
  223. replyHeader,
  224. update.vdate(),
  225. MTP_string(local->originalText().text),
  226. MTP_messageMediaEmpty(),
  227. MTPReplyMarkup(),
  228. Api::EntitiesToMTP(
  229. &history->session(),
  230. local->originalText().entities),
  231. MTP_int(views),
  232. MTP_int(forwards),
  233. MTPMessageReplies(),
  234. MTPint(), // edit_date
  235. MTP_string(),
  236. MTPlong(),
  237. MTPMessageReactions(),
  238. MTPVector<MTPRestrictionReason>(),
  239. MTP_int(update.vttl_period().value_or_empty()),
  240. MTPint(), // quick_reply_shortcut_id
  241. MTP_long(local->effectId()), // effect
  242. MTPFactCheck(),
  243. MTPint(), // report_delivery_until_date
  244. MTPlong()), // paid_message_stars
  245. localFlags,
  246. NewMessageType::Unread);
  247. local->destroy();
  248. }
  249. void ScheduledMessages::apply(const MTPDupdateNewScheduledMessage &update) {
  250. const auto &message = update.vmessage();
  251. const auto peer = PeerFromMessage(message);
  252. if (!peer) {
  253. return;
  254. }
  255. const auto history = _session->data().historyLoaded(peer);
  256. if (!history) {
  257. return;
  258. }
  259. auto &list = _data[history];
  260. append(history, list, message);
  261. sort(list);
  262. _updates.fire_copy(history);
  263. }
  264. void ScheduledMessages::checkEntitiesAndUpdate(const MTPDmessage &data) {
  265. // When the user sends a message with a media scheduled until online
  266. // while the recipient is already online, or scheduled message
  267. // is already due and is sent immediately, the server sends
  268. // updateNewMessage or updateNewChannelMessage to the client
  269. // and the client calls this method.
  270. const auto peer = peerFromMTP(data.vpeer_id());
  271. const auto history = _session->data().historyLoaded(peer);
  272. if (!history) {
  273. return;
  274. }
  275. const auto i = _data.find(history);
  276. if (i == end(_data)) {
  277. return;
  278. }
  279. const auto &itemMap = i->second.itemById;
  280. const auto j = itemMap.find(data.vid().v);
  281. if (j == end(itemMap)) {
  282. return;
  283. }
  284. const auto existing = j->second;
  285. if (!HasScheduledDate(existing)) {
  286. // Destroy a local message, that should be in history.
  287. existing->updateSentContent({
  288. qs(data.vmessage()),
  289. Api::EntitiesFromMTP(_session, data.ventities().value_or_empty())
  290. }, data.vmedia());
  291. existing->updateReplyMarkup(
  292. HistoryMessageMarkupData(data.vreply_markup()));
  293. existing->updateForwardedInfo(data.vfwd_from());
  294. _session->data().requestItemTextRefresh(existing);
  295. existing->destroy();
  296. }
  297. }
  298. void ScheduledMessages::apply(
  299. const MTPDupdateDeleteScheduledMessages &update) {
  300. const auto peer = peerFromMTP(update.vpeer());
  301. if (!peer) {
  302. return;
  303. }
  304. const auto history = _session->data().historyLoaded(peer);
  305. if (!history) {
  306. return;
  307. }
  308. auto i = _data.find(history);
  309. if (i == end(_data)) {
  310. return;
  311. }
  312. const auto sent = update.vsent_messages();
  313. const auto &ids = update.vmessages().v;
  314. for (auto k = 0, count = int(ids.size()); k != count; ++k) {
  315. const auto id = ids[k].v;
  316. const auto &list = i->second;
  317. const auto j = list.itemById.find(id);
  318. if (j != end(list.itemById)) {
  319. if (sent && k < sent->v.size()) {
  320. const auto &sentId = sent->v[k];
  321. _session->data().sentFromScheduled({
  322. .item = j->second,
  323. .sentId = sentId.v,
  324. });
  325. }
  326. j->second->destroy();
  327. i = _data.find(history);
  328. if (i == end(_data)) {
  329. break;
  330. }
  331. }
  332. }
  333. _updates.fire_copy(history);
  334. }
  335. void ScheduledMessages::apply(
  336. const MTPDupdateMessageID &update,
  337. not_null<HistoryItem*> local) {
  338. const auto id = update.vid().v;
  339. const auto i = _data.find(local->history());
  340. Assert(i != end(_data));
  341. auto &list = i->second;
  342. const auto j = list.itemById.find(id);
  343. if (j != end(list.itemById) || !IsServerMsgId(id)) {
  344. local->destroy();
  345. } else {
  346. Assert(!list.itemById.contains(local->id));
  347. local->setRealId(localMessageId(id));
  348. list.itemById.emplace(id, local);
  349. }
  350. }
  351. void ScheduledMessages::appendSending(not_null<HistoryItem*> item) {
  352. Expects(item->isSending());
  353. Expects(item->isScheduled());
  354. const auto history = item->history();
  355. auto &list = _data[history];
  356. list.items.emplace_back(item);
  357. sort(list);
  358. _updates.fire_copy(history);
  359. }
  360. void ScheduledMessages::removeSending(not_null<HistoryItem*> item) {
  361. Expects(item->isSending() || item->hasFailed());
  362. Expects(item->isScheduled());
  363. item->destroy();
  364. }
  365. rpl::producer<> ScheduledMessages::updates(not_null<History*> history) {
  366. request(history);
  367. return _updates.events(
  368. ) | rpl::filter([=](not_null<History*> value) {
  369. return (value == history);
  370. }) | rpl::to_empty;
  371. }
  372. Data::MessagesSlice ScheduledMessages::list(
  373. not_null<History*> history) const {
  374. auto result = Data::MessagesSlice();
  375. const auto i = _data.find(history);
  376. if (i == end(_data)) {
  377. const auto i = _requests.find(history);
  378. if (i == end(_requests)) {
  379. return result;
  380. }
  381. result.fullCount = result.skippedAfter = result.skippedBefore = 0;
  382. return result;
  383. }
  384. const auto &list = i->second.items;
  385. result.skippedAfter = result.skippedBefore = 0;
  386. result.fullCount = int(list.size());
  387. result.ids = ranges::views::all(
  388. list
  389. ) | ranges::views::transform(
  390. &HistoryItem::fullId
  391. ) | ranges::to_vector;
  392. return result;
  393. }
  394. Data::MessagesSlice ScheduledMessages::list(
  395. not_null<const Data::ForumTopic*> topic) const {
  396. auto result = Data::MessagesSlice();
  397. const auto i = _data.find(topic->Data::Thread::owningHistory());
  398. if (i == end(_data)) {
  399. const auto i = _requests.find(topic->Data::Thread::owningHistory());
  400. if (i == end(_requests)) {
  401. return result;
  402. }
  403. result.fullCount = result.skippedAfter = result.skippedBefore = 0;
  404. return result;
  405. }
  406. const auto &list = i->second.items;
  407. result.skippedAfter = result.skippedBefore = 0;
  408. result.fullCount = int(list.size());
  409. result.ids = ranges::views::all(
  410. list
  411. ) | ranges::views::filter([&](const OwnedItem &item) {
  412. return item->topic() == topic;
  413. }) | ranges::views::transform(
  414. &HistoryItem::fullId
  415. ) | ranges::to_vector;
  416. return result;
  417. }
  418. void ScheduledMessages::request(not_null<History*> history) {
  419. const auto peer = history->peer;
  420. if (peer->isBroadcast() && !Data::CanSendAnything(peer)) {
  421. return;
  422. }
  423. auto &request = _requests[history];
  424. if (request.requestId || TooEarlyForRequest(request.lastReceived)) {
  425. return;
  426. }
  427. const auto i = _data.find(history);
  428. const auto hash = (i != end(_data))
  429. ? countListHash(i->second)
  430. : uint64(0);
  431. request.requestId = _session->api().request(
  432. MTPmessages_GetScheduledHistory(peer->input, MTP_long(hash))
  433. ).done([=](const MTPmessages_Messages &result) {
  434. parse(history, result);
  435. }).fail([=] {
  436. _requests.remove(history);
  437. }).send();
  438. }
  439. void ScheduledMessages::parse(
  440. not_null<History*> history,
  441. const MTPmessages_Messages &list) {
  442. auto &request = _requests[history];
  443. request.lastReceived = crl::now();
  444. request.requestId = 0;
  445. if (!_clearTimer.isActive()) {
  446. _clearTimer.callOnce(kRequestTimeLimit * 2);
  447. }
  448. list.match([&](const MTPDmessages_messagesNotModified &data) {
  449. }, [&](const auto &data) {
  450. _session->data().processUsers(data.vusers());
  451. _session->data().processChats(data.vchats());
  452. const auto &messages = data.vmessages().v;
  453. if (messages.isEmpty()) {
  454. clearNotSending(history);
  455. return;
  456. }
  457. auto received = base::flat_set<not_null<HistoryItem*>>();
  458. auto clear = base::flat_set<not_null<HistoryItem*>>();
  459. auto &list = _data.emplace(history, List()).first->second;
  460. for (const auto &message : messages) {
  461. if (const auto item = append(history, list, message)) {
  462. received.emplace(item);
  463. }
  464. }
  465. for (const auto &owned : list.items) {
  466. const auto item = owned.get();
  467. if (!item->isSending() && !received.contains(item)) {
  468. clear.emplace(item);
  469. }
  470. }
  471. updated(history, received, clear);
  472. });
  473. }
  474. HistoryItem *ScheduledMessages::append(
  475. not_null<History*> history,
  476. List &list,
  477. const MTPMessage &message) {
  478. const auto id = message.match([&](const auto &data) {
  479. return data.vid().v;
  480. });
  481. const auto i = list.itemById.find(id);
  482. if (i != end(list.itemById)) {
  483. const auto existing = i->second;
  484. message.match([&](const MTPDmessage &data) {
  485. // Scheduled messages never have an edit date,
  486. // so if we receive a flag about it,
  487. // probably this message was edited.
  488. if (data.is_edit_hide()) {
  489. existing->applyEdition(HistoryMessageEdition(_session, data));
  490. } else {
  491. existing->updateSentContent({
  492. qs(data.vmessage()),
  493. Api::EntitiesFromMTP(
  494. _session,
  495. data.ventities().value_or_empty())
  496. }, data.vmedia());
  497. existing->updateReplyMarkup(
  498. HistoryMessageMarkupData(data.vreply_markup()));
  499. existing->updateForwardedInfo(data.vfwd_from());
  500. }
  501. existing->updateDate(data.vdate().v);
  502. history->owner().requestItemTextRefresh(existing);
  503. }, [&](const auto &data) {});
  504. return existing;
  505. }
  506. if (!IsServerMsgId(id)) {
  507. LOG(("API Error: Bad id in scheduled messages: %1.").arg(id));
  508. return nullptr;
  509. }
  510. const auto item = _session->data().addNewMessage(
  511. localMessageId(id),
  512. PrepareMessage(message),
  513. MessageFlags(), // localFlags
  514. NewMessageType::Existing);
  515. if (!item || item->history() != history) {
  516. LOG(("API Error: Bad data received in scheduled messages."));
  517. return nullptr;
  518. }
  519. list.items.emplace_back(item);
  520. list.itemById.emplace(id, item);
  521. return item;
  522. }
  523. void ScheduledMessages::clearNotSending(not_null<History*> history) {
  524. const auto i = _data.find(history);
  525. if (i == end(_data)) {
  526. return;
  527. }
  528. auto clear = base::flat_set<not_null<HistoryItem*>>();
  529. for (const auto &owned : i->second.items) {
  530. if (!owned->isSending() && !owned->hasFailed()) {
  531. clear.emplace(owned.get());
  532. }
  533. }
  534. updated(history, {}, clear);
  535. }
  536. void ScheduledMessages::updated(
  537. not_null<History*> history,
  538. const base::flat_set<not_null<HistoryItem*>> &added,
  539. const base::flat_set<not_null<HistoryItem*>> &clear) {
  540. if (!clear.empty()) {
  541. for (const auto &item : clear) {
  542. item->destroy();
  543. }
  544. }
  545. const auto i = _data.find(history);
  546. if (i != end(_data)) {
  547. sort(i->second);
  548. }
  549. if (!added.empty() || !clear.empty()) {
  550. _updates.fire_copy(history);
  551. }
  552. }
  553. void ScheduledMessages::sort(List &list) {
  554. ranges::sort(list.items, ranges::less(), &HistoryItem::position);
  555. }
  556. void ScheduledMessages::remove(not_null<const HistoryItem*> item) {
  557. const auto history = item->history();
  558. const auto i = _data.find(history);
  559. Assert(i != end(_data));
  560. auto &list = i->second;
  561. if (!item->isSending() && !item->hasFailed()) {
  562. list.itemById.remove(lookupId(item));
  563. }
  564. const auto k = ranges::find(list.items, item, &OwnedItem::get);
  565. Assert(k != list.items.end());
  566. k->release();
  567. list.items.erase(k);
  568. if (list.items.empty()) {
  569. _data.erase(i);
  570. }
  571. _updates.fire_copy(history);
  572. }
  573. uint64 ScheduledMessages::countListHash(const List &list) const {
  574. using namespace Api;
  575. auto hash = HashInit();
  576. auto &&serverside = ranges::views::all(
  577. list.items
  578. ) | ranges::views::filter([](const OwnedItem &item) {
  579. return !item->isSending() && !item->hasFailed();
  580. }) | ranges::views::reverse;
  581. for (const auto &item : serverside) {
  582. HashUpdate(hash, lookupId(item.get()).bare);
  583. if (const auto edited = item->Get<HistoryMessageEdited>()) {
  584. HashUpdate(hash, edited->date);
  585. } else {
  586. HashUpdate(hash, TimeId(0));
  587. }
  588. HashUpdate(hash, item->date());
  589. }
  590. return HashFinalize(hash);
  591. }
  592. } // namespace Data