diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index 132e6002133b..032ab5fb07ae 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -1171,6 +1171,7 @@ class fhandler_socket_unix : public fhandler_socket class fhandler_pipe: public fhandler_base { private: + HANDLE read_mtx; pid_t popen_pid; size_t max_atomic_write; void set_pipe_non_blocking (bool nonblocking); @@ -1178,6 +1179,7 @@ public: fhandler_pipe (); bool ispipe() const { return true; } + void set_read_mutex (HANDLE mtx) { read_mtx = mtx; } void set_popen_pid (pid_t pid) {popen_pid = pid;} pid_t get_popen_pid () const {return popen_pid;} @@ -1187,7 +1189,9 @@ public: select_record *select_except (select_stuff *); char *get_proc_fd_name (char *buf); int open (int flags, mode_t mode = 0); + void fixup_after_fork (HANDLE); int dup (fhandler_base *child, int); + int close (); void __reg3 raw_read (void *ptr, size_t& len); ssize_t __reg3 raw_write (const void *ptr, size_t len); int ioctl (unsigned int cmd, void *); diff --git a/winsup/cygwin/fhandler_pipe.cc b/winsup/cygwin/fhandler_pipe.cc index 2dec0a84817c..7a5cefb3d07c 100644 --- a/winsup/cygwin/fhandler_pipe.cc +++ b/winsup/cygwin/fhandler_pipe.cc @@ -240,8 +240,37 @@ fhandler_pipe::raw_read (void *ptr, size_t& len) keep_looping = false; if (evt) ResetEvent (evt); + if (!is_nonblocking ()) + { + FILE_PIPE_LOCAL_INFORMATION fpli; + ULONG reader_count; + ULONG max_len = 64; + + WaitForSingleObject (read_mtx, INFINITE); + + /* Make sure never to request more bytes than half the pipe + buffer size. Every pending read lowers WriteQuotaAvailable + on the write side and thus affects select's ability to return + more or less reliable info whether a write succeeds or not. + + Let the size of the request depend on the number of readers + at the time. */ + status = NtQueryInformationFile (get_handle (), &io, + &fpli, sizeof (fpli), + FilePipeLocalInformation); + if (NT_SUCCESS (status) && fpli.ReadDataAvailable == 0) + { + reader_count = get_obj_handle_count (get_handle ()); + if (reader_count < 10) + max_len = fpli.InboundQuota / (2 * reader_count); + if (len > max_len) + len = max_len; + } + } status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr, len, NULL, NULL); + if (!is_nonblocking ()) + ReleaseMutex (read_mtx); if (evt && status == STATUS_PENDING) { waitret = cygwait (evt); @@ -313,7 +342,6 @@ fhandler_pipe::raw_read (void *ptr, size_t& len) ssize_t __reg3 fhandler_pipe::raw_write (const void *ptr, size_t len) { - ssize_t ret = -1; size_t nbytes = 0; ULONG chunk; NTSTATUS status = STATUS_SUCCESS; @@ -352,8 +380,36 @@ fhandler_pipe::raw_write (const void *ptr, size_t len) else len1 = (ULONG) left; nbytes_now = 0; - status = NtWriteFile (get_handle (), evt, NULL, NULL, &io, - (PVOID) ptr, len1, NULL, NULL); + /* NtWriteFile returns success with # of bytes written == 0 if writing + on a non-blocking pipe fails because the pipe buffer doesn't have + sufficient space. + + POSIX requires + - A write request for {PIPE_BUF} or fewer bytes shall have the + following effect: if there is sufficient space available in the + pipe, write() shall transfer all the data and return the number + of bytes requested. Otherwise, write() shall transfer no data and + return -1 with errno set to [EAGAIN]. + + - A write request for more than {PIPE_BUF} bytes shall cause one + of the following: + + - When at least one byte can be written, transfer what it can and + return the number of bytes written. When all data previously + written to the pipe is read, it shall transfer at least {PIPE_BUF} + bytes. + + - When no data can be written, transfer no data, and return -1 with + errno set to [EAGAIN]. */ + while (len1 > 0) + { + status = NtWriteFile (get_handle (), evt, NULL, NULL, &io, + (PVOID) ptr, len1, NULL, NULL); + if (evt || !NT_SUCCESS (status) || io.Information > 0 + || len <= PIPE_BUF) + break; + len1 >>= 1; + } if (evt && status == STATUS_PENDING) { waitret = cygwait (evt); @@ -375,13 +431,11 @@ fhandler_pipe::raw_write (const void *ptr, size_t len) else if (NT_SUCCESS (status)) { nbytes_now = io.Information; - /* NtWriteFile returns success with # of bytes written == 0 - if writing on a non-blocking pipe fails because the pipe - buffer doesn't have sufficient space. */ - if (nbytes_now == 0) - set_errno (EAGAIN); - ptr = ((char *) ptr) + chunk; + ptr = ((char *) ptr) + nbytes_now; nbytes += nbytes_now; + /* 0 bytes returned? EAGAIN. See above. */ + if (nbytes == 0) + set_errno (EAGAIN); } else if (STATUS_PIPE_IS_CLOSED (status)) { @@ -392,17 +446,23 @@ fhandler_pipe::raw_write (const void *ptr, size_t len) __seterrno_from_nt_status (status); if (nbytes_now == 0) - len = 0; /* Terminate loop. */ - if (nbytes > 0) - ret = nbytes; + break; } if (evt) CloseHandle (evt); - if (status == STATUS_THREAD_SIGNALED && ret < 0) + if (status == STATUS_THREAD_SIGNALED && nbytes == 0) set_errno (EINTR); else if (status == STATUS_THREAD_CANCELED) pthread::static_cancel_self (); - return ret; + return nbytes ?: -1; +} + +void +fhandler_pipe::fixup_after_fork (HANDLE parent) +{ + if (read_mtx) + fork_fixup (parent, read_mtx, "read_mtx"); + fhandler_base::fixup_after_fork (parent); } int @@ -411,16 +471,31 @@ fhandler_pipe::dup (fhandler_base *child, int flags) fhandler_pipe *ftp = (fhandler_pipe *) child; ftp->set_popen_pid (0); - int res; - if (get_handle () && fhandler_base::dup (child, flags)) + int res = 0; + if (fhandler_base::dup (child, flags)) res = -1; - else - res = 0; + else if (read_mtx && + !DuplicateHandle (GetCurrentProcess (), read_mtx, + GetCurrentProcess (), &ftp->read_mtx, + 0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS)) + { + __seterrno (); + ftp->close (); + res = -1; + } debug_printf ("res %d", res); return res; } +int +fhandler_pipe::close () +{ + if (read_mtx) + NtClose (read_mtx); + return fhandler_base::close (); +} + #define PIPE_INTRO "\\\\.\\pipe\\cygwin-" /* Create a pipe, and return handles to the read and write ends, @@ -608,6 +683,7 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode) else if ((fhs[1] = (fhandler_pipe *) build_fh_dev (*pipew_dev)) == NULL) { delete fhs[0]; + CloseHandle (r); CloseHandle (w); } else @@ -617,7 +693,25 @@ fhandler_pipe::create (fhandler_pipe *fhs[2], unsigned psize, int mode) unique_id); fhs[1]->init (w, FILE_CREATE_PIPE_INSTANCE | GENERIC_WRITE, mode, unique_id); - res = 0; + /* For the read side of the pipe, add a mutex. See raw_read for the + usage. */ + SECURITY_ATTRIBUTES sa = { .nLength = sizeof (SECURITY_ATTRIBUTES), + .lpSecurityDescriptor = NULL, + .bInheritHandle = !(mode & O_CLOEXEC) + }; + HANDLE mtx = CreateMutexW (&sa, FALSE, NULL); + if (!mtx) + { + delete fhs[0]; + CloseHandle (r); + delete fhs[1]; + CloseHandle (w); + } + else + { + fhs[0]->set_read_mutex (mtx); + res = 0; + } } debug_printf ("%R = pipe([%p, %p], %d, %y)", res, fhs[0], fhs[1], psize, mode); @@ -658,7 +752,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w, &cygheap->installation_key, GetCurrentProcessId ()); - access = GENERIC_READ | FILE_WRITE_ATTRIBUTES; + access = GENERIC_READ | FILE_WRITE_ATTRIBUTES | SYNCHRONIZE; ULONG pipe_type = pipe_byte ? FILE_PIPE_BYTE_STREAM_TYPE : FILE_PIPE_MESSAGE_TYPE; @@ -737,7 +831,7 @@ nt_create (LPSECURITY_ATTRIBUTES sa_ptr, PHANDLE r, PHANDLE w, { debug_printf ("NtOpenFile: name %S", &pipename); - access = GENERIC_WRITE | FILE_READ_ATTRIBUTES; + access = GENERIC_WRITE | FILE_READ_ATTRIBUTES | SYNCHRONIZE; status = NtOpenFile (w, access, &attr, &io, 0, 0); if (!NT_SUCCESS (status)) { diff --git a/winsup/cygwin/flock.cc b/winsup/cygwin/flock.cc index bd7a16d91ecd..2f12fc07e37b 100644 --- a/winsup/cygwin/flock.cc +++ b/winsup/cygwin/flock.cc @@ -216,22 +216,6 @@ allow_others_to_sync () done = true; } -/* Get the handle count of an object. */ -static ULONG -get_obj_handle_count (HANDLE h) -{ - OBJECT_BASIC_INFORMATION obi; - NTSTATUS status; - ULONG hdl_cnt = 0; - - status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL); - if (!NT_SUCCESS (status)) - debug_printf ("NtQueryObject: %y", status); - else - hdl_cnt = obi.HandleCount; - return hdl_cnt; -} - /* Helper struct to construct a local OBJECT_ATTRIBUTES on the stack. */ struct lockfattr_t { diff --git a/winsup/cygwin/miscfuncs.cc b/winsup/cygwin/miscfuncs.cc index f4c3a1c48e8e..dc36030ca572 100644 --- a/winsup/cygwin/miscfuncs.cc +++ b/winsup/cygwin/miscfuncs.cc @@ -18,6 +18,22 @@ details. */ #include "tls_pbuf.h" #include "mmap_alloc.h" +/* Get handle count of an object. */ +ULONG +get_obj_handle_count (HANDLE h) +{ + OBJECT_BASIC_INFORMATION obi; + NTSTATUS status; + ULONG hdl_cnt = 0; + + status = NtQueryObject (h, ObjectBasicInformation, &obi, sizeof obi, NULL); + if (!NT_SUCCESS (status)) + debug_printf ("NtQueryObject: %y", status); + else + hdl_cnt = obi.HandleCount; + return hdl_cnt; +} + int __reg2 check_invalid_virtual_addr (const void *s, unsigned sz) { diff --git a/winsup/cygwin/miscfuncs.h b/winsup/cygwin/miscfuncs.h index 1ff7ee0d3fde..47cef6f20c0a 100644 --- a/winsup/cygwin/miscfuncs.h +++ b/winsup/cygwin/miscfuncs.h @@ -98,6 +98,9 @@ transform_chars (PUNICODE_STRING upath, USHORT start_idx) PWCHAR transform_chars_af_unix (PWCHAR, const char *, __socklen_t); +/* Get handle count of an object. */ +ULONG get_obj_handle_count (HANDLE h); + /* Memory checking */ int __reg2 check_invalid_virtual_addr (const void *s, unsigned sz); diff --git a/winsup/cygwin/select.cc b/winsup/cygwin/select.cc index 83e1c00e0ac7..ac2fd227eb17 100644 --- a/winsup/cygwin/select.cc +++ b/winsup/cygwin/select.cc @@ -612,7 +612,6 @@ pipe_data_available (int fd, fhandler_base *fh, HANDLE h, bool writing) that. This means that a pipe could still block since you could be trying to write more to the pipe than is available in the buffer but that is the hazard of select(). */ - fpli.WriteQuotaAvailable = fpli.OutboundQuota - fpli.ReadDataAvailable; if (fpli.WriteQuotaAvailable > 0) { paranoid_printf ("fd %d, %s, write: size %u, avail %u", fd,