data_group_call.cpp 28 KB


  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "data/data_group_call.h"
  8. #include "base/unixtime.h"
  9. #include "data/data_channel.h"
  10. #include "data/data_chat.h"
  11. #include "data/data_changes.h"
  12. #include "data/data_session.h"
  13. #include "main/main_session.h"
  14. #include "calls/calls_instance.h"
  15. #include "calls/group/calls_group_call.h"
  16. #include "calls/group/calls_group_common.h"
  17. #include "core/application.h"
  18. #include "apiwrap.h"
  19. namespace Data {
  20. namespace {
  21. constexpr auto kRequestPerPage = 50;
  22. constexpr auto kSpeakingAfterActive = crl::time(6000);
  23. constexpr auto kActiveAfterJoined = crl::time(1000);
  24. constexpr auto kWaitForUpdatesTimeout = 3 * crl::time(1000);
  25. constexpr auto kReloadStaleTimeout = 16 * crl::time(1000);
  26. [[nodiscard]] QString ExtractNextOffset(const MTPphone_GroupCall &call) {
  27. return call.match([&](const MTPDphone_groupCall &data) {
  28. return qs(data.vparticipants_next_offset());
  29. });
  30. }
  31. } // namespace
  32. const std::string &RtmpEndpointId() {
  33. static const auto result = std::string("unified");
  34. return result;
  35. }
  36. const std::string &GroupCallParticipant::cameraEndpoint() const {
  37. return GetCameraEndpoint(videoParams);
  38. }
  39. const std::string &GroupCallParticipant::screenEndpoint() const {
  40. return GetScreenEndpoint(videoParams);
  41. }
  42. bool GroupCallParticipant::cameraPaused() const {
  43. return IsCameraPaused(videoParams);
  44. }
  45. bool GroupCallParticipant::screenPaused() const {
  46. return IsScreenPaused(videoParams);
  47. }
  48. GroupCall::GroupCall(
  49. not_null<PeerData*> peer,
  50. CallId id,
  51. CallId accessHash,
  52. TimeId scheduleDate,
  53. bool rtmp)
  54. : _id(id)
  55. , _accessHash(accessHash)
  56. , _peer(peer)
  57. , _reloadByQueuedUpdatesTimer([=] { reload(); })
  58. , _speakingByActiveFinishTimer([=] { checkFinishSpeakingByActive(); })
  59. , _scheduleDate(scheduleDate)
  60. , _rtmp(rtmp)
  61. , _listenersHidden(rtmp) {
  62. }
  63. GroupCall::~GroupCall() {
  64. api().request(_unknownParticipantPeersRequestId).cancel();
  65. api().request(_participantsRequestId).cancel();
  66. api().request(_reloadRequestId).cancel();
  67. }
  68. CallId GroupCall::id() const {
  69. return _id;
  70. }
  71. bool GroupCall::loaded() const {
  72. return _version > 0;
  73. }
  74. bool GroupCall::rtmp() const {
  75. return _rtmp;
  76. }
  77. bool GroupCall::listenersHidden() const {
  78. return _listenersHidden;
  79. }
  80. not_null<PeerData*> GroupCall::peer() const {
  81. return _peer;
  82. }
  83. MTPInputGroupCall GroupCall::input() const {
  84. return MTP_inputGroupCall(MTP_long(_id), MTP_long(_accessHash));
  85. }
  86. void GroupCall::setPeer(not_null<PeerData*> peer) {
  87. Expects(peer->migrateFrom() == _peer);
  88. Expects(_peer->migrateTo() == peer);
  89. _peer = peer;
  90. }
  91. auto GroupCall::participants() const
  92. -> const std::vector<Participant> & {
  93. return _participants;
  94. }
  95. void GroupCall::requestParticipants() {
  96. if (!_savedFull) {
  97. if (_participantsRequestId || _reloadRequestId) {
  98. return;
  99. } else if (_allParticipantsLoaded) {
  100. return;
  101. }
  102. }
  103. api().request(base::take(_participantsRequestId)).cancel();
  104. _participantsRequestId = api().request(MTPphone_GetGroupParticipants(
  105. input(),
  106. MTP_vector<MTPInputPeer>(), // ids
  107. MTP_vector<MTPint>(), // ssrcs
  108. MTP_string(_savedFull
  109. ? ExtractNextOffset(*_savedFull)
  110. : _nextOffset),
  111. MTP_int(kRequestPerPage)
  112. )).done([=](const MTPphone_GroupParticipants &result) {
  113. _participantsRequestId = 0;
  114. result.match([&](const MTPDphone_groupParticipants &data) {
  115. const auto reloaded = processSavedFullCall();
  116. _nextOffset = qs(data.vnext_offset());
  117. _peer->owner().processUsers(data.vusers());
  118. _peer->owner().processChats(data.vchats());
  119. applyParticipantsSlice(
  120. data.vparticipants().v,
  121. (reloaded
  122. ? ApplySliceSource::FullReloaded
  123. : ApplySliceSource::SliceLoaded));
  124. setServerParticipantsCount(data.vcount().v);
  125. if (data.vparticipants().v.isEmpty()) {
  126. _allParticipantsLoaded = true;
  127. }
  128. finishParticipantsSliceRequest();
  129. if (reloaded) {
  130. _participantsReloaded.fire({});
  131. }
  132. });
  133. }).fail([=] {
  134. _participantsRequestId = 0;
  135. const auto reloaded = processSavedFullCall();
  136. setServerParticipantsCount(_participants.size());
  137. _allParticipantsLoaded = true;
  138. finishParticipantsSliceRequest();
  139. if (reloaded) {
  140. _participantsReloaded.fire({});
  141. }
  142. }).send();
  143. }
  144. bool GroupCall::processSavedFullCall() {
  145. if (!_savedFull) {
  146. return false;
  147. }
  148. api().request(base::take(_reloadRequestId)).cancel();
  149. _reloadLastFinished = crl::now();
  150. processFullCallFields(*base::take(_savedFull));
  151. return true;
  152. }
  153. void GroupCall::finishParticipantsSliceRequest() {
  154. computeParticipantsCount();
  155. processQueuedUpdates();
  156. }
  157. void GroupCall::setServerParticipantsCount(int count) {
  158. _serverParticipantsCount = count;
  159. changePeerEmptyCallFlag();
  160. }
  161. void GroupCall::changePeerEmptyCallFlag() {
  162. const auto chat = _peer->asChat();
  163. const auto channel = _peer->asChannel();
  164. constexpr auto chatFlag = ChatDataFlag::CallNotEmpty;
  165. constexpr auto channelFlag = ChannelDataFlag::CallNotEmpty;
  166. if (_peer->groupCall() != this) {
  167. return;
  168. } else if (_serverParticipantsCount > 0) {
  169. if (chat && !(chat->flags() & chatFlag)) {
  170. chat->addFlags(chatFlag);
  171. chat->session().changes().peerUpdated(
  172. chat,
  173. Data::PeerUpdate::Flag::GroupCall);
  174. } else if (channel && !(channel->flags() & channelFlag)) {
  175. channel->addFlags(channelFlag);
  176. channel->session().changes().peerUpdated(
  177. channel,
  178. Data::PeerUpdate::Flag::GroupCall);
  179. }
  180. } else if (chat && (chat->flags() & chatFlag)) {
  181. chat->removeFlags(chatFlag);
  182. chat->session().changes().peerUpdated(
  183. chat,
  184. Data::PeerUpdate::Flag::GroupCall);
  185. } else if (channel && (channel->flags() & channelFlag)) {
  186. channel->removeFlags(channelFlag);
  187. channel->session().changes().peerUpdated(
  188. channel,
  189. Data::PeerUpdate::Flag::GroupCall);
  190. }
  191. }
  192. int GroupCall::fullCount() const {
  193. return _fullCount.current();
  194. }
  195. rpl::producer<int> GroupCall::fullCountValue() const {
  196. return _fullCount.value();
  197. }
  198. bool GroupCall::participantsLoaded() const {
  199. return _allParticipantsLoaded;
  200. }
  201. PeerData *GroupCall::participantPeerByAudioSsrc(uint32 ssrc) const {
  202. const auto i = _participantPeerByAudioSsrc.find(ssrc);
  203. return (i != end(_participantPeerByAudioSsrc))
  204. ? i->second.get()
  205. : nullptr;
  206. }
  207. const GroupCallParticipant *GroupCall::participantByPeer(
  208. not_null<PeerData*> peer) const {
  209. return const_cast<GroupCall*>(this)->findParticipant(peer);
  210. }
  211. GroupCallParticipant *GroupCall::findParticipant(
  212. not_null<PeerData*> peer) {
  213. const auto i = ranges::find(_participants, peer, &Participant::peer);
  214. return (i != end(_participants)) ? &*i : nullptr;
  215. }
  216. const GroupCallParticipant *GroupCall::participantByEndpoint(
  217. const std::string &endpoint) const {
  218. if (endpoint.empty()) {
  219. return nullptr;
  220. }
  221. for (const auto &participant : _participants) {
  222. if (GetCameraEndpoint(participant.videoParams) == endpoint
  223. || GetScreenEndpoint(participant.videoParams) == endpoint) {
  224. return &participant;
  225. }
  226. }
  227. return nullptr;
  228. }
  229. rpl::producer<> GroupCall::participantsReloaded() {
  230. return _participantsReloaded.events();
  231. }
  232. auto GroupCall::participantUpdated() const
  233. -> rpl::producer<ParticipantUpdate> {
  234. return _participantUpdates.events();
  235. }
  236. auto GroupCall::participantSpeaking() const
  237. -> rpl::producer<not_null<Participant*>> {
  238. return _participantSpeaking.events();
  239. }
  240. void GroupCall::enqueueUpdate(const MTPUpdate &update) {
  241. update.match([&](const MTPDupdateGroupCall &updateData) {
  242. updateData.vcall().match([&](const MTPDgroupCall &data) {
  243. const auto version = data.vversion().v;
  244. if (!_applyingQueuedUpdates
  245. && (!_version || _version == version)) {
  246. DEBUG_LOG(("Group Call Participants: "
  247. "Apply updateGroupCall %1 -> %2"
  248. ).arg(_version
  249. ).arg(version));
  250. applyEnqueuedUpdate(update);
  251. } else if (!_version || _version <= version) {
  252. DEBUG_LOG(("Group Call Participants: "
  253. "Queue updateGroupCall %1 -> %2"
  254. ).arg(_version
  255. ).arg(version));
  256. const auto type = QueuedType::Call;
  257. _queuedUpdates.emplace(std::pair{ version, type }, update);
  258. }
  259. }, [&](const MTPDgroupCallDiscarded &data) {
  260. discard(data);
  261. });
  262. }, [&](const MTPDupdateGroupCallParticipants &updateData) {
  263. const auto version = updateData.vversion().v;
  264. const auto proj = [](const MTPGroupCallParticipant &data) {
  265. return data.match([&](const MTPDgroupCallParticipant &data) {
  266. return data.is_versioned();
  267. });
  268. };
  269. const auto increment = ranges::contains(
  270. updateData.vparticipants().v,
  271. true,
  272. proj);
  273. const auto required = increment ? (version - 1) : version;
  274. if (!_applyingQueuedUpdates && (_version == required)) {
  275. DEBUG_LOG(("Group Call Participants: "
  276. "Apply updateGroupCallParticipant %1 (%2)"
  277. ).arg(_version
  278. ).arg(Logs::b(increment)));
  279. applyEnqueuedUpdate(update);
  280. } else if (_version <= required) {
  281. DEBUG_LOG(("Group Call Participants: "
  282. "Queue updateGroupCallParticipant %1 -> %2 (%3)"
  283. ).arg(_version
  284. ).arg(version
  285. ).arg(Logs::b(increment)));
  286. const auto type = increment
  287. ? QueuedType::VersionedParticipant
  288. : QueuedType::Participant;
  289. _queuedUpdates.emplace(std::pair{ version, type }, update);
  290. }
  291. }, [](const auto &) {
  292. Unexpected("Type in GroupCall::enqueueUpdate.");
  293. });
  294. processQueuedUpdates();
  295. }
  296. void GroupCall::discard(const MTPDgroupCallDiscarded &data) {
  297. const auto id = _id;
  298. const auto peer = _peer;
  299. crl::on_main(&peer->session(), [=] {
  300. if (peer->groupCall() && peer->groupCall()->id() == id) {
  301. if (const auto chat = peer->asChat()) {
  302. chat->clearGroupCall();
  303. } else if (const auto channel = peer->asChannel()) {
  304. channel->clearGroupCall();
  305. }
  306. }
  307. });
  308. Core::App().calls().applyGroupCallUpdateChecked(
  309. &peer->session(),
  310. MTP_updateGroupCall(
  311. MTP_flags(MTPDupdateGroupCall::Flag::f_chat_id),
  312. MTP_long(peer->isChat()
  313. ? peerToChat(peer->id).bare
  314. : peerToChannel(peer->id).bare),
  315. MTP_groupCallDiscarded(
  316. data.vid(),
  317. data.vaccess_hash(),
  318. data.vduration())));
  319. }
  320. void GroupCall::processFullCallUsersChats(const MTPphone_GroupCall &call) {
  321. call.match([&](const MTPDphone_groupCall &data) {
  322. _peer->owner().processUsers(data.vusers());
  323. _peer->owner().processChats(data.vchats());
  324. });
  325. }
  326. void GroupCall::processFullCallFields(const MTPphone_GroupCall &call) {
  327. call.match([&](const MTPDphone_groupCall &data) {
  328. const auto &participants = data.vparticipants().v;
  329. const auto nextOffset = qs(data.vparticipants_next_offset());
  330. data.vcall().match([&](const MTPDgroupCall &data) {
  331. _participants.clear();
  332. _speakingByActiveFinishes.clear();
  333. _participantPeerByAudioSsrc.clear();
  334. _allParticipantsLoaded = false;
  335. applyParticipantsSlice(
  336. participants,
  337. ApplySliceSource::FullReloaded);
  338. _nextOffset = nextOffset;
  339. applyCallFields(data);
  340. }, [&](const MTPDgroupCallDiscarded &data) {
  341. discard(data);
  342. });
  343. });
  344. }
  345. void GroupCall::processFullCall(const MTPphone_GroupCall &call) {
  346. processFullCallUsersChats(call);
  347. processFullCallFields(call);
  348. finishParticipantsSliceRequest();
  349. _participantsReloaded.fire({});
  350. }
  351. void GroupCall::applyCallFields(const MTPDgroupCall &data) {
  352. DEBUG_LOG(("Group Call Participants: "
  353. "Set from groupCall %1 -> %2"
  354. ).arg(_version
  355. ).arg(data.vversion().v));
  356. _version = data.vversion().v;
  357. if (!_version) {
  358. LOG(("API Error: Got zero version in groupCall."));
  359. _version = 1;
  360. }
  361. _rtmp = data.is_rtmp_stream();
  362. _listenersHidden = data.is_listeners_hidden();
  363. _joinMuted = data.is_join_muted();
  364. _canChangeJoinMuted = data.is_can_change_join_muted();
  365. _joinedToTop = !data.is_join_date_asc();
  366. setServerParticipantsCount(data.vparticipants_count().v);
  367. changePeerEmptyCallFlag();
  368. _title = qs(data.vtitle().value_or_empty());
  369. {
  370. _recordVideo = data.is_record_video_active();
  371. _recordStartDate = data.vrecord_start_date().value_or_empty();
  372. }
  373. _scheduleDate = data.vschedule_date().value_or_empty();
  374. _scheduleStartSubscribed = data.is_schedule_start_subscribed();
  375. _unmutedVideoLimit = data.vunmuted_video_limit().v;
  376. _allParticipantsLoaded
  377. = (_serverParticipantsCount == _participants.size());
  378. }
  379. void GroupCall::applyLocalUpdate(
  380. const MTPDupdateGroupCallParticipants &update) {
  381. applyParticipantsSlice(
  382. update.vparticipants().v,
  383. ApplySliceSource::UpdateConstructed);
  384. }
  385. void GroupCall::applyEnqueuedUpdate(const MTPUpdate &update) {
  386. Expects(!_applyingQueuedUpdates);
  387. _applyingQueuedUpdates = true;
  388. const auto guard = gsl::finally([&] { _applyingQueuedUpdates = false; });
  389. update.match([&](const MTPDupdateGroupCall &data) {
  390. data.vcall().match([&](const MTPDgroupCall &data) {
  391. applyCallFields(data);
  392. computeParticipantsCount();
  393. }, [&](const MTPDgroupCallDiscarded &data) {
  394. discard(data);
  395. });
  396. }, [&](const MTPDupdateGroupCallParticipants &data) {
  397. DEBUG_LOG(("Group Call Participants: "
  398. "Set from updateGroupCallParticipants %1 -> %2"
  399. ).arg(_version
  400. ).arg(data.vversion().v));
  401. _version = data.vversion().v;
  402. if (!_version) {
  403. LOG(("API Error: "
  404. "Got zero version in updateGroupCallParticipants."));
  405. _version = 1;
  406. }
  407. applyParticipantsSlice(
  408. data.vparticipants().v,
  409. ApplySliceSource::UpdateReceived);
  410. }, [](const auto &) {
  411. Unexpected("Type in GroupCall::applyEnqueuedUpdate.");
  412. });
  413. Core::App().calls().applyGroupCallUpdateChecked(
  414. &_peer->session(),
  415. update);
  416. }
  417. void GroupCall::processQueuedUpdates() {
  418. if (!_version || _applyingQueuedUpdates) {
  419. return;
  420. }
  421. const auto size = _queuedUpdates.size();
  422. while (!_queuedUpdates.empty()) {
  423. const auto &entry = _queuedUpdates.front();
  424. const auto version = entry.first.first;
  425. const auto type = entry.first.second;
  426. const auto incremented = (type == QueuedType::VersionedParticipant);
  427. if ((version < _version)
  428. || (version == _version && incremented)) {
  429. _queuedUpdates.erase(_queuedUpdates.begin());
  430. } else if (version == _version
  431. || (version == _version + 1 && incremented)) {
  432. const auto update = entry.second;
  433. _queuedUpdates.erase(_queuedUpdates.begin());
  434. applyEnqueuedUpdate(update);
  435. } else {
  436. break;
  437. }
  438. }
  439. if (_queuedUpdates.empty()) {
  440. _reloadByQueuedUpdatesTimer.cancel();
  441. } else if (_queuedUpdates.size() != size
  442. || !_reloadByQueuedUpdatesTimer.isActive()) {
  443. _reloadByQueuedUpdatesTimer.callOnce(kWaitForUpdatesTimeout);
  444. }
  445. }
  446. void GroupCall::computeParticipantsCount() {
  447. _fullCount = (_allParticipantsLoaded && !_listenersHidden)
  448. ? int(_participants.size())
  449. : std::max(int(_participants.size()), _serverParticipantsCount);
  450. }
  451. void GroupCall::reloadIfStale() {
  452. if (!fullCount() && !participantsLoaded()) {
  453. reload();
  454. } else if (!_reloadLastFinished
  455. || crl::now() > _reloadLastFinished + kReloadStaleTimeout) {
  456. reload();
  457. }
  458. }
  459. void GroupCall::reload() {
  460. if (_reloadRequestId || _applyingQueuedUpdates) {
  461. return;
  462. }
  463. api().request(base::take(_participantsRequestId)).cancel();
  464. DEBUG_LOG(("Group Call Participants: "
  465. "Reloading with queued: %1"
  466. ).arg(_queuedUpdates.size()));
  467. while (!_queuedUpdates.empty()) {
  468. const auto &entry = _queuedUpdates.front();
  469. const auto update = entry.second;
  470. _queuedUpdates.erase(_queuedUpdates.begin());
  471. applyEnqueuedUpdate(update);
  472. }
  473. _reloadByQueuedUpdatesTimer.cancel();
  474. const auto limit = 3;
  475. _reloadRequestId = api().request(
  476. MTPphone_GetGroupCall(input(), MTP_int(limit))
  477. ).done([=](const MTPphone_GroupCall &result) {
  478. if (requestParticipantsAfterReload(result)) {
  479. _savedFull = result;
  480. processFullCallUsersChats(result);
  481. requestParticipants();
  482. return;
  483. }
  484. _reloadRequestId = 0;
  485. _reloadLastFinished = crl::now();
  486. processFullCall(result);
  487. }).fail([=] {
  488. _reloadRequestId = 0;
  489. _reloadLastFinished = crl::now();
  490. }).send();
  491. }
  492. bool GroupCall::requestParticipantsAfterReload(
  493. const MTPphone_GroupCall &call) const {
  494. return call.match([&](const MTPDphone_groupCall &data) {
  495. const auto received = data.vparticipants().v.size();
  496. const auto size = data.vcall().match([&](const MTPDgroupCall &data) {
  497. return data.vparticipants_count().v;
  498. }, [](const auto &) {
  499. return 0;
  500. });
  501. return (received < size) && (received < _participants.size());
  502. });
  503. }
  504. void GroupCall::applyParticipantsSlice(
  505. const QVector<MTPGroupCallParticipant> &list,
  506. ApplySliceSource sliceSource) {
  507. for (const auto &participant : list) {
  508. participant.match([&](const MTPDgroupCallParticipant &data) {
  509. const auto participantPeerId = peerFromMTP(data.vpeer());
  510. const auto participantPeer = _peer->owner().peer(
  511. participantPeerId);
  512. const auto i = ranges::find(
  513. _participants,
  514. participantPeer,
  515. &Participant::peer);
  516. if (data.is_left()) {
  517. if (i != end(_participants)) {
  518. auto update = ParticipantUpdate{
  519. .was = *i,
  520. };
  521. _participantPeerByAudioSsrc.erase(i->ssrc);
  522. _participantPeerByAudioSsrc.erase(
  523. GetAdditionalAudioSsrc(i->videoParams));
  524. _speakingByActiveFinishes.remove(participantPeer);
  525. _participants.erase(i);
  526. if (sliceSource != ApplySliceSource::FullReloaded) {
  527. _participantUpdates.fire(std::move(update));
  528. }
  529. }
  530. if (_serverParticipantsCount > 0) {
  531. --_serverParticipantsCount;
  532. }
  533. return;
  534. }
  535. if (const auto about = data.vabout()) {
  536. participantPeer->setAbout(qs(*about));
  537. }
  538. const auto was = (i != end(_participants))
  539. ? std::make_optional(*i)
  540. : std::nullopt;
  541. const auto canSelfUnmute = !data.is_muted()
  542. || data.is_can_self_unmute();
  543. const auto lastActive = data.vactive_date().value_or(
  544. was ? was->lastActive : 0);
  545. const auto volume = (was
  546. && !was->applyVolumeFromMin
  547. && data.is_min())
  548. ? was->volume
  549. : data.vvolume().value_or(Calls::Group::kDefaultVolume);
  550. const auto applyVolumeFromMin = (was && data.is_min())
  551. ? was->applyVolumeFromMin
  552. : (data.is_min() || data.is_volume_by_admin());
  553. const auto mutedByMe = (was && data.is_min())
  554. ? was->mutedByMe
  555. : data.is_muted_by_you();
  556. const auto onlyMinLoaded = data.is_min()
  557. && (!was || was->onlyMinLoaded);
  558. const auto videoJoined = data.is_video_joined();
  559. const auto raisedHandRating
  560. = data.vraise_hand_rating().value_or_empty();
  561. const auto localUpdate = (sliceSource
  562. == ApplySliceSource::UpdateConstructed);
  563. const auto existingVideoParams = (i != end(_participants))
  564. ? i->videoParams
  565. : nullptr;
  566. auto videoParams = localUpdate
  567. ? existingVideoParams
  568. : Calls::ParseVideoParams(
  569. data.vvideo(),
  570. data.vpresentation(),
  571. existingVideoParams);
  572. const auto value = Participant{
  573. .peer = participantPeer,
  574. .videoParams = std::move(videoParams),
  575. .date = data.vdate().v,
  576. .lastActive = lastActive,
  577. .raisedHandRating = raisedHandRating,
  578. .ssrc = uint32(data.vsource().v),
  579. .volume = volume,
  580. .sounding = canSelfUnmute && was && was->sounding,
  581. .speaking = canSelfUnmute && was && was->speaking,
  582. .additionalSounding = (canSelfUnmute
  583. && was
  584. && was->additionalSounding),
  585. .additionalSpeaking = (canSelfUnmute
  586. && was
  587. && was->additionalSpeaking),
  588. .muted = data.is_muted(),
  589. .mutedByMe = mutedByMe,
  590. .canSelfUnmute = canSelfUnmute,
  591. .onlyMinLoaded = onlyMinLoaded,
  592. .videoJoined = videoJoined,
  593. .applyVolumeFromMin = applyVolumeFromMin,
  594. };
  595. if (i == end(_participants)) {
  596. if (value.ssrc) {
  597. _participantPeerByAudioSsrc.emplace(
  598. value.ssrc,
  599. participantPeer);
  600. }
  601. if (const auto additional = GetAdditionalAudioSsrc(
  602. value.videoParams)) {
  603. _participantPeerByAudioSsrc.emplace(
  604. additional,
  605. participantPeer);
  606. }
  607. _participants.push_back(value);
  608. if (const auto user = participantPeer->asUser()) {
  609. _peer->owner().unregisterInvitedToCallUser(_id, user);
  610. }
  611. } else {
  612. if (i->ssrc != value.ssrc) {
  613. _participantPeerByAudioSsrc.erase(i->ssrc);
  614. if (value.ssrc) {
  615. _participantPeerByAudioSsrc.emplace(
  616. value.ssrc,
  617. participantPeer);
  618. }
  619. }
  620. if (GetAdditionalAudioSsrc(i->videoParams)
  621. != GetAdditionalAudioSsrc(value.videoParams)) {
  622. _participantPeerByAudioSsrc.erase(
  623. GetAdditionalAudioSsrc(i->videoParams));
  624. if (const auto additional = GetAdditionalAudioSsrc(
  625. value.videoParams)) {
  626. _participantPeerByAudioSsrc.emplace(
  627. additional,
  628. participantPeer);
  629. }
  630. }
  631. *i = value;
  632. }
  633. if (data.is_just_joined()) {
  634. ++_serverParticipantsCount;
  635. }
  636. if (sliceSource != ApplySliceSource::FullReloaded) {
  637. _participantUpdates.fire({
  638. .was = was,
  639. .now = value,
  640. });
  641. }
  642. });
  643. }
  644. if (sliceSource == ApplySliceSource::UpdateReceived) {
  645. changePeerEmptyCallFlag();
  646. computeParticipantsCount();
  647. }
  648. }
  649. void GroupCall::applyLastSpoke(
  650. uint32 ssrc,
  651. LastSpokeTimes when,
  652. crl::time now) {
  653. const auto i = _participantPeerByAudioSsrc.find(ssrc);
  654. if (i == end(_participantPeerByAudioSsrc)) {
  655. _unknownSpokenSsrcs[ssrc] = when;
  656. requestUnknownParticipants();
  657. return;
  658. }
  659. const auto participant = findParticipant(i->second);
  660. Assert(participant != nullptr);
  661. _speakingByActiveFinishes.remove(participant->peer);
  662. const auto sounding = (when.anything + kSoundStatusKeptFor >= now)
  663. && participant->canSelfUnmute;
  664. const auto speaking = sounding
  665. && (when.voice + kSoundStatusKeptFor >= now);
  666. if (speaking) {
  667. _participantSpeaking.fire({ participant });
  668. }
  669. const auto useAdditional = (ssrc != participant->ssrc);
  670. const auto nowSounding = useAdditional
  671. ? participant->additionalSounding
  672. : participant->sounding;
  673. const auto nowSpeaking = useAdditional
  674. ? participant->additionalSpeaking
  675. : participant->speaking;
  676. if (nowSounding != sounding || nowSpeaking != speaking) {
  677. const auto was = *participant;
  678. if (useAdditional) {
  679. participant->additionalSounding = sounding;
  680. participant->additionalSpeaking = speaking;
  681. } else {
  682. participant->sounding = sounding;
  683. participant->speaking = speaking;
  684. }
  685. _participantUpdates.fire({
  686. .was = was,
  687. .now = *participant,
  688. });
  689. }
  690. }
  691. void GroupCall::resolveParticipants(const base::flat_set<uint32> &ssrcs) {
  692. if (ssrcs.empty()) {
  693. return;
  694. }
  695. for (const auto ssrc : ssrcs) {
  696. _unknownSpokenSsrcs.emplace(ssrc, LastSpokeTimes());
  697. }
  698. requestUnknownParticipants();
  699. }
  700. void GroupCall::applyActiveUpdate(
  701. PeerId participantPeerId,
  702. LastSpokeTimes when,
  703. PeerData *participantPeerLoaded) {
  704. if (inCall()) {
  705. return;
  706. }
  707. const auto participant = participantPeerLoaded
  708. ? findParticipant(participantPeerLoaded)
  709. : nullptr;
  710. const auto loadByUserId = !participant || participant->onlyMinLoaded;
  711. if (loadByUserId) {
  712. _unknownSpokenPeerIds[participantPeerId] = when;
  713. requestUnknownParticipants();
  714. }
  715. if (!participant || !participant->canSelfUnmute) {
  716. return;
  717. }
  718. const auto was = std::make_optional(*participant);
  719. const auto now = crl::now();
  720. const auto elapsed = TimeId((now - when.anything) / crl::time(1000));
  721. const auto lastActive = base::unixtime::now() - elapsed;
  722. const auto finishes = when.anything + kSpeakingAfterActive;
  723. if (lastActive <= participant->lastActive || finishes <= now) {
  724. return;
  725. }
  726. _speakingByActiveFinishes[participant->peer] = finishes;
  727. if (!_speakingByActiveFinishTimer.isActive()) {
  728. _speakingByActiveFinishTimer.callOnce(finishes - now);
  729. }
  730. participant->lastActive = lastActive;
  731. participant->speaking = true;
  732. participant->canSelfUnmute = true;
  733. if (!was->speaking || !was->canSelfUnmute) {
  734. _participantUpdates.fire({
  735. .was = was,
  736. .now = *participant,
  737. });
  738. }
  739. }
  740. void GroupCall::checkFinishSpeakingByActive() {
  741. const auto now = crl::now();
  742. auto nearest = crl::time(0);
  743. auto stop = std::vector<not_null<PeerData*>>();
  744. for (auto i = begin(_speakingByActiveFinishes)
  745. ; i != end(_speakingByActiveFinishes);) {
  746. const auto when = i->second;
  747. if (now >= when) {
  748. stop.push_back(i->first);
  749. i = _speakingByActiveFinishes.erase(i);
  750. } else {
  751. if (!nearest || nearest > when) {
  752. nearest = when;
  753. }
  754. ++i;
  755. }
  756. }
  757. for (const auto participantPeer : stop) {
  758. const auto participant = findParticipant(participantPeer);
  759. Assert(participant != nullptr);
  760. if (participant->speaking) {
  761. const auto was = *participant;
  762. participant->speaking = false;
  763. _participantUpdates.fire({
  764. .was = was,
  765. .now = *participant,
  766. });
  767. }
  768. }
  769. if (nearest) {
  770. _speakingByActiveFinishTimer.callOnce(nearest - now);
  771. }
  772. }
  773. void GroupCall::requestUnknownParticipants() {
  774. if (_unknownParticipantPeersRequestId
  775. || (_unknownSpokenSsrcs.empty() && _unknownSpokenPeerIds.empty())) {
  776. return;
  777. }
  778. const auto ssrcs = [&] {
  779. if (_unknownSpokenSsrcs.size() < kRequestPerPage) {
  780. return base::take(_unknownSpokenSsrcs);
  781. }
  782. auto result = base::flat_map<uint32, LastSpokeTimes>();
  783. result.reserve(kRequestPerPage);
  784. while (result.size() < kRequestPerPage) {
  785. const auto &[ssrc, when] = _unknownSpokenSsrcs.back();
  786. result.emplace(ssrc, when);
  787. _unknownSpokenSsrcs.erase(_unknownSpokenSsrcs.end() - 1);
  788. }
  789. return result;
  790. }();
  791. const auto participantPeerIds = [&] {
  792. if (_unknownSpokenPeerIds.size() + ssrcs.size() < kRequestPerPage) {
  793. return base::take(_unknownSpokenPeerIds);
  794. }
  795. auto result = base::flat_map<PeerId, LastSpokeTimes>();
  796. const auto available = (kRequestPerPage - int(ssrcs.size()));
  797. if (available > 0) {
  798. result.reserve(available);
  799. while (result.size() < available) {
  800. const auto &back = _unknownSpokenPeerIds.back();
  801. const auto &[participantPeerId, when] = back;
  802. result.emplace(participantPeerId, when);
  803. _unknownSpokenPeerIds.erase(_unknownSpokenPeerIds.end() - 1);
  804. }
  805. }
  806. return result;
  807. }();
  808. auto ssrcInputs = QVector<MTPint>();
  809. ssrcInputs.reserve(ssrcs.size());
  810. for (const auto &[ssrc, when] : ssrcs) {
  811. ssrcInputs.push_back(MTP_int(ssrc));
  812. }
  813. auto peerInputs = QVector<MTPInputPeer>();
  814. peerInputs.reserve(participantPeerIds.size());
  815. for (const auto &[participantPeerId, when] : participantPeerIds) {
  816. if (const auto userId = peerToUser(participantPeerId)) {
  817. peerInputs.push_back(
  818. MTP_inputPeerUser(MTP_long(userId.bare), MTP_long(0)));
  819. } else if (const auto chatId = peerToChat(participantPeerId)) {
  820. peerInputs.push_back(MTP_inputPeerChat(MTP_long(chatId.bare)));
  821. } else if (const auto channelId = peerToChannel(participantPeerId)) {
  822. peerInputs.push_back(
  823. MTP_inputPeerChannel(MTP_long(channelId.bare), MTP_long(0)));
  824. }
  825. }
  826. _unknownParticipantPeersRequestId = api().request(
  827. MTPphone_GetGroupParticipants(
  828. input(),
  829. MTP_vector<MTPInputPeer>(peerInputs),
  830. MTP_vector<MTPint>(ssrcInputs),
  831. MTP_string(QString()),
  832. MTP_int(kRequestPerPage)
  833. )
  834. ).done([=](const MTPphone_GroupParticipants &result) {
  835. result.match([&](const MTPDphone_groupParticipants &data) {
  836. _peer->owner().processUsers(data.vusers());
  837. _peer->owner().processChats(data.vchats());
  838. applyParticipantsSlice(
  839. data.vparticipants().v,
  840. ApplySliceSource::UnknownLoaded);
  841. });
  842. _unknownParticipantPeersRequestId = 0;
  843. const auto now = crl::now();
  844. for (const auto &[ssrc, when] : ssrcs) {
  845. if (when.voice || when.anything) {
  846. applyLastSpoke(ssrc, when, now);
  847. }
  848. _unknownSpokenSsrcs.remove(ssrc);
  849. }
  850. for (const auto &[id, when] : participantPeerIds) {
  851. if (const auto participantPeer = _peer->owner().peerLoaded(id)) {
  852. const auto isParticipant = ranges::contains(
  853. _participants,
  854. not_null{ participantPeer },
  855. &Participant::peer);
  856. if (isParticipant) {
  857. applyActiveUpdate(id, when, participantPeer);
  858. }
  859. }
  860. _unknownSpokenPeerIds.remove(id);
  861. }
  862. if (!ssrcs.empty()) {
  863. _participantsResolved.fire(&ssrcs);
  864. }
  865. requestUnknownParticipants();
  866. }).fail([=] {
  867. _unknownParticipantPeersRequestId = 0;
  868. for (const auto &[ssrc, when] : ssrcs) {
  869. _unknownSpokenSsrcs.remove(ssrc);
  870. }
  871. for (const auto &[participantPeerId, when] : participantPeerIds) {
  872. _unknownSpokenPeerIds.remove(participantPeerId);
  873. }
  874. requestUnknownParticipants();
  875. }).send();
  876. }
  877. void GroupCall::setInCall() {
  878. _unknownSpokenPeerIds.clear();
  879. if (_speakingByActiveFinishes.empty()) {
  880. return;
  881. }
  882. auto restartTimer = true;
  883. const auto latest = crl::now() + kActiveAfterJoined;
  884. for (auto &[peer, when] : _speakingByActiveFinishes) {
  885. if (when > latest) {
  886. when = latest;
  887. } else {
  888. restartTimer = false;
  889. }
  890. }
  891. if (restartTimer) {
  892. _speakingByActiveFinishTimer.callOnce(kActiveAfterJoined);
  893. }
  894. }
  895. bool GroupCall::inCall() const {
  896. const auto current = Core::App().calls().currentGroupCall();
  897. return (current != nullptr)
  898. && (current->id() == _id)
  899. && (current->state() == Calls::GroupCall::State::Joined);
  900. }
  901. void GroupCall::setJoinMutedLocally(bool muted) {
  902. _joinMuted = muted;
  903. }
  904. bool GroupCall::joinMuted() const {
  905. return _joinMuted;
  906. }
  907. bool GroupCall::canChangeJoinMuted() const {
  908. return _canChangeJoinMuted;
  909. }
  910. bool GroupCall::joinedToTop() const {
  911. return _joinedToTop;
  912. }
  913. ApiWrap &GroupCall::api() const {
  914. return _peer->session().api();
  915. }
  916. } // namespace Data