media_streaming_file.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. /*
  2. This file is part of Telegram Desktop,
  3. the official desktop application for the Telegram messaging service.
  4. For license and copyright information please follow this link:
  5. https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
  6. */
  7. #include "media/streaming/media_streaming_file.h"
  8. #include "media/streaming/media_streaming_loader.h"
  9. #include "media/streaming/media_streaming_file_delegate.h"
  10. #include "ffmpeg/ffmpeg_utility.h"
  11. namespace Media {
  12. namespace Streaming {
  13. namespace {
  14. constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
  15. constexpr auto kMaxQueuedPackets = 1024;
  16. [[nodiscard]] bool UnreliableFormatDuration(
  17. not_null<AVFormatContext*> format,
  18. not_null<AVStream*> stream,
  19. Mode mode) {
  20. return (mode == Mode::Video || mode == Mode::Inspection)
  21. && stream->codecpar
  22. && (stream->codecpar->codec_id == AV_CODEC_ID_VP9)
  23. && format->iformat
  24. && format->iformat->name
  25. && QString::fromLatin1(
  26. format->iformat->name
  27. ).split(QChar(',')).contains(u"webm");
  28. }
  29. } // namespace
  30. File::Context::Context(
  31. not_null<FileDelegate*> delegate,
  32. not_null<Reader*> reader)
  33. : _delegate(delegate)
  34. , _reader(reader)
  35. , _size(reader->size()) {
  36. }
  37. File::Context::~Context() = default;
  38. int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) {
  39. return static_cast<Context*>(opaque)->read(
  40. bytes::make_span(buffer, bufferSize));
  41. }
  42. int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) {
  43. return static_cast<Context*>(opaque)->seek(offset, whence);
  44. }
  45. int File::Context::read(bytes::span buffer) {
  46. Expects(_size >= _offset);
  47. const auto amount = std::min(_size - _offset, int64(buffer.size()));
  48. if (unroll()) {
  49. return AVERROR_EXTERNAL;
  50. } else if (amount > kMaxSingleReadAmount) {
  51. LOG(("Streaming Error: Read callback asked for too much data: %1"
  52. ).arg(amount));
  53. return AVERROR_EXTERNAL;
  54. } else if (!amount) {
  55. return AVERROR_EOF;
  56. }
  57. buffer = buffer.subspan(0, amount);
  58. while (true) {
  59. const auto result = _reader->fill(_offset, buffer, &_semaphore);
  60. if (result == Reader::FillState::Success) {
  61. break;
  62. } else if (result == Reader::FillState::WaitingRemote) {
  63. // Perhaps for the correct sleeping in case of enough packets
  64. // being read already we require SleepPolicy::Allowed here.
  65. // Otherwise if we wait for the remote frequently and
  66. // _queuedPackets never get to kMaxQueuedPackets and we don't call
  67. // processQueuedPackets(SleepPolicy::Allowed) ever.
  68. //
  69. // But right now we can't simply pass SleepPolicy::Allowed here,
  70. // it freezes because of two _semaphore.acquire one after another.
  71. processQueuedPackets(SleepPolicy::Disallowed);
  72. _delegate->fileWaitingForData();
  73. }
  74. _semaphore.acquire();
  75. if (_interrupted) {
  76. return AVERROR_EXTERNAL;
  77. } else if (const auto error = _reader->streamingError()) {
  78. fail(*error);
  79. return AVERROR_EXTERNAL;
  80. }
  81. }
  82. sendFullInCache();
  83. _offset += amount;
  84. return amount;
  85. }
  86. int64_t File::Context::seek(int64_t offset, int whence) {
  87. const auto checkedSeek = [&](int64_t offset) {
  88. if (_failed || offset < 0 || offset > _size) {
  89. return int64(-1);
  90. }
  91. return (_offset = offset);
  92. };
  93. switch (whence) {
  94. case SEEK_SET: return checkedSeek(offset);
  95. case SEEK_CUR: return checkedSeek(_offset + offset);
  96. case SEEK_END: return checkedSeek(_size + offset);
  97. case AVSEEK_SIZE: return _size;
  98. }
  99. return -1;
  100. }
  101. void File::Context::logError(QLatin1String method) {
  102. if (!unroll()) {
  103. FFmpeg::LogError(method);
  104. }
  105. }
  106. void File::Context::logError(
  107. QLatin1String method,
  108. FFmpeg::AvErrorWrap error) {
  109. if (!unroll()) {
  110. FFmpeg::LogError(method, error);
  111. }
  112. }
  113. void File::Context::logFatal(QLatin1String method) {
  114. if (!unroll()) {
  115. FFmpeg::LogError(method);
  116. fail(_format ? Error::InvalidData : Error::OpenFailed);
  117. }
  118. }
  119. void File::Context::logFatal(
  120. QLatin1String method,
  121. FFmpeg::AvErrorWrap error) {
  122. if (!unroll()) {
  123. FFmpeg::LogError(method, error);
  124. fail(_format ? Error::InvalidData : Error::OpenFailed);
  125. }
  126. }
  127. Stream File::Context::initStream(
  128. not_null<AVFormatContext*> format,
  129. AVMediaType type,
  130. Mode mode,
  131. StartOptions options) {
  132. auto result = Stream();
  133. const auto index = result.index = av_find_best_stream(
  134. format,
  135. type,
  136. -1,
  137. -1,
  138. nullptr,
  139. 0);
  140. if (index < 0) {
  141. return {};
  142. }
  143. const auto info = format->streams[index];
  144. if (type == AVMEDIA_TYPE_VIDEO) {
  145. if (info->disposition & AV_DISPOSITION_ATTACHED_PIC) {
  146. // ignore cover streams
  147. return Stream();
  148. }
  149. result.codec = FFmpeg::MakeCodecPointer({
  150. .stream = info,
  151. .hwAllowed = options.hwAllow,
  152. });
  153. if (!result.codec) {
  154. return result;
  155. }
  156. result.rotation = FFmpeg::ReadRotationFromMetadata(info);
  157. result.aspect = FFmpeg::ValidateAspectRatio(
  158. info->sample_aspect_ratio);
  159. } else if (type == AVMEDIA_TYPE_AUDIO) {
  160. result.frequency = info->codecpar->sample_rate;
  161. if (!result.frequency) {
  162. return result;
  163. }
  164. result.codec = FFmpeg::MakeCodecPointer({ .stream = info });
  165. if (!result.codec) {
  166. return result;
  167. }
  168. }
  169. result.decodedFrame = FFmpeg::MakeFramePointer();
  170. if (!result.decodedFrame) {
  171. result.codec = nullptr;
  172. return result;
  173. }
  174. result.timeBase = info->time_base;
  175. result.duration = options.durationOverride
  176. ? options.durationOverride
  177. : (info->duration != AV_NOPTS_VALUE)
  178. ? FFmpeg::PtsToTime(info->duration, result.timeBase)
  179. : UnreliableFormatDuration(format, info, mode)
  180. ? kTimeUnknown
  181. : FFmpeg::PtsToTime(format->duration, FFmpeg::kUniversalTimeBase);
  182. if (result.duration == kTimeUnknown) {
  183. result.duration = kDurationUnavailable;
  184. } else if (result.duration <= 0) {
  185. result.codec = nullptr;
  186. } else {
  187. ++result.duration;
  188. if (result.duration > kDurationMax) {
  189. result.duration = 0;
  190. result.codec = nullptr;
  191. }
  192. }
  193. return result;
  194. }
  195. void File::Context::seekToPosition(
  196. not_null<AVFormatContext*> format,
  197. const Stream &stream,
  198. crl::time position) {
  199. auto error = FFmpeg::AvErrorWrap();
  200. if (!position) {
  201. return;
  202. } else if (stream.duration == kDurationUnavailable) {
  203. // Seek in files with unknown duration is not supported.
  204. return;
  205. }
  206. //
  207. // Non backward search reads the whole file if the position is after
  208. // the last keyframe inside the index. So we search only backward.
  209. //
  210. //const auto seekFlags = 0;
  211. //error = av_seek_frame(
  212. // format,
  213. // streamIndex,
  214. // TimeToPts(position, kUniversalTimeBase),
  215. // seekFlags);
  216. //if (!error) {
  217. // return;
  218. //}
  219. //
  220. error = av_seek_frame(
  221. format,
  222. stream.index,
  223. FFmpeg::TimeToPts(
  224. std::clamp(position, crl::time(0), stream.duration - 1),
  225. stream.timeBase),
  226. AVSEEK_FLAG_BACKWARD);
  227. if (!error) {
  228. return;
  229. }
  230. return logFatal(qstr("av_seek_frame"), error);
  231. }
  232. std::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap> File::Context::readPacket() {
  233. auto error = FFmpeg::AvErrorWrap();
  234. auto result = FFmpeg::Packet();
  235. error = av_read_frame(_format.get(), &result.fields());
  236. if (unroll()) {
  237. return FFmpeg::AvErrorWrap();
  238. } else if (!error) {
  239. return result;
  240. } else if (error.code() != AVERROR_EOF) {
  241. logFatal(qstr("av_read_frame"), error);
  242. }
  243. return error;
  244. }
  245. void File::Context::start(StartOptions options) {
  246. Expects(options.seekable || !options.position);
  247. auto error = FFmpeg::AvErrorWrap();
  248. if (unroll()) {
  249. return;
  250. }
  251. auto format = FFmpeg::MakeFormatPointer(
  252. static_cast<void*>(this),
  253. &Context::Read,
  254. nullptr,
  255. options.seekable ? &Context::Seek : nullptr);
  256. if (!format) {
  257. return fail(Error::OpenFailed);
  258. }
  259. if ((error = avformat_find_stream_info(format.get(), nullptr))) {
  260. return logFatal(qstr("avformat_find_stream_info"), error);
  261. }
  262. const auto mode = _delegate->fileOpenMode();
  263. auto video = initStream(
  264. format.get(),
  265. AVMEDIA_TYPE_VIDEO,
  266. mode,
  267. options);
  268. if (unroll()) {
  269. return;
  270. }
  271. auto audio = initStream(
  272. format.get(),
  273. AVMEDIA_TYPE_AUDIO,
  274. mode,
  275. options);
  276. if (unroll()) {
  277. return;
  278. }
  279. _reader->headerDone();
  280. if (_reader->isRemoteLoader()) {
  281. sendFullInCache(true);
  282. }
  283. if (options.seekable && (video.codec || audio.codec)) {
  284. seekToPosition(
  285. format.get(),
  286. video.codec ? video : audio,
  287. options.position);
  288. }
  289. if (unroll()) {
  290. return;
  291. }
  292. if (video.codec) {
  293. _queuedPackets[video.index].reserve(kMaxQueuedPackets);
  294. }
  295. if (audio.codec) {
  296. _queuedPackets[audio.index].reserve(kMaxQueuedPackets);
  297. }
  298. const auto header = _reader->headerSize();
  299. if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
  300. return fail(Error::OpenFailed);
  301. }
  302. _format = std::move(format);
  303. }
  304. void File::Context::sendFullInCache(bool force) {
  305. const auto started = _fullInCache.has_value();
  306. if (force || started) {
  307. const auto nowFullInCache = _reader->fullInCache();
  308. if (!started || *_fullInCache != nowFullInCache) {
  309. _fullInCache = nowFullInCache;
  310. _delegate->fileFullInCache(nowFullInCache);
  311. }
  312. }
  313. }
  314. void File::Context::readNextPacket() {
  315. auto result = readPacket();
  316. if (unroll()) {
  317. return;
  318. } else if (const auto packet = std::get_if<FFmpeg::Packet>(&result)) {
  319. const auto index = packet->fields().stream_index;
  320. const auto i = _queuedPackets.find(index);
  321. if (i == end(_queuedPackets)) {
  322. return;
  323. }
  324. i->second.push_back(std::move(*packet));
  325. if (i->second.size() == kMaxQueuedPackets) {
  326. processQueuedPackets(SleepPolicy::Allowed);
  327. }
  328. Assert(i->second.size() < kMaxQueuedPackets);
  329. } else {
  330. // Still trying to read by drain.
  331. Assert(v::is<FFmpeg::AvErrorWrap>(result));
  332. Assert(v::get<FFmpeg::AvErrorWrap>(result).code() == AVERROR_EOF);
  333. processQueuedPackets(SleepPolicy::Allowed);
  334. if (!finished()) {
  335. handleEndOfFile();
  336. }
  337. }
  338. }
  339. void File::Context::handleEndOfFile() {
  340. _delegate->fileProcessEndOfFile();
  341. if (_delegate->fileReadMore()) {
  342. _readTillEnd = false;
  343. auto error = FFmpeg::AvErrorWrap(av_seek_frame(
  344. _format.get(),
  345. -1, // stream_index
  346. 0, // timestamp
  347. AVSEEK_FLAG_BACKWARD));
  348. if (error) {
  349. logFatal(qstr("av_seek_frame"));
  350. }
  351. // If we loaded a file till the end then we think it is fully cached,
  352. // assume we finished loading and don't want to keep all other
  353. // download tasks throttled because of an active streaming.
  354. _reader->tryRemoveLoaderAsync();
  355. } else {
  356. _readTillEnd = true;
  357. }
  358. }
  359. void File::Context::processQueuedPackets(SleepPolicy policy) {
  360. const auto more = _delegate->fileProcessPackets(_queuedPackets);
  361. if (!more && policy == SleepPolicy::Allowed) {
  362. do {
  363. _reader->startSleep(&_semaphore);
  364. _semaphore.acquire();
  365. _reader->stopSleep();
  366. } while (!unroll() && !_delegate->fileReadMore());
  367. }
  368. }
  369. void File::Context::interrupt() {
  370. _interrupted = true;
  371. _semaphore.release();
  372. }
  373. void File::Context::wake() {
  374. _semaphore.release();
  375. }
  376. bool File::Context::interrupted() const {
  377. return _interrupted;
  378. }
  379. bool File::Context::failed() const {
  380. return _failed;
  381. }
  382. bool File::Context::unroll() const {
  383. return failed() || interrupted();
  384. }
  385. void File::Context::fail(Error error) {
  386. _failed = true;
  387. _delegate->fileError(error);
  388. }
  389. bool File::Context::finished() const {
  390. return unroll() || _readTillEnd;
  391. }
  392. void File::Context::stopStreamingAsync() {
  393. // If we finished loading we don't want to keep all other
  394. // download tasks throttled because of an active streaming.
  395. _reader->stopStreamingAsync();
  396. }
  397. File::File(std::shared_ptr<Reader> reader)
  398. : _reader(std::move(reader)) {
  399. }
  400. void File::start(not_null<FileDelegate*> delegate, StartOptions options) {
  401. stop(true);
  402. _reader->startStreaming();
  403. _context.emplace(delegate, _reader.get());
  404. _thread = std::thread([=, context = &*_context] {
  405. crl::toggle_fp_exceptions(true);
  406. context->start(options);
  407. while (!context->finished()) {
  408. context->readNextPacket();
  409. }
  410. if (!context->interrupted()) {
  411. context->stopStreamingAsync();
  412. }
  413. });
  414. }
  415. void File::wake() {
  416. Expects(_context.has_value());
  417. _context->wake();
  418. }
  419. void File::stop(bool stillActive) {
  420. if (_thread.joinable()) {
  421. _context->interrupt();
  422. _thread.join();
  423. }
  424. _reader->stopStreaming(stillActive);
  425. _context.reset();
  426. }
  427. bool File::isRemoteLoader() const {
  428. return _reader->isRemoteLoader();
  429. }
  430. void File::setLoaderPriority(int priority) {
  431. _reader->setLoaderPriority(priority);
  432. }
  433. int64 File::size() const {
  434. return _reader->size();
  435. }
  436. rpl::producer<SpeedEstimate> File::speedEstimate() const {
  437. return _reader->speedEstimate();
  438. }
  439. File::~File() {
  440. stop();
  441. }
  442. } // namespace Streaming
  443. } // namespace Media