From a9ffa71a15d96f498865d91adfbe10590c1d788e Mon Sep 17 00:00:00 2001
From: Mark Geisert
Date: Mon, 23 Jul 2018 22:31:57 -0700
Subject: [PATCH] POSIX Asynchronous I/O support: aio files

This is the core of the AIO implementation: aio.cc and aio.h. The
latter is used within the Cygwin DLL by aio.cc and the fhandler* modules,
as well as by user programs wanting the AIO functionality.
---
 winsup/cygwin/aio.cc        | 1061 +++++++++++++++++++++++++++++++++++
 winsup/cygwin/include/aio.h |   82 +++
 2 files changed, 1143 insertions(+)
 create mode 100644 winsup/cygwin/aio.cc
 create mode 100644 winsup/cygwin/include/aio.h

diff --git a/winsup/cygwin/aio.cc b/winsup/cygwin/aio.cc
new file mode 100644
index 000000000..fe63dec04
--- /dev/null
+++ b/winsup/cygwin/aio.cc
@@ -0,0 +1,1061 @@
+/* aio.cc: Posix asynchronous i/o functions.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#include "winsup.h"
+#include "hires.h"
+#include "path.h"
+#include "fhandler.h"
+#include "dtable.h"
+#include "cygheap.h"
+#include "sigproc.h"
+#include <aio.h>
+#include <fcntl.h>
+#include <semaphore.h>
+#include <unistd.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* 'aioinitialized' is a thread-safe status of AIO feature initialization:
+ * 0 means uninitialized, >0 means initializing, <0 means initialized
+ */
+static NO_COPY volatile LONG aioinitialized = 0;
+
+/* This implementation supports two flavors of asynchronous operation:
+ * "inline" and "queued". Inline AIOs are used when:
+ * (1) fd refers to a local non-locked disk file opened in binary mode,
+ * (2) no more than AIO_MAX inline AIOs will be in progress at same time.
+ * In all other cases queued AIOs will be used.
+ *
+ * An inline AIO is performed by the calling app's thread as a pread|pwrite on
+ * a shadow fd that permits Windows asynchronous i/o, with event notification
+ * on completion. Event arrival causes AIO context for the fd to be updated.
+ *
+ * A queued AIO is performed in a similar manner, but by an AIO worker thread
+ * rather than the calling app's thread. The queued flavor can also operate
+ * on sockets, pipes, non-binary files, mandatory-locked files, and files
+ * that don't support pread|pwrite. Generally all these cases are handled as
+ * synchronous read|write operations, but still don't delay the app because
+ * they're taken care of by AIO worker threads.
+ */
+
+/* These variables support inline AIO operations */
+static NO_COPY HANDLE evt_handles[AIO_MAX];
+static NO_COPY struct aiocb *evt_aiocbs[AIO_MAX];
+static NO_COPY CRITICAL_SECTION evt_locks[AIO_MAX]; /* per-slot locks */
+static NO_COPY CRITICAL_SECTION slotcrit; /* lock for slot variables in toto */
+
+/* These variables support queued AIO operations */
+static NO_COPY sem_t worksem; /* tells whether AIOs are queued */
+static NO_COPY CRITICAL_SECTION workcrit; /* lock for AIO work queue */
+TAILQ_HEAD(queue, aiocb) worklist = TAILQ_HEAD_INITIALIZER(worklist);
+
+/* Return the slot currently holding 'aio' as an inline AIO, or -1 if 'aio'
+ * occupies no slot. Used to reject resubmission of a busy aiocb. */
+static int
+aiochkslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Sanity check.. make sure this AIO is not already busy */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == aio)
+      {
+        debug_printf ("aio %p is already busy in slot %d", aio, slot);
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+/* Find a free inline-AIO slot. With non-NULL 'aio' the slot is claimed for
+ * it; with NULL 'aio' this only tests availability. Returns the slot index,
+ * or -1 if every slot is in use. */
+static int
+aiogetslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Find free slot for this inline AIO; if none available AIO will be queued */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == NULL)
+      {
+        /* If aio is NULL this is just an availability check.. no change made */
+        if (aio)
+          evt_aiocbs[slot] = aio;
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+/* Release the slot held by 'aio'. Returns the index of the freed slot, or
+ * -1 if 'aio' held none. */
+static int
+aiorelslot (struct aiocb *aio)
+{
+  EnterCriticalSection (&slotcrit);
+
+  /* Find slot associated with this inline AIO and free it */
+  for (int slot = 0; slot < AIO_MAX; ++slot)
+    if (evt_aiocbs[slot] == aio)
+      {
+        evt_aiocbs[slot] = NULL;
+        LeaveCriticalSection (&slotcrit);
+        return slot;
+      }
+
+  LeaveCriticalSection (&slotcrit);
+  return -1;
+}
+
+/* Deliver a SIGEV_THREAD notification: run evp->sigev_notify_function on a
+ * new pthread, detached unless the app supplied its own attributes. Does
+ * not wait for that thread to finish. */
+static void
+aionotify_on_pthread (struct sigevent *evp)
+{
+  pthread_attr_t *attr;
+  pthread_attr_t default_attr;
+  int rc;
+  pthread_t vaquita; /* == "little porpoise", endangered, see below */
+
+  if (evp->sigev_notify_attributes)
+    attr = evp->sigev_notify_attributes;
+  else
+    {
+      pthread_attr_init (attr = &default_attr);
+      pthread_attr_setdetachstate (attr, PTHREAD_CREATE_DETACHED);
+    }
+
+  /* A "vaquita" thread is a temporary pthread created to deliver a signal to
+   * the application. We don't wait around for the thread to return from the
+   * app. There's some symbolism here of sending a little creature off to tell
+   * the app something important. If all the vaquitas end up wiped out in the
+   * wild, a distinct near-term possibility, at least this code remembers them.
+   */
+  rc = pthread_create (&vaquita, attr,
+                       (void * (*) (void *)) evp->sigev_notify_function,
+                       evp->sigev_value.sival_ptr);
+
+  /* The following error is not expected. If seen often, develop a recovery. */
+  /* pthread_create() reports failure in its return value, so print that */
+  if (rc)
+    debug_printf ("aio vaquita thread creation failed, error %d", rc);
+
+  /* Release the attribute object if it is the one we initialized above */
+  if (attr == &default_attr)
+    pthread_attr_destroy (attr);
+
+  /* Should we wait for the signal delivery thread to finish? We can't: Who
+   * knows what mischief the app coder may have in their handler? Worst case
+   * is they accidentally used non-signal-safe functions in their handler. We
+   * return hoping for the best and finish cleaning up our end of notification.
+   */
+  return;
+}
+
+/* Post-completion notification for 'aio': deliver its own sigevent, then,
+ * if it belongs to an lio_listio() batch and is the last member to finish,
+ * deliver the batch's sigevent and free the batch control block. */
+static void
+aionotify (struct aiocb *aio)
+{
+  siginfo_t si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* If signal notification wanted, send AIO-complete signal */
+  switch (aio->aio_sigevent.sigev_notify) {
+    case SIGEV_NONE:
+      break;
+
+    case SIGEV_SIGNAL:
+      si.si_signo = aio->aio_sigevent.sigev_signo;
+      si.si_value = aio->aio_sigevent.sigev_value;
+      if (si.si_signo)
+        sig_send (myself, si);
+      break;
+
+    case SIGEV_THREAD:
+      aionotify_on_pthread (&aio->aio_sigevent);
+      break;
+  }
+
+  /* If this op is on LIO list and is last op, send LIO-complete signal */
+  if (aio->aio_liocb)
+    {
+      if (1 == InterlockedExchangeAdd (&aio->aio_liocb->lio_count, -1))
+        {
+          /* LIO's count has decremented to zero */
+          switch (aio->aio_liocb->lio_sigevent->sigev_notify) {
+            case SIGEV_NONE:
+              break;
+
+            case SIGEV_SIGNAL:
+              si.si_signo = aio->aio_liocb->lio_sigevent->sigev_signo;
+              si.si_value = aio->aio_liocb->lio_sigevent->sigev_value;
+              if (si.si_signo)
+                sig_send (myself, si);
+              break;
+
+            case SIGEV_THREAD:
+              aionotify_on_pthread (aio->aio_liocb->lio_sigevent);
+              break;
+          }
+
+          free (aio->aio_liocb);
+          aio->aio_liocb = NULL;
+        }
+    }
+}
+
+static DWORD WINAPI __attribute__ ((noreturn))
+aiowaiter (void *unused)
+{ /* One instance, called on its own cygthread; runs until program exits */
+  struct aiocb *aio;
+
+  while (1)
+    {
+      /* Wait forever for at least one event to be set */
+      DWORD res = WaitForMultipleObjects(AIO_MAX, evt_handles, FALSE, INFINITE);
+      switch (res)
+        {
+          case WAIT_FAILED:
+            api_fatal ("aiowaiter fatal error, %E");
+
+          default:
+            if (res < WAIT_OBJECT_0 || res >= WAIT_OBJECT_0 + AIO_MAX)
+              api_fatal ("aiowaiter unexpected WFMO result %d", res);
+            int slot = res - WAIT_OBJECT_0;
+
+            /* Guard against "saw completion before request finished" gotcha */
+            EnterCriticalSection (&evt_locks[slot]);
+            LeaveCriticalSection (&evt_locks[slot]);
+
+            aio = evt_aiocbs[slot];
+            debug_printf ("WFMO returns %d, aio %p", res, aio);
+
+            if (aio->aio_errno == EBUSY)
+              {
+                /* Capture Windows status and convert to Cygwin status */
+                NTSTATUS status = (NTSTATUS) aio->aio_wincb.status;
+                if (NT_SUCCESS (status))
+                  {
+                    aio->aio_rbytes = (ssize_t) aio->aio_wincb.info;
+                    aio->aio_errno = 0;
+                  }
+                else
+                  {
+                    aio->aio_rbytes = -1;
+                    aio->aio_errno = geterrno_from_nt_status (status);
+                  }
+              }
+            else
+              {
+                /* Async operation was simulated; AIO status already updated */
+              }
+
+            /* Send completion signal if user requested it */
+            aionotify (aio);
+
+            /* Free up the slot used for this inline AIO. We do this
+             * manually rather than calling aiorelslot() because we
+             * already have the slot number handy.
+             */
+            EnterCriticalSection (&slotcrit);
+            evt_aiocbs[slot] = NULL;
+            LeaveCriticalSection (&slotcrit);
+            debug_printf ("retired aio %p; slot %d released", aio, slot);
+
+            /* Notify workers that a slot has opened up */
+            sem_post (&worksem);
+        }
+    }
+}
+
+static int
+asyncread (struct aiocb *aio)
+{ /* Try to initiate an asynchronous read, either from app or worker thread */
+  ssize_t res = 0;
+
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    res = -1; /* errno has been set to EBADF */
+  else
+    {
+      int slot = aiogetslot (aio);
+      debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+      if (slot >= 0)
+        {
+          EnterCriticalSection (&evt_locks[slot]);
+          aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+          aio->aio_wincb.event = (void *) evt_handles[slot];
+          res = cfd->pread ((void *) aio->aio_buf, aio->aio_nbytes,
+                            aio->aio_offset, (void *) aio);
+          LeaveCriticalSection (&evt_locks[slot]);
+        }
+      else
+        {
+          set_errno (ENOBUFS); /* Internal use only */
+          res = -1;
+        }
+    }
+
+  return res;
+}
+
+static int
+asyncwrite (struct aiocb *aio)
+{ /* Try to initiate an asynchronous write, either from app or worker thread */
+  ssize_t res = 0;
+
+  cygheap_fdget cfd (aio->aio_fildes);
+  if (cfd < 0)
+    res = -1; /* errno has been set to EBADF */
+  else
+    {
+      int slot = aiogetslot (aio);
+      debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
+      if (slot >= 0)
+        {
+          EnterCriticalSection (&evt_locks[slot]);
+          aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
+          aio->aio_wincb.event = (void *) evt_handles[slot];
+          res = cfd->pwrite ((void *) aio->aio_buf, aio->aio_nbytes,
+                             aio->aio_offset, (void *) aio);
+          LeaveCriticalSection (&evt_locks[slot]);
+        }
+      else
+        {
+          set_errno (ENOBUFS); /* Internal use only */
+          res = -1;
+        }
+    }
+
+  return res;
+}
+
+/* Have to forward ref because of chicken v. egg situation */
+static DWORD WINAPI __attribute__ ((noreturn)) aioworker (void *);
+
+/* One-time AIO setup: create the locks, work semaphore and queue, the
+ * AIO_MAX worker threads, the per-slot events and locks, and the waiter
+ * thread. Concurrent callers yield until the first caller finishes. */
+static void
+aioinit (void)
+{
+  /* First a cheap test to speed processing after initialization completes */
+  if (aioinitialized >= 0)
+    {
+      /* Guard against multiple threads initializing at same time */
+      if (0 == InterlockedExchangeAdd (&aioinitialized, 1))
+        {
+          int i = AIO_MAX;
+          char *tnames = (char *) malloc (AIO_MAX * 8);
+
+          if (!tnames)
+            api_fatal ("couldn't create aioworker tname table");
+
+          InitializeCriticalSection (&slotcrit);
+          InitializeCriticalSection (&workcrit);
+          sem_init (&worksem, 0, 0);
+          TAILQ_INIT(&worklist);
+
+          /* Create AIO_MAX number of aioworker threads for queued AIOs */
+          while (i--)
+            {
+              __small_sprintf (&tnames[i * 8], "aio%d", AIO_MAX - i);
+              if (!new cygthread (aioworker, NULL, &tnames[i * 8]))
+                api_fatal ("couldn't create an aioworker thread, %E");
+            }
+
+          /* Initialize event handles and slot locks arrays for inline AIOs */
+          for (i = 0; i < AIO_MAX; ++i)
+            {
+              /* Events are non-inheritable, auto-reset, init unset, unnamed */
+              evt_handles[i] = CreateEvent (NULL, FALSE, FALSE, NULL);
+              if (!evt_handles[i])
+                api_fatal ("couldn't create an event, %E");
+
+              InitializeCriticalSection (&evt_locks[i]);
+            }
+
+          /* Create aiowaiter thread; waits for inline AIO completion events */
+          if (!new cygthread (aiowaiter, NULL, "aio"))
+            api_fatal ("couldn't create aiowaiter thread, %E");
+
+          /* Indicate we have completed initialization */
+          InterlockedExchange (&aioinitialized, -1);
+        }
+      else
+        /* If 'aioinitialized' is greater than zero, another thread is
+         * initializing for us; wait until 'aioinitialized' goes negative
+         */
+        while (InterlockedExchangeAdd (&aioinitialized, 0) >= 0)
+          yield ();
+    }
+}
+
+static int
+aioqueue (struct aiocb *aio)
+{ /* Add an AIO to the worklist, to be serviced by a worker thread */
+  if (aioinitialized >= 0)
+    aioinit ();
+
+  EnterCriticalSection (&workcrit);
+  TAILQ_INSERT_TAIL(&worklist, aio, aio_chain);
+  LeaveCriticalSection (&workcrit);
+
+  debug_printf ("queued aio %p", aio);
+  sem_post (&worksem);
+
+  return 0;
+}
+
+static DWORD WINAPI __attribute__ ((noreturn))
+aioworker (void *unused)
+{ /* Multiple instances; called on own cygthreads; runs 'til program exits */
+  struct aiocb *aio;
+
+  while (1)
+    {
+      /* Park here until there's work to do or a slot becomes available */
+      sem_wait (&worksem);
+
+look4work:
+      EnterCriticalSection (&workcrit);
+      if (TAILQ_EMPTY(&worklist))
+        {
+          /* Another aioworker picked up the work already */
+          LeaveCriticalSection (&workcrit);
+          continue;
+        }
+
+      /* Make sure a slot is available before starting this AIO */
+      aio = TAILQ_FIRST(&worklist);
+      int slot = aiogetslot (NULL);
+      if (slot >= 0) // a slot is available
+        TAILQ_REMOVE(&worklist, aio, aio_chain);
+      LeaveCriticalSection (&workcrit);
+      if (slot < 0) // no slot is available, so worklist unchanged and we park
+        continue;
+
+      debug_printf ("starting aio %p", aio);
+      switch (aio->aio_lio_opcode)
+        {
+          case LIO_NOP:
+            aio->aio_rbytes = 0;
+            break;
+
+          case LIO_READ:
+            aio->aio_rbytes = asyncread (aio);
+            break;
+
+          case LIO_WRITE:
+            aio->aio_rbytes = asyncwrite (aio);
+            break;
+
+          default:
+            errno = EINVAL;
+            aio->aio_rbytes = -1;
+            break;
+        }
+
+      /* If operation still underway, let aiowaiter hear about its finish */
+      if (aio->aio_rbytes == 0 && aio->aio_nbytes != 0) // not racy
+        continue;
+
+      /* If operation errored, save error number, else clear it */
+      if (aio->aio_rbytes == -1)
+        aio->aio_errno = get_errno ();
+      else
+        aio->aio_errno = 0;
+
+      /* If a slot for this queued async AIO was available, but we lost out */
+      if (aio->aio_errno == ENOBUFS)
+        {
+          aio->aio_errno = EINPROGRESS;
+          aioqueue (aio); /* Re-queue the AIO */
+
+          /* Another option would be to fail the AIO with error EAGAIN, but
+           * experience with iozone showed apps might not expect to see a
+           * deferred EAGAIN. I.e. they should expect EAGAIN on their call to
+           * aio_read() or aio_write() but probably not expect to see EAGAIN
+           * on an aio_error() query after they'd previously seen EINPROGRESS
+           * on the initial AIO call.
+           */
+          continue;
+        }
+
+      /* If seeks aren't permitted on given fd, or pread|pwrite not legal */
+      if (aio->aio_errno == ESPIPE)
+        {
+          ssize_t res = 0;
+          off_t curpos;
+
+          cygheap_fdget cfd (aio->aio_fildes);
+          if (cfd < 0)
+            {
+              res = -1;
+              goto done; /* errno has been set to EBADF */
+            }
+
+          /* If we can get current file position, seek to aio_offset */
+          curpos = cfd->lseek (0, SEEK_CUR);
+          if (curpos < 0 || cfd->lseek (aio->aio_offset, SEEK_SET) < 0)
+            {
+              /* Can't seek */
+              res = curpos;
+              set_errno (0); /* Get rid of ESPIPE we've incurred */
+              aio->aio_errno = 0; /* Here too */
+            }
+
+          /* Do the requested AIO operation manually, synchronously */
+          switch (aio->aio_lio_opcode)
+            {
+              case LIO_READ:
+                /* 2nd argument to cfd->read() is passed by reference... */
+                cfd->read ((void *) aio->aio_buf, aio->aio_nbytes);
+                /* ...so on return it contains the number of bytes read */
+                res = aio->aio_nbytes;
+                break;
+
+              case LIO_WRITE:
+                res = cfd->write ((void *) aio->aio_buf, aio->aio_nbytes);
+                break;
+            }
+
+          /* If we had seeked successfully, restore original file position */
+          if (curpos >= 0)
+            if (cfd->lseek (curpos, SEEK_SET) < 0)
+              res = -1;
+
+done:
+          /* Update AIO to reflect final result */
+          aio->aio_rbytes = res;
+          aio->aio_errno = res == -1 ? get_errno () : 0;
+
+          /* Make like the requested async operation completed normally */
+          for (int i = 0; i < AIO_MAX; i++)
+            if (evt_aiocbs[i] == aio)
+              {
+                SetEvent (evt_handles[i]);
+                goto truly_done;
+              }
+
+          /* Free up the slot we ended up not using */
+          int slot = aiorelslot (aio);
+          debug_printf ("slot %d released", slot);
+        }
+
+      /* Send completion signal if user requested it */
+      aionotify (aio);
+
+truly_done:
+      debug_printf ("completed aio %p", aio);
+      goto look4work;
+    }
+}
+
+/* Cancel AIOs still on the work queue for 'fildes': only 'aio' if given,
+ * else all of them. Each canceled AIO gets ECANCELED and its notification.
+ * Returns AIO_CANCELED if any were canceled, else AIO_ALLDONE. */
+int
+aio_cancel (int fildes, struct aiocb *aio)
+{
+  int aiocount = 0;
+  struct aiocb *ptr;
+  siginfo_t si = {0};
+  si.si_code = SI_ASYNCIO;
+
+  /* Note 'aio' is allowed to be NULL here; it's used as a wildcard */
+restart:
+  EnterCriticalSection (&workcrit);
+  TAILQ_FOREACH(ptr, &worklist, aio_chain)
+    {
+      if (ptr->aio_fildes == fildes && (!aio || ptr == aio))
+        {
+          /* This queued AIO qualifies for cancellation */
+          TAILQ_REMOVE(&worklist, ptr, aio_chain);
+          LeaveCriticalSection (&workcrit);
+
+          ptr->aio_errno = ECANCELED;
+          ptr->aio_rbytes = -1;
+
+          /* If signal notification wanted, send AIO-canceled signal */
+          switch (ptr->aio_sigevent.sigev_notify) {
+            case SIGEV_NONE:
+              break;
+
+            case SIGEV_SIGNAL:
+              si.si_signo = ptr->aio_sigevent.sigev_signo;
+              si.si_value = ptr->aio_sigevent.sigev_value;
+              if (si.si_signo)
+                sig_send (myself, si);
+              break;
+
+            case SIGEV_THREAD:
+              aionotify_on_pthread (&ptr->aio_sigevent);
+              break;
+          }
+
+          ++aiocount;
+          goto restart;
+        }
+    }
+  LeaveCriticalSection (&workcrit);
+
+  /* Note that AIO_NOTCANCELED is not possible in this implementation. That's
+   * because AIOs are dequeued to execute; the worklist search above won't
+   * find an AIO that's been dequeued from the worklist.
+   */
+  if (aiocount)
+    return AIO_CANCELED;
+  else
+    return AIO_ALLDONE;
+}
+
+/* Return the error status of 'aio'; internal in-flight states are reported
+ * as EINPROGRESS. Returns -1 with errno EINVAL if 'aio' is NULL. */
+int
+aio_error (const struct aiocb *aio)
+{
+  int res;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (aio->aio_errno)
+    {
+      case EBUSY: /* This state for internal use only; not visible to app */
+      case ENOBUFS: /* This state for internal use only; not visible to app */
+        res = EINPROGRESS;
+        break;
+
+      default:
+        res = aio->aio_errno;
+    }
+
+  return res;
+}
+
+/* Synchronously flush aio->aio_fildes with fsync() (mode O_SYNC) or
+ * fdatasync() (mode O_DSYNC) and record the outcome in 'aio'. Returns that
+ * call's result, or -1 with errno EINVAL for a NULL 'aio' or bad mode. */
+int
+aio_fsync (int mode, struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (mode)
+    {
+#if defined(O_SYNC)
+      case O_SYNC:
+        aio->aio_rbytes = fsync (aio->aio_fildes);
+        break;
+
+#if defined(O_DSYNC) && O_DSYNC != O_SYNC
+      case O_DSYNC:
+        aio->aio_rbytes = fdatasync (aio->aio_fildes);
+        break;
+#endif
+#endif
+
+      default:
+        set_errno (EINVAL);
+        return -1;
+    }
+
+  /* Save the error, or clear a stale one so aio_error() reports success */
+  aio->aio_errno = aio->aio_rbytes == -1 ? get_errno () : 0;
+
+  return aio->aio_rbytes;
+}
+
+/* Start the asynchronous read described by 'aio'. It is launched inline by
+ * asyncread() when possible, else queued for a worker thread on ESPIPE or
+ * ENOBUFS. Returns the launch result (0 if queued), or -1 with errno set. */
+int
+aio_read (struct aiocb *aio)
+{
+  ssize_t res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_READ;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async read; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncread (aio);
+
+  /* If async read couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    switch (get_errno ()) {
+      case ESPIPE:
+        {
+          int slot = aiorelslot (aio);
+          if (slot >= 0)
+            debug_printf ("slot %d released", slot);
+        }
+        /* fall through */
+
+      case ENOBUFS:
+        aio->aio_errno = EINPROGRESS;
+        aio->aio_rbytes = -1;
+
+        res = aioqueue (aio);
+        break;
+
+      default:
+        /* E.g. EBADF from a bad fd. Record the error so this AIO isn't left
+         * looking in-progress, which would fail every resubmission with
+         * EAGAIN. NOTE(review): a slot taken in asyncread() stays held. */
+        aio->aio_errno = get_errno ();
+    }
+
+  return res;
+}
+
+/* Return the final result of a completed 'aio' (byte count, or -1 with
+ * errno set). May be called once per AIO: later calls, and calls with a
+ * NULL 'aio', fail with EINVAL; an unfinished AIO fails with EINPROGRESS. */
+ssize_t
+aio_return (struct aiocb *aio)
+{
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  switch (aio->aio_errno)
+    {
+      case EBUSY: /* AIO is currently underway (internal state) */
+      case ENOBUFS: /* AIO is currently underway (internal state) */
+      case EINPROGRESS: /* AIO has been queued successfully */
+        set_errno (EINPROGRESS);
+        return -1;
+
+      case EINVAL: /* aio_return() has already been called on this AIO */
+        set_errno (aio->aio_errno);
+        return -1;
+
+      default: /* AIO has completed, successfully or not */
+        ;
+    }
+
+  /* This AIO has completed so grab any error status if present */
+  if (aio->aio_rbytes == -1)
+    set_errno (aio->aio_errno);
+
+  /* Set this AIO's errno so later aio_return() calls on this AIO fail */
+  aio->aio_errno = EINVAL;
+
+  return aio->aio_rbytes;
+}
+
+static int
+aiosuspend (const struct aiocb *const aiolist[],
+            int nent, const struct timespec *timeout)
+{
+  /* Returns lowest list index of completed aios, else 'nent' if all completed.
+   * If none completed on entry, wait for interval specified by 'timeout'.
+   */
+  int res;
+  sigset_t sigmask;
+  siginfo_t si;
+  ULONGLONG nsecs = 0;
+  ULONGLONG time0, time1;
+  struct timespec to = {0};
+
+  if (timeout)
+    {
+      to = *timeout;
+      /* tv_nsec must be below one second's worth (assumes NSPERSEC is the
+       * nanoseconds-per-second constant -- TODO confirm) */
+      if (to.tv_sec < 0 || to.tv_nsec < 0 || to.tv_nsec >= NSPERSEC)
+        {
+          set_errno (EINVAL);
+          return -1;
+        }
+      nsecs = (NSPERSEC * to.tv_sec) + to.tv_nsec;
+    }
+
+retry:
+  sigemptyset (&sigmask);
+  int aiocount = 0;
+  /* NOTE(review): entries whose aio_liocb is NULL are skipped. In this file
+   * only lio_listio() sets aio_liocb, and only non-NULL for LIO_NOWAIT with
+   * a sigevent, so other AIOs look never waited on -- confirm intended. */
+  for (int i = 0; i < nent; ++i)
+    if (aiolist[i] && aiolist[i]->aio_liocb)
+      {
+        if (aiolist[i]->aio_errno == EINPROGRESS ||
+            aiolist[i]->aio_errno == ENOBUFS ||
+            aiolist[i]->aio_errno == EBUSY)
+          {
+            ++aiocount;
+            if (aiolist[i]->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
+                aiolist[i]->aio_sigevent.sigev_notify == SIGEV_THREAD)
+              sigaddset (&sigmask, aiolist[i]->aio_sigevent.sigev_signo);
+          }
+        else
+          return i;
+      }
+
+  if (aiocount == 0)
+    return nent;
+
+  if (timeout && nsecs == 0)
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  time0 = ntod.nsecs ();
+  /* Note wait below is abortable even w/ empty sigmask and infinite timeout */
+  res = sigtimedwait (&sigmask, &si, timeout ? &to : NULL);
+  if (res == -1)
+    return -1; /* Return with errno set by failed sigtimedwait() */
+  time1 = ntod.nsecs ();
+
+  /* Adjust timeout to account for time just waited */
+  time1 -= time0;
+  if (time1 > nsecs)
+    nsecs = 0; // just in case we didn't get rescheduled very quickly
+  else
+    nsecs -= time1;
+  to.tv_sec = nsecs / NSPERSEC;
+  to.tv_nsec = nsecs % NSPERSEC;
+
+  goto retry;
+}
+
+/* Wait on the AIOs in 'aiolist' via aiosuspend(). Returns 0 once at least
+ * one has completed, else -1 with errno set (EINVAL for a negative 'nent',
+ * EAGAIN if none completed within 'timeout'). */
+int
+aio_suspend (const struct aiocb *const aiolist[],
+             int nent, const struct timespec *timeout)
+{
+  int res;
+
+  if (nent < 0)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  pthread_testcancel ();
+  res = aiosuspend (aiolist, nent, timeout);
+
+  /* -1 means error or timeout; if no AIOs completed in time, errno has been
+   * set to EAGAIN. Any other result means one or all AIOs completed. */
+  return res == -1 ? -1 : 0;
+}
+
+/* Start the asynchronous write described by 'aio'. It is launched inline by
+ * asyncwrite() when possible, else queued for a worker thread on ESPIPE or
+ * ENOBUFS. Returns the launch result (0 if queued), or -1 with errno set. */
+int
+aio_write (struct aiocb *aio)
+{
+  ssize_t res = 0;
+
+  if (!aio)
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+  if (aioinitialized >= 0)
+    aioinit ();
+  if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
+    {
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  aio->aio_lio_opcode = LIO_WRITE;
+  aio->aio_errno = EINPROGRESS;
+  aio->aio_rbytes = -1;
+
+  /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
+  if (aio->aio_sigevent.sigev_signo == 0)
+    aio->aio_sigevent.sigev_notify = SIGEV_NONE;
+
+  /* Try to launch inline async write; only on ESPIPE/ENOBUFS is it queued */
+  pthread_testcancel ();
+  res = asyncwrite (aio);
+
+  /* If async write couldn't be launched, queue the AIO for a worker thread */
+  if (res == -1)
+    switch (get_errno ()) {
+      case ESPIPE:
+        {
+          int slot = aiorelslot (aio);
+          if (slot >= 0)
+            debug_printf ("slot %d released", slot);
+        }
+        /* fall through */
+
+      case ENOBUFS:
+        aio->aio_errno = EINPROGRESS;
+        aio->aio_rbytes = -1;
+
+        res = aioqueue (aio);
+        break;
+
+      default:
+        /* E.g. EBADF from a bad fd. Record the error so this AIO isn't left
+         * looking in-progress, which would fail every resubmission with
+         * EAGAIN. NOTE(review): a slot taken in asyncwrite() stays held. */
+        aio->aio_errno = get_errno ();
+    }
+
+  return res;
+}
+
+/* Submit the AIOs in 'aiolist' per each entry's aio_lio_opcode. LIO_NOWAIT
+ * returns once they are submitted and, if 'sig' is given, arranges for it
+ * to be delivered after the last one completes; LIO_WAIT waits via
+ * aiosuspend(). Returns 0, or -1 with errno EINVAL, ENOMEM or EAGAIN. */
+int
+lio_listio (int mode, struct aiocb *__restrict const aiolist[__restrict],
+            int nent, struct sigevent *__restrict sig)
+{
+  struct aiocb *aio;
+  struct __liocb *lio;
+
+  pthread_testcancel ();
+
+  if ((mode != LIO_WAIT && mode != LIO_NOWAIT) ||
+      (nent < 0 || nent > AIO_LISTIO_MAX))
+    {
+      set_errno (EINVAL);
+      return -1;
+    }
+
+  if (sig && nent && mode == LIO_NOWAIT)
+    {
+      lio = (struct __liocb *) malloc (sizeof (struct __liocb));
+      if (!lio)
+        {
+          set_errno (ENOMEM);
+          return -1;
+        }
+
+      lio->lio_count = nent;
+      lio->lio_sigevent = sig;
+    }
+  else
+    lio = NULL;
+
+  int aiocount = 0;
+  for (int i = 0; i < nent; ++i)
+    {
+      aio = (struct aiocb *) aiolist[i];
+      if (!aio)
+        {
+          if (lio)
+            InterlockedDecrement (&lio->lio_count);
+          continue;
+        }
+
+      aio->aio_liocb = lio;
+      switch (aio->aio_lio_opcode)
+        {
+          case LIO_NOP:
+            if (lio)
+              InterlockedDecrement (&lio->lio_count);
+            continue;
+
+          case LIO_READ:
+            aio_read (aio);
+            ++aiocount;
+            continue;
+
+          case LIO_WRITE:
+            aio_write (aio);
+            ++aiocount;
+            continue;
+
+          default:
+            break;
+        }
+
+      if (lio)
+        InterlockedDecrement (&lio->lio_count);
+      aio->aio_errno = EINVAL;
+      aio->aio_rbytes = -1;
+    }
+
+  /* mode is LIO_NOWAIT so return some kind of answer immediately */
+  if (mode == LIO_NOWAIT)
+    {
+      /* At least one AIO has been launched or queued */
+      if (aiocount)
+        return 0;
+
+      /* No AIOs have been launched or queued, so no completion will ever
+       * free the LIO control block: drop every reference and free it here */
+      if (lio)
+        {
+          for (int i = 0; i < nent; ++i)
+            if (aiolist[i])
+              aiolist[i]->aio_liocb = NULL;
+          free (lio);
+        }
+      set_errno (EAGAIN);
+      return -1;
+    }
+
+  /* Else mode is LIO_WAIT so wait for all AIOs to complete or error */
+  while (nent)
+    {
+      int i = aiosuspend ((const struct aiocb *const *) aiolist, nent, NULL);
+      if (i >= nent)
+        break;
+      else
+        aiolist[i]->aio_liocb = NULL; /* Avoids repeating notify on this AIO */
+    }
+
+  return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/winsup/cygwin/include/aio.h b/winsup/cygwin/include/aio.h
new file mode 100644
index 000000000..523a47870
--- /dev/null
+++ b/winsup/cygwin/include/aio.h
@@ -0,0 +1,82 @@
+/* aio.h: Support for Posix asynchronous i/o routines.
+
+This file is part of Cygwin.
+
+This software is a copyrighted work licensed under the terms of the
+Cygwin license. Please consult the file "CYGWIN_LICENSE" for
+details. */
+
+#ifndef _AIO_H
+#define _AIO_H
+
+#include <sys/features.h>
+#include <sys/queue.h>
+#include <sys/signal.h>
+#include <sys/types.h>
+#include <limits.h> // for AIO_LISTIO_MAX, AIO_MAX, and AIO_PRIO_DELTA_MAX
+
+/* defines for return value of aio_cancel() */
+#define AIO_ALLDONE 0
+#define AIO_CANCELED 1
+#define AIO_NOTCANCELED 2
+
+/* defines for 'mode' argument of lio_listio() */
+#define LIO_NOWAIT 0
+#define LIO_WAIT 1
+
+/* defines for 'aio_lio_opcode' element of struct aiocb */
+#define LIO_NOP 0
+#define LIO_READ 1
+#define LIO_WRITE 2
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* struct __liocb is Cygwin-specific */
+struct __liocb {
+  volatile uint32_t lio_count;
+  struct sigevent *lio_sigevent;
+};
+
+/* struct __wincb is Cygwin-specific */
+struct __wincb {
+  int32_t status; /* These two fields must be first... */
+  uintptr_t info; /* ...and must be adjacent, in this order */
+  void *event;
+};
+
+/* struct aiocb is defined by Posix */
+struct aiocb {
+  /* these elements of aiocb are Cygwin-specific */
+  TAILQ_ENTRY(aiocb) aio_chain;
+  struct __liocb *aio_liocb;
+  struct __wincb aio_wincb;
+  ssize_t aio_rbytes;
+  int aio_errno;
+  /* the remaining elements of aiocb are defined by Posix */
+  int aio_lio_opcode;
+  int aio_reqprio; /* Not yet implemented; must be zero */
+  int aio_fildes;
+  volatile void *aio_buf;
+  size_t aio_nbytes;
+  off_t aio_offset;
+  struct sigevent aio_sigevent;
+};
+
+/* function prototypes as defined by Posix */
+int aio_cancel (int, struct aiocb *);
+int aio_error (const struct aiocb *);
+int aio_fsync (int, struct aiocb *);
+int aio_read (struct aiocb *);
+ssize_t aio_return (struct aiocb *);
+int aio_suspend (const struct aiocb *const [], int,
+                 const struct timespec *);
+int aio_write (struct aiocb *);
+int lio_listio (int, struct aiocb *__restrict const [__restrict], int,
+                struct sigevent *__restrict);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* _AIO_H */