JitterBuffer.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. //
  2. // libtgvoip is free and unencumbered public domain software.
  3. // For more information, see http://unlicense.org or the UNLICENSE file
  4. // you should have received with this source code distribution.
  5. //
  6. #include "VoIPController.h"
  7. #include "JitterBuffer.h"
  8. #include "logging.h"
  9. #include "VoIPServerConfig.h"
  10. #include <math.h>
  11. using namespace tgvoip;
  12. JitterBuffer::JitterBuffer(MediaStreamItf *out, uint32_t step):bufferPool(JITTER_SLOT_SIZE, JITTER_SLOT_COUNT){
  13. if(out)
  14. out->SetCallback(JitterBuffer::CallbackOut, this);
  15. this->step=step;
  16. if(step<30){
  17. minMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_min_delay_20", 6);
  18. maxMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_delay_20", 25);
  19. maxUsedSlots=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_slots_20", 50);
  20. }else if(step<50){
  21. minMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_min_delay_40", 4);
  22. maxMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_delay_40", 15);
  23. maxUsedSlots=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_slots_40", 30);
  24. }else{
  25. minMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_min_delay_60", 2);
  26. maxMinDelay=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_delay_60", 10);
  27. maxUsedSlots=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_max_slots_60", 20);
  28. }
  29. lossesToReset=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_losses_to_reset", 20);
  30. resyncThreshold=ServerConfig::GetSharedInstance()->GetDouble("jitter_resync_threshold", 1.0);
  31. #ifdef TGVOIP_DUMP_JITTER_STATS
  32. #ifdef TGVOIP_JITTER_DUMP_FILE
  33. dump=fopen(TGVOIP_JITTER_DUMP_FILE, "w");
  34. #elif defined(__ANDROID__)
  35. dump=fopen("/sdcard/tgvoip_jitter_dump.txt", "w");
  36. #else
  37. dump=fopen("tgvoip_jitter_dump.txt", "w");
  38. #endif
  39. tgvoip_log_file_write_header(dump);
  40. fprintf(dump, "PTS\tRTS\tNumInBuf\tAJitter\tADelay\tTDelay\n");
  41. #endif
  42. Reset();
  43. }
  44. JitterBuffer::~JitterBuffer(){
  45. Reset();
  46. }
  47. void JitterBuffer::SetMinPacketCount(uint32_t count){
  48. LOGI("jitter: set min packet count %u", count);
  49. minDelay=count;
  50. minMinDelay=count;
  51. //Reset();
  52. }
  53. int JitterBuffer::GetMinPacketCount(){
  54. return (int)minDelay;
  55. }
  56. size_t JitterBuffer::CallbackIn(unsigned char *data, size_t len, void *param){
  57. //((JitterBuffer*)param)->HandleInput(data, len);
  58. return 0;
  59. }
  60. size_t JitterBuffer::CallbackOut(unsigned char *data, size_t len, void *param){
  61. return 0; //((JitterBuffer*)param)->HandleOutput(data, len, 0, NULL);
  62. }
  63. void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timestamp, bool isEC){
  64. MutexGuard m(mutex);
  65. jitter_packet_t pkt;
  66. pkt.size=len;
  67. pkt.buffer=data;
  68. pkt.timestamp=timestamp;
  69. pkt.isEC=isEC;
  70. PutInternal(&pkt, !isEC);
  71. //LOGV("in, ts=%d, ec=%d", timestamp, isEC);
  72. }
  73. void JitterBuffer::Reset(){
  74. wasReset=true;
  75. needBuffering=true;
  76. lastPutTimestamp=0;
  77. int i;
  78. for(i=0;i<JITTER_SLOT_COUNT;i++){
  79. if(slots[i].buffer){
  80. bufferPool.Reuse(slots[i].buffer);
  81. slots[i].buffer=NULL;
  82. }
  83. }
  84. delayHistory.Reset();
  85. lateHistory.Reset();
  86. adjustingDelay=false;
  87. lostSinceReset=0;
  88. gotSinceReset=0;
  89. expectNextAtTime=0;
  90. deviationHistory.Reset();
  91. outstandingDelayChange=0;
  92. dontChangeDelay=0;
  93. }
  94. size_t JitterBuffer::HandleOutput(unsigned char *buffer, size_t len, int offsetInSteps, bool advance, int& playbackScaledDuration, bool& isEC){
  95. jitter_packet_t pkt;
  96. pkt.buffer=buffer;
  97. pkt.size=len;
  98. MutexGuard m(mutex);
  99. if(first){
  100. first=false;
  101. unsigned int delay=GetCurrentDelay();
  102. if(GetCurrentDelay()>5){
  103. LOGW("jitter: delay too big upon start (%u), dropping packets", delay);
  104. while(delay>GetMinPacketCount()){
  105. for(int i=0;i<JITTER_SLOT_COUNT;i++){
  106. if(slots[i].timestamp==nextTimestamp){
  107. if(slots[i].buffer){
  108. bufferPool.Reuse(slots[i].buffer);
  109. slots[i].buffer=NULL;
  110. }
  111. break;
  112. }
  113. }
  114. Advance();
  115. delay--;
  116. }
  117. }
  118. }
  119. int result=GetInternal(&pkt, offsetInSteps, advance);
  120. if(outstandingDelayChange!=0){
  121. if(outstandingDelayChange<0){
  122. playbackScaledDuration=40;
  123. outstandingDelayChange+=20;
  124. }else{
  125. playbackScaledDuration=80;
  126. outstandingDelayChange-=20;
  127. }
  128. //LOGV("outstanding delay change: %d", outstandingDelayChange);
  129. }else if(advance && GetCurrentDelay()==0){
  130. //LOGV("stretching packet because the next one is late");
  131. playbackScaledDuration=80;
  132. }else{
  133. playbackScaledDuration=60;
  134. }
  135. if(result==JR_OK){
  136. isEC=pkt.isEC;
  137. return pkt.size;
  138. }else{
  139. return 0;
  140. }
  141. }
  142. int JitterBuffer::GetInternal(jitter_packet_t* pkt, int offset, bool advance){
  143. /*if(needBuffering && lastPutTimestamp<nextTimestamp){
  144. LOGV("jitter: don't have timestamp %lld, buffering", (long long int)nextTimestamp);
  145. Advance();
  146. return JR_BUFFERING;
  147. }*/
  148. //needBuffering=false;
  149. int64_t timestampToGet=nextTimestamp+offset*(int32_t)step;
  150. int i;
  151. for(i=0;i<JITTER_SLOT_COUNT;i++){
  152. if(slots[i].buffer!=NULL && slots[i].timestamp==timestampToGet){
  153. break;
  154. }
  155. }
  156. if(i<JITTER_SLOT_COUNT){
  157. if(pkt && pkt->size<slots[i].size){
  158. LOGE("jitter: packet won't fit into provided buffer of %d (need %d)", int(slots[i].size), int(pkt->size));
  159. }else{
  160. if(pkt) {
  161. pkt->size = slots[i].size;
  162. pkt->timestamp = slots[i].timestamp;
  163. memcpy(pkt->buffer, slots[i].buffer, slots[i].size);
  164. pkt->isEC=slots[i].isEC;
  165. }
  166. }
  167. bufferPool.Reuse(slots[i].buffer);
  168. slots[i].buffer=NULL;
  169. if(offset==0)
  170. Advance();
  171. lostCount=0;
  172. needBuffering=false;
  173. return JR_OK;
  174. }
  175. LOGV("jitter: found no packet for timestamp %lld (last put = %d, lost = %d)", (long long int)timestampToGet, lastPutTimestamp, lostCount);
  176. if(advance)
  177. Advance();
  178. if(!needBuffering){
  179. lostCount++;
  180. if(offset==0){
  181. lostPackets++;
  182. lostSinceReset++;
  183. }
  184. if(lostCount>=lossesToReset || (gotSinceReset>minDelay*25 && lostSinceReset>gotSinceReset/2)){
  185. LOGW("jitter: lost %d packets in a row, resetting", lostCount);
  186. //minDelay++;
  187. dontIncMinDelay=16;
  188. dontDecMinDelay+=128;
  189. if(GetCurrentDelay()<minDelay)
  190. nextTimestamp-=(int64_t)(minDelay-GetCurrentDelay());
  191. lostCount=0;
  192. Reset();
  193. }
  194. return JR_MISSING;
  195. }
  196. return JR_BUFFERING;
  197. }
  198. void JitterBuffer::PutInternal(jitter_packet_t* pkt, bool overwriteExisting){
  199. if(pkt->size>JITTER_SLOT_SIZE){
  200. LOGE("The packet is too big to fit into the jitter buffer");
  201. return;
  202. }
  203. int i;
  204. for(i=0;i<JITTER_SLOT_COUNT;i++){
  205. if(slots[i].buffer && slots[i].timestamp==pkt->timestamp){
  206. //LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting);
  207. if(overwriteExisting){
  208. memcpy(slots[i].buffer, pkt->buffer, pkt->size);
  209. slots[i].size=pkt->size;
  210. slots[i].isEC=pkt->isEC;
  211. }
  212. return;
  213. }
  214. }
  215. gotSinceReset++;
  216. if(wasReset){
  217. wasReset=false;
  218. outstandingDelayChange=0;
  219. nextTimestamp=(int64_t)(((int64_t)pkt->timestamp)-step*minDelay);
  220. first=true;
  221. LOGI("jitter: resyncing, next timestamp = %lld (step=%d, minDelay=%f)", (long long int)nextTimestamp, step, minDelay);
  222. }
  223. for(i=0;i<JITTER_SLOT_COUNT;i++){
  224. if(slots[i].buffer!=NULL){
  225. if(slots[i].timestamp<nextTimestamp-1){
  226. bufferPool.Reuse(slots[i].buffer);
  227. slots[i].buffer=NULL;
  228. }
  229. }
  230. }
  231. /*double prevTime=0;
  232. uint32_t closestTime=0;
  233. for(i=0;i<JITTER_SLOT_COUNT;i++){
  234. if(slots[i].buffer!=NULL && pkt->timestamp-slots[i].timestamp<pkt->timestamp-closestTime){
  235. closestTime=slots[i].timestamp;
  236. prevTime=slots[i].recvTime;
  237. }
  238. }*/
  239. double time=VoIPController::GetCurrentTime();
  240. if(expectNextAtTime!=0){
  241. double dev=expectNextAtTime-time;
  242. //LOGV("packet dev %f", dev);
  243. deviationHistory.Add(dev);
  244. expectNextAtTime+=step/1000.0;
  245. }else{
  246. expectNextAtTime=time+step/1000.0;
  247. }
  248. if(pkt->timestamp<nextTimestamp){
  249. //LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp);
  250. latePacketCount++;
  251. lostPackets--;
  252. }else if(pkt->timestamp<nextTimestamp-1){
  253. //LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp);
  254. latePacketCount++;
  255. return;
  256. }
  257. if(pkt->timestamp>lastPutTimestamp)
  258. lastPutTimestamp=pkt->timestamp;
  259. for(i=0;i<JITTER_SLOT_COUNT;i++){
  260. if(slots[i].buffer==NULL)
  261. break;
  262. }
  263. if(i==JITTER_SLOT_COUNT || GetCurrentDelay()>=maxUsedSlots){
  264. int toRemove=JITTER_SLOT_COUNT;
  265. uint32_t bestTimestamp=0xFFFFFFFF;
  266. for(i=0;i<JITTER_SLOT_COUNT;i++){
  267. if(slots[i].buffer!=NULL && slots[i].timestamp<bestTimestamp){
  268. toRemove=i;
  269. bestTimestamp=slots[i].timestamp;
  270. }
  271. }
  272. Advance();
  273. bufferPool.Reuse(slots[toRemove].buffer);
  274. slots[toRemove].buffer=NULL;
  275. i=toRemove;
  276. }
  277. slots[i].timestamp=pkt->timestamp;
  278. slots[i].size=pkt->size;
  279. slots[i].buffer=bufferPool.Get();
  280. slots[i].recvTimeDiff=time-prevRecvTime;
  281. slots[i].isEC=pkt->isEC;
  282. if(slots[i].buffer)
  283. memcpy(slots[i].buffer, pkt->buffer, pkt->size);
  284. else
  285. LOGE("WTF!!");
  286. #ifdef TGVOIP_DUMP_JITTER_STATS
  287. fprintf(dump, "%u\t%.03f\t%d\t%.03f\t%.03f\t%.03f\n", pkt->timestamp, time, GetCurrentDelay(), lastMeasuredJitter, lastMeasuredDelay, minDelay);
  288. #endif
  289. prevRecvTime=time;
  290. }
  291. void JitterBuffer::Advance(){
  292. nextTimestamp+=step;
  293. }
  294. unsigned int JitterBuffer::GetCurrentDelay(){
  295. unsigned int delay=0;
  296. int i;
  297. for(i=0;i<JITTER_SLOT_COUNT;i++){
  298. if(slots[i].buffer!=NULL)
  299. delay++;
  300. }
  301. return delay;
  302. }
  303. void JitterBuffer::Tick(){
  304. MutexGuard m(mutex);
  305. int i;
  306. lateHistory.Add(latePacketCount);
  307. latePacketCount=0;
  308. bool absolutelyNoLatePackets=lateHistory.Max()==0;
  309. double avgLate16=lateHistory.Average(16);
  310. //LOGV("jitter: avg late=%.1f, %.1f, %.1f", avgLate16, avgLate32, avgLate64);
  311. if(avgLate16>=resyncThreshold){
  312. LOGV("resyncing: avgLate16=%f, resyncThreshold=%f", avgLate16, resyncThreshold);
  313. wasReset=true;
  314. }
  315. if(absolutelyNoLatePackets){
  316. if(dontDecMinDelay>0)
  317. dontDecMinDelay--;
  318. }
  319. delayHistory.Add(GetCurrentDelay());
  320. avgDelay=delayHistory.Average(32);
  321. double stddev=0;
  322. double avgdev=deviationHistory.Average();
  323. for(i=0;i<64;i++){
  324. double d=(deviationHistory[i]-avgdev);
  325. stddev+=(d*d);
  326. }
  327. stddev=sqrt(stddev/64);
  328. uint32_t stddevDelay=(uint32_t)ceil(stddev*2*1000/step);
  329. if(stddevDelay<minMinDelay)
  330. stddevDelay=minMinDelay;
  331. if(stddevDelay>maxMinDelay)
  332. stddevDelay=maxMinDelay;
  333. if(stddevDelay!=minDelay){
  334. int32_t diff=(int32_t)(stddevDelay-minDelay);
  335. if(diff>0){
  336. dontDecMinDelay=100;
  337. }
  338. if(diff<-1)
  339. diff=-1;
  340. if(diff>1)
  341. diff=1;
  342. if((diff>0 && dontIncMinDelay==0) || (diff<0 && dontDecMinDelay==0)){
  343. //nextTimestamp+=diff*(int32_t)step;
  344. minDelay+=diff;
  345. outstandingDelayChange+=diff*60;
  346. dontChangeDelay+=32;
  347. //LOGD("new delay from stddev %f", minDelay);
  348. if(diff<0){
  349. dontDecMinDelay+=25;
  350. }
  351. if(diff>0){
  352. dontIncMinDelay=25;
  353. }
  354. }
  355. }
  356. lastMeasuredJitter=stddev;
  357. lastMeasuredDelay=stddevDelay;
  358. //LOGV("stddev=%.3f, avg=%.3f, ndelay=%d, dontDec=%u", stddev, avgdev, stddevDelay, dontDecMinDelay);
  359. if(dontChangeDelay==0){
  360. if(avgDelay>minDelay+0.5){
  361. outstandingDelayChange-=avgDelay>minDelay+2 ? 60 : 20;
  362. dontChangeDelay+=10;
  363. }else if(avgDelay<minDelay-0.3){
  364. outstandingDelayChange+=20;
  365. dontChangeDelay+=10;
  366. }
  367. }
  368. if(dontChangeDelay>0)
  369. dontChangeDelay--;
  370. //LOGV("jitter: avg delay=%d, delay=%d, late16=%.1f, dontDecMinDelay=%d", avgDelay, delayHistory[0], avgLate16, dontDecMinDelay);
  371. /*if(!adjustingDelay) {
  372. if (((minDelay==1 ? (avgDelay>=3) : (avgDelay>=minDelay/2)) && delayHistory[0]>minDelay && avgLate16<=0.1 && absolutelyNoLatePackets && dontDecMinDelay<32 && min>minDelay)) {
  373. LOGI("jitter: need adjust");
  374. adjustingDelay=true;
  375. }
  376. }else{
  377. if(!absolutelyNoLatePackets){
  378. LOGI("jitter: done adjusting because we're losing packets");
  379. adjustingDelay=false;
  380. }else if(tickCount%5==0){
  381. LOGD("jitter: removing a packet to reduce delay");
  382. GetInternal(NULL, 0);
  383. expectNextAtTime=0;
  384. if(GetCurrentDelay()<=minDelay || min<=minDelay){
  385. adjustingDelay = false;
  386. LOGI("jitter: done adjusting");
  387. }
  388. }
  389. }*/
  390. tickCount++;
  391. }
  392. void JitterBuffer::GetAverageLateCount(double *out){
  393. double avgLate64=lateHistory.Average(), avgLate32=lateHistory.Average(32), avgLate16=lateHistory.Average(16);
  394. out[0]=avgLate16;
  395. out[1]=avgLate32;
  396. out[2]=avgLate64;
  397. }
  398. int JitterBuffer::GetAndResetLostPacketCount(){
  399. MutexGuard m(mutex);
  400. int r=lostPackets;
  401. lostPackets=0;
  402. return r;
  403. }
  404. double JitterBuffer::GetLastMeasuredJitter(){
  405. return lastMeasuredJitter;
  406. }
  407. double JitterBuffer::GetLastMeasuredDelay(){
  408. return lastMeasuredDelay;
  409. }
  410. double JitterBuffer::GetAverageDelay(){
  411. return avgDelay;
  412. }