dispatch_io_muxed.c 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. * Copyright (c) 2019 Apple Inc. All rights reserved.
  3. *
  4. * @APPLE_APACHE_LICENSE_HEADER_START@
  5. *
  6. * Licensed under the Apache License, Version 2.0 (the "License");
  7. * you may not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. *
  18. * @APPLE_APACHE_LICENSE_HEADER_END@
  19. */
  20. #include <assert.h>
  21. #include <errno.h>
  22. #include <stdio.h>
  23. #include <stdlib.h>
  24. #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
  25. #include <netinet/in.h>
  26. #include <sys/socket.h>
  27. #include <unistd.h>
  28. #elif defined(_WIN32)
  29. #include <WinSock2.h>
  30. #include <WS2tcpip.h>
  31. #include <Windows.h>
  32. #endif
  33. #include <dispatch/dispatch.h>
  34. #include <bsdtests.h>
  35. #include "dispatch_test.h"
  36. #if !defined(_WIN32)
  37. #define closesocket(x) close(x)
  38. #endif
  39. static void
  40. test_file_muxed(void)
  41. {
  42. printf("\nFile Muxed\n");
  43. #if defined(_WIN32)
  44. const char *temp_dir = getenv("TMP");
  45. if (!temp_dir) {
  46. temp_dir = getenv("TEMP");
  47. }
  48. if (!temp_dir) {
  49. test_ptr_notnull("temporary directory", temp_dir);
  50. test_stop();
  51. }
  52. const char *path_separator = "\\";
  53. #else
  54. const char *temp_dir = getenv("TMPDIR");
  55. if (!temp_dir) {
  56. temp_dir = "/tmp";
  57. }
  58. const char *path_separator = "/";
  59. #endif
  60. char *path = NULL;
  61. (void)asprintf(&path, "%s%sdispatchtest_io.XXXXXX", temp_dir, path_separator);
  62. dispatch_fd_t fd = mkstemp(path);
  63. if (fd == -1) {
  64. test_errno("mkstemp", errno, 0);
  65. test_stop();
  66. }
  67. if (unlink(path) == -1) {
  68. test_errno("unlink", errno, 0);
  69. test_stop();
  70. }
  71. #if defined(_WIN32)
  72. free(path);
  73. #endif
  74. dispatch_test_fd_write(fd, "test", 4);
  75. dispatch_test_fd_lseek(fd, 0, SEEK_SET);
  76. dispatch_group_t g = dispatch_group_create();
  77. dispatch_group_enter(g);
  78. dispatch_group_enter(g);
  79. dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
  80. (uintptr_t)fd, 0, dispatch_get_global_queue(0, 0));
  81. test_ptr_notnull("dispatch_source_create", reader);
  82. assert(reader);
  83. dispatch_source_set_event_handler(reader, ^{
  84. dispatch_source_cancel(reader);
  85. });
  86. dispatch_source_set_cancel_handler(reader, ^{
  87. dispatch_release(reader);
  88. dispatch_group_leave(g);
  89. });
  90. dispatch_source_t writer = dispatch_source_create(
  91. DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)fd, 0,
  92. dispatch_get_global_queue(0, 0));
  93. test_ptr_notnull("dispatch_source_create", writer);
  94. assert(writer);
  95. dispatch_source_set_event_handler(writer, ^{
  96. dispatch_source_cancel(writer);
  97. });
  98. dispatch_source_set_cancel_handler(writer, ^{
  99. dispatch_release(writer);
  100. dispatch_group_leave(g);
  101. });
  102. dispatch_resume(reader);
  103. dispatch_resume(writer);
  104. test_group_wait(g);
  105. dispatch_release(g);
  106. dispatch_test_fd_close(fd);
  107. }
  108. static void
  109. test_stream_muxed(dispatch_fd_t serverfd, dispatch_fd_t clientfd)
  110. {
  111. dispatch_group_t g = dispatch_group_create();
  112. dispatch_group_enter(g);
  113. dispatch_group_enter(g);
  114. dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
  115. (uintptr_t)serverfd, 0, dispatch_get_global_queue(0, 0));
  116. test_ptr_notnull("dispatch_source_create", reader);
  117. assert(reader);
  118. dispatch_source_set_event_handler(reader, ^{
  119. dispatch_source_cancel(reader);
  120. });
  121. dispatch_source_set_cancel_handler(reader, ^{
  122. dispatch_release(reader);
  123. dispatch_group_leave(g);
  124. });
  125. dispatch_source_t writer = dispatch_source_create(
  126. DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)serverfd, 0,
  127. dispatch_get_global_queue(0, 0));
  128. test_ptr_notnull("dispatch_source_create", writer);
  129. assert(writer);
  130. dispatch_source_set_event_handler(writer, ^{
  131. dispatch_source_cancel(writer);
  132. });
  133. dispatch_source_set_cancel_handler(writer, ^{
  134. dispatch_release(writer);
  135. dispatch_group_leave(g);
  136. });
  137. dispatch_resume(reader);
  138. dispatch_resume(writer);
  139. dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC),
  140. dispatch_get_global_queue(0, 0), ^{
  141. dispatch_group_enter(g);
  142. char buf[512] = {0};
  143. ssize_t n = dispatch_test_fd_write(clientfd, buf, sizeof(buf));
  144. if (n < 0) {
  145. test_errno("write error", errno, 0);
  146. test_stop();
  147. }
  148. test_sizet("num written", (size_t)n, sizeof(buf));
  149. dispatch_group_leave(g);
  150. });
  151. test_group_wait(g);
  152. dispatch_release(g);
  153. }
  154. static void
  155. test_socket_muxed(void)
  156. {
  157. printf("\nSocket Muxed\n");
  158. int listenfd = -1, serverfd = -1, clientfd = -1;
  159. struct sockaddr_in addr;
  160. socklen_t addrlen;
  161. listenfd = socket(AF_INET, SOCK_STREAM, 0);
  162. if (listenfd == -1) {
  163. test_errno("socket()", errno, 0);
  164. test_stop();
  165. }
  166. addr.sin_family = AF_INET;
  167. addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  168. addr.sin_port = 0;
  169. addrlen = sizeof(addr);
  170. if (bind(listenfd, (struct sockaddr *)&addr, addrlen) == -1) {
  171. test_errno("bind()", errno, 0);
  172. test_stop();
  173. }
  174. if (listen(listenfd, 3) == -1) {
  175. test_errno("listen()", errno, 0);
  176. test_stop();
  177. }
  178. if (getsockname(listenfd, (struct sockaddr *)&addr, &addrlen) == -1) {
  179. test_errno("getsockname()", errno, 0);
  180. test_stop();
  181. }
  182. clientfd = socket(AF_INET, SOCK_STREAM, 0);
  183. if (clientfd == -1) {
  184. test_errno("socket()", errno, 0);
  185. test_stop();
  186. }
  187. if (connect(clientfd, (struct sockaddr *)&addr, addrlen)) {
  188. test_errno("connect()", errno, 0);
  189. test_stop();
  190. }
  191. serverfd = accept(listenfd, (struct sockaddr *)&addr, &addrlen);
  192. if (serverfd == -1) {
  193. test_errno("accept()", errno, 0);
  194. test_stop();
  195. }
  196. test_stream_muxed((dispatch_fd_t)serverfd, (dispatch_fd_t)clientfd);
  197. closesocket(clientfd);
  198. closesocket(serverfd);
  199. closesocket(listenfd);
  200. }
  201. #if defined(_WIN32)
  202. static void
  203. test_pipe_muxed(void)
  204. {
  205. printf("\nDuplex Pipe Muxed\n");
  206. wchar_t name[64];
  207. swprintf(name, sizeof(name), L"\\\\.\\pipe\\dispatch_io_muxed_%lu",
  208. GetCurrentProcessId());
  209. HANDLE server = CreateNamedPipeW(name,
  210. PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE,
  211. /* nMaxInstances */ 1, /* nOutBufferSize */ 0x1000,
  212. /* nInBufferSize */ 0x1000, /* nDefaultTimeOut */ 0,
  213. /* lpSecurityAttributes */ NULL);
  214. if (server == INVALID_HANDLE_VALUE) {
  215. test_ptr_not("CreateNamedPipe", server, INVALID_HANDLE_VALUE);
  216. test_stop();
  217. }
  218. HANDLE client = CreateFileW(name, GENERIC_READ | GENERIC_WRITE,
  219. /* dwShareMode */ 0, /* lpSecurityAttributes */ NULL, OPEN_EXISTING,
  220. /* dwFlagsAndAttributes */ 0, /* hTemplateFile */ NULL);
  221. if (client == INVALID_HANDLE_VALUE) {
  222. test_ptr_not("CreateFile", client, INVALID_HANDLE_VALUE);
  223. test_stop();
  224. }
  225. test_stream_muxed((dispatch_fd_t)server, (dispatch_fd_t)client);
  226. CloseHandle(client);
  227. CloseHandle(server);
  228. }
  229. #endif
  230. int
  231. main(void)
  232. {
  233. dispatch_test_start("Dispatch IO Muxed");
  234. #if defined(_WIN32)
  235. WSADATA wsa;
  236. int err = WSAStartup(MAKEWORD(2, 2), &wsa);
  237. if (err != 0) {
  238. fprintf(stderr, "WSAStartup failed with %d\n", err);
  239. test_stop();
  240. }
  241. #endif
  242. dispatch_async(dispatch_get_main_queue(), ^{
  243. test_file_muxed();
  244. test_socket_muxed();
  245. #if defined(_WIN32)
  246. test_pipe_muxed();
  247. #endif
  248. test_stop();
  249. });
  250. dispatch_main();
  251. }