BlockingQueue.h 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. //
  2. // libtgvoip is free and unencumbered public domain software.
  3. // For more information, see http://unlicense.org or the UNLICENSE file
  4. // you should have received with this source code distribution.
  5. //
  6. #ifndef LIBTGVOIP_BLOCKINGQUEUE_H
  7. #define LIBTGVOIP_BLOCKINGQUEUE_H
  8. #include <stdlib.h>
  9. #include <list>
  10. #include "threading.h"
  11. #include "utils.h"
  12. namespace tgvoip{
  13. using namespace std;
  14. template<typename T>
  15. class BlockingQueue{
  16. public:
  17. TGVOIP_DISALLOW_COPY_AND_ASSIGN(BlockingQueue);
  18. BlockingQueue(size_t capacity) : semaphore(capacity, 0){
  19. this->capacity=capacity;
  20. overflowCallback=NULL;
  21. };
  22. ~BlockingQueue(){
  23. semaphore.Release();
  24. }
  25. void Put(T thing){
  26. MutexGuard sync(mutex);
  27. queue.push_back(std::move(thing));
  28. bool didOverflow=false;
  29. while(queue.size()>capacity){
  30. didOverflow=true;
  31. if(overflowCallback){
  32. overflowCallback(std::move(queue.front()));
  33. queue.pop_front();
  34. }else{
  35. abort();
  36. }
  37. }
  38. if(!didOverflow)
  39. semaphore.Release();
  40. }
  41. T GetBlocking(){
  42. semaphore.Acquire();
  43. MutexGuard sync(mutex);
  44. return GetInternal();
  45. }
  46. T Get(){
  47. MutexGuard sync(mutex);
  48. if(queue.size()>0)
  49. semaphore.Acquire();
  50. return GetInternal();
  51. }
  52. unsigned int Size(){
  53. return queue.size();
  54. }
  55. void PrepareDealloc(){
  56. }
  57. void SetOverflowCallback(void (*overflowCallback)(T)){
  58. this->overflowCallback=overflowCallback;
  59. }
  60. private:
  61. T GetInternal(){
  62. //if(queue.size()==0)
  63. // return NULL;
  64. T r=std::move(queue.front());
  65. queue.pop_front();
  66. return r;
  67. }
  68. list<T> queue;
  69. size_t capacity;
  70. //tgvoip_lock_t lock;
  71. Semaphore semaphore;
  72. Mutex mutex;
  73. void (*overflowCallback)(T);
  74. };
  75. }
  76. #endif //LIBTGVOIP_BLOCKINGQUEUE_H