From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: by sourceware.org (Postfix, from userid 2210) id BCAF5396ECFE; Fri, 8 May 2020 11:30:37 +0000 (GMT) DKIM-Filter: OpenDKIM Filter v2.11.0 sourceware.org BCAF5396ECFE Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: Ken Brown To: cygwin-cvs@sourceware.org Subject: [newlib-cygwin] Cygwin: FIFO: allow any reader to take ownership X-Act-Checkin: newlib-cygwin X-Git-Author: Ken Brown X-Git-Refname: refs/heads/master X-Git-Oldrev: f35dfff3dec716869132cc89827878dc22059665 X-Git-Newrev: bf66a56ccaee92f44ec52551969ed52542c4ecfe Message-Id: <20200508113037.BCAF5396ECFE@sourceware.org> Date: Fri, 8 May 2020 11:30:37 +0000 (GMT) X-BeenThere: cygwin-cvs@cygwin.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Cygwin core component git logs List-Unsubscribe: , List-Archive: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 08 May 2020 11:30:37 -0000 https://sourceware.org/git/gitweb.cgi?p=newlib-cygwin.git;h=bf66a56ccaee92f44ec52551969ed52542c4ecfe commit bf66a56ccaee92f44ec52551969ed52542c4ecfe Author: Ken Brown Date: Sat Apr 25 09:54:18 2020 -0400 Cygwin: FIFO: allow any reader to take ownership Add a take_ownership method, used by raw_read and select.cc:peek_fifo. It wakes up all fifo_reader_threads and allows the caller to become owner. The work is done by the fifo_reader_threads. For synchronization we introduce several new fhandler_fifo data members and methods: - update_needed_evt signals the current owner to stop listening for writer connections and update its fc_handler list. - shared_fc_handler_updated() gets and sets the status of the fc_handler update process. - get_pending_owner() and set_pending_owner() get and set the reader that is requesting ownership. Finally, a new 'reading_lock' prevents two readers from trying to take ownership simultaneously. 
Diff: --- winsup/cygwin/fhandler.h | 28 +++++++++-- winsup/cygwin/fhandler_fifo.cc | 106 ++++++++++++++++++++++++++++++++++++----- winsup/cygwin/select.cc | 4 ++ 3 files changed, 122 insertions(+), 16 deletions(-) diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index f8c1b52a4..31c65866e 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -1323,12 +1323,13 @@ struct fifo_reader_id_t class fifo_shmem_t { LONG _nreaders; - fifo_reader_id_t _owner, _prev_owner; - af_unix_spinlock_t _owner_lock; + fifo_reader_id_t _owner, _prev_owner, _pending_owner; + af_unix_spinlock_t _owner_lock, _reading_lock; /* Info about shared memory block used for temporary storage of the owner's fc_handler list. */ - LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed; + LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed, + _sh_fc_handler_updated; public: int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); } @@ -1338,9 +1339,13 @@ public: void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; } fifo_reader_id_t get_prev_owner () const { return _prev_owner; } void set_prev_owner (fifo_reader_id_t fr_id) { _prev_owner = fr_id; } + fifo_reader_id_t get_pending_owner () const { return _pending_owner; } + void set_pending_owner (fifo_reader_id_t fr_id) { _pending_owner = fr_id; } void owner_lock () { _owner_lock.lock (); } void owner_unlock () { _owner_lock.unlock (); } + void reading_lock () { _reading_lock.lock (); } + void reading_unlock () { _reading_lock.unlock (); } int get_shared_nhandlers () const { return (int) _sh_nhandlers; } void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); } @@ -1350,6 +1355,9 @@ public: { return (size_t) _sh_fc_handler_committed; } void set_shared_fc_handler_committed (size_t n) { InterlockedExchange (&_sh_fc_handler_committed, (LONG) n); } + bool shared_fc_handler_updated () const { return _sh_fc_handler_updated; } + void shared_fc_handler_updated (bool val) + { 
InterlockedExchange (&_sh_fc_handler_updated, val); } }; class fhandler_fifo: public fhandler_base @@ -1362,6 +1370,7 @@ class fhandler_fifo: public fhandler_base /* Handles to named events needed by all readers of a given FIFO. */ HANDLE owner_needed_evt; /* The owner is closing. */ HANDLE owner_found_evt; /* A new owner has taken over. */ + HANDLE update_needed_evt; /* shared_fc_handler needs updating. */ /* Handles to non-shared events needed for fifo_reader_threads. */ HANDLE cancel_evt; /* Signal thread to terminate. */ @@ -1409,6 +1418,11 @@ class fhandler_fifo: public fhandler_base fifo_reader_id_t get_prev_owner () const { return shmem->get_prev_owner (); } void set_prev_owner (fifo_reader_id_t fr_id) { shmem->set_prev_owner (fr_id); } + fifo_reader_id_t get_pending_owner () const + { return shmem->get_pending_owner (); } + void set_pending_owner (fifo_reader_id_t fr_id) + { shmem->set_pending_owner (fr_id); } + void owner_needed () { ResetEvent (owner_found_evt); @@ -1430,6 +1444,10 @@ class fhandler_fifo: public fhandler_base { shmem->set_shared_fc_handler_committed (n); } int update_my_handlers (bool from_exec = false); int update_shared_handlers (); + bool shared_fc_handler_updated () const + { return shmem->shared_fc_handler_updated (); } + void shared_fc_handler_updated (bool val) + { shmem->shared_fc_handler_updated (val); } public: fhandler_fifo (); @@ -1449,6 +1467,10 @@ public: void owner_lock () { shmem->owner_lock (); } void owner_unlock () { shmem->owner_unlock (); } + void take_ownership (); + void reading_lock () { shmem->reading_lock (); } + void reading_unlock () { shmem->reading_unlock (); } + int open (int, mode_t); off_t lseek (off_t offset, int whence); int close (); diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc index 793adfae8..5c059a567 100644 --- a/winsup/cygwin/fhandler_fifo.cc +++ b/winsup/cygwin/fhandler_fifo.cc @@ -74,7 +74,7 @@ static NO_COPY fifo_reader_id_t null_fr_id = { .winpid = 0, .fh = NULL 
}; fhandler_fifo::fhandler_fifo (): fhandler_base (), read_ready (NULL), write_ready (NULL), writer_opening (NULL), - owner_needed_evt (NULL), owner_found_evt (NULL), + owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL), cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false), fc_handler (NULL), shandlers (0), nhandlers (0), reader (false), writer (false), duplexer (false), @@ -436,6 +436,8 @@ fhandler_fifo::update_shared_handlers () } set_shared_nhandlers (nhandlers); memcpy (shared_fc_handler, fc_handler, nhandlers * sizeof (fc_handler[0])); + shared_fc_handler_updated (true); + set_prev_owner (me); return 0; } @@ -456,20 +458,44 @@ fhandler_fifo::fifo_reader_thread_func () while (1) { - fifo_reader_id_t cur_owner; + fifo_reader_id_t cur_owner, pending_owner; + bool idle = false, take_ownership = false; owner_lock (); cur_owner = get_owner (); - if (!cur_owner) + pending_owner = get_pending_owner (); + + if (pending_owner) { - set_owner (me); - if (update_my_handlers () < 0) - api_fatal ("Can't update my handlers, %E"); - owner_found (); - owner_unlock (); - continue; + if (pending_owner != me) + idle = true; + else + take_ownership = true; } + else if (!cur_owner) + take_ownership = true; else if (cur_owner != me) + idle = true; + if (take_ownership) + { + if (!shared_fc_handler_updated ()) + { + owner_unlock (); + yield (); + continue; + } + else + { + set_owner (me); + set_pending_owner (null_fr_id); + if (update_my_handlers () < 0) + api_fatal ("Can't update my handlers, %E"); + owner_found (); + owner_unlock (); + continue; + } + } + else if (idle) { owner_unlock (); HANDLE w[2] = { owner_needed_evt, cancel_evt }; @@ -494,6 +520,7 @@ fhandler_fifo::fifo_reader_thread_func () /* Listen for a writer to connect to the new client handler. 
*/ fifo_client_handler& fc = fc_handler[nhandlers - 1]; fifo_client_unlock (); + shared_fc_handler_updated (false); owner_unlock (); NTSTATUS status; IO_STATUS_BLOCK io; @@ -504,8 +531,8 @@ fhandler_fifo::fifo_reader_thread_func () FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); if (status == STATUS_PENDING) { - HANDLE w[2] = { conn_evt, cancel_evt }; - switch (WaitForMultipleObjects (2, w, false, INFINITE)) + HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt }; + switch (WaitForMultipleObjects (3, w, false, INFINITE)) { case WAIT_OBJECT_0: status = io.Status; @@ -513,6 +540,10 @@ fhandler_fifo::fifo_reader_thread_func () status); break; case WAIT_OBJECT_0 + 1: + status = STATUS_WAIT_1; + update = true; + break; + case WAIT_OBJECT_0 + 2: status = STATUS_THREAD_IS_TERMINATING; cancel = true; update = true; @@ -538,6 +569,7 @@ fhandler_fifo::fifo_reader_thread_func () record_connection (fc, fc_closing); break; case STATUS_THREAD_IS_TERMINATING: + case STATUS_WAIT_1: /* Try to connect a bogus client. Otherwise fc is still listening, and the next connection might not get recorded. */ status1 = open_pipe (ph); @@ -807,6 +839,8 @@ fhandler_fifo::open (int flags, mode_t) if (create_shared_fc_handler () < 0) goto err_close_shmem; inc_nreaders (); + /* Reinitialize _sh_fc_handler_updated, which starts as 0. */ + shared_fc_handler_updated (true); npbuf[0] = 'n'; if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf))) { @@ -821,9 +855,16 @@ fhandler_fifo::open (int flags, mode_t) __seterrno (); goto err_close_owner_needed_evt; } + npbuf[0] = 'u'; + if (!(update_needed_evt = CreateEvent (sa_buf, false, false, npbuf))) + { + debug_printf ("CreateEvent for %s failed, %E", npbuf); + __seterrno (); + goto err_close_owner_found_evt; + } /* Make cancel and sync inheritable for exec. 
*/ if (!(cancel_evt = create_event (true))) - goto err_close_owner_found_evt; + goto err_close_update_needed_evt; if (!(thr_sync_evt = create_event (true))) goto err_close_cancel_evt; me.winpid = GetCurrentProcessId (); @@ -943,6 +984,8 @@ err_close_reader: return 0; err_close_cancel_evt: NtClose (cancel_evt); +err_close_update_needed_evt: + NtClose (update_needed_evt); err_close_owner_found_evt: NtClose (owner_found_evt); err_close_owner_needed_evt: @@ -1136,6 +1179,24 @@ fhandler_fifo::hit_eof () return ret; } +/* Called from raw_read and select.cc:peek_fifo. */ +void +fhandler_fifo::take_ownership () +{ + owner_lock (); + if (get_owner () == me) + { + owner_unlock (); + return; + } + set_pending_owner (me); + owner_needed (); + SetEvent (update_needed_evt); + owner_unlock (); + /* The reader threads should now do the transfer. */ + WaitForSingleObject (owner_found_evt, INFINITE); +} + void __reg3 fhandler_fifo::raw_read (void *in_ptr, size_t& len) { @@ -1144,6 +1205,9 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len) while (1) { + /* No one else can take ownership while we hold the reading_lock. */ + reading_lock (); + take_ownership (); /* Poll the connected clients for input. 
*/ int nconnected = 0; fifo_client_lock (); @@ -1167,6 +1231,7 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len) { len = nbytes; fifo_client_unlock (); + reading_unlock (); return; } break; @@ -1187,9 +1252,11 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len) fifo_client_unlock (); if (maybe_eof () && hit_eof ()) { + reading_unlock (); len = 0; return; } + reading_unlock (); if (is_nonblocking ()) { set_errno (EAGAIN); @@ -1327,6 +1394,8 @@ fhandler_fifo::close () NtClose (owner_needed_evt); if (owner_found_evt) NtClose (owner_found_evt); + if (update_needed_evt) + NtClose (update_needed_evt); if (cancel_evt) NtClose (cancel_evt); if (thr_sync_evt) @@ -1443,8 +1512,15 @@ fhandler_fifo::dup (fhandler_base *child, int flags) __seterrno (); goto err_close_owner_needed_evt; } + if (!DuplicateHandle (GetCurrentProcess (), update_needed_evt, + GetCurrentProcess (), &fhf->update_needed_evt, + 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS)) + { + __seterrno (); + goto err_close_owner_found_evt; + } if (!(fhf->cancel_evt = create_event (true))) - goto err_close_owner_found_evt; + goto err_close_update_needed_evt; if (!(fhf->thr_sync_evt = create_event (true))) goto err_close_cancel_evt; inc_nreaders (); @@ -1454,6 +1530,8 @@ fhandler_fifo::dup (fhandler_base *child, int flags) return 0; err_close_cancel_evt: NtClose (fhf->cancel_evt); +err_close_update_needed_evt: + NtClose (fhf->update_needed_evt); err_close_owner_found_evt: NtClose (fhf->owner_found_evt); err_close_owner_needed_evt: @@ -1496,6 +1574,7 @@ fhandler_fifo::fixup_after_fork (HANDLE parent) api_fatal ("Can't reopen shared fc_handler memory during fork, %E"); fork_fixup (parent, owner_needed_evt, "owner_needed_evt"); fork_fixup (parent, owner_found_evt, "owner_found_evt"); + fork_fixup (parent, update_needed_evt, "update_needed_evt"); if (close_on_exec ()) /* Prevent a later attempt to close the non-inherited pipe-instance handles copied from the parent. 
*/ @@ -1578,6 +1657,7 @@ fhandler_fifo::set_close_on_exec (bool val) { set_no_inheritance (owner_needed_evt, val); set_no_inheritance (owner_found_evt, val); + set_no_inheritance (update_needed_evt, val); set_no_inheritance (cancel_evt, val); set_no_inheritance (thr_sync_evt, val); fifo_client_lock (); diff --git a/winsup/cygwin/select.cc b/winsup/cygwin/select.cc index 9323c423f..2c299acf7 100644 --- a/winsup/cygwin/select.cc +++ b/winsup/cygwin/select.cc @@ -866,6 +866,8 @@ peek_fifo (select_record *s, bool from_select) goto out; } + fh->reading_lock (); + fh->take_ownership (); fh->fifo_client_lock (); int nconnected = 0; for (int i = 0; i < fh->get_nhandlers (); i++) @@ -888,6 +890,7 @@ peek_fifo (select_record *s, bool from_select) fh->get_fc_handler (i).get_state () = fc_input_avail; select_printf ("read: %s, ready for read", fh->get_name ()); fh->fifo_client_unlock (); + fh->reading_unlock (); gotone += s->read_ready = true; goto out; default: @@ -905,6 +908,7 @@ peek_fifo (select_record *s, bool from_select) if (s->except_selected) gotone += s->except_ready = true; } + fh->reading_unlock (); } out: if (s->write_selected)