From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: by sourceware.org (Postfix, from userid 2210) id C4A46396ECFE; Fri, 8 May 2020 11:30:42 +0000 (GMT) DKIM-Filter: OpenDKIM Filter v2.11.0 sourceware.org C4A46396ECFE Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: Ken Brown To: cygwin-cvs@sourceware.org Subject: [newlib-cygwin] Cygwin: FIFO: support opening multiple readers X-Act-Checkin: newlib-cygwin X-Git-Author: Ken Brown X-Git-Refname: refs/heads/master X-Git-Oldrev: bf66a56ccaee92f44ec52551969ed52542c4ecfe X-Git-Newrev: 4811889e0c6a9e8e725d0cd0c8e5c8393017a04e Message-Id: <20200508113042.C4A46396ECFE@sourceware.org> Date: Fri, 8 May 2020 11:30:42 +0000 (GMT) X-BeenThere: cygwin-cvs@cygwin.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Cygwin core component git logs List-Unsubscribe: , List-Archive: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 08 May 2020 11:30:42 -0000 https://sourceware.org/git/gitweb.cgi?p=newlib-cygwin.git;h=4811889e0c6a9e8e725d0cd0c8e5c8393017a04e commit 4811889e0c6a9e8e725d0cd0c8e5c8393017a04e Author: Ken Brown Date: Wed May 6 11:31:39 2020 -0400 Cygwin: FIFO: support opening multiple readers Although we can have multiple readers open because of dup/fork/exec, the current code does not support multiple readers opening a FIFO by explicitly calling 'open'. The main complication in supporting this is that when a blocking reader tries to open and there's already one open, it has to check whether there are any writers open. It can't rely on the write_ready event, whose state hasn't changed since the first writer opened. To fix this, add two new named events, check_write_ready_evt and write_ready_ok_evt, and a new method, check_write_ready(). The first event signals the owner's reader thread to call check_write_ready(), which polls the fc_handler list to check for connected writers. 
If it finds none, it checks to see if there's a writer in the process and then sets/resets write_ready appropriately. When check_write_ready() finishes it sets write_ready_ok_evt to signal the reader that write_ready has been updated. The polling is done via fifo_client_handler::pipe_state(). As long as it's calling that function anyway, check_write_ready() updates the state of each handler. Also add a new lock to prevent a race if two readers are trying to open simultaneously. Diff: --- winsup/cygwin/fhandler.h | 9 ++- winsup/cygwin/fhandler_fifo.cc | 129 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 127 insertions(+), 11 deletions(-) diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index 31c65866e..8a23d6753 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -1324,7 +1324,7 @@ class fifo_shmem_t { LONG _nreaders; fifo_reader_id_t _owner, _prev_owner, _pending_owner; - af_unix_spinlock_t _owner_lock, _reading_lock; + af_unix_spinlock_t _owner_lock, _reading_lock, _reader_opening_lock; /* Info about shared memory block used for temporary storage of the owner's fc_handler list. */ @@ -1346,6 +1346,8 @@ public: void owner_unlock () { _owner_lock.unlock (); } void reading_lock () { _reading_lock.lock (); } void reading_unlock () { _reading_lock.unlock (); } + void reader_opening_lock () { _reader_opening_lock.lock (); } + void reader_opening_unlock () { _reader_opening_lock.unlock (); } int get_shared_nhandlers () const { return (int) _sh_nhandlers; } void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); } @@ -1371,6 +1373,8 @@ class fhandler_fifo: public fhandler_base HANDLE owner_needed_evt; /* The owner is closing. */ HANDLE owner_found_evt; /* A new owner has taken over. */ HANDLE update_needed_evt; /* shared_fc_handler needs updating. */ + HANDLE check_write_ready_evt; /* write_ready needs to be checked. */ + HANDLE write_ready_ok_evt; /* check_write_ready is done. 
*/ /* Handles to non-shared events needed for fifo_reader_threads. */ HANDLE cancel_evt; /* Signal thread to terminate. */ @@ -1448,6 +1452,9 @@ class fhandler_fifo: public fhandler_base { return shmem->shared_fc_handler_updated (); } void shared_fc_handler_updated (bool val) { shmem->shared_fc_handler_updated (val); } + void check_write_ready (); + void reader_opening_lock () { shmem->reader_opening_lock (); } + void reader_opening_unlock () { shmem->reader_opening_unlock (); } public: fhandler_fifo (); diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc index 5c059a567..4c7d51edb 100644 --- a/winsup/cygwin/fhandler_fifo.cc +++ b/winsup/cygwin/fhandler_fifo.cc @@ -75,6 +75,7 @@ fhandler_fifo::fhandler_fifo (): fhandler_base (), read_ready (NULL), write_ready (NULL), writer_opening (NULL), owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL), + check_write_ready_evt (NULL), write_ready_ok_evt (NULL), cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false), fc_handler (NULL), shandlers (0), nhandlers (0), reader (false), writer (false), duplexer (false), @@ -441,6 +442,56 @@ fhandler_fifo::update_shared_handlers () return 0; }
+/* The write_ready event gets set when a writer opens, to indicate
+   that a blocking reader can open.  If a second reader wants to open,
+   we need to see if there are still any writers open.
+
+   Poll the handlers in fc_handler via pipe_state (), recording each
+   polled handler's state, until one is found that is connected or has
+   input available.  Then set write_ready if such a handler was found or
+   if writer_opening is signalled, reset it otherwise, and finally set
+   write_ready_ok_evt so that the reader waiting in open () knows that
+   write_ready has been brought up to date.  */
+void
+fhandler_fifo::check_write_ready ()
+{
+  bool set = false;
+
+  fifo_client_lock ();
+  /* Stop polling at the first handler that proves a writer is connected.  */
+  for (int i = 0; i < nhandlers && !set; i++)
+    switch (fc_handler[i].pipe_state ())
+      {
+      case FILE_PIPE_CONNECTED_STATE:
+        fc_handler[i].state = fc_connected;
+        set = true;
+        break;
+      case FILE_PIPE_INPUT_AVAILABLE_STATE:
+        fc_handler[i].state = fc_input_avail;
+        set = true;
+        break;
+      case FILE_PIPE_DISCONNECTED_STATE:
+        fc_handler[i].state = fc_disconnected;
+        break;
+      case FILE_PIPE_LISTENING_STATE:
+        fc_handler[i].state = fc_listening;
+        /* Don't fall through and clobber the state with fc_error.  */
+        break;
+      case FILE_PIPE_CLOSING_STATE:
+        fc_handler[i].state = fc_closing;
+        break;
+      default:
+        fc_handler[i].state = fc_error;
+        break;
+      }
+  fifo_client_unlock ();
+  if (set || IsEventSignalled (writer_opening))
+    SetEvent (write_ready);
+  else
+    ResetEvent (write_ready);
+  SetEvent (write_ready_ok_evt);
+}
+
 static DWORD WINAPI fifo_reader_thread (LPVOID param) { @@ -526,13 +566,15 @@ fhandler_fifo::fifo_reader_thread_func () IO_STATUS_BLOCK io; bool cancel = false; bool update = false; + bool check = false; status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io, FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); if (status == STATUS_PENDING) { - HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt }; - switch (WaitForMultipleObjects (3, w, false, INFINITE)) + HANDLE w[4] = { conn_evt, update_needed_evt, + check_write_ready_evt, cancel_evt }; + switch (WaitForMultipleObjects (4, w, false, INFINITE)) { case WAIT_OBJECT_0: status = io.Status; @@ -544,6 +586,10 @@ fhandler_fifo::fifo_reader_thread_func () update = true; break; case WAIT_OBJECT_0 + 2: + status = STATUS_WAIT_2; + check = true; + break; + case WAIT_OBJECT_0 + 3: status = STATUS_THREAD_IS_TERMINATING; cancel = true; update = true; @@ -570,6 +616,7 @@ fhandler_fifo::fifo_reader_thread_func () break; case STATUS_THREAD_IS_TERMINATING: case STATUS_WAIT_1: + case STATUS_WAIT_2: /* Try to connect a bogus client. Otherwise fc is still listening, and the next connection might not get recorded. 
*/ status1 = open_pipe (ph); @@ -602,6 +649,8 @@ fhandler_fifo::fifo_reader_thread_func () NtClose (ph); if (update && update_shared_handlers () < 0) api_fatal ("Can't update shared handlers, %E"); + if (check) + check_write_ready (); if (cancel) goto canceled; } @@ -833,14 +882,19 @@ fhandler_fifo::open (int flags, mode_t) and start the fifo_reader_thread. */ if (reader) { + bool first = true; + SetEvent (read_ready); if (create_shmem () < 0) goto err_close_writer_opening; if (create_shared_fc_handler () < 0) goto err_close_shmem; - inc_nreaders (); - /* Reinitialize _sh_fc_handler_updated, which starts as 0. */ - shared_fc_handler_updated (true); + reader_opening_lock (); + if (inc_nreaders () == 1) + /* Reinitialize _sh_fc_handler_updated, which starts as 0. */ + shared_fc_handler_updated (true); + else + first = false; npbuf[0] = 'n'; if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf))) { @@ -862,9 +916,23 @@ fhandler_fifo::open (int flags, mode_t) __seterrno (); goto err_close_owner_found_evt; } + npbuf[0] = 'c'; + if (!(check_write_ready_evt = CreateEvent (sa_buf, false, false, npbuf))) + { + debug_printf ("CreateEvent for %s failed, %E", npbuf); + __seterrno (); + goto err_close_update_needed_evt; + } + npbuf[0] = 'k'; + if (!(write_ready_ok_evt = CreateEvent (sa_buf, false, false, npbuf))) + { + debug_printf ("CreateEvent for %s failed, %E", npbuf); + __seterrno (); + goto err_close_check_write_ready_evt; + } /* Make cancel and sync inheritable for exec. */ if (!(cancel_evt = create_event (true))) - goto err_close_update_needed_evt; + goto err_close_write_ready_ok_evt; if (!(thr_sync_evt = create_event (true))) goto err_close_cancel_evt; me.winpid = GetCurrentProcessId (); @@ -908,9 +976,19 @@ fhandler_fifo::open (int flags, mode_t) } } } - /* Not a duplexer; wait for a writer to connect. */ - else if (!wait (write_ready)) - goto err_close_reader; + /* Not a duplexer; wait for a writer to connect if we're blocking. 
*/ + else if (!(flags & O_NONBLOCK)) + { + if (!first) + { + /* Ask the owner to update write_ready. */ + SetEvent (check_write_ready_evt); + WaitForSingleObject (write_ready_ok_evt, INFINITE); + } + if (!wait (write_ready)) + goto err_close_reader; + } + reader_opening_unlock (); goto success; } @@ -984,6 +1062,10 @@ err_close_reader: return 0; err_close_cancel_evt: NtClose (cancel_evt); +err_close_write_ready_ok_evt: + NtClose (write_ready_ok_evt); +err_close_check_write_ready_evt: + NtClose (check_write_ready_evt); err_close_update_needed_evt: NtClose (update_needed_evt); err_close_owner_found_evt: @@ -993,6 +1075,7 @@ err_close_owner_needed_evt: err_dec_nreaders: if (dec_nreaders () == 0) ResetEvent (read_ready); + reader_opening_unlock (); /* err_close_shared_fc_handler: */ NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler); NtClose (shared_fc_hdl); @@ -1396,6 +1479,10 @@ fhandler_fifo::close () NtClose (owner_found_evt); if (update_needed_evt) NtClose (update_needed_evt); + if (check_write_ready_evt) + NtClose (check_write_ready_evt); + if (write_ready_ok_evt) + NtClose (write_ready_ok_evt); if (cancel_evt) NtClose (cancel_evt); if (thr_sync_evt) @@ -1519,8 +1606,22 @@ fhandler_fifo::dup (fhandler_base *child, int flags) __seterrno (); goto err_close_owner_found_evt; } + if (!DuplicateHandle (GetCurrentProcess (), check_write_ready_evt, + GetCurrentProcess (), &fhf->check_write_ready_evt, + 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS)) + { + __seterrno (); + goto err_close_update_needed_evt; + } + if (!DuplicateHandle (GetCurrentProcess (), write_ready_ok_evt, + GetCurrentProcess (), &fhf->write_ready_ok_evt, + 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS)) + { + __seterrno (); + goto err_close_check_write_ready_evt; + } if (!(fhf->cancel_evt = create_event (true))) - goto err_close_update_needed_evt; + goto err_close_write_ready_ok_evt; if (!(fhf->thr_sync_evt = create_event (true))) goto err_close_cancel_evt; inc_nreaders (); @@ -1530,6 
+1631,10 @@ fhandler_fifo::dup (fhandler_base *child, int flags) return 0; err_close_cancel_evt: NtClose (fhf->cancel_evt); +err_close_write_ready_ok_evt: + NtClose (fhf->write_ready_ok_evt); +err_close_check_write_ready_evt: + NtClose (fhf->check_write_ready_evt); err_close_update_needed_evt: NtClose (fhf->update_needed_evt); err_close_owner_found_evt: @@ -1575,6 +1680,8 @@ fhandler_fifo::fixup_after_fork (HANDLE parent) fork_fixup (parent, owner_needed_evt, "owner_needed_evt"); fork_fixup (parent, owner_found_evt, "owner_found_evt"); fork_fixup (parent, update_needed_evt, "update_needed_evt"); + fork_fixup (parent, check_write_ready_evt, "check_write_ready_evt"); + fork_fixup (parent, write_ready_ok_evt, "write_ready_ok_evt"); if (close_on_exec ()) /* Prevent a later attempt to close the non-inherited pipe-instance handles copied from the parent. */ @@ -1658,6 +1765,8 @@ fhandler_fifo::set_close_on_exec (bool val) set_no_inheritance (owner_needed_evt, val); set_no_inheritance (owner_found_evt, val); set_no_inheritance (update_needed_evt, val); + set_no_inheritance (check_write_ready_evt, val); + set_no_inheritance (write_ready_ok_evt, val); set_no_inheritance (cancel_evt, val); set_no_inheritance (thr_sync_evt, val); fifo_client_lock ();