dispatch_io_pipe.c 13 KB


  1. /*
  2. * Copyright (c) 2019 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <sys/types.h>
  21. #include <assert.h>
  22. #include <errno.h>
  23. #include <fcntl.h>
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  27. #include <unistd.h>
  28. #endif
  29. #include <dispatch/dispatch.h>
  30. #include <bsdtests.h>
  31. #include "dispatch_test.h"
  32. enum {
  33. DISPATCH_PIPE_KIND_ANONYMOUS,
  34. #if defined(_WIN32)
  35. DISPATCH_PIPE_KIND_NAMED_INBOUND,
  36. DISPATCH_PIPE_KIND_NAMED_OUTBOUND,
  37. DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED,
  38. DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED,
  39. #endif
  40. DISPATCH_PIPE_KIND_COUNT,
  41. };
  42. enum {
  43. DISPATCH_TEST_IMMEDIATE,
  44. DISPATCH_TEST_DELAYED,
  45. };
  46. static const char *const pipe_names[] = {
  47. [DISPATCH_PIPE_KIND_ANONYMOUS] = "anonymous",
  48. #if defined(_WIN32)
  49. [DISPATCH_PIPE_KIND_NAMED_INBOUND] = "named, inbound",
  50. [DISPATCH_PIPE_KIND_NAMED_OUTBOUND] = "named, outbound",
  51. [DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED] = "named, inbound, overlapped",
  52. [DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED] = "named, outbound, overlapped",
  53. #endif
  54. };
  55. static const char *const delay_names[] = {
  56. [DISPATCH_TEST_IMMEDIATE] = "Immediate",
  57. [DISPATCH_TEST_DELAYED] = "Delayed",
  58. };
  59. #if defined(_WIN32)
  60. enum {
  61. NAMED_PIPE_BUFFER_SIZE = 0x1000,
  62. };
  63. #endif
  64. static size_t
  65. test_get_pipe_buffer_size(int kind)
  66. {
  67. #if defined(_WIN32)
  68. if (kind != DISPATCH_PIPE_KIND_ANONYMOUS) {
  69. return NAMED_PIPE_BUFFER_SIZE;
  70. }
  71. static dispatch_once_t once;
  72. static DWORD size;
  73. dispatch_once(&once, ^{
  74. HANDLE read_handle, write_handle;
  75. if (!CreatePipe(&read_handle, &write_handle, NULL, 0)) {
  76. test_long("CreatePipe", GetLastError(), ERROR_SUCCESS);
  77. test_stop();
  78. }
  79. GetNamedPipeInfo(write_handle, NULL, &size, NULL, NULL);
  80. CloseHandle(read_handle);
  81. CloseHandle(write_handle);
  82. });
  83. return size;
  84. #else
  85. (void)kind;
  86. static dispatch_once_t once;
  87. static size_t size;
  88. dispatch_once(&once, ^{
  89. int fds[2];
  90. if (pipe(fds) < 0) {
  91. test_errno("pipe", errno, 0);
  92. test_stop();
  93. }
  94. fcntl(fds[1], F_SETFL, O_NONBLOCK);
  95. for (size = 0; write(fds[1], "", 1) > 0; size++) {}
  96. close(fds[0]);
  97. close(fds[1]);
  98. });
  99. return size;
  100. #endif
  101. }
  102. #if defined(_WIN32)
  103. static void
  104. test_make_named_pipe(DWORD flags, dispatch_fd_t *readfd, dispatch_fd_t *writefd)
  105. {
  106. wchar_t name[64];
  107. static int counter = 0;
  108. swprintf(name, sizeof(name), L"\\\\.\\pipe\\dispatch_io_pipe_%lu_%d",
  109. GetCurrentProcessId(), counter++);
  110. HANDLE server = CreateNamedPipeW(name,
  111. flags | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE,
  112. /* nMaxInstances */ 1, NAMED_PIPE_BUFFER_SIZE,
  113. NAMED_PIPE_BUFFER_SIZE, /* nDefaultTimeOut */ 0,
  114. /* lpSecurityAttributes */ NULL);
  115. if (server == INVALID_HANDLE_VALUE) {
  116. test_ptr_not("CreateNamedPipe", server, INVALID_HANDLE_VALUE);
  117. test_stop();
  118. }
  119. HANDLE client = CreateFileW(name,
  120. (flags & PIPE_ACCESS_INBOUND) ? GENERIC_WRITE : GENERIC_READ,
  121. /* dwShareMode */ 0, /* lpSecurityAttributes */ NULL, OPEN_EXISTING,
  122. flags & FILE_FLAG_OVERLAPPED, /* hTemplateFile */ NULL);
  123. if (client == INVALID_HANDLE_VALUE) {
  124. test_ptr_not("CreateFile", client, INVALID_HANDLE_VALUE);
  125. test_stop();
  126. }
  127. if (flags & PIPE_ACCESS_INBOUND) {
  128. *readfd = (dispatch_fd_t)server;
  129. *writefd = (dispatch_fd_t)client;
  130. } else {
  131. *readfd = (dispatch_fd_t)client;
  132. *writefd = (dispatch_fd_t)server;
  133. }
  134. }
  135. #endif
  136. static void
  137. test_make_pipe(int kind, dispatch_fd_t *readfd, dispatch_fd_t *writefd)
  138. {
  139. #if defined(_WIN32)
  140. switch (kind) {
  141. case DISPATCH_PIPE_KIND_ANONYMOUS:
  142. if (!CreatePipe((PHANDLE)readfd, (PHANDLE)writefd, NULL, 0)) {
  143. test_long("CreatePipe", GetLastError(), ERROR_SUCCESS);
  144. test_stop();
  145. }
  146. break;
  147. case DISPATCH_PIPE_KIND_NAMED_INBOUND:
  148. test_make_named_pipe(PIPE_ACCESS_INBOUND, readfd, writefd);
  149. break;
  150. case DISPATCH_PIPE_KIND_NAMED_OUTBOUND:
  151. test_make_named_pipe(PIPE_ACCESS_OUTBOUND, readfd, writefd);
  152. break;
  153. case DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED:
  154. test_make_named_pipe(PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, readfd,
  155. writefd);
  156. break;
  157. case DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED:
  158. test_make_named_pipe(PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
  159. readfd, writefd);
  160. break;
  161. }
  162. #else
  163. (void)kind;
  164. int fds[2];
  165. if (pipe(fds) < 0) {
  166. test_errno("pipe", errno, 0);
  167. test_stop();
  168. }
  169. *readfd = fds[0];
  170. *writefd = fds[1];
  171. #endif
  172. }
  173. static void
  174. test_source_read(int kind, int delay)
  175. {
  176. printf("\nSource Read %s: %s\n", delay_names[delay], pipe_names[kind]);
  177. dispatch_fd_t readfd, writefd;
  178. test_make_pipe(kind, &readfd, &writefd);
  179. dispatch_group_t g = dispatch_group_create();
  180. dispatch_group_enter(g);
  181. void (^write_block)(void) = ^{
  182. dispatch_group_enter(g);
  183. char buf[512] = {0};
  184. ssize_t n = dispatch_test_fd_write(writefd, buf, sizeof(buf));
  185. if (n < 0) {
  186. test_errno("write error", errno, 0);
  187. test_stop();
  188. }
  189. test_sizet("num written", (size_t)n, sizeof(buf));
  190. dispatch_group_leave(g);
  191. };
  192. if (delay == DISPATCH_TEST_IMMEDIATE) {
  193. write_block();
  194. }
  195. dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
  196. (uintptr_t)readfd, 0, dispatch_get_global_queue(0, 0));
  197. test_ptr_notnull("dispatch_source_create", reader);
  198. assert(reader);
  199. dispatch_source_set_event_handler(reader, ^{
  200. dispatch_group_enter(g);
  201. char buf[512];
  202. size_t available = dispatch_source_get_data(reader);
  203. test_sizet("num available", available, sizeof(buf));
  204. ssize_t n = dispatch_test_fd_read(readfd, buf, sizeof(buf));
  205. if (n >= 0) {
  206. test_sizet("num read", (size_t)n, sizeof(buf));
  207. } else {
  208. test_errno("read error", errno, 0);
  209. }
  210. dispatch_source_cancel(reader);
  211. dispatch_group_leave(g);
  212. });
  213. dispatch_source_set_cancel_handler(reader, ^{
  214. dispatch_release(reader);
  215. dispatch_group_leave(g);
  216. });
  217. dispatch_resume(reader);
  218. dispatch_source_t t = NULL;
  219. if (delay == DISPATCH_TEST_DELAYED) {
  220. t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
  221. dispatch_get_global_queue(0, 0));
  222. dispatch_source_set_event_handler(t, write_block);
  223. dispatch_source_set_timer(t,
  224. dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
  225. DISPATCH_TIME_FOREVER, 0);
  226. dispatch_resume(t);
  227. }
  228. test_group_wait(g);
  229. dispatch_release(g);
  230. if (t) {
  231. dispatch_source_cancel(t);
  232. dispatch_release(t);
  233. }
  234. dispatch_test_fd_close(readfd);
  235. dispatch_test_fd_close(writefd);
  236. }
  237. static void
  238. test_source_write(int kind, int delay)
  239. {
  240. printf("\nSource Write %s: %s\n", delay_names[delay], pipe_names[kind]);
  241. dispatch_fd_t readfd, writefd;
  242. test_make_pipe(kind, &readfd, &writefd);
  243. dispatch_group_t g = dispatch_group_create();
  244. dispatch_group_enter(g);
  245. const size_t bufsize = test_get_pipe_buffer_size(kind);
  246. void (^write_block)(void) = ^{
  247. char *buf = calloc(bufsize, 1);
  248. assert(buf);
  249. ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize);
  250. free(buf);
  251. if (nw < 0) {
  252. test_errno("write error", errno, 0);
  253. test_stop();
  254. }
  255. test_sizet("num written", (size_t)nw, bufsize);
  256. };
  257. write_block();
  258. void (^read_block)(void) = ^{
  259. dispatch_group_enter(g);
  260. char *buf = calloc(bufsize, 1);
  261. assert(buf);
  262. ssize_t nr = dispatch_test_fd_read(readfd, buf, bufsize);
  263. free(buf);
  264. if (nr < 0) {
  265. test_errno("read error", errno, 0);
  266. test_stop();
  267. }
  268. test_sizet("num read", (size_t)nr, bufsize);
  269. dispatch_group_leave(g);
  270. };
  271. if (delay == DISPATCH_TEST_IMMEDIATE) {
  272. read_block();
  273. }
  274. dispatch_source_t writer = dispatch_source_create(
  275. DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)writefd, 0,
  276. dispatch_get_global_queue(0, 0));
  277. test_ptr_notnull("dispatch_source_create", writer);
  278. assert(writer);
  279. dispatch_source_set_event_handler(writer, ^{
  280. dispatch_group_enter(g);
  281. size_t available = dispatch_source_get_data(writer);
  282. test_sizet_less_than("num available", 0, available);
  283. write_block();
  284. read_block();
  285. dispatch_source_cancel(writer);
  286. dispatch_group_leave(g);
  287. });
  288. dispatch_source_set_cancel_handler(writer, ^{
  289. dispatch_release(writer);
  290. dispatch_group_leave(g);
  291. });
  292. dispatch_resume(writer);
  293. dispatch_source_t t = NULL;
  294. if (delay == DISPATCH_TEST_DELAYED) {
  295. t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
  296. dispatch_get_global_queue(0, 0));
  297. dispatch_source_set_event_handler(t, read_block);
  298. dispatch_source_set_timer(t,
  299. dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
  300. DISPATCH_TIME_FOREVER, 0);
  301. dispatch_resume(t);
  302. }
  303. test_group_wait(g);
  304. dispatch_release(g);
  305. if (t) {
  306. dispatch_source_cancel(t);
  307. dispatch_release(t);
  308. }
  309. dispatch_test_fd_close(readfd);
  310. dispatch_test_fd_close(writefd);
  311. }
  312. static void
  313. test_dispatch_read(int kind, int delay)
  314. {
  315. printf("\nDispatch Read %s: %s\n", delay_names[delay], pipe_names[kind]);
  316. dispatch_fd_t readfd, writefd;
  317. test_make_pipe(kind, &readfd, &writefd);
  318. dispatch_group_t g = dispatch_group_create();
  319. dispatch_group_enter(g);
  320. char writebuf[512] = {0};
  321. char *writebufp = writebuf;
  322. void (^write_block)(void) = ^{
  323. dispatch_group_enter(g);
  324. ssize_t n =
  325. dispatch_test_fd_write(writefd, writebufp, sizeof(writebuf));
  326. if (n < 0) {
  327. test_errno("write error", errno, 0);
  328. test_stop();
  329. }
  330. test_sizet("num written", (size_t)n, sizeof(writebuf));
  331. dispatch_group_leave(g);
  332. };
  333. if (delay == DISPATCH_TEST_IMMEDIATE) {
  334. write_block();
  335. }
  336. dispatch_read(readfd, sizeof(writebuf), dispatch_get_global_queue(0, 0),
  337. ^(dispatch_data_t data, int err) {
  338. test_errno("read error", err, 0);
  339. test_sizet("num read", dispatch_data_get_size(data), sizeof(writebuf));
  340. dispatch_group_leave(g);
  341. });
  342. dispatch_source_t t = NULL;
  343. if (delay == DISPATCH_TEST_DELAYED) {
  344. t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
  345. dispatch_get_global_queue(0, 0));
  346. dispatch_source_set_event_handler(t, write_block);
  347. dispatch_source_set_timer(t,
  348. dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
  349. DISPATCH_TIME_FOREVER, 0);
  350. dispatch_resume(t);
  351. }
  352. test_group_wait(g);
  353. dispatch_release(g);
  354. if (t) {
  355. dispatch_source_cancel(t);
  356. dispatch_release(t);
  357. }
  358. dispatch_test_fd_close(readfd);
  359. dispatch_test_fd_close(writefd);
  360. }
  361. static void
  362. test_dispatch_write(int kind, int delay)
  363. {
  364. printf("\nDispatch Write %s: %s\n", delay_names[delay], pipe_names[kind]);
  365. dispatch_fd_t readfd, writefd;
  366. test_make_pipe(kind, &readfd, &writefd);
  367. dispatch_group_t g = dispatch_group_create();
  368. dispatch_group_enter(g);
  369. const size_t bufsize = test_get_pipe_buffer_size(kind);
  370. char *buf = calloc(bufsize, 1);
  371. assert(buf);
  372. ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize);
  373. free(buf);
  374. if (nw < 0) {
  375. test_errno("write error", errno, 0);
  376. test_stop();
  377. }
  378. test_sizet("num written", (size_t)nw, bufsize);
  379. void (^read_block)(void) = ^{
  380. dispatch_group_enter(g);
  381. char *readbuf = calloc(bufsize, 1);
  382. assert(readbuf);
  383. ssize_t nr = dispatch_test_fd_read(readfd, readbuf, bufsize);
  384. free(readbuf);
  385. if (nr < 0) {
  386. test_errno("read error", errno, 0);
  387. test_stop();
  388. }
  389. test_sizet("num read", (size_t)nr, bufsize);
  390. dispatch_group_leave(g);
  391. };
  392. if (delay == DISPATCH_TEST_IMMEDIATE) {
  393. read_block();
  394. }
  395. buf = calloc(bufsize, 1);
  396. assert(buf);
  397. dispatch_data_t wd = dispatch_data_create(buf, bufsize,
  398. dispatch_get_global_queue(0, 0), DISPATCH_DATA_DESTRUCTOR_FREE);
  399. dispatch_write(writefd, wd, dispatch_get_global_queue(0, 0),
  400. ^(dispatch_data_t data, int err) {
  401. test_errno("write error", err, 0);
  402. test_ptr_null("data written", data);
  403. read_block();
  404. dispatch_group_leave(g);
  405. });
  406. dispatch_source_t t = NULL;
  407. if (delay == DISPATCH_TEST_DELAYED) {
  408. t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0,
  409. dispatch_get_global_queue(0, 0));
  410. dispatch_source_set_event_handler(t, read_block);
  411. dispatch_source_set_timer(t,
  412. dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
  413. DISPATCH_TIME_FOREVER, 0);
  414. dispatch_resume(t);
  415. }
  416. test_group_wait(g);
  417. dispatch_release(g);
  418. dispatch_release(wd);
  419. if (t) {
  420. dispatch_source_cancel(t);
  421. dispatch_release(t);
  422. }
  423. dispatch_test_fd_close(readfd);
  424. dispatch_test_fd_close(writefd);
  425. }
  426. int
  427. main(void)
  428. {
  429. dispatch_test_start("Dispatch IO Pipe");
  430. dispatch_async(dispatch_get_main_queue(), ^{
  431. for (int kind = 0; kind < DISPATCH_PIPE_KIND_COUNT; kind++) {
  432. test_source_read(kind, DISPATCH_TEST_IMMEDIATE);
  433. test_source_read(kind, DISPATCH_TEST_DELAYED);
  434. test_source_write(kind, DISPATCH_TEST_IMMEDIATE);
  435. test_source_write(kind, DISPATCH_TEST_DELAYED);
  436. test_dispatch_read(kind, DISPATCH_TEST_IMMEDIATE);
  437. test_dispatch_read(kind, DISPATCH_TEST_DELAYED);
  438. test_dispatch_write(kind, DISPATCH_TEST_IMMEDIATE);
  439. test_dispatch_write(kind, DISPATCH_TEST_DELAYED);
  440. }
  441. test_stop();
  442. });
  443. dispatch_main();
  444. }