| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- /*
- This file is part of Telegram Desktop,
- the official desktop application for the Telegram messaging service.
- For license and copyright information please follow this link:
- https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
- */
- #include "media/streaming/media_streaming_file.h"
- #include "media/streaming/media_streaming_loader.h"
- #include "media/streaming/media_streaming_file_delegate.h"
- #include "ffmpeg/ffmpeg_utility.h"
- namespace Media {
- namespace Streaming {
- namespace {
- constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
- constexpr auto kMaxQueuedPackets = 1024;
- [[nodiscard]] bool UnreliableFormatDuration(
- not_null<AVFormatContext*> format,
- not_null<AVStream*> stream,
- Mode mode) {
- return (mode == Mode::Video || mode == Mode::Inspection)
- && stream->codecpar
- && (stream->codecpar->codec_id == AV_CODEC_ID_VP9)
- && format->iformat
- && format->iformat->name
- && QString::fromLatin1(
- format->iformat->name
- ).split(QChar(',')).contains(u"webm");
- }
- } // namespace
- File::Context::Context(
- not_null<FileDelegate*> delegate,
- not_null<Reader*> reader)
- : _delegate(delegate)
- , _reader(reader)
- , _size(reader->size()) {
- }
- File::Context::~Context() = default;
- int File::Context::Read(void *opaque, uint8_t *buffer, int bufferSize) {
- return static_cast<Context*>(opaque)->read(
- bytes::make_span(buffer, bufferSize));
- }
- int64_t File::Context::Seek(void *opaque, int64_t offset, int whence) {
- return static_cast<Context*>(opaque)->seek(offset, whence);
- }
- int File::Context::read(bytes::span buffer) {
- Expects(_size >= _offset);
- const auto amount = std::min(_size - _offset, int64(buffer.size()));
- if (unroll()) {
- return AVERROR_EXTERNAL;
- } else if (amount > kMaxSingleReadAmount) {
- LOG(("Streaming Error: Read callback asked for too much data: %1"
- ).arg(amount));
- return AVERROR_EXTERNAL;
- } else if (!amount) {
- return AVERROR_EOF;
- }
- buffer = buffer.subspan(0, amount);
- while (true) {
- const auto result = _reader->fill(_offset, buffer, &_semaphore);
- if (result == Reader::FillState::Success) {
- break;
- } else if (result == Reader::FillState::WaitingRemote) {
- // Perhaps for the correct sleeping in case of enough packets
- // being read already we require SleepPolicy::Allowed here.
- // Otherwise if we wait for the remote frequently and
- // _queuedPackets never get to kMaxQueuedPackets and we don't call
- // processQueuedPackets(SleepPolicy::Allowed) ever.
- //
- // But right now we can't simply pass SleepPolicy::Allowed here,
- // it freezes because of two _semaphore.acquire one after another.
- processQueuedPackets(SleepPolicy::Disallowed);
- _delegate->fileWaitingForData();
- }
- _semaphore.acquire();
- if (_interrupted) {
- return AVERROR_EXTERNAL;
- } else if (const auto error = _reader->streamingError()) {
- fail(*error);
- return AVERROR_EXTERNAL;
- }
- }
- sendFullInCache();
- _offset += amount;
- return amount;
- }
- int64_t File::Context::seek(int64_t offset, int whence) {
- const auto checkedSeek = [&](int64_t offset) {
- if (_failed || offset < 0 || offset > _size) {
- return int64(-1);
- }
- return (_offset = offset);
- };
- switch (whence) {
- case SEEK_SET: return checkedSeek(offset);
- case SEEK_CUR: return checkedSeek(_offset + offset);
- case SEEK_END: return checkedSeek(_size + offset);
- case AVSEEK_SIZE: return _size;
- }
- return -1;
- }
- void File::Context::logError(QLatin1String method) {
- if (!unroll()) {
- FFmpeg::LogError(method);
- }
- }
- void File::Context::logError(
- QLatin1String method,
- FFmpeg::AvErrorWrap error) {
- if (!unroll()) {
- FFmpeg::LogError(method, error);
- }
- }
- void File::Context::logFatal(QLatin1String method) {
- if (!unroll()) {
- FFmpeg::LogError(method);
- fail(_format ? Error::InvalidData : Error::OpenFailed);
- }
- }
- void File::Context::logFatal(
- QLatin1String method,
- FFmpeg::AvErrorWrap error) {
- if (!unroll()) {
- FFmpeg::LogError(method, error);
- fail(_format ? Error::InvalidData : Error::OpenFailed);
- }
- }
- Stream File::Context::initStream(
- not_null<AVFormatContext*> format,
- AVMediaType type,
- Mode mode,
- StartOptions options) {
- auto result = Stream();
- const auto index = result.index = av_find_best_stream(
- format,
- type,
- -1,
- -1,
- nullptr,
- 0);
- if (index < 0) {
- return {};
- }
- const auto info = format->streams[index];
- if (type == AVMEDIA_TYPE_VIDEO) {
- if (info->disposition & AV_DISPOSITION_ATTACHED_PIC) {
- // ignore cover streams
- return Stream();
- }
- result.codec = FFmpeg::MakeCodecPointer({
- .stream = info,
- .hwAllowed = options.hwAllow,
- });
- if (!result.codec) {
- return result;
- }
- result.rotation = FFmpeg::ReadRotationFromMetadata(info);
- result.aspect = FFmpeg::ValidateAspectRatio(
- info->sample_aspect_ratio);
- } else if (type == AVMEDIA_TYPE_AUDIO) {
- result.frequency = info->codecpar->sample_rate;
- if (!result.frequency) {
- return result;
- }
- result.codec = FFmpeg::MakeCodecPointer({ .stream = info });
- if (!result.codec) {
- return result;
- }
- }
- result.decodedFrame = FFmpeg::MakeFramePointer();
- if (!result.decodedFrame) {
- result.codec = nullptr;
- return result;
- }
- result.timeBase = info->time_base;
- result.duration = options.durationOverride
- ? options.durationOverride
- : (info->duration != AV_NOPTS_VALUE)
- ? FFmpeg::PtsToTime(info->duration, result.timeBase)
- : UnreliableFormatDuration(format, info, mode)
- ? kTimeUnknown
- : FFmpeg::PtsToTime(format->duration, FFmpeg::kUniversalTimeBase);
- if (result.duration == kTimeUnknown) {
- result.duration = kDurationUnavailable;
- } else if (result.duration <= 0) {
- result.codec = nullptr;
- } else {
- ++result.duration;
- if (result.duration > kDurationMax) {
- result.duration = 0;
- result.codec = nullptr;
- }
- }
- return result;
- }
- void File::Context::seekToPosition(
- not_null<AVFormatContext*> format,
- const Stream &stream,
- crl::time position) {
- auto error = FFmpeg::AvErrorWrap();
- if (!position) {
- return;
- } else if (stream.duration == kDurationUnavailable) {
- // Seek in files with unknown duration is not supported.
- return;
- }
- //
- // Non backward search reads the whole file if the position is after
- // the last keyframe inside the index. So we search only backward.
- //
- //const auto seekFlags = 0;
- //error = av_seek_frame(
- // format,
- // streamIndex,
- // TimeToPts(position, kUniversalTimeBase),
- // seekFlags);
- //if (!error) {
- // return;
- //}
- //
- error = av_seek_frame(
- format,
- stream.index,
- FFmpeg::TimeToPts(
- std::clamp(position, crl::time(0), stream.duration - 1),
- stream.timeBase),
- AVSEEK_FLAG_BACKWARD);
- if (!error) {
- return;
- }
- return logFatal(qstr("av_seek_frame"), error);
- }
- std::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap> File::Context::readPacket() {
- auto error = FFmpeg::AvErrorWrap();
- auto result = FFmpeg::Packet();
- error = av_read_frame(_format.get(), &result.fields());
- if (unroll()) {
- return FFmpeg::AvErrorWrap();
- } else if (!error) {
- return result;
- } else if (error.code() != AVERROR_EOF) {
- logFatal(qstr("av_read_frame"), error);
- }
- return error;
- }
- void File::Context::start(StartOptions options) {
- Expects(options.seekable || !options.position);
- auto error = FFmpeg::AvErrorWrap();
- if (unroll()) {
- return;
- }
- auto format = FFmpeg::MakeFormatPointer(
- static_cast<void*>(this),
- &Context::Read,
- nullptr,
- options.seekable ? &Context::Seek : nullptr);
- if (!format) {
- return fail(Error::OpenFailed);
- }
- if ((error = avformat_find_stream_info(format.get(), nullptr))) {
- return logFatal(qstr("avformat_find_stream_info"), error);
- }
- const auto mode = _delegate->fileOpenMode();
- auto video = initStream(
- format.get(),
- AVMEDIA_TYPE_VIDEO,
- mode,
- options);
- if (unroll()) {
- return;
- }
- auto audio = initStream(
- format.get(),
- AVMEDIA_TYPE_AUDIO,
- mode,
- options);
- if (unroll()) {
- return;
- }
- _reader->headerDone();
- if (_reader->isRemoteLoader()) {
- sendFullInCache(true);
- }
- if (options.seekable && (video.codec || audio.codec)) {
- seekToPosition(
- format.get(),
- video.codec ? video : audio,
- options.position);
- }
- if (unroll()) {
- return;
- }
- if (video.codec) {
- _queuedPackets[video.index].reserve(kMaxQueuedPackets);
- }
- if (audio.codec) {
- _queuedPackets[audio.index].reserve(kMaxQueuedPackets);
- }
- const auto header = _reader->headerSize();
- if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
- return fail(Error::OpenFailed);
- }
- _format = std::move(format);
- }
- void File::Context::sendFullInCache(bool force) {
- const auto started = _fullInCache.has_value();
- if (force || started) {
- const auto nowFullInCache = _reader->fullInCache();
- if (!started || *_fullInCache != nowFullInCache) {
- _fullInCache = nowFullInCache;
- _delegate->fileFullInCache(nowFullInCache);
- }
- }
- }
- void File::Context::readNextPacket() {
- auto result = readPacket();
- if (unroll()) {
- return;
- } else if (const auto packet = std::get_if<FFmpeg::Packet>(&result)) {
- const auto index = packet->fields().stream_index;
- const auto i = _queuedPackets.find(index);
- if (i == end(_queuedPackets)) {
- return;
- }
- i->second.push_back(std::move(*packet));
- if (i->second.size() == kMaxQueuedPackets) {
- processQueuedPackets(SleepPolicy::Allowed);
- }
- Assert(i->second.size() < kMaxQueuedPackets);
- } else {
- // Still trying to read by drain.
- Assert(v::is<FFmpeg::AvErrorWrap>(result));
- Assert(v::get<FFmpeg::AvErrorWrap>(result).code() == AVERROR_EOF);
- processQueuedPackets(SleepPolicy::Allowed);
- if (!finished()) {
- handleEndOfFile();
- }
- }
- }
- void File::Context::handleEndOfFile() {
- _delegate->fileProcessEndOfFile();
- if (_delegate->fileReadMore()) {
- _readTillEnd = false;
- auto error = FFmpeg::AvErrorWrap(av_seek_frame(
- _format.get(),
- -1, // stream_index
- 0, // timestamp
- AVSEEK_FLAG_BACKWARD));
- if (error) {
- logFatal(qstr("av_seek_frame"));
- }
- // If we loaded a file till the end then we think it is fully cached,
- // assume we finished loading and don't want to keep all other
- // download tasks throttled because of an active streaming.
- _reader->tryRemoveLoaderAsync();
- } else {
- _readTillEnd = true;
- }
- }
- void File::Context::processQueuedPackets(SleepPolicy policy) {
- const auto more = _delegate->fileProcessPackets(_queuedPackets);
- if (!more && policy == SleepPolicy::Allowed) {
- do {
- _reader->startSleep(&_semaphore);
- _semaphore.acquire();
- _reader->stopSleep();
- } while (!unroll() && !_delegate->fileReadMore());
- }
- }
- void File::Context::interrupt() {
- _interrupted = true;
- _semaphore.release();
- }
- void File::Context::wake() {
- _semaphore.release();
- }
- bool File::Context::interrupted() const {
- return _interrupted;
- }
- bool File::Context::failed() const {
- return _failed;
- }
- bool File::Context::unroll() const {
- return failed() || interrupted();
- }
- void File::Context::fail(Error error) {
- _failed = true;
- _delegate->fileError(error);
- }
- bool File::Context::finished() const {
- return unroll() || _readTillEnd;
- }
- void File::Context::stopStreamingAsync() {
- // If we finished loading we don't want to keep all other
- // download tasks throttled because of an active streaming.
- _reader->stopStreamingAsync();
- }
- File::File(std::shared_ptr<Reader> reader)
- : _reader(std::move(reader)) {
- }
- void File::start(not_null<FileDelegate*> delegate, StartOptions options) {
- stop(true);
- _reader->startStreaming();
- _context.emplace(delegate, _reader.get());
- _thread = std::thread([=, context = &*_context] {
- crl::toggle_fp_exceptions(true);
- context->start(options);
- while (!context->finished()) {
- context->readNextPacket();
- }
- if (!context->interrupted()) {
- context->stopStreamingAsync();
- }
- });
- }
- void File::wake() {
- Expects(_context.has_value());
- _context->wake();
- }
- void File::stop(bool stillActive) {
- if (_thread.joinable()) {
- _context->interrupt();
- _thread.join();
- }
- _reader->stopStreaming(stillActive);
- _context.reset();
- }
- bool File::isRemoteLoader() const {
- return _reader->isRemoteLoader();
- }
- void File::setLoaderPriority(int priority) {
- _reader->setLoaderPriority(priority);
- }
- int64 File::size() const {
- return _reader->size();
- }
- rpl::producer<SpeedEstimate> File::speedEstimate() const {
- return _reader->speedEstimate();
- }
- File::~File() {
- stop();
- }
- } // namespace Streaming
- } // namespace Media
|