dispatch_concur.c 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. /*
  2. * Copyright (c) 2010-2011 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <dispatch/dispatch.h>
  21. #include <dispatch/private.h>
  22. #include <stdlib.h>
  23. #include <stdio.h>
  24. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  25. #include <sys/types.h>
  26. #include <unistd.h>
  27. #ifdef __ANDROID__
  28. #include <linux/sysctl.h>
  29. #else
  30. #if !defined(__linux__)
  31. #include <sys/sysctl.h>
  32. #endif
  33. #endif /* __ANDROID__ */
  34. #endif
  35. #include <bsdtests.h>
  36. #include "dispatch_test.h"
  37. static volatile size_t done, concur;
  38. static int use_group_async;
  39. static uint32_t activecpu;
  40. static uint32_t min_acceptable_concurrency;
  41. static dispatch_queue_t q;
  42. static dispatch_group_t g, gw;
  43. const size_t workers = 4;
  44. static void
  45. nop(void* ctxt __attribute__((unused)))
  46. {
  47. return;
  48. }
  49. static void
  50. work(void* ctxt __attribute__((unused)))
  51. {
  52. usleep(1000);
  53. __sync_add_and_fetch(&done, 1);
  54. if (!use_group_async) dispatch_group_leave(gw);
  55. }
  56. static void
  57. submit_work(void* ctxt)
  58. {
  59. size_t c = __sync_add_and_fetch(&concur, 1), *m = (size_t *)ctxt, i;
  60. if (c > *m) *m = c;
  61. for (i = 0; i < workers; ++i) {
  62. if (use_group_async) {
  63. dispatch_group_async_f(gw, q, NULL, work);
  64. } else {
  65. dispatch_group_enter(gw);
  66. dispatch_async_f(q, NULL, work);
  67. }
  68. }
  69. usleep(10000);
  70. __sync_sub_and_fetch(&concur, 1);
  71. if (!use_group_async) dispatch_group_leave(g);
  72. }
  73. static void
  74. test_concur_async(size_t n, size_t qw)
  75. {
  76. size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
  77. done = concur = 0;
  78. dispatch_suspend(q);
  79. for (i = 0, mc = mcs; i < n; i++, mc++) {
  80. if (use_group_async) {
  81. dispatch_group_async_f(g, q, mc, submit_work);
  82. } else {
  83. dispatch_group_enter(g);
  84. dispatch_async_f(q, mc, submit_work);
  85. }
  86. }
  87. dispatch_resume(q);
  88. dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
  89. if (qw > 1) {
  90. size_t concurrency = MIN(n * workers, qw);
  91. if (done > min_acceptable_concurrency) {
  92. test_sizet_less_than_or_equal("concurrently completed workers", done, concurrency);
  93. } else {
  94. test_sizet("concurrently completed workers", done, concurrency);
  95. }
  96. } else {
  97. test_sizet_less_than_or_equal("concurrently completed workers", done, 1);
  98. }
  99. for (i = 0, mc = mcs; i < n; i++, mc++) {
  100. if (*mc > max_concur) max_concur = *mc;
  101. }
  102. free(mcs);
  103. size_t expect = MIN(n, qw);
  104. if (max_concur > min_acceptable_concurrency) {
  105. test_sizet_less_than_or_equal("max submission concurrency", max_concur, expect);
  106. } else {
  107. test_sizet("max submission concurrency", max_concur, expect);
  108. }
  109. dispatch_group_wait(gw, DISPATCH_TIME_FOREVER);
  110. usleep(1000);
  111. }
  112. static void
  113. sync_work(void* ctxt)
  114. {
  115. size_t c = __sync_add_and_fetch(&concur, 1), *m = (size_t *)ctxt;
  116. if (c > *m) *m = c;
  117. usleep(10000);
  118. __sync_sub_and_fetch(&concur, 1);
  119. }
  120. static void
  121. test_concur_sync(size_t n, size_t qw)
  122. {
  123. size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
  124. concur = 0;
  125. for (i = 0, mc = mcs; i < n; i++, mc++) {
  126. dispatch_group_async(g,
  127. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
  128. DISPATCH_QUEUE_OVERCOMMIT), ^{
  129. usleep(100000);
  130. dispatch_sync_f(q, mc, sync_work);
  131. });
  132. }
  133. dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
  134. for (i = 0, mc = mcs; i < n; i++, mc++) {
  135. if (*mc > max_concur) max_concur = *mc;
  136. }
  137. free(mcs);
  138. size_t expect = qw == 1 ? 1 : n;
  139. if (max_concur > min_acceptable_concurrency) {
  140. test_sizet_less_than_or_equal("max sync concurrency", max_concur, expect);
  141. } else {
  142. test_sizet("max sync concurrency", max_concur, expect);
  143. }
  144. }
  145. static void
  146. apply_work(void* ctxt, size_t i)
  147. {
  148. size_t c = __sync_add_and_fetch(&concur, 1), *m = ((size_t *)ctxt) + i;
  149. if (c > *m) *m = c;
  150. usleep(100000);
  151. __sync_sub_and_fetch(&concur, 1);
  152. }
  153. static void
  154. test_concur_apply(size_t n, size_t qw)
  155. {
  156. size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
  157. concur = 0;
  158. dispatch_apply_f(n, q, mcs, apply_work);
  159. for (i = 0, mc = mcs; i < n; i++, mc++) {
  160. if (*mc > max_concur) max_concur = *mc;
  161. }
  162. free(mcs);
  163. size_t expect = MIN(n, qw);
  164. if (max_concur > min_acceptable_concurrency) {
  165. test_sizet_less_than_or_equal("max apply concurrency", max_concur, expect);
  166. } else {
  167. test_sizet("max apply concurrency", max_concur, expect);
  168. }
  169. }
  170. static dispatch_queue_t
  171. create_queue(long width, dispatch_queue_t tq, long *qw, const char **ql)
  172. {
  173. if (!width) {
  174. *qw = LONG_MAX;
  175. *ql = "global";
  176. return dispatch_get_global_queue(0, 0);
  177. };
  178. dispatch_queue_t queue;
  179. dispatch_queue_attr_t qattr = NULL;
  180. *qw = width;
  181. *ql = width < LONG_MAX ? ( width == 1 ? "serial": "wide" ) : "concurrent";
  182. #if DISPATCH_API_VERSION >= 20100518 // <rdar://problem/7790099>
  183. qattr = width < LONG_MAX ? NULL : DISPATCH_QUEUE_CONCURRENT;
  184. #endif
  185. queue = dispatch_queue_create(*ql, qattr);
  186. if (!qattr) {
  187. dispatch_queue_set_width(queue, width);
  188. }
  189. if (tq) {
  190. dispatch_set_target_queue(queue, tq);
  191. }
  192. if (!qattr || tq) {
  193. dispatch_barrier_sync_f(queue, NULL, nop); // wait for changes to take effect
  194. }
  195. return queue;
  196. }
  197. int
  198. main(int argc __attribute__((unused)), char* argv[] __attribute__((unused)))
  199. {
  200. dispatch_test_start("Dispatch Private Concurrent/Wide Queue"); // <rdar://problem/8049506&8169448&8186485>
  201. #ifdef __linux__
  202. activecpu = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN);
  203. #elif defined(_WIN32)
  204. SYSTEM_INFO si;
  205. GetSystemInfo(&si);
  206. activecpu = si.dwNumberOfProcessors;
  207. #else
  208. size_t s = sizeof(activecpu);
  209. sysctlbyname("hw.activecpu", &activecpu, &s, NULL, 0);
  210. #endif
  211. size_t n = activecpu / 2 > 1 ? activecpu / 2 : 1, w = activecpu * 2;
  212. min_acceptable_concurrency = (uint32_t)n;
  213. dispatch_queue_t tq, ttq;
  214. long qw, tqw, ttqw;
  215. const char *ql, *tql, *ttql;
  216. size_t qi, tqi, ttqi;
  217. long qws[] = {
  218. 0, LONG_MAX, (long)w, 1, // 0 <=> global queue
  219. };
  220. g = dispatch_group_create();
  221. gw = dispatch_group_create();
  222. for (ttqi = 0; ttqi < sizeof(qws)/sizeof(*qws); ttqi++) {
  223. ttq = create_queue(qws[ttqi], NULL, &ttqw, &ttql);
  224. for (tqi = 0; tqi < sizeof(qws)/sizeof(*qws); tqi++) {
  225. if (!qws[tqi] && qws[ttqi]) continue;
  226. tq = create_queue(qws[tqi], ttq, &tqw, &tql);
  227. for (qi = 0; qi < sizeof(qws)/sizeof(*qws); qi++) {
  228. if (!qws[qi] && qws[tqi]) continue;
  229. q = create_queue(qws[qi], tq, &qw, &ql);
  230. for (use_group_async = 0; use_group_async < 2; use_group_async++) {
  231. fprintf(stdout, "Testing dispatch%s_async on "
  232. "queue hierarchy: %s -> %s -> %s\n",
  233. use_group_async ? "_group" : "", ql, tql, ttql);
  234. fflush(stdout);
  235. test_concur_async(n, (size_t)MIN(qw, MIN(tqw, ttqw)));
  236. }
  237. fprintf(stdout, "Testing dispatch_sync on "
  238. "queue hierarchy: %s -> %s -> %s\n", ql, tql, ttql);
  239. fflush(stdout);
  240. test_concur_sync(w, (size_t)MIN(qw, MIN(tqw, ttqw)));
  241. fprintf(stdout, "Testing dispatch_apply on "
  242. "queue hierarchy: %s -> %s -> %s\n", ql, tql, ttql);
  243. fflush(stdout);
  244. test_concur_apply(activecpu, (size_t)MIN(qw, MIN(tqw, ttqw)));
  245. dispatch_release(q);
  246. }
  247. dispatch_release(tq);
  248. }
  249. dispatch_release(ttq);
  250. }
  251. dispatch_release(g);
  252. dispatch_release(gw);
  253. test_stop();
  254. return 0;
  255. }