| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- //
- // Created by Grishka on 17.06.2018.
- //
- #include <assert.h>
- #include <time.h>
- #include <math.h>
- #include <float.h>
- #include <stdint.h>
- #include "MessageThread.h"
- #ifndef TGVOIP_WIN32_THREADING
- #include <sys/time.h>
- #endif
- #include "VoIPController.h"
- #include "logging.h"
- using namespace tgvoip;
- MessageThread::MessageThread() : Thread(std::bind(&MessageThread::Run, this)){
- SetName("MessageThread");
- #ifdef TGVOIP_WIN32_THREADING
- #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY!=WINAPI_FAMILY_PHONE_APP
- event=CreateEvent(NULL, false, false, NULL);
- #else
- event=CreateEventEx(NULL, NULL, 0, EVENT_ALL_ACCESS);
- #endif
- #else
- pthread_cond_init(&cond, NULL);
- #endif
- }
- MessageThread::~MessageThread(){
- Stop();
- #ifdef TGVOIP_WIN32_THREADING
- CloseHandle(event);
- #else
- pthread_cond_destroy(&cond);
- #endif
- }
- void MessageThread::Stop(){
- if(running){
- running=false;
- #ifdef TGVOIP_WIN32_THREADING
- SetEvent(event);
- #else
- pthread_cond_signal(&cond);
- #endif
- Join();
- }
- }
- void MessageThread::Run(){
- queueMutex.Lock();
- while(running){
- double currentTime=VoIPController::GetCurrentTime();
- double waitTimeout=queue.empty() ? DBL_MAX : (queue[0].deliverAt-currentTime);
- //LOGW("MessageThread wait timeout %f", waitTimeout);
- if(waitTimeout>0.0){
- #ifdef TGVOIP_WIN32_THREADING
- queueMutex.Unlock();
- DWORD actualWaitTimeout=waitTimeout==DBL_MAX ? INFINITE : ((DWORD)round(waitTimeout*1000.0));
- #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY!=WINAPI_FAMILY_PHONE_APP
- WaitForSingleObject(event, actualWaitTimeout);
- #else
- WaitForSingleObjectEx(event, actualWaitTimeout, false);
- #endif
- // we don't really care if a context switch happens here and anything gets added to the queue by another thread
- // since any new no-delay messages will get delivered on this iteration anyway
- queueMutex.Lock();
- #else
- if(waitTimeout!=DBL_MAX){
- struct timeval now;
- struct timespec timeout;
- gettimeofday(&now, NULL);
- waitTimeout+=now.tv_sec;
- waitTimeout+=(now.tv_usec/1000000.0);
- timeout.tv_sec=(time_t)(floor(waitTimeout));
- timeout.tv_nsec=(long)((waitTimeout-floor(waitTimeout))*1000000000.0);
- pthread_cond_timedwait(&cond, queueMutex.NativeHandle(), &timeout);
- }else{
- pthread_cond_wait(&cond, queueMutex.NativeHandle());
- }
- #endif
- }
- if(!running){
- queueMutex.Unlock();
- return;
- }
- currentTime=VoIPController::GetCurrentTime();
- std::vector<Message> msgsToDeliverNow;
- for(std::vector<Message>::iterator m=queue.begin();m!=queue.end();){
- if(m->deliverAt==0.0 || currentTime>=m->deliverAt){
- msgsToDeliverNow.push_back(*m);
- m=queue.erase(m);
- continue;
- }
- ++m;
- }
- for(Message& m:msgsToDeliverNow){
- //LOGI("MessageThread delivering %u", m.msg);
- cancelCurrent=false;
- if(m.deliverAt==0.0)
- m.deliverAt=VoIPController::GetCurrentTime();
- if(m.func!=nullptr){
- m.func();
- }
- if(!cancelCurrent && m.interval>0.0){
- m.deliverAt+=m.interval;
- InsertMessageInternal(m);
- }
- }
- }
- queueMutex.Unlock();
- }
- uint32_t MessageThread::Post(std::function<void()> func, double delay, double interval){
- assert(delay>=0);
- //LOGI("MessageThread post [function] delay %f", delay);
- if(!IsCurrent()){
- queueMutex.Lock();
- }
- double currentTime=VoIPController::GetCurrentTime();
- Message m{lastMessageID++, delay==0.0 ? 0.0 : (currentTime+delay), interval, func};
- InsertMessageInternal(m);
- if(!IsCurrent()){
- #ifdef TGVOIP_WIN32_THREADING
- SetEvent(event);
- #else
- pthread_cond_signal(&cond);
- #endif
- queueMutex.Unlock();
- }
- return m.id;
- }
- void MessageThread::InsertMessageInternal(MessageThread::Message &m){
- if(queue.empty()){
- queue.push_back(m);
- }else{
- if(queue[0].deliverAt>m.deliverAt){
- queue.insert(queue.begin(), m);
- }else{
- std::vector<Message>::iterator insertAfter=queue.begin();
- for(; insertAfter!=queue.end(); ++insertAfter){
- std::vector<Message>::iterator next=std::next(insertAfter);
- if(next==queue.end() || (next->deliverAt>m.deliverAt && insertAfter->deliverAt<=m.deliverAt)){
- queue.insert(next, m);
- break;
- }
- }
- }
- }
- }
- void MessageThread::Cancel(uint32_t id){
- if(!IsCurrent()){
- queueMutex.Lock();
- }
- for(std::vector<Message>::iterator m=queue.begin();m!=queue.end();){
- if(m->id==id){
- m=queue.erase(m);
- }else{
- ++m;
- }
- }
- if(!IsCurrent()){
- queueMutex.Unlock();
- }
- }
- void MessageThread::CancelSelf(){
- assert(IsCurrent());
- cancelCurrent=true;
- }
|