dispatch_io.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. /*
  2. * Copyright (c) 2009-2011 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <sys/stat.h>
  21. #include <sys/types.h>
  22. #include <assert.h>
  23. #include <fcntl.h>
  24. #include <stdatomic.h>
  25. #include <stdio.h>
  26. #include <stdlib.h>
  27. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  28. #include <fts.h>
  29. #include <sys/param.h>
  30. #include <unistd.h>
  31. #endif
  32. #include <errno.h>
  33. #ifdef __APPLE__
  34. #include <mach/mach.h>
  35. #include <mach/mach_time.h>
  36. #include <libkern/OSAtomic.h>
  37. #include <TargetConditionals.h>
  38. #endif
  39. #ifdef __linux__
  40. #include <sys/resource.h>
  41. #endif
  42. #include <dispatch/dispatch.h>
  43. #include <bsdtests.h>
  44. #include "dispatch_test.h"
  45. #ifndef DISPATCHTEST_IO
  46. #if DISPATCH_API_VERSION >= 20100226 && DISPATCH_API_VERSION != 20101110
  47. #define DISPATCHTEST_IO 1
  48. #if DISPATCH_API_VERSION >= 20100723
  49. #define DISPATCHTEST_IO_PATH 1 // rdar://problem/7738093
  50. #endif
  51. #endif
  52. #endif
  /*
   * Last step of the run: records a "test_fin run" check and then calls
   * test_stop().
   *
   * cxt: opaque context pointer; it is only compared against itself.
   */
  static void
  test_fin(void *cxt)
  {
      void *const seen = cxt;

      // Comparing the pointer with itself just records that we got here.
      test_ptr("test_fin run", seen, seen);
      test_stop();
  }
  59. #if DISPATCHTEST_IO
  60. #if TARGET_OS_EMBEDDED
  61. #define LARGE_FILE "/System/Library/Fonts/Cache/STHeiti-Light.ttc" // 29MB file
  62. #define maxopenfiles 768
  63. #else
  64. #define LARGE_FILE "/System/Library/Speech/Voices/Alex.SpeechVoice/Contents/Resources/PCMWave" // 417MB file
  65. #define maxopenfiles 4096
  66. #endif
  67. static void
  68. test_io_close(int with_timer, bool from_path)
  69. {
  70. #define chunks 4
  71. #define READSIZE (512*1024)
  72. unsigned int i;
  73. const char *path = LARGE_FILE;
  74. dispatch_fd_t fd = dispatch_test_fd_open(path, O_RDONLY);
  75. if (fd == -1) {
  76. if (errno == ENOENT) {
  77. test_skip("Large file not found");
  78. return;
  79. }
  80. test_errno("open", errno, 0);
  81. test_stop();
  82. }
  83. #ifdef F_GLOBAL_NOCACHE
  84. if (fcntl(fd, F_GLOBAL_NOCACHE, 1) == -1) {
  85. test_errno("fcntl F_GLOBAL_NOCACHE", errno, 0);
  86. test_stop();
  87. }
  88. #endif
  89. const size_t size = (size_t)dispatch_test_fd_lseek(fd, 0, SEEK_END) / chunks;
  90. dispatch_test_fd_lseek(fd, 0, SEEK_SET);
  91. const int expected_error = with_timer? ECANCELED : 0;
  92. dispatch_source_t t = NULL;
  93. dispatch_group_t g = dispatch_group_create();
  94. dispatch_group_enter(g);
  95. void (^cleanup_handler)(int error) = ^(int error) {
  96. test_errno("create error", error, 0);
  97. dispatch_group_leave(g);
  98. dispatch_test_fd_close(fd);
  99. };
  100. dispatch_io_t io;
  101. if (!from_path) {
  102. io = dispatch_io_create(DISPATCH_IO_RANDOM, fd,
  103. dispatch_get_global_queue(0, 0), cleanup_handler);
  104. } else {
  105. #if DISPATCHTEST_IO_PATH
  106. io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM, path, O_RDONLY, 0,
  107. dispatch_get_global_queue(0, 0), cleanup_handler);
  108. #endif
  109. }
  110. dispatch_io_set_high_water(io, READSIZE);
  111. if (with_timer == 1) {
  112. dispatch_io_set_low_water(io, READSIZE);
  113. dispatch_io_set_interval(io, 2 * NSEC_PER_SEC,
  114. DISPATCH_IO_STRICT_INTERVAL);
  115. } else if (with_timer == 2) {
  116. t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
  117. dispatch_get_global_queue(0,0));
  118. dispatch_retain(io);
  119. dispatch_source_set_event_handler(t, ^{
  120. dispatch_io_close(io, DISPATCH_IO_STOP);
  121. dispatch_source_cancel(t);
  122. });
  123. dispatch_source_set_cancel_handler(t, ^{
  124. dispatch_release(io);
  125. });
  126. dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW,
  127. 2 * NSEC_PER_SEC), DISPATCH_TIME_FOREVER, 0);
  128. dispatch_resume(t);
  129. }
  130. size_t chunk_sizes[chunks] = {}, *chunk_size = chunk_sizes, total = 0;
  131. dispatch_data_t data_chunks[chunks], *data = data_chunks;
  132. for (i = 0; i < chunks; i++) {
  133. data[i] = dispatch_data_empty;
  134. dispatch_group_enter(g);
  135. dispatch_io_read(io, (off_t)(i * size), size, dispatch_get_global_queue(0,0),
  136. ^(bool done, dispatch_data_t d, int error) {
  137. if (d) {
  138. chunk_size[i] += dispatch_data_get_size(d);
  139. dispatch_data_t concat = dispatch_data_create_concat(data[i], d);
  140. dispatch_release(data[i]);
  141. data[i] = concat;
  142. if ((dispatch_data_get_size(d) < READSIZE && !error && !done)) {
  143. // The timer must have fired
  144. dispatch_io_close(io, DISPATCH_IO_STOP);
  145. return;
  146. }
  147. }
  148. if (done) {
  149. test_errno("read error", error,
  150. error == expected_error ? expected_error : 0);
  151. dispatch_group_leave(g);
  152. dispatch_release(data[i]);
  153. }
  154. });
  155. }
  156. dispatch_io_close(io, 0);
  157. dispatch_io_close(io, 0);
  158. dispatch_io_read(io, 0, 1, dispatch_get_global_queue(0,0),
  159. ^(bool done, dispatch_data_t d, int error) {
  160. test_long("closed done", done, true);
  161. test_errno("closed error", error, ECANCELED);
  162. test_ptr_null("closed data", d);
  163. });
  164. dispatch_release(io);
  165. test_group_wait(g);
  166. dispatch_release(g);
  167. if (t) {
  168. dispatch_source_cancel(t);
  169. dispatch_release(t);
  170. }
  171. for (i = 0; i < chunks; i++) {
  172. if (with_timer) {
  173. test_sizet_less_than("chunk size", chunk_size[i], size + 1);
  174. } else {
  175. test_sizet("chunk size", chunk_size[i], size);
  176. }
  177. total += chunk_size[i];
  178. }
  179. if (with_timer) {
  180. test_sizet_less_than("total size", total, chunks * size + 1);
  181. } else {
  182. test_sizet("total size", total, chunks * size);
  183. }
  184. }
  185. static void
  186. test_io_stop(void) // rdar://problem/8250057
  187. {
  188. dispatch_fd_t fds[2], *fd = fds;
  189. #if defined(_WIN32)
  190. if (!CreatePipe((PHANDLE)&fds[0], (PHANDLE)&fds[1], NULL, 0)) {
  191. test_long("CreatePipe", GetLastError(), ERROR_SUCCESS);
  192. test_stop();
  193. }
  194. #else
  195. if(pipe(fd) == -1) {
  196. test_errno("pipe", errno, 0);
  197. test_stop();
  198. }
  199. #endif
  200. dispatch_group_t g = dispatch_group_create();
  201. dispatch_group_enter(g);
  202. dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, *fd,
  203. dispatch_get_global_queue(0, 0), ^(int error) {
  204. test_errno("create error", error, 0);
  205. dispatch_test_fd_close(*fd);
  206. dispatch_test_fd_close(*(fd+1));
  207. dispatch_group_leave(g);
  208. });
  209. dispatch_group_enter(g);
  210. dispatch_io_read(io, 0, 1, dispatch_get_global_queue(0, 0),
  211. ^(bool done, dispatch_data_t d __attribute__((unused)), int error) {
  212. if (done) {
  213. test_errno("read error", error, ECANCELED);
  214. dispatch_group_leave(g);
  215. }
  216. });
  217. dispatch_source_t t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0,
  218. 0, dispatch_get_global_queue(0,0));
  219. dispatch_retain(io);
  220. dispatch_source_set_event_handler(t, ^{
  221. dispatch_io_close(io, DISPATCH_IO_STOP);
  222. dispatch_release(io);
  223. });
  224. dispatch_source_set_timer(t, dispatch_time(DISPATCH_TIME_NOW,
  225. 2 * NSEC_PER_SEC), DISPATCH_TIME_FOREVER, 0);
  226. dispatch_resume(t);
  227. dispatch_release(io);
  228. test_group_wait(g);
  229. dispatch_release(g);
  230. dispatch_source_cancel(t);
  231. dispatch_release(t);
  232. }
  233. static void
  234. test_io_read_write(void)
  235. {
  236. char *path_in = dispatch_test_get_large_file();
  237. #if defined(_WIN32)
  238. char *temp_dir = getenv("TMP");
  239. if (!temp_dir) {
  240. temp_dir = getenv("TEMP");
  241. }
  242. if (!temp_dir) {
  243. test_ptr_notnull("temporary directory", temp_dir);
  244. test_stop();
  245. }
  246. char *path_out = NULL;
  247. asprintf(&path_out, "%s\\dispatchtest_io.XXXXXX", temp_dir);
  248. #else
  249. char path_out[] = "/tmp/dispatchtest_io.XXXXXX";
  250. #endif
  251. dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
  252. if (in == -1) {
  253. test_errno("open", errno, 0);
  254. test_stop();
  255. }
  256. dispatch_test_release_large_file(path_in);
  257. free(path_in);
  258. const size_t siz_in =
  259. MIN(1024 * 1024, (size_t)dispatch_test_fd_lseek(in, 0, SEEK_END));
  260. dispatch_test_fd_lseek(in, 0, SEEK_SET);
  261. dispatch_fd_t out = mkstemp(path_out);
  262. if (out == -1) {
  263. test_errno("mkstemp", errno, 0);
  264. test_stop();
  265. }
  266. if (unlink(path_out) == -1) {
  267. test_errno("unlink", errno, 0);
  268. test_stop();
  269. }
  270. #if defined(_WIN32)
  271. free(path_out);
  272. #endif
  273. dispatch_queue_t q = dispatch_get_global_queue(0,0);
  274. dispatch_group_t g = dispatch_group_create();
  275. dispatch_group_enter(g);
  276. dispatch_io_t io_in = dispatch_io_create(DISPATCH_IO_STREAM, in,
  277. q, ^(int error) {
  278. test_errno("dispatch_io_create", error, 0);
  279. dispatch_test_fd_close(in);
  280. dispatch_group_leave(g);
  281. });
  282. dispatch_io_set_high_water(io_in, siz_in/4);
  283. dispatch_group_enter(g);
  284. dispatch_io_t io_out = dispatch_io_create(DISPATCH_IO_STREAM, out,
  285. q, ^(int error) {
  286. test_errno("dispatch_io_create", error, 0);
  287. dispatch_group_leave(g);
  288. });
  289. dispatch_io_set_high_water(io_out, siz_in/16);
  290. __block dispatch_data_t data = dispatch_data_empty;
  291. dispatch_group_enter(g);
  292. dispatch_io_read(io_in, 0, siz_in, q,
  293. ^(bool done_in, dispatch_data_t data_in, int err_in) {
  294. test_sizet_less_than("read size", dispatch_data_get_size(data_in),
  295. siz_in);
  296. if (data_in) {
  297. dispatch_group_enter(g);
  298. dispatch_io_write(io_out, 0, data_in, q,
  299. ^(bool done_out, dispatch_data_t data_out, int err_out) {
  300. if (done_out) {
  301. test_errno("dispatch_io_write", err_out, 0);
  302. test_sizet("remaining write size",
  303. data_out ? dispatch_data_get_size(data_out) : 0, 0);
  304. dispatch_group_leave(g);
  305. } else {
  306. test_sizet_less_than("remaining write size",
  307. dispatch_data_get_size(data_out), siz_in);
  308. }
  309. });
  310. dispatch_data_t concat = dispatch_data_create_concat(data, data_in);
  311. dispatch_release(data);
  312. data = concat;
  313. }
  314. if (done_in) {
  315. test_errno("dispatch_io_read", err_in, 0);
  316. dispatch_release(io_out);
  317. dispatch_group_leave(g);
  318. }
  319. });
  320. dispatch_release(io_in);
  321. test_group_wait(g);
  322. dispatch_test_fd_lseek(out, 0, SEEK_SET);
  323. dispatch_group_enter(g);
  324. dispatch_read(out, siz_in, q,
  325. ^(dispatch_data_t cmp, int err_cmp) {
  326. if (err_cmp) {
  327. test_errno("dispatch_read", err_cmp, 0);
  328. test_stop();
  329. }
  330. dispatch_test_fd_close(out);
  331. size_t siz_cmp = dispatch_data_get_size(cmp);
  332. test_sizet("readback size", siz_cmp, siz_in);
  333. const void *data_buf, *cmp_buf;
  334. dispatch_data_t data_map, cmp_map;
  335. data_map = dispatch_data_create_map(data, &data_buf, NULL);
  336. cmp_map = dispatch_data_create_map(cmp, &cmp_buf, NULL);
  337. test_long("readback memcmp",
  338. memcmp(data_buf, cmp_buf, MIN(siz_in, siz_cmp)), 0);
  339. dispatch_release(cmp_map);
  340. dispatch_release(data_map);
  341. dispatch_release(data);
  342. dispatch_group_leave(g);
  343. });
  344. test_group_wait(g);
  345. dispatch_release(g);
  346. }
  /*
   * Read strategies compared by the many-files test. The values double as
   * indices into that test's per-strategy queue and name tables.
   */
  enum {
      // plain read() from a block submitted with dispatch_async()
      DISPATCH_ASYNC_READ_ON_CONCURRENT_QUEUE = 0,
      DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE = 1,
      // dispatch_read() convenience API
      DISPATCH_READ_ON_CONCURRENT_QUEUE = 2,
      // dispatch_io_read() on a channel made from a descriptor or from a path
      DISPATCH_IO_READ_ON_CONCURRENT_QUEUE = 3,
      DISPATCH_IO_READ_FROM_PATH_ON_CONCURRENT_QUEUE = 4,
  };
  354. static void
  355. test_async_read(char *path, size_t size, int option, dispatch_queue_t queue,
  356. void (^process_data)(size_t))
  357. {
  358. dispatch_fd_t fd = dispatch_test_fd_open(path, O_RDONLY);
  359. if (fd == -1) {
  360. // Don't stop for access permission issues
  361. if (errno == EACCES) {
  362. process_data(size);
  363. return;
  364. }
  365. test_errno("Failed to open file", errno, 0);
  366. test_stop();
  367. }
  368. #ifdef F_GLOBAL_NOCACHE
  369. // disable caching also for extra fd opened by dispatch_io_create_with_path
  370. if (fcntl(fd, F_GLOBAL_NOCACHE, 1) == -1) {
  371. test_errno("fcntl F_GLOBAL_NOCACHE", errno, 0);
  372. test_stop();
  373. }
  374. #endif
  375. switch (option) {
  376. case DISPATCH_ASYNC_READ_ON_CONCURRENT_QUEUE:
  377. case DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE:
  378. dispatch_async(queue, ^{
  379. char* buffer = NULL;
  380. #if defined(_WIN32)
  381. SYSTEM_INFO si;
  382. GetSystemInfo(&si);
  383. buffer = _aligned_malloc(size, si.dwPageSize);
  384. #else
  385. size_t pagesize = (size_t)sysconf(_SC_PAGESIZE);
  386. posix_memalign((void **)&buffer, pagesize, size);
  387. #endif
  388. ssize_t r = dispatch_test_fd_read(fd, buffer, size);
  389. if (r == -1) {
  390. test_errno("async read error", errno, 0);
  391. test_stop();
  392. }
  393. #if defined(_WIN32)
  394. _aligned_free(buffer);
  395. #else
  396. free(buffer);
  397. #endif
  398. dispatch_test_fd_close(fd);
  399. process_data((size_t)r);
  400. });
  401. break;
  402. case DISPATCH_READ_ON_CONCURRENT_QUEUE:
  403. dispatch_read(fd, size, queue, ^(dispatch_data_t d, int error) {
  404. if (error) {
  405. test_errno("dispatch_read error", error, 0);
  406. test_stop();
  407. }
  408. dispatch_test_fd_close(fd);
  409. process_data(dispatch_data_get_size(d));
  410. });
  411. break;
  412. case DISPATCH_IO_READ_ON_CONCURRENT_QUEUE:
  413. case DISPATCH_IO_READ_FROM_PATH_ON_CONCURRENT_QUEUE: {
  414. __block dispatch_data_t d = dispatch_data_empty;
  415. void (^cleanup_handler)(int error) = ^(int error) {
  416. if (error) {
  417. test_errno("dispatch_io_create error", error, 0);
  418. test_stop();
  419. }
  420. dispatch_test_fd_close(fd);
  421. process_data(dispatch_data_get_size(d));
  422. dispatch_release(d);
  423. };
  424. dispatch_io_t io = NULL;
  425. if (option == DISPATCH_IO_READ_FROM_PATH_ON_CONCURRENT_QUEUE) {
  426. #if DISPATCHTEST_IO_PATH
  427. io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM, path,
  428. O_RDONLY, 0, queue, cleanup_handler);
  429. #endif
  430. } else {
  431. io = dispatch_io_create(DISPATCH_IO_RANDOM, fd, queue,
  432. cleanup_handler);
  433. }
  434. if (!io) {
  435. test_ptr_notnull("dispatch_io_create", io);
  436. test_stop();
  437. }
  438. // Timeout after 20 secs
  439. dispatch_io_set_interval(io, 20 * NSEC_PER_SEC,
  440. DISPATCH_IO_STRICT_INTERVAL);
  441. dispatch_io_read(io, 0, size, queue,
  442. ^(bool done, dispatch_data_t data, int error) {
  443. if (!done && !error && !dispatch_data_get_size(data)) {
  444. // Timer fired, and no progress from last delivery
  445. dispatch_io_close(io, DISPATCH_IO_STOP);
  446. }
  447. if (data) {
  448. dispatch_data_t c = dispatch_data_create_concat(d, data);
  449. dispatch_release(d);
  450. d = c;
  451. }
  452. if (error) {
  453. test_errno("dispatch_io_read error", error, 0);
  454. if (error != ECANCELED) {
  455. test_stop();
  456. }
  457. }
  458. });
  459. dispatch_release(io);
  460. break;
  461. }
  462. }
  463. }
  464. static void
  465. test_enumerate_dir_trees(char **paths,
  466. void (^process_file)(char *path, size_t size))
  467. {
  468. #if defined(_WIN32)
  469. for (size_t i = 0; paths[i]; i++) {
  470. char *search_path = NULL;
  471. asprintf(&search_path, "%s\\*", paths[i]);
  472. WIN32_FIND_DATAA node;
  473. HANDLE find = FindFirstFileA(search_path, &node);
  474. free(search_path);
  475. if (find == INVALID_HANDLE_VALUE) {
  476. if (GetLastError() == ERROR_ACCESS_DENIED) {
  477. return;
  478. }
  479. test_ptr_not("FindFirstFile", find, INVALID_HANDLE_VALUE);
  480. test_stop();
  481. }
  482. do {
  483. if (strcmp(node.cFileName, ".") == 0 ||
  484. strcmp(node.cFileName, "..") == 0) {
  485. continue;
  486. }
  487. char *node_path = NULL;
  488. asprintf(&node_path, "%s\\%s", paths[i], node.cFileName);
  489. if (node.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
  490. char *subdir_paths[] = {node_path, NULL};
  491. test_enumerate_dir_trees(subdir_paths, process_file);
  492. } else {
  493. size_t size = (size_t)(((uint64_t)node.nFileSizeHigh << 32) |
  494. node.nFileSizeLow);
  495. process_file(node_path, size);
  496. }
  497. free(node_path);
  498. } while (FindNextFileA(find, &node));
  499. FindClose(find);
  500. }
  501. #else
  502. FTS *tree = fts_open(paths, FTS_PHYSICAL|FTS_XDEV, NULL);
  503. if (!tree) {
  504. test_ptr_notnull("fts_open failed", tree);
  505. test_stop();
  506. }
  507. FTSENT *node;
  508. while ((node = fts_read(tree)) &&
  509. !(node->fts_info == FTS_ERR || node->fts_info == FTS_NS)) {
  510. if (node->fts_level > 0 && node->fts_name[0] == '.') {
  511. fts_set(tree, node, FTS_SKIP);
  512. } else if (node->fts_info == FTS_F) {
  513. size_t size = (size_t)node->fts_statp->st_size;
  514. process_file(node->fts_path, size);
  515. }
  516. }
  517. if ((!node && errno) || (node && (node->fts_info == FTS_ERR ||
  518. node->fts_info == FTS_NS))) {
  519. test_errno("fts_read failed", !node ? errno : node->fts_errno, 0);
  520. test_stop();
  521. }
  522. if (fts_close(tree)) {
  523. test_errno("fts_close failed", errno, 0);
  524. test_stop();
  525. }
  526. #endif
  527. }
  528. static int
  529. test_read_dirs(char **paths, dispatch_queue_t queue, dispatch_group_t g,
  530. dispatch_semaphore_t s, _Atomic size_t *bytes, int option)
  531. {
  532. __block int files_opened = 0;
  533. __block size_t total_size = 0;
  534. test_enumerate_dir_trees(paths, ^(char *path, size_t size){
  535. dispatch_group_enter(g);
  536. dispatch_semaphore_wait(s, DISPATCH_TIME_FOREVER);
  537. total_size += size;
  538. files_opened++;
  539. test_async_read(path, size, option, queue, ^(size_t len){
  540. atomic_fetch_add_explicit(bytes, len, memory_order_relaxed);
  541. dispatch_semaphore_signal(s);
  542. dispatch_group_leave(g);
  543. });
  544. });
  545. test_group_wait(g);
  546. test_sizet("total size", *bytes, total_size);
  547. return files_opened;
  548. }
  /*
   * Tuning hook of the dispatch library, declared here by hand.
   * It is a DLL import on Windows and a weak import elsewhere.
   */
  #if defined(_WIN32)
  extern __declspec(dllimport)
  #else
  extern __attribute__((weak_import))
  #endif
  void _dispatch_iocntl(uint32_t param, uint64_t value);

  /* Values for the `param` argument of _dispatch_iocntl(). */
  enum {
      DISPATCH_IOCNTL_CHUNK_PAGES = 1,
      DISPATCH_IOCNTL_LOW_WATER_CHUNKS = 2,
      DISPATCH_IOCNTL_INITIAL_DELIVERY = 3,
  };
  561. static void
  562. test_read_many_files(void)
  563. {
  564. #if defined(_WIN32)
  565. char *paths[] = {NULL, NULL};
  566. char *system_root = getenv("SystemRoot");
  567. if (!system_root) {
  568. test_ptr_notnull("SystemRoot", system_root);
  569. test_stop();
  570. }
  571. asprintf(&paths[0], "%s\\System32", system_root);
  572. #else
  573. char *paths[] = {"/usr/lib", NULL};
  574. #endif
  575. dispatch_group_t g = dispatch_group_create();
  576. dispatch_semaphore_t s = dispatch_semaphore_create(maxopenfiles);
  577. uint64_t start;
  578. _Atomic size_t bytes;
  579. int files_read, i;
  580. const dispatch_queue_t queues[] = {
  581. [DISPATCH_ASYNC_READ_ON_CONCURRENT_QUEUE] =
  582. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
  583. [DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE] =
  584. dispatch_queue_create("read", NULL),
  585. [DISPATCH_READ_ON_CONCURRENT_QUEUE] =
  586. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
  587. [DISPATCH_IO_READ_ON_CONCURRENT_QUEUE] =
  588. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
  589. #if DISPATCHTEST_IO_PATH
  590. [DISPATCH_IO_READ_FROM_PATH_ON_CONCURRENT_QUEUE] =
  591. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
  592. #endif
  593. };
  594. dispatch_set_target_queue(queues[DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE],
  595. dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0));
  596. static const char *names[] = {
  597. [DISPATCH_ASYNC_READ_ON_CONCURRENT_QUEUE] =
  598. "dispatch_async(^{read();}) on concurrent queue",
  599. [DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE] =
  600. "dispatch_async(^{read();}) on serial queue",
  601. [DISPATCH_READ_ON_CONCURRENT_QUEUE] =
  602. "dispatch_read() on concurrent queue",
  603. [DISPATCH_IO_READ_ON_CONCURRENT_QUEUE] =
  604. "dispatch_io_read() on concurrent queue",
  605. [DISPATCH_IO_READ_FROM_PATH_ON_CONCURRENT_QUEUE] =
  606. "dispatch_io_read() from path on concurrent queue",
  607. };
  608. if (&_dispatch_iocntl) {
  609. const size_t chunk_pages = 3072;
  610. _dispatch_iocntl(DISPATCH_IOCNTL_CHUNK_PAGES, (uint64_t)chunk_pages);
  611. }
  612. #if !defined(_WIN32)
  613. struct rlimit l;
  614. if (!getrlimit(RLIMIT_NOFILE, &l) && l.rlim_cur < 2 * maxopenfiles + 256) {
  615. l.rlim_cur = 2 * maxopenfiles + 256;
  616. setrlimit(RLIMIT_NOFILE, &l);
  617. }
  618. #endif
  619. for (i = 0; i < (int)(sizeof(queues)/sizeof(dispatch_queue_t)); ++i) {
  620. fprintf(stdout, "%s:\n", names[i]);
  621. bytes = 0;
  622. start = mach_absolute_time();
  623. files_read = test_read_dirs(paths, queues[i], g, s, &bytes, i);
  624. double elapsed = (double)(mach_absolute_time() - start) / NSEC_PER_SEC;
  625. double throughput = ((double)bytes / elapsed)/(1024 * 1024);
  626. fprintf(stdout, "Total Files read: %u, Total MBytes %g, "
  627. "Total time: %g s, Throughput: %g MB/s\n", files_read,
  628. (double)bytes / (1024 * 1024), elapsed, throughput);
  629. }
  630. dispatch_release(queues[DISPATCH_ASYNC_READ_ON_SERIAL_QUEUE]);
  631. dispatch_release(s);
  632. dispatch_release(g);
  633. #if defined(_WIN32)
  634. free(paths[0]);
  635. #endif
  636. }
  /*
   * Test dispatch_io_create_with_io() (only built for
   * DISPATCH_API_VERSION >= 20101012):
   *
   *  1. Outside Windows: create a temporary directory, make it immutable (or
   *     strip its permissions), and check that a write through a path-based
   *     channel for a file inside it fails with EPERM (or EACCES) and leaves
   *     all 256 bytes unwritten. The directory is then made writable again.
   *  2. Write `siz_in` bytes read from /dev/urandom (the large test file on
   *     Windows) through a path-based channel.
   *  3. Derive a stream channel from that channel and check that `siz_in`
   *     bytes can be read back through it.
   *
   * NOTE(review): the error paths assume test_stop() does not return.
   */
  static void
  test_io_from_io(void) // rdar://problem/8388909
  {
  #if DISPATCH_API_VERSION >= 20101012
      const size_t siz_in = 10240;
      dispatch_queue_t q = dispatch_get_global_queue(0, 0);
      dispatch_group_t g = dispatch_group_create();
      dispatch_io_t io = NULL;
      // Windows does not easily support immutable directories
  #if !defined(_WIN32)
      char path[] = "/tmp/dispatchtest_io.XXXXXX/file.name";
      // Cut the string at the last '/' so that it names just the directory.
      char *tmp = strrchr(path, '/');
      *tmp = '\0';
      if (!mkdtemp(path)) {
          // `path` is an array and never NULL, so report errno instead.
          test_errno("mkdtemp failed", errno, 0);
          test_stop();
      }
  #ifdef UF_IMMUTABLE
      // Make the directory immutable
      if (chflags(path, UF_IMMUTABLE) == -1) {
          test_errno("chflags", errno, 0);
          test_stop();
      }
  #else
      // Make the directory non-read/writeable
      if (chmod(path, 0) == -1) {
          test_errno("chmod", errno, 0);
          test_stop();
      }
  #endif
      // Restore the full file path inside the directory.
      *tmp = '/';
      io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM, path,
              O_CREAT|O_RDWR, 0600, q, ^(int error) {
          if (error) {
              test_errno("channel cleanup called with error", error, 0);
              test_stop();
          }
          test_errno("channel cleanup called", error, 0);
      });
      // Zeroed payload; the data object frees it via DISPATCH_DATA_DESTRUCTOR_FREE.
      char *foo = calloc(1, 256);
      if (!foo) {
          test_ptr_notnull("calloc", foo);
          test_stop();
      }
      dispatch_data_t tdata;
      tdata = dispatch_data_create(foo, 256, NULL, DISPATCH_DATA_DESTRUCTOR_FREE);
      dispatch_group_enter(g);
      dispatch_io_write(io, 0, tdata, q, ^(bool done, dispatch_data_t data_out,
              int err_out) {
  #ifdef UF_IMMUTABLE
          test_errno("error from write to immutable directory", err_out, EPERM);
  #else
          test_errno("error from write to write protected directory", err_out, EACCES);
  #endif
          test_sizet("unwritten data",
                  data_out ? dispatch_data_get_size(data_out) : 0, 256);
          // The write is expected to fail; finishing without error is fatal.
          if (!err_out && done) {
              test_stop();
          }
          if (done) {
              dispatch_group_leave(g);
          }
      });
      dispatch_release(tdata);
      dispatch_release(io);
      test_group_wait(g);
      *tmp = '\0';
  #ifdef UF_IMMUTABLE
      // Change the directory to mutable
      if (chflags(path, 0) == -1) {
          test_errno("chflags", errno, 0);
          test_stop();
      }
  #else
      // Change the directory to user read/write/execute
      if (chmod(path, S_IRUSR | S_IWUSR | S_IXUSR) == -1) {
          test_errno("chmod", errno, 0);
          test_stop();
      }
  #endif
      *tmp = '/';
  #endif // !defined(_WIN32)
  #if defined(_WIN32)
      char *path = dispatch_test_get_large_file();
      char *path_in = dispatch_test_get_large_file();
      dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
      if (in == -1) {
          test_errno("open", errno, 0);
          test_stop();
      }
      dispatch_test_release_large_file(path_in);
      free(path_in);
  #else
      const char *path_in = "/dev/urandom";
      dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
      if (in == -1) {
          test_errno("open", errno, 0);
          test_stop();
      }
  #endif
      // Left again by the write handler once the write is done.
      dispatch_group_enter(g);
      io = dispatch_io_create_with_path(DISPATCH_IO_RANDOM, path,
              O_CREAT|O_RDWR, 0600, q, ^(int error) {
          if (error) {
              test_errno("channel cleanup called with error", error, 0);
              test_stop();
          }
          test_errno("channel cleanup called", error, 0);
      });
      dispatch_read(in, siz_in, q, ^(dispatch_data_t data_in, int err_in ) {
          if (err_in) {
              test_errno("dispatch_read", err_in, 0);
              test_stop();
          }
          // The source descriptor is not needed after this read.
          dispatch_test_fd_close(in);
          dispatch_io_write(io, 0, data_in, q,
                  ^(bool done, dispatch_data_t data_out, int err_out) {
              if (done) {
                  test_errno("dispatch_io_write", err_out, 0);
                  test_sizet("remaining write size",
                          data_out ? dispatch_data_get_size(data_out) : 0, 0);
                  dispatch_group_leave(g);
              } else {
                  test_sizet_less_than("remaining write size",
                          data_out ? dispatch_data_get_size(data_out) : 0,
                          siz_in);
              }
          });
      });
      test_group_wait(g);
      // Derive a stream channel from the path-based channel and read through it.
      dispatch_io_t io2 = dispatch_io_create_with_io(DISPATCH_IO_STREAM, io, q,
              ^(int error) {
          if (error) {
              test_errno("dispatch_io_create_with_io", error, 0);
              test_stop();
          }
      });
      dispatch_release(io);
      dispatch_group_enter(g);
      __block dispatch_data_t data_out = dispatch_data_empty;
      dispatch_io_read(io2, 0, siz_in, q,
              ^(bool done, dispatch_data_t d, int error) {
          if (d) {
              dispatch_data_t concat = dispatch_data_create_concat(data_out, d);
              dispatch_release(data_out);
              data_out = concat;
          }
          if (done) {
              test_errno("read error from channel created_with_io", error, 0);
              dispatch_group_leave(g);
          }
      });
      dispatch_release(io2);
      test_group_wait(g);
      dispatch_release(g);
      test_sizet("readback size", dispatch_data_get_size(data_out), siz_in);
      dispatch_release(data_out);
  #if defined(_WIN32)
      dispatch_test_release_large_file(path);
      free(path);
  #endif
  #endif
  }
  793. #endif // DISPATCHTEST_IO
  794. int
  795. main(void)
  796. {
  797. dispatch_test_start("Dispatch IO");
  798. dispatch_async(dispatch_get_main_queue(), ^{
  799. #if DISPATCHTEST_IO
  800. int i; bool from_path = false;
  801. do {
  802. for (i = 0; i < 3; i++) {
  803. test_io_close(i, from_path);
  804. }
  805. #if DISPATCHTEST_IO_PATH
  806. from_path = !from_path;
  807. #endif
  808. } while (from_path);
  809. test_io_stop();
  810. test_io_from_io();
  811. test_io_read_write();
  812. test_read_many_files();
  813. #endif
  814. test_fin(NULL);
  815. });
  816. dispatch_main();
  817. }