VoIPController.cpp 124 KB


  1. //
  2. // libtgvoip is free and unencumbered public domain software.
  3. // For more information, see http://unlicense.org or the UNLICENSE file
  4. // you should have received with this source code distribution.
  5. //
  6. #ifndef _WIN32
  7. #include <unistd.h>
  8. #include <sys/time.h>
  9. #endif
  10. #include <errno.h>
  11. #include <string.h>
  12. #include <wchar.h>
  13. #include "VoIPController.h"
  14. #include "EchoCanceller.h"
  15. #include "logging.h"
  16. #include "threading.h"
  17. #include "Buffers.h"
  18. #include "OpusEncoder.h"
  19. #include "OpusDecoder.h"
  20. #include "VoIPServerConfig.h"
  21. #include "PrivateDefines.h"
  22. #include "json11.hpp"
  23. #include <assert.h>
  24. #include <time.h>
  25. #include <math.h>
  26. #include <exception>
  27. #include <stdexcept>
  28. #include <algorithm>
  29. #include <sstream>
  30. #include <inttypes.h>
  31. #include <float.h>
  32. #if TGVOIP_INCLUDE_OPUS_PACKAGE
  33. #include <opus/opus.h>
  34. #else
  35. #include <opus.h>
  36. #endif
  37. inline int pad4(int x){
  38. int r=PAD4(x);
  39. if(r==4)
  40. return 0;
  41. return r;
  42. }
  43. using namespace tgvoip;
  44. // using namespace std; // Already used in BlockingQueue.h.
  45. #ifdef __APPLE__
  46. #include "os/darwin/AudioUnitIO.h"
  47. #include <mach/mach_time.h>
  48. double VoIPController::machTimebase=0;
  49. uint64_t VoIPController::machTimestart=0;
  50. #endif
  51. #ifdef _WIN32
  52. int64_t VoIPController::win32TimeScale = 0;
  53. bool VoIPController::didInitWin32TimeScale = false;
  54. #endif
  55. #ifdef __ANDROID__
  56. #include "os/android/JNIUtilities.h"
  57. #include "os/android/AudioInputAndroid.h"
  58. extern jclass jniUtilitiesClass;
  59. #endif
  60. #if defined(TGVOIP_USE_CALLBACK_AUDIO_IO)
  61. #include "audio/AudioIOCallback.h"
  62. #endif
  63. extern FILE* tgvoipLogFile;
  64. #pragma mark - Public API
  65. VoIPController::VoIPController() : activeNetItfName(""),
  66. currentAudioInput("default"),
  67. currentAudioOutput("default"),
  68. proxyAddress(""),
  69. proxyUsername(""),
  70. proxyPassword(""),
  71. outputVolume(std::make_unique<effects::Volume>()),
  72. inputVolume(std::make_unique<effects::Volume>())
  73. {
  74. seq=1;
  75. lastRemoteSeq=0;
  76. state=STATE_WAIT_INIT;
  77. audioInput=NULL;
  78. audioOutput=NULL;
  79. encoder=NULL;
  80. audioOutStarted=false;
  81. audioTimestampIn=0;
  82. audioTimestampOut=0;
  83. stopping=false;
  84. memset(recvPacketTimes, 0, sizeof(double)*32);
  85. memset(&stats, 0, sizeof(TrafficStats));
  86. lastRemoteAckSeq=0;
  87. lastSentSeq=0;
  88. recvLossCount=0;
  89. packetsReceived=0;
  90. waitingForAcks=false;
  91. networkType=NET_TYPE_UNKNOWN;
  92. echoCanceller=NULL;
  93. dontSendPackets=0;
  94. micMuted=false;
  95. waitingForRelayPeerInfo=false;
  96. allowP2p=true;
  97. dataSavingMode=false;
  98. publicEndpointsReqTime=0;
  99. connectionInitTime=0;
  100. lastRecvPacketTime=0;
  101. dataSavingRequestedByPeer=false;
  102. peerVersion=0;
  103. conctl=new CongestionControl();
  104. prevSendLossCount=0;
  105. receivedInit=false;
  106. receivedInitAck=false;
  107. statsDump=NULL;
  108. useTCP=false;
  109. useUDP=true;
  110. didAddTcpRelays=false;
  111. udpPingCount=0;
  112. lastUdpPingTime=0;
  113. proxyProtocol=PROXY_NONE;
  114. proxyPort=0;
  115. resolvedProxyAddress=NULL;
  116. selectCanceller=SocketSelectCanceller::Create();
  117. udpSocket=NetworkSocket::Create(PROTO_UDP);
  118. realUdpSocket=udpSocket;
  119. udpConnectivityState=UDP_UNKNOWN;
  120. echoCancellationStrength=1;
  121. peerCapabilities=0;
  122. callbacks={0};
  123. didReceiveGroupCallKey=false;
  124. didReceiveGroupCallKeyAck=false;
  125. didSendGroupCallKey=false;
  126. didSendUpgradeRequest=false;
  127. didInvokeUpgradeCallback=false;
  128. connectionMaxLayer=0;
  129. useMTProto2=false;
  130. setCurrentEndpointToTCP=false;
  131. useIPv6=false;
  132. peerIPv6Available=false;
  133. shittyInternetMode=false;
  134. didAddIPv6Relays=false;
  135. didSendIPv6Endpoint=false;
  136. unsentStreamPackets.store(0);
  137. sendThread=NULL;
  138. recvThread=NULL;
  139. maxAudioBitrate=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_max_bitrate", 20000);
  140. maxAudioBitrateGPRS=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_max_bitrate_gprs", 8000);
  141. maxAudioBitrateEDGE=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_max_bitrate_edge", 16000);
  142. maxAudioBitrateSaving=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_max_bitrate_saving", 8000);
  143. initAudioBitrate=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_init_bitrate", 16000);
  144. initAudioBitrateGPRS=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_init_bitrate_gprs", 8000);
  145. initAudioBitrateEDGE=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_init_bitrate_edge", 8000);
  146. initAudioBitrateSaving=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_init_bitrate_saving", 8000);
  147. audioBitrateStepIncr=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_bitrate_step_incr", 1000);
  148. audioBitrateStepDecr=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_bitrate_step_decr", 1000);
  149. minAudioBitrate=(uint32_t) ServerConfig::GetSharedInstance()->GetInt("audio_min_bitrate", 8000);
  150. relaySwitchThreshold=ServerConfig::GetSharedInstance()->GetDouble("relay_switch_threshold", 0.8);
  151. p2pToRelaySwitchThreshold=ServerConfig::GetSharedInstance()->GetDouble("p2p_to_relay_switch_threshold", 0.6);
  152. relayToP2pSwitchThreshold=ServerConfig::GetSharedInstance()->GetDouble("relay_to_p2p_switch_threshold", 0.8);
  153. reconnectingTimeout=ServerConfig::GetSharedInstance()->GetDouble("reconnecting_state_timeout", 2.0);
  154. needRateFlags=static_cast<uint32_t>(ServerConfig::GetSharedInstance()->GetInt("rate_flags", 0xFFFFFFFF));
  155. rateMaxAcceptableRTT=ServerConfig::GetSharedInstance()->GetDouble("rate_min_rtt", 0.6);
  156. rateMaxAcceptableSendLoss=ServerConfig::GetSharedInstance()->GetDouble("rate_min_send_loss", 0.2);
  157. packetLossToEnableExtraEC=ServerConfig::GetSharedInstance()->GetDouble("packet_loss_for_extra_ec", 0.02);
  158. maxUnsentStreamPackets=static_cast<uint32_t>(ServerConfig::GetSharedInstance()->GetInt("max_unsent_stream_packets", 2));
  159. #ifdef __APPLE__
  160. machTimestart=0;
  161. #endif
  162. shared_ptr<Stream> stm=make_shared<Stream>();
  163. stm->id=1;
  164. stm->type=STREAM_TYPE_AUDIO;
  165. stm->codec=CODEC_OPUS;
  166. stm->enabled=1;
  167. stm->frameDuration=60;
  168. outgoingStreams.push_back(stm);
  169. }
  170. VoIPController::~VoIPController(){
  171. LOGD("Entered VoIPController::~VoIPController");
  172. if(!stopping){
  173. LOGE("!!!!!!!!!!!!!!!!!!!! CALL controller->Stop() BEFORE DELETING THE CONTROLLER OBJECT !!!!!!!!!!!!!!!!!!!!!!!1");
  174. abort();
  175. }
  176. LOGD("before close socket");
  177. if(udpSocket)
  178. delete udpSocket;
  179. if(udpSocket!=realUdpSocket)
  180. delete realUdpSocket;
  181. LOGD("before delete audioIO");
  182. if(audioIO){
  183. delete audioIO;
  184. audioInput=NULL;
  185. audioOutput=NULL;
  186. }
  187. for(vector<shared_ptr<Stream>>::iterator _stm=incomingStreams.begin();_stm!=incomingStreams.end();++_stm){
  188. shared_ptr<Stream> stm=*_stm;
  189. LOGD("before stop decoder");
  190. if(stm->decoder){
  191. stm->decoder->Stop();
  192. }
  193. }
  194. LOGD("before delete encoder");
  195. if(encoder){
  196. encoder->Stop();
  197. delete encoder;
  198. }
  199. LOGD("before delete echo canceller");
  200. if(echoCanceller){
  201. echoCanceller->Stop();
  202. delete echoCanceller;
  203. }
  204. delete conctl;
  205. if(statsDump)
  206. fclose(statsDump);
  207. if(resolvedProxyAddress)
  208. delete resolvedProxyAddress;
  209. delete selectCanceller;
  210. LOGD("Left VoIPController::~VoIPController");
  211. if(tgvoipLogFile){
  212. FILE* log=tgvoipLogFile;
  213. tgvoipLogFile=NULL;
  214. fclose(log);
  215. }
  216. #if defined(TGVOIP_USE_CALLBACK_AUDIO_IO)
  217. if (preprocDecoder) {
  218. opus_decoder_destroy(preprocDecoder);
  219. preprocDecoder=nullptr;
  220. }
  221. #endif
  222. }
  223. void VoIPController::Stop(){
  224. LOGD("Entered VoIPController::Stop");
  225. stopping=true;
  226. runReceiver=false;
  227. LOGD("before shutdown socket");
  228. if(udpSocket)
  229. udpSocket->Close();
  230. if(realUdpSocket!=udpSocket)
  231. realUdpSocket->Close();
  232. selectCanceller->CancelSelect();
  233. Buffer emptyBuf(0);
  234. //PendingOutgoingPacket emptyPacket{0, 0, 0, move(emptyBuf), 0};
  235. //sendQueue->Put(move(emptyPacket));
  236. LOGD("before join sendThread");
  237. if(sendThread){
  238. sendThread->Join();
  239. delete sendThread;
  240. }
  241. LOGD("before join recvThread");
  242. if(recvThread){
  243. recvThread->Join();
  244. delete recvThread;
  245. }
  246. LOGD("before stop messageThread");
  247. messageThread.Stop();
  248. {
  249. LOGD("Before stop audio I/O");
  250. MutexGuard m(audioIOMutex);
  251. if(audioInput){
  252. audioInput->Stop();
  253. audioInput->SetCallback(NULL, NULL);
  254. }
  255. if(audioOutput){
  256. audioOutput->Stop();
  257. audioOutput->SetCallback(NULL, NULL);
  258. }
  259. }
  260. LOGD("Left VoIPController::Stop [need rate = %d]", (int)needRate);
  261. }
  262. bool VoIPController::NeedRate(){
  263. return needRate && ServerConfig::GetSharedInstance()->GetBoolean("bad_call_rating", false);
  264. }
  265. void VoIPController::SetRemoteEndpoints(vector<Endpoint> endpoints, bool allowP2p, int32_t connectionMaxLayer){
  266. LOGW("Set remote endpoints, allowP2P=%d, connectionMaxLayer=%u", allowP2p ? 1 : 0, connectionMaxLayer);
  267. preferredRelay=0;
  268. {
  269. MutexGuard m(endpointsMutex);
  270. this->endpoints.clear();
  271. didAddTcpRelays=false;
  272. useTCP=true;
  273. for(vector<Endpoint>::iterator itrtr=endpoints.begin();itrtr!=endpoints.end();++itrtr){
  274. if(this->endpoints.find(itrtr->id)!=this->endpoints.end())
  275. LOGE("Endpoint IDs are not unique!");
  276. this->endpoints[itrtr->id]=*itrtr;
  277. if(currentEndpoint==0)
  278. currentEndpoint=itrtr->id;
  279. if(itrtr->type==Endpoint::Type::TCP_RELAY)
  280. didAddTcpRelays=true;
  281. if(itrtr->type==Endpoint::Type::UDP_RELAY)
  282. useTCP=false;
  283. LOGV("Adding endpoint: %s:%d, %s", itrtr->address.ToString().c_str(), itrtr->port, itrtr->type==Endpoint::Type::UDP_RELAY ? "UDP" : "TCP");
  284. }
  285. }
  286. preferredRelay=currentEndpoint;
  287. this->allowP2p=allowP2p;
  288. this->connectionMaxLayer=connectionMaxLayer;
  289. if(connectionMaxLayer>=74){
  290. useMTProto2=true;
  291. }
  292. AddIPv6Relays();
  293. }
  294. void VoIPController::Start(){
  295. LOGW("Starting voip controller");
  296. udpSocket->Open();
  297. if(udpSocket->IsFailed()){
  298. SetState(STATE_FAILED);
  299. return;
  300. }
  301. //SendPacket(NULL, 0, currentEndpoint);
  302. runReceiver=true;
  303. recvThread=new Thread(bind(&VoIPController::RunRecvThread, this));
  304. recvThread->SetName("VoipRecv");
  305. recvThread->Start();
  306. messageThread.Start();
  307. }
  308. void VoIPController::Connect(){
  309. assert(state!=STATE_WAIT_INIT_ACK);
  310. connectionInitTime=GetCurrentTime();
  311. if(config.initTimeout==0.0){
  312. LOGE("Init timeout is 0 -- did you forget to set config?");
  313. config.initTimeout=30.0;
  314. }
  315. //InitializeTimers();
  316. //SendInit();
  317. sendThread=new Thread(bind(&VoIPController::RunSendThread, this));
  318. sendThread->SetName("VoipSend");
  319. sendThread->Start();
  320. }
  321. void VoIPController::SetEncryptionKey(char *key, bool isOutgoing){
  322. memcpy(encryptionKey, key, 256);
  323. uint8_t sha1[SHA1_LENGTH];
  324. crypto.sha1((uint8_t*) encryptionKey, 256, sha1);
  325. memcpy(keyFingerprint, sha1+(SHA1_LENGTH-8), 8);
  326. uint8_t sha256[SHA256_LENGTH];
  327. crypto.sha256((uint8_t*) encryptionKey, 256, sha256);
  328. memcpy(callID, sha256+(SHA256_LENGTH-16), 16);
  329. this->isOutgoing=isOutgoing;
  330. }
  331. void VoIPController::SetNetworkType(int type){
  332. networkType=type;
  333. UpdateDataSavingState();
  334. UpdateAudioBitrateLimit();
  335. myIPv6=IPv6Address();
  336. string itfName=udpSocket->GetLocalInterfaceInfo(NULL, &myIPv6);
  337. LOGI("set network type: %s, active interface %s", NetworkTypeToString(type).c_str(), itfName.c_str());
  338. LOGI("Local IPv6 address: %s", myIPv6.ToString().c_str());
  339. if(IS_MOBILE_NETWORK(networkType)){
  340. CellularCarrierInfo carrier=GetCarrierInfo();
  341. if(!carrier.name.empty()){
  342. LOGI("Carrier: %s [%s; mcc=%s, mnc=%s]", carrier.name.c_str(), carrier.countryCode.c_str(), carrier.mcc.c_str(), carrier.mnc.c_str());
  343. }
  344. }
  345. if(itfName!=activeNetItfName){
  346. udpSocket->OnActiveInterfaceChanged();
  347. LOGI("Active network interface changed: %s -> %s", activeNetItfName.c_str(), itfName.c_str());
  348. bool isFirstChange=activeNetItfName.length()==0 && state!=STATE_ESTABLISHED && state!=STATE_RECONNECTING;
  349. activeNetItfName=itfName;
  350. if(isFirstChange)
  351. return;
  352. wasNetworkHandover=true;
  353. if(currentEndpoint){
  354. const Endpoint& _currentEndpoint=endpoints.at(currentEndpoint);
  355. const Endpoint& _preferredRelay=endpoints.at(preferredRelay);
  356. if(_currentEndpoint.type!=Endpoint::Type::UDP_RELAY){
  357. if(_preferredRelay.type==Endpoint::Type::UDP_RELAY)
  358. currentEndpoint=preferredRelay;
  359. MutexGuard m(endpointsMutex);
  360. constexpr int64_t lanID=(int64_t)(FOURCC('L','A','N','4')) << 32;
  361. endpoints.erase(lanID);
  362. for(pair<const int64_t, Endpoint>& e:endpoints){
  363. Endpoint& endpoint=e.second;
  364. if(endpoint.type==Endpoint::Type::UDP_RELAY && useTCP){
  365. useTCP=false;
  366. if(_preferredRelay.type==Endpoint::Type::TCP_RELAY){
  367. preferredRelay=currentEndpoint=endpoint.id;
  368. }
  369. }else if(endpoint.type==Endpoint::Type::TCP_RELAY && endpoint.socket){
  370. endpoint.socket->Close();
  371. //delete endpoint.socket;
  372. //endpoint.socket=NULL;
  373. }
  374. //if(endpoint->type==Endpoint::Type::UDP_P2P_INET){
  375. endpoint.averageRTT=0;
  376. endpoint.rtts.Reset();
  377. //}
  378. }
  379. }
  380. }
  381. lastUdpPingTime=0;
  382. if(proxyProtocol==PROXY_SOCKS5)
  383. InitUDPProxy();
  384. if(allowP2p && currentEndpoint){
  385. SendPublicEndpointsRequest();
  386. }
  387. BufferOutputStream s(4);
  388. s.WriteInt32(dataSavingMode ? INIT_FLAG_DATA_SAVING_ENABLED : 0);
  389. if(peerVersion<6){
  390. SendPacketReliably(PKT_NETWORK_CHANGED, s.GetBuffer(), s.GetLength(), 1, 20);
  391. }else{
  392. Buffer buf(move(s));
  393. SendExtra(buf, EXTRA_TYPE_NETWORK_CHANGED);
  394. }
  395. needReInitUdpProxy=true;
  396. selectCanceller->CancelSelect();
  397. didSendIPv6Endpoint=false;
  398. AddIPv6Relays();
  399. ResetUdpAvailability();
  400. ResetEndpointPingStats();
  401. }
  402. }
  403. double VoIPController::GetAverageRTT(){
  404. if(lastSentSeq>=lastRemoteAckSeq){
  405. uint32_t diff=lastSentSeq-lastRemoteAckSeq;
  406. //LOGV("rtt diff=%u", diff);
  407. if(diff<32){
  408. double res=0;
  409. int count=0;
  410. /*for(i=diff;i<32;i++){
  411. if(remoteAcks[i-diff]>0){
  412. res+=(remoteAcks[i-diff]-sentPacketTimes[i]);
  413. count++;
  414. }
  415. }*/
  416. MutexGuard m(queuedPacketsMutex);
  417. for(std::vector<RecentOutgoingPacket>::iterator itr=recentOutgoingPackets.begin();itr!=recentOutgoingPackets.end();++itr){
  418. if(itr->ackTime>0){
  419. res+=(itr->ackTime-itr->sendTime);
  420. count++;
  421. }
  422. }
  423. if(count>0)
  424. res/=count;
  425. return res;
  426. }
  427. }
  428. return 999;
  429. }
  430. void VoIPController::SetMicMute(bool mute){
  431. if(micMuted==mute)
  432. return;
  433. micMuted=mute;
  434. if(audioInput){
  435. if(mute)
  436. audioInput->Stop();
  437. else
  438. audioInput->Start();
  439. if(!audioInput->IsInitialized()){
  440. lastError=ERROR_AUDIO_IO;
  441. SetState(STATE_FAILED);
  442. return;
  443. }
  444. }
  445. if(echoCanceller)
  446. echoCanceller->Enable(!mute);
  447. if(state==STATE_ESTABLISHED){
  448. for(shared_ptr<Stream>& s:outgoingStreams){
  449. if(s->type==STREAM_TYPE_AUDIO){
  450. s->enabled=!mute;
  451. if(peerVersion<6){
  452. unsigned char buf[2];
  453. buf[0]=s->id;
  454. buf[1]=(char) (mute ? 0 : 1);
  455. SendPacketReliably(PKT_STREAM_STATE, buf, 2, .5f, 20);
  456. }else{
  457. SendStreamFlags(*s);
  458. }
  459. }
  460. }
  461. }
  462. if(mute){
  463. if(noStreamsNopID==MessageThread::INVALID_ID)
  464. noStreamsNopID=messageThread.Post(std::bind(&VoIPController::SendNopPacket, this), 0.2, 0.2);
  465. }else{
  466. if(noStreamsNopID!=MessageThread::INVALID_ID){
  467. messageThread.Cancel(noStreamsNopID);
  468. noStreamsNopID=MessageThread::INVALID_ID;
  469. }
  470. }
  471. }
  472. string VoIPController::GetDebugString(){
  473. string r="Remote endpoints: \n";
  474. char buffer[2048];
  475. MutexGuard m(endpointsMutex);
  476. for(pair<const int64_t, Endpoint>& _e:endpoints){
  477. Endpoint& endpoint=_e.second;
  478. const char* type;
  479. switch(endpoint.type){
  480. case Endpoint::Type::UDP_P2P_INET:
  481. type="UDP_P2P_INET";
  482. break;
  483. case Endpoint::Type::UDP_P2P_LAN:
  484. type="UDP_P2P_LAN";
  485. break;
  486. case Endpoint::Type::UDP_RELAY:
  487. type="UDP_RELAY";
  488. break;
  489. case Endpoint::Type::TCP_RELAY:
  490. type="TCP_RELAY";
  491. break;
  492. default:
  493. type="UNKNOWN";
  494. break;
  495. }
  496. snprintf(buffer, sizeof(buffer), "%s:%u %dms %d 0x%" PRIx64 " [%s%s]\n", endpoint.address.IsEmpty() ? ("["+endpoint.v6address.ToString()+"]").c_str() : endpoint.address.ToString().c_str(), endpoint.port, (int)(endpoint.averageRTT*1000), endpoint.udpPongCount, (uint64_t)endpoint.id, type, currentEndpoint==endpoint.id ? ", IN_USE" : "");
  497. r+=buffer;
  498. }
  499. if(shittyInternetMode){
  500. snprintf(buffer, sizeof(buffer), "ShittyInternetMode: level %d\n", extraEcLevel);
  501. r+=buffer;
  502. }
  503. double avgLate[3];
  504. shared_ptr<Stream> stm=GetStreamByType(STREAM_TYPE_AUDIO, false);
  505. shared_ptr<JitterBuffer> jitterBuffer;
  506. if(stm)
  507. jitterBuffer=stm->jitterBuffer;
  508. if(jitterBuffer)
  509. jitterBuffer->GetAverageLateCount(avgLate);
  510. else
  511. memset(avgLate, 0, 3*sizeof(double));
  512. snprintf(buffer, sizeof(buffer),
  513. "Jitter buffer: %d/%.2f | %.1f, %.1f, %.1f\n"
  514. "RTT avg/min: %d/%d\n"
  515. "Congestion window: %d/%d bytes\n"
  516. "Key fingerprint: %02hhX%02hhX%02hhX%02hhX%02hhX%02hhX%02hhX%02hhX%s\n"
  517. "Last sent/ack'd seq: %u/%u\n"
  518. "Last recvd seq: %u\n"
  519. "Send/recv losses: %u/%u (%d%%)\n"
  520. "Audio bitrate: %d kbit\n"
  521. "Outgoing queue: %u\n"
  522. // "Packet grouping: %d\n"
  523. "Frame size out/in: %d/%d\n"
  524. "Bytes sent/recvd: %llu/%llu",
  525. jitterBuffer ? jitterBuffer->GetMinPacketCount() : 0, jitterBuffer ? jitterBuffer->GetAverageDelay() : 0, avgLate[0], avgLate[1], avgLate[2],
  526. // (int)(GetAverageRTT()*1000), 0,
  527. (int)(conctl->GetAverageRTT()*1000), (int)(conctl->GetMinimumRTT()*1000),
  528. int(conctl->GetInflightDataSize()), int(conctl->GetCongestionWindow()),
  529. keyFingerprint[0],keyFingerprint[1],keyFingerprint[2],keyFingerprint[3],keyFingerprint[4],keyFingerprint[5],keyFingerprint[6],keyFingerprint[7],
  530. useMTProto2 ? " (MTProto2.0)" : "",
  531. lastSentSeq, lastRemoteAckSeq, lastRemoteSeq,
  532. conctl->GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
  533. encoder ? (encoder->GetBitrate()/1000) : 0,
  534. static_cast<unsigned int>(unsentStreamPackets),
  535. // audioPacketGrouping,
  536. outgoingStreams[0]->frameDuration, incomingStreams.size()>0 ? incomingStreams[0]->frameDuration : 0,
  537. (long long unsigned int)(stats.bytesSentMobile+stats.bytesSentWifi),
  538. (long long unsigned int)(stats.bytesRecvdMobile+stats.bytesRecvdWifi));
  539. r+=buffer;
  540. return r;
  541. }
  542. const char* VoIPController::GetVersion(){
  543. return LIBTGVOIP_VERSION;
  544. }
  545. int64_t VoIPController::GetPreferredRelayID(){
  546. return preferredRelay;
  547. }
  548. int VoIPController::GetLastError(){
  549. return lastError;
  550. }
  551. void VoIPController::GetStats(TrafficStats *stats){
  552. memcpy(stats, &this->stats, sizeof(TrafficStats));
  553. }
  554. string VoIPController::GetDebugLog(){
  555. map<string, json11::Json> network{
  556. {"type", NetworkTypeToString(networkType)}
  557. };
  558. if(IS_MOBILE_NETWORK(networkType)){
  559. CellularCarrierInfo carrier=GetCarrierInfo();
  560. if(!carrier.name.empty()){
  561. network["carrier"]=carrier.name;
  562. network["country"]=carrier.countryCode;
  563. network["mcc"]=carrier.mcc;
  564. network["mnc"]=carrier.mnc;
  565. }
  566. }else if(networkType==NET_TYPE_WIFI){
  567. #ifdef __ANDROID__
  568. jni::DoWithJNI([&](JNIEnv* env){
  569. jmethodID getWifiInfoMethod=env->GetStaticMethodID(jniUtilitiesClass, "getWifiInfo", "()[I");
  570. jintArray res=static_cast<jintArray>(env->CallStaticObjectMethod(jniUtilitiesClass, getWifiInfoMethod));
  571. if(res){
  572. jint* wifiInfo=env->GetIntArrayElements(res, NULL);
  573. network["rssi"]=wifiInfo[0];
  574. network["link_speed"]=wifiInfo[1];
  575. env->ReleaseIntArrayElements(res, wifiInfo, JNI_ABORT);
  576. }
  577. });
  578. #endif
  579. }
  580. /*vector<json11::Json> lpkts;
  581. for(DebugLoggedPacket& lpkt:debugLoggedPackets){
  582. lpkts.push_back(json11::Json::array{lpkt.timestamp, lpkt.seq, lpkt.length});
  583. }
  584. return json11::Json(json11::Json::object{
  585. {"log_type", "out_packet_stats"},
  586. {"libtgvoip_version", LIBTGVOIP_VERSION},
  587. {"network", network},
  588. {"protocol_version", std::min(peerVersion, PROTOCOL_VERSION)},
  589. {"total_losses", json11::Json::object{
  590. {"s", (int32_t)conctl->GetSendLossCount()},
  591. {"r", (int32_t)recvLossCount}
  592. }},
  593. {"call_duration", GetCurrentTime()-connectionInitTime},
  594. {"out_packet_stats", lpkts}
  595. }).dump();*/
  596. string p2pType="none";
  597. Endpoint& cur=endpoints[currentEndpoint];
  598. if(cur.type==Endpoint::Type::UDP_P2P_INET)
  599. p2pType=cur.IsIPv6Only() ? "inet6" : "inet";
  600. else if(cur.type==Endpoint::Type::UDP_P2P_LAN)
  601. p2pType="lan";
  602. vector<string> problems;
  603. if(lastError==ERROR_TIMEOUT)
  604. problems.push_back("timeout");
  605. if(wasReconnecting)
  606. problems.push_back("reconnecting");
  607. if(wasExtraEC)
  608. problems.push_back("extra_ec");
  609. if(wasEncoderLaggy)
  610. problems.push_back("encoder_lag");
  611. if(!wasEstablished)
  612. problems.push_back("not_inited");
  613. if(wasNetworkHandover)
  614. problems.push_back("network_handover");
  615. return json11::Json(json11::Json::object{
  616. {"log_type", "call_stats"},
  617. {"libtgvoip_version", LIBTGVOIP_VERSION},
  618. {"network", network},
  619. {"protocol_version", std::min(peerVersion, PROTOCOL_VERSION)},
  620. {"udp_avail", udpConnectivityState==UDP_AVAILABLE},
  621. {"tcp_used", useTCP},
  622. {"relay_rtt", (int)(endpoints[preferredRelay].averageRTT*1000.0)},
  623. {"p2p_type", p2pType},
  624. {"rtt", (int)(endpoints[currentEndpoint].averageRTT*1000.0)},
  625. {"packet_stats", json11::Json::object{
  626. {"out", (int)seq},
  627. {"in", (int)packetsReceived},
  628. {"lost_out", (int)conctl->GetSendLossCount()},
  629. {"lost_in", (int)recvLossCount}
  630. }},
  631. {"problems", problems},
  632. {"pref_relay", std::to_string(preferredRelay)}
  633. }).dump();
  634. }
  635. vector<AudioInputDevice> VoIPController::EnumerateAudioInputs(){
  636. vector<AudioInputDevice> devs;
  637. audio::AudioInput::EnumerateDevices(devs);
  638. return devs;
  639. }
  640. vector<AudioOutputDevice> VoIPController::EnumerateAudioOutputs(){
  641. vector<AudioOutputDevice> devs;
  642. audio::AudioOutput::EnumerateDevices(devs);
  643. return devs;
  644. }
  645. void VoIPController::SetCurrentAudioInput(string id){
  646. currentAudioInput=id;
  647. if(audioInput)
  648. audioInput->SetCurrentDevice(id);
  649. }
  650. void VoIPController::SetCurrentAudioOutput(string id){
  651. currentAudioOutput=id;
  652. if(audioOutput)
  653. audioOutput->SetCurrentDevice(id);
  654. }
  655. string VoIPController::GetCurrentAudioInputID(){
  656. return currentAudioInput;
  657. }
  658. string VoIPController::GetCurrentAudioOutputID(){
  659. return currentAudioOutput;
  660. }
  661. void VoIPController::SetProxy(int protocol, string address, uint16_t port, string username, string password){
  662. proxyProtocol=protocol;
  663. proxyAddress=std::move(address);
  664. proxyPort=port;
  665. proxyUsername=std::move(username);
  666. proxyPassword=std::move(password);
  667. }
  668. int VoIPController::GetSignalBarsCount(){
  669. return signalBarsHistory.NonZeroAverage();
  670. }
  671. void VoIPController::SetCallbacks(VoIPController::Callbacks callbacks){
  672. this->callbacks=callbacks;
  673. if(callbacks.connectionStateChanged)
  674. callbacks.connectionStateChanged(this, state);
  675. }
  676. void VoIPController::SetAudioOutputGainControlEnabled(bool enabled){
  677. LOGD("New output AGC state: %d", enabled);
  678. }
  679. uint32_t VoIPController::GetPeerCapabilities(){
  680. return peerCapabilities;
  681. }
  682. void VoIPController::SendGroupCallKey(unsigned char *key){
  683. if(!(peerCapabilities & TGVOIP_PEER_CAP_GROUP_CALLS)){
  684. LOGE("Tried to send group call key but peer isn't capable of them");
  685. return;
  686. }
  687. if(didSendGroupCallKey){
  688. LOGE("Tried to send a group call key repeatedly");
  689. return;
  690. }
  691. if(!isOutgoing){
  692. LOGE("You aren't supposed to send group call key in an incoming call, use VoIPController::RequestCallUpgrade() instead");
  693. return;
  694. }
  695. didSendGroupCallKey=true;
  696. Buffer buf(256);
  697. buf.CopyFrom(key, 0, 256);
  698. SendExtra(buf, EXTRA_TYPE_GROUP_CALL_KEY);
  699. }
  700. void VoIPController::RequestCallUpgrade(){
  701. if(!(peerCapabilities & TGVOIP_PEER_CAP_GROUP_CALLS)){
  702. LOGE("Tried to send group call key but peer isn't capable of them");
  703. return;
  704. }
  705. if(didSendUpgradeRequest){
  706. LOGE("Tried to send upgrade request repeatedly");
  707. return;
  708. }
  709. if(isOutgoing){
  710. LOGE("You aren't supposed to send an upgrade request in an outgoing call, generate an encryption key and use VoIPController::SendGroupCallKey instead");
  711. return;
  712. }
  713. didSendUpgradeRequest=true;
  714. Buffer empty(0);
  715. SendExtra(empty, EXTRA_TYPE_REQUEST_GROUP);
  716. }
  717. void VoIPController::SetEchoCancellationStrength(int strength){
  718. echoCancellationStrength=strength;
  719. if(echoCanceller)
  720. echoCanceller->SetAECStrength(strength);
  721. }
  722. #if defined(TGVOIP_USE_CALLBACK_AUDIO_IO)
  723. void VoIPController::SetAudioDataCallbacks(std::function<void(int16_t*, size_t)> input, std::function<void(int16_t*, size_t)> output, std::function<void(int16_t*, size_t)> preproc=nullptr){
  724. audioInputDataCallback=input;
  725. audioOutputDataCallback=output;
  726. audioPreprocDataCallback=preproc;
  727. preprocDecoder=preprocDecoder ? preprocDecoder : opus_decoder_create(48000, 1, NULL);
  728. }
  729. #endif
  730. int VoIPController::GetConnectionState(){
  731. return state;
  732. }
  733. void VoIPController::SetConfig(const Config& cfg){
  734. config=cfg;
  735. if(tgvoipLogFile){
  736. fclose(tgvoipLogFile);
  737. tgvoipLogFile=NULL;
  738. }
  739. if(!config.logFilePath.empty()){
  740. #ifndef _WIN32
  741. tgvoipLogFile=fopen(config.logFilePath.c_str(), "a");
  742. #else
  743. if(_wfopen_s(&tgvoipLogFile, config.logFilePath.c_str(), L"a")!=0){
  744. tgvoipLogFile=NULL;
  745. }
  746. #endif
  747. tgvoip_log_file_write_header(tgvoipLogFile);
  748. }else{
  749. tgvoipLogFile=NULL;
  750. }
  751. if(statsDump){
  752. fclose(statsDump);
  753. statsDump=NULL;
  754. }
  755. if(!config.statsDumpFilePath.empty()){
  756. #ifndef _WIN32
  757. statsDump=fopen(config.statsDumpFilePath.c_str(), "w");
  758. #else
  759. if(_wfopen_s(&statsDump, config.statsDumpFilePath.c_str(), L"w")!=0){
  760. statsDump=NULL;
  761. }
  762. #endif
  763. if(statsDump)
  764. fprintf(statsDump, "Time\tRTT\tLRSeq\tLSSeq\tLASeq\tLostR\tLostS\tCWnd\tBitrate\tLoss%%\tJitter\tJDelay\tAJDelay\n");
  765. //else
  766. // LOGW("Failed to open stats dump file %s for writing", config.statsDumpFilePath.c_str());
  767. }else{
  768. statsDump=NULL;
  769. }
  770. UpdateDataSavingState();
  771. UpdateAudioBitrateLimit();
  772. }
  773. void VoIPController::SetPersistentState(vector<uint8_t> state){
  774. using namespace json11;
  775. if(state.empty())
  776. return;
  777. string jsonErr;
  778. string json=string(state.begin(), state.end());
  779. Json _obj=Json::parse(json, jsonErr);
  780. if(!jsonErr.empty()){
  781. LOGE("Error parsing persistable state: %s", jsonErr.c_str());
  782. return;
  783. }
  784. Json::object obj=_obj.object_items();
  785. if(obj.find("proxy")!=obj.end()){
  786. Json::object proxy=obj["proxy"].object_items();
  787. lastTestedProxyServer=proxy["server"].string_value();
  788. proxySupportsUDP=proxy["udp"].bool_value();
  789. proxySupportsTCP=proxy["tcp"].bool_value();
  790. }
  791. }
  792. vector<uint8_t> VoIPController::GetPersistentState(){
  793. using namespace json11;
  794. Json::object obj=Json::object{
  795. {"ver", 1},
  796. };
  797. if(proxyProtocol==PROXY_SOCKS5){
  798. char pbuf[128];
  799. snprintf(pbuf, sizeof(pbuf), "%s:%u", proxyAddress.c_str(), proxyPort);
  800. obj.insert({"proxy", Json::object{
  801. {"server", string(pbuf)},
  802. {"udp", proxySupportsUDP},
  803. {"tcp", proxySupportsTCP}
  804. }});
  805. }
  806. string _jstr=Json(obj).dump();
  807. const char* jstr=_jstr.c_str();
  808. return vector<uint8_t>(jstr, jstr+strlen(jstr));
  809. }
  810. void VoIPController::SetOutputVolume(float level){
  811. outputVolume->SetLevel(level);
  812. }
  813. void VoIPController::SetInputVolume(float level){
  814. inputVolume->SetLevel(level);
  815. }
  816. #if defined(__APPLE__) && TARGET_OS_OSX
  817. void VoIPController::SetAudioOutputDuckingEnabled(bool enabled){
  818. macAudioDuckingEnabled=enabled;
  819. audio::AudioUnitIO* osxAudio=dynamic_cast<audio::AudioUnitIO*>(audioIO);
  820. if(osxAudio){
  821. osxAudio->SetDuckingEnabled(enabled);
  822. }
  823. }
  824. #endif
  825. #pragma mark - Internal initialization
  826. void VoIPController::InitializeTimers(){
  827. initTimeoutID=messageThread.Post([this]{
  828. LOGW("Init timeout, disconnecting");
  829. lastError=ERROR_TIMEOUT;
  830. SetState(STATE_FAILED);
  831. }, config.initTimeout);
  832. if(!config.statsDumpFilePath.empty()){
  833. messageThread.Post([this]{
  834. if(statsDump && incomingStreams.size()==1){
  835. shared_ptr<JitterBuffer>& jitterBuffer=incomingStreams[0]->jitterBuffer;
  836. //fprintf(statsDump, "Time\tRTT\tLISeq\tLASeq\tCWnd\tBitrate\tJitter\tJDelay\tAJDelay\n");
  837. fprintf(statsDump, "%.3f\t%.3f\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%d\t%.3f\t%.3f\t%.3f\n",
  838. GetCurrentTime()-connectionInitTime,
  839. endpoints.at(currentEndpoint).rtts[0],
  840. lastRemoteSeq,
  841. (uint32_t)seq,
  842. lastRemoteAckSeq,
  843. recvLossCount,
  844. conctl ? conctl->GetSendLossCount() : 0,
  845. conctl ? (int)conctl->GetInflightDataSize() : 0,
  846. encoder ? encoder->GetBitrate() : 0,
  847. encoder ? encoder->GetPacketLoss() : 0,
  848. jitterBuffer ? jitterBuffer->GetLastMeasuredJitter() : 0,
  849. jitterBuffer ? jitterBuffer->GetLastMeasuredDelay()*0.06 : 0,
  850. jitterBuffer ? jitterBuffer->GetAverageDelay()*0.06 : 0);
  851. }
  852. }, 0.1, 0.1);
  853. }
  854. messageThread.Post(std::bind(&VoIPController::SendRelayPings, this), 0.0, 2.0);
  855. }
  856. void VoIPController::RunSendThread(){
  857. InitializeAudio();
  858. InitializeTimers();
  859. SendInit();
  860. LOGI("=== send thread exiting ===");
  861. }
  862. #pragma mark - Miscellaneous
  863. void VoIPController::SetState(int state){
  864. this->state=state;
  865. LOGV("Call state changed to %d", state);
  866. stateChangeTime=GetCurrentTime();
  867. messageThread.Post([this, state]{
  868. if(callbacks.connectionStateChanged)
  869. callbacks.connectionStateChanged(this, state);
  870. });
  871. if(state==STATE_ESTABLISHED){
  872. SetMicMute(micMuted);
  873. if(!wasEstablished){
  874. wasEstablished=true;
  875. messageThread.Post(std::bind(&VoIPController::UpdateRTT, this), 0.1, 0.5);
  876. messageThread.Post(std::bind(&VoIPController::UpdateAudioBitrate, this), 0.0, 0.3);
  877. messageThread.Post(std::bind(&VoIPController::UpdateCongestion, this), 0.0, 1.0);
  878. messageThread.Post(std::bind(&VoIPController::UpdateSignalBars, this), 1.0, 1.0);
  879. messageThread.Post(std::bind(&VoIPController::TickJitterBufferAngCongestionControl, this), 0.0, 0.1);
  880. }
  881. }
  882. }
  883. void VoIPController::SendStreamFlags(Stream& stream){
  884. BufferOutputStream s(5);
  885. s.WriteByte(stream.id);
  886. uint32_t flags=0;
  887. if(stream.enabled)
  888. flags|=STREAM_FLAG_ENABLED;
  889. if(stream.extraECEnabled)
  890. flags|=STREAM_FLAG_EXTRA_EC;
  891. s.WriteInt32(flags);
  892. LOGV("My stream state: id %u flags %u", (unsigned int)stream.id, (unsigned int)flags);
  893. Buffer buf(move(s));
  894. SendExtra(buf, EXTRA_TYPE_STREAM_FLAGS);
  895. }
  896. shared_ptr<VoIPController::Stream> VoIPController::GetStreamByType(int type, bool outgoing){
  897. shared_ptr<Stream> s;
  898. for(shared_ptr<Stream>& ss:(outgoing ? outgoingStreams : incomingStreams)){
  899. if(ss->type==type)
  900. return ss;
  901. }
  902. return s;
  903. }
  904. CellularCarrierInfo VoIPController::GetCarrierInfo(){
  905. #if defined(__APPLE__) && TARGET_OS_IOS
  906. return DarwinSpecific::GetCarrierInfo();
  907. #elif defined(__ANDROID__)
  908. CellularCarrierInfo carrier;
  909. jni::DoWithJNI([&carrier](JNIEnv* env){
  910. jmethodID getCarrierInfoMethod=env->GetStaticMethodID(jniUtilitiesClass, "getCarrierInfo", "()[Ljava/lang/String;");
  911. jobjectArray jinfo=(jobjectArray) env->CallStaticObjectMethod(jniUtilitiesClass, getCarrierInfoMethod);
  912. if(jinfo && env->GetArrayLength(jinfo)==4){
  913. carrier.name=jni::JavaStringToStdString(env, (jstring)env->GetObjectArrayElement(jinfo, 0));
  914. carrier.countryCode=jni::JavaStringToStdString(env, (jstring)env->GetObjectArrayElement(jinfo, 1));
  915. carrier.mcc=jni::JavaStringToStdString(env, (jstring)env->GetObjectArrayElement(jinfo, 2));
  916. carrier.mnc=jni::JavaStringToStdString(env, (jstring)env->GetObjectArrayElement(jinfo, 3));
  917. }else{
  918. LOGW("Failed to get carrier info");
  919. }
  920. });
  921. return carrier;
  922. #else
  923. return CellularCarrierInfo();
  924. #endif
  925. }
  926. #pragma mark - Audio I/O
  927. void VoIPController::AudioInputCallback(unsigned char* data, size_t length, unsigned char* secondaryData, size_t secondaryLength, void* param){
  928. ((VoIPController*)param)->HandleAudioInput(data, length, secondaryData, secondaryLength);
  929. }
  930. void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned char* secondaryData, size_t secondaryLen){
  931. if(stopping)
  932. return;
  933. unsentStreamPacketsHistory.Add(static_cast<unsigned int>(unsentStreamPackets));
  934. if(unsentStreamPacketsHistory.Average()>=maxUnsentStreamPackets && !videoSource){
  935. LOGW("Resetting stalled send queue");
  936. sendQueue.clear();
  937. unsentStreamPacketsHistory.Reset();
  938. unsentStreamPackets=0;
  939. }
  940. if(waitingForAcks || dontSendPackets>0 || ((unsigned int)unsentStreamPackets>=maxUnsentStreamPackets /*&& endpoints[currentEndpoint].type==Endpoint::Type::TCP_RELAY*/)){
  941. LOGV("waiting for queue, dropping outgoing audio packet, %d %d %d [%d]", (unsigned int)unsentStreamPackets, waitingForAcks, dontSendPackets, maxUnsentStreamPackets);
  942. return;
  943. }
  944. //LOGV("Audio packet size %u", (unsigned int)len);
  945. if(!receivedInitAck)
  946. return;
  947. BufferOutputStream pkt(1500);
  948. bool hasExtraFEC=peerVersion>=7 && secondaryData && secondaryLen && shittyInternetMode;
  949. unsigned char flags=(unsigned char) (len>255 || hasExtraFEC ? STREAM_DATA_FLAG_LEN16 : 0);
  950. pkt.WriteByte((unsigned char) (1 | flags)); // streamID + flags
  951. if(len>255 || hasExtraFEC){
  952. int16_t lenAndFlags=static_cast<int16_t>(len);
  953. if(hasExtraFEC)
  954. lenAndFlags|=STREAM_DATA_XFLAG_EXTRA_FEC;
  955. pkt.WriteInt16(lenAndFlags);
  956. }else{
  957. pkt.WriteByte((unsigned char) len);
  958. }
  959. pkt.WriteInt32(audioTimestampOut);
  960. pkt.WriteBytes(data, len);
  961. if(hasExtraFEC){
  962. Buffer ecBuf(secondaryLen);
  963. ecBuf.CopyFrom(secondaryData, 0, secondaryLen);
  964. ecAudioPackets.push_back(move(ecBuf));
  965. while(ecAudioPackets.size()>4)
  966. ecAudioPackets.erase(ecAudioPackets.begin());
  967. pkt.WriteByte((unsigned char)MIN(ecAudioPackets.size(), extraEcLevel));
  968. for(vector<Buffer>::iterator ecData=ecAudioPackets.begin()+MAX(0, (int)ecAudioPackets.size()-extraEcLevel);ecData!=ecAudioPackets.end();++ecData){
  969. pkt.WriteByte((unsigned char)ecData->Length());
  970. pkt.WriteBytes(*ecData);
  971. }
  972. }
  973. unsentStreamPackets++;
  974. size_t pktLength = pkt.GetLength();
  975. PendingOutgoingPacket p{
  976. /*.seq=*/GenerateOutSeq(),
  977. /*.type=*/PKT_STREAM_DATA,
  978. /*.len=*/pktLength,
  979. /*.data=*/Buffer(move(pkt)),
  980. /*.endpoint=*/0,
  981. };
  982. conctl->PacketSent(p.seq, p.len);
  983. SendOrEnqueuePacket(move(p));
  984. if(peerVersion<7 && secondaryData && secondaryLen && shittyInternetMode){
  985. Buffer ecBuf(secondaryLen);
  986. ecBuf.CopyFrom(secondaryData, 0, secondaryLen);
  987. ecAudioPackets.push_back(move(ecBuf));
  988. while(ecAudioPackets.size()>4)
  989. ecAudioPackets.erase(ecAudioPackets.begin());
  990. pkt=BufferOutputStream(1500);
  991. pkt.WriteByte(outgoingStreams[0]->id);
  992. pkt.WriteInt32(audioTimestampOut);
  993. pkt.WriteByte((unsigned char)MIN(ecAudioPackets.size(), extraEcLevel));
  994. for(vector<Buffer>::iterator ecData=ecAudioPackets.begin()+MAX(0, (int)ecAudioPackets.size()-extraEcLevel);ecData!=ecAudioPackets.end();++ecData){
  995. pkt.WriteByte((unsigned char)ecData->Length());
  996. pkt.WriteBytes(*ecData);
  997. }
  998. pktLength = pkt.GetLength();
  999. PendingOutgoingPacket p{
  1000. GenerateOutSeq(),
  1001. PKT_STREAM_EC,
  1002. pktLength,
  1003. Buffer(move(pkt)),
  1004. 0
  1005. };
  1006. SendOrEnqueuePacket(move(p));
  1007. }
  1008. audioTimestampOut+=outgoingStreams[0]->frameDuration;
  1009. #if defined(TGVOIP_USE_CALLBACK_AUDIO_IO)
  1010. if (audioPreprocDataCallback && preprocDecoder) {
  1011. int size=opus_decode(preprocDecoder, data, len, preprocBuffer, 4096, 0);
  1012. audioPreprocDataCallback(preprocBuffer, size);
  1013. }
  1014. #endif
  1015. }
  1016. void VoIPController::InitializeAudio(){
  1017. double t=GetCurrentTime();
  1018. shared_ptr<Stream> outgoingAudioStream=GetStreamByType(STREAM_TYPE_AUDIO, true);
  1019. LOGI("before create audio io");
  1020. audioIO=audio::AudioIO::Create(currentAudioInput, currentAudioOutput);
  1021. if(audioIO->Failed()){
  1022. lastError=ERROR_AUDIO_IO;
  1023. SetState(STATE_FAILED);
  1024. return;
  1025. }
  1026. audioInput=audioIO->GetInput();
  1027. audioOutput=audioIO->GetOutput();
  1028. #ifdef __ANDROID__
  1029. audio::AudioInputAndroid* androidInput=dynamic_cast<audio::AudioInputAndroid*>(audioInput);
  1030. if(androidInput){
  1031. unsigned int effects=androidInput->GetEnabledEffects();
  1032. if(!(effects & audio::AudioInputAndroid::EFFECT_AEC)){
  1033. config.enableAEC=true;
  1034. LOGI("Forcing software AEC because built-in is not good");
  1035. }
  1036. if(!(effects & audio::AudioInputAndroid::EFFECT_NS)){
  1037. config.enableNS=true;
  1038. LOGI("Forcing software NS because built-in is not good");
  1039. }
  1040. }
  1041. #elif defined(__APPLE__) && TARGET_OS_OSX
  1042. SetAudioOutputDuckingEnabled(macAudioDuckingEnabled);
  1043. #endif
  1044. LOGI("AEC: %d NS: %d AGC: %d", config.enableAEC, config.enableNS, config.enableAGC);
  1045. echoCanceller=new EchoCanceller(config.enableAEC, config.enableNS, config.enableAGC);
  1046. encoder=new OpusEncoder(audioInput, true);
  1047. encoder->SetCallback(AudioInputCallback, this);
  1048. encoder->SetOutputFrameDuration(outgoingAudioStream->frameDuration);
  1049. encoder->SetEchoCanceller(echoCanceller);
  1050. encoder->SetSecondaryEncoderEnabled(false);
  1051. if(config.enableVolumeControl){
  1052. encoder->AddAudioEffect(inputVolume.get());
  1053. }
  1054. #if defined(TGVOIP_USE_CALLBACK_AUDIO_IO)
  1055. dynamic_cast<audio::AudioInputCallback*>(audioInput)->SetDataCallback(audioInputDataCallback);
  1056. dynamic_cast<audio::AudioOutputCallback*>(audioOutput)->SetDataCallback(audioOutputDataCallback);
  1057. #endif
  1058. if(!audioOutput->IsInitialized()){
  1059. LOGE("Error initializing audio playback");
  1060. lastError=ERROR_AUDIO_IO;
  1061. SetState(STATE_FAILED);
  1062. return;
  1063. }
  1064. UpdateAudioBitrateLimit();
  1065. LOGI("Audio initialization took %f seconds", GetCurrentTime()-t);
  1066. }
  1067. void VoIPController::StartAudio(){
  1068. OnAudioOutputReady();
  1069. encoder->Start();
  1070. if(!micMuted){
  1071. audioInput->Start();
  1072. if(!audioInput->IsInitialized()){
  1073. LOGE("Error initializing audio capture");
  1074. lastError=ERROR_AUDIO_IO;
  1075. SetState(STATE_FAILED);
  1076. return;
  1077. }
  1078. }
  1079. }
  1080. void VoIPController::OnAudioOutputReady(){
  1081. LOGI("Audio I/O ready");
  1082. shared_ptr<Stream>& stm=incomingStreams[0];
  1083. stm->decoder=make_shared<OpusDecoder>(audioOutput, true, peerVersion>=6);
  1084. stm->decoder->SetEchoCanceller(echoCanceller);
  1085. if(config.enableVolumeControl){
  1086. stm->decoder->AddAudioEffect(outputVolume.get());
  1087. }
  1088. stm->decoder->SetJitterBuffer(stm->jitterBuffer);
  1089. stm->decoder->SetFrameDuration(stm->frameDuration);
  1090. stm->decoder->Start();
  1091. }
  1092. void VoIPController::UpdateAudioOutputState(){
  1093. bool areAnyAudioStreamsEnabled=false;
  1094. for(vector<shared_ptr<Stream>>::iterator s=incomingStreams.begin();s!=incomingStreams.end();++s){
  1095. if((*s)->type==STREAM_TYPE_AUDIO && (*s)->enabled)
  1096. areAnyAudioStreamsEnabled=true;
  1097. }
  1098. if(audioOutput){
  1099. LOGV("New audio output state: %d", areAnyAudioStreamsEnabled);
  1100. if(audioOutput->IsPlaying()!=areAnyAudioStreamsEnabled){
  1101. if(areAnyAudioStreamsEnabled)
  1102. audioOutput->Start();
  1103. else
  1104. audioOutput->Stop();
  1105. }
  1106. }
  1107. }
  1108. #pragma mark - Bandwidth management
  1109. void VoIPController::UpdateAudioBitrateLimit(){
  1110. if(encoder){
  1111. if(dataSavingMode || dataSavingRequestedByPeer){
  1112. maxBitrate=maxAudioBitrateSaving;
  1113. encoder->SetBitrate(initAudioBitrateSaving);
  1114. }else if(networkType==NET_TYPE_GPRS){
  1115. maxBitrate=maxAudioBitrateGPRS;
  1116. encoder->SetBitrate(initAudioBitrateGPRS);
  1117. }else if(networkType==NET_TYPE_EDGE){
  1118. maxBitrate=maxAudioBitrateEDGE;
  1119. encoder->SetBitrate(initAudioBitrateEDGE);
  1120. }else{
  1121. maxBitrate=maxAudioBitrate;
  1122. encoder->SetBitrate(initAudioBitrate);
  1123. }
  1124. encoder->SetVadMode(dataSavingMode || dataSavingRequestedByPeer);
  1125. if(echoCanceller)
  1126. echoCanceller->SetVoiceDetectionEnabled(dataSavingMode || dataSavingRequestedByPeer);
  1127. }
  1128. }
  1129. void VoIPController::UpdateDataSavingState(){
  1130. if(config.dataSaving==DATA_SAVING_ALWAYS){
  1131. dataSavingMode=true;
  1132. }else if(config.dataSaving==DATA_SAVING_MOBILE){
  1133. dataSavingMode=networkType==NET_TYPE_GPRS || networkType==NET_TYPE_EDGE ||
  1134. networkType==NET_TYPE_3G || networkType==NET_TYPE_HSPA || networkType==NET_TYPE_LTE || networkType==NET_TYPE_OTHER_MOBILE;
  1135. }else{
  1136. dataSavingMode=false;
  1137. }
  1138. LOGI("update data saving mode, config %d, enabled %d, reqd by peer %d", config.dataSaving, dataSavingMode, dataSavingRequestedByPeer);
  1139. }
  1140. #pragma mark - Networking & crypto
  1141. uint32_t VoIPController::GenerateOutSeq(){
  1142. return seq++;
  1143. }
  1144. void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length){
  1145. uint32_t acks=0;
  1146. int i;
  1147. for(i=0;i<32;i++){
  1148. if(recvPacketTimes[i]>0)
  1149. acks|=1;
  1150. if(i<31)
  1151. acks<<=1;
  1152. }
  1153. if(peerVersion>=8 || (!peerVersion && connectionMaxLayer>=92)){
  1154. s->WriteByte(type);
  1155. s->WriteInt32(lastRemoteSeq);
  1156. s->WriteInt32(pseq);
  1157. s->WriteInt32(acks);
  1158. MutexGuard m(queuedPacketsMutex);
  1159. unsigned char flags;
  1160. if(currentExtras.empty()){
  1161. flags=0;
  1162. }else{
  1163. flags=XPFLAG_HAS_EXTRA;
  1164. }
  1165. shared_ptr<Stream> videoStream=GetStreamByType(STREAM_TYPE_VIDEO, false);
  1166. if(peerVersion>=9 && videoStream && videoStream->enabled)
  1167. flags |= XPFLAG_HAS_RECV_TS;
  1168. s->WriteByte(flags);
  1169. if(!currentExtras.empty()){
  1170. s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
  1171. for(vector<UnacknowledgedExtraData>::iterator x=currentExtras.begin(); x!=currentExtras.end(); ++x){
  1172. LOGV("Writing extra into header: type %u, length %d", x->type, int(x->data.Length()));
  1173. assert(x->data.Length()<=254);
  1174. s->WriteByte(static_cast<unsigned char>(x->data.Length()+1));
  1175. s->WriteByte(x->type);
  1176. s->WriteBytes(*x->data, x->data.Length());
  1177. if(x->firstContainingSeq==0)
  1178. x->firstContainingSeq=pseq;
  1179. }
  1180. }
  1181. if(peerVersion>=9 && videoStream && videoStream->enabled){
  1182. s->WriteInt32((uint32_t)((lastRecvPacketTime-connectionInitTime)*1000.0));
  1183. }
  1184. }else{
  1185. if(state==STATE_WAIT_INIT || state==STATE_WAIT_INIT_ACK){
  1186. s->WriteInt32(TLID_DECRYPTED_AUDIO_BLOCK);
  1187. int64_t randomID;
  1188. crypto.rand_bytes((uint8_t *) &randomID, 8);
  1189. s->WriteInt64(randomID);
  1190. unsigned char randBytes[7];
  1191. crypto.rand_bytes(randBytes, 7);
  1192. s->WriteByte(7);
  1193. s->WriteBytes(randBytes, 7);
  1194. uint32_t pflags=PFLAG_HAS_RECENT_RECV | PFLAG_HAS_SEQ;
  1195. if(length>0)
  1196. pflags|=PFLAG_HAS_DATA;
  1197. if(state==STATE_WAIT_INIT || state==STATE_WAIT_INIT_ACK){
  1198. pflags|=PFLAG_HAS_CALL_ID | PFLAG_HAS_PROTO;
  1199. }
  1200. pflags|=((uint32_t) type) << 24;
  1201. s->WriteInt32(pflags);
  1202. if(pflags & PFLAG_HAS_CALL_ID){
  1203. s->WriteBytes(callID, 16);
  1204. }
  1205. s->WriteInt32(lastRemoteSeq);
  1206. s->WriteInt32(pseq);
  1207. s->WriteInt32(acks);
  1208. if(pflags & PFLAG_HAS_PROTO){
  1209. s->WriteInt32(PROTOCOL_NAME);
  1210. }
  1211. if(length>0){
  1212. if(length<=253){
  1213. s->WriteByte((unsigned char) length);
  1214. }else{
  1215. s->WriteByte(254);
  1216. s->WriteByte((unsigned char) (length & 0xFF));
  1217. s->WriteByte((unsigned char) ((length >> 8) & 0xFF));
  1218. s->WriteByte((unsigned char) ((length >> 16) & 0xFF));
  1219. }
  1220. }
  1221. }else{
  1222. s->WriteInt32(TLID_SIMPLE_AUDIO_BLOCK);
  1223. int64_t randomID;
  1224. crypto.rand_bytes((uint8_t *) &randomID, 8);
  1225. s->WriteInt64(randomID);
  1226. unsigned char randBytes[7];
  1227. crypto.rand_bytes(randBytes, 7);
  1228. s->WriteByte(7);
  1229. s->WriteBytes(randBytes, 7);
  1230. uint32_t lenWithHeader=length+13;
  1231. if(lenWithHeader>0){
  1232. if(lenWithHeader<=253){
  1233. s->WriteByte((unsigned char) lenWithHeader);
  1234. }else{
  1235. s->WriteByte(254);
  1236. s->WriteByte((unsigned char) (lenWithHeader & 0xFF));
  1237. s->WriteByte((unsigned char) ((lenWithHeader >> 8) & 0xFF));
  1238. s->WriteByte((unsigned char) ((lenWithHeader >> 16) & 0xFF));
  1239. }
  1240. }
  1241. s->WriteByte(type);
  1242. s->WriteInt32(lastRemoteSeq);
  1243. s->WriteInt32(pseq);
  1244. s->WriteInt32(acks);
  1245. if(peerVersion>=6){
  1246. MutexGuard m(queuedPacketsMutex);
  1247. if(currentExtras.empty()){
  1248. s->WriteByte(0);
  1249. }else{
  1250. s->WriteByte(XPFLAG_HAS_EXTRA);
  1251. s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
  1252. for(vector<UnacknowledgedExtraData>::iterator x=currentExtras.begin(); x!=currentExtras.end(); ++x){
  1253. LOGV("Writing extra into header: type %u, length %d", x->type, int(x->data.Length()));
  1254. assert(x->data.Length()<=254);
  1255. s->WriteByte(static_cast<unsigned char>(x->data.Length()+1));
  1256. s->WriteByte(x->type);
  1257. s->WriteBytes(*x->data, x->data.Length());
  1258. if(x->firstContainingSeq==0)
  1259. x->firstContainingSeq=pseq;
  1260. }
  1261. }
  1262. }
  1263. }
  1264. }
  1265. MutexGuard m(queuedPacketsMutex);
  1266. recentOutgoingPackets.push_back(RecentOutgoingPacket{
  1267. pseq,
  1268. 0,
  1269. GetCurrentTime(),
  1270. 0,
  1271. type,
  1272. length
  1273. });
  1274. while(recentOutgoingPackets.size()>MAX_RECENT_PACKETS){
  1275. recentOutgoingPackets.erase(recentOutgoingPackets.begin());
  1276. }
  1277. lastSentSeq=pseq;
  1278. //LOGI("packet header size %d", s->GetLength());
  1279. }
  1280. void VoIPController::SendInit(){
  1281. {
  1282. MutexGuard m(endpointsMutex);
  1283. uint32_t initSeq=GenerateOutSeq();
  1284. for(pair<const int64_t, Endpoint>& _e:endpoints){
  1285. Endpoint& e=_e.second;
  1286. if(e.type==Endpoint::Type::TCP_RELAY && !useTCP)
  1287. continue;
  1288. BufferOutputStream out(1024);
  1289. out.WriteInt32(PROTOCOL_VERSION);
  1290. out.WriteInt32(MIN_PROTOCOL_VERSION);
  1291. uint32_t flags=0;
  1292. if(config.enableCallUpgrade)
  1293. flags|=INIT_FLAG_GROUP_CALLS_SUPPORTED;
  1294. if(config.enableVideoReceive)
  1295. flags|=INIT_FLAG_VIDEO_RECV_SUPPORTED;
  1296. if(config.enableVideoSend)
  1297. flags|=INIT_FLAG_VIDEO_SEND_SUPPORTED;
  1298. if(dataSavingMode)
  1299. flags|=INIT_FLAG_DATA_SAVING_ENABLED;
  1300. out.WriteInt32(flags);
  1301. if(connectionMaxLayer<74){
  1302. out.WriteByte(2); // audio codecs count
  1303. out.WriteByte(CODEC_OPUS_OLD);
  1304. out.WriteByte(0);
  1305. out.WriteByte(0);
  1306. out.WriteByte(0);
  1307. out.WriteInt32(CODEC_OPUS);
  1308. out.WriteByte(0); // video codecs count (decode)
  1309. out.WriteByte(0); // video codecs count (encode)
  1310. }else{
  1311. out.WriteByte(1);
  1312. out.WriteInt32(CODEC_OPUS);
  1313. vector<uint32_t> decoders=config.enableVideoReceive ? video::VideoRenderer::GetAvailableDecoders() : vector<uint32_t>();
  1314. vector<uint32_t> encoders=config.enableVideoSend ? video::VideoSource::GetAvailableEncoders() : vector<uint32_t>();
  1315. out.WriteByte((unsigned char)decoders.size());
  1316. for(uint32_t id:decoders){
  1317. out.WriteInt32(id);
  1318. }
  1319. if(connectionMaxLayer>=92)
  1320. out.WriteByte((unsigned char)video::VideoRenderer::GetMaximumResolution());
  1321. else
  1322. out.WriteByte(0);
  1323. /*out.WriteByte((unsigned char)encoders.size());
  1324. for(uint32_t id:encoders){
  1325. out.WriteInt32(id);
  1326. }*/
  1327. }
  1328. size_t outLength = out.GetLength();
  1329. SendOrEnqueuePacket(PendingOutgoingPacket{
  1330. /*.seq=*/initSeq,
  1331. /*.type=*/PKT_INIT,
  1332. /*.len=*/outLength,
  1333. /*.data=*/Buffer(move(out)),
  1334. /*.endpoint=*/e.id
  1335. });
  1336. }
  1337. }
  1338. if(state==STATE_WAIT_INIT)
  1339. SetState(STATE_WAIT_INIT_ACK);
  1340. messageThread.Post([this]{
  1341. if(state==STATE_WAIT_INIT_ACK){
  1342. SendInit();
  1343. }
  1344. }, 0.5);
  1345. }
  1346. void VoIPController::InitUDPProxy(){
  1347. if(realUdpSocket!=udpSocket){
  1348. udpSocket->Close();
  1349. delete udpSocket;
  1350. udpSocket=realUdpSocket;
  1351. }
  1352. char sbuf[128];
  1353. snprintf(sbuf, sizeof(sbuf), "%s:%u", proxyAddress.c_str(), proxyPort);
  1354. string proxyHostPort(sbuf);
  1355. if(proxyHostPort==lastTestedProxyServer && !proxySupportsUDP){
  1356. LOGI("Proxy does not support UDP - using UDP directly instead");
  1357. ResetUdpAvailability();
  1358. return;
  1359. }
  1360. NetworkSocket* tcp=NetworkSocket::Create(PROTO_TCP);
  1361. tcp->Connect(resolvedProxyAddress, proxyPort);
  1362. vector<NetworkSocket*> writeSockets;
  1363. vector<NetworkSocket*> readSockets;
  1364. vector<NetworkSocket*> errorSockets;
  1365. while(!tcp->IsFailed() && !tcp->IsReadyToSend()){
  1366. writeSockets.push_back(tcp);
  1367. if(!NetworkSocket::Select(readSockets, writeSockets, errorSockets, selectCanceller)){
  1368. LOGW("Select canceled while waiting for proxy control socket to connect");
  1369. delete tcp;
  1370. return;
  1371. }
  1372. }
  1373. LOGV("UDP proxy control socket ready to send");
  1374. NetworkSocketSOCKS5Proxy* udpProxy=new NetworkSocketSOCKS5Proxy(tcp, realUdpSocket, proxyUsername, proxyPassword);
  1375. udpProxy->OnReadyToSend();
  1376. writeSockets.clear();
  1377. while(!udpProxy->IsFailed() && !tcp->IsFailed() && !udpProxy->IsReadyToSend()){
  1378. readSockets.clear();
  1379. errorSockets.clear();
  1380. readSockets.push_back(tcp);
  1381. errorSockets.push_back(tcp);
  1382. if(!NetworkSocket::Select(readSockets, writeSockets, errorSockets, selectCanceller)){
  1383. LOGW("Select canceled while waiting for UDP proxy to initialize");
  1384. delete udpProxy;
  1385. return;
  1386. }
  1387. if(!readSockets.empty())
  1388. udpProxy->OnReadyToReceive();
  1389. }
  1390. LOGV("UDP proxy initialized");
  1391. if(udpProxy->IsFailed()){
  1392. udpProxy->Close();
  1393. delete udpProxy;
  1394. proxySupportsUDP=false;
  1395. }else{
  1396. udpSocket=udpProxy;
  1397. }
  1398. ResetUdpAvailability();
  1399. }
  1400. void VoIPController::RunRecvThread(){
  1401. LOGI("Receive thread starting");
  1402. Buffer buffer(1500);
  1403. NetworkPacket packet={0};
  1404. if(proxyProtocol==PROXY_SOCKS5){
  1405. resolvedProxyAddress=NetworkSocket::ResolveDomainName(proxyAddress);
  1406. if(!resolvedProxyAddress){
  1407. LOGW("Error resolving proxy address %s", proxyAddress.c_str());
  1408. SetState(STATE_FAILED);
  1409. return;
  1410. }
  1411. }else{
  1412. udpConnectivityState=UDP_PING_PENDING;
  1413. udpPingTimeoutID=messageThread.Post(std::bind(&VoIPController::SendUdpPings, this), 0.0, 0.5);
  1414. }
  1415. while(runReceiver){
  1416. if(proxyProtocol==PROXY_SOCKS5 && needReInitUdpProxy){
  1417. InitUDPProxy();
  1418. needReInitUdpProxy=false;
  1419. }
  1420. packet.data=*buffer;
  1421. packet.length=buffer.Length();
  1422. vector<NetworkSocket*> readSockets;
  1423. vector<NetworkSocket*> errorSockets;
  1424. vector<NetworkSocket*> writeSockets;
  1425. readSockets.push_back(udpSocket);
  1426. errorSockets.push_back(realUdpSocket);
  1427. if(!realUdpSocket->IsReadyToSend())
  1428. writeSockets.push_back(realUdpSocket);
  1429. {
  1430. MutexGuard m(endpointsMutex);
  1431. for(pair<const int64_t, Endpoint>& _e:endpoints){
  1432. const Endpoint& e=_e.second;
  1433. if(e.type==Endpoint::Type::TCP_RELAY){
  1434. if(e.socket){
  1435. readSockets.push_back(e.socket);
  1436. errorSockets.push_back(e.socket);
  1437. if(!e.socket->IsReadyToSend()){
  1438. NetworkSocketSOCKS5Proxy* proxy=dynamic_cast<NetworkSocketSOCKS5Proxy*>(e.socket);
  1439. if(!proxy || proxy->NeedSelectForSending())
  1440. writeSockets.push_back(e.socket);
  1441. }
  1442. }
  1443. }
  1444. }
  1445. }
  1446. {
  1447. MutexGuard m(socketSelectMutex);
  1448. bool selRes=NetworkSocket::Select(readSockets, writeSockets, errorSockets, selectCanceller);
  1449. if(!selRes){
  1450. LOGV("Select canceled");
  1451. continue;
  1452. }
  1453. }
  1454. if(!runReceiver)
  1455. return;
  1456. if(!errorSockets.empty()){
  1457. if(find(errorSockets.begin(), errorSockets.end(), realUdpSocket)!=errorSockets.end()){
  1458. LOGW("UDP socket failed");
  1459. SetState(STATE_FAILED);
  1460. return;
  1461. }
  1462. MutexGuard m(endpointsMutex);
  1463. for(NetworkSocket*& socket:errorSockets){
  1464. for(pair<const int64_t, Endpoint>& _e:endpoints){
  1465. Endpoint& e=_e.second;
  1466. if(e.socket && e.socket==socket){
  1467. e.socket->Close();
  1468. delete e.socket;
  1469. e.socket=NULL;
  1470. LOGI("Closing failed TCP socket for %s:%u", e.GetAddress().ToString().c_str(), e.port);
  1471. }
  1472. }
  1473. }
  1474. continue;
  1475. }
  1476. for(NetworkSocket*& socket:readSockets){
  1477. //while(packet.length){
  1478. packet.length=1500;
  1479. socket->Receive(&packet);
  1480. if(!packet.address){
  1481. LOGE("Packet has null address. This shouldn't happen.");
  1482. continue;
  1483. }
  1484. size_t len=packet.length;
  1485. if(!len){
  1486. LOGE("Packet has zero length.");
  1487. continue;
  1488. }
  1489. //LOGV("Received %d bytes from %s:%d at %.5lf", len, packet.address->ToString().c_str(), packet.port, GetCurrentTime());
  1490. int64_t srcEndpointID=0;
  1491. IPv4Address *src4=dynamic_cast<IPv4Address *>(packet.address);
  1492. if(src4){
  1493. MutexGuard m(endpointsMutex);
  1494. for(pair<const int64_t, Endpoint>& _e:endpoints){
  1495. const Endpoint& e=_e.second;
  1496. if(e.address==*src4 && e.port==packet.port){
  1497. if((e.type!=Endpoint::Type::TCP_RELAY && packet.protocol==PROTO_UDP) || (e.type==Endpoint::Type::TCP_RELAY && packet.protocol==PROTO_TCP)){
  1498. srcEndpointID=e.id;
  1499. break;
  1500. }
  1501. }
  1502. }
  1503. if(!srcEndpointID && packet.protocol==PROTO_UDP){
  1504. try{
  1505. Endpoint &p2p=GetEndpointByType(Endpoint::Type::UDP_P2P_INET);
  1506. if(p2p.rtts[0]==0.0 && p2p.address.PrefixMatches(24, *packet.address)){
  1507. LOGD("Packet source matches p2p endpoint partially: %s:%u", packet.address->ToString().c_str(), packet.port);
  1508. srcEndpointID=p2p.id;
  1509. }
  1510. }catch(out_of_range&){}
  1511. }
  1512. }else{
  1513. IPv6Address *src6=dynamic_cast<IPv6Address *>(packet.address);
  1514. if(src6){
  1515. MutexGuard m(endpointsMutex);
  1516. for(pair<const int64_t, Endpoint> &_e:endpoints){
  1517. const Endpoint& e=_e.second;
  1518. if(e.v6address==*src6 && e.port==packet.port && e.IsIPv6Only()){
  1519. if((e.type!=Endpoint::Type::TCP_RELAY && packet.protocol==PROTO_UDP) || (e.type==Endpoint::Type::TCP_RELAY && packet.protocol==PROTO_TCP)){
  1520. srcEndpointID=e.id;
  1521. break;
  1522. }
  1523. }
  1524. }
  1525. }
  1526. }
  1527. if(!srcEndpointID){
  1528. LOGW("Received a packet from unknown source %s:%u", packet.address->ToString().c_str(), packet.port);
  1529. continue;
  1530. }
  1531. if(len<=0){
  1532. //LOGW("error receiving: %d / %s", errno, strerror(errno));
  1533. continue;
  1534. }
  1535. if(IS_MOBILE_NETWORK(networkType))
  1536. stats.bytesRecvdMobile+=(uint64_t) len;
  1537. else
  1538. stats.bytesRecvdWifi+=(uint64_t) len;
  1539. try{
  1540. ProcessIncomingPacket(packet, endpoints.at(srcEndpointID));
  1541. }catch(out_of_range& x){
  1542. LOGW("Error parsing packet: %s", x.what());
  1543. }
  1544. //}
  1545. }
  1546. for(vector<PendingOutgoingPacket>::iterator opkt=sendQueue.begin();opkt!=sendQueue.end();){
  1547. Endpoint* endpoint=GetEndpointForPacket(*opkt);
  1548. if(!endpoint){
  1549. opkt=sendQueue.erase(opkt);
  1550. LOGE("SendQueue contained packet for nonexistent endpoint");
  1551. continue;
  1552. }
  1553. bool canSend;
  1554. if(endpoint->type!=Endpoint::Type::TCP_RELAY)
  1555. canSend=realUdpSocket->IsReadyToSend();
  1556. else
  1557. canSend=endpoint->socket && endpoint->socket->IsReadyToSend();
  1558. if(canSend){
  1559. LOGI("Sending queued packet");
  1560. SendOrEnqueuePacket(move(*opkt), false);
  1561. opkt=sendQueue.erase(opkt);
  1562. }else{
  1563. ++opkt;
  1564. }
  1565. }
  1566. }
  1567. LOGI("=== recv thread exiting ===");
  1568. }
  1569. bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq){
  1570. RecentOutgoingPacket* pkt=GetRecentOutgoingPacket(seq);
  1571. if(!pkt)
  1572. return false;
  1573. return pkt->ackTime!=0.0;
  1574. }
  1575. VoIPController::RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq){
  1576. for(RecentOutgoingPacket& opkt:recentOutgoingPackets){
  1577. if(opkt.seq==seq){
  1578. return &opkt;
  1579. }
  1580. }
  1581. return NULL;
  1582. }
  1583. void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint& srcEndpoint){
  1584. unsigned char *buffer=packet.data;
  1585. size_t len=packet.length;
  1586. BufferInputStream in(buffer, (size_t) len);
  1587. bool hasPeerTag=false;
  1588. if(peerVersion<9 || srcEndpoint.type==Endpoint::Type::UDP_RELAY || srcEndpoint.type==Endpoint::Type::TCP_RELAY){
  1589. if(memcmp(buffer, srcEndpoint.type==Endpoint::Type::UDP_RELAY || srcEndpoint.type==Endpoint::Type::TCP_RELAY ? (void *) srcEndpoint.peerTag : (void *) callID, 16)!=0){
  1590. LOGW("Received packet has wrong peerTag");
  1591. return;
  1592. }
  1593. in.Seek(16);
  1594. hasPeerTag=true;
  1595. }
  1596. if(in.Remaining()>=16 && (srcEndpoint.type==Endpoint::Type::UDP_RELAY || srcEndpoint.type==Endpoint::Type::TCP_RELAY)
  1597. && *reinterpret_cast<uint64_t *>(buffer+16)==0xFFFFFFFFFFFFFFFFLL && *reinterpret_cast<uint32_t *>(buffer+24)==0xFFFFFFFF){
  1598. // relay special request response
  1599. in.Seek(16+12);
  1600. uint32_t tlid=(uint32_t) in.ReadInt32();
  1601. if(tlid==TLID_UDP_REFLECTOR_SELF_INFO){
  1602. if(srcEndpoint.type==Endpoint::Type::UDP_RELAY /*&& udpConnectivityState==UDP_PING_SENT*/ && in.Remaining()>=32){
  1603. int32_t date=in.ReadInt32();
  1604. int64_t queryID=in.ReadInt64();
  1605. unsigned char myIP[16];
  1606. in.ReadBytes(myIP, 16);
  1607. int32_t myPort=in.ReadInt32();
  1608. //udpConnectivityState=UDP_AVAILABLE;
  1609. LOGV("Received UDP ping reply from %s:%d: date=%d, queryID=%ld, my IP=%s, my port=%d", srcEndpoint.address.ToString().c_str(), srcEndpoint.port, date, (long int) queryID, IPv4Address(*reinterpret_cast<uint32_t *>(myIP+12)).ToString().c_str(), myPort);
  1610. srcEndpoint.udpPongCount++;
  1611. if(srcEndpoint.IsIPv6Only() && !didSendIPv6Endpoint){
  1612. IPv6Address realAddr(myIP);
  1613. if(realAddr==myIPv6){
  1614. LOGI("Public IPv6 matches local address");
  1615. useIPv6=true;
  1616. if(allowP2p){
  1617. didSendIPv6Endpoint=true;
  1618. BufferOutputStream o(18);
  1619. o.WriteBytes(myIP, 16);
  1620. o.WriteInt16(udpSocket->GetLocalPort());
  1621. Buffer b(move(o));
  1622. SendExtra(b, EXTRA_TYPE_IPV6_ENDPOINT);
  1623. }
  1624. }
  1625. }
  1626. }
  1627. }else if(tlid==TLID_UDP_REFLECTOR_PEER_INFO){
  1628. if(in.Remaining()>=16){
  1629. MutexGuard _m(endpointsMutex);
  1630. uint32_t myAddr=(uint32_t) in.ReadInt32();
  1631. uint32_t myPort=(uint32_t) in.ReadInt32();
  1632. uint32_t peerAddr=(uint32_t) in.ReadInt32();
  1633. uint32_t peerPort=(uint32_t) in.ReadInt32();
  1634. constexpr int64_t p2pID=(int64_t) (FOURCC('P', '2', 'P', '4')) << 32;
  1635. constexpr int64_t lanID=(int64_t) (FOURCC('L', 'A', 'N', '4')) << 32;
  1636. if(currentEndpoint==p2pID || currentEndpoint==lanID)
  1637. currentEndpoint=preferredRelay;
  1638. endpoints.erase(lanID);
  1639. IPv4Address _peerAddr(peerAddr);
  1640. IPv6Address emptyV6(string("::0"));
  1641. unsigned char peerTag[16];
  1642. LOGW("Received reflector peer info, my=%s:%u, peer=%s:%u", IPv4Address(myAddr).ToString().c_str(), myPort, IPv4Address(peerAddr).ToString().c_str(), peerPort);
  1643. if(waitingForRelayPeerInfo){
  1644. Endpoint p2p(p2pID, (uint16_t) peerPort, _peerAddr, emptyV6, Endpoint::Type::UDP_P2P_INET, peerTag);
  1645. endpoints[p2pID]=p2p;
  1646. if(myAddr==peerAddr){
  1647. LOGW("Detected LAN");
  1648. IPv4Address lanAddr(0);
  1649. udpSocket->GetLocalInterfaceInfo(&lanAddr, NULL);
  1650. BufferOutputStream pkt(8);
  1651. pkt.WriteInt32(lanAddr.GetAddress());
  1652. pkt.WriteInt32(udpSocket->GetLocalPort());
  1653. if(peerVersion<6){
  1654. SendPacketReliably(PKT_LAN_ENDPOINT, pkt.GetBuffer(), pkt.GetLength(), 0.5, 10);
  1655. }else{
  1656. Buffer buf(move(pkt));
  1657. SendExtra(buf, EXTRA_TYPE_LAN_ENDPOINT);
  1658. }
  1659. }
  1660. waitingForRelayPeerInfo=false;
  1661. }
  1662. }
  1663. }else{
  1664. LOGV("Received relay response with unknown tl id: 0x%08X", tlid);
  1665. }
  1666. return;
  1667. }
  1668. if(in.Remaining()<40){
  1669. LOGV("Received packet is too small");
  1670. return;
  1671. }
  1672. bool retryWith2=false;
  1673. size_t innerLen=0;
  1674. bool shortFormat=peerVersion>=8 || (!peerVersion && connectionMaxLayer>=92);
  1675. if(!useMTProto2){
  1676. unsigned char fingerprint[8], msgHash[16];
  1677. in.ReadBytes(fingerprint, 8);
  1678. in.ReadBytes(msgHash, 16);
  1679. unsigned char key[32], iv[32];
  1680. KDF(msgHash, isOutgoing ? 8 : 0, key, iv);
  1681. unsigned char aesOut[MSC_STACK_FALLBACK(in.Remaining(), 1500)];
  1682. if(in.Remaining()>sizeof(aesOut))
  1683. return;
  1684. crypto.aes_ige_decrypt((unsigned char *) buffer+in.GetOffset(), aesOut, in.Remaining(), key, iv);
  1685. BufferInputStream _in(aesOut, in.Remaining());
  1686. unsigned char sha[SHA1_LENGTH];
  1687. uint32_t _len=(uint32_t) _in.ReadInt32();
  1688. if(_len>_in.Remaining())
  1689. _len=(uint32_t) _in.Remaining();
  1690. crypto.sha1((uint8_t *) (aesOut), (size_t) (_len+4), sha);
  1691. if(memcmp(msgHash, sha+(SHA1_LENGTH-16), 16)!=0){
  1692. LOGW("Received packet has wrong hash after decryption");
  1693. if(state==STATE_WAIT_INIT || state==STATE_WAIT_INIT_ACK)
  1694. retryWith2=true;
  1695. else
  1696. return;
  1697. }else{
  1698. memcpy(buffer+in.GetOffset(), aesOut, in.Remaining());
  1699. in.ReadInt32();
  1700. }
  1701. }
  1702. if(useMTProto2 || retryWith2){
  1703. if(hasPeerTag)
  1704. in.Seek(16); // peer tag
  1705. unsigned char fingerprint[8], msgKey[16];
  1706. if(!shortFormat){
  1707. in.ReadBytes(fingerprint, 8);
  1708. if(memcmp(fingerprint, keyFingerprint, 8)!=0){
  1709. LOGW("Received packet has wrong key fingerprint");
  1710. return;
  1711. }
  1712. }
  1713. in.ReadBytes(msgKey, 16);
  1714. unsigned char decrypted[1500];
  1715. unsigned char aesKey[32], aesIv[32];
  1716. KDF2(msgKey, isOutgoing ? 8 : 0, aesKey, aesIv);
  1717. size_t decryptedLen=in.Remaining();
  1718. if(decryptedLen>sizeof(decrypted))
  1719. return;
  1720. if(decryptedLen%16!=0){
  1721. LOGW("wrong decrypted length");
  1722. return;
  1723. }
  1724. crypto.aes_ige_decrypt(packet.data+in.GetOffset(), decrypted, decryptedLen, aesKey, aesIv);
  1725. in=BufferInputStream(decrypted, decryptedLen);
  1726. //LOGD("received packet length: %d", in.ReadInt32());
  1727. size_t sizeSize=shortFormat ? 0 : 4;
  1728. BufferOutputStream buf(decryptedLen+32);
  1729. size_t x=isOutgoing ? 8 : 0;
  1730. buf.WriteBytes(encryptionKey+88+x, 32);
  1731. buf.WriteBytes(decrypted+sizeSize, decryptedLen-sizeSize);
  1732. unsigned char msgKeyLarge[32];
  1733. crypto.sha256(buf.GetBuffer(), buf.GetLength(), msgKeyLarge);
  1734. if(memcmp(msgKey, msgKeyLarge+8, 16)!=0){
  1735. LOGW("Received packet has wrong hash");
  1736. return;
  1737. }
  1738. innerLen=(uint32_t) (shortFormat ? in.ReadInt16() : in.ReadInt32());
  1739. if(innerLen>decryptedLen-sizeSize){
  1740. LOGW("Received packet has wrong inner length (%d with total of %u)", (int) innerLen, (unsigned int) decryptedLen);
  1741. return;
  1742. }
  1743. if(decryptedLen-innerLen<(shortFormat ? 16 : 12)){
  1744. LOGW("Received packet has too little padding (%u)", (unsigned int) (decryptedLen-innerLen));
  1745. return;
  1746. }
  1747. memcpy(buffer, decrypted+(shortFormat ? 2 : 4), innerLen);
  1748. in=BufferInputStream(buffer, (size_t) innerLen);
  1749. if(retryWith2){
  1750. LOGD("Successfully decrypted packet in MTProto2.0 fallback, upgrading");
  1751. useMTProto2=true;
  1752. }
  1753. }
  1754. lastRecvPacketTime=GetCurrentTime();
  1755. if(state==STATE_RECONNECTING){
  1756. LOGI("Received a valid packet while reconnecting - setting state to established");
  1757. SetState(STATE_ESTABLISHED);
  1758. }
  1759. if(srcEndpoint.type==Endpoint::Type::UDP_P2P_INET && !srcEndpoint.IsIPv6Only()){
  1760. if(srcEndpoint.port!=packet.port || srcEndpoint.address!=*packet.address){
  1761. IPv4Address *v4=dynamic_cast<IPv4Address *>(packet.address);
  1762. if(v4){
  1763. LOGI("Incoming packet was decrypted successfully, changing P2P endpoint to %s:%u", packet.address->ToString().c_str(), packet.port);
  1764. srcEndpoint.address=*v4;
  1765. srcEndpoint.port=packet.port;
  1766. }
  1767. }
  1768. }
  1769. /*decryptedAudioBlock random_id:long random_bytes:string flags:# voice_call_id:flags.2?int128 in_seq_no:flags.4?int out_seq_no:flags.4?int
  1770. * recent_received_mask:flags.5?int proto:flags.3?int extra:flags.1?string raw_data:flags.0?string = DecryptedAudioBlock
  1771. simpleAudioBlock random_id:long random_bytes:string raw_data:string = DecryptedAudioBlock;
  1772. */
  1773. uint32_t ackId, pseq, acks;
  1774. unsigned char type, pflags;
  1775. size_t packetInnerLen=0;
  1776. if(shortFormat){
  1777. type=in.ReadByte();
  1778. ackId=(uint32_t) in.ReadInt32();
  1779. pseq=(uint32_t) in.ReadInt32();
  1780. acks=(uint32_t) in.ReadInt32();
  1781. pflags=in.ReadByte();
  1782. packetInnerLen=innerLen-14;
  1783. }else{
  1784. uint32_t tlid=(uint32_t) in.ReadInt32();
  1785. if(tlid==TLID_DECRYPTED_AUDIO_BLOCK){
  1786. in.ReadInt64(); // random id
  1787. uint32_t randLen=(uint32_t) in.ReadTlLength();
  1788. in.Seek(in.GetOffset()+randLen+pad4(randLen));
  1789. uint32_t flags=(uint32_t) in.ReadInt32();
  1790. type=(unsigned char) ((flags >> 24) & 0xFF);
  1791. if(!(flags & PFLAG_HAS_SEQ && flags & PFLAG_HAS_RECENT_RECV)){
  1792. LOGW("Received packet doesn't have PFLAG_HAS_SEQ, PFLAG_HAS_RECENT_RECV, or both");
  1793. return;
  1794. }
  1795. if(flags & PFLAG_HAS_CALL_ID){
  1796. unsigned char pktCallID[16];
  1797. in.ReadBytes(pktCallID, 16);
  1798. if(memcmp(pktCallID, callID, 16)!=0){
  1799. LOGW("Received packet has wrong call id");
  1800. lastError=ERROR_UNKNOWN;
  1801. SetState(STATE_FAILED);
  1802. return;
  1803. }
  1804. }
  1805. ackId=(uint32_t) in.ReadInt32();
  1806. pseq=(uint32_t) in.ReadInt32();
  1807. acks=(uint32_t) in.ReadInt32();
  1808. if(flags & PFLAG_HAS_PROTO){
  1809. uint32_t proto=(uint32_t) in.ReadInt32();
  1810. if(proto!=PROTOCOL_NAME){
  1811. LOGW("Received packet uses wrong protocol");
  1812. lastError=ERROR_INCOMPATIBLE;
  1813. SetState(STATE_FAILED);
  1814. return;
  1815. }
  1816. }
  1817. if(flags & PFLAG_HAS_EXTRA){
  1818. uint32_t extraLen=(uint32_t) in.ReadTlLength();
  1819. in.Seek(in.GetOffset()+extraLen+pad4(extraLen));
  1820. }
  1821. if(flags & PFLAG_HAS_DATA){
  1822. packetInnerLen=in.ReadTlLength();
  1823. }
  1824. pflags=0;
  1825. }else if(tlid==TLID_SIMPLE_AUDIO_BLOCK){
  1826. in.ReadInt64(); // random id
  1827. uint32_t randLen=(uint32_t) in.ReadTlLength();
  1828. in.Seek(in.GetOffset()+randLen+pad4(randLen));
  1829. packetInnerLen=in.ReadTlLength();
  1830. type=in.ReadByte();
  1831. ackId=(uint32_t) in.ReadInt32();
  1832. pseq=(uint32_t) in.ReadInt32();
  1833. acks=(uint32_t) in.ReadInt32();
  1834. if(peerVersion>=6)
  1835. pflags=in.ReadByte();
  1836. else
  1837. pflags=0;
  1838. }else{
  1839. LOGW("Received a packet of unknown type %08X", tlid);
  1840. return;
  1841. }
  1842. }
  1843. packetsReceived++;
  1844. if(seqgt(pseq, lastRemoteSeq)){
  1845. uint32_t diff=pseq-lastRemoteSeq;
  1846. if(diff>31){
  1847. memset(recvPacketTimes, 0, 32*sizeof(double));
  1848. }else{
  1849. memmove(&recvPacketTimes[diff], recvPacketTimes, (32-diff)*sizeof(double));
  1850. if(diff>1){
  1851. memset(recvPacketTimes, 0, diff*sizeof(double));
  1852. }
  1853. recvPacketTimes[0]=GetCurrentTime();
  1854. }
  1855. lastRemoteSeq=pseq;
  1856. }else if(!seqgt(pseq, lastRemoteSeq) && lastRemoteSeq-pseq<32){
  1857. if(recvPacketTimes[lastRemoteSeq-pseq]!=0){
  1858. LOGW("Received duplicated packet for seq %u", pseq);
  1859. return;
  1860. }
  1861. recvPacketTimes[lastRemoteSeq-pseq]=GetCurrentTime();
  1862. }else if(lastRemoteSeq-pseq>=32){
  1863. LOGW("Packet %u is out of order and too late", pseq);
  1864. return;
  1865. }
  1866. bool didAckNewPackets=false;
  1867. unsigned int newlyAckedVideoBytes=0;
  1868. if(seqgt(ackId, lastRemoteAckSeq)){
  1869. didAckNewPackets=true;
  1870. MutexGuard _m(queuedPacketsMutex);
  1871. if(waitingForAcks && lastRemoteAckSeq>=firstSentPing){
  1872. rttHistory.Reset();
  1873. waitingForAcks=false;
  1874. dontSendPackets=10;
  1875. messageThread.Post([this]{
  1876. dontSendPackets=0;
  1877. }, 1.0);
  1878. LOGI("resuming sending");
  1879. }
  1880. lastRemoteAckSeq=ackId;
  1881. conctl->PacketAcknowledged(ackId);
  1882. unsigned int i;
  1883. for(i=0;i<31;i++){
  1884. for(vector<RecentOutgoingPacket>::iterator itr=recentOutgoingPackets.begin();itr!=recentOutgoingPackets.end();++itr){
  1885. if(itr->ackTime!=0)
  1886. continue;
  1887. if(((acks >> (31-i)) & 1) && itr->seq==ackId-(i+1)){
  1888. itr->ackTime=GetCurrentTime();
  1889. conctl->PacketAcknowledged(itr->seq);
  1890. }
  1891. }
  1892. /*if(remoteAcks[i+1]==0){
  1893. if((acks >> (31-i)) & 1){
  1894. remoteAcks[i+1]=GetCurrentTime();
  1895. conctl->PacketAcknowledged(ackId-(i+1));
  1896. }
  1897. }*/
  1898. }
  1899. for(i=0;i<queuedPackets.size();i++){
  1900. QueuedPacket& qp=queuedPackets[i];
  1901. int j;
  1902. bool didAck=false;
  1903. for(j=0;j<16;j++){
  1904. LOGD("queued packet %u, seq %u=%u", i, j, qp.seqs[j]);
  1905. if(qp.seqs[j]==0)
  1906. break;
  1907. int remoteAcksIndex=lastRemoteAckSeq-qp.seqs[j];
  1908. //LOGV("remote acks index %u, value %f", remoteAcksIndex, remoteAcksIndex>=0 && remoteAcksIndex<32 ? remoteAcks[remoteAcksIndex] : -1);
  1909. if(seqgt(lastRemoteAckSeq, qp.seqs[j]) && remoteAcksIndex>=0 && remoteAcksIndex<32){
  1910. for(RecentOutgoingPacket& opkt:recentOutgoingPackets){
  1911. if(opkt.seq==qp.seqs[j] && opkt.ackTime>0){
  1912. LOGD("did ack seq %u, removing", qp.seqs[j]);
  1913. didAck=true;
  1914. break;
  1915. }
  1916. }
  1917. if(didAck)
  1918. break;
  1919. }
  1920. }
  1921. if(didAck){
  1922. queuedPackets.erase(queuedPackets.begin()+i);
  1923. i--;
  1924. continue;
  1925. }
  1926. }
  1927. for(vector<UnacknowledgedExtraData>::iterator x=currentExtras.begin();x!=currentExtras.end();){
  1928. if(x->firstContainingSeq!=0 && (lastRemoteAckSeq==x->firstContainingSeq || seqgt(lastRemoteAckSeq, x->firstContainingSeq))){
  1929. LOGV("Peer acknowledged extra type %u length %d", x->type, int(x->data.Length()));
  1930. ProcessAcknowledgedOutgoingExtra(*x);
  1931. x=currentExtras.erase(x);
  1932. continue;
  1933. }
  1934. ++x;
  1935. }
  1936. if(videoSource && !videoKeyframeRequested){
  1937. // video frames are stored in sentVideoFrames in order of increasing numbers
  1938. // so if a frame (or part of it) is acknowledged but isn't sentVideoFrames[0], we know there was a packet loss
  1939. MutexGuard m(sentVideoFramesMutex);
  1940. for(SentVideoFrame& f:sentVideoFrames){
  1941. for(vector<uint32_t>::iterator s=f.unacknowledgedPackets.begin(); s!=f.unacknowledgedPackets.end();){
  1942. RecentOutgoingPacket* opkt=GetRecentOutgoingPacket(*s);
  1943. if(opkt && opkt->ackTime!=0.0){
  1944. s=f.unacknowledgedPackets.erase(s);
  1945. newlyAckedVideoBytes+=opkt->size;
  1946. }else{
  1947. ++s;
  1948. }
  1949. }
  1950. }
  1951. bool first=true;
  1952. for(vector<SentVideoFrame>::iterator f=sentVideoFrames.begin();f!=sentVideoFrames.end();){
  1953. if(f->unacknowledgedPackets.empty() && f->fragmentsInQueue==0){
  1954. //LOGV("Video frame %u was acknowledged", f->num);
  1955. if(first){
  1956. f=sentVideoFrames.erase(f);
  1957. continue;
  1958. }else{
  1959. LOGE("!!!!!!!!!!!!!!11 VIDEO FRAME LOSS DETECTED [1] %d of %u fragments", int(sentVideoFrames[0].unacknowledgedPackets.size()), sentVideoFrames[0].fragmentCount);
  1960. videoPacketLossCount++;
  1961. videoKeyframeRequested=true;
  1962. videoSource->RequestKeyFrame();
  1963. break;
  1964. }
  1965. }else if(first){
  1966. first=false;
  1967. }else if(!first && f->unacknowledgedPackets.size()<f->fragmentCount){
  1968. LOGE("!!!!!!!!!!!!!!11 VIDEO FRAME LOSS DETECTED [2] %d of %u fragments", int(f->unacknowledgedPackets.size()), f->fragmentCount);
  1969. videoPacketLossCount++;
  1970. videoKeyframeRequested=true;
  1971. videoSource->RequestKeyFrame();
  1972. break;
  1973. }
  1974. ++f;
  1975. }
  1976. }
  1977. }
  1978. Endpoint* _currentEndpoint=&endpoints.at(currentEndpoint);
  1979. if(srcEndpoint.id!=currentEndpoint && (srcEndpoint.type==Endpoint::Type::UDP_RELAY || srcEndpoint.type==Endpoint::Type::TCP_RELAY) && ((_currentEndpoint->type!=Endpoint::Type::UDP_RELAY && _currentEndpoint->type!=Endpoint::Type::TCP_RELAY) || _currentEndpoint->averageRTT==0)){
  1980. if(seqgt(lastSentSeq-32, lastRemoteAckSeq)){
  1981. currentEndpoint=srcEndpoint.id;
  1982. _currentEndpoint=&srcEndpoint;
  1983. LOGI("Peer network address probably changed, switching to relay");
  1984. if(allowP2p)
  1985. SendPublicEndpointsRequest();
  1986. }
  1987. }
  1988. if(pflags & XPFLAG_HAS_EXTRA){
  1989. unsigned char extraCount=in.ReadByte();
  1990. for(int i=0;i<extraCount;i++){
  1991. size_t extraLen=in.ReadByte();
  1992. Buffer xbuffer(extraLen);
  1993. in.ReadBytes(*xbuffer, extraLen);
  1994. ProcessExtraData(xbuffer);
  1995. }
  1996. }
  1997. if(pflags & XPFLAG_HAS_RECV_TS){
  1998. uint32_t recvTS=static_cast<uint32_t>(in.ReadInt32());
  1999. if(didAckNewPackets){
  2000. //LOGV("recv ts %u", recvTS);
  2001. for(RecentOutgoingPacket& opkt:recentOutgoingPackets){
  2002. if(opkt.seq==lastRemoteAckSeq){
  2003. float sendTime=(float)(opkt.sendTime-connectionInitTime);
  2004. float recvTime=(float)recvTS/1000.0f;
  2005. float oneWayDelay=recvTime-sendTime;
  2006. //LOGV("one-way delay: %f", oneWayDelay);
  2007. videoCongestionControl.ProcessAcks(oneWayDelay, newlyAckedVideoBytes, videoPacketLossCount, rttHistory.Average(5));
  2008. break;
  2009. }
  2010. }
  2011. }
  2012. }
  2013. if(config.logPacketStats){
  2014. DebugLoggedPacket dpkt={
  2015. static_cast<int32_t>(pseq),
  2016. GetCurrentTime()-connectionInitTime,
  2017. static_cast<int32_t>(packet.length)
  2018. };
  2019. debugLoggedPackets.push_back(dpkt);
  2020. if(debugLoggedPackets.size()>=2500){
  2021. debugLoggedPackets.erase(debugLoggedPackets.begin(), debugLoggedPackets.begin()+500);
  2022. }
  2023. }
  2024. #ifdef LOG_PACKETS
  2025. LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, packet.length, GetPacketTypeString(type).c_str());
  2026. #endif
  2027. //LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", lastRemoteAckSeq, remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
  2028. //LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", lastRemoteSeq, recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
  2029. //LOGI("RTT = %.3lf", GetAverageRTT());
  2030. //LOGV("Packet %u type is %d", pseq, type);
  2031. if(type==PKT_INIT){
  2032. LOGD("Received init");
  2033. uint32_t ver=(uint32_t)in.ReadInt32();
  2034. if(!receivedInit)
  2035. peerVersion=ver;
  2036. LOGI("Peer version is %d", peerVersion);
  2037. uint32_t minVer=(uint32_t) in.ReadInt32();
  2038. if(minVer>PROTOCOL_VERSION || peerVersion<MIN_PROTOCOL_VERSION){
  2039. lastError=ERROR_INCOMPATIBLE;
  2040. SetState(STATE_FAILED);
  2041. return;
  2042. }
  2043. uint32_t flags=(uint32_t) in.ReadInt32();
  2044. if(!receivedInit){
  2045. if(flags & INIT_FLAG_DATA_SAVING_ENABLED){
  2046. dataSavingRequestedByPeer=true;
  2047. UpdateDataSavingState();
  2048. UpdateAudioBitrateLimit();
  2049. }
  2050. if(flags & INIT_FLAG_GROUP_CALLS_SUPPORTED){
  2051. peerCapabilities|=TGVOIP_PEER_CAP_GROUP_CALLS;
  2052. }
  2053. if(flags & INIT_FLAG_VIDEO_RECV_SUPPORTED){
  2054. peerCapabilities|=TGVOIP_PEER_CAP_VIDEO_DISPLAY;
  2055. }
  2056. if(flags & INIT_FLAG_VIDEO_SEND_SUPPORTED){
  2057. peerCapabilities|=TGVOIP_PEER_CAP_VIDEO_CAPTURE;
  2058. }
  2059. }
  2060. unsigned int i;
  2061. unsigned int numSupportedAudioCodecs=in.ReadByte();
  2062. for(i=0; i<numSupportedAudioCodecs; i++){
  2063. if(peerVersion<5)
  2064. in.ReadByte(); // ignore for now
  2065. else
  2066. in.ReadInt32();
  2067. }
  2068. if(!receivedInit && ((flags & INIT_FLAG_VIDEO_SEND_SUPPORTED && config.enableVideoReceive) || (flags & INIT_FLAG_VIDEO_RECV_SUPPORTED && config.enableVideoSend))){
  2069. LOGD("Peer video decoders:");
  2070. unsigned int numSupportedVideoDecoders=in.ReadByte();
  2071. for(i=0; i<numSupportedVideoDecoders; i++){
  2072. uint32_t id=static_cast<uint32_t>(in.ReadInt32());
  2073. peerVideoDecoders.push_back(id);
  2074. char* _id=reinterpret_cast<char*>(&id);
  2075. LOGD("%c%c%c%c", _id[3], _id[2], _id[1], _id[0]);
  2076. }
  2077. peerMaxVideoResolution=in.ReadByte();
  2078. SetupOutgoingVideoStream();
  2079. }
  2080. BufferOutputStream out(1024);
  2081. out.WriteInt32(PROTOCOL_VERSION);
  2082. out.WriteInt32(MIN_PROTOCOL_VERSION);
  2083. out.WriteByte((unsigned char) outgoingStreams.size());
  2084. for(vector<shared_ptr<Stream>>::iterator s=outgoingStreams.begin(); s!=outgoingStreams.end(); ++s){
  2085. out.WriteByte((*s)->id);
  2086. out.WriteByte((*s)->type);
  2087. if(peerVersion<5)
  2088. out.WriteByte((unsigned char) ((*s)->codec==CODEC_OPUS ? CODEC_OPUS_OLD : 0));
  2089. else
  2090. out.WriteInt32((*s)->codec);
  2091. out.WriteInt16((*s)->frameDuration);
  2092. out.WriteByte((unsigned char) ((*s)->enabled ? 1 : 0));
  2093. }
  2094. LOGI("Sending init ack");
  2095. size_t outLength = out.GetLength();
  2096. SendOrEnqueuePacket(PendingOutgoingPacket{
  2097. /*.seq=*/GenerateOutSeq(),
  2098. /*.type=*/PKT_INIT_ACK,
  2099. /*.len=*/outLength,
  2100. /*.data=*/Buffer(move(out)),
  2101. /*.endpoint=*/0
  2102. });
  2103. if(!receivedInit){
  2104. receivedInit=true;
  2105. if((srcEndpoint.type==Endpoint::Type::UDP_RELAY && udpConnectivityState!=UDP_BAD && udpConnectivityState!=UDP_NOT_AVAILABLE) || srcEndpoint.type==Endpoint::Type::TCP_RELAY){
  2106. currentEndpoint=srcEndpoint.id;
  2107. if(srcEndpoint.type==Endpoint::Type::UDP_RELAY || (useTCP && srcEndpoint.type==Endpoint::Type::TCP_RELAY))
  2108. preferredRelay=srcEndpoint.id;
  2109. }
  2110. }
  2111. if(!audioStarted && receivedInitAck){
  2112. StartAudio();
  2113. audioStarted=true;
  2114. }
  2115. }
  2116. if(type==PKT_INIT_ACK){
  2117. LOGD("Received init ack");
  2118. if(!receivedInitAck){
  2119. receivedInitAck=true;
  2120. messageThread.Cancel(initTimeoutID);
  2121. initTimeoutID=MessageThread::INVALID_ID;
  2122. if(packetInnerLen>10){
  2123. peerVersion=in.ReadInt32();
  2124. uint32_t minVer=(uint32_t) in.ReadInt32();
  2125. if(minVer>PROTOCOL_VERSION || peerVersion<MIN_PROTOCOL_VERSION){
  2126. lastError=ERROR_INCOMPATIBLE;
  2127. SetState(STATE_FAILED);
  2128. return;
  2129. }
  2130. }else{
  2131. peerVersion=1;
  2132. }
  2133. LOGI("peer version from init ack %d", peerVersion);
  2134. unsigned char streamCount=in.ReadByte();
  2135. if(streamCount==0)
  2136. return;
  2137. int i;
  2138. shared_ptr<Stream> incomingAudioStream=NULL;
  2139. for(i=0; i<streamCount; i++){
  2140. shared_ptr<Stream> stm=make_shared<Stream>();
  2141. stm->id=in.ReadByte();
  2142. stm->type=in.ReadByte();
  2143. if(peerVersion<5){
  2144. unsigned char codec=in.ReadByte();
  2145. if(codec==CODEC_OPUS_OLD)
  2146. stm->codec=CODEC_OPUS;
  2147. }else{
  2148. stm->codec=(uint32_t) in.ReadInt32();
  2149. }
  2150. in.ReadInt16();
  2151. stm->frameDuration=60;
  2152. stm->enabled=in.ReadByte()==1;
  2153. if(stm->type==STREAM_TYPE_VIDEO && peerVersion<9){
  2154. LOGV("Skipping video stream for old protocol version");
  2155. continue;
  2156. }
  2157. if(stm->type==STREAM_TYPE_AUDIO){
  2158. stm->jitterBuffer=make_shared<JitterBuffer>(nullptr, stm->frameDuration);
  2159. if(stm->frameDuration>50)
  2160. stm->jitterBuffer->SetMinPacketCount((uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_initial_delay_60", 2));
  2161. else if(stm->frameDuration>30)
  2162. stm->jitterBuffer->SetMinPacketCount((uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_initial_delay_40", 4));
  2163. else
  2164. stm->jitterBuffer->SetMinPacketCount((uint32_t) ServerConfig::GetSharedInstance()->GetInt("jitter_initial_delay_20", 6));
  2165. stm->decoder=NULL;
  2166. }else if(stm->type==STREAM_TYPE_VIDEO){
  2167. if(!stm->packetReassembler){
  2168. //stm->packetReassembler=make_shared<PacketReassembler>();
  2169. //stm->packetReassembler->SetCallback(bind(&VoIPController::ProcessIncomingVideoFrame, this, placeholders::_1, placeholders::_2, placeholders::_3));
  2170. }
  2171. }else{
  2172. LOGW("Unknown incoming stream type: %d", stm->type);
  2173. continue;
  2174. }
  2175. incomingStreams.push_back(stm);
  2176. if(stm->type==STREAM_TYPE_AUDIO && !incomingAudioStream)
  2177. incomingAudioStream=stm;
  2178. }
  2179. if(!incomingAudioStream)
  2180. return;
  2181. if(peerVersion>=5 && !useMTProto2){
  2182. useMTProto2=true;
  2183. LOGD("MTProto2 wasn't initially enabled for whatever reason but peer supports it; upgrading");
  2184. }
  2185. if(!audioStarted && receivedInit){
  2186. StartAudio();
  2187. audioStarted=true;
  2188. }
  2189. messageThread.Post([this]{
  2190. if(state==STATE_WAIT_INIT_ACK){
  2191. SetState(STATE_ESTABLISHED);
  2192. }
  2193. }, ServerConfig::GetSharedInstance()->GetDouble("established_delay_if_no_stream_data", 1.5));
  2194. if(allowP2p)
  2195. SendPublicEndpointsRequest();
  2196. }
  2197. }
  2198. if(type==PKT_STREAM_DATA || type==PKT_STREAM_DATA_X2 || type==PKT_STREAM_DATA_X3){
  2199. if(!receivedFirstStreamPacket){
  2200. receivedFirstStreamPacket=true;
  2201. if(state!=STATE_ESTABLISHED && receivedInitAck){
  2202. messageThread.Post([this](){
  2203. SetState(STATE_ESTABLISHED);
  2204. }, .5);
  2205. LOGW("First audio packet - setting state to ESTABLISHED");
  2206. }
  2207. }
  2208. int count;
  2209. switch(type){
  2210. case PKT_STREAM_DATA_X2:
  2211. count=2;
  2212. break;
  2213. case PKT_STREAM_DATA_X3:
  2214. count=3;
  2215. break;
  2216. case PKT_STREAM_DATA:
  2217. default:
  2218. count=1;
  2219. break;
  2220. }
  2221. int i;
  2222. if(srcEndpoint.type==Endpoint::Type::UDP_RELAY && srcEndpoint.id!=peerPreferredRelay){
  2223. peerPreferredRelay=srcEndpoint.id;
  2224. }
  2225. for(i=0;i<count;i++){
  2226. unsigned char streamID=in.ReadByte();
  2227. unsigned char flags=(unsigned char) (streamID & 0xC0);
  2228. streamID&=0x3F;
  2229. uint16_t sdlen=(uint16_t) (flags & STREAM_DATA_FLAG_LEN16 ? in.ReadInt16() : in.ReadByte());
  2230. uint32_t pts=(uint32_t) in.ReadInt32();
  2231. unsigned char fragmentCount=1;
  2232. unsigned char fragmentIndex=0;
  2233. //LOGD("stream data, pts=%d, len=%d, rem=%d", pts, sdlen, in.Remaining());
  2234. audioTimestampIn=pts;
  2235. if(!audioOutStarted && audioOutput){
  2236. MutexGuard m(audioIOMutex);
  2237. audioOutput->Start();
  2238. audioOutStarted=true;
  2239. }
  2240. bool fragmented=static_cast<bool>(sdlen & STREAM_DATA_XFLAG_FRAGMENTED);
  2241. bool extraFEC=static_cast<bool>(sdlen & STREAM_DATA_XFLAG_EXTRA_FEC);
  2242. bool keyframe=static_cast<bool>(sdlen & STREAM_DATA_XFLAG_KEYFRAME);
  2243. if(fragmented){
  2244. fragmentIndex=in.ReadByte();
  2245. fragmentCount=in.ReadByte();
  2246. }
  2247. sdlen&=0x7FF;
  2248. if(in.GetOffset()+sdlen>len){
  2249. return;
  2250. }
  2251. shared_ptr<Stream> stm;
  2252. for(shared_ptr<Stream>& ss:incomingStreams){
  2253. if(ss->id==streamID){
  2254. stm=ss;
  2255. break;
  2256. }
  2257. }
  2258. if(stm && stm->type==STREAM_TYPE_AUDIO){
  2259. if(stm->jitterBuffer){
  2260. stm->jitterBuffer->HandleInput((unsigned char *) (buffer+in.GetOffset()), sdlen, pts, false);
  2261. if(extraFEC){
  2262. in.Seek(in.GetOffset()+sdlen);
  2263. unsigned int fecCount=in.ReadByte();
  2264. for(unsigned int j=0;j<fecCount;j++){
  2265. unsigned char dlen=in.ReadByte();
  2266. unsigned char data[256];
  2267. in.ReadBytes(data, dlen);
  2268. stm->jitterBuffer->HandleInput(data, dlen, pts-(fecCount-j-1)*stm->frameDuration, true);
  2269. }
  2270. }
  2271. }
  2272. }else if(stm && stm->type==STREAM_TYPE_VIDEO){
  2273. if(stm->packetReassembler){
  2274. Buffer pdata(sdlen);
  2275. pdata.CopyFrom(buffer+in.GetOffset(), 0, sdlen);
  2276. stm->packetReassembler->AddFragment(std::move(pdata), fragmentIndex, fragmentCount, pts, keyframe);
  2277. }
  2278. //LOGV("Received video fragment %u of %u", fragmentIndex, fragmentCount);
  2279. }else{
  2280. LOGW("received packet for unknown stream %u", (unsigned int)streamID);
  2281. }
  2282. if(i<count-1)
  2283. in.Seek(in.GetOffset()+sdlen);
  2284. }
  2285. }
  2286. if(type==PKT_PING){
  2287. //LOGD("Received ping from %s:%d", srcEndpoint.address.ToString().c_str(), srcEndpoint.port);
  2288. if(srcEndpoint.type!=Endpoint::Type::UDP_RELAY && srcEndpoint.type!=Endpoint::Type::TCP_RELAY && !allowP2p){
  2289. LOGW("Received p2p ping but p2p is disabled by manual override");
  2290. return;
  2291. }
  2292. BufferOutputStream pkt(128);
  2293. pkt.WriteInt32(pseq);
  2294. size_t pktLength = pkt.GetLength();
  2295. SendOrEnqueuePacket(PendingOutgoingPacket{
  2296. /*.seq=*/GenerateOutSeq(),
  2297. /*.type=*/PKT_PONG,
  2298. /*.len=*/pktLength,
  2299. /*.data=*/Buffer(move(pkt)),
  2300. /*.endpoint=*/srcEndpoint.id,
  2301. });
  2302. }
  2303. if(type==PKT_PONG){
  2304. if(packetInnerLen>=4){
  2305. uint32_t pingSeq=(uint32_t) in.ReadInt32();
  2306. #ifdef LOG_PACKETS
  2307. LOGD("Received pong for ping in seq %u", pingSeq);
  2308. #endif
  2309. if(pingSeq==srcEndpoint.lastPingSeq){
  2310. srcEndpoint.rtts.Add(GetCurrentTime()-srcEndpoint.lastPingTime);
  2311. srcEndpoint.averageRTT=srcEndpoint.rtts.NonZeroAverage();
  2312. LOGD("Current RTT via %s: %.3f, average: %.3f", packet.address->ToString().c_str(), srcEndpoint.rtts[0], srcEndpoint.averageRTT);
  2313. if(srcEndpoint.averageRTT>rateMaxAcceptableRTT)
  2314. needRate=true;
  2315. }
  2316. }
  2317. }
  2318. if(type==PKT_STREAM_STATE){
  2319. unsigned char id=in.ReadByte();
  2320. unsigned char enabled=in.ReadByte();
  2321. LOGV("Peer stream state: id %u flags %u", (int)id, (int)enabled);
  2322. for(vector<shared_ptr<Stream>>::iterator s=incomingStreams.begin();s!=incomingStreams.end();++s){
  2323. if((*s)->id==id){
  2324. (*s)->enabled=enabled==1;
  2325. UpdateAudioOutputState();
  2326. break;
  2327. }
  2328. }
  2329. }
  2330. if(type==PKT_LAN_ENDPOINT){
  2331. LOGV("received lan endpoint");
  2332. uint32_t peerAddr=(uint32_t) in.ReadInt32();
  2333. uint16_t peerPort=(uint16_t) in.ReadInt32();
  2334. MutexGuard m(endpointsMutex);
  2335. constexpr int64_t lanID=(int64_t)(FOURCC('L','A','N','4')) << 32;
  2336. IPv4Address v4addr(peerAddr);
  2337. IPv6Address v6addr(string("::0"));
  2338. unsigned char peerTag[16];
  2339. Endpoint lan(lanID, peerPort, v4addr, v6addr, Endpoint::Type::UDP_P2P_LAN, peerTag);
  2340. if(currentEndpoint==lanID)
  2341. currentEndpoint=preferredRelay;
  2342. endpoints[lanID]=lan;
  2343. }
  2344. if(type==PKT_NETWORK_CHANGED && _currentEndpoint->type!=Endpoint::Type::UDP_RELAY && _currentEndpoint->type!=Endpoint::Type::TCP_RELAY){
  2345. currentEndpoint=preferredRelay;
  2346. if(allowP2p)
  2347. SendPublicEndpointsRequest();
  2348. if(peerVersion>=2){
  2349. uint32_t flags=(uint32_t) in.ReadInt32();
  2350. dataSavingRequestedByPeer=(flags & INIT_FLAG_DATA_SAVING_ENABLED)==INIT_FLAG_DATA_SAVING_ENABLED;
  2351. UpdateDataSavingState();
  2352. UpdateAudioBitrateLimit();
  2353. ResetEndpointPingStats();
  2354. }
  2355. }
  2356. if(type==PKT_STREAM_EC){
  2357. unsigned char streamID=in.ReadByte();
  2358. uint32_t lastTimestamp=(uint32_t)in.ReadInt32();
  2359. unsigned char count=in.ReadByte();
  2360. for(shared_ptr<Stream>& stm:incomingStreams){
  2361. if(stm->id==streamID){
  2362. for(unsigned int i=0;i<count;i++){
  2363. unsigned char dlen=in.ReadByte();
  2364. unsigned char data[256];
  2365. in.ReadBytes(data, dlen);
  2366. if(stm->jitterBuffer){
  2367. stm->jitterBuffer->HandleInput(data, dlen, lastTimestamp-(count-i-1)*stm->frameDuration, true);
  2368. }
  2369. }
  2370. break;
  2371. }
  2372. }
  2373. }
  2374. }
  2375. void VoIPController::ProcessExtraData(Buffer &data){
  2376. BufferInputStream in(*data, data.Length());
  2377. unsigned char type=in.ReadByte();
  2378. unsigned char fullHash[SHA1_LENGTH];
  2379. crypto.sha1(*data, data.Length(), fullHash);
  2380. uint64_t hash=*reinterpret_cast<uint64_t*>(fullHash);
  2381. if(lastReceivedExtrasByType[type]==hash){
  2382. return;
  2383. }
  2384. LOGE("ProcessExtraData");
  2385. lastReceivedExtrasByType[type]=hash;
  2386. if(type==EXTRA_TYPE_STREAM_FLAGS){
  2387. unsigned char id=in.ReadByte();
  2388. uint32_t flags=static_cast<uint32_t>(in.ReadInt32());
  2389. LOGV("Peer stream state: id %u flags %u", (unsigned int)id, (unsigned int)flags);
  2390. for(shared_ptr<Stream>& s:incomingStreams){
  2391. if(s->id==id){
  2392. bool prevEnabled=s->enabled;
  2393. s->enabled=(flags & STREAM_FLAG_ENABLED)==STREAM_FLAG_ENABLED;
  2394. if(flags & STREAM_FLAG_EXTRA_EC){
  2395. if(!s->extraECEnabled){
  2396. s->extraECEnabled=true;
  2397. if(s->jitterBuffer)
  2398. s->jitterBuffer->SetMinPacketCount(4);
  2399. }
  2400. }else{
  2401. if(s->extraECEnabled){
  2402. s->extraECEnabled=false;
  2403. if(s->jitterBuffer)
  2404. s->jitterBuffer->SetMinPacketCount(2);
  2405. }
  2406. }
  2407. if(prevEnabled!=s->enabled && s->type==STREAM_TYPE_VIDEO && videoRenderer)
  2408. videoRenderer->SetStreamEnabled(s->enabled);
  2409. UpdateAudioOutputState();
  2410. break;
  2411. }
  2412. }
  2413. }else if(type==EXTRA_TYPE_STREAM_CSD){
  2414. LOGI("Received codec specific data");
  2415. /*
  2416. os.WriteByte(stream.id);
  2417. os.WriteByte(static_cast<unsigned char>(stream.codecSpecificData.size()));
  2418. for(Buffer& b:stream.codecSpecificData){
  2419. assert(b.Length()<255);
  2420. os.WriteByte(static_cast<unsigned char>(b.Length()));
  2421. os.WriteBytes(b);
  2422. }
  2423. Buffer buf(move(os));
  2424. SendExtra(buf, EXTRA_TYPE_STREAM_CSD);
  2425. */
  2426. unsigned char streamID=in.ReadByte();
  2427. for(shared_ptr<Stream>& stm:incomingStreams){
  2428. if(stm->id==streamID){
  2429. stm->codecSpecificData.clear();
  2430. stm->csdIsValid=false;
  2431. stm->width=static_cast<unsigned int>(in.ReadInt16());
  2432. stm->height=static_cast<unsigned int>(in.ReadInt16());
  2433. size_t count=(size_t)in.ReadByte();
  2434. for(size_t i=0;i<count;i++){
  2435. size_t len=(size_t)in.ReadByte();
  2436. Buffer csd(len);
  2437. in.ReadBytes(*csd, len);
  2438. stm->codecSpecificData.push_back(move(csd));
  2439. }
  2440. break;
  2441. }
  2442. }
  2443. }else if(type==EXTRA_TYPE_LAN_ENDPOINT){
  2444. if(!allowP2p)
  2445. return;
  2446. LOGV("received lan endpoint (extra)");
  2447. uint32_t peerAddr=(uint32_t) in.ReadInt32();
  2448. uint16_t peerPort=(uint16_t) in.ReadInt32();
  2449. MutexGuard m(endpointsMutex);
  2450. constexpr int64_t lanID=(int64_t)(FOURCC('L','A','N','4')) << 32;
  2451. if(currentEndpoint==lanID)
  2452. currentEndpoint=preferredRelay;
  2453. IPv4Address v4addr(peerAddr);
  2454. IPv6Address v6addr(string("::0"));
  2455. unsigned char peerTag[16];
  2456. Endpoint lan(lanID, peerPort, v4addr, v6addr, Endpoint::Type::UDP_P2P_LAN, peerTag);
  2457. endpoints[lanID]=lan;
  2458. }else if(type==EXTRA_TYPE_NETWORK_CHANGED){
  2459. LOGI("Peer network changed");
  2460. wasNetworkHandover=true;
  2461. const Endpoint& _currentEndpoint=endpoints.at(currentEndpoint);
  2462. if(_currentEndpoint.type!=Endpoint::Type::UDP_RELAY && _currentEndpoint.type!=Endpoint::Type::TCP_RELAY)
  2463. currentEndpoint=preferredRelay;
  2464. if(allowP2p)
  2465. SendPublicEndpointsRequest();
  2466. uint32_t flags=(uint32_t) in.ReadInt32();
  2467. dataSavingRequestedByPeer=(flags & INIT_FLAG_DATA_SAVING_ENABLED)==INIT_FLAG_DATA_SAVING_ENABLED;
  2468. UpdateDataSavingState();
  2469. UpdateAudioBitrateLimit();
  2470. ResetEndpointPingStats();
  2471. }else if(type==EXTRA_TYPE_GROUP_CALL_KEY){
  2472. if(!didReceiveGroupCallKey && !didSendGroupCallKey){
  2473. unsigned char groupKey[256];
  2474. in.ReadBytes(groupKey, 256);
  2475. messageThread.Post([this, &groupKey]{
  2476. if(callbacks.groupCallKeyReceived)
  2477. callbacks.groupCallKeyReceived(this, groupKey);
  2478. });
  2479. didReceiveGroupCallKey=true;
  2480. }
  2481. }else if(type==EXTRA_TYPE_REQUEST_GROUP){
  2482. if(!didInvokeUpgradeCallback){
  2483. messageThread.Post([this]{
  2484. if(callbacks.upgradeToGroupCallRequested)
  2485. callbacks.upgradeToGroupCallRequested(this);
  2486. });
  2487. didInvokeUpgradeCallback=true;
  2488. }
  2489. }else if(type==EXTRA_TYPE_IPV6_ENDPOINT){
  2490. if(!allowP2p)
  2491. return;
  2492. unsigned char _addr[16];
  2493. in.ReadBytes(_addr, 16);
  2494. IPv6Address addr(_addr);
  2495. uint16_t port=static_cast<uint16_t>(in.ReadInt16());
  2496. MutexGuard m(endpointsMutex);
  2497. peerIPv6Available=true;
  2498. LOGV("Received peer IPv6 endpoint [%s]:%u", addr.ToString().c_str(), port);
  2499. constexpr int64_t p2pID=(int64_t)(FOURCC('P','2','P','6')) << 32;
  2500. Endpoint ep;
  2501. ep.type=Endpoint::Type::UDP_P2P_INET;
  2502. ep.port=port;
  2503. ep.v6address=addr;
  2504. ep.id=p2pID;
  2505. endpoints[p2pID]=ep;
  2506. if(!myIPv6.IsEmpty())
  2507. currentEndpoint=p2pID;
  2508. }
  2509. }
  2510. void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &extra){
  2511. if(extra.type==EXTRA_TYPE_GROUP_CALL_KEY){
  2512. if(!didReceiveGroupCallKeyAck){
  2513. didReceiveGroupCallKeyAck=true;
  2514. messageThread.Post([this]{
  2515. if(callbacks.groupCallKeySent)
  2516. callbacks.groupCallKeySent(this);
  2517. });
  2518. }
  2519. }
  2520. }
  2521. Endpoint& VoIPController::GetRemoteEndpoint(){
  2522. return endpoints.at(currentEndpoint);
  2523. }
  2524. Endpoint* VoIPController::GetEndpointForPacket(const PendingOutgoingPacket& pkt){
  2525. Endpoint* endpoint=NULL;
  2526. if(pkt.endpoint){
  2527. try{
  2528. endpoint=&endpoints.at(pkt.endpoint);
  2529. }catch(out_of_range&){
  2530. LOGW("Unable to send packet via nonexistent endpoint %" PRIu64, pkt.endpoint);
  2531. return NULL;
  2532. }
  2533. }
  2534. if(!endpoint)
  2535. endpoint=&endpoints.at(currentEndpoint);
  2536. return endpoint;
  2537. }
  2538. bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue){
  2539. Endpoint* endpoint=GetEndpointForPacket(pkt);
  2540. if(!endpoint){
  2541. abort();
  2542. return false;
  2543. }
  2544. bool canSend;
  2545. if(endpoint->type!=Endpoint::Type::TCP_RELAY){
  2546. canSend=realUdpSocket->IsReadyToSend();
  2547. }else{
  2548. if(!endpoint->socket){
  2549. LOGV("Connecting to %s:%u", endpoint->GetAddress().ToString().c_str(), endpoint->port);
  2550. if(proxyProtocol==PROXY_NONE){
  2551. endpoint->socket=new NetworkSocketTCPObfuscated(NetworkSocket::Create(NetworkProtocol::PROTO_TCP));
  2552. endpoint->socket->Connect(&endpoint->GetAddress(), endpoint->port);
  2553. }else if(proxyProtocol==PROXY_SOCKS5){
  2554. NetworkSocket* tcp=NetworkSocket::Create(NetworkProtocol::PROTO_TCP);
  2555. tcp->Connect(resolvedProxyAddress, proxyPort);
  2556. NetworkSocketSOCKS5Proxy* proxy=new NetworkSocketSOCKS5Proxy(tcp, NULL, proxyUsername, proxyPassword);
  2557. endpoint->socket=proxy;
  2558. endpoint->socket->Connect(&endpoint->GetAddress(), endpoint->port);
  2559. }
  2560. selectCanceller->CancelSelect();
  2561. }
  2562. canSend=endpoint->socket && endpoint->socket->IsReadyToSend();
  2563. }
  2564. if(!canSend){
  2565. if(enqueue){
  2566. LOGW("Not ready to send - enqueueing");
  2567. sendQueue.push_back(move(pkt));
  2568. }
  2569. return false;
  2570. }
  2571. if((endpoint->type==Endpoint::Type::TCP_RELAY && useTCP) || (endpoint->type!=Endpoint::Type::TCP_RELAY && useUDP)){
  2572. //BufferOutputStream p(buf, sizeof(buf));
  2573. BufferOutputStream p(1500);
  2574. WritePacketHeader(pkt.seq, &p, pkt.type, (uint32_t)pkt.len);
  2575. p.WriteBytes(pkt.data);
  2576. SendPacket(p.GetBuffer(), p.GetLength(), *endpoint, pkt);
  2577. if(pkt.type==PKT_STREAM_DATA){
  2578. unsentStreamPackets--;
  2579. }
  2580. }
  2581. return true;
  2582. }
  2583. void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint& ep, PendingOutgoingPacket& srcPacket){
  2584. if(stopping)
  2585. return;
  2586. if(ep.type==Endpoint::Type::TCP_RELAY && !useTCP)
  2587. return;
  2588. BufferOutputStream out(len+128);
  2589. if(ep.type==Endpoint::Type::UDP_RELAY || ep.type==Endpoint::Type::TCP_RELAY)
  2590. out.WriteBytes((unsigned char*)ep.peerTag, 16);
  2591. else if(peerVersion<9)
  2592. out.WriteBytes(callID, 16);
  2593. if(len>0){
  2594. if(useMTProto2){
  2595. BufferOutputStream inner(len+128);
  2596. size_t sizeSize;
  2597. if(peerVersion>=8 || (!peerVersion && connectionMaxLayer>=92)){
  2598. inner.WriteInt16((uint16_t) len);
  2599. sizeSize=0;
  2600. }else{
  2601. inner.WriteInt32((uint32_t) len);
  2602. out.WriteBytes(keyFingerprint, 8);
  2603. sizeSize=4;
  2604. }
  2605. inner.WriteBytes(data, len);
  2606. size_t padLen=16-inner.GetLength()%16;
  2607. if(padLen<16)
  2608. padLen+=16;
  2609. unsigned char padding[32];
  2610. crypto.rand_bytes((uint8_t *) padding, padLen);
  2611. inner.WriteBytes(padding, padLen);
  2612. assert(inner.GetLength()%16==0);
  2613. unsigned char key[32], iv[32], msgKey[16];
  2614. BufferOutputStream buf(len+32);
  2615. size_t x=isOutgoing ? 0 : 8;
  2616. buf.WriteBytes(encryptionKey+88+x, 32);
  2617. buf.WriteBytes(inner.GetBuffer()+sizeSize, inner.GetLength()-sizeSize);
  2618. unsigned char msgKeyLarge[32];
  2619. crypto.sha256(buf.GetBuffer(), buf.GetLength(), msgKeyLarge);
  2620. memcpy(msgKey, msgKeyLarge+8, 16);
  2621. KDF2(msgKey, isOutgoing ? 0 : 8, key, iv);
  2622. out.WriteBytes(msgKey, 16);
  2623. //LOGV("<- MSG KEY: %08x %08x %08x %08x, hashed %u", *reinterpret_cast<int32_t*>(msgKey), *reinterpret_cast<int32_t*>(msgKey+4), *reinterpret_cast<int32_t*>(msgKey+8), *reinterpret_cast<int32_t*>(msgKey+12), inner.GetLength()-4);
  2624. unsigned char aesOut[MSC_STACK_FALLBACK(inner.GetLength(), 1500)];
  2625. crypto.aes_ige_encrypt(inner.GetBuffer(), aesOut, inner.GetLength(), key, iv);
  2626. out.WriteBytes(aesOut, inner.GetLength());
  2627. }else{
  2628. BufferOutputStream inner(len+128);
  2629. inner.WriteInt32((int32_t)len);
  2630. inner.WriteBytes(data, len);
  2631. if(inner.GetLength()%16!=0){
  2632. size_t padLen=16-inner.GetLength()%16;
  2633. unsigned char padding[16];
  2634. crypto.rand_bytes((uint8_t *) padding, padLen);
  2635. inner.WriteBytes(padding, padLen);
  2636. }
  2637. assert(inner.GetLength()%16==0);
  2638. unsigned char key[32], iv[32], msgHash[SHA1_LENGTH];
  2639. crypto.sha1((uint8_t *) inner.GetBuffer(), len+4, msgHash);
  2640. out.WriteBytes(keyFingerprint, 8);
  2641. out.WriteBytes((msgHash+(SHA1_LENGTH-16)), 16);
  2642. KDF(msgHash+(SHA1_LENGTH-16), isOutgoing ? 0 : 8, key, iv);
  2643. unsigned char aesOut[MSC_STACK_FALLBACK(inner.GetLength(), 1500)];
  2644. crypto.aes_ige_encrypt(inner.GetBuffer(), aesOut, inner.GetLength(), key, iv);
  2645. out.WriteBytes(aesOut, inner.GetLength());
  2646. }
  2647. }
  2648. //LOGV("Sending %d bytes to %s:%d", out.GetLength(), ep.address.ToString().c_str(), ep.port);
  2649. #ifdef LOG_PACKETS
  2650. LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s", ep.GetAddress().ToString().c_str(), ep.port, srcPacket.seq, out.GetLength(), GetPacketTypeString(srcPacket.type).c_str());
  2651. #endif
  2652. NetworkPacket pkt={0};
  2653. pkt.address=&ep.GetAddress();
  2654. pkt.port=ep.port;
  2655. pkt.length=out.GetLength();
  2656. pkt.data=out.GetBuffer();
  2657. pkt.protocol=ep.type==Endpoint::Type::TCP_RELAY ? PROTO_TCP : PROTO_UDP;
  2658. ActuallySendPacket(pkt, ep);
  2659. }
  2660. void VoIPController::ActuallySendPacket(NetworkPacket &pkt, Endpoint& ep){
  2661. //LOGI("Sending packet of %d bytes", pkt.length);
  2662. if(IS_MOBILE_NETWORK(networkType))
  2663. stats.bytesSentMobile+=(uint64_t)pkt.length;
  2664. else
  2665. stats.bytesSentWifi+=(uint64_t)pkt.length;
  2666. if(ep.type==Endpoint::Type::TCP_RELAY){
  2667. if(ep.socket && !ep.socket->IsFailed()){
  2668. ep.socket->Send(&pkt);
  2669. }
  2670. }else{
  2671. udpSocket->Send(&pkt);
  2672. }
  2673. }
  2674. std::string VoIPController::NetworkTypeToString(int type){
  2675. switch(type){
  2676. case NET_TYPE_WIFI:
  2677. return "wifi";
  2678. case NET_TYPE_GPRS:
  2679. return "gprs";
  2680. case NET_TYPE_EDGE:
  2681. return "edge";
  2682. case NET_TYPE_3G:
  2683. return "3g";
  2684. case NET_TYPE_HSPA:
  2685. return "hspa";
  2686. case NET_TYPE_LTE:
  2687. return "lte";
  2688. case NET_TYPE_ETHERNET:
  2689. return "ethernet";
  2690. case NET_TYPE_OTHER_HIGH_SPEED:
  2691. return "other_high_speed";
  2692. case NET_TYPE_OTHER_LOW_SPEED:
  2693. return "other_low_speed";
  2694. case NET_TYPE_DIALUP:
  2695. return "dialup";
  2696. case NET_TYPE_OTHER_MOBILE:
  2697. return "other_mobile";
  2698. default:
  2699. return "unknown";
  2700. }
  2701. }
  2702. std::string VoIPController::GetPacketTypeString(unsigned char type){
  2703. switch(type){
  2704. case PKT_INIT:
  2705. return "init";
  2706. case PKT_INIT_ACK:
  2707. return "init_ack";
  2708. case PKT_STREAM_STATE:
  2709. return "stream_state";
  2710. case PKT_STREAM_DATA:
  2711. return "stream_data";
  2712. case PKT_PING:
  2713. return "ping";
  2714. case PKT_PONG:
  2715. return "pong";
  2716. case PKT_LAN_ENDPOINT:
  2717. return "lan_endpoint";
  2718. case PKT_NETWORK_CHANGED:
  2719. return "network_changed";
  2720. case PKT_NOP:
  2721. return "nop";
  2722. case PKT_STREAM_EC:
  2723. return "stream_ec";
  2724. }
  2725. char buf[255];
  2726. snprintf(buf, sizeof(buf), "unknown(%u)", type);
  2727. return string(buf);
  2728. }
  2729. void VoIPController::AddIPv6Relays(){
  2730. if(!myIPv6.IsEmpty() && !didAddIPv6Relays){
  2731. unordered_map<string, vector<Endpoint>> endpointsByAddress;
  2732. MutexGuard m(endpointsMutex);
  2733. for(pair<const int64_t, Endpoint>& _e:endpoints){
  2734. Endpoint& e=_e.second;
  2735. if((e.type==Endpoint::Type::UDP_RELAY || e.type==Endpoint::Type::TCP_RELAY) && !e.v6address.IsEmpty() && !e.address.IsEmpty()){
  2736. endpointsByAddress[e.v6address.ToString()].push_back(e);
  2737. }
  2738. }
  2739. for(pair<const string, vector<Endpoint>>& addr:endpointsByAddress){
  2740. for(Endpoint& e:addr.second){
  2741. didAddIPv6Relays=true;
  2742. e.address=IPv4Address(0);
  2743. e.id=e.id ^ ((int64_t)(FOURCC('I','P','v','6')) << 32);
  2744. e.averageRTT=0;
  2745. e.lastPingSeq=0;
  2746. e.lastPingTime=0;
  2747. e.rtts.Reset();
  2748. e.udpPongCount=0;
  2749. endpoints[e.id]=e;
  2750. LOGD("Adding IPv6-only endpoint [%s]:%u", e.v6address.ToString().c_str(), e.port);
  2751. }
  2752. }
  2753. }
  2754. }
  2755. void VoIPController::AddTCPRelays(){
  2756. if(!didAddTcpRelays){
  2757. bool wasSetCurrentToTCP=setCurrentEndpointToTCP;
  2758. LOGV("Adding TCP relays");
  2759. MutexGuard m(endpointsMutex);
  2760. vector<Endpoint> relays;
  2761. for(pair<const int64_t, Endpoint> &_e:endpoints){
  2762. Endpoint& e=_e.second;
  2763. if(e.type!=Endpoint::Type::UDP_RELAY)
  2764. continue;
  2765. if(wasSetCurrentToTCP && !useUDP){
  2766. e.rtts.Reset();
  2767. e.averageRTT=0;
  2768. e.lastPingSeq=0;
  2769. }
  2770. Endpoint tcpRelay(e);
  2771. tcpRelay.type=Endpoint::Type::TCP_RELAY;
  2772. tcpRelay.averageRTT=0;
  2773. tcpRelay.lastPingSeq=0;
  2774. tcpRelay.lastPingTime=0;
  2775. tcpRelay.rtts.Reset();
  2776. tcpRelay.udpPongCount=0;
  2777. tcpRelay.id=tcpRelay.id ^ ((int64_t) (FOURCC('T', 'C', 'P', 0)) << 32);
  2778. if(setCurrentEndpointToTCP && endpoints.at(currentEndpoint).type!=Endpoint::Type::TCP_RELAY){
  2779. LOGV("Setting current endpoint to TCP");
  2780. setCurrentEndpointToTCP=false;
  2781. currentEndpoint=tcpRelay.id;
  2782. preferredRelay=tcpRelay.id;
  2783. }
  2784. relays.push_back(tcpRelay);
  2785. }
  2786. for(Endpoint& e:relays){
  2787. endpoints[e.id]=e;
  2788. }
  2789. didAddTcpRelays=true;
  2790. }
  2791. }
  #if defined(__APPLE__)
  // One-time initializer (run through pthread_once in GetCurrentTime): stores the
  // mach timebase ratio numer/denom and the mach_absolute_time() reading that
  // later timestamps are measured from.
  static void initMachTimestart() {
      mach_timebase_info_data_t info = { 0, 0 };
      mach_timebase_info(&info);
      VoIPController::machTimebase = static_cast<double>(info.numer) / info.denom;
      VoIPController::machTimestart = mach_absolute_time();
  }
  #endif
  2801. double VoIPController::GetCurrentTime(){
  2802. #if defined(__linux__) || defined(__FreeBSD__) || defined(__OpenBSD__)
  2803. struct timespec ts;
  2804. clock_gettime(CLOCK_MONOTONIC, &ts);
  2805. return ts.tv_sec+(double)ts.tv_nsec/1000000000.0;
  2806. #elif defined(__APPLE__)
  2807. static pthread_once_t token = PTHREAD_ONCE_INIT;
  2808. pthread_once(&token, &initMachTimestart);
  2809. return (mach_absolute_time() - machTimestart) * machTimebase / 1000000000.0f;
  2810. #elif defined(_WIN32)
  2811. if(!didInitWin32TimeScale){
  2812. LARGE_INTEGER scale;
  2813. QueryPerformanceFrequency(&scale);
  2814. win32TimeScale=scale.QuadPart;
  2815. didInitWin32TimeScale=true;
  2816. }
  2817. LARGE_INTEGER t;
  2818. QueryPerformanceCounter(&t);
  2819. return (double)t.QuadPart/(double)win32TimeScale;
  2820. #endif
  2821. }
  2822. void VoIPController::KDF(unsigned char* msgKey, size_t x, unsigned char* aesKey, unsigned char* aesIv){
  2823. uint8_t sA[SHA1_LENGTH], sB[SHA1_LENGTH], sC[SHA1_LENGTH], sD[SHA1_LENGTH];
  2824. BufferOutputStream buf(128);
  2825. buf.WriteBytes(msgKey, 16);
  2826. buf.WriteBytes(encryptionKey+x, 32);
  2827. crypto.sha1(buf.GetBuffer(), buf.GetLength(), sA);
  2828. buf.Reset();
  2829. buf.WriteBytes(encryptionKey+32+x, 16);
  2830. buf.WriteBytes(msgKey, 16);
  2831. buf.WriteBytes(encryptionKey+48+x, 16);
  2832. crypto.sha1(buf.GetBuffer(), buf.GetLength(), sB);
  2833. buf.Reset();
  2834. buf.WriteBytes(encryptionKey+64+x, 32);
  2835. buf.WriteBytes(msgKey, 16);
  2836. crypto.sha1(buf.GetBuffer(), buf.GetLength(), sC);
  2837. buf.Reset();
  2838. buf.WriteBytes(msgKey, 16);
  2839. buf.WriteBytes(encryptionKey+96+x, 32);
  2840. crypto.sha1(buf.GetBuffer(), buf.GetLength(), sD);
  2841. buf.Reset();
  2842. buf.WriteBytes(sA, 8);
  2843. buf.WriteBytes(sB+8, 12);
  2844. buf.WriteBytes(sC+4, 12);
  2845. assert(buf.GetLength()==32);
  2846. memcpy(aesKey, buf.GetBuffer(), 32);
  2847. buf.Reset();
  2848. buf.WriteBytes(sA+8, 12);
  2849. buf.WriteBytes(sB, 8);
  2850. buf.WriteBytes(sC+16, 4);
  2851. buf.WriteBytes(sD, 8);
  2852. assert(buf.GetLength()==32);
  2853. memcpy(aesIv, buf.GetBuffer(), 32);
  2854. }
  2855. void VoIPController::KDF2(unsigned char* msgKey, size_t x, unsigned char *aesKey, unsigned char *aesIv){
  2856. uint8_t sA[32], sB[32];
  2857. BufferOutputStream buf(128);
  2858. buf.WriteBytes(msgKey, 16);
  2859. buf.WriteBytes(encryptionKey+x, 36);
  2860. crypto.sha256(buf.GetBuffer(), buf.GetLength(), sA);
  2861. buf.Reset();
  2862. buf.WriteBytes(encryptionKey+40+x, 36);
  2863. buf.WriteBytes(msgKey, 16);
  2864. crypto.sha256(buf.GetBuffer(), buf.GetLength(), sB);
  2865. buf.Reset();
  2866. buf.WriteBytes(sA, 8);
  2867. buf.WriteBytes(sB+8, 16);
  2868. buf.WriteBytes(sA+24, 8);
  2869. memcpy(aesKey, buf.GetBuffer(), 32);
  2870. buf.Reset();
  2871. buf.WriteBytes(sB, 8);
  2872. buf.WriteBytes(sA+8, 16);
  2873. buf.WriteBytes(sB+24, 8);
  2874. memcpy(aesIv, buf.GetBuffer(), 32);
  2875. }
  2876. void VoIPController::SendPublicEndpointsRequest(const Endpoint& relay){
  2877. if(!useUDP)
  2878. return;
  2879. LOGD("Sending public endpoints request to %s:%d", relay.address.ToString().c_str(), relay.port);
  2880. publicEndpointsReqTime=GetCurrentTime();
  2881. waitingForRelayPeerInfo=true;
  2882. unsigned char buf[32];
  2883. memcpy(buf, relay.peerTag, 16);
  2884. memset(buf+16, 0xFF, 16);
  2885. NetworkPacket pkt={0};
  2886. pkt.data=buf;
  2887. pkt.length=32;
  2888. pkt.address=(NetworkAddress*)&relay.address;
  2889. pkt.port=relay.port;
  2890. pkt.protocol=PROTO_UDP;
  2891. udpSocket->Send(&pkt);
  2892. }
  2893. Endpoint& VoIPController::GetEndpointByType(int type){
  2894. if(type==Endpoint::Type::UDP_RELAY && preferredRelay)
  2895. return endpoints.at(preferredRelay);
  2896. for(pair<const int64_t, Endpoint>& e:endpoints){
  2897. if(e.second.type==type)
  2898. return e.second;
  2899. }
  2900. throw out_of_range("no endpoint");
  2901. }
  2902. void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout){
  2903. LOGD("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f", type, unsigned(len), retryInterval, timeout);
  2904. QueuedPacket pkt;
  2905. if(data){
  2906. Buffer b(len);
  2907. b.CopyFrom(data, 0, len);
  2908. pkt.data=move(b);
  2909. }
  2910. pkt.type=type;
  2911. pkt.retryInterval=retryInterval;
  2912. pkt.timeout=timeout;
  2913. pkt.firstSentTime=0;
  2914. pkt.lastSentTime=0;
  2915. {
  2916. MutexGuard m(queuedPacketsMutex);
  2917. queuedPackets.push_back(move(pkt));
  2918. }
  2919. messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this));
  2920. if(timeout>0.0){
  2921. messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), timeout);
  2922. }
  2923. }
  2924. void VoIPController::SendExtra(Buffer &data, unsigned char type){
  2925. MutexGuard m(queuedPacketsMutex);
  2926. LOGV("Sending extra type %u length %d", type, int(data.Length()));
  2927. for(vector<UnacknowledgedExtraData>::iterator x=currentExtras.begin();x!=currentExtras.end();++x){
  2928. if(x->type==type){
  2929. x->firstContainingSeq=0;
  2930. x->data=move(data);
  2931. return;
  2932. }
  2933. }
  2934. UnacknowledgedExtraData xd={type, move(data), 0};
  2935. currentExtras.push_back(move(xd));
  2936. }
  2937. void VoIPController::DebugCtl(int request, int param){
  2938. if(request==1){ // set bitrate
  2939. maxBitrate=param;
  2940. if(encoder){
  2941. encoder->SetBitrate(maxBitrate);
  2942. }
  2943. }else if(request==2){ // set packet loss
  2944. if(encoder){
  2945. encoder->SetPacketLoss(param);
  2946. }
  2947. }else if(request==3){ // force enable/disable p2p
  2948. allowP2p=param==1;
  2949. /*if(!allowP2p && currentEndpoint && currentEndpoint->type!=Endpoint::Type::UDP_RELAY){
  2950. currentEndpoint=preferredRelay;
  2951. }else if(allowP2p){
  2952. SendPublicEndpointsRequest();
  2953. }*/
  2954. BufferOutputStream s(4);
  2955. s.WriteInt32(dataSavingMode ? INIT_FLAG_DATA_SAVING_ENABLED : 0);
  2956. SendPacketReliably(PKT_NETWORK_CHANGED, s.GetBuffer(), s.GetLength(), 1, 20);
  2957. }else if(request==4){
  2958. if(echoCanceller)
  2959. echoCanceller->Enable(param==1);
  2960. }
  2961. }
  2962. void VoIPController::SendUdpPing(Endpoint& endpoint){
  2963. if(endpoint.type!=Endpoint::Type::UDP_RELAY)
  2964. return;
  2965. BufferOutputStream p(1024);
  2966. p.WriteBytes(endpoint.peerTag, 16);
  2967. p.WriteInt32(-1);
  2968. p.WriteInt32(-1);
  2969. p.WriteInt32(-1);
  2970. p.WriteInt32(-2);
  2971. int64_t id;
  2972. crypto.rand_bytes(reinterpret_cast<uint8_t*>(&id), 8);
  2973. p.WriteInt64(id);
  2974. NetworkPacket pkt={0};
  2975. pkt.address=&endpoint.GetAddress();
  2976. pkt.port=endpoint.port;
  2977. pkt.protocol=PROTO_UDP;
  2978. pkt.data=p.GetBuffer();
  2979. pkt.length=p.GetLength();
  2980. udpSocket->Send(&pkt);
  2981. LOGV("Sending UDP ping to %s:%d, id %" PRId64, endpoint.GetAddress().ToString().c_str(), endpoint.port, id);
  2982. }
  2983. void VoIPController::ResetUdpAvailability(){
  2984. LOGI("Resetting UDP availability");
  2985. if(udpPingTimeoutID!=MessageThread::INVALID_ID){
  2986. messageThread.Cancel(udpPingTimeoutID);
  2987. }
  2988. {
  2989. MutexGuard m(endpointsMutex);
  2990. for(pair<const int64_t, Endpoint>& e:endpoints){
  2991. e.second.udpPongCount=0;
  2992. }
  2993. }
  2994. udpPingCount=0;
  2995. udpConnectivityState=UDP_PING_PENDING;
  2996. udpPingTimeoutID=messageThread.Post(std::bind(&VoIPController::SendUdpPings, this), 0.0, 0.5);
  2997. }
  2998. void VoIPController::ResetEndpointPingStats(){
  2999. MutexGuard m(endpointsMutex);
  3000. for(pair<const int64_t, Endpoint>& e:endpoints){
  3001. e.second.averageRTT=0.0;
  3002. e.second.rtts.Reset();
  3003. }
  3004. }
  3005. #pragma mark - Video
  3006. int VoIPController::GetVideoResolutionForCurrentBitrate(){
  3007. shared_ptr<Stream> stm=GetStreamByType(STREAM_TYPE_VIDEO, true);
  3008. if(!stm)
  3009. return INIT_VIDEO_RES_NONE;
  3010. int resolutionFromBitrate=INIT_VIDEO_RES_1080;
  3011. // TODO: probably move this to server config
  3012. if(stm->codec==CODEC_AVC || stm->codec==CODEC_VP8){
  3013. if(currentVideoBitrate>400000){
  3014. resolutionFromBitrate=INIT_VIDEO_RES_720;
  3015. }else if(currentVideoBitrate>250000){
  3016. resolutionFromBitrate=INIT_VIDEO_RES_480;
  3017. }else{
  3018. resolutionFromBitrate=INIT_VIDEO_RES_360;
  3019. }
  3020. }else if(stm->codec==CODEC_HEVC || stm->codec==CODEC_VP9){
  3021. if(currentVideoBitrate>400000){
  3022. resolutionFromBitrate=INIT_VIDEO_RES_1080;
  3023. }else if(currentVideoBitrate>250000){
  3024. resolutionFromBitrate=INIT_VIDEO_RES_720;
  3025. }else if(currentVideoBitrate>100000){
  3026. resolutionFromBitrate=INIT_VIDEO_RES_480;
  3027. }else{
  3028. resolutionFromBitrate=INIT_VIDEO_RES_360;
  3029. }
  3030. }
  3031. return min(peerMaxVideoResolution, resolutionFromBitrate);
  3032. }
  3033. void VoIPController::SetVideoSource(video::VideoSource *source){
  3034. if(videoSource){
  3035. videoSource->Stop();
  3036. videoSource->SetCallback(nullptr);
  3037. }
  3038. videoSource=source;
  3039. shared_ptr<Stream> stm=GetStreamByType(STREAM_TYPE_VIDEO, true);
  3040. if(!stm){
  3041. LOGE("Can't set video source when there is no outgoing video stream");
  3042. return;
  3043. }
  3044. if(videoSource){
  3045. if(!stm->enabled){
  3046. stm->enabled=true;
  3047. SendStreamFlags(*stm);
  3048. }
  3049. uint32_t bitrate=videoCongestionControl.GetBitrate();
  3050. currentVideoBitrate=bitrate;
  3051. videoSource->SetBitrate(bitrate);
  3052. videoSource->Reset(stm->codec, stm->resolution=GetVideoResolutionForCurrentBitrate());
  3053. videoSource->Start();
  3054. videoSource->SetCallback(bind(&VoIPController::SendVideoFrame, this, placeholders::_1, placeholders::_2));
  3055. lastVideoResolutionChangeTime=GetCurrentTime();
  3056. }else{
  3057. if(stm->enabled){
  3058. stm->enabled=false;
  3059. SendStreamFlags(*stm);
  3060. }
  3061. }
  3062. }
  3063. void VoIPController::SetVideoRenderer(video::VideoRenderer *renderer){
  3064. videoRenderer=renderer;
  3065. }
  3066. void VoIPController::SetVideoCodecSpecificData(const std::vector<Buffer>& data){
  3067. outgoingStreams[1]->codecSpecificData.clear();
  3068. for(const Buffer& csd:data){
  3069. outgoingStreams[1]->codecSpecificData.push_back(Buffer::CopyOf(csd));
  3070. }
  3071. LOGI("Set outgoing video stream CSD");
  3072. }
  3073. void VoIPController::SendVideoFrame(const Buffer &frame, uint32_t flags){
  3074. //LOGI("Send video frame %u flags %u", (unsigned int)frame.Length(), flags);
  3075. shared_ptr<Stream> stm=GetStreamByType(STREAM_TYPE_VIDEO, true);
  3076. if(stm){
  3077. if(firstVideoFrameTime==0.0)
  3078. firstVideoFrameTime=GetCurrentTime();
  3079. videoCongestionControl.UpdateMediaRate(static_cast<uint32_t>(frame.Length()));
  3080. uint32_t bitrate=videoCongestionControl.GetBitrate();
  3081. if(bitrate!=currentVideoBitrate){
  3082. currentVideoBitrate=bitrate;
  3083. LOGD("Setting video bitrate to %u", bitrate);
  3084. videoSource->SetBitrate(bitrate);
  3085. int resolutionFromBitrate=GetVideoResolutionForCurrentBitrate();
  3086. if(resolutionFromBitrate!=stm->resolution && GetCurrentTime()-lastVideoResolutionChangeTime>3.0){
  3087. LOGI("Changing video resolution: %d -> %d", stm->resolution, resolutionFromBitrate);
  3088. stm->resolution=resolutionFromBitrate;
  3089. messageThread.Post([this, stm, resolutionFromBitrate]{
  3090. videoSource->Reset(stm->codec, resolutionFromBitrate);
  3091. stm->csdIsValid=false;
  3092. });
  3093. lastVideoResolutionChangeTime=GetCurrentTime();
  3094. return;
  3095. }
  3096. }
  3097. if(videoKeyframeRequested){
  3098. if(flags & VIDEO_FRAME_FLAG_KEYFRAME){
  3099. for(SentVideoFrame& f:sentVideoFrames){
  3100. if(!f.unacknowledgedPackets.empty()){
  3101. for(uint32_t& pseq:f.unacknowledgedPackets){
  3102. RecentOutgoingPacket* opkt=GetRecentOutgoingPacket(pseq);
  3103. if(opkt){
  3104. videoCongestionControl.ProcessPacketLost(opkt->size);
  3105. }
  3106. }
  3107. }
  3108. }
  3109. sentVideoFrames.clear();
  3110. videoKeyframeRequested=false;
  3111. }else{
  3112. LOGV("Dropping input video frame waiting for key frame");
  3113. return;
  3114. }
  3115. }
  3116. uint32_t pts=videoFrameCount++;
  3117. if(!stm->csdIsValid){
  3118. vector<Buffer>& csd=videoSource->GetCodecSpecificData();
  3119. stm->codecSpecificData.clear();
  3120. for(Buffer& b:csd){
  3121. stm->codecSpecificData.push_back(Buffer::CopyOf(b));
  3122. }
  3123. stm->csdIsValid=true;
  3124. stm->width=videoSource->GetFrameWidth();
  3125. stm->height=videoSource->GetFrameHeight();
  3126. SendStreamCSD(*stm);
  3127. }
  3128. size_t segmentCount=frame.Length()/1024;
  3129. if(frame.Length()%1024>0)
  3130. segmentCount++;
  3131. SentVideoFrame sentFrame;
  3132. sentFrame.num=pts;
  3133. sentFrame.fragmentCount=static_cast<uint32_t>(segmentCount);
  3134. sentFrame.fragmentsInQueue=0;//static_cast<uint32_t>(segmentCount);
  3135. for(size_t seg=0;seg<segmentCount;seg++){
  3136. BufferOutputStream pkt(1500);
  3137. size_t offset=seg*1024;
  3138. size_t len=MIN(1024, frame.Length()-offset);
  3139. unsigned char pflags=STREAM_DATA_FLAG_LEN16;
  3140. //pflags |= STREAM_DATA_FLAG_HAS_MORE_FLAGS;
  3141. pkt.WriteByte((unsigned char) (stm->id | pflags)); // streamID + flags
  3142. int16_t lengthAndFlags=static_cast<int16_t>(len & 0x7FF);
  3143. if(segmentCount>1)
  3144. lengthAndFlags |= STREAM_DATA_XFLAG_FRAGMENTED;
  3145. if(flags & VIDEO_FRAME_FLAG_KEYFRAME)
  3146. lengthAndFlags |= STREAM_DATA_XFLAG_KEYFRAME;
  3147. pkt.WriteInt16(lengthAndFlags);
  3148. //pkt.WriteInt32(audioTimestampOut);
  3149. pkt.WriteInt32(pts);
  3150. if(segmentCount>1){
  3151. pkt.WriteByte((unsigned char)seg);
  3152. pkt.WriteByte((unsigned char)segmentCount);
  3153. }
  3154. //LOGV("Sending segment %u of %u", (unsigned int)seg, (unsigned int)segmentCount);
  3155. pkt.WriteBytes(frame, offset, len);
  3156. uint32_t seq=GenerateOutSeq();
  3157. size_t pktLength = pkt.GetLength();
  3158. PendingOutgoingPacket p{
  3159. /*.seq=*/seq,
  3160. /*.type=*/PKT_STREAM_DATA,
  3161. /*.len=*/pktLength,
  3162. /*.data=*/Buffer(move(pkt)),
  3163. /*.endpoint=*/0,
  3164. };
  3165. unsentStreamPackets++;
  3166. SendOrEnqueuePacket(move(p));
  3167. videoCongestionControl.ProcessPacketSent(static_cast<unsigned int>(pktLength));
  3168. sentFrame.unacknowledgedPackets.push_back(seq);
  3169. }
  3170. MutexGuard m(sentVideoFramesMutex);
  3171. sentVideoFrames.push_back(sentFrame);
  3172. }
  3173. }
  3174. void VoIPController::SendStreamCSD(VoIPController::Stream &stream){
  3175. assert(stream.csdIsValid);
  3176. BufferOutputStream os(256);
  3177. os.WriteByte(stream.id);
  3178. os.WriteInt16((int16_t)stream.width);
  3179. os.WriteInt16((int16_t)stream.height);
  3180. os.WriteByte(static_cast<unsigned char>(stream.codecSpecificData.size()));
  3181. for(Buffer& b:stream.codecSpecificData){
  3182. assert(b.Length()<255);
  3183. os.WriteByte(static_cast<unsigned char>(b.Length()));
  3184. os.WriteBytes(b);
  3185. }
  3186. Buffer buf(move(os));
  3187. SendExtra(buf, EXTRA_TYPE_STREAM_CSD);
  3188. }
  3189. void VoIPController::ProcessIncomingVideoFrame(Buffer frame, uint32_t pts, bool keyframe){
  3190. //LOGI("Incoming video frame size %u pts %u", (unsigned int)frame.Length(), pts);
  3191. if(frame.Length()==0){
  3192. LOGE("EMPTY FRAME");
  3193. }
  3194. if(videoRenderer){
  3195. shared_ptr<Stream> stm=GetStreamByType(STREAM_TYPE_VIDEO, false);
  3196. if(!stm->csdIsValid){
  3197. videoRenderer->Reset(stm->codec, stm->width, stm->height, stm->codecSpecificData);
  3198. stm->csdIsValid=true;
  3199. }
  3200. if(lastReceivedVideoFrameNumber==UINT32_MAX || lastReceivedVideoFrameNumber==pts-1 || keyframe){
  3201. lastReceivedVideoFrameNumber=pts;
  3202. //LOGV("3 before decode %u", (unsigned int)frame.Length());
  3203. videoRenderer->DecodeAndDisplay(move(frame), pts);
  3204. }else{
  3205. LOGW("Skipping non-keyframe after packet loss...");
  3206. }
  3207. }
  3208. }
  3209. void VoIPController::SetupOutgoingVideoStream(){
  3210. vector<uint32_t> myEncoders=video::VideoSource::GetAvailableEncoders();
  3211. shared_ptr<Stream> vstm=make_shared<Stream>();
  3212. vstm->id=2;
  3213. vstm->type=STREAM_TYPE_VIDEO;
  3214. if(find(myEncoders.begin(), myEncoders.end(), CODEC_HEVC)!=myEncoders.end() && find(peerVideoDecoders.begin(), peerVideoDecoders.end(), CODEC_HEVC)!=peerVideoDecoders.end()){
  3215. vstm->codec=CODEC_HEVC;
  3216. }else if(find(myEncoders.begin(), myEncoders.end(), CODEC_AVC)!=myEncoders.end() && find(peerVideoDecoders.begin(), peerVideoDecoders.end(), CODEC_AVC)!=peerVideoDecoders.end()){
  3217. vstm->codec=CODEC_AVC;
  3218. }else if(find(myEncoders.begin(), myEncoders.end(), CODEC_VP8)!=myEncoders.end() && find(peerVideoDecoders.begin(), peerVideoDecoders.end(), CODEC_VP8)!=peerVideoDecoders.end()){
  3219. vstm->codec=CODEC_VP8;
  3220. }else{
  3221. LOGW("Can't setup outgoing video stream: no codecs in common");
  3222. return;
  3223. }
  3224. vstm->enabled=false;
  3225. outgoingStreams.push_back(vstm);
  3226. }
  3227. #pragma mark - Timer methods
  3228. void VoIPController::SendUdpPings(){
  3229. LOGW("Send udp pings");
  3230. MutexGuard m(endpointsMutex);
  3231. for(pair<const int64_t, Endpoint>& e:endpoints){
  3232. if(e.second.type==Endpoint::Type::UDP_RELAY){
  3233. SendUdpPing(e.second);
  3234. }
  3235. }
  3236. if(udpConnectivityState==UDP_UNKNOWN || udpConnectivityState==UDP_PING_PENDING)
  3237. udpConnectivityState=UDP_PING_SENT;
  3238. udpPingCount++;
  3239. if(udpPingCount==4 || udpPingCount==10){
  3240. messageThread.CancelSelf();
  3241. udpPingTimeoutID=messageThread.Post(std::bind(&VoIPController::EvaluateUdpPingResults, this), 1.0);
  3242. }
  3243. }
  3244. void VoIPController::EvaluateUdpPingResults(){
  3245. double avgPongs=0;
  3246. int count=0;
  3247. for(pair<const int64_t, Endpoint>& _e:endpoints){
  3248. Endpoint& e=_e.second;
  3249. if(e.type==Endpoint::Type::UDP_RELAY){
  3250. if(e.udpPongCount>0){
  3251. avgPongs+=(double) e.udpPongCount;
  3252. count++;
  3253. }
  3254. }
  3255. }
  3256. if(count>0)
  3257. avgPongs/=(double)count;
  3258. else
  3259. avgPongs=0.0;
  3260. LOGI("UDP ping reply count: %.2f", avgPongs);
  3261. if(avgPongs==0.0 && proxyProtocol==PROXY_SOCKS5 && udpSocket!=realUdpSocket){
  3262. LOGI("Proxy does not let UDP through, closing proxy connection and using UDP directly");
  3263. NetworkSocket* proxySocket=udpSocket;
  3264. proxySocket->Close();
  3265. udpSocket=realUdpSocket;
  3266. selectCanceller->CancelSelect();
  3267. delete proxySocket;
  3268. proxySupportsUDP=false;
  3269. ResetUdpAvailability();
  3270. return;
  3271. }
  3272. bool configUseTCP=ServerConfig::GetSharedInstance()->GetBoolean("use_tcp", true);
  3273. if(configUseTCP){
  3274. if(avgPongs==0.0 || (udpConnectivityState==UDP_BAD && avgPongs<7.0)){
  3275. if(needRateFlags & NEED_RATE_FLAG_UDP_NA)
  3276. needRate=true;
  3277. udpConnectivityState=UDP_NOT_AVAILABLE;
  3278. useTCP=true;
  3279. useUDP=avgPongs>1.0;
  3280. if(endpoints.at(currentEndpoint).type!=Endpoint::Type::TCP_RELAY)
  3281. setCurrentEndpointToTCP=true;
  3282. AddTCPRelays();
  3283. waitingForRelayPeerInfo=false;
  3284. }else if(avgPongs<3.0){
  3285. if(needRateFlags & NEED_RATE_FLAG_UDP_BAD)
  3286. needRate=true;
  3287. udpConnectivityState=UDP_BAD;
  3288. useTCP=true;
  3289. setCurrentEndpointToTCP=true;
  3290. AddTCPRelays();
  3291. udpPingTimeoutID=messageThread.Post(std::bind(&VoIPController::SendUdpPings, this), 0.5, 0.5);
  3292. }else{
  3293. udpPingTimeoutID=MessageThread::INVALID_ID;
  3294. udpConnectivityState=UDP_AVAILABLE;
  3295. }
  3296. }else{
  3297. udpPingTimeoutID=MessageThread::INVALID_ID;
  3298. udpConnectivityState=UDP_NOT_AVAILABLE;
  3299. }
  3300. }
  3301. void VoIPController::SendRelayPings(){
  3302. MutexGuard m(endpointsMutex);
  3303. if((state==STATE_ESTABLISHED || state==STATE_RECONNECTING) && endpoints.size()>1){
  3304. Endpoint* _preferredRelay=&endpoints.at(preferredRelay);
  3305. Endpoint* _currentEndpoint=&endpoints.at(currentEndpoint);
  3306. Endpoint* minPingRelay=_preferredRelay;
  3307. double minPing=_preferredRelay->averageRTT*(_preferredRelay->type==Endpoint::Type::TCP_RELAY ? 2 : 1);
  3308. if(minPing==0.0) // force the switch to an available relay, if any
  3309. minPing=DBL_MAX;
  3310. for(pair<const int64_t, Endpoint>& _endpoint:endpoints){
  3311. Endpoint& endpoint=_endpoint.second;
  3312. if(endpoint.type==Endpoint::Type::TCP_RELAY && !useTCP)
  3313. continue;
  3314. if(endpoint.type==Endpoint::Type::UDP_RELAY && !useUDP)
  3315. continue;
  3316. if(GetCurrentTime()-endpoint.lastPingTime>=10){
  3317. LOGV("Sending ping to %s", endpoint.GetAddress().ToString().c_str());
  3318. SendOrEnqueuePacket(PendingOutgoingPacket{
  3319. /*.seq=*/(endpoint.lastPingSeq=GenerateOutSeq()),
  3320. /*.type=*/PKT_PING,
  3321. /*.len=*/0,
  3322. /*.data=*/Buffer(),
  3323. /*.endpoint=*/endpoint.id
  3324. });
  3325. endpoint.lastPingTime=GetCurrentTime();
  3326. }
  3327. if((useUDP && endpoint.type==Endpoint::Type::UDP_RELAY) || (useTCP && endpoint.type==Endpoint::Type::TCP_RELAY)){
  3328. double k=endpoint.type==Endpoint::Type::UDP_RELAY ? 1 : 2;
  3329. if(endpoint.averageRTT>0 && endpoint.averageRTT*k<minPing*relaySwitchThreshold){
  3330. minPing=endpoint.averageRTT*k;
  3331. minPingRelay=&endpoint;
  3332. }
  3333. }
  3334. }
  3335. if(minPingRelay->id!=preferredRelay){
  3336. preferredRelay=minPingRelay->id;
  3337. _preferredRelay=minPingRelay;
  3338. LOGV("set preferred relay to %s", _preferredRelay->address.ToString().c_str());
  3339. if(_currentEndpoint->type==Endpoint::Type::UDP_RELAY || _currentEndpoint->type==Endpoint::Type::TCP_RELAY){
  3340. currentEndpoint=preferredRelay;
  3341. _currentEndpoint=_preferredRelay;
  3342. }
  3343. }
  3344. if(_currentEndpoint->type==Endpoint::Type::UDP_RELAY && useUDP){
  3345. constexpr int64_t p2pID=(int64_t)(FOURCC('P','2','P','4')) << 32;
  3346. constexpr int64_t lanID=(int64_t)(FOURCC('L','A','N','4')) << 32;
  3347. if(endpoints.find(p2pID)!=endpoints.end()){
  3348. Endpoint& p2p=endpoints[p2pID];
  3349. if(endpoints.find(lanID)!=endpoints.end() && endpoints[lanID].averageRTT>0 && endpoints[lanID].averageRTT<minPing*relayToP2pSwitchThreshold){
  3350. currentEndpoint=lanID;
  3351. LOGI("Switching to p2p (LAN)");
  3352. }else{
  3353. if(p2p.averageRTT>0 && p2p.averageRTT<minPing*relayToP2pSwitchThreshold){
  3354. currentEndpoint=p2pID;
  3355. LOGI("Switching to p2p (Inet)");
  3356. }
  3357. }
  3358. }
  3359. }else{
  3360. if(minPing>0 && minPing<_currentEndpoint->averageRTT*p2pToRelaySwitchThreshold){
  3361. LOGI("Switching to relay");
  3362. currentEndpoint=preferredRelay;
  3363. }
  3364. }
  3365. }
  3366. }
  3367. void VoIPController::UpdateRTT(){
  3368. rttHistory.Add(GetAverageRTT());
  3369. //double v=rttHistory.Average();
  3370. if(rttHistory[0]>10.0 && rttHistory[8]>10.0 && (networkType==NET_TYPE_EDGE || networkType==NET_TYPE_GPRS)){
  3371. waitingForAcks=true;
  3372. }else{
  3373. waitingForAcks=false;
  3374. }
  3375. //LOGI("%.3lf/%.3lf, rtt diff %.3lf, waiting=%d, queue=%d", rttHistory[0], rttHistory[8], v, waitingForAcks, sendQueue->Size());
  3376. for(vector<shared_ptr<Stream>>::iterator stm=incomingStreams.begin();stm!=incomingStreams.end();++stm){
  3377. if((*stm)->jitterBuffer){
  3378. int lostCount=(*stm)->jitterBuffer->GetAndResetLostPacketCount();
  3379. if(lostCount>0 || (lostCount<0 && recvLossCount>((uint32_t) -lostCount)))
  3380. recvLossCount+=lostCount;
  3381. }
  3382. }
  3383. }
  3384. void VoIPController::UpdateCongestion(){
  3385. if(conctl && encoder){
  3386. uint32_t sendLossCount=conctl->GetSendLossCount();
  3387. sendLossCountHistory.Add(sendLossCount-prevSendLossCount);
  3388. prevSendLossCount=sendLossCount;
  3389. double packetsPerSec=1000/(double) outgoingStreams[0]->frameDuration;
  3390. double avgSendLossCount=sendLossCountHistory.Average()/packetsPerSec;
  3391. //LOGV("avg send loss: %.3f%%", avgSendLossCount*100);
  3392. if(avgSendLossCount>packetLossToEnableExtraEC && networkType!=NET_TYPE_GPRS && networkType!=NET_TYPE_EDGE){
  3393. if(!shittyInternetMode){
  3394. // Shitty Internet Mode™. Redundant redundancy you can trust.
  3395. shittyInternetMode=true;
  3396. for(shared_ptr<Stream> &s:outgoingStreams){
  3397. if(s->type==STREAM_TYPE_AUDIO){
  3398. s->extraECEnabled=true;
  3399. SendStreamFlags(*s);
  3400. break;
  3401. }
  3402. }
  3403. if(encoder)
  3404. encoder->SetSecondaryEncoderEnabled(true);
  3405. LOGW("Enabling extra EC");
  3406. if(needRateFlags & NEED_RATE_FLAG_SHITTY_INTERNET_MODE)
  3407. needRate=true;
  3408. wasExtraEC=true;
  3409. }
  3410. }
  3411. if(avgSendLossCount>0.08){
  3412. extraEcLevel=4;
  3413. }else if(avgSendLossCount>0.05){
  3414. extraEcLevel=3;
  3415. }else if(avgSendLossCount>0.02){
  3416. extraEcLevel=2;
  3417. }else{
  3418. extraEcLevel=0;
  3419. }
  3420. encoder->SetPacketLoss((int)(avgSendLossCount*100.0));
  3421. if(avgSendLossCount>rateMaxAcceptableSendLoss)
  3422. needRate=true;
  3423. if((avgSendLossCount<packetLossToEnableExtraEC || networkType==NET_TYPE_EDGE || networkType==NET_TYPE_GPRS) && shittyInternetMode){
  3424. shittyInternetMode=false;
  3425. for(shared_ptr<Stream> &s:outgoingStreams){
  3426. if(s->type==STREAM_TYPE_AUDIO){
  3427. s->extraECEnabled=false;
  3428. SendStreamFlags(*s);
  3429. break;
  3430. }
  3431. }
  3432. if(encoder)
  3433. encoder->SetSecondaryEncoderEnabled(false);
  3434. LOGW("Disabling extra EC");
  3435. }
  3436. if(!wasEncoderLaggy && encoder->GetComplexity()<10)
  3437. wasEncoderLaggy=true;
  3438. }
  3439. }
  3440. void VoIPController::UpdateAudioBitrate(){
  3441. if(encoder && conctl){
  3442. double time=GetCurrentTime();
  3443. if((audioInput && !audioInput->IsInitialized()) || (audioOutput && !audioOutput->IsInitialized())){
  3444. LOGE("Audio I/O failed");
  3445. lastError=ERROR_AUDIO_IO;
  3446. SetState(STATE_FAILED);
  3447. }
  3448. int act=conctl->GetBandwidthControlAction();
  3449. if(shittyInternetMode){
  3450. encoder->SetBitrate(8000);
  3451. }else if(act==TGVOIP_CONCTL_ACT_DECREASE){
  3452. uint32_t bitrate=encoder->GetBitrate();
  3453. if(bitrate>8000)
  3454. encoder->SetBitrate(bitrate<(minAudioBitrate+audioBitrateStepDecr) ? minAudioBitrate : (bitrate-audioBitrateStepDecr));
  3455. }else if(act==TGVOIP_CONCTL_ACT_INCREASE){
  3456. uint32_t bitrate=encoder->GetBitrate();
  3457. if(bitrate<maxBitrate)
  3458. encoder->SetBitrate(bitrate+audioBitrateStepIncr);
  3459. }
  3460. if(state==STATE_ESTABLISHED && time-lastRecvPacketTime>=reconnectingTimeout){
  3461. SetState(STATE_RECONNECTING);
  3462. if(needRateFlags & NEED_RATE_FLAG_RECONNECTING)
  3463. needRate=true;
  3464. wasReconnecting=true;
  3465. ResetUdpAvailability();
  3466. }
  3467. if(state==STATE_ESTABLISHED || state==STATE_RECONNECTING){
  3468. if(time-lastRecvPacketTime>=config.recvTimeout){
  3469. const Endpoint& _currentEndpoint=endpoints.at(currentEndpoint);
  3470. if(_currentEndpoint.type!=Endpoint::Type::UDP_RELAY && _currentEndpoint.type!=Endpoint::Type::TCP_RELAY){
  3471. LOGW("Packet receive timeout, switching to relay");
  3472. currentEndpoint=preferredRelay;
  3473. for(pair<const int64_t, Endpoint>& _e:endpoints){
  3474. Endpoint& e=_e.second;
  3475. if(e.type==Endpoint::Type::UDP_P2P_INET || e.type==Endpoint::Type::UDP_P2P_LAN){
  3476. e.averageRTT=0;
  3477. e.rtts.Reset();
  3478. }
  3479. }
  3480. if(allowP2p){
  3481. SendPublicEndpointsRequest();
  3482. }
  3483. UpdateDataSavingState();
  3484. UpdateAudioBitrateLimit();
  3485. BufferOutputStream s(4);
  3486. s.WriteInt32(dataSavingMode ? INIT_FLAG_DATA_SAVING_ENABLED : 0);
  3487. if(peerVersion<6){
  3488. SendPacketReliably(PKT_NETWORK_CHANGED, s.GetBuffer(), s.GetLength(), 1, 20);
  3489. }else{
  3490. Buffer buf(move(s));
  3491. SendExtra(buf, EXTRA_TYPE_NETWORK_CHANGED);
  3492. }
  3493. lastRecvPacketTime=time;
  3494. }else{
  3495. LOGW("Packet receive timeout, disconnecting");
  3496. lastError=ERROR_TIMEOUT;
  3497. SetState(STATE_FAILED);
  3498. }
  3499. }
  3500. }
  3501. }
  3502. }
  3503. void VoIPController::UpdateSignalBars(){
  3504. int prevSignalBarCount=GetSignalBarsCount();
  3505. double packetsPerSec=1000/(double) outgoingStreams[0]->frameDuration;
  3506. double avgSendLossCount=sendLossCountHistory.Average()/packetsPerSec;
  3507. int signalBarCount=4;
  3508. if(state==STATE_RECONNECTING || waitingForAcks)
  3509. signalBarCount=1;
  3510. if(endpoints.at(currentEndpoint).type==Endpoint::Type::TCP_RELAY){
  3511. signalBarCount=MIN(signalBarCount, 3);
  3512. }
  3513. if(avgSendLossCount>0.1){
  3514. signalBarCount=1;
  3515. }else if(avgSendLossCount>0.0625){
  3516. signalBarCount=MIN(signalBarCount, 2);
  3517. }else if(avgSendLossCount>0.025){
  3518. signalBarCount=MIN(signalBarCount, 3);
  3519. }
  3520. for(shared_ptr<Stream>& stm:incomingStreams){
  3521. if(stm->jitterBuffer){
  3522. double avgLateCount[3];
  3523. stm->jitterBuffer->GetAverageLateCount(avgLateCount);
  3524. if(avgLateCount[2]>=0.2)
  3525. signalBarCount=1;
  3526. else if(avgLateCount[2]>=0.1)
  3527. signalBarCount=MIN(signalBarCount, 2);
  3528. }
  3529. }
  3530. signalBarsHistory.Add(static_cast<unsigned char>(signalBarCount));
  3531. //LOGV("Signal bar count history %08X", *reinterpret_cast<uint32_t *>(&signalBarsHistory));
  3532. int _signalBarCount=GetSignalBarsCount();
  3533. if(_signalBarCount!=prevSignalBarCount){
  3534. LOGD("SIGNAL BAR COUNT CHANGED: %d", _signalBarCount);
  3535. if(callbacks.signalBarCountChanged)
  3536. callbacks.signalBarCountChanged(this, _signalBarCount);
  3537. }
  3538. }
  3539. void VoIPController::UpdateQueuedPackets(){
  3540. vector<PendingOutgoingPacket> packetsToSend;
  3541. {
  3542. MutexGuard m(queuedPacketsMutex);
  3543. for(std::vector<QueuedPacket>::iterator qp=queuedPackets.begin(); qp!=queuedPackets.end();){
  3544. if(qp->timeout>0 && qp->firstSentTime>0 && GetCurrentTime()-qp->firstSentTime>=qp->timeout){
  3545. LOGD("Removing queued packet because of timeout");
  3546. qp=queuedPackets.erase(qp);
  3547. continue;
  3548. }
  3549. if(GetCurrentTime()-qp->lastSentTime>=qp->retryInterval){
  3550. messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), qp->retryInterval);
  3551. uint32_t seq=GenerateOutSeq();
  3552. qp->seqs.Add(seq);
  3553. qp->lastSentTime=GetCurrentTime();
  3554. //LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length());
  3555. Buffer buf(qp->data.Length());
  3556. if(qp->firstSentTime==0)
  3557. qp->firstSentTime=qp->lastSentTime;
  3558. if(qp->data.Length())
  3559. buf.CopyFrom(qp->data, qp->data.Length());
  3560. packetsToSend.push_back(PendingOutgoingPacket{
  3561. /*.seq=*/seq,
  3562. /*.type=*/qp->type,
  3563. /*.len=*/qp->data.Length(),
  3564. /*.data=*/move(buf),
  3565. /*.endpoint=*/0
  3566. });
  3567. }
  3568. ++qp;
  3569. }
  3570. }
  3571. for(PendingOutgoingPacket& pkt:packetsToSend){
  3572. SendOrEnqueuePacket(move(pkt));
  3573. }
  3574. }
  3575. void VoIPController::SendNopPacket(){
  3576. if(state!=STATE_ESTABLISHED)
  3577. return;
  3578. SendOrEnqueuePacket(PendingOutgoingPacket{
  3579. /*.seq=*/(firstSentPing=GenerateOutSeq()),
  3580. /*.type=*/PKT_NOP,
  3581. /*.len=*/0,
  3582. /*.data=*/Buffer(),
  3583. /*.endpoint=*/0
  3584. });
  3585. }
  3586. void VoIPController::SendPublicEndpointsRequest(){
  3587. if(!allowP2p)
  3588. return;
  3589. LOGI("Sending public endpoints request");
  3590. MutexGuard m(endpointsMutex);
  3591. for(pair<const int64_t, Endpoint>& e:endpoints){
  3592. if(e.second.type==Endpoint::Type::UDP_RELAY && !e.second.IsIPv6Only()){
  3593. SendPublicEndpointsRequest(e.second);
  3594. }
  3595. }
  3596. publicEndpointsReqCount++;
  3597. if(publicEndpointsReqCount<10){
  3598. messageThread.Post([this]{
  3599. if(waitingForRelayPeerInfo){
  3600. LOGW("Resending peer relay info request");
  3601. SendPublicEndpointsRequest();
  3602. }
  3603. }, 5.0);
  3604. }else{
  3605. publicEndpointsReqCount=0;
  3606. }
  3607. }
  3608. void VoIPController::TickJitterBufferAngCongestionControl(){
  3609. // TODO get rid of this and update states of these things internally and retroactively
  3610. for(shared_ptr<Stream>& stm:incomingStreams){
  3611. if(stm->jitterBuffer){
  3612. stm->jitterBuffer->Tick();
  3613. }
  3614. }
  3615. if(conctl){
  3616. conctl->Tick();
  3617. }
  3618. }
  3619. #pragma mark - Endpoint
  3620. Endpoint::Endpoint(int64_t id, uint16_t port, const IPv4Address& _address, const IPv6Address& _v6address, Type type, unsigned char peerTag[16]) : address(_address), v6address(_v6address){
  3621. this->id=id;
  3622. this->port=port;
  3623. this->type=type;
  3624. memcpy(this->peerTag, peerTag, 16);
  3625. if(type==Type::UDP_RELAY && ServerConfig::GetSharedInstance()->GetBoolean("force_tcp", false))
  3626. this->type=Type::TCP_RELAY;
  3627. lastPingSeq=0;
  3628. lastPingTime=0;
  3629. averageRTT=0;
  3630. socket=NULL;
  3631. udpPongCount=0;
  3632. }
  3633. Endpoint::Endpoint() : address(0), v6address(string("::0")) {
  3634. lastPingSeq=0;
  3635. lastPingTime=0;
  3636. averageRTT=0;
  3637. socket=NULL;
  3638. udpPongCount=0;
  3639. }
  3640. const NetworkAddress &Endpoint::GetAddress() const{
  3641. return IsIPv6Only() ? (NetworkAddress&)v6address : (NetworkAddress&)address;
  3642. }
  3643. NetworkAddress &Endpoint::GetAddress(){
  3644. return IsIPv6Only() ? (NetworkAddress&)v6address : (NetworkAddress&)address;
  3645. }
  3646. bool Endpoint::IsIPv6Only() const{
  3647. return address.IsEmpty() && !v6address.IsEmpty();
  3648. }
  3649. Endpoint::~Endpoint(){
  3650. if(socket){
  3651. socket->Close();
  3652. delete socket;
  3653. }
  3654. }
  3655. #pragma mark - AudioInputTester
  3656. AudioInputTester::AudioInputTester(std::string deviceID) : deviceID(std::move(deviceID)){
  3657. io=audio::AudioIO::Create(this->deviceID, "default");
  3658. if(io->Failed()){
  3659. LOGE("Audio IO failed");
  3660. return;
  3661. }
  3662. input=io->GetInput();
  3663. input->SetCallback([](unsigned char* data, size_t size, void* ctx) -> size_t{
  3664. reinterpret_cast<AudioInputTester*>(ctx)->Update(reinterpret_cast<int16_t*>(data), size/2);
  3665. return 0;
  3666. }, this);
  3667. input->Start();
  3668. /*thread=new MessageThread();
  3669. thread->Start();
  3670. thread->Post([this]{
  3671. this->callback(maxSample/(float)INT16_MAX);
  3672. maxSample=0;
  3673. }, updateInterval, updateInterval);*/
  3674. }
  3675. AudioInputTester::~AudioInputTester(){
  3676. //thread->Stop();
  3677. //delete thread;
  3678. input->Stop();
  3679. delete io;
  3680. }
  3681. void AudioInputTester::Update(int16_t *samples, size_t count){
  3682. for(size_t i=0;i<count;i++){
  3683. int16_t s=abs(samples[i]);
  3684. if(s>maxSample)
  3685. maxSample=s;
  3686. }
  3687. }
  3688. float AudioInputTester::GetAndResetLevel(){
  3689. float s=maxSample;
  3690. maxSample=0;
  3691. return s/(float)INT16_MAX;
  3692. }