dedicated_file_loader.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "mtproto/dedicated_file_loader.h"
  8. #include "mtproto/facade.h"
  9. #include "main/main_account.h" // Account::sessionChanges.
  10. #include "main/main_session.h" // Session::account.
  11. #include "core/application.h"
  12. #include "base/call_delayed.h"
  13. namespace MTP {
  14. namespace {
  15. std::optional<MTPInputChannel> ExtractChannel(
  16. const MTPcontacts_ResolvedPeer &result) {
  17. const auto &data = result.c_contacts_resolvedPeer();
  18. if (const auto peer = peerFromMTP(data.vpeer())) {
  19. for (const auto &chat : data.vchats().v) {
  20. if (chat.type() == mtpc_channel) {
  21. const auto &channel = chat.c_channel();
  22. if (peer == peerFromChannel(channel.vid())) {
  23. return MTP_inputChannel(
  24. channel.vid(),
  25. MTP_long(channel.vaccess_hash().value_or_empty()));
  26. }
  27. }
  28. }
  29. }
  30. return std::nullopt;
  31. }
  32. std::optional<DedicatedLoader::File> ParseFile(
  33. const MTPmessages_Messages &result) {
  34. const auto message = GetMessagesElement(result);
  35. if (!message || message->type() != mtpc_message) {
  36. LOG(("Update Error: MTP file message not found."));
  37. return std::nullopt;
  38. }
  39. const auto &data = message->c_message();
  40. const auto media = data.vmedia();
  41. if (!media || media->type() != mtpc_messageMediaDocument) {
  42. LOG(("Update Error: MTP file media not found."));
  43. return std::nullopt;
  44. }
  45. const auto &inner = media->c_messageMediaDocument();
  46. const auto document = inner.vdocument();
  47. if (!document || document->type() != mtpc_document) {
  48. LOG(("Update Error: MTP file not found."));
  49. return std::nullopt;
  50. }
  51. const auto &fields = document->c_document();
  52. const auto name = [&] {
  53. for (const auto &attribute : fields.vattributes().v) {
  54. if (attribute.type() == mtpc_documentAttributeFilename) {
  55. const auto &data = attribute.c_documentAttributeFilename();
  56. return qs(data.vfile_name());
  57. }
  58. }
  59. return QString();
  60. }();
  61. if (name.isEmpty()) {
  62. LOG(("Update Error: MTP file name not found."));
  63. return std::nullopt;
  64. }
  65. const auto size = int64(fields.vsize().v);
  66. if (size <= 0) {
  67. LOG(("Update Error: MTP file size is invalid."));
  68. return std::nullopt;
  69. }
  70. const auto location = MTP_inputDocumentFileLocation(
  71. fields.vid(),
  72. fields.vaccess_hash(),
  73. fields.vfile_reference(),
  74. MTP_string());
  75. return DedicatedLoader::File{ name, size, fields.vdc_id().v, location };
  76. }
  77. } // namespace
  78. WeakInstance::WeakInstance(base::weak_ptr<Main::Session> session)
  79. : _session(session)
  80. , _instance(_session ? &_session->account().mtp() : nullptr) {
  81. if (!valid()) {
  82. return;
  83. }
  84. connect(_instance, &QObject::destroyed, this, [=] {
  85. _instance = nullptr;
  86. _session = nullptr;
  87. die();
  88. });
  89. _session->account().sessionChanges(
  90. ) | rpl::filter([](Main::Session *session) {
  91. return !session;
  92. }) | rpl::start_with_next([=] {
  93. die();
  94. }, _lifetime);
  95. }
  96. base::weak_ptr<Main::Session> WeakInstance::session() const {
  97. return _session;
  98. }
  99. bool WeakInstance::valid() const {
  100. return (_session != nullptr);
  101. }
  102. Instance *WeakInstance::instance() const {
  103. return _instance;
  104. }
  105. void WeakInstance::die() {
  106. for (const auto &[requestId, fail] : base::take(_requests)) {
  107. if (_instance) {
  108. _instance->cancel(requestId);
  109. }
  110. fail(Error::Local(
  111. "UNAVAILABLE",
  112. "MTP instance is not available."));
  113. }
  114. }
  115. bool WeakInstance::removeRequest(mtpRequestId requestId) {
  116. if (const auto i = _requests.find(requestId); i != end(_requests)) {
  117. _requests.erase(i);
  118. return true;
  119. }
  120. return false;
  121. }
  122. void WeakInstance::reportUnavailable(
  123. Fn<void(const Error &error)> callback) {
  124. InvokeQueued(this, [=] {
  125. callback(Error::Local(
  126. "UNAVAILABLE",
  127. "MTP instance is not available."));
  128. });
  129. }
  130. WeakInstance::~WeakInstance() {
  131. if (_instance) {
  132. for (const auto &[requestId, fail] : base::take(_requests)) {
  133. _instance->cancel(requestId);
  134. }
  135. }
  136. }
  137. AbstractDedicatedLoader::AbstractDedicatedLoader(
  138. const QString &filepath,
  139. int chunkSize)
  140. : _filepath(filepath)
  141. , _chunkSize(chunkSize) {
  142. }
  143. void AbstractDedicatedLoader::start() {
  144. if (!validateOutput()
  145. || (!_output.isOpen() && !_output.open(QIODevice::Append))) {
  146. QFile(_filepath).remove();
  147. threadSafeFailed();
  148. return;
  149. }
  150. LOG(("Update Info: Starting loading '%1' from %2 offset."
  151. ).arg(_filepath
  152. ).arg(alreadySize()));
  153. startLoading();
  154. }
  155. int64 AbstractDedicatedLoader::alreadySize() const {
  156. QMutexLocker lock(&_sizesMutex);
  157. return _alreadySize;
  158. }
  159. int64 AbstractDedicatedLoader::totalSize() const {
  160. QMutexLocker lock(&_sizesMutex);
  161. return _totalSize;
  162. }
  163. rpl::producer<QString> AbstractDedicatedLoader::ready() const {
  164. return _ready.events();
  165. }
  166. auto AbstractDedicatedLoader::progress() const -> rpl::producer<Progress> {
  167. return _progress.events();
  168. }
  169. rpl::producer<> AbstractDedicatedLoader::failed() const {
  170. return _failed.events();
  171. }
  172. void AbstractDedicatedLoader::wipeFolder() {
  173. QFileInfo info(_filepath);
  174. const auto dir = info.dir();
  175. const auto all = dir.entryInfoList(QDir::Files);
  176. for (auto i = all.begin(), e = all.end(); i != e; ++i) {
  177. if (i->absoluteFilePath() != info.absoluteFilePath()) {
  178. QFile::remove(i->absoluteFilePath());
  179. }
  180. }
  181. }
  182. bool AbstractDedicatedLoader::validateOutput() {
  183. if (_filepath.isEmpty()) {
  184. return false;
  185. }
  186. QFileInfo info(_filepath);
  187. const auto dir = info.dir();
  188. if (!dir.exists()) {
  189. dir.mkdir(dir.absolutePath());
  190. }
  191. _output.setFileName(_filepath);
  192. if (!info.exists()) {
  193. return true;
  194. }
  195. const auto fullSize = info.size();
  196. if (fullSize < _chunkSize || fullSize > kMaxFileSize) {
  197. return _output.remove();
  198. }
  199. const auto goodSize = int64((fullSize % _chunkSize)
  200. ? (fullSize - (fullSize % _chunkSize))
  201. : fullSize);
  202. if (_output.resize(goodSize)) {
  203. _alreadySize = goodSize;
  204. return true;
  205. }
  206. return false;
  207. }
  208. void AbstractDedicatedLoader::threadSafeProgress(Progress progress) {
  209. crl::on_main(this, [=] {
  210. _progress.fire_copy(progress);
  211. });
  212. }
  213. void AbstractDedicatedLoader::threadSafeReady() {
  214. crl::on_main(this, [=] {
  215. _ready.fire_copy(_filepath);
  216. });
  217. }
  218. void AbstractDedicatedLoader::threadSafeFailed() {
  219. crl::on_main(this, [=] {
  220. _failed.fire({});
  221. });
  222. }
  223. void AbstractDedicatedLoader::writeChunk(bytes::const_span data, int totalSize) {
  224. const auto size = data.size();
  225. if (size > 0) {
  226. const auto written = _output.write(QByteArray::fromRawData(
  227. reinterpret_cast<const char*>(data.data()),
  228. size));
  229. if (written != size) {
  230. threadSafeFailed();
  231. return;
  232. }
  233. }
  234. const auto progress = [&] {
  235. QMutexLocker lock(&_sizesMutex);
  236. if (!_totalSize) {
  237. _totalSize = totalSize;
  238. }
  239. _alreadySize += size;
  240. return Progress { _alreadySize, _totalSize };
  241. }();
  242. if (progress.size > 0 && progress.already >= progress.size) {
  243. _output.close();
  244. threadSafeReady();
  245. } else {
  246. threadSafeProgress(progress);
  247. }
  248. }
  249. rpl::lifetime &AbstractDedicatedLoader::lifetime() {
  250. return _lifetime;
  251. }
  252. DedicatedLoader::DedicatedLoader(
  253. base::weak_ptr<Main::Session> session,
  254. const QString &folder,
  255. const File &file)
  256. : AbstractDedicatedLoader(folder + '/' + file.name, kChunkSize)
  257. , _size(file.size)
  258. , _dcId(file.dcId)
  259. , _location(file.location)
  260. , _mtp(session) {
  261. Expects(_size > 0);
  262. }
  263. void DedicatedLoader::startLoading() {
  264. if (!_mtp.valid()) {
  265. LOG(("Update Error: MTP is unavailable."));
  266. threadSafeFailed();
  267. return;
  268. }
  269. LOG(("Update Info: Loading using MTP from '%1'.").arg(_dcId));
  270. _offset = alreadySize();
  271. writeChunk({}, _size);
  272. sendRequest();
  273. }
  274. void DedicatedLoader::sendRequest() {
  275. if (_requests.size() >= kRequestsCount || _offset >= _size) {
  276. return;
  277. }
  278. const auto offset = _offset;
  279. _requests.push_back({ offset });
  280. _mtp.send(
  281. MTPupload_GetFile(
  282. MTP_flags(0),
  283. _location,
  284. MTP_long(offset),
  285. MTP_int(kChunkSize)),
  286. [=](const MTPupload_File &result) { gotPart(offset, result); },
  287. failHandler(),
  288. MTP::updaterDcId(_dcId));
  289. _offset += kChunkSize;
  290. if (_requests.size() < kRequestsCount) {
  291. base::call_delayed(kNextRequestDelay, this, [=] { sendRequest(); });
  292. }
  293. }
  294. void DedicatedLoader::gotPart(int offset, const MTPupload_File &result) {
  295. Expects(!_requests.empty());
  296. if (result.type() == mtpc_upload_fileCdnRedirect) {
  297. LOG(("Update Error: MTP does not support cdn right now."));
  298. threadSafeFailed();
  299. return;
  300. }
  301. const auto &data = result.c_upload_file();
  302. if (data.vbytes().v.isEmpty()) {
  303. LOG(("Update Error: MTP empty part received."));
  304. threadSafeFailed();
  305. return;
  306. }
  307. const auto i = ranges::find(
  308. _requests,
  309. offset,
  310. [](const Request &request) { return request.offset; });
  311. Assert(i != end(_requests));
  312. i->bytes = data.vbytes().v;
  313. while (!_requests.empty() && !_requests.front().bytes.isEmpty()) {
  314. writeChunk(bytes::make_span(_requests.front().bytes), _size);
  315. _requests.pop_front();
  316. }
  317. sendRequest();
  318. }
  319. Fn<void(const Error &)> DedicatedLoader::failHandler() {
  320. return [=](const Error &error) {
  321. LOG(("Update Error: MTP load failed with '%1'"
  322. ).arg(QString::number(error.code()) + ':' + error.type()));
  323. threadSafeFailed();
  324. };
  325. }
  326. void ResolveChannel(
  327. not_null<MTP::WeakInstance*> mtp,
  328. const QString &username,
  329. Fn<void(const MTPInputChannel &channel)> done,
  330. Fn<void()> fail) {
  331. const auto failed = [&] {
  332. LOG(("Dedicated MTP Error: Channel '%1' resolve failed."
  333. ).arg(username));
  334. fail();
  335. };
  336. const auto session = mtp->session();
  337. if (!mtp->valid()) {
  338. failed();
  339. return;
  340. }
  341. struct ResolveResult {
  342. base::weak_ptr<Main::Session> session;
  343. MTPInputChannel channel;
  344. };
  345. static std::map<QString, ResolveResult> ResolveCache;
  346. const auto i = ResolveCache.find(username);
  347. if (i != end(ResolveCache)) {
  348. if (i->second.session.get() == session.get()) {
  349. done(i->second.channel);
  350. return;
  351. }
  352. ResolveCache.erase(i);
  353. }
  354. const auto doneHandler = [=](const MTPcontacts_ResolvedPeer &result) {
  355. Expects(result.type() == mtpc_contacts_resolvedPeer);
  356. if (const auto channel = ExtractChannel(result)) {
  357. ResolveCache.emplace(
  358. username,
  359. ResolveResult { session, *channel });
  360. done(*channel);
  361. } else {
  362. failed();
  363. }
  364. };
  365. const auto failHandler = [=](const Error &error) {
  366. LOG(("Dedicated MTP Error: Resolve failed with '%1'"
  367. ).arg(QString::number(error.code()) + ':' + error.type()));
  368. fail();
  369. };
  370. mtp->send(MTPcontacts_ResolveUsername(
  371. MTP_flags(0),
  372. MTP_string(username),
  373. MTP_string()
  374. ), doneHandler, failHandler);
  375. }
  376. std::optional<MTPMessage> GetMessagesElement(
  377. const MTPmessages_Messages &list) {
  378. return list.match([&](const MTPDmessages_messagesNotModified &) {
  379. return std::optional<MTPMessage>(std::nullopt);
  380. }, [&](const auto &data) {
  381. return data.vmessages().v.isEmpty()
  382. ? std::nullopt
  383. : std::make_optional(data.vmessages().v[0]);
  384. });
  385. }
  386. void StartDedicatedLoader(
  387. not_null<MTP::WeakInstance*> mtp,
  388. const DedicatedLoader::Location &location,
  389. const QString &folder,
  390. Fn<void(std::unique_ptr<DedicatedLoader>)> ready) {
  391. const auto doneHandler = [=](const MTPmessages_Messages &result) {
  392. const auto file = ParseFile(result);
  393. ready(file
  394. ? std::make_unique<MTP::DedicatedLoader>(
  395. mtp->session(),
  396. folder,
  397. *file)
  398. : nullptr);
  399. };
  400. const auto failHandler = [=](const Error &error) {
  401. LOG(("Update Error: MTP check failed with '%1'"
  402. ).arg(QString::number(error.code()) + ':' + error.type()));
  403. ready(nullptr);
  404. };
  405. const auto &[username, postId] = location;
  406. ResolveChannel(mtp, username, [=, postId = postId](
  407. const MTPInputChannel &channel) {
  408. mtp->send(
  409. MTPchannels_GetMessages(
  410. channel,
  411. MTP_vector<MTPInputMessage>(
  412. 1,
  413. MTP_inputMessageID(MTP_int(postId)))),
  414. doneHandler,
  415. failHandler);
  416. }, [=] { ready(nullptr); });
  417. }
  418. } // namespace MTP