| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- /*
- This file is part of Telegram Desktop,
- the official desktop application for the Telegram messaging service.
- For license and copyright information please follow this link:
- https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
- */
- #include "mtproto/dedicated_file_loader.h"
- #include "mtproto/facade.h"
- #include "main/main_account.h" // Account::sessionChanges.
- #include "main/main_session.h" // Session::account.
- #include "core/application.h"
- #include "base/call_delayed.h"
- namespace MTP {
- namespace {
- std::optional<MTPInputChannel> ExtractChannel(
- const MTPcontacts_ResolvedPeer &result) {
- const auto &data = result.c_contacts_resolvedPeer();
- if (const auto peer = peerFromMTP(data.vpeer())) {
- for (const auto &chat : data.vchats().v) {
- if (chat.type() == mtpc_channel) {
- const auto &channel = chat.c_channel();
- if (peer == peerFromChannel(channel.vid())) {
- return MTP_inputChannel(
- channel.vid(),
- MTP_long(channel.vaccess_hash().value_or_empty()));
- }
- }
- }
- }
- return std::nullopt;
- }
- std::optional<DedicatedLoader::File> ParseFile(
- const MTPmessages_Messages &result) {
- const auto message = GetMessagesElement(result);
- if (!message || message->type() != mtpc_message) {
- LOG(("Update Error: MTP file message not found."));
- return std::nullopt;
- }
- const auto &data = message->c_message();
- const auto media = data.vmedia();
- if (!media || media->type() != mtpc_messageMediaDocument) {
- LOG(("Update Error: MTP file media not found."));
- return std::nullopt;
- }
- const auto &inner = media->c_messageMediaDocument();
- const auto document = inner.vdocument();
- if (!document || document->type() != mtpc_document) {
- LOG(("Update Error: MTP file not found."));
- return std::nullopt;
- }
- const auto &fields = document->c_document();
- const auto name = [&] {
- for (const auto &attribute : fields.vattributes().v) {
- if (attribute.type() == mtpc_documentAttributeFilename) {
- const auto &data = attribute.c_documentAttributeFilename();
- return qs(data.vfile_name());
- }
- }
- return QString();
- }();
- if (name.isEmpty()) {
- LOG(("Update Error: MTP file name not found."));
- return std::nullopt;
- }
- const auto size = int64(fields.vsize().v);
- if (size <= 0) {
- LOG(("Update Error: MTP file size is invalid."));
- return std::nullopt;
- }
- const auto location = MTP_inputDocumentFileLocation(
- fields.vid(),
- fields.vaccess_hash(),
- fields.vfile_reference(),
- MTP_string());
- return DedicatedLoader::File{ name, size, fields.vdc_id().v, location };
- }
- } // namespace
- WeakInstance::WeakInstance(base::weak_ptr<Main::Session> session)
- : _session(session)
- , _instance(_session ? &_session->account().mtp() : nullptr) {
- if (!valid()) {
- return;
- }
- connect(_instance, &QObject::destroyed, this, [=] {
- _instance = nullptr;
- _session = nullptr;
- die();
- });
- _session->account().sessionChanges(
- ) | rpl::filter([](Main::Session *session) {
- return !session;
- }) | rpl::start_with_next([=] {
- die();
- }, _lifetime);
- }
- base::weak_ptr<Main::Session> WeakInstance::session() const {
- return _session;
- }
- bool WeakInstance::valid() const {
- return (_session != nullptr);
- }
- Instance *WeakInstance::instance() const {
- return _instance;
- }
- void WeakInstance::die() {
- for (const auto &[requestId, fail] : base::take(_requests)) {
- if (_instance) {
- _instance->cancel(requestId);
- }
- fail(Error::Local(
- "UNAVAILABLE",
- "MTP instance is not available."));
- }
- }
- bool WeakInstance::removeRequest(mtpRequestId requestId) {
- if (const auto i = _requests.find(requestId); i != end(_requests)) {
- _requests.erase(i);
- return true;
- }
- return false;
- }
- void WeakInstance::reportUnavailable(
- Fn<void(const Error &error)> callback) {
- InvokeQueued(this, [=] {
- callback(Error::Local(
- "UNAVAILABLE",
- "MTP instance is not available."));
- });
- }
- WeakInstance::~WeakInstance() {
- if (_instance) {
- for (const auto &[requestId, fail] : base::take(_requests)) {
- _instance->cancel(requestId);
- }
- }
- }
- AbstractDedicatedLoader::AbstractDedicatedLoader(
- const QString &filepath,
- int chunkSize)
- : _filepath(filepath)
- , _chunkSize(chunkSize) {
- }
- void AbstractDedicatedLoader::start() {
- if (!validateOutput()
- || (!_output.isOpen() && !_output.open(QIODevice::Append))) {
- QFile(_filepath).remove();
- threadSafeFailed();
- return;
- }
- LOG(("Update Info: Starting loading '%1' from %2 offset."
- ).arg(_filepath
- ).arg(alreadySize()));
- startLoading();
- }
- int64 AbstractDedicatedLoader::alreadySize() const {
- QMutexLocker lock(&_sizesMutex);
- return _alreadySize;
- }
- int64 AbstractDedicatedLoader::totalSize() const {
- QMutexLocker lock(&_sizesMutex);
- return _totalSize;
- }
- rpl::producer<QString> AbstractDedicatedLoader::ready() const {
- return _ready.events();
- }
- auto AbstractDedicatedLoader::progress() const -> rpl::producer<Progress> {
- return _progress.events();
- }
- rpl::producer<> AbstractDedicatedLoader::failed() const {
- return _failed.events();
- }
- void AbstractDedicatedLoader::wipeFolder() {
- QFileInfo info(_filepath);
- const auto dir = info.dir();
- const auto all = dir.entryInfoList(QDir::Files);
- for (auto i = all.begin(), e = all.end(); i != e; ++i) {
- if (i->absoluteFilePath() != info.absoluteFilePath()) {
- QFile::remove(i->absoluteFilePath());
- }
- }
- }
- bool AbstractDedicatedLoader::validateOutput() {
- if (_filepath.isEmpty()) {
- return false;
- }
- QFileInfo info(_filepath);
- const auto dir = info.dir();
- if (!dir.exists()) {
- dir.mkdir(dir.absolutePath());
- }
- _output.setFileName(_filepath);
- if (!info.exists()) {
- return true;
- }
- const auto fullSize = info.size();
- if (fullSize < _chunkSize || fullSize > kMaxFileSize) {
- return _output.remove();
- }
- const auto goodSize = int64((fullSize % _chunkSize)
- ? (fullSize - (fullSize % _chunkSize))
- : fullSize);
- if (_output.resize(goodSize)) {
- _alreadySize = goodSize;
- return true;
- }
- return false;
- }
- void AbstractDedicatedLoader::threadSafeProgress(Progress progress) {
- crl::on_main(this, [=] {
- _progress.fire_copy(progress);
- });
- }
- void AbstractDedicatedLoader::threadSafeReady() {
- crl::on_main(this, [=] {
- _ready.fire_copy(_filepath);
- });
- }
- void AbstractDedicatedLoader::threadSafeFailed() {
- crl::on_main(this, [=] {
- _failed.fire({});
- });
- }
- void AbstractDedicatedLoader::writeChunk(bytes::const_span data, int totalSize) {
- const auto size = data.size();
- if (size > 0) {
- const auto written = _output.write(QByteArray::fromRawData(
- reinterpret_cast<const char*>(data.data()),
- size));
- if (written != size) {
- threadSafeFailed();
- return;
- }
- }
- const auto progress = [&] {
- QMutexLocker lock(&_sizesMutex);
- if (!_totalSize) {
- _totalSize = totalSize;
- }
- _alreadySize += size;
- return Progress { _alreadySize, _totalSize };
- }();
- if (progress.size > 0 && progress.already >= progress.size) {
- _output.close();
- threadSafeReady();
- } else {
- threadSafeProgress(progress);
- }
- }
- rpl::lifetime &AbstractDedicatedLoader::lifetime() {
- return _lifetime;
- }
- DedicatedLoader::DedicatedLoader(
- base::weak_ptr<Main::Session> session,
- const QString &folder,
- const File &file)
- : AbstractDedicatedLoader(folder + '/' + file.name, kChunkSize)
- , _size(file.size)
- , _dcId(file.dcId)
- , _location(file.location)
- , _mtp(session) {
- Expects(_size > 0);
- }
- void DedicatedLoader::startLoading() {
- if (!_mtp.valid()) {
- LOG(("Update Error: MTP is unavailable."));
- threadSafeFailed();
- return;
- }
- LOG(("Update Info: Loading using MTP from '%1'.").arg(_dcId));
- _offset = alreadySize();
- writeChunk({}, _size);
- sendRequest();
- }
- void DedicatedLoader::sendRequest() {
- if (_requests.size() >= kRequestsCount || _offset >= _size) {
- return;
- }
- const auto offset = _offset;
- _requests.push_back({ offset });
- _mtp.send(
- MTPupload_GetFile(
- MTP_flags(0),
- _location,
- MTP_long(offset),
- MTP_int(kChunkSize)),
- [=](const MTPupload_File &result) { gotPart(offset, result); },
- failHandler(),
- MTP::updaterDcId(_dcId));
- _offset += kChunkSize;
- if (_requests.size() < kRequestsCount) {
- base::call_delayed(kNextRequestDelay, this, [=] { sendRequest(); });
- }
- }
- void DedicatedLoader::gotPart(int offset, const MTPupload_File &result) {
- Expects(!_requests.empty());
- if (result.type() == mtpc_upload_fileCdnRedirect) {
- LOG(("Update Error: MTP does not support cdn right now."));
- threadSafeFailed();
- return;
- }
- const auto &data = result.c_upload_file();
- if (data.vbytes().v.isEmpty()) {
- LOG(("Update Error: MTP empty part received."));
- threadSafeFailed();
- return;
- }
- const auto i = ranges::find(
- _requests,
- offset,
- [](const Request &request) { return request.offset; });
- Assert(i != end(_requests));
- i->bytes = data.vbytes().v;
- while (!_requests.empty() && !_requests.front().bytes.isEmpty()) {
- writeChunk(bytes::make_span(_requests.front().bytes), _size);
- _requests.pop_front();
- }
- sendRequest();
- }
- Fn<void(const Error &)> DedicatedLoader::failHandler() {
- return [=](const Error &error) {
- LOG(("Update Error: MTP load failed with '%1'"
- ).arg(QString::number(error.code()) + ':' + error.type()));
- threadSafeFailed();
- };
- }
- void ResolveChannel(
- not_null<MTP::WeakInstance*> mtp,
- const QString &username,
- Fn<void(const MTPInputChannel &channel)> done,
- Fn<void()> fail) {
- const auto failed = [&] {
- LOG(("Dedicated MTP Error: Channel '%1' resolve failed."
- ).arg(username));
- fail();
- };
- const auto session = mtp->session();
- if (!mtp->valid()) {
- failed();
- return;
- }
- struct ResolveResult {
- base::weak_ptr<Main::Session> session;
- MTPInputChannel channel;
- };
- static std::map<QString, ResolveResult> ResolveCache;
- const auto i = ResolveCache.find(username);
- if (i != end(ResolveCache)) {
- if (i->second.session.get() == session.get()) {
- done(i->second.channel);
- return;
- }
- ResolveCache.erase(i);
- }
- const auto doneHandler = [=](const MTPcontacts_ResolvedPeer &result) {
- Expects(result.type() == mtpc_contacts_resolvedPeer);
- if (const auto channel = ExtractChannel(result)) {
- ResolveCache.emplace(
- username,
- ResolveResult { session, *channel });
- done(*channel);
- } else {
- failed();
- }
- };
- const auto failHandler = [=](const Error &error) {
- LOG(("Dedicated MTP Error: Resolve failed with '%1'"
- ).arg(QString::number(error.code()) + ':' + error.type()));
- fail();
- };
- mtp->send(MTPcontacts_ResolveUsername(
- MTP_flags(0),
- MTP_string(username),
- MTP_string()
- ), doneHandler, failHandler);
- }
- std::optional<MTPMessage> GetMessagesElement(
- const MTPmessages_Messages &list) {
- return list.match([&](const MTPDmessages_messagesNotModified &) {
- return std::optional<MTPMessage>(std::nullopt);
- }, [&](const auto &data) {
- return data.vmessages().v.isEmpty()
- ? std::nullopt
- : std::make_optional(data.vmessages().v[0]);
- });
- }
- void StartDedicatedLoader(
- not_null<MTP::WeakInstance*> mtp,
- const DedicatedLoader::Location &location,
- const QString &folder,
- Fn<void(std::unique_ptr<DedicatedLoader>)> ready) {
- const auto doneHandler = [=](const MTPmessages_Messages &result) {
- const auto file = ParseFile(result);
- ready(file
- ? std::make_unique<MTP::DedicatedLoader>(
- mtp->session(),
- folder,
- *file)
- : nullptr);
- };
- const auto failHandler = [=](const Error &error) {
- LOG(("Update Error: MTP check failed with '%1'"
- ).arg(QString::number(error.code()) + ':' + error.type()));
- ready(nullptr);
- };
- const auto &[username, postId] = location;
- ResolveChannel(mtp, username, [=, postId = postId](
- const MTPInputChannel &channel) {
- mtp->send(
- MTPchannels_GetMessages(
- channel,
- MTP_vector<MTPInputMessage>(
- 1,
- MTP_inputMessageID(MTP_int(postId)))),
- doneHandler,
- failHandler);
- }, [=] { ready(nullptr); });
- }
- } // namespace MTP
|