| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744 |
- /*
- This file is part of Telegram Desktop,
- the official desktop application for the Telegram messaging service.
- For license and copyright information please follow this link:
- https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
- */
- #include "base/options.h"
- #include "mtproto/session_private.h"
- #include "mtproto/details/mtproto_bound_key_creator.h"
- #include "mtproto/details/mtproto_dcenter.h"
- #include "mtproto/details/mtproto_dump_to_text.h"
- #include "mtproto/details/mtproto_rsa_public_key.h"
- #include "mtproto/session.h"
- #include "mtproto/mtproto_response.h"
- #include "mtproto/mtproto_dc_options.h"
- #include "mtproto/connection_abstract.h"
- #include "base/random.h"
- #include "base/qthelp_url.h"
- #include "base/openssl_help.h"
- #include "base/unixtime.h"
- #include "base/platform/base_platform_info.h"
- #include <ksandbox.h>
- #include <zlib.h>
- namespace MTP {
- namespace details {
- namespace {
- constexpr auto kIntSize = static_cast<int>(sizeof(mtpPrime));
- constexpr auto kWaitForBetterTimeout = crl::time(2000);
- constexpr auto kMinConnectedTimeout = crl::time(1000);
- constexpr auto kMaxConnectedTimeout = crl::time(8000);
- constexpr auto kMinReceiveTimeout = crl::time(4000);
- constexpr auto kMaxReceiveTimeout = crl::time(64000);
- constexpr auto kMarkConnectionOldTimeout = crl::time(192000);
- constexpr auto kPingDelayDisconnect = 60;
- constexpr auto kPingSendAfter = 30 * crl::time(1000);
- constexpr auto kPingSendAfterForce = 45 * crl::time(1000);
- constexpr auto kTemporaryExpiresIn = TimeId(86400);
- constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30);
- constexpr auto kKeyOldEnoughForDestroy = 60 * crl::time(1000);
- constexpr auto kSentContainerLives = 600 * crl::time(1000);
- constexpr auto kFastRequestDuration = crl::time(500);
- // If we can't connect for this time we will ask _instance to update config.
- constexpr auto kRequestConfigTimeout = 8 * crl::time(1000);
- // Don't try to handle messages larger than this size.
- constexpr auto kMaxMessageLength = 16 * 1024 * 1024;
- // How much time passed from send till we resend request or check its state.
- constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000);
- // How much time to wait for some more requests,
- // when resending request or checking its state.
- constexpr auto kSendStateRequestWaiting = crl::time(1000);
- // How much time to wait for some more requests, when sending msg acks.
- constexpr auto kAckSendWaiting = 10 * crl::time(1000);
- constexpr auto kCutContainerOnSize = 16 * 1024;
- auto SyncTimeRequestDuration = kFastRequestDuration;
- using namespace details;
- [[nodiscard]] QString LogIdsVector(const QVector<MTPlong> &ids) {
- if (!ids.size()) return "[]";
- auto idsStr = QString("[%1").arg(ids.cbegin()->v);
- for (const auto &id : ids) {
- idsStr += QString(", %2").arg(id.v);
- }
- return idsStr + "]";
- }
- [[nodiscard]] QString ComputeAppVersion() {
- #if defined Q_OS_WIN && defined Q_PROCESSOR_X86_64
- const auto arch = u" x64"_q;
- #elif (defined Q_OS_WIN && defined Q_PROCESSOR_X86_32) || defined Q_PROCESSOR_X86_64
- const auto arch = QString();
- #else
- const auto arch = ' ' + QSysInfo::buildCpuArchitecture();
- #endif
- return QString::fromLatin1(AppVersionStr) + arch + ([] {
- #if defined OS_MAC_STORE
- return u" Mac App Store"_q;
- #elif defined OS_WIN_STORE // OS_MAC_STORE
- return u" Microsoft Store"_q;
- #else // OS_MAC_STORE || OS_WIN_STORE
- return KSandbox::isFlatpak()
- ? u" Flatpak"_q
- : KSandbox::isSnap()
- ? u" Snap"_q
- : QString();
- #endif // OS_MAC_STORE || OS_WIN_STORE
- })();
- }
- void WrapInvokeAfter(
- SerializedRequest &to,
- const SerializedRequest &from,
- const base::flat_map<mtpMsgId, SerializedRequest> &haveSent,
- int32 skipBeforeRequest = 0) {
- const auto afterId = *(mtpMsgId*)(from->after->data() + 4);
- const auto i = afterId ? haveSent.find(afterId) : haveSent.end();
- int32 size = to->size(), lenInInts = (tl::count_length(from) >> 2), headlen = 4, fulllen = headlen + lenInInts;
- if (i == haveSent.end()) { // no invoke after or such msg was not sent or was completed recently
- to->resize(size + fulllen + skipBeforeRequest);
- if (skipBeforeRequest) {
- memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
- memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
- } else {
- memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime));
- }
- } else {
- to->resize(size + fulllen + skipBeforeRequest + 3);
- memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
- (*to)[size + 3] += 3 * sizeof(mtpPrime);
- *((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg;
- memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime));
- memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
- if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime);
- }
- }
// Returns true when the first `size` bytes at `a` and `b` differ.
//
// Every byte pair is compared and the result is accumulated in a volatile
// flag, so the loop never exits early at the first mismatch.
[[nodiscard]] bool ConstTimeIsDifferent(
		const void *a,
		const void *b,
		size_t size) {
	const auto left = static_cast<const char*>(a);
	const auto right = static_cast<const char*>(b);
	volatile auto mismatch = false;
	for (auto index = size_t(0); index != size; ++index) {
		mismatch = mismatch | (left[index] != right[index]);
	}
	return mismatch;
}
- base::options::toggle OptionPreferIPv6({
- .id = kOptionPreferIPv6,
- .name = "Prefer IPv6",
- .description = "Prefer IPv6 if it is available. Require \"Try connecting through IPv6\" to be enabled",
- });
- } // namespace
// Id string used as `.id` of the OptionPreferIPv6 toggle.
const char kOptionPreferIPv6[] = "prefer-ipv6";
- SessionPrivate::SessionPrivate(
- not_null<Instance*> instance,
- not_null<QThread*> thread,
- std::shared_ptr<SessionData> data,
- ShiftedDcId shiftedDcId)
- : QObject(nullptr)
- , _instance(instance)
- , _shiftedDcId(shiftedDcId)
- , _realDcType(_instance->dcOptions().dcType(_shiftedDcId))
- , _currentDcType(_realDcType)
- , _state(DisconnectedState)
- , _retryTimer(thread, [=] { retryByTimer(); })
- , _oldConnectionTimer(thread, [=] { markConnectionOld(); })
- , _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); })
- , _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); })
- , _waitForBetterTimer(thread, [=] { waitBetterFailed(); })
- , _waitForReceived(kMinReceiveTimeout)
- , _waitForConnected(kMinConnectedTimeout)
- , _pingSender(thread, [=] { sendPingByTimer(); })
- , _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
- , _clearOldContainersTimer(thread, [=] { clearOldContainers(); })
- , _sessionData(std::move(data)) {
- Expects(_shiftedDcId != 0);
- moveToThread(thread);
- InvokeQueued(this, [=] {
- _clearOldContainersTimer.callEach(kSentContainerLives);
- connectToServer();
- });
- }
- SessionPrivate::~SessionPrivate() {
- releaseKeyCreationOnFail();
- doDisconnect();
- Expects(!_connection);
- Expects(_testConnections.empty());
- }
- void SessionPrivate::appendTestConnection(
- DcOptions::Variants::Protocol protocol,
- const QString &ip,
- int port,
- const bytes::vector &protocolSecret) {
- QWriteLocker lock(&_stateMutex);
- const auto priority = (qthelp::is_ipv6(ip) ? (OptionPreferIPv6.value() ? 2 : 0) : 1)
- + (protocol == DcOptions::Variants::Tcp ? 1 : 0)
- + (protocolSecret.empty() ? 0 : 1);
- _testConnections.push_back({
- AbstractConnection::Create(
- _instance,
- protocol,
- thread(),
- protocolSecret,
- _options->proxy),
- priority
- });
- const auto weak = _testConnections.back().data.get();
- connect(weak, &AbstractConnection::error, [=](int errorCode) {
- onError(weak, errorCode);
- });
- connect(weak, &AbstractConnection::receivedSome, [=] {
- onReceivedSome();
- });
- _firstSentAt = 0;
- if (_oldConnection) {
- _oldConnection = false;
- DEBUG_LOG(("This connection marked as not old!"));
- }
- _oldConnectionTimer.callOnce(kMarkConnectionOldTimeout);
- connect(weak, &AbstractConnection::connected, [=] {
- onConnected(weak);
- });
- connect(weak, &AbstractConnection::disconnected, [=] {
- onDisconnected(weak);
- });
- connect(weak, &AbstractConnection::syncTimeRequest, [=] {
- InvokeQueued(_instance, [instance = _instance] {
- instance->syncHttpUnixtime();
- });
- });
- const auto protocolForFiles = isMediaClusterDcId(_shiftedDcId)
- //|| isUploadDcId(_shiftedDcId)
- || (_realDcType == DcType::Cdn);
- const auto protocolDcId = getProtocolDcId();
- InvokeQueued(_testConnections.back().data, [=] {
- weak->connectToServer(
- ip,
- port,
- protocolSecret,
- protocolDcId,
- protocolForFiles);
- });
- }
- int16 SessionPrivate::getProtocolDcId() const {
- const auto dcId = BareDcId(_shiftedDcId);
- const auto simpleDcId = isTemporaryDcId(dcId)
- ? getRealIdFromTemporaryDcId(dcId)
- : dcId;
- const auto testedDcId = _instance->isTestMode()
- ? (kTestModeDcIdShift + simpleDcId)
- : simpleDcId;
- return (_currentDcType == DcType::MediaCluster)
- ? -testedDcId
- : testedDcId;
- }
- void SessionPrivate::checkSentRequests() {
- const auto now = crl::now();
- const auto checkTime = now - kCheckSentRequestTimeout;
- if (_bindMsgId && _bindMessageSent < checkTime) {
- DEBUG_LOG(("MTP Info: "
- "Request state while key is not bound, restarting."));
- restart();
- _checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
- return;
- }
- auto requesting = false;
- auto nextTimeout = kCheckSentRequestTimeout;
- {
- QReadLocker locker(_sessionData->haveSentMutex());
- auto &haveSent = _sessionData->haveSentMap();
- for (const auto &[msgId, request] : haveSent) {
- if (request->lastSentTime <= checkTime) {
- // Need to check state.
- request->lastSentTime = now;
- if (_stateRequestData.emplace(msgId).second) {
- requesting = true;
- }
- } else {
- nextTimeout = std::min(request->lastSentTime - checkTime, nextTimeout);
- }
- }
- }
- if (requesting) {
- _sessionData->queueSendAnything(kSendStateRequestWaiting);
- }
- if (nextTimeout < kCheckSentRequestTimeout) {
- _checkSentRequestsTimer.callOnce(nextTimeout);
- }
- }
- void SessionPrivate::clearOldContainers() {
- auto resent = false;
- auto nextTimeout = kSentContainerLives;
- const auto now = crl::now();
- const auto checkTime = now - kSentContainerLives;
- for (auto i = _sentContainers.begin(); i != _sentContainers.end();) {
- if (i->second.sent <= checkTime) {
- DEBUG_LOG(("MTP Info: Removing old container with resending %1, "
- "sent: %2, now: %3, current unixtime: %4"
- ).arg(i->first
- ).arg(i->second.sent
- ).arg(now
- ).arg(base::unixtime::now()));
- const auto ids = std::move(i->second.messages);
- i = _sentContainers.erase(i);
- resent = resent || !ids.empty();
- for (const auto innerMsgId : ids) {
- resend(innerMsgId, -1);
- }
- } else {
- nextTimeout = std::min(i->second.sent - checkTime, nextTimeout);
- ++i;
- }
- }
- if (resent) {
- _sessionData->queueNeedToResumeAndSend();
- }
- if (nextTimeout < kSentContainerLives) {
- _clearOldContainersTimer.callOnce(nextTimeout);
- } else if (!_clearOldContainersTimer.isActive()) {
- _clearOldContainersTimer.callEach(nextTimeout);
- }
- }
- void SessionPrivate::destroyAllConnections() {
- clearUnboundKeyCreator();
- _waitForBetterTimer.cancel();
- _waitForReceivedTimer.cancel();
- _waitForConnectedTimer.cancel();
- _testConnections.clear();
- _connection = nullptr;
- }
- void SessionPrivate::cdnConfigChanged() {
- connectToServer(true);
- }
- int32 SessionPrivate::getShiftedDcId() const {
- return _shiftedDcId;
- }
- void SessionPrivate::dcOptionsChanged() {
- _retryTimeout = 1;
- connectToServer(true);
- }
- int32 SessionPrivate::getState() const {
- QReadLocker lock(&_stateMutex);
- int32 result = _state;
- if (_state < 0) {
- if (_retryTimer.isActive()) {
- result = int32(crl::now() - _retryWillFinish);
- if (result >= 0) {
- result = -1;
- }
- }
- }
- return result;
- }
- QString SessionPrivate::transport() const {
- QReadLocker lock(&_stateMutex);
- if (!_connection || (_state < 0)) {
- return QString();
- }
- Assert(_options != nullptr);
- return _connection->transport();
- }
- bool SessionPrivate::setState(int state, int ifState) {
- if (ifState != kUpdateStateAlways) {
- QReadLocker lock(&_stateMutex);
- if (_state != ifState) {
- return false;
- }
- }
- QWriteLocker lock(&_stateMutex);
- if (_state == state) {
- return false;
- }
- _state = state;
- if (state < 0) {
- _retryTimeout = -state;
- _retryTimer.callOnce(_retryTimeout);
- _retryWillFinish = crl::now() + _retryTimeout;
- }
- lock.unlock();
- _sessionData->queueConnectionStateChange(state);
- return true;
- }
- void SessionPrivate::resetSession() {
- MTP_LOG(_shiftedDcId, ("Resetting session!"));
- _needSessionReset = false;
- DEBUG_LOG(("MTP Info: creating new session in resetSession."));
- changeSessionId();
- _sessionData->queueResetDone();
- }
- void SessionPrivate::changeSessionId() {
- auto sessionId = _sessionId;
- do {
- sessionId = base::RandomValue<uint64>();
- } while (_sessionId == sessionId);
- DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId));
- _sessionId = sessionId;
- _messagesCounter = 0;
- _sessionMarkedAsStarted = false;
- _ackRequestData.clear();
- _resendRequestData.clear();
- _stateRequestData.clear();
- _receivedMessageIds.clear();
- }
- uint32 SessionPrivate::nextRequestSeqNumber(bool needAck) {
- const auto result = _messagesCounter;
- _messagesCounter += (needAck ? 1 : 0);
- return result * 2 + (needAck ? 1 : 0);
- }
- bool SessionPrivate::realDcTypeChanged() {
- const auto now = _instance->dcOptions().dcType(_shiftedDcId);
- if (_realDcType == now) {
- return false;
- }
- _realDcType = now;
- return true;
- }
- bool SessionPrivate::markSessionAsStarted() {
- if (_sessionMarkedAsStarted) {
- return false;
- }
- _sessionMarkedAsStarted = true;
- return true;
- }
- mtpMsgId SessionPrivate::prepareToSend(
- SerializedRequest &request,
- mtpMsgId currentLastId,
- bool forceNewMsgId) {
- Expects(request->size() > 8);
- if (const auto msgId = request.getMsgId()) {
- // resending this request
- const auto i = _resendingIds.find(msgId);
- if (i != _resendingIds.cend()) {
- _resendingIds.erase(i);
- }
- return (forceNewMsgId || msgId > currentLastId)
- ? replaceMsgId(request, currentLastId)
- : msgId;
- }
- request.setMsgId(currentLastId);
- request.setSeqNo(nextRequestSeqNumber(request.needAck()));
- if (request->requestId) {
- MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId));
- }
- return currentLastId;
- }
- mtpMsgId SessionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
- Expects(request->size() > 8);
- const auto oldMsgId = request.getMsgId();
- if (oldMsgId == newId) {
- return newId;
- }
- // haveSentMutex() was locked in tryToSend()
- auto &haveSent = _sessionData->haveSentMap();
- while (_resendingIds.contains(newId)
- || _ackedIds.contains(newId)
- || haveSent.contains(newId)) {
- newId = base::unixtime::mtproto_msg_id();
- }
- MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3"
- ).arg(request->requestId
- ).arg(oldMsgId
- ).arg(newId));
- const auto i = _resendingIds.find(oldMsgId);
- if (i != _resendingIds.end()) {
- const auto requestId = i->second;
- _resendingIds.erase(i);
- _resendingIds.emplace(newId, requestId);
- }
- const auto j = _ackedIds.find(oldMsgId);
- if (j != _ackedIds.end()) {
- const auto requestId = j->second;
- _ackedIds.erase(j);
- _ackedIds.emplace(newId, requestId);
- }
- const auto k = haveSent.find(oldMsgId);
- if (k != haveSent.end()) {
- const auto request = k->second;
- haveSent.erase(k);
- haveSent.emplace(newId, request);
- }
- for (auto &[msgId, container] : _sentContainers) {
- for (auto &innerMsgId : container.messages) {
- if (innerMsgId == oldMsgId) {
- innerMsgId = newId;
- }
- }
- }
- request.setMsgId(newId);
- request.setSeqNo(nextRequestSeqNumber(request.needAck()));
- return newId;
- }
- mtpMsgId SessionPrivate::placeToContainer(
- SerializedRequest &toSendRequest,
- mtpMsgId &bigMsgId,
- bool forceNewMsgId,
- SerializedRequest &req) {
- const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId);
- if (msgId >= bigMsgId) {
- bigMsgId = base::unixtime::mtproto_msg_id();
- }
- uint32 from = toSendRequest->size(), len = req.messageSize();
- toSendRequest->resize(from + len);
- memcpy(toSendRequest->data() + from, req->constData() + 4, len * sizeof(mtpPrime));
- return msgId;
- }
- MTPVector<MTPJSONObjectValue> SessionPrivate::prepareInitParams() {
- const auto local = QDateTime::currentDateTime();
- const auto utc = QDateTime(local.date(), local.time(), Qt::UTC);
- const auto shift = base::unixtime::now() - (TimeId)::time(nullptr);
- const auto delta = int(utc.toSecsSinceEpoch()) - int(local.toSecsSinceEpoch()) - shift;
- auto sliced = delta;
- while (sliced < -12 * 3600) {
- sliced += 24 * 3600;
- }
- while (sliced > 14 * 3600) {
- sliced -= 24 * 3600;
- }
- const auto sign = (sliced < 0) ? -1 : 1;
- const auto rounded = base::SafeRound(std::abs(sliced) / 900.)
- * 900
- * sign;
- return MTP_vector<MTPJSONObjectValue>(
- 1,
- MTP_jsonObjectValue(
- MTP_string("tz_offset"),
- MTP_jsonNumber(MTP_double(rounded))));
- }
- void SessionPrivate::tryToSend() {
- DEBUG_LOG(("MTP Info: tryToSend for dc %1.").arg(_shiftedDcId));
- if (!_connection) {
- DEBUG_LOG(("MTP Info: not yet connected in dc %1.").arg(_shiftedDcId));
- return;
- } else if (!_keyId) {
- DEBUG_LOG(("MTP Info: not yet with auth key in dc %1.").arg(_shiftedDcId));
- return;
- }
- const auto needsLayer = !_sessionData->connectionInited();
- const auto state = getState();
- const auto sendOnlyFirstPing = (state != ConnectedState);
- const auto sendAll = !sendOnlyFirstPing && !_keyCreator;
- const auto isMainSession = (GetDcIdShift(_shiftedDcId) == 0);
- if (sendOnlyFirstPing && !_pingIdToSend) {
- DEBUG_LOG(("MTP Info: dc %1 not sending, waiting for Connected state, state: %2").arg(_shiftedDcId).arg(state));
- return; // just do nothing, if is not connected yet
- } else if (isMainSession
- && !sendOnlyFirstPing
- && !_pingIdToSend
- && !_pingId
- && _pingSendAt <= crl::now()) {
- _pingIdToSend = base::RandomValue<mtpPingId>();
- }
- const auto forceNewMsgId = sendAll && markSessionAsStarted();
- if (forceNewMsgId && _keyCreator) {
- _keyCreator->restartBinder();
- }
- auto pingRequest = SerializedRequest();
- auto ackRequest = SerializedRequest();
- auto resendRequest = SerializedRequest();
- auto stateRequest = SerializedRequest();
- auto httpWaitRequest = SerializedRequest();
- auto bindDcKeyRequest = SerializedRequest();
- if (_pingIdToSend) {
- if (sendOnlyFirstPing || !isMainSession) {
- DEBUG_LOG(("MTP Info: sending ping, ping_id: %1"
- ).arg(_pingIdToSend));
- pingRequest = SerializedRequest::Serialize(MTPPing(
- MTP_long(_pingIdToSend)
- ));
- } else {
- DEBUG_LOG(("MTP Info: sending ping_delay_disconnect, "
- "ping_id: %1").arg(_pingIdToSend));
- pingRequest = SerializedRequest::Serialize(MTPPing_delay_disconnect(
- MTP_long(_pingIdToSend),
- MTP_int(kPingDelayDisconnect)));
- _pingSender.callOnce(kPingSendAfterForce);
- }
- _pingSendAt = pingRequest->lastSentTime + kPingSendAfter;
- _pingId = base::take(_pingIdToSend);
- } else if (!sendAll) {
- DEBUG_LOG(("MTP Info: dc %1 sending only service or bind."
- ).arg(_shiftedDcId));
- } else {
- DEBUG_LOG(("MTP Info: dc %1 trying to send after ping, state: %2"
- ).arg(_shiftedDcId
- ).arg(state));
- }
- if (!sendOnlyFirstPing) {
- if (!_ackRequestData.isEmpty()) {
- ackRequest = SerializedRequest::Serialize(MTPMsgsAck(
- MTP_msgs_ack(MTP_vector<MTPlong>(
- base::take(_ackRequestData)))));
- }
- if (!_resendRequestData.isEmpty()) {
- resendRequest = SerializedRequest::Serialize(MTPMsgResendReq(
- MTP_msg_resend_req(MTP_vector<MTPlong>(
- base::take(_resendRequestData)))));
- }
- if (!_stateRequestData.empty()) {
- auto ids = QVector<MTPlong>();
- ids.reserve(_stateRequestData.size());
- for (const auto id : base::take(_stateRequestData)) {
- ids.push_back(MTP_long(id));
- }
- stateRequest = SerializedRequest::Serialize(MTPMsgsStateReq(
- MTP_msgs_state_req(MTP_vector<MTPlong>(ids))));
- }
- if (_connection->usingHttpWait()) {
- httpWaitRequest = SerializedRequest::Serialize(MTPHttpWait(
- MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000))));
- }
- if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) {
- bindDcKeyRequest = _keyCreator->prepareBindRequest(
- _encryptionKey,
- _sessionId);
- // This is a special request with msgId used inside the message
- // body, so it is prepared already with a msgId and we place
- // seqNo for it manually here.
- bindDcKeyRequest.setSeqNo(
- nextRequestSeqNumber(bindDcKeyRequest.needAck()));
- }
- }
- MTPInitConnection<SerializedRequest> initWrapper;
- int32 initSize = 0, initSizeInInts = 0;
- if (needsLayer) {
- Assert(_options != nullptr);
- const auto systemLangCode = _options->systemLangCode;
- const auto cloudLangCode = _options->cloudLangCode;
- const auto langPackName = _options->langPackName;
- const auto deviceModel = (_currentDcType == DcType::Cdn)
- ? "n/a"
- : _instance->deviceModel();
- const auto systemVersion = (_currentDcType == DcType::Cdn)
- ? "n/a"
- : _instance->systemVersion();
- const auto appVersion = ComputeAppVersion();
- const auto proxyType = _options->proxy.type;
- const auto mtprotoProxy = (proxyType == ProxyData::Type::Mtproto);
- const auto clientProxyFields = mtprotoProxy
- ? MTP_inputClientProxy(
- MTP_string(_options->proxy.host),
- MTP_int(_options->proxy.port))
- : MTPInputClientProxy();
- using Flag = MTPInitConnection<SerializedRequest>::Flag;
- initWrapper = MTPInitConnection<SerializedRequest>(
- MTP_flags(Flag::f_params
- | (mtprotoProxy ? Flag::f_proxy : Flag(0))),
- MTP_int(ApiId),
- MTP_string(deviceModel),
- MTP_string(systemVersion),
- MTP_string(appVersion),
- MTP_string(systemLangCode),
- MTP_string(langPackName),
- MTP_string(cloudLangCode),
- clientProxyFields,
- MTP_jsonObject(prepareInitParams()),
- SerializedRequest());
- initSizeInInts = (tl::count_length(initWrapper) >> 2) + 2;
- initSize = initSizeInInts * sizeof(mtpPrime);
- }
- auto needAnyResponse = false;
- auto someSkipped = false;
- SerializedRequest toSendRequest;
- {
- QWriteLocker locker1(_sessionData->toSendMutex());
- auto scheduleCheckSentRequests = false;
- auto toSendDummy = base::flat_map<mtpRequestId, SerializedRequest>();
- auto &toSend = sendAll
- ? _sessionData->toSendMap()
- : toSendDummy;
- if (!sendAll) {
- locker1.unlock();
- }
- auto totalSending = int(toSend.size());
- auto sendingFrom = begin(toSend);
- auto sendingTill = end(toSend);
- auto combinedLength = 0;
- for (auto i = sendingFrom; i != sendingTill; ++i) {
- combinedLength += i->second->size();
- if (combinedLength >= kCutContainerOnSize) {
- ++i;
- if (const auto skipping = int(sendingTill - i)) {
- sendingTill = i;
- totalSending -= skipping;
- Assert(totalSending > 0);
- someSkipped = true;
- }
- break;
- }
- }
- auto sendingRange = ranges::make_subrange(sendingFrom, sendingTill);
- const auto sendingCount = totalSending;
- if (pingRequest) ++totalSending;
- if (ackRequest) ++totalSending;
- if (resendRequest) ++totalSending;
- if (stateRequest) ++totalSending;
- if (httpWaitRequest) ++totalSending;
- if (bindDcKeyRequest) ++totalSending;
- if (!totalSending) {
- return; // nothing to send
- }
- const auto first = pingRequest
- ? pingRequest
- : ackRequest
- ? ackRequest
- : resendRequest
- ? resendRequest
- : stateRequest
- ? stateRequest
- : httpWaitRequest
- ? httpWaitRequest
- : bindDcKeyRequest
- ? bindDcKeyRequest
- : sendingRange.begin()->second;
- if (totalSending == 1 && !first->forceSendInContainer) {
- toSendRequest = first;
- if (sendAll) {
- toSend.erase(sendingFrom, sendingTill);
- locker1.unlock();
- }
- const auto msgId = prepareToSend(
- toSendRequest,
- base::unixtime::mtproto_msg_id(),
- forceNewMsgId && !bindDcKeyRequest);
- if (bindDcKeyRequest) {
- _bindMsgId = msgId;
- _bindMessageSent = crl::now();
- needAnyResponse = true;
- } else if (pingRequest) {
- _pingMsgId = msgId;
- needAnyResponse = true;
- } else if (stateRequest || resendRequest) {
- _stateAndResendRequests.emplace(
- msgId,
- stateRequest ? stateRequest : resendRequest);
- needAnyResponse = true;
- }
- if (toSendRequest->requestId) {
- if (toSendRequest.needAck()) {
- toSendRequest->lastSentTime = crl::now();
- QWriteLocker locker2(_sessionData->haveSentMutex());
- auto &haveSent = _sessionData->haveSentMap();
- haveSent.emplace(msgId, toSendRequest);
- scheduleCheckSentRequests = true;
- const auto wrapLayer = needsLayer && toSendRequest->needsLayer;
- if (toSendRequest->after) {
- const auto toSendSize = tl::count_length(toSendRequest) >> 2;
- auto wrappedRequest = SerializedRequest::Prepare(
- toSendSize,
- toSendSize + 3);
- wrappedRequest->resize(4);
- memcpy(wrappedRequest->data(), toSendRequest->constData(), 4 * sizeof(mtpPrime));
- WrapInvokeAfter(wrappedRequest, toSendRequest, haveSent);
- toSendRequest = std::move(wrappedRequest);
- }
- if (wrapLayer) {
- const auto noWrapSize = (tl::count_length(toSendRequest) >> 2);
- const auto toSendSize = noWrapSize + initSizeInInts;
- auto wrappedRequest = SerializedRequest::Prepare(toSendSize);
- memcpy(wrappedRequest->data(), toSendRequest->constData(), 7 * sizeof(mtpPrime)); // all except length
- wrappedRequest->push_back(mtpc_invokeWithLayer);
- wrappedRequest->push_back(kCurrentLayer);
- initWrapper.write<mtpBuffer>(*wrappedRequest);
- wrappedRequest->resize(wrappedRequest->size() + noWrapSize);
- memcpy(wrappedRequest->data() + wrappedRequest->size() - noWrapSize, toSendRequest->constData() + 8, noWrapSize * sizeof(mtpPrime));
- toSendRequest = std::move(wrappedRequest);
- }
- needAnyResponse = true;
- } else {
- _ackedIds.emplace(msgId, toSendRequest->requestId);
- }
- }
- } else { // send in container
- bool willNeedInit = false;
- uint32 containerSize = 1 + 1; // cons + vector size
- if (pingRequest) containerSize += pingRequest.messageSize();
- if (ackRequest) containerSize += ackRequest.messageSize();
- if (resendRequest) containerSize += resendRequest.messageSize();
- if (stateRequest) containerSize += stateRequest.messageSize();
- if (httpWaitRequest) containerSize += httpWaitRequest.messageSize();
- if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize();
- for (const auto &[requestId, request] : sendingRange) {
- containerSize += request.messageSize();
- if (needsLayer && request->needsLayer) {
- containerSize += initSizeInInts;
- willNeedInit = true;
- }
- }
- mtpBuffer initSerialized;
- if (willNeedInit) {
- initSerialized.reserve(initSizeInInts);
- initSerialized.push_back(mtpc_invokeWithLayer);
- initSerialized.push_back(kCurrentLayer);
- initWrapper.write<mtpBuffer>(initSerialized);
- }
- // prepare container + each in invoke after
- toSendRequest = SerializedRequest::Prepare(
- containerSize,
- containerSize + 3 * sendingCount);
- toSendRequest->push_back(mtpc_msg_container);
- toSendRequest->push_back(totalSending);
- // check for a valid container
- auto bigMsgId = base::unixtime::mtproto_msg_id();
- // the fact of this lock is used in replaceMsgId()
- QWriteLocker locker2(_sessionData->haveSentMutex());
- auto &haveSent = _sessionData->haveSentMap();
- // prepare sent container
- auto sentIdsWrap = SentContainer();
- sentIdsWrap.sent = crl::now();
- sentIdsWrap.messages.reserve(totalSending);
- if (bindDcKeyRequest) {
- _bindMsgId = placeToContainer(
- toSendRequest,
- bigMsgId,
- false,
- bindDcKeyRequest);
- _bindMessageSent = crl::now();
- sentIdsWrap.messages.push_back(_bindMsgId);
- needAnyResponse = true;
- }
- if (pingRequest) {
- _pingMsgId = placeToContainer(
- toSendRequest,
- bigMsgId,
- forceNewMsgId,
- pingRequest);
- sentIdsWrap.messages.push_back(_pingMsgId);
- needAnyResponse = true;
- }
- for (auto &[requestId, request] : sendingRange) {
- const auto msgId = prepareToSend(
- request,
- bigMsgId,
- forceNewMsgId);
- if (msgId >= bigMsgId) {
- bigMsgId = base::unixtime::mtproto_msg_id();
- }
- bool added = false;
- if (request->requestId) {
- if (request.needAck()) {
- request->lastSentTime = crl::now();
- int32 reqNeedsLayer = (needsLayer && request->needsLayer) ? toSendRequest->size() : 0;
- if (request->after) {
- WrapInvokeAfter(toSendRequest, request, haveSent, reqNeedsLayer ? initSizeInInts : 0);
- if (reqNeedsLayer) {
- memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
- *(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
- }
- added = true;
- } else if (reqNeedsLayer) {
- toSendRequest->resize(reqNeedsLayer + initSizeInInts + request.messageSize());
- memcpy(toSendRequest->data() + reqNeedsLayer, request->constData() + 4, 4 * sizeof(mtpPrime));
- memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
- memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, request->constData() + 8, tl::count_length(request));
- *(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
- added = true;
- }
- // #TODO rewrite so that it will always hold.
- //Assert(!haveSent.contains(msgId));
- haveSent.emplace(msgId, request);
- sentIdsWrap.messages.push_back(msgId);
- scheduleCheckSentRequests = true;
- needAnyResponse = true;
- } else {
- _ackedIds.emplace(msgId, request->requestId);
- }
- }
- if (!added) {
- uint32 from = toSendRequest->size(), len = request.messageSize();
- toSendRequest->resize(from + len);
- memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime));
- }
- }
- toSend.erase(sendingFrom, sendingTill);
- if (stateRequest) {
- const auto msgId = placeToContainer(
- toSendRequest,
- bigMsgId,
- forceNewMsgId,
- stateRequest);
- _stateAndResendRequests.emplace(msgId, stateRequest);
- needAnyResponse = true;
- }
- if (resendRequest) {
- const auto msgId = placeToContainer(
- toSendRequest,
- bigMsgId,
- forceNewMsgId,
- resendRequest);
- _stateAndResendRequests.emplace(msgId, resendRequest);
- needAnyResponse = true;
- }
- if (ackRequest) {
- placeToContainer(
- toSendRequest,
- bigMsgId,
- forceNewMsgId,
- ackRequest);
- }
- if (httpWaitRequest) {
- placeToContainer(
- toSendRequest,
- bigMsgId,
- forceNewMsgId,
- httpWaitRequest);
- }
- const auto containerMsgId = prepareToSend(
- toSendRequest,
- bigMsgId,
- forceNewMsgId);
- _sentContainers.emplace(containerMsgId, std::move(sentIdsWrap));
- if (scheduleCheckSentRequests && !_checkSentRequestsTimer.isActive()) {
- _checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
- }
- }
- }
- sendSecureRequest(std::move(toSendRequest), needAnyResponse);
- if (someSkipped) {
- InvokeQueued(this, [=] {
- tryToSend();
- });
- }
- }
- void SessionPrivate::retryByTimer() {
- if (_retryTimeout < 3) {
- ++_retryTimeout;
- } else if (_retryTimeout == 3) {
- _retryTimeout = 1000;
- } else if (_retryTimeout < 64000) {
- _retryTimeout *= 2;
- }
- connectToServer();
- }
- void SessionPrivate::restartNow() {
- _retryTimeout = 1;
- _retryTimer.cancel();
- restart();
- }
- void SessionPrivate::connectToServer(bool afterConfig) {
- if (afterConfig && (!_testConnections.empty() || _connection)) {
- return;
- }
- destroyAllConnections();
- if (realDcTypeChanged() && _keyCreator) {
- destroyTemporaryKey();
- return;
- }
- _options = std::make_unique<SessionOptions>(_sessionData->options());
- const auto bareDc = BareDcId(_shiftedDcId);
- _currentDcType = tryAcquireKeyCreation();
- if (_currentDcType == DcType::Cdn && !_instance->isKeysDestroyer()) {
- if (!_instance->dcOptions().hasCDNKeysForDc(bareDc)) {
- requestCDNConfig();
- return;
- }
- }
- if (_options->proxy.type == ProxyData::Type::Mtproto) {
- // host, port, secret for mtproto proxy are taken from proxy.
- appendTestConnection(DcOptions::Variants::Tcp, {}, 0, {});
- } else {
- using Variants = DcOptions::Variants;
- const auto special = (_currentDcType == DcType::Temporary);
- const auto variants = _instance->dcOptions().lookup(
- bareDc,
- _currentDcType,
- _options->proxy.type != ProxyData::Type::None);
- const auto useIPv4 = special ? true : _options->useIPv4;
- const auto useIPv6 = special ? false : _options->useIPv6;
- const auto useTcp = special ? true : _options->useTcp;
- const auto useHttp = special ? false : _options->useHttp;
- const auto skipAddress = !useIPv4
- ? Variants::IPv4
- : !useIPv6
- ? Variants::IPv6
- : Variants::AddressTypeCount;
- const auto skipProtocol = !useTcp
- ? Variants::Tcp
- : !useHttp
- ? Variants::Http
- : Variants::ProtocolCount;
- for (auto address = 0; address != Variants::AddressTypeCount; ++address) {
- if (address == skipAddress) {
- continue;
- }
- for (auto protocol = 0; protocol != Variants::ProtocolCount; ++protocol) {
- if (protocol == skipProtocol) {
- continue;
- }
- for (const auto &endpoint : variants.data[address][protocol]) {
- appendTestConnection(
- static_cast<Variants::Protocol>(protocol),
- QString::fromStdString(endpoint.ip),
- endpoint.port,
- endpoint.secret);
- }
- }
- }
- }
- if (_testConnections.empty()) {
- if (_instance->isKeysDestroyer()) {
- LOG(("MTP Error: DC %1 options for not found for auth key destruction!").arg(_shiftedDcId));
- _instance->keyWasPossiblyDestroyed(_shiftedDcId);
- return;
- } else if (afterConfig) {
- LOG(("MTP Error: DC %1 options for not found right after config load!").arg(_shiftedDcId));
- return restart();
- }
- DEBUG_LOG(("MTP Info: DC %1 options not found, waiting for config").arg(_shiftedDcId));
- InvokeQueued(_instance, [instance = _instance] {
- instance->requestConfig();
- });
- return;
- }
- DEBUG_LOG(("Connection Info: Connecting to %1 with %2 test connections."
- ).arg(_shiftedDcId
- ).arg(_testConnections.size()));
- if (!_startedConnectingAt) {
- _startedConnectingAt = crl::now();
- } else if (crl::now() - _startedConnectingAt > kRequestConfigTimeout) {
- InvokeQueued(_instance, [instance = _instance] {
- instance->requestConfigIfOld();
- });
- }
- _retryTimer.cancel();
- _waitForConnectedTimer.cancel();
- setState(ConnectingState);
- _bindMsgId = 0;
- _pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0;
- _pingSender.cancel();
- _waitForConnectedTimer.callOnce(_waitForConnected);
- }
- void SessionPrivate::restart() {
- DEBUG_LOG(("MTP Info: restarting Connection"));
- _waitForReceivedTimer.cancel();
- _waitForConnectedTimer.cancel();
- doDisconnect();
- if (_needSessionReset) {
- resetSession();
- }
- if (_retryTimer.isActive()) {
- return;
- }
- DEBUG_LOG(("MTP Info: restart timeout: %1ms").arg(_retryTimeout));
- setState(-_retryTimeout);
- }
- void SessionPrivate::onSentSome(uint64 size) {
- if (!_waitForReceivedTimer.isActive()) {
- auto remain = static_cast<uint64>(_waitForReceived);
- if (!_oldConnection) {
- // 8kb / sec, so 512 kb give 64 sec
- auto remainBySize = size * _waitForReceived / 8192;
- remain = std::clamp(
- remainBySize,
- remain,
- uint64(kMaxReceiveTimeout));
- if (remain != _waitForReceived) {
- DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain));
- }
- }
- _waitForReceivedTimer.callOnce(remain);
- }
- if (!_firstSentAt) {
- _firstSentAt = crl::now();
- }
- }
- void SessionPrivate::onReceivedSome() {
- if (_oldConnection) {
- _oldConnection = false;
- DEBUG_LOG(("This connection marked as not old!"));
- }
- _oldConnectionTimer.callOnce(kMarkConnectionOldTimeout);
- _waitForReceivedTimer.cancel();
- if (_firstSentAt > 0) {
- const auto ms = crl::now() - _firstSentAt;
- DEBUG_LOG(("MTP Info: response in %1ms, _waitForReceived: %2ms"
- ).arg(ms
- ).arg(_waitForReceived));
- if (ms > 0 && ms * 2 < _waitForReceived) {
- _waitForReceived = qMax(ms * 2, kMinReceiveTimeout);
- }
- _firstSentAt = -1;
- }
- }
- void SessionPrivate::markConnectionOld() {
- _oldConnection = true;
- _waitForReceived = kMinReceiveTimeout;
- DEBUG_LOG(("This connection marked as old! _waitForReceived now %1ms"
- ).arg(_waitForReceived));
- }
- void SessionPrivate::sendPingByTimer() {
- if (_pingId) {
- // _pingSendAt: when to send next ping (lastPingAt + kPingSendAfter)
- // could be equal to zero.
- const auto now = crl::now();
- const auto mustSendTill = _pingSendAt
- + kPingSendAfterForce
- - kPingSendAfter;
- if (mustSendTill < now + 1000) {
- LOG(("Could not send ping for some seconds, restarting..."));
- return restart();
- } else {
- _pingSender.callOnce(mustSendTill - now);
- }
- } else {
- _sessionData->queueNeedToResumeAndSend();
- }
- }
- void SessionPrivate::sendPingForce() {
- DEBUG_LOG(("MTP Info: send ping force for dcWithShift %1.").arg(_shiftedDcId));
- if (!_pingId) {
- _pingSendAt = 0;
- DEBUG_LOG(("Will send ping!"));
- tryToSend();
- }
- }
- void SessionPrivate::waitReceivedFailed() {
- Expects(_options != nullptr);
- DEBUG_LOG(("MTP Info: bad connection, _waitForReceived: %1ms").arg(_waitForReceived));
- if (_waitForReceived < kMaxReceiveTimeout) {
- _waitForReceived *= 2;
- }
- doDisconnect();
- if (_retryTimer.isActive()) {
- return;
- }
- DEBUG_LOG(("MTP Info: immediate restart!"));
- InvokeQueued(this, [=] { connectToServer(); });
- const auto instance = _instance;
- const auto shiftedDcId = _shiftedDcId;
- InvokeQueued(instance, [=] {
- instance->restartedByTimeout(shiftedDcId);
- });
- }
- void SessionPrivate::waitConnectedFailed() {
- DEBUG_LOG(("MTP Info: can't connect in %1ms").arg(_waitForConnected));
- auto maxTimeout = kMaxConnectedTimeout;
- for (const auto &connection : _testConnections) {
- accumulate_max(maxTimeout, connection.data->fullConnectTimeout());
- }
- if (_waitForConnected < maxTimeout) {
- _waitForConnected = std::min(maxTimeout, 2 * _waitForConnected);
- }
- connectingTimedOut();
- DEBUG_LOG(("MTP Info: immediate restart!"));
- InvokeQueued(this, [=] { connectToServer(); });
- }
- void SessionPrivate::waitBetterFailed() {
- confirmBestConnection();
- }
- void SessionPrivate::connectingTimedOut() {
- for (const auto &connection : _testConnections) {
- connection.data->timedOut();
- }
- doDisconnect();
- }
- void SessionPrivate::doDisconnect() {
- destroyAllConnections();
- setState(DisconnectedState);
- }
- void SessionPrivate::requestCDNConfig() {
- InvokeQueued(_instance, [instance = _instance] {
- instance->requestCDNConfig();
- });
- }
- void SessionPrivate::handleReceived() {
- Expects(_encryptionKey != nullptr);
- onReceivedSome();
- while (!_connection->received().empty()) {
- auto intsBuffer = std::move(_connection->received().front());
- _connection->received().pop_front();
- constexpr auto kExternalHeaderIntsCount = 6U; // 2 auth_key_id, 4 msg_key
- constexpr auto kEncryptedHeaderIntsCount = 8U; // 2 salt, 2 session, 2 msg_id, 1 seq_no, 1 length
- constexpr auto kMinimalEncryptedIntsCount = kEncryptedHeaderIntsCount + 4U; // + 1 data + 3 padding
- constexpr auto kMinimalIntsCount = kExternalHeaderIntsCount + kMinimalEncryptedIntsCount;
- auto intsCount = uint32(intsBuffer.size());
- auto ints = intsBuffer.constData();
- if ((intsCount < kMinimalIntsCount) || (intsCount > kMaxMessageLength / kIntSize)) {
- LOG(("TCP Error: bad message received, len %1").arg(intsCount * kIntSize));
- return restart();
- }
- if (_keyId != *(uint64*)ints) {
- LOG(("TCP Error: bad auth_key_id %1 instead of %2 received").arg(_keyId).arg(*(uint64*)ints));
- return restart();
- }
- constexpr auto kMinPaddingSize = 12U;
- constexpr auto kMaxPaddingSize = 1024U;
- auto encryptedInts = ints + kExternalHeaderIntsCount;
- auto encryptedIntsCount = (intsCount - kExternalHeaderIntsCount) & ~0x03U;
- auto encryptedBytesCount = encryptedIntsCount * kIntSize;
- auto decryptedBuffer = QByteArray(encryptedBytesCount, Qt::Uninitialized);
- auto msgKey = *(MTPint128*)(ints + 2);
- aesIgeDecrypt(encryptedInts, decryptedBuffer.data(), encryptedBytesCount, _encryptionKey, msgKey);
- auto decryptedInts = reinterpret_cast<const mtpPrime*>(decryptedBuffer.constData());
- auto serverSalt = *(uint64*)&decryptedInts[0];
- auto session = *(uint64*)&decryptedInts[2];
- auto msgId = *(uint64*)&decryptedInts[4];
- auto seqNo = *(uint32*)&decryptedInts[6];
- auto needAck = ((seqNo & 0x01) != 0);
- auto messageLength = *(uint32*)&decryptedInts[7];
- auto fullDataLength = kEncryptedHeaderIntsCount * kIntSize + messageLength; // Without padding.
- // Can underflow, but it is an unsigned type, so we just check the range later.
- auto paddingSize = static_cast<uint32>(encryptedBytesCount) - static_cast<uint32>(fullDataLength);
- std::array<uchar, 32> sha256Buffer = { { 0 } };
- SHA256_CTX msgKeyLargeContext;
- SHA256_Init(&msgKeyLargeContext);
- SHA256_Update(&msgKeyLargeContext, _encryptionKey->partForMsgKey(false), 32);
- SHA256_Update(&msgKeyLargeContext, decryptedInts, encryptedBytesCount);
- SHA256_Final(sha256Buffer.data(), &msgKeyLargeContext);
- constexpr auto kMsgKeyShift = 8U;
- if (ConstTimeIsDifferent(&msgKey, sha256Buffer.data() + kMsgKeyShift, sizeof(msgKey))) {
- LOG(("TCP Error: bad SHA256 hash after aesDecrypt in message"));
- return restart();
- }
- if ((messageLength > kMaxMessageLength)
- || (messageLength & 0x03)
- || (paddingSize < kMinPaddingSize)
- || (paddingSize > kMaxPaddingSize)) {
- LOG(("TCP Error: bad msg_len received %1, data size: %2").arg(messageLength).arg(encryptedBytesCount));
- return restart();
- }
- if (Logs::DebugEnabled()) {
- _connection->logInfo(u"Decrypted message %1,%2,%3 is %4 len"_q
- .arg(msgId)
- .arg(seqNo)
- .arg(Logs::b(needAck))
- .arg(fullDataLength));
- }
- if (session != _sessionId) {
- LOG(("MTP Error: bad server session received"));
- return restart();
- }
- const auto serverTime = int32(msgId >> 32);
- const auto isReply = ((msgId & 0x03) == 1);
- if (!isReply && ((msgId & 0x03) != 3)) {
- LOG(("MTP Error: bad msg_id %1 in message received").arg(msgId));
- return restart();
- }
- const auto clientTime = base::unixtime::now();
- const auto badTime = (serverTime > clientTime + 60)
- || (serverTime + 300 < clientTime);
- if (badTime) {
- DEBUG_LOG(("MTP Info: bad server time from msg_id: %1, my time: %2").arg(serverTime).arg(clientTime));
- }
- bool wasConnected = (getState() == ConnectedState);
- if (serverSalt != _sessionSalt) {
- if (!badTime) {
- DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(_sessionSalt));
- _sessionSalt = serverSalt;
- if (setState(ConnectedState, ConnectingState)) {
- resendAll();
- }
- } else {
- DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt));
- }
- } else {
- serverSalt = 0; // dont pass to handle method, so not to lock in setSalt()
- }
- if (needAck) _ackRequestData.push_back(MTP_long(msgId));
- auto res = HandleResult::Success; // if no need to handle, then succeed
- auto from = decryptedInts + kEncryptedHeaderIntsCount;
- auto end = from + (messageLength / kIntSize);
- auto sfrom = decryptedInts + 4U; // msg_id + seq_no + length + message
- MTP_LOG(_shiftedDcId, ("Recv: ")
- + DumpToText(sfrom, end)
- + QString(" (dc:%1,key:%2)"
- ).arg(AbstractConnection::ProtocolDcDebugId(getProtocolDcId())
- ).arg(_encryptionKey->keyId()));
- const auto registered = _receivedMessageIds.registerMsgId(
- msgId,
- needAck);
- if (registered == ReceivedIdsManager::Result::Success) {
- res = handleOneReceived(from, end, msgId, {
- .outerMsgId = msgId,
- .serverSalt = serverSalt,
- .serverTime = serverTime,
- .badTime = badTime,
- });
- } else if (registered == ReceivedIdsManager::Result::TooOld) {
- res = HandleResult::ResetSession;
- }
- _receivedMessageIds.shrink();
- // send acks
- if (const auto toAckSize = _ackRequestData.size()) {
- DEBUG_LOG(("MTP Info: will send %1 acks, ids: %2").arg(toAckSize).arg(LogIdsVector(_ackRequestData)));
- _sessionData->queueSendAnything(kAckSendWaiting);
- }
- auto lock = QReadLocker(_sessionData->haveReceivedMutex());
- const auto tryToReceive = !_sessionData->haveReceivedMessages().empty();
- lock.unlock();
- if (tryToReceive) {
- DEBUG_LOG(("MTP Info: queueTryToReceive() - need to parse in another thread, %1 messages.").arg(_sessionData->haveReceivedMessages().size()));
- _sessionData->queueTryToReceive();
- }
- if (res != HandleResult::Success && res != HandleResult::Ignored) {
- if (res == HandleResult::DestroyTemporaryKey) {
- destroyTemporaryKey();
- } else if (res == HandleResult::ResetSession) {
- _needSessionReset = true;
- }
- return restart();
- }
- _retryTimeout = 1; // reset restart() timer
- _startedConnectingAt = crl::time(0);
- if (!wasConnected) {
- if (getState() == ConnectedState) {
- _sessionData->queueNeedToResumeAndSend();
- }
- }
- }
- if (_connection->needHttpWait()) {
- _sessionData->queueSendAnything();
- }
- }
- SessionPrivate::HandleResult SessionPrivate::handleOneReceived(
- const mtpPrime *from,
- const mtpPrime *end,
- uint64 msgId,
- OuterInfo info) {
- Expects(from < end);
- switch (mtpTypeId(*from)) {
- case mtpc_gzip_packed: {
- DEBUG_LOG(("Message Info: gzip container"));
- mtpBuffer response = ungzip(++from, end);
- if (response.empty()) {
- return HandleResult::RestartConnection;
- }
- return handleOneReceived(response.data(), response.data() + response.size(), msgId, info);
- }
- case mtpc_msg_container: {
- if (++from >= end) {
- return HandleResult::ParseError;
- }
- const mtpPrime *otherEnd;
- const auto msgsCount = (uint32)*(from++);
- DEBUG_LOG(("Message Info: container received, count: %1").arg(msgsCount));
- for (uint32 i = 0; i < msgsCount; ++i) {
- if (from + 4 >= end) {
- return HandleResult::ParseError;
- }
- otherEnd = from + 4;
- MTPlong inMsgId;
- if (!inMsgId.read(from, otherEnd)) {
- return HandleResult::ParseError;
- }
- bool isReply = ((inMsgId.v & 0x03) == 1);
- if (!isReply && ((inMsgId.v & 0x03) != 3)) {
- LOG(("Message Error: bad msg_id %1 in contained message received").arg(inMsgId.v));
- return HandleResult::RestartConnection;
- }
- MTPint inSeqNo;
- if (!inSeqNo.read(from, otherEnd)) {
- return HandleResult::ParseError;
- }
- MTPint bytes;
- if (!bytes.read(from, otherEnd)) {
- return HandleResult::ParseError;
- }
- if ((bytes.v & 0x03) || bytes.v < 4) {
- LOG(("Message Error: bad length %1 of contained message received").arg(bytes.v));
- return HandleResult::RestartConnection;
- }
- bool needAck = (inSeqNo.v & 0x01);
- if (needAck) _ackRequestData.push_back(inMsgId);
- DEBUG_LOG(("Message Info: message from container, msg_id: %1, needAck: %2").arg(inMsgId.v).arg(Logs::b(needAck)));
- otherEnd = from + (bytes.v >> 2);
- if (otherEnd > end) {
- return HandleResult::ParseError;
- }
- auto res = HandleResult::Success; // if no need to handle, then succeed
- const auto registered = _receivedMessageIds.registerMsgId(
- inMsgId.v,
- needAck);
- if (registered == ReceivedIdsManager::Result::Success) {
- res = handleOneReceived(from, otherEnd, inMsgId.v, info);
- info.badTime = false;
- } else if (registered == ReceivedIdsManager::Result::TooOld) {
- res = HandleResult::ResetSession;
- }
- if (res != HandleResult::Success) {
- return res;
- }
- from = otherEnd;
- }
- } return HandleResult::Success;
- case mtpc_msgs_ack: {
- MTPMsgsAck msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &ids = msg.c_msgs_ack().vmsg_ids().v;
- DEBUG_LOG(("Message Info: acks received, ids: %1"
- ).arg(LogIdsVector(ids)));
- if (ids.isEmpty()) {
- return info.badTime ? HandleResult::Ignored : HandleResult::Success;
- }
- if (info.badTime) {
- if (!requestsFixTimeSalt(ids, info)) {
- return HandleResult::Ignored;
- }
- } else {
- correctUnixtimeByFastRequest(ids, info.serverTime);
- }
- requestsAcked(ids);
- } return HandleResult::Success;
- case mtpc_bad_msg_notification: {
- MTPBadMsgNotification msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data(msg.c_bad_msg_notification());
- LOG(("Message Info: bad message notification received (error_code %3) for msg_id = %1, seq_no = %2").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.verror_code().v));
- const auto resendId = data.vbad_msg_id().v;
- const auto errorCode = data.verror_code().v;
- if (false
- || errorCode == 16
- || errorCode == 17
- || errorCode == 32
- || errorCode == 33
- || errorCode == 64) { // can handle
- const auto needResend = false
- || (errorCode == 16) // bad msg_id
- || (errorCode == 17) // bad msg_id
- || (errorCode == 64); // bad container
- if (errorCode == 64) { // bad container!
- if (Logs::DebugEnabled()) {
- const auto i = _sentContainers.find(resendId);
- if (i == _sentContainers.end()) {
- LOG(("Message Error: Container not found!"));
- } else {
- auto idsList = QStringList();
- for (const auto innerMsgId : i->second.messages) {
- idsList.push_back(QString::number(innerMsgId));
- }
- LOG(("Message Info: bad container received! messages: %1").arg(idsList.join(',')));
- }
- }
- }
- if (!wasSent(resendId)) {
- DEBUG_LOG(("Message Error: "
- "such message was not sent recently %1").arg(resendId));
- return info.badTime
- ? HandleResult::Ignored
- : HandleResult::Success;
- }
- if (needResend) { // bad msg_id or bad container
- if (info.serverSalt) {
- _sessionSalt = info.serverSalt;
- }
- correctUnixtimeWithBadLocal(info.serverTime);
- DEBUG_LOG(("Message Info: unixtime updated, now %1, resending in container...").arg(info.serverTime));
- resend(resendId);
- } else { // must create new session, because msg_id and msg_seqno are inconsistent
- if (info.badTime) {
- if (info.serverSalt) {
- _sessionSalt = info.serverSalt;
- }
- correctUnixtimeWithBadLocal(info.serverTime);
- info.badTime = false;
- }
- if (_bindMsgId) {
- LOG(("Message Info: bad message notification received"
- " while binding temp key, restarting."));
- return HandleResult::RestartConnection;
- }
- LOG(("Message Info: bad message notification received, msgId %1, error_code %2").arg(data.vbad_msg_id().v).arg(errorCode));
- return HandleResult::ResetSession;
- }
- } else { // fatal (except 48, but it must not get here)
- const auto badMsgId = mtpMsgId(data.vbad_msg_id().v);
- const auto requestId = wasSent(resendId);
- if (_bindMsgId) {
- LOG(("Message Error: fatal bad message notification received"
- " while binding temp key, restarting."));
- return HandleResult::RestartConnection;
- } else if (requestId) {
- LOG(("Message Error: "
- "fatal bad message notification received, "
- "msgId %1, error_code %2, requestId: %3"
- ).arg(badMsgId
- ).arg(errorCode
- ).arg(requestId));
- auto reply = mtpBuffer();
- MTPRpcError(MTP_rpc_error(
- MTP_int(500),
- MTP_string("PROTOCOL_ERROR")
- )).write(reply);
- // Save rpc_error for processing in the main thread.
- QWriteLocker locker(_sessionData->haveReceivedMutex());
- _sessionData->haveReceivedMessages().push_back({
- .reply = std::move(reply),
- .outerMsgId = info.outerMsgId,
- .requestId = requestId,
- });
- } else {
- DEBUG_LOG(("Message Error: "
- "such message was not sent recently %1").arg(badMsgId));
- }
- return info.badTime
- ? HandleResult::Ignored
- : HandleResult::Success;
- }
- } return HandleResult::Success;
- case mtpc_bad_server_salt: {
- MTPBadMsgNotification msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data = msg.c_bad_server_salt();
- DEBUG_LOG(("Message Info: bad server salt received (error_code %4) for msg_id = %1, seq_no = %2, new salt: %3").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.vnew_server_salt().v).arg(data.verror_code().v));
- const auto resendId = data.vbad_msg_id().v;
- if (!wasSent(resendId)) {
- DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId));
- return (info.badTime ? HandleResult::Ignored : HandleResult::Success);
- }
- _sessionSalt = data.vnew_server_salt().v;
- // Don't force time update here.
- base::unixtime::update(info.serverTime);
- if (_bindMsgId) {
- LOG(("Message Info: bad_server_salt received while binding temp key, restarting."));
- return HandleResult::RestartConnection;
- }
- if (setState(ConnectedState, ConnectingState)) {
- resendAll();
- }
- info.badTime = false;
- DEBUG_LOG(("Message Info: unixtime updated, now %1, server_salt updated, now %2, resending...").arg(info.serverTime).arg(info.serverSalt));
- resend(resendId);
- } return HandleResult::Success;
- case mtpc_msgs_state_info: {
- MTPMsgsStateInfo msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- auto &data = msg.c_msgs_state_info();
- auto reqMsgId = data.vreq_msg_id().v;
- auto &states = data.vinfo().v;
- DEBUG_LOG(("Message Info: msg state received, msgId %1, reqMsgId: %2, HEX states %3").arg(msgId).arg(reqMsgId).arg(Logs::mb(states.data(), states.length()).str()));
- const auto i = _stateAndResendRequests.find(reqMsgId);
- if (i == _stateAndResendRequests.end()) {
- DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(reqMsgId));
- return info.badTime
- ? HandleResult::Ignored
- : HandleResult::Success;
- }
- if (info.badTime) {
- if (info.serverSalt) {
- _sessionSalt = info.serverSalt; // requestsFixTimeSalt with no lookup
- }
- correctUnixtimeWithBadLocal(info.serverTime);
- DEBUG_LOG(("Message Info: unixtime updated from mtpc_msgs_state_info, now %1").arg(info.serverTime));
- info.badTime = false;
- }
- const auto originalRequest = i->second;
- Assert(originalRequest->size() > 8);
- requestsAcked(QVector<MTPlong>(1, MTP_long(reqMsgId)), true);
- auto rFrom = originalRequest->constData() + 8;
- const auto rEnd = originalRequest->constData() + originalRequest->size();
- if (mtpTypeId(*rFrom) == mtpc_msgs_state_req) {
- MTPMsgsStateReq request;
- if (!request.read(rFrom, rEnd)) {
- LOG(("Message Error: could not parse sent msgs_state_req"));
- return HandleResult::ParseError;
- }
- handleMsgsStates(request.c_msgs_state_req().vmsg_ids().v, states);
- } else {
- MTPMsgResendReq request;
- if (!request.read(rFrom, rEnd)) {
- LOG(("Message Error: could not parse sent msgs_resend_req"));
- return HandleResult::ParseError;
- }
- handleMsgsStates(request.c_msg_resend_req().vmsg_ids().v, states);
- }
- } return HandleResult::Success;
- case mtpc_msgs_all_info: {
- if (info.badTime) {
- DEBUG_LOG(("Message Info: skipping with bad time..."));
- return HandleResult::Ignored;
- }
- MTPMsgsAllInfo msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- auto &data = msg.c_msgs_all_info();
- auto &ids = data.vmsg_ids().v;
- auto &states = data.vinfo().v;
- DEBUG_LOG(("Message Info: msgs all info received, msgId %1, reqMsgIds: %2, states %3").arg(
- QString::number(msgId),
- LogIdsVector(ids),
- Logs::mb(states.data(), states.length()).str()));
- handleMsgsStates(ids, states);
- } return HandleResult::Success;
- case mtpc_msg_detailed_info: {
- MTPMsgDetailedInfo msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data(msg.c_msg_detailed_info());
- DEBUG_LOG(("Message Info: msg detailed info, sent msgId %1, answerId %2, status %3, bytes %4").arg(data.vmsg_id().v).arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v));
- QVector<MTPlong> ids(1, data.vmsg_id());
- if (info.badTime) {
- if (requestsFixTimeSalt(ids, info)) {
- info.badTime = false;
- } else {
- DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(data.vmsg_id().v));
- return HandleResult::Ignored;
- }
- }
- requestsAcked(ids);
- const auto resMsgId = data.vanswer_msg_id();
- if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
- _ackRequestData.push_back(resMsgId);
- } else {
- DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
- _resendRequestData.push_back(resMsgId);
- }
- } return HandleResult::Success;
- case mtpc_msg_new_detailed_info: {
- if (info.badTime) {
- DEBUG_LOG(("Message Info: skipping msg_new_detailed_info with bad time..."));
- return HandleResult::Ignored;
- }
- MTPMsgDetailedInfo msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data(msg.c_msg_new_detailed_info());
- DEBUG_LOG(("Message Info: msg new detailed info, answerId %2, status %3, bytes %4").arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v));
- const auto resMsgId = data.vanswer_msg_id();
- if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
- _ackRequestData.push_back(resMsgId);
- } else {
- DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
- _resendRequestData.push_back(resMsgId);
- }
- } return HandleResult::Success;
- case mtpc_rpc_result: {
- if (from + 3 > end) {
- return HandleResult::ParseError;
- }
- auto response = mtpBuffer();
- MTPlong reqMsgId;
- if (!reqMsgId.read(++from, end)) {
- return HandleResult::ParseError;
- }
- const auto requestMsgId = reqMsgId.v;
- DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(requestMsgId));
- QVector<MTPlong> ids(1, reqMsgId);
- if (info.badTime) {
- if (requestsFixTimeSalt(ids, info)) {
- info.badTime = false;
- } else {
- DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(requestMsgId));
- return HandleResult::Ignored;
- }
- }
- mtpTypeId typeId = from[0];
- if (typeId == mtpc_gzip_packed) {
- DEBUG_LOG(("RPC Info: gzip container"));
- response = ungzip(++from, end);
- if (response.empty()) {
- return HandleResult::RestartConnection;
- }
- typeId = response[0];
- } else {
- response.resize(end - from);
- memcpy(response.data(), from, (end - from) * sizeof(mtpPrime));
- }
- if (typeId == mtpc_rpc_error) {
- if (IsDestroyedTemporaryKeyError(response)) {
- return HandleResult::DestroyTemporaryKey;
- }
- // An error could be some RPC_CALL_FAIL or other error inside
- // the initConnection, so we're not sure yet that it was inited.
- // Wait till a good response is received.
- } else {
- _sessionData->notifyConnectionInited(*_options);
- }
- requestsAcked(ids, true);
- const auto bindResult = handleBindResponse(requestMsgId, response);
- if (bindResult != HandleResult::Ignored) {
- return bindResult;
- }
- const auto requestId = wasSent(requestMsgId);
- if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) {
- // Save rpc_result for processing in the main thread.
- QWriteLocker locker(_sessionData->haveReceivedMutex());
- _sessionData->haveReceivedMessages().push_back({
- .reply = std::move(response),
- .outerMsgId = info.outerMsgId,
- .requestId = requestId,
- });
- } else {
- DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId));
- }
- } return HandleResult::Success;
- case mtpc_new_session_created: {
- const mtpPrime *start = from;
- MTPNewSession msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data(msg.c_new_session_created());
- if (info.badTime) {
- if (requestsFixTimeSalt(QVector<MTPlong>(1, data.vfirst_msg_id()), info)) {
- info.badTime = false;
- } else {
- DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(data.vfirst_msg_id().v));
- return HandleResult::Ignored;
- }
- }
- DEBUG_LOG(("Message Info: new server session created, unique_id %1, first_msg_id %2, server_salt %3").arg(data.vunique_id().v).arg(data.vfirst_msg_id().v).arg(data.vserver_salt().v));
- _sessionSalt = data.vserver_salt().v;
- mtpMsgId firstMsgId = data.vfirst_msg_id().v;
- QVector<quint64> toResend;
- {
- QReadLocker locker(_sessionData->haveSentMutex());
- const auto &haveSent = _sessionData->haveSentMap();
- toResend.reserve(haveSent.size());
- for (const auto &[msgId, request] : haveSent) {
- if (msgId >= firstMsgId) {
- break;
- } else if (request->requestId) {
- toResend.push_back(msgId);
- }
- }
- }
- for (const auto msgId : toResend) {
- resend(msgId, 10);
- }
- mtpBuffer update(from - start);
- if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime));
- // Notify main process about new session - need to get difference.
- QWriteLocker locker(_sessionData->haveReceivedMutex());
- _sessionData->haveReceivedMessages().push_back({
- .reply = update,
- .outerMsgId = info.outerMsgId,
- });
- } return HandleResult::Success;
- case mtpc_pong: {
- MTPPong msg;
- if (!msg.read(from, end)) {
- return HandleResult::ParseError;
- }
- const auto &data(msg.c_pong());
- DEBUG_LOG(("Message Info: pong received, msg_id: %1, ping_id: %2").arg(data.vmsg_id().v).arg(data.vping_id().v));
- if (!wasSent(data.vmsg_id().v)) {
- DEBUG_LOG(("Message Error: such msg_id %1 ping_id %2 was not sent recently").arg(data.vmsg_id().v).arg(data.vping_id().v));
- return HandleResult::Ignored;
- }
- if (data.vping_id().v == _pingId) {
- _pingId = 0;
- } else {
- DEBUG_LOG(("Message Info: just pong..."));
- }
- QVector<MTPlong> ids(1, data.vmsg_id());
- if (info.badTime) {
- if (requestsFixTimeSalt(ids, info)) {
- info.badTime = false;
- } else {
- return HandleResult::Ignored;
- }
- }
- requestsAcked(ids, true);
- } return HandleResult::Success;
- }
- if (info.badTime) {
- DEBUG_LOG(("Message Error: bad time in updates cons, must create new session"));
- return HandleResult::ResetSession;
- }
- if (_currentDcType == DcType::Regular) {
- mtpBuffer update(end - from);
- if (end > from) {
- memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
- }
- // Notify main process about the new updates.
- QWriteLocker locker(_sessionData->haveReceivedMutex());
- _sessionData->haveReceivedMessages().push_back({
- .reply = update,
- .outerMsgId = info.outerMsgId,
- });
- } else {
- LOG(("Message Error: unexpected updates in dcType: %1"
- ).arg(static_cast<int>(_currentDcType)));
- }
- return HandleResult::Success;
- }
- SessionPrivate::HandleResult SessionPrivate::handleBindResponse(
- mtpMsgId requestMsgId,
- const mtpBuffer &response) {
- if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) {
- return HandleResult::Ignored;
- }
- _bindMsgId = 0;
- const auto result = _keyCreator->handleBindResponse(response);
- switch (result) {
- case DcKeyBindState::Success:
- if (!_sessionData->releaseKeyCreationOnDone(
- _encryptionKey,
- base::take(_keyCreator)->bindPersistentKey())) {
- return HandleResult::DestroyTemporaryKey;
- }
- _sessionData->queueNeedToResumeAndSend();
- return HandleResult::Success;
- case DcKeyBindState::DefinitelyDestroyed:
- if (destroyOldEnoughPersistentKey()) {
- return HandleResult::DestroyTemporaryKey;
- }
- [[fallthrough]];
- case DcKeyBindState::Failed:
- _sessionData->queueNeedToResumeAndSend();
- return HandleResult::Success;
- }
- Unexpected("Result of BoundKeyCreator::handleBindResponse.");
- }
- mtpBuffer SessionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const {
- mtpBuffer result; // * 4 because of mtpPrime type
- result.resize(0);
- MTPstring packed;
- if (!packed.read(from, end)) { // read packed string as serialized mtp string type
- LOG(("RPC Error: could not read gziped bytes."));
- return result;
- }
- uint32 packedLen = packed.v.size(), unpackedChunk = packedLen;
- z_stream stream;
- stream.zalloc = 0;
- stream.zfree = 0;
- stream.opaque = 0;
- stream.avail_in = 0;
- stream.next_in = 0;
- int res = inflateInit2(&stream, 16 + MAX_WBITS);
- if (res != Z_OK) {
- LOG(("RPC Error: could not init zlib stream, code: %1").arg(res));
- return result;
- }
- stream.avail_in = packedLen;
- stream.next_in = reinterpret_cast<Bytef*>(packed.v.data());
- stream.avail_out = 0;
- while (!stream.avail_out) {
- result.resize(result.size() + unpackedChunk);
- stream.avail_out = unpackedChunk * sizeof(mtpPrime);
- stream.next_out = (Bytef*)&result[result.size() - unpackedChunk];
- int res = inflate(&stream, Z_NO_FLUSH);
- if (res != Z_OK && res != Z_STREAM_END) {
- inflateEnd(&stream);
- LOG(("RPC Error: could not unpack gziped data, code: %1").arg(res));
- DEBUG_LOG(("RPC Error: bad gzip: %1").arg(Logs::mb(packed.v.constData(), packedLen).str()));
- return mtpBuffer();
- }
- }
- if (stream.avail_out & 0x03) {
- uint32 badSize = result.size() * sizeof(mtpPrime) - stream.avail_out;
- LOG(("RPC Error: bad length of unpacked data %1").arg(badSize));
- DEBUG_LOG(("RPC Error: bad unpacked data %1").arg(Logs::mb(result.data(), badSize).str()));
- return mtpBuffer();
- }
- result.resize(result.size() - (stream.avail_out >> 2));
- inflateEnd(&stream);
- if (!result.size()) {
- LOG(("RPC Error: bad length of unpacked data 0"));
- }
- return result;
- }
- bool SessionPrivate::requestsFixTimeSalt(const QVector<MTPlong> &ids, const OuterInfo &info) {
- for (const auto &id : ids) {
- if (wasSent(id.v)) {
- // Found such msg_id in recent acked or in recent sent requests.
- if (info.serverSalt) {
- _sessionSalt = info.serverSalt;
- }
- correctUnixtimeWithBadLocal(info.serverTime);
- return true;
- }
- }
- return false;
- }
- void SessionPrivate::correctUnixtimeByFastRequest(
- const QVector<MTPlong> &ids,
- TimeId serverTime) {
- const auto now = crl::now();
- QReadLocker locker(_sessionData->haveSentMutex());
- const auto &haveSent = _sessionData->haveSentMap();
- for (const auto &id : ids) {
- const auto i = haveSent.find(id.v);
- if (i == haveSent.end()) {
- continue;
- }
- const auto duration = (now - i->second->lastSentTime);
- if (duration < 0 || duration > SyncTimeRequestDuration) {
- continue;
- }
- locker.unlock();
- SyncTimeRequestDuration = duration;
- base::unixtime::update(serverTime);
- return;
- }
- }
- void SessionPrivate::correctUnixtimeWithBadLocal(TimeId serverTime) {
- SyncTimeRequestDuration = kFastRequestDuration;
- base::unixtime::update(serverTime, true);
- }
- void SessionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
- DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids)));
- QVector<MTPlong> toAckMore;
- {
- QWriteLocker locker2(_sessionData->haveSentMutex());
- auto &haveSent = _sessionData->haveSentMap();
- for (const auto &wrappedMsgId : ids) {
- const auto msgId = wrappedMsgId.v;
- if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
- DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(msgId));
- const auto &list = i->second.messages;
- toAckMore.reserve(toAckMore.size() + list.size());
- for (const auto msgId : list) {
- toAckMore.push_back(MTP_long(msgId));
- }
- _sentContainers.erase(i);
- continue;
- }
- if (const auto i = _stateAndResendRequests.find(msgId); i != end(_stateAndResendRequests)) {
- _stateAndResendRequests.erase(i);
- continue;
- }
- if (const auto i = haveSent.find(msgId); i != end(haveSent)) {
- const auto requestId = i->second->requestId;
- if (!byResponse && _instance->hasCallback(requestId)) {
- DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
- continue;
- }
- haveSent.erase(i);
- _ackedIds.emplace(msgId, requestId);
- continue;
- }
- DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
- if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
- const auto requestId = i->second;
- if (!byResponse && _instance->hasCallback(requestId)) {
- DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
- continue;
- }
- _resendingIds.erase(i);
- QWriteLocker locker4(_sessionData->toSendMutex());
- auto &toSend = _sessionData->toSendMap();
- const auto j = toSend.find(requestId);
- if (j == end(toSend)) {
- DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId).arg(requestId));
- continue;
- }
- if (j->second->requestId != requestId) {
- DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(requestId).arg(j->second->requestId));
- } else {
- DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(requestId));
- }
- _ackedIds.emplace(msgId, j->second->requestId);
- toSend.erase(j);
- continue;
- }
- DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
- }
- }
- auto ackedCount = _ackedIds.size();
- if (ackedCount > kIdsBufferSize) {
- DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
- while (ackedCount-- > kIdsBufferSize) {
- _ackedIds.erase(_ackedIds.begin());
- }
- }
- if (toAckMore.size()) {
- requestsAcked(toAckMore);
- }
- }
- void SessionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states) {
- const auto idsCount = ids.size();
- if (!idsCount) {
- DEBUG_LOG(("Message Info: void ids vector in handleMsgsStates()"));
- return;
- }
- if (states.size() != idsCount) {
- LOG(("Message Error: got less states than required ids count."));
- return;
- }
- auto acked = QVector<MTPlong>();
- acked.reserve(idsCount);
- for (auto i = 0; i != idsCount; ++i) {
- const auto state = states[i];
- const auto requestMsgId = ids[i].v;
- {
- QReadLocker locker(_sessionData->haveSentMutex());
- if (!_sessionData->haveSentMap().contains(requestMsgId)) {
- DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId));
- const auto reqIt = _resendingIds.find(requestMsgId);
- if (reqIt != _resendingIds.cend()) {
- if ((state & 0x07) != 0x04) { // was received
- DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, already resending in container").arg(requestMsgId).arg((int32)state));
- } else {
- DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, ack, cancelling resend").arg(requestMsgId).arg((int32)state));
- acked.push_back(MTP_long(requestMsgId)); // will remove from resend in requestsAcked
- }
- } else {
- DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(requestMsgId));
- }
- continue;
- }
- }
- if ((state & 0x07) != 0x04) { // was received
- DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, resending in container").arg(requestMsgId).arg((int32)state));
- resend(requestMsgId, 10);
- } else {
- DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, ack").arg(requestMsgId).arg((int32)state));
- acked.push_back(MTP_long(requestMsgId));
- }
- }
- requestsAcked(acked);
- }
- void SessionPrivate::clearSpecialMsgId(mtpMsgId msgId) {
- if (msgId == _pingMsgId) {
- _pingMsgId = 0;
- _pingId = 0;
- } else if (msgId == _bindMsgId) {
- _bindMsgId = 0;
- }
- }
- void SessionPrivate::resend(mtpMsgId msgId, crl::time msCanWait) {
- const auto guard = gsl::finally([&] {
- clearSpecialMsgId(msgId);
- if (msCanWait >= 0) {
- _sessionData->queueSendAnything(msCanWait);
- }
- });
- if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
- DEBUG_LOG(("Message Info: resending container, msgId %1").arg(msgId));
- const auto ids = std::move(i->second.messages);
- _sentContainers.erase(i);
- for (const auto innerMsgId : ids) {
- resend(innerMsgId, -1);
- }
- return;
- }
- auto lock = QWriteLocker(_sessionData->haveSentMutex());
- auto &haveSent = _sessionData->haveSentMap();
- auto i = haveSent.find(msgId);
- if (i == haveSent.end()) {
- return;
- }
- auto request = i->second;
- haveSent.erase(i);
- lock.unlock();
- request->lastSentTime = crl::now();
- request->forceSendInContainer = true;
- _resendingIds.emplace(msgId, request->requestId);
- {
- QWriteLocker locker(_sessionData->toSendMutex());
- _sessionData->toSendMap().emplace(request->requestId, request);
- }
- }
- void SessionPrivate::resendAll() {
- auto lock = QWriteLocker(_sessionData->haveSentMutex());
- auto haveSent = base::take(_sessionData->haveSentMap());
- lock.unlock();
- {
- auto lock = QWriteLocker(_sessionData->toSendMutex());
- auto &toSend = _sessionData->toSendMap();
- const auto now = crl::now();
- for (auto &[msgId, request] : haveSent) {
- const auto requestId = request->requestId;
- request->lastSentTime = now;
- request->forceSendInContainer = true;
- _resendingIds.emplace(msgId, requestId);
- toSend.emplace(requestId, std::move(request));
- }
- }
- _sessionData->queueSendAnything();
- }
- void SessionPrivate::onConnected(
- not_null<AbstractConnection*> connection) {
- disconnect(connection, &AbstractConnection::connected, nullptr, nullptr);
- if (!connection->isConnected()) {
- LOG(("Connection Error: not connected in onConnected(), "
- "state: %1").arg(connection->debugState()));
- return restart();
- }
- _waitForConnected = kMinConnectedTimeout;
- _waitForConnectedTimer.cancel();
- const auto i = ranges::find(
- _testConnections,
- connection.get(),
- [](const TestConnection &test) { return test.data.get(); });
- Assert(i != end(_testConnections));
- const auto my = i->priority;
- const auto j = ranges::find_if(
- _testConnections,
- [&](const TestConnection &test) { return test.priority > my; });
- if (j != end(_testConnections)) {
- DEBUG_LOG(("MTP Info: connection %1 succeed, waiting for %2.").arg(
- i->data->tag(),
- j->data->tag()));
- _waitForBetterTimer.callOnce(kWaitForBetterTimeout);
- } else {
- DEBUG_LOG(("MTP Info: connection through IPv4 succeed."));
- _waitForBetterTimer.cancel();
- _connection = std::move(i->data);
- _testConnections.clear();
- checkAuthKey();
- }
- }
- void SessionPrivate::onDisconnected(
- not_null<AbstractConnection*> connection) {
- removeTestConnection(connection);
- if (_testConnections.empty()) {
- destroyAllConnections();
- restart();
- } else {
- confirmBestConnection();
- }
- }
- void SessionPrivate::confirmBestConnection() {
- if (_waitForBetterTimer.isActive()) {
- return;
- }
- const auto i = ranges::max_element(
- _testConnections,
- std::less<>(),
- [](const TestConnection &test) {
- return test.data->isConnected() ? test.priority : -1;
- });
- Assert(i != end(_testConnections));
- if (!i->data->isConnected()) {
- return;
- }
- DEBUG_LOG(("MTP Info: can't connect through better, using %1."
- ).arg(i->data->tag()));
- _connection = std::move(i->data);
- _testConnections.clear();
- checkAuthKey();
- }
- void SessionPrivate::removeTestConnection(
- not_null<AbstractConnection*> connection) {
- _testConnections.erase(
- ranges::remove(
- _testConnections,
- connection.get(),
- [](const TestConnection &test) { return test.data.get(); }),
- end(_testConnections));
- }
- void SessionPrivate::checkAuthKey() {
- if (_keyId) {
- authKeyChecked();
- } else if (_instance->isKeysDestroyer()) {
- applyAuthKey(_sessionData->getPersistentKey());
- } else {
- applyAuthKey(_sessionData->getTemporaryKey(
- TemporaryKeyTypeByDcType(_currentDcType)));
- }
- }
- void SessionPrivate::updateAuthKey() {
- if (_instance->isKeysDestroyer() || _keyCreator || !_connection) {
- return;
- }
- DEBUG_LOG(("AuthKey Info: Connection updating key from Session, dc %1"
- ).arg(_shiftedDcId));
- applyAuthKey(_sessionData->getTemporaryKey(
- TemporaryKeyTypeByDcType(_currentDcType)));
- }
- void SessionPrivate::setCurrentKeyId(uint64 newKeyId) {
- if (_keyId == newKeyId) {
- return;
- }
- _keyId = newKeyId;
- DEBUG_LOG(("MTP Info: auth key id set to id %1").arg(newKeyId));
- changeSessionId();
- }
- void SessionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) {
- _encryptionKey = std::move(encryptionKey);
- const auto newKeyId = _encryptionKey ? _encryptionKey->keyId() : 0;
- if (_keyId) {
- if (_keyId == newKeyId) {
- return;
- }
- setCurrentKeyId(0);
- DEBUG_LOG(("MTP Info: auth_key id for dc %1 changed, restarting..."
- ).arg(_shiftedDcId));
- if (_connection) {
- restart();
- }
- return;
- }
- if (!_connection) {
- return;
- }
- setCurrentKeyId(newKeyId);
- Assert(!_connection->sentEncryptedWithKeyId());
- DEBUG_LOG(("AuthKey Info: Connection update key from Session, "
- "dc %1 result: %2"
- ).arg(_shiftedDcId
- ).arg(Logs::mb(&_keyId, sizeof(_keyId)).str()));
- if (_keyId) {
- return authKeyChecked();
- }
- if (_instance->isKeysDestroyer()) {
- // We are here to destroy an old key, so we're done.
- LOG(("MTP Error: No key %1 in updateAuthKey() for destroying."
- ).arg(_shiftedDcId));
- _instance->keyWasPossiblyDestroyed(_shiftedDcId);
- } else if (noMediaKeyWithExistingRegularKey()) {
- DEBUG_LOG(("AuthKey Info: No key in updateAuthKey() for media, "
- "but someone has created regular, trying to acquire."));
- const auto dcType = tryAcquireKeyCreation();
- if (_keyCreator && dcType != _currentDcType) {
- DEBUG_LOG(("AuthKey Info: "
- "Dc type changed for creation, restarting."));
- restart();
- return;
- }
- }
- if (_keyCreator) {
- DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), creating."));
- _keyCreator->start(
- BareDcId(_shiftedDcId),
- getProtocolDcId(),
- _connection.get(),
- &_instance->dcOptions());
- } else {
- DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), "
- "but someone is creating already, waiting."));
- }
- }
- bool SessionPrivate::noMediaKeyWithExistingRegularKey() const {
- return (TemporaryKeyTypeByDcType(_currentDcType)
- == TemporaryKeyType::MediaCluster)
- && _sessionData->getTemporaryKey(TemporaryKeyType::Regular);
- }
- bool SessionPrivate::destroyOldEnoughPersistentKey() {
- Expects(_keyCreator != nullptr);
- const auto key = _keyCreator->bindPersistentKey();
- Assert(key != nullptr);
- const auto created = key->creationTime();
- if (created > 0 && crl::now() - created < kKeyOldEnoughForDestroy) {
- return false;
- }
- const auto instance = _instance;
- const auto shiftedDcId = _shiftedDcId;
- const auto keyId = key->keyId();
- InvokeQueued(instance, [=] {
- instance->keyDestroyedOnServer(shiftedDcId, keyId);
- });
- return true;
- }
- DcType SessionPrivate::tryAcquireKeyCreation() {
- if (_keyCreator) {
- return _currentDcType;
- } else if (_instance->isKeysDestroyer()) {
- return _realDcType;
- }
- const auto acquired = _sessionData->acquireKeyCreation(_realDcType);
- if (acquired == CreatingKeyType::None) {
- return _realDcType;
- }
- using Result = DcKeyResult;
- using Error = DcKeyError;
- auto delegate = BoundKeyCreator::Delegate();
- delegate.unboundReady = [=](base::expected<Result, Error> result) {
- if (!result) {
- releaseKeyCreationOnFail();
- if (result.error() == Error::UnknownPublicKey) {
- if (_realDcType == DcType::Cdn) {
- LOG(("Warning: CDN public RSA key not found"));
- requestCDNConfig();
- return;
- }
- LOG(("AuthKey Error: could not choose public RSA key"));
- }
- restart();
- return;
- }
- DEBUG_LOG(("AuthKey Info: unbound key creation succeed, "
- "ids: (%1, %2) server salts: (%3, %4)"
- ).arg(result->temporaryKey
- ? result->temporaryKey->keyId()
- : 0
- ).arg(result->persistentKey
- ? result->persistentKey->keyId()
- : 0
- ).arg(result->temporaryServerSalt
- ).arg(result->persistentServerSalt));
- _sessionSalt = result->temporaryServerSalt;
- result->temporaryKey->setExpiresAt(base::unixtime::now()
- + kTemporaryExpiresIn
- + kBindKeyAdditionalExpiresTimeout);
- if (_realDcType != DcType::Cdn) {
- auto key = result->persistentKey
- ? std::move(result->persistentKey)
- : _sessionData->getPersistentKey();
- if (!key) {
- releaseKeyCreationOnFail();
- restart();
- return;
- }
- _keyCreator->bind(std::move(key));
- }
- applyAuthKey(std::move(result->temporaryKey));
- if (_realDcType == DcType::Cdn) {
- _keyCreator = nullptr;
- if (!_sessionData->releaseCdnKeyCreationOnDone(_encryptionKey)) {
- restart();
- } else {
- _sessionData->queueNeedToResumeAndSend();
- }
- }
- };
- delegate.sentSome = [=](uint64 size) {
- onSentSome(size);
- };
- delegate.receivedSome = [=] {
- onReceivedSome();
- };
- auto request = DcKeyRequest();
- request.persistentNeeded = (acquired == CreatingKeyType::Persistent);
- request.temporaryExpiresIn = kTemporaryExpiresIn;
- _keyCreator = std::make_unique<BoundKeyCreator>(
- request,
- std::move(delegate));
- const auto forceUseRegular = (_realDcType == DcType::MediaCluster)
- && (acquired != CreatingKeyType::TemporaryMediaCluster);
- return forceUseRegular ? DcType::Regular : _realDcType;
- }
- void SessionPrivate::authKeyChecked() {
- connect(_connection, &AbstractConnection::receivedData, [=] {
- handleReceived();
- });
- if (_sessionSalt && setState(ConnectedState)) {
- resendAll();
- } // else receive salt in bad_server_salt first, then try to send all the requests
- _pingIdToSend = base::RandomValue<uint64>(); // get server_salt
- _sessionData->queueNeedToResumeAndSend();
- }
- void SessionPrivate::onError(
- not_null<AbstractConnection*> connection,
- qint32 errorCode) {
- if (errorCode == -429) {
- LOG(("Protocol Error: -429 flood code returned!"));
- } else if (errorCode == -444) {
- LOG(("Protocol Error: -444 bad dc_id code returned!"));
- InvokeQueued(_instance, [instance = _instance] {
- instance->badConfigurationError();
- });
- }
- removeTestConnection(connection);
- if (_testConnections.empty()) {
- handleError(errorCode);
- } else {
- confirmBestConnection();
- }
- }
- void SessionPrivate::handleError(int errorCode) {
- destroyAllConnections();
- _waitForConnectedTimer.cancel();
- if (errorCode == -404) {
- destroyTemporaryKey();
- } else {
- MTP_LOG(_shiftedDcId, ("Restarting after error in connection, error code: %1...").arg(errorCode));
- return restart();
- }
- }
- void SessionPrivate::destroyTemporaryKey() {
- if (_instance->isKeysDestroyer()) {
- LOG(("MTP Info: -404 error received in destroyer %1, assuming key was destroyed.").arg(_shiftedDcId));
- _instance->keyWasPossiblyDestroyed(_shiftedDcId);
- return;
- }
- LOG(("MTP Info: -404 error received in %1 with temporary key, assuming it was destroyed.").arg(_shiftedDcId));
- releaseKeyCreationOnFail();
- if (_encryptionKey) {
- _sessionData->destroyTemporaryKey(_encryptionKey->keyId());
- }
- applyAuthKey(nullptr);
- restart();
- }
- bool SessionPrivate::sendSecureRequest(
- SerializedRequest &&request,
- bool needAnyResponse) {
- request.addPadding(false);
- uint32 fullSize = request->size();
- if (fullSize < 9) {
- return false;
- }
- auto messageSize = request.messageSize();
- if (messageSize < 5 || fullSize < messageSize + 4) {
- return false;
- }
- memcpy(request->data() + 0, &_sessionSalt, 2 * sizeof(mtpPrime));
- memcpy(request->data() + 2, &_sessionId, 2 * sizeof(mtpPrime));
- auto from = request->constData() + 4;
- MTP_LOG(_shiftedDcId, ("Send: ")
- + DumpToText(from, from + messageSize)
- + QString(" (dc:%1,key:%2)"
- ).arg(AbstractConnection::ProtocolDcDebugId(getProtocolDcId())
- ).arg(_encryptionKey->keyId()));
- uchar encryptedSHA256[32];
- MTPint128 &msgKey(*(MTPint128*)(encryptedSHA256 + 8));
- SHA256_CTX msgKeyLargeContext;
- SHA256_Init(&msgKeyLargeContext);
- SHA256_Update(&msgKeyLargeContext, _encryptionKey->partForMsgKey(true), 32);
- SHA256_Update(&msgKeyLargeContext, request->constData(), fullSize * sizeof(mtpPrime));
- SHA256_Final(encryptedSHA256, &msgKeyLargeContext);
- auto packet = _connection->prepareSecurePacket(_keyId, msgKey, fullSize);
- const auto prefix = packet.size();
- packet.resize(prefix + fullSize);
- aesIgeEncrypt(
- request->constData(),
- &packet[prefix],
- fullSize * sizeof(mtpPrime),
- _encryptionKey,
- msgKey);
- DEBUG_LOG(("MTP Info: sending request, size: %1, num: %2, time: %3").arg(fullSize + 6).arg((*request)[4]).arg((*request)[5]));
- _connection->setSentEncryptedWithKeyId(_keyId);
- _connection->sendData(std::move(packet));
- if (needAnyResponse) {
- onSentSome((prefix + fullSize) * sizeof(mtpPrime));
- }
- return true;
- }
- mtpRequestId SessionPrivate::wasSent(mtpMsgId msgId) const {
- if (msgId == _pingMsgId || msgId == _bindMsgId) {
- return mtpRequestId(0xFFFFFFFF);
- }
- if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
- return i->second;
- }
- if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) {
- return i->second;
- }
- if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
- return mtpRequestId(0xFFFFFFFF);
- }
- {
- QReadLocker locker(_sessionData->haveSentMutex());
- const auto &haveSent = _sessionData->haveSentMap();
- const auto i = haveSent.find(msgId);
- if (i != haveSent.end()) {
- return i->second->requestId
- ? i->second->requestId
- : mtpRequestId(0xFFFFFFFF);
- }
- }
- return 0;
- }
- void SessionPrivate::clearUnboundKeyCreator() {
- if (_keyCreator) {
- _keyCreator->stop();
- }
- }
- void SessionPrivate::releaseKeyCreationOnFail() {
- if (!_keyCreator) {
- return;
- }
- _keyCreator = nullptr;
- _sessionData->releaseKeyCreationOnFail();
- }
- } // namespace details
- } // namespace MTP
|