MessageThread.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. //
  2. // Created by Grishka on 17.06.2018.
  3. //
  4. #include <assert.h>
  5. #include <time.h>
  6. #include <math.h>
  7. #include <float.h>
  8. #include <stdint.h>
  9. #include "MessageThread.h"
  10. #ifndef TGVOIP_WIN32_THREADING
  11. #include <sys/time.h>
  12. #endif
  13. #include "VoIPController.h"
  14. #include "logging.h"
  15. using namespace tgvoip;
  16. MessageThread::MessageThread() : Thread(std::bind(&MessageThread::Run, this)){
  17. SetName("MessageThread");
  18. #ifdef TGVOIP_WIN32_THREADING
  19. #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY!=WINAPI_FAMILY_PHONE_APP
  20. event=CreateEvent(NULL, false, false, NULL);
  21. #else
  22. event=CreateEventEx(NULL, NULL, 0, EVENT_ALL_ACCESS);
  23. #endif
  24. #else
  25. pthread_cond_init(&cond, NULL);
  26. #endif
  27. }
  28. MessageThread::~MessageThread(){
  29. Stop();
  30. #ifdef TGVOIP_WIN32_THREADING
  31. CloseHandle(event);
  32. #else
  33. pthread_cond_destroy(&cond);
  34. #endif
  35. }
  36. void MessageThread::Stop(){
  37. if(running){
  38. running=false;
  39. #ifdef TGVOIP_WIN32_THREADING
  40. SetEvent(event);
  41. #else
  42. pthread_cond_signal(&cond);
  43. #endif
  44. Join();
  45. }
  46. }
  47. void MessageThread::Run(){
  48. queueMutex.Lock();
  49. while(running){
  50. double currentTime=VoIPController::GetCurrentTime();
  51. double waitTimeout=queue.empty() ? DBL_MAX : (queue[0].deliverAt-currentTime);
  52. //LOGW("MessageThread wait timeout %f", waitTimeout);
  53. if(waitTimeout>0.0){
  54. #ifdef TGVOIP_WIN32_THREADING
  55. queueMutex.Unlock();
  56. DWORD actualWaitTimeout=waitTimeout==DBL_MAX ? INFINITE : ((DWORD)round(waitTimeout*1000.0));
  57. #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY!=WINAPI_FAMILY_PHONE_APP
  58. WaitForSingleObject(event, actualWaitTimeout);
  59. #else
  60. WaitForSingleObjectEx(event, actualWaitTimeout, false);
  61. #endif
  62. // we don't really care if a context switch happens here and anything gets added to the queue by another thread
  63. // since any new no-delay messages will get delivered on this iteration anyway
  64. queueMutex.Lock();
  65. #else
  66. if(waitTimeout!=DBL_MAX){
  67. struct timeval now;
  68. struct timespec timeout;
  69. gettimeofday(&now, NULL);
  70. waitTimeout+=now.tv_sec;
  71. waitTimeout+=(now.tv_usec/1000000.0);
  72. timeout.tv_sec=(time_t)(floor(waitTimeout));
  73. timeout.tv_nsec=(long)((waitTimeout-floor(waitTimeout))*1000000000.0);
  74. pthread_cond_timedwait(&cond, queueMutex.NativeHandle(), &timeout);
  75. }else{
  76. pthread_cond_wait(&cond, queueMutex.NativeHandle());
  77. }
  78. #endif
  79. }
  80. if(!running){
  81. queueMutex.Unlock();
  82. return;
  83. }
  84. currentTime=VoIPController::GetCurrentTime();
  85. std::vector<Message> msgsToDeliverNow;
  86. for(std::vector<Message>::iterator m=queue.begin();m!=queue.end();){
  87. if(m->deliverAt==0.0 || currentTime>=m->deliverAt){
  88. msgsToDeliverNow.push_back(*m);
  89. m=queue.erase(m);
  90. continue;
  91. }
  92. ++m;
  93. }
  94. for(Message& m:msgsToDeliverNow){
  95. //LOGI("MessageThread delivering %u", m.msg);
  96. cancelCurrent=false;
  97. if(m.deliverAt==0.0)
  98. m.deliverAt=VoIPController::GetCurrentTime();
  99. if(m.func!=nullptr){
  100. m.func();
  101. }
  102. if(!cancelCurrent && m.interval>0.0){
  103. m.deliverAt+=m.interval;
  104. InsertMessageInternal(m);
  105. }
  106. }
  107. }
  108. queueMutex.Unlock();
  109. }
  110. uint32_t MessageThread::Post(std::function<void()> func, double delay, double interval){
  111. assert(delay>=0);
  112. //LOGI("MessageThread post [function] delay %f", delay);
  113. if(!IsCurrent()){
  114. queueMutex.Lock();
  115. }
  116. double currentTime=VoIPController::GetCurrentTime();
  117. Message m{lastMessageID++, delay==0.0 ? 0.0 : (currentTime+delay), interval, func};
  118. InsertMessageInternal(m);
  119. if(!IsCurrent()){
  120. #ifdef TGVOIP_WIN32_THREADING
  121. SetEvent(event);
  122. #else
  123. pthread_cond_signal(&cond);
  124. #endif
  125. queueMutex.Unlock();
  126. }
  127. return m.id;
  128. }
  129. void MessageThread::InsertMessageInternal(MessageThread::Message &m){
  130. if(queue.empty()){
  131. queue.push_back(m);
  132. }else{
  133. if(queue[0].deliverAt>m.deliverAt){
  134. queue.insert(queue.begin(), m);
  135. }else{
  136. std::vector<Message>::iterator insertAfter=queue.begin();
  137. for(; insertAfter!=queue.end(); ++insertAfter){
  138. std::vector<Message>::iterator next=std::next(insertAfter);
  139. if(next==queue.end() || (next->deliverAt>m.deliverAt && insertAfter->deliverAt<=m.deliverAt)){
  140. queue.insert(next, m);
  141. break;
  142. }
  143. }
  144. }
  145. }
  146. }
  147. void MessageThread::Cancel(uint32_t id){
  148. if(!IsCurrent()){
  149. queueMutex.Lock();
  150. }
  151. for(std::vector<Message>::iterator m=queue.begin();m!=queue.end();){
  152. if(m->id==id){
  153. m=queue.erase(m);
  154. }else{
  155. ++m;
  156. }
  157. }
  158. if(!IsCurrent()){
  159. queueMutex.Unlock();
  160. }
  161. }
  162. void MessageThread::CancelSelf(){
  163. assert(IsCurrent());
  164. cancelCurrent=true;
  165. }