dispatch_read2.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. /*
  2. * Copyright (c) 2010-2011 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <sys/stat.h>
  21. #include <sys/types.h>
  22. #include <assert.h>
  23. #include <fcntl.h>
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  27. #include <fts.h>
  28. #include <sys/param.h>
  29. #include <unistd.h>
  30. #endif
  31. #include <errno.h>
  32. #ifdef __APPLE__
  33. #include <mach/mach.h>
  34. #include <mach/mach_time.h>
  35. #include <libkern/OSAtomic.h>
  36. #include <TargetConditionals.h>
  37. #endif
  38. #include <Block.h>
  39. #include <dispatch/dispatch.h>
  40. #include <bsdtests.h>
  41. #include "dispatch_test.h"
  42. #ifndef DISPATCHTEST_IO
  43. #if DISPATCH_API_VERSION >= 20100226 && DISPATCH_API_VERSION != 20101110
  44. #define DISPATCHTEST_IO 1
  45. #endif
  46. #endif
  /*
   * Last step of the test run: records the "test_fin run" pointer check for
   * the given context (compared against itself) and then calls test_stop().
   *
   * cxt - opaque context pointer; main() passes NULL.
   */
  static void
  test_fin(void *cxt)
  {
      void *const seen = cxt;

      test_ptr("test_fin run", seen, cxt);
      test_stop();
  }
  53. #if DISPATCHTEST_IO
  54. /*
  55. Basic way of implementing dispatch_io's dispatch_read without
  56. using dispatch channel api's
  57. */
  58. static void
  59. dispatch_read2(dispatch_fd_t fd,
  60. size_t length,
  61. dispatch_queue_t queue,
  62. void (^handler)(dispatch_data_t d, int error))
  63. {
  64. #if !defined(_WIN32)
  65. if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
  66. test_errno("fcntl O_NONBLOCK", errno, 0);
  67. test_stop();
  68. }
  69. #endif
  70. dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
  71. (uintptr_t)fd, 0, queue);
  72. test_ptr_notnull("reader", reader);
  73. __block size_t bytes_read = 0;
  74. __block dispatch_data_t data = dispatch_data_empty;
  75. __block int err = 0;
  76. dispatch_source_set_event_handler(reader, ^{
  77. const ssize_t bufsiz = 1024*512; // 512KB buffer
  78. char *buffer = NULL;
  79. #if defined(_WIN32)
  80. SYSTEM_INFO si;
  81. GetSystemInfo(&si);
  82. size_t pagesize = (size_t)si.dwPageSize;
  83. buffer = _aligned_malloc(bufsiz, pagesize);
  84. #else
  85. size_t pagesize = (size_t)sysconf(_SC_PAGESIZE);
  86. posix_memalign((void **)&buffer, pagesize, bufsiz);
  87. #endif
  88. ssize_t actual = dispatch_test_fd_read(fd, buffer, bufsiz);
  89. if (actual == -1) {
  90. err = errno;
  91. }
  92. if (actual > 0) {
  93. bytes_read += (size_t)actual;
  94. #if defined(_WIN32)
  95. dispatch_data_t tmp_data = dispatch_data_create(buffer, (size_t)actual,
  96. NULL, ^{ _aligned_free(buffer); });
  97. #else
  98. dispatch_data_t tmp_data = dispatch_data_create(buffer, (size_t)actual,
  99. NULL, DISPATCH_DATA_DESTRUCTOR_FREE);
  100. #endif
  101. dispatch_data_t concat = dispatch_data_create_concat(data,tmp_data);
  102. dispatch_release(tmp_data);
  103. dispatch_release(data);
  104. data = concat;
  105. }
  106. // If we reached EOF or we read as much we were asked to.
  107. if (actual < bufsiz || bytes_read >= length) {
  108. char foo[2];
  109. actual = dispatch_test_fd_read(fd, foo, 2);
  110. bytes_read += (size_t)actual;
  111. // confirm EOF condition
  112. test_long("EOF", actual, 0);
  113. dispatch_source_cancel(reader);
  114. }
  115. });
  116. dispatch_source_set_cancel_handler(reader, ^{
  117. dispatch_data_t d = dispatch_data_create_subrange(data, 0, length);
  118. dispatch_release(data);
  119. handler(d, err);
  120. dispatch_release(d);
  121. dispatch_release(reader);
  122. });
  123. dispatch_resume(reader);
  124. }
  125. static void
  126. test_read(void)
  127. {
  128. char *path = dispatch_test_get_large_file();
  129. dispatch_fd_t fd = dispatch_test_fd_open(path, O_RDONLY);
  130. if (fd == -1) {
  131. test_errno("open", errno, 0);
  132. test_stop();
  133. }
  134. dispatch_test_release_large_file(path);
  135. free(path);
  136. #ifdef F_NOCACHE
  137. if (fcntl(fd, F_NOCACHE, 1)) {
  138. test_errno("fcntl F_NOCACHE", errno, 0);
  139. test_stop();
  140. }
  141. #else
  142. // investigate what the impact of lack of file cache disabling has
  143. // for this test
  144. #endif
  145. size_t size = (size_t)dispatch_test_fd_lseek(fd, 0, SEEK_END);
  146. dispatch_test_fd_lseek(fd, 0, SEEK_SET);
  147. dispatch_group_t g = dispatch_group_create();
  148. void (^b)(dispatch_data_t, int) = ^(dispatch_data_t d, int error) {
  149. test_errno("read error", error, 0);
  150. test_sizet("dispatch data size", d ? dispatch_data_get_size(d) : 0, size);
  151. if (d) {
  152. const void *contig_buf;
  153. size_t contig_size;
  154. dispatch_data_t tmp = dispatch_data_create_map(d, &contig_buf,
  155. &contig_size);
  156. test_sizet("dispatch data contig size", contig_size, size);
  157. if (contig_size) {
  158. // Validate the copied buffer is similar to what we expect
  159. char *buf = (char*)malloc(size);
  160. dispatch_test_fd_pread(fd, buf, size, 0);
  161. test_long("dispatch data contents", memcmp(buf, contig_buf,
  162. size), 0);
  163. free(buf);
  164. }
  165. dispatch_release(tmp);
  166. }
  167. dispatch_group_leave(g);
  168. };
  169. dispatch_group_enter(g);
  170. dispatch_read(fd, SIZE_MAX, dispatch_get_global_queue(0, 0), b); // rdar://problem/7795794
  171. test_group_wait(g);
  172. dispatch_test_fd_lseek(fd, 0, SEEK_SET);
  173. if (dispatch_test_check_evfilt_read_for_fd(fd)) {
  174. dispatch_group_enter(g);
  175. dispatch_read2(fd, size, dispatch_get_global_queue(0,0), b);
  176. test_group_wait(g);
  177. } else {
  178. test_skip("EVFILT_READ kevent not firing for test file");
  179. }
  180. dispatch_release(g);
  181. dispatch_test_fd_close(fd);
  182. }
  183. static void
  184. test_read_write(void)
  185. {
  186. #if defined(_WIN32)
  187. char *path_in = dispatch_test_get_large_file();
  188. char path_out[] = "dispatchtest_io.XXXXXX";
  189. dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
  190. if (in == -1) {
  191. test_errno("open", errno, 0);
  192. test_stop();
  193. }
  194. dispatch_test_release_large_file(path_in);
  195. free(path_in);
  196. size_t siz_in = (size_t)dispatch_test_fd_lseek(in, 0, SEEK_END);
  197. dispatch_test_fd_lseek(in, 0, SEEK_SET);
  198. #else
  199. const char *path_in = "/dev/urandom";
  200. char path_out[] = "/tmp/dispatchtest_io.XXXXXX";
  201. const size_t siz_in = 10240;
  202. dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
  203. if (in == -1) {
  204. test_errno("open", errno, 0);
  205. test_stop();
  206. }
  207. #endif
  208. dispatch_fd_t out = mkstemp(path_out);
  209. if (out == -1) {
  210. test_errno("mkstemp", errno, 0);
  211. test_stop();
  212. }
  213. if (unlink(path_out) == -1) {
  214. test_errno("unlink", errno, 0);
  215. test_stop();
  216. }
  217. dispatch_queue_t q = dispatch_get_global_queue(0,0);
  218. dispatch_group_t g = dispatch_group_create();
  219. dispatch_group_enter(g);
  220. __block dispatch_data_t data;
  221. dispatch_read(in, siz_in, q, ^(dispatch_data_t data_in, int err_in) {
  222. if (err_in) {
  223. test_errno("dispatch_read", err_in, 0);
  224. test_stop();
  225. }
  226. dispatch_test_fd_close(in);
  227. size_t siz_out = dispatch_data_get_size(data_in);
  228. test_sizet("read size", siz_out, siz_in);
  229. dispatch_retain(data_in);
  230. data = data_in;
  231. dispatch_write(out, data, q, ^(dispatch_data_t data_out, int err_out) {
  232. if (err_out || data_out) {
  233. test_errno("dispatch_write", err_out, 0);
  234. test_stop();
  235. }
  236. dispatch_test_fd_lseek(out, 0, SEEK_SET);
  237. dispatch_read(out, siz_out, q,
  238. ^(dispatch_data_t cmp, int err_cmp) {
  239. if (err_cmp) {
  240. test_errno("dispatch_read", err_cmp, 0);
  241. test_stop();
  242. }
  243. dispatch_test_fd_close(out);
  244. size_t siz_cmp = dispatch_data_get_size(cmp);
  245. test_sizet("readback size", siz_cmp, siz_out);
  246. const void *data_buf, *cmp_buf;
  247. dispatch_data_t data_map, cmp_map;
  248. data_map = dispatch_data_create_map(data, &data_buf, NULL);
  249. cmp_map = dispatch_data_create_map(cmp, &cmp_buf, NULL);
  250. test_long("readback memcmp",
  251. memcmp(data_buf, cmp_buf, MIN(siz_out, siz_cmp)), 0);
  252. dispatch_release(cmp_map);
  253. dispatch_release(data_map);
  254. dispatch_release(data);
  255. dispatch_group_leave(g);
  256. });
  257. });
  258. });
  259. test_group_wait(g);
  260. dispatch_release(g);
  261. }
  262. static void
  263. test_read_writes(void) // <rdar://problem/7785143>
  264. {
  265. const size_t chunks_out = 320;
  266. const size_t siz_chunk = 32, siz_in = siz_chunk * chunks_out;
  267. #if defined(_WIN32)
  268. char *path_in = dispatch_test_get_large_file();
  269. char path_out[] = "dispatchtest_io.XXXXXX";
  270. dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
  271. if (in == -1) {
  272. test_errno("open", errno, 0);
  273. test_stop();
  274. }
  275. dispatch_test_release_large_file(path_in);
  276. free(path_in);
  277. #else
  278. const char *path_in = "/dev/urandom";
  279. char path_out[] = "/tmp/dispatchtest_io.XXXXXX";
  280. dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY);
  281. if (in == -1) {
  282. test_errno("open", errno, 0);
  283. test_stop();
  284. }
  285. #endif
  286. dispatch_fd_t out = mkstemp(path_out);
  287. if (out == -1) {
  288. test_errno("mkstemp", errno, 0);
  289. test_stop();
  290. }
  291. if (unlink(path_out) == -1) {
  292. test_errno("unlink", errno, 0);
  293. test_stop();
  294. }
  295. dispatch_queue_t q = dispatch_get_global_queue(0,0);
  296. dispatch_group_t g = dispatch_group_create();
  297. dispatch_group_enter(g);
  298. __block dispatch_data_t data;
  299. __block size_t siz_out;
  300. dispatch_read(in, siz_in, q, ^(dispatch_data_t data_in, int err_in) {
  301. if (err_in) {
  302. test_errno("dispatch_read", err_in, 0);
  303. test_stop();
  304. }
  305. dispatch_test_fd_close(in);
  306. siz_out = dispatch_data_get_size(data_in);
  307. test_sizet("read size", siz_out, siz_in);
  308. dispatch_retain(data_in);
  309. data = data_in;
  310. dispatch_data_t data_chunks[chunks_out];
  311. size_t i;
  312. for (i = 0; i < chunks_out; i++) {
  313. data_chunks[i] = dispatch_data_create_subrange(data_in,
  314. i * siz_chunk, siz_chunk);
  315. }
  316. for (i = 0; i < chunks_out; i++) {
  317. dispatch_data_t d = data_chunks[i];
  318. dispatch_group_enter(g);
  319. dispatch_write(out, d, q, ^(dispatch_data_t data_out,
  320. int err_out) {
  321. if (err_out || data_out) {
  322. test_errno("dispatch_write", err_out, 0);
  323. test_stop();
  324. }
  325. dispatch_group_leave(g);
  326. });
  327. }
  328. for (i = 0; i < chunks_out; i++) {
  329. dispatch_release(data_chunks[i]);
  330. }
  331. dispatch_group_leave(g);
  332. });
  333. test_group_wait(g);
  334. dispatch_group_enter(g);
  335. dispatch_test_fd_lseek(out, 0, SEEK_SET);
  336. dispatch_read(out, siz_in, q,
  337. ^(dispatch_data_t cmp, int err_cmp) {
  338. if (err_cmp) {
  339. test_errno("dispatch_read", err_cmp, 0);
  340. test_stop();
  341. }
  342. dispatch_test_fd_close(out);
  343. size_t siz_cmp = dispatch_data_get_size(cmp);
  344. test_sizet("readback size", siz_cmp, siz_out);
  345. const void *data_buf, *cmp_buf;
  346. dispatch_data_t data_map, cmp_map;
  347. data_map = dispatch_data_create_map(data, &data_buf, NULL);
  348. cmp_map = dispatch_data_create_map(cmp, &cmp_buf, NULL);
  349. test_long("readback memcmp",
  350. memcmp(data_buf, cmp_buf, MIN(siz_out, siz_cmp)), 0);
  351. dispatch_release(cmp_map);
  352. dispatch_release(data_map);
  353. dispatch_release(data);
  354. dispatch_group_leave(g);
  355. });
  356. test_group_wait(g);
  357. dispatch_release(g);
  358. }
  359. #if !defined(_WIN32)
  360. static void
  361. test_writes_reads_eagain(void) // rdar://problem/8333366
  362. {
  363. int in = open("/dev/urandom", O_RDONLY);
  364. if (in == -1) {
  365. test_errno("open", errno, 0);
  366. test_stop();
  367. }
  368. int fds[2], *fd = fds;
  369. if(pipe(fd) == -1) {
  370. test_errno("pipe", errno, 0);
  371. test_stop();
  372. }
  373. const size_t chunks = 320;
  374. const size_t siz_chunk = 32, siz = siz_chunk * chunks;
  375. dispatch_queue_t q = dispatch_get_global_queue(0,0);
  376. dispatch_group_t g = dispatch_group_create();
  377. __block size_t siz_acc = 0, deliveries = 0;
  378. __block void (^b)(dispatch_data_t, int);
  379. b = Block_copy(^(dispatch_data_t data, int err) {
  380. if (err) {
  381. test_errno("dispatch_read", err, 0);
  382. test_stop();
  383. }
  384. deliveries++;
  385. siz_acc += dispatch_data_get_size(data);
  386. if (siz_acc < siz) {
  387. dispatch_group_enter(g);
  388. dispatch_read(*fd, siz, q, b);
  389. }
  390. dispatch_group_leave(g);
  391. });
  392. dispatch_group_enter(g);
  393. dispatch_read(*fd, siz, q, b);
  394. char *buf[siz_chunk];
  395. size_t i;
  396. for (i = 0; i < chunks; i++) {
  397. ssize_t s = read(in, buf, siz_chunk);
  398. if (s < (ssize_t)siz_chunk) {
  399. test_errno("read", errno, 0);
  400. test_stop();
  401. }
  402. s = write(*(fd+1), buf, siz_chunk);
  403. if (s < (ssize_t)siz_chunk) {
  404. test_errno("write", errno, 0);
  405. test_stop();
  406. }
  407. usleep(10000);
  408. }
  409. close(in);
  410. close(*(fd+1));
  411. test_group_wait(g);
  412. test_sizet("dispatch_read deliveries", deliveries, chunks);
  413. test_sizet("dispatch_read data size", siz_acc, siz);
  414. close(*fd);
  415. Block_release(b);
  416. dispatch_release(g);
  417. }
  418. #endif
  419. #endif // DISPATCHTEST_IO
  420. int
  421. main(void)
  422. {
  423. dispatch_test_start("Dispatch IO Convenience Read/Write");
  424. dispatch_async(dispatch_get_main_queue(), ^{
  425. #if DISPATCHTEST_IO
  426. test_read();
  427. test_read_write();
  428. test_read_writes();
  429. #if !defined(_WIN32)
  430. test_writes_reads_eagain();
  431. #endif
  432. #endif
  433. test_fin(NULL);
  434. });
  435. dispatch_main();
  436. }