dispatch_readsync.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. * Copyright (c) 2008-2011 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <dispatch/dispatch.h>
  21. #include <dispatch/private.h>
  22. #include <stdlib.h>
  23. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  24. #include <unistd.h>
  25. #ifdef __ANDROID__
  26. #include <linux/sysctl.h>
  27. #else
  28. #if !defined(__linux__)
  29. #include <sys/sysctl.h>
  30. #endif
  31. #endif /* __ANDROID__ */
  32. #endif
  33. #include <assert.h>
  34. #include <bsdtests.h>
  35. #include "dispatch_test.h"
  36. #define LAPS 10000
  37. #define INTERVAL 100
  38. #if TARGET_OS_EMBEDDED
  39. #define BUSY 10000
  40. #define NTHREADS 16
  41. #else
  42. #define BUSY 1000000
  43. #define NTHREADS 64
  44. #endif
  45. static dispatch_group_t g;
  46. static volatile size_t r_count, w_count, workers, readers, writers, crw, count, drain;
  47. static void
  48. writer(void *ctxt)
  49. {
  50. size_t w = __sync_add_and_fetch(&writers, 1), *m = (size_t *)ctxt;
  51. if (w > *m) *m = w;
  52. usleep(10000);
  53. size_t busy = BUSY;
  54. while (busy--) if (readers) __sync_add_and_fetch(&crw, 1);
  55. if (__sync_sub_and_fetch(&w_count, 1) == 0) {
  56. if (r_count == 0) {
  57. dispatch_async(dispatch_get_main_queue(), ^{test_stop();});
  58. }
  59. }
  60. __sync_sub_and_fetch(&writers, 1);
  61. dispatch_group_leave(g);
  62. }
  63. static void
  64. reader(void *ctxt)
  65. {
  66. size_t r = __sync_add_and_fetch(&readers, 1), *m = (size_t *)ctxt;
  67. if (r > *m) *m = r;
  68. usleep(10000);
  69. size_t busy = BUSY;
  70. while (busy--) if (writers) __sync_add_and_fetch(&crw, 1);
  71. if (__sync_sub_and_fetch(&r_count, 1) == 0) {
  72. if (r_count == 0) {
  73. dispatch_async(dispatch_get_main_queue(), ^{test_stop();});
  74. }
  75. }
  76. __sync_sub_and_fetch(&readers, 1);
  77. }
  78. static void
  79. test_readsync(dispatch_queue_t rq, dispatch_queue_t wq, size_t n)
  80. {
  81. size_t i, max_readers = 0, max_writers = 0;
  82. size_t *mrs = calloc(n, sizeof(size_t)), *mr, *mw = &max_writers;
  83. r_count = LAPS * 2;
  84. w_count = LAPS / INTERVAL;
  85. workers = readers = writers = crw = count = 0;
  86. for (i = 0, mr = mrs; i < n; i++, mr++) {
  87. dispatch_group_async(g,
  88. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
  89. DISPATCH_QUEUE_OVERCOMMIT), ^{
  90. __sync_add_and_fetch(&workers, 1);
  91. do {
  92. usleep(100000);
  93. } while (workers < n);
  94. for (;;) {
  95. size_t idx = __sync_add_and_fetch(&count, 1);
  96. if (idx > LAPS) break;
  97. dispatch_sync_f(rq, mr, reader);
  98. if (!(idx % INTERVAL)) {
  99. dispatch_group_enter(g);
  100. dispatch_barrier_async_f(wq, mw, writer);
  101. }
  102. dispatch_sync_f(rq, mr, reader);
  103. if (!(idx % (INTERVAL*10))) {
  104. // Let the queue drain
  105. __sync_add_and_fetch(&drain, 1);
  106. usleep(10000);
  107. dispatch_barrier_sync(wq, ^{});
  108. __sync_sub_and_fetch(&drain, 1);
  109. } else while (drain) usleep(1000);
  110. }
  111. });
  112. }
  113. dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
  114. for (i = 0, mr = mrs; i < n; i++, mr++) {
  115. if (*mr > max_readers) max_readers = *mr;
  116. }
  117. free(mrs);
  118. test_sizet("max readers", max_readers, n);
  119. test_sizet("max writers", max_writers, 1);
  120. test_sizet("concurrent readers & writers", crw, 0);
  121. }
  122. int
  123. main(void)
  124. {
  125. dispatch_test_start("Dispatch Reader/Writer Queues");
  126. uint32_t activecpu, wq_max_threads;
  127. #ifdef __linux__
  128. activecpu = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN);
  129. // don't want to parse /proc/sys/kernel/threads-max
  130. wq_max_threads = activecpu * NTHREADS + 2;
  131. #elif defined(_WIN32)
  132. SYSTEM_INFO si;
  133. GetSystemInfo(&si);
  134. activecpu = si.dwNumberOfProcessors;
  135. wq_max_threads = activecpu * NTHREADS + 2;
  136. #else
  137. size_t s = sizeof(uint32_t);
  138. sysctlbyname("hw.activecpu", &activecpu, &s, NULL, 0);
  139. s = sizeof(uint32_t);
  140. sysctlbyname("kern.wq_max_threads", &wq_max_threads, &s, NULL, 0);
  141. #endif
  142. // cap at wq_max_threads - one wq thread for dq - one wq thread for manager
  143. size_t n = MIN(activecpu * NTHREADS, wq_max_threads - 2);
  144. g = dispatch_group_create();
  145. dispatch_queue_attr_t qattr = NULL;
  146. #if DISPATCH_API_VERSION >= 20100518 // rdar://problem/7790099
  147. qattr = DISPATCH_QUEUE_CONCURRENT;
  148. #endif
  149. dispatch_queue_t dq = dispatch_queue_create("readsync", qattr);
  150. assert(dq);
  151. if (!qattr) {
  152. dispatch_queue_set_width(dq, LONG_MAX); // rdar://problem/7919264
  153. dispatch_barrier_sync(dq, ^{}); // wait for changes to take effect
  154. }
  155. test_readsync(dq, dq, n);
  156. dispatch_queue_t tq = dispatch_queue_create("writebarrierasync", qattr);
  157. assert(tq);
  158. if (!qattr) {
  159. dispatch_queue_set_width(tq, LONG_MAX);
  160. }
  161. dispatch_set_target_queue(dq, tq);
  162. dispatch_barrier_sync(tq, ^{}); // wait for changes to take effect
  163. test_readsync(dq, tq, n); // rdar://problem/8186485
  164. dispatch_release(tq);
  165. dispatch_release(dq);
  166. dispatch_release(g);
  167. dispatch_main();
  168. }