/* * Copyright (c) 2010-2011 Apple Inc. All rights reserved. * * @APPLE_APACHE_LICENSE_HEADER_START@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * @APPLE_APACHE_LICENSE_HEADER_END@ */ #include #include #include #include #include #include #if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__)) #include #include #include #endif #include #ifdef __APPLE__ #include #include #include #include #endif #include #include #include #include "dispatch_test.h" #ifndef DISPATCHTEST_IO #if DISPATCH_API_VERSION >= 20100226 && DISPATCH_API_VERSION != 20101110 #define DISPATCHTEST_IO 1 #endif #endif static void test_fin(void *cxt) { test_ptr("test_fin run", cxt, cxt); test_stop(); } #if DISPATCHTEST_IO /* Basic way of implementing dispatch_io's dispatch_read without using dispatch channel api's */ static void dispatch_read2(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, void (^handler)(dispatch_data_t d, int error)) { #if !defined(_WIN32) if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) { test_errno("fcntl O_NONBLOCK", errno, 0); test_stop(); } #endif dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0, queue); test_ptr_notnull("reader", reader); __block size_t bytes_read = 0; __block dispatch_data_t data = dispatch_data_empty; __block int err = 0; dispatch_source_set_event_handler(reader, ^{ const ssize_t bufsiz = 1024*512; // 512KB buffer char *buffer = NULL; #if defined(_WIN32) SYSTEM_INFO si; GetSystemInfo(&si); size_t pagesize = (size_t)si.dwPageSize; buffer = _aligned_malloc(bufsiz, pagesize); #else size_t pagesize = (size_t)sysconf(_SC_PAGESIZE); posix_memalign((void **)&buffer, pagesize, bufsiz); #endif ssize_t actual = dispatch_test_fd_read(fd, buffer, bufsiz); if (actual == -1) { err = errno; } if (actual > 0) { bytes_read += (size_t)actual; #if defined(_WIN32) dispatch_data_t tmp_data = dispatch_data_create(buffer, (size_t)actual, NULL, ^{ _aligned_free(buffer); }); #else dispatch_data_t tmp_data = dispatch_data_create(buffer, (size_t)actual, NULL, DISPATCH_DATA_DESTRUCTOR_FREE); #endif dispatch_data_t concat = dispatch_data_create_concat(data,tmp_data); dispatch_release(tmp_data); dispatch_release(data); data = concat; } // If we reached EOF or we read as much we were asked to. if (actual < bufsiz || bytes_read >= length) { char foo[2]; actual = dispatch_test_fd_read(fd, foo, 2); bytes_read += (size_t)actual; // confirm EOF condition test_long("EOF", actual, 0); dispatch_source_cancel(reader); } }); dispatch_source_set_cancel_handler(reader, ^{ dispatch_data_t d = dispatch_data_create_subrange(data, 0, length); dispatch_release(data); handler(d, err); dispatch_release(d); dispatch_release(reader); }); dispatch_resume(reader); } static void test_read(void) { char *path = dispatch_test_get_large_file(); dispatch_fd_t fd = dispatch_test_fd_open(path, O_RDONLY); if (fd == -1) { test_errno("open", errno, 0); test_stop(); } dispatch_test_release_large_file(path); free(path); #ifdef F_NOCACHE if (fcntl(fd, F_NOCACHE, 1)) { test_errno("fcntl F_NOCACHE", errno, 0); test_stop(); } #else // investigate what the impact of lack of file cache disabling has // for this test #endif size_t size = (size_t)dispatch_test_fd_lseek(fd, 0, SEEK_END); dispatch_test_fd_lseek(fd, 0, SEEK_SET); dispatch_group_t g = dispatch_group_create(); void (^b)(dispatch_data_t, int) = ^(dispatch_data_t d, int error) { test_errno("read error", error, 0); test_sizet("dispatch data size", d ? dispatch_data_get_size(d) : 0, size); if (d) { const void *contig_buf; size_t contig_size; dispatch_data_t tmp = dispatch_data_create_map(d, &contig_buf, &contig_size); test_sizet("dispatch data contig size", contig_size, size); if (contig_size) { // Validate the copied buffer is similar to what we expect char *buf = (char*)malloc(size); dispatch_test_fd_pread(fd, buf, size, 0); test_long("dispatch data contents", memcmp(buf, contig_buf, size), 0); free(buf); } dispatch_release(tmp); } dispatch_group_leave(g); }; dispatch_group_enter(g); dispatch_read(fd, SIZE_MAX, dispatch_get_global_queue(0, 0), b); // rdar://problem/7795794 test_group_wait(g); dispatch_test_fd_lseek(fd, 0, SEEK_SET); if (dispatch_test_check_evfilt_read_for_fd(fd)) { dispatch_group_enter(g); dispatch_read2(fd, size, dispatch_get_global_queue(0,0), b); test_group_wait(g); } else { test_skip("EVFILT_READ kevent not firing for test file"); } dispatch_release(g); dispatch_test_fd_close(fd); } static void test_read_write(void) { #if defined(_WIN32) char *path_in = dispatch_test_get_large_file(); char path_out[] = "dispatchtest_io.XXXXXX"; dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY); if (in == -1) { test_errno("open", errno, 0); test_stop(); } dispatch_test_release_large_file(path_in); free(path_in); size_t siz_in = (size_t)dispatch_test_fd_lseek(in, 0, SEEK_END); dispatch_test_fd_lseek(in, 0, SEEK_SET); #else const char *path_in = "/dev/urandom"; char path_out[] = "/tmp/dispatchtest_io.XXXXXX"; const size_t siz_in = 10240; dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY); if (in == -1) { test_errno("open", errno, 0); test_stop(); } #endif dispatch_fd_t out = mkstemp(path_out); if (out == -1) { test_errno("mkstemp", errno, 0); test_stop(); } if (unlink(path_out) == -1) { test_errno("unlink", errno, 0); test_stop(); } dispatch_queue_t q = dispatch_get_global_queue(0,0); dispatch_group_t g = dispatch_group_create(); dispatch_group_enter(g); __block dispatch_data_t data; dispatch_read(in, siz_in, q, ^(dispatch_data_t data_in, int err_in) { if (err_in) { test_errno("dispatch_read", err_in, 0); test_stop(); } dispatch_test_fd_close(in); size_t siz_out = dispatch_data_get_size(data_in); test_sizet("read size", siz_out, siz_in); dispatch_retain(data_in); data = data_in; dispatch_write(out, data, q, ^(dispatch_data_t data_out, int err_out) { if (err_out || data_out) { test_errno("dispatch_write", err_out, 0); test_stop(); } dispatch_test_fd_lseek(out, 0, SEEK_SET); dispatch_read(out, siz_out, q, ^(dispatch_data_t cmp, int err_cmp) { if (err_cmp) { test_errno("dispatch_read", err_cmp, 0); test_stop(); } dispatch_test_fd_close(out); size_t siz_cmp = dispatch_data_get_size(cmp); test_sizet("readback size", siz_cmp, siz_out); const void *data_buf, *cmp_buf; dispatch_data_t data_map, cmp_map; data_map = dispatch_data_create_map(data, &data_buf, NULL); cmp_map = dispatch_data_create_map(cmp, &cmp_buf, NULL); test_long("readback memcmp", memcmp(data_buf, cmp_buf, MIN(siz_out, siz_cmp)), 0); dispatch_release(cmp_map); dispatch_release(data_map); dispatch_release(data); dispatch_group_leave(g); }); }); }); test_group_wait(g); dispatch_release(g); } static void test_read_writes(void) // { const size_t chunks_out = 320; const size_t siz_chunk = 32, siz_in = siz_chunk * chunks_out; #if defined(_WIN32) char *path_in = dispatch_test_get_large_file(); char path_out[] = "dispatchtest_io.XXXXXX"; dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY); if (in == -1) { test_errno("open", errno, 0); test_stop(); } dispatch_test_release_large_file(path_in); free(path_in); #else const char *path_in = "/dev/urandom"; char path_out[] = "/tmp/dispatchtest_io.XXXXXX"; dispatch_fd_t in = dispatch_test_fd_open(path_in, O_RDONLY); if (in == -1) { test_errno("open", errno, 0); test_stop(); } #endif dispatch_fd_t out = mkstemp(path_out); if (out == -1) { test_errno("mkstemp", errno, 0); test_stop(); } if (unlink(path_out) == -1) { test_errno("unlink", errno, 0); test_stop(); } dispatch_queue_t q = dispatch_get_global_queue(0,0); dispatch_group_t g = dispatch_group_create(); dispatch_group_enter(g); __block dispatch_data_t data; __block size_t siz_out; dispatch_read(in, siz_in, q, ^(dispatch_data_t data_in, int err_in) { if (err_in) { test_errno("dispatch_read", err_in, 0); test_stop(); } dispatch_test_fd_close(in); siz_out = dispatch_data_get_size(data_in); test_sizet("read size", siz_out, siz_in); dispatch_retain(data_in); data = data_in; dispatch_data_t data_chunks[chunks_out]; size_t i; for (i = 0; i < chunks_out; i++) { data_chunks[i] = dispatch_data_create_subrange(data_in, i * siz_chunk, siz_chunk); } for (i = 0; i < chunks_out; i++) { dispatch_data_t d = data_chunks[i]; dispatch_group_enter(g); dispatch_write(out, d, q, ^(dispatch_data_t data_out, int err_out) { if (err_out || data_out) { test_errno("dispatch_write", err_out, 0); test_stop(); } dispatch_group_leave(g); }); } for (i = 0; i < chunks_out; i++) { dispatch_release(data_chunks[i]); } dispatch_group_leave(g); }); test_group_wait(g); dispatch_group_enter(g); dispatch_test_fd_lseek(out, 0, SEEK_SET); dispatch_read(out, siz_in, q, ^(dispatch_data_t cmp, int err_cmp) { if (err_cmp) { test_errno("dispatch_read", err_cmp, 0); test_stop(); } dispatch_test_fd_close(out); size_t siz_cmp = dispatch_data_get_size(cmp); test_sizet("readback size", siz_cmp, siz_out); const void *data_buf, *cmp_buf; dispatch_data_t data_map, cmp_map; data_map = dispatch_data_create_map(data, &data_buf, NULL); cmp_map = dispatch_data_create_map(cmp, &cmp_buf, NULL); test_long("readback memcmp", memcmp(data_buf, cmp_buf, MIN(siz_out, siz_cmp)), 0); dispatch_release(cmp_map); dispatch_release(data_map); dispatch_release(data); dispatch_group_leave(g); }); test_group_wait(g); dispatch_release(g); } #if !defined(_WIN32) static void test_writes_reads_eagain(void) // rdar://problem/8333366 { int in = open("/dev/urandom", O_RDONLY); if (in == -1) { test_errno("open", errno, 0); test_stop(); } int fds[2], *fd = fds; if(pipe(fd) == -1) { test_errno("pipe", errno, 0); test_stop(); } const size_t chunks = 320; const size_t siz_chunk = 32, siz = siz_chunk * chunks; dispatch_queue_t q = dispatch_get_global_queue(0,0); dispatch_group_t g = dispatch_group_create(); __block size_t siz_acc = 0, deliveries = 0; __block void (^b)(dispatch_data_t, int); b = Block_copy(^(dispatch_data_t data, int err) { if (err) { test_errno("dispatch_read", err, 0); test_stop(); } deliveries++; siz_acc += dispatch_data_get_size(data); if (siz_acc < siz) { dispatch_group_enter(g); dispatch_read(*fd, siz, q, b); } dispatch_group_leave(g); }); dispatch_group_enter(g); dispatch_read(*fd, siz, q, b); char *buf[siz_chunk]; size_t i; for (i = 0; i < chunks; i++) { ssize_t s = read(in, buf, siz_chunk); if (s < (ssize_t)siz_chunk) { test_errno("read", errno, 0); test_stop(); } s = write(*(fd+1), buf, siz_chunk); if (s < (ssize_t)siz_chunk) { test_errno("write", errno, 0); test_stop(); } usleep(10000); } close(in); close(*(fd+1)); test_group_wait(g); test_sizet("dispatch_read deliveries", deliveries, chunks); test_sizet("dispatch_read data size", siz_acc, siz); close(*fd); Block_release(b); dispatch_release(g); } #endif #endif // DISPATCHTEST_IO int main(void) { dispatch_test_start("Dispatch IO Convenience Read/Write"); dispatch_async(dispatch_get_main_queue(), ^{ #if DISPATCHTEST_IO test_read(); test_read_write(); test_read_writes(); #if !defined(_WIN32) test_writes_reads_eagain(); #endif #endif test_fin(NULL); }); dispatch_main(); }