file_download_web.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "storage/file_download_web.h"
  8. #include "storage/cache/storage_cache_types.h"
  9. #include "base/timer.h"
  10. #include "base/weak_ptr.h"
  11. #include <QtNetwork/QAuthenticator>
  12. namespace {
  13. constexpr auto kMaxWebFileQueries = 8;
  14. constexpr auto kMaxHttpRedirects = 5;
  15. constexpr auto kResetDownloadPrioritiesTimeout = crl::time(200);
  16. constexpr auto kMaxWebFile = 4000 * int64(1024 * 1024);
  17. std::weak_ptr<WebLoadManager> GlobalLoadManager;
  18. [[nodiscard]] std::shared_ptr<WebLoadManager> GetManager() {
  19. auto result = GlobalLoadManager.lock();
  20. if (!result) {
  21. GlobalLoadManager = result = std::make_shared<WebLoadManager>();
  22. }
  23. return result;
  24. }
  25. enum class ProcessResult {
  26. Error,
  27. Progress,
  28. Finished,
  29. };
  30. enum class Error {
  31. };
  32. struct Progress {
  33. qint64 ready = 0;
  34. qint64 total = 0;
  35. QByteArray streamed;
  36. };
  37. using Update = std::variant<Progress, QByteArray, Error>;
  38. struct UpdateForLoader {
  39. not_null<webFileLoader*> loader;
  40. Update data;
  41. };
  42. } // namespace
  43. class WebLoadManager final : public base::has_weak_ptr {
  44. public:
  45. WebLoadManager();
  46. ~WebLoadManager();
  47. void enqueue(not_null<webFileLoader*> loader);
  48. void remove(not_null<webFileLoader*> loader);
  49. [[nodiscard]] rpl::producer<Update> updates(
  50. not_null<webFileLoader*> loader) const;
  51. private:
  52. struct Enqueued {
  53. int id = 0;
  54. QString url;
  55. bool stream = false;
  56. };
  57. struct Sent {
  58. QString url;
  59. not_null<QNetworkReply*> reply;
  60. bool stream = false;
  61. QByteArray data;
  62. int64 ready = 0;
  63. int64 total = 0;
  64. int redirectsLeft = kMaxHttpRedirects;
  65. };
  66. // Constructor.
  67. void handleNetworkErrors();
  68. // Worker thread.
  69. void enqueue(int id, const QString &url, bool stream);
  70. void remove(int id);
  71. void resetGeneration();
  72. void checkSendNext();
  73. void send(const Enqueued &entry);
  74. [[nodiscard]] not_null<QNetworkReply*> send(int id, const QString &url);
  75. [[nodiscard]] Sent *findSent(int id, not_null<QNetworkReply*> reply);
  76. void removeSent(int id);
  77. void progress(
  78. int id,
  79. not_null<QNetworkReply*> reply,
  80. int64 ready,
  81. int64 total);
  82. void failed(
  83. int id,
  84. not_null<QNetworkReply*> reply,
  85. QNetworkReply::NetworkError error);
  86. void redirect(int id, not_null<QNetworkReply*> reply);
  87. void notify(
  88. int id,
  89. not_null<QNetworkReply*> reply,
  90. int64 ready,
  91. int64 total);
  92. void failed(int id, not_null<QNetworkReply*> reply);
  93. void finished(int id, not_null<QNetworkReply*> reply);
  94. void deleteDeferred(not_null<QNetworkReply*> reply);
  95. void queueProgressUpdate(
  96. int id,
  97. int64 ready,
  98. int64 total,
  99. QByteArray streamed);
  100. void queueFailedUpdate(int id);
  101. void queueFinishedUpdate(int id, const QByteArray &data);
  102. void clear();
  103. // Main thread.
  104. void sendUpdate(int id, Update &&data);
  105. QThread _thread;
  106. std::unique_ptr<QNetworkAccessManager> _network;
  107. base::Timer _resetGenerationTimer;
  108. // Main thread.
  109. rpl::event_stream<UpdateForLoader> _updates;
  110. int _autoincrement = 0;
  111. base::flat_map<not_null<webFileLoader*>, int> _ids;
  112. // Worker thread.
  113. std::deque<Enqueued> _queue;
  114. std::deque<Enqueued> _previousGeneration;
  115. base::flat_map<int, Sent> _sent;
  116. std::vector<QPointer<QNetworkReply>> _repliesBeingDeleted;
  117. };
  118. WebLoadManager::WebLoadManager()
  119. : _network(std::make_unique<QNetworkAccessManager>())
  120. , _resetGenerationTimer(&_thread, [=] { resetGeneration(); }) {
  121. handleNetworkErrors();
  122. _network->moveToThread(&_thread);
  123. QObject::connect(&_thread, &QThread::finished, [=] {
  124. clear();
  125. _network = nullptr;
  126. });
  127. _thread.start();
  128. }
  129. void WebLoadManager::handleNetworkErrors() {
  130. const auto fail = [=](QNetworkReply *reply) {
  131. for (const auto &[id, sent] : _sent) {
  132. if (sent.reply == reply) {
  133. failed(id, reply);
  134. return;
  135. }
  136. }
  137. };
  138. QObject::connect(
  139. _network.get(),
  140. &QNetworkAccessManager::authenticationRequired,
  141. fail);
  142. QObject::connect(
  143. _network.get(),
  144. &QNetworkAccessManager::sslErrors,
  145. fail);
  146. }
  147. WebLoadManager::~WebLoadManager() {
  148. _thread.quit();
  149. _thread.wait();
  150. }
  151. [[nodiscard]] rpl::producer<Update> WebLoadManager::updates(
  152. not_null<webFileLoader*> loader) const {
  153. return _updates.events(
  154. ) | rpl::filter([=](const UpdateForLoader &update) {
  155. return (update.loader == loader);
  156. }) | rpl::map([=](UpdateForLoader &&update) {
  157. return std::move(update.data);
  158. });
  159. }
  160. void WebLoadManager::enqueue(not_null<webFileLoader*> loader) {
  161. const auto id = [&] {
  162. const auto i = _ids.find(loader);
  163. return (i != end(_ids))
  164. ? i->second
  165. : _ids.emplace(loader, ++_autoincrement).first->second;
  166. }();
  167. const auto url = loader->url();
  168. const auto stream = loader->streamLoading();
  169. InvokeQueued(_network.get(), [=] {
  170. enqueue(id, url, stream);
  171. });
  172. }
  173. void WebLoadManager::remove(not_null<webFileLoader*> loader) {
  174. const auto i = _ids.find(loader);
  175. if (i == end(_ids)) {
  176. return;
  177. }
  178. const auto id = i->second;
  179. _ids.erase(i);
  180. InvokeQueued(_network.get(), [=] {
  181. remove(id);
  182. });
  183. }
  184. void WebLoadManager::enqueue(int id, const QString &url, bool stream) {
  185. const auto i = ranges::find(_queue, id, &Enqueued::id);
  186. if (i != end(_queue)) {
  187. return;
  188. }
  189. _previousGeneration.erase(
  190. ranges::remove(_previousGeneration, id, &Enqueued::id),
  191. end(_previousGeneration));
  192. _queue.push_back(Enqueued{ id, url, stream });
  193. if (!_resetGenerationTimer.isActive()) {
  194. _resetGenerationTimer.callOnce(kResetDownloadPrioritiesTimeout);
  195. }
  196. checkSendNext();
  197. }
  198. void WebLoadManager::remove(int id) {
  199. _queue.erase(ranges::remove(_queue, id, &Enqueued::id), end(_queue));
  200. _previousGeneration.erase(
  201. ranges::remove(_previousGeneration, id, &Enqueued::id),
  202. end(_previousGeneration));
  203. removeSent(id);
  204. }
  205. void WebLoadManager::resetGeneration() {
  206. if (!_previousGeneration.empty()) {
  207. std::copy(
  208. begin(_previousGeneration),
  209. end(_previousGeneration),
  210. std::back_inserter(_queue));
  211. _previousGeneration.clear();
  212. }
  213. std::swap(_queue, _previousGeneration);
  214. }
  215. void WebLoadManager::checkSendNext() {
  216. if (_sent.size() >= kMaxWebFileQueries
  217. || (_queue.empty() && _previousGeneration.empty())) {
  218. return;
  219. }
  220. const auto entry = _queue.empty()
  221. ? _previousGeneration.front()
  222. : _queue.front();
  223. (_queue.empty() ? _previousGeneration : _queue).pop_front();
  224. send(entry);
  225. }
  226. void WebLoadManager::send(const Enqueued &entry) {
  227. const auto id = entry.id;
  228. const auto url = entry.url;
  229. _sent.emplace(id, Sent{ url, send(id, url), entry.stream });
  230. }
  231. void WebLoadManager::removeSent(int id) {
  232. if (const auto i = _sent.find(id); i != end(_sent)) {
  233. deleteDeferred(i->second.reply);
  234. _sent.erase(i);
  235. checkSendNext();
  236. }
  237. }
  238. not_null<QNetworkReply*> WebLoadManager::send(int id, const QString &url) {
  239. const auto result = _network->get(QNetworkRequest(url));
  240. const auto handleProgress = [=](qint64 ready, qint64 total) {
  241. progress(id, result, ready, total);
  242. };
  243. const auto handleError = [=](QNetworkReply::NetworkError error) {
  244. failed(id, result, error);
  245. };
  246. QObject::connect(
  247. result,
  248. &QNetworkReply::downloadProgress,
  249. handleProgress);
  250. QObject::connect(result, &QNetworkReply::errorOccurred, handleError);
  251. return result;
  252. }
  253. WebLoadManager::Sent *WebLoadManager::findSent(
  254. int id,
  255. not_null<QNetworkReply*> reply) {
  256. const auto i = _sent.find(id);
  257. return (i != end(_sent) && i->second.reply == reply)
  258. ? &i->second
  259. : nullptr;
  260. }
  261. void WebLoadManager::progress(
  262. int id,
  263. not_null<QNetworkReply*> reply,
  264. int64 ready,
  265. int64 total) {
  266. if (total <= 0) {
  267. const auto originalContentLength = reply->attribute(
  268. QNetworkRequest::OriginalContentLengthAttribute);
  269. if (originalContentLength.isValid()) {
  270. total = originalContentLength.toLongLong();
  271. }
  272. }
  273. const auto statusCode = reply->attribute(
  274. QNetworkRequest::HttpStatusCodeAttribute);
  275. const auto status = statusCode.isValid() ? statusCode.toInt() : 200;
  276. if (status == 301 || status == 302) {
  277. redirect(id, reply);
  278. } else if (status != 200 && status != 206 && status != 416) {
  279. LOG(("Network Error: "
  280. "Bad HTTP status received in WebLoadManager::onProgress() %1"
  281. ).arg(status));
  282. failed(id, reply);
  283. } else {
  284. notify(id, reply, ready, std::max(ready, total));
  285. }
  286. }
  287. void WebLoadManager::redirect(int id, not_null<QNetworkReply*> reply) {
  288. const auto header = reply->header(QNetworkRequest::LocationHeader);
  289. const auto url = header.toString();
  290. if (url.isEmpty()) {
  291. return;
  292. }
  293. if (const auto sent = findSent(id, reply)) {
  294. if (!sent->redirectsLeft--) {
  295. LOG(("Network Error: "
  296. "Too many HTTP redirects in onFinished() "
  297. "for web file loader: %1").arg(url));
  298. failed(id, reply);
  299. return;
  300. }
  301. deleteDeferred(reply);
  302. sent->url = url;
  303. sent->reply = send(id, url);
  304. }
  305. }
  306. void WebLoadManager::notify(
  307. int id,
  308. not_null<QNetworkReply*> reply,
  309. int64 ready,
  310. int64 total) {
  311. if (const auto sent = findSent(id, reply)) {
  312. sent->ready = ready;
  313. sent->total = std::max(total, int64(0));
  314. if (total <= 0) {
  315. LOG(("Network Error: "
  316. "Bad size received for HTTP download progress "
  317. "in WebLoadManager::onProgress(): %1 / %2 (bytes %3)"
  318. ).arg(ready
  319. ).arg(total
  320. ).arg(sent->data.size()));
  321. failed(id, reply);
  322. return;
  323. }
  324. auto bytes = reply->readAll();
  325. if (sent->stream) {
  326. if (total > kMaxWebFile) {
  327. LOG(("Network Error: "
  328. "Bad size received for HTTP download progress "
  329. "in WebLoadManager::onProgress(): %1 / %2"
  330. ).arg(ready
  331. ).arg(total));
  332. failed(id, reply);
  333. } else {
  334. queueProgressUpdate(
  335. id,
  336. sent->ready,
  337. sent->total,
  338. std::move(bytes));
  339. if (ready >= total) {
  340. finished(id, reply);
  341. }
  342. }
  343. } else {
  344. sent->data.append(std::move(bytes));
  345. if (total > Storage::kMaxFileInMemory
  346. || sent->data.size() > Storage::kMaxFileInMemory) {
  347. LOG(("Network Error: "
  348. "Bad size received for HTTP download progress "
  349. "in WebLoadManager::onProgress(): %1 / %2 (bytes %3)"
  350. ).arg(ready
  351. ).arg(total
  352. ).arg(sent->data.size()));
  353. failed(id, reply);
  354. } else if (ready >= total) {
  355. finished(id, reply);
  356. } else {
  357. queueProgressUpdate(id, sent->ready, sent->total, {});
  358. }
  359. }
  360. }
  361. }
  362. void WebLoadManager::failed(
  363. int id,
  364. not_null<QNetworkReply*> reply,
  365. QNetworkReply::NetworkError error) {
  366. if (const auto sent = findSent(id, reply)) {
  367. LOG(("Network Error: "
  368. "Failed to request '%1', error %2 (%3)"
  369. ).arg(sent->url
  370. ).arg(int(error)
  371. ).arg(reply->errorString()));
  372. failed(id, reply);
  373. }
  374. }
  375. void WebLoadManager::failed(int id, not_null<QNetworkReply*> reply) {
  376. if (const auto sent = findSent(id, reply)) {
  377. removeSent(id);
  378. queueFailedUpdate(id);
  379. }
  380. }
  381. void WebLoadManager::deleteDeferred(not_null<QNetworkReply*> reply) {
  382. reply->deleteLater();
  383. _repliesBeingDeleted.erase(
  384. ranges::remove(_repliesBeingDeleted, nullptr),
  385. end(_repliesBeingDeleted));
  386. _repliesBeingDeleted.emplace_back(reply.get());
  387. }
  388. void WebLoadManager::finished(int id, not_null<QNetworkReply*> reply) {
  389. if (const auto sent = findSent(id, reply)) {
  390. const auto data = base::take(sent->data);
  391. removeSent(id);
  392. queueFinishedUpdate(id, data);
  393. }
  394. }
  395. void WebLoadManager::clear() {
  396. for (const auto &[id, sent] : base::take(_sent)) {
  397. sent.reply->abort();
  398. delete sent.reply;
  399. }
  400. for (const auto &reply : base::take(_repliesBeingDeleted)) {
  401. if (reply) {
  402. delete reply;
  403. }
  404. }
  405. }
  406. void WebLoadManager::queueProgressUpdate(
  407. int id,
  408. int64 ready,
  409. int64 total,
  410. QByteArray streamed) {
  411. crl::on_main(this, [=, bytes = std::move(streamed)]() mutable {
  412. sendUpdate(id, Progress{ ready, total, std::move(bytes) });
  413. });
  414. }
  415. void WebLoadManager::queueFailedUpdate(int id) {
  416. crl::on_main(this, [=] {
  417. sendUpdate(id, Error{});
  418. });
  419. }
  420. void WebLoadManager::queueFinishedUpdate(int id, const QByteArray &data) {
  421. crl::on_main(this, [=] {
  422. for (const auto &[loader, loaderId] : _ids) {
  423. if (loaderId == id) {
  424. break;
  425. }
  426. }
  427. sendUpdate(id, QByteArray(data));
  428. });
  429. }
  430. void WebLoadManager::sendUpdate(int id, Update &&data) {
  431. for (const auto &[loader, loaderId] : _ids) {
  432. if (loaderId == id) {
  433. _updates.fire(UpdateForLoader{ loader, std::move(data) });
  434. return;
  435. }
  436. }
  437. }
  438. webFileLoader::webFileLoader(
  439. not_null<Main::Session*> session,
  440. const QString &url,
  441. const QString &to,
  442. LoadFromCloudSetting fromCloud,
  443. bool autoLoading,
  444. uint8 cacheTag)
  445. : FileLoader(
  446. session,
  447. QString(),
  448. 0,
  449. 0,
  450. UnknownFileLocation,
  451. LoadToCacheAsWell,
  452. fromCloud,
  453. autoLoading,
  454. cacheTag)
  455. , _url(url) {
  456. }
  457. webFileLoader::webFileLoader(
  458. not_null<Main::Session*> session,
  459. const QString &url,
  460. const QString &path,
  461. WebRequestType type)
  462. : FileLoader(
  463. session,
  464. path,
  465. 0,
  466. 0,
  467. UnknownFileLocation,
  468. LoadToFileOnly,
  469. LoadFromCloudOrLocal,
  470. false,
  471. 0)
  472. , _url(url)
  473. , _requestType(type) {
  474. }
  475. webFileLoader::~webFileLoader() {
  476. if (!_finished) {
  477. cancel();
  478. }
  479. }
  480. QString webFileLoader::url() const {
  481. return _url;
  482. }
  483. WebRequestType webFileLoader::requestType() const {
  484. return _requestType;
  485. }
  486. bool webFileLoader::streamLoading() const {
  487. return (_toCache == LoadToFileOnly);
  488. }
  489. void webFileLoader::startLoading() {
  490. if (_finished) {
  491. return;
  492. } else if (!_manager) {
  493. _manager = GetManager();
  494. _manager->updates(
  495. this
  496. ) | rpl::start_with_next([=](const Update &data) {
  497. if (const auto progress = std::get_if<Progress>(&data)) {
  498. loadProgress(
  499. progress->ready,
  500. progress->total,
  501. progress->streamed);
  502. } else if (const auto bytes = std::get_if<QByteArray>(&data)) {
  503. loadFinished(*bytes);
  504. } else {
  505. loadFailed();
  506. }
  507. }, _managerLifetime);
  508. }
  509. _manager->enqueue(this);
  510. }
  511. int64 webFileLoader::currentOffset() const {
  512. return _ready;
  513. }
  514. void webFileLoader::loadProgress(
  515. qint64 ready,
  516. qint64 total,
  517. const QByteArray &streamed) {
  518. _fullSize = _loadSize = total;
  519. _ready = ready;
  520. if (!streamed.isEmpty()
  521. && !writeResultPart(_streamedOffset, bytes::make_span(streamed))) {
  522. loadFailed();
  523. } else {
  524. _streamedOffset += streamed.size();
  525. notifyAboutProgress();
  526. }
  527. }
  528. void webFileLoader::loadFinished(const QByteArray &data) {
  529. cancelRequest();
  530. if (writeResultPart(0, bytes::make_span(data))) {
  531. finalizeResult();
  532. }
  533. }
  534. void webFileLoader::loadFailed() {
  535. cancel(FailureReason::OtherFailure);
  536. }
  537. Storage::Cache::Key webFileLoader::cacheKey() const {
  538. return Data::UrlCacheKey(_url);
  539. }
  540. std::optional<MediaKey> webFileLoader::fileLocationKey() const {
  541. return std::nullopt;
  542. }
  543. void webFileLoader::cancelHook() {
  544. cancelRequest();
  545. }
  546. void webFileLoader::cancelRequest() {
  547. if (!_manager) {
  548. return;
  549. }
  550. _managerLifetime.destroy();
  551. _manager->remove(this);
  552. _manager = nullptr;
  553. }