From: Bernhard Reutner-Fischer
Date: Sun, 02 Sep 2018 22:39:00 -0000
Subject: Re: [patch, fortran] Asynchronous I/O, take 3
To: Thomas Koenig
Cc: Dominique d'Humières, nk@koenigni.com, gfortran, GCC Patches

On Sun, 15 Jul 2018 at 13:20, Thomas Koenig wrote:
> So, here is the final version.
> I would really like to get this
> into trunk, and out of the way, so Nicolas and I can focus on
> other things.
>
> So, OK?

[I know I'm late, as it was already applied.]

For me it would be easier to read the locking if struct async_unit had
its queue_lock named q_lock/qlock instead of plain lock.
The io_lock is named nicely already.

Furthermore, there is a mixture of (correctly wrapped) __gthread_ in
struct adv_cond versus unwrapped pthread_mutex_t in struct async_unit,
where I'd have expected the latter to also use the __gthread wrappers.

struct adv_cond member pending should not be an int but an unsigned int
or, even better, a bool, I'd say.

transfer_array_inner () is named unintuitively IMHO.
Maybe transfer_array_now, transfer_array_scalar or transfer_array_1.

Index: libgfortran/io/async.c
===================================================================
--- libgfortran/io/async.c (nonexistent)
+++ libgfortran/io/async.c (working copy)
[]
+static void
+update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
+  st_parameter_dt *temp;
+  NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
+  temp = *old;
+  *old = new;
+  if (temp)
+    free (temp);
+}

free (NULL) is perfectly valid, please remove the if.

+static void *
+async_io (void *arg)
+{
[]
+  while (true)
+    {
+      /* Main loop. At this point, au->lock is always held. */

Dot space space at the end of a sentence, please (GNU style).

[]
+      while (ctq)
+        {
+          if (prev)
+            free (prev);

Likewise, drop the if.

+          prev = ctq;
+          if (!au->error.has_error)

I'd flag that as likely.

Likewise, I'd flag finish_thread as unlikely. Since it is a label, you
can hint to the predictor that it is rarely jumped to by marking it cold:
finish_thread: __attribute__((cold));

+/* Initialize an asyncronous unit, returning zero on success,
+   nonzero on failure. It also sets u->au.
+   */
+
+void
+init_async_unit (gfc_unit *u)

s/asyncronous/asynchronous/

+{
+  async_unit *au;
+  if (!__gthread_active_p ())
+    {
+      u->au = NULL;
+      return;
+    }
+

Excess horizontal space on the empty line above.

+  au = (async_unit *) xmalloc (sizeof (async_unit));

I'd XCNEW (async_unit) and omit all those NULL and 0 stores.
You should use the scalar allocators provided in include/libiberty.h
throughout, so s/xmalloc/XNEW/ and s/free/XDELETE/ and so on.

+/* Enqueue a transfer statement. */
+
+void
+enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
+  tq->arg = *arg;

Boom on OOM: the result of calloc is dereferenced without a check.
XCNEW (transfer_queue), please.

+  tq->arg = *arg;
+  tq->type = type;
+  tq->has_id = 0;

Redundant store to has_id.

+  LOCK (&au->lock);
+  if (!au->tail)
+    au->head = tq;
+  else
+    au->tail->next = tq;
+  au->tail = tq;
+  REVOKE_SIGNAL (&(au->emptysignal));
+  au->empty = false;
+  UNLOCK (&au->lock);
+  SIGNAL (&au->work);
+}
+/* Enqueue an st_write_done or st_read_done which contains an ID. */
+
+int
+enqueue_done_id (async_unit *au, enum aio_do type)
+{
+  int ret;
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.

+/* Enqueue an st_write_done or st_read_done without an ID. */
+
+void
+enqueue_done (async_unit *au, enum aio_do type)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.

+  tq->type = type;
+  tq->has_id = 0;

Redundant store to has_id. Maybe just comment it out if you do want to
emphasize this side effect of zeroing:
/* tq->has_id = 0; already done by XCNEW */
or the like.

+/* Enqueue a CLOSE statement. */
+
+void
+enqueue_close (async_unit *au)
+{
+  transfer_queue *tq = calloc (sizeof (transfer_queue), 1);

XCNEW.
And I think enqueue_close does not need internal_proto; it could and
should be static. Or, even better, remove it completely and call
enqueue_done (au, AIO_CLOSE) in async_close directly.

+/* The asynchronous unit keeps the currently active PDT around.
+   This function changes that to the current one. */
+
+void
+enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
+{
+  st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));

XNEW (st_parameter_dt);

+  transfer_queue *tq = xmalloc (sizeof (transfer_queue));

XNEW (transfer_queue);

+
+  memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
+
+  NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
+  NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
+  tq->next = NULL;
+  tq->type = AIO_DATA_TRANSFER_INIT;
+  tq->read_flag = read_flag;
+  tq->has_id = 0;

Ah, has_id should be bool, not _Bool, and hence s/0/false/ and s/1/true/
here and elsewhere when storing to has_id.
Since read_flag seems to be a boolean too, I'd use single-bit fields,
FWIW:
  unsigned has_id : 1;
  unsigned read_flag : 1;

+  tq->new_pdt = new;
+  LOCK (&au->lock);
+
+  if (!au->tail)
+    au->head = tq;
+  else
+    au->tail->next = tq;
+  au->tail = tq;
+  REVOKE_SIGNAL (&(au->emptysignal));
+  au->empty = 0;

s/0/false/ please.

+  UNLOCK (&au->lock);
+  SIGNAL (&au->work);
+}
+/* Perform a wait operation on an asynchronous unit with an ID specified,
+   which means collecting the errors that may have happened asynchronously.
+   Return true if an error has been encountered. */
+
+bool
+async_wait_id (st_parameter_common *cmp, async_unit *au, int i)

I'd rename the parameter i to id for clarity.

+  WAIT_SIGNAL_MUTEX (&(au->id.done),
+                     (au->id.low >= au->id.waiting || au->empty), &au->lock);

I'd test au->empty first.

I'm not sure why it's OK to clear has_error in collect_async_errors,
especially without locking, but I guess you tested such async failure
conditions in both async_wait_id and async_wait.

Index: libgfortran/io/async.h
===================================================================
--- libgfortran/io/async.h (nonexistent)
+++ libgfortran/io/async.h (working copy)
@@ -0,0 +1,378 @@
+/* Thread - local storage of the current unit we are looking at.
+   Needed for error reporting. */

Dot space space at the end of a sentence.

The layout of struct async_unit does not look too good on a 64-bit box;
the current member order likely leaves padding holes.

+bool collect_async_errors (st_parameter_common *, async_unit *);
+internal_proto (collect_async_errors);

Superfluous trailing space.

Index: libgfortran/io/file_pos.c
===================================================================
--- libgfortran/io/file_pos.c (revision 259739)
+++ libgfortran/io/file_pos.c (working copy)
@@ -267,8 +280,13 @@ st_backspace (st_parameter_filepos *fpp)
  done:
   if (u != NULL)
-    unlock_unit (u);
+    {
+      unlock_unit (u);
+      if (u->au && needs_unlock)
+        UNLOCK (&u->au->io_lock);
+    }
+
   library_end ();
 }

In st_backspace you first unlock the unit and only afterwards unlock the
async io_lock. I would settle on first unlocking the async io_lock and
only then unlocking the unit (i.e. releasing in the reverse order of
acquisition), no?

@@ -376,9 +406,12 @@ st_endfile (st_parameter_filepos *fpp)
         }
     }
-  done:
-    unlock_unit (u);
+ done:
+  if (u->au && needs_unlock)
+    UNLOCK (&u->au->io_lock);
+  unlock_unit (u);
+
   library_end ();
 }

Like you do here, in st_endfile.

Here in st_endfile, why do you async_wait before the
LIBERROR_OPTION_CONFLICT handling by
  if (u->flags.access == ACCESS_SEQUENTIAL
      && u->endfile == AFTER_ENDFILE)
and not afterwards?

@@ -450,6 +499,7 @@ void
 st_flush (st_parameter_filepos *fpp)
 {
   gfc_unit *u;
+  bool needs_unlock = false;

   library_start (&fpp->common);

@@ -456,6 +506,17 @@ st_flush (st_parameter_filepos *fpp)
   u = find_unit (fpp->common.unit);
   if (u != NULL)
     {
+      if (u->au)
+        {
+          if (async_wait (&(fpp->common), u->au))
+            return;
+          else
+            {
+              needs_unlock = true;
+              LOCK (&u->au->io_lock);
+            }
+        }
+
       /* Make sure format buffer is flushed.
          */
       if (u->flags.form == FORM_FORMATTED)
         fbuf_flush (u, u->mode);
@@ -469,5 +530,8 @@ st_flush (st_parameter_filepos *fpp)
     generate_error (&fpp->common, LIBERROR_BAD_OPTION,
                     "Specified UNIT in FLUSH is not connected");

+  if (needs_unlock)
+    UNLOCK (&u->au->io_lock);

I would change the condition to
  if (ASYNC_IO && needs_unlock)
for consistency.

+
   library_end ();
 }

Index: libgfortran/io/inquire.c
===================================================================
--- libgfortran/io/inquire.c (revision 259739)
+++ libgfortran/io/inquire.c (working copy)
@@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
 /* Implement the non-IOLENGTH variant of the INQUIRY statement */

 #include "io.h"
+#include "async.h"
 #include "unix.h"
 #include

Please include async.h *after* unix.h like you do everywhere else.

Index: libgfortran/io/read.c
===================================================================
--- libgfortran/io/read.c (revision 259739)
+++ libgfortran/io/read.c (working copy)
@@ -30,6 +30,7 @@ see the files COPYING3 and COPYING.RUNTIME respect
 #include
 #include
 #include
+#include "async.h"

 typedef unsigned char uchar;

@@ -42,6 +43,7 @@ typedef unsigned char uchar;
 void
 set_integer (void *dest, GFC_INTEGER_LARGEST value, int length)
 {
+  NOTE ("set_integer: %lld %p", (long long int) value, dest);
   switch (length)
     {
 #ifdef HAVE_GFC_INTEGER_16

Debugging leftover? Please remove the include and the NOTE.

--- libgfortran/io/transfer.c (revision 259739)
+++ libgfortran/io/transfer.c (working copy)
+/* Wrapper function for I/O of scalar types. If this should be an async I/O
+   request, queue it. For a synchronous write on an async unit, perform the
+   wait operation and return an error. For all synchronous writes, call the
+   right transfer function.
+   */
+static void
+wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind,
+                      size_t size, size_t n_elem)
+{
+  if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+    {
+      if (dtp->u.p.async)
+        {

Please move this second, nested if into the first one, i.e.
  if (dtp->u.p.current_unit && dtp->u.p.current_unit->au
      && dtp->u.p.async)

+          transfer_args args;
+          args.scalar.transfer = dtp->u.p.transfer;
+          args.scalar.arg_bt = type;
+          args.scalar.data = p;
+          args.scalar.i = kind;
+          args.scalar.s1 = size;
+          args.scalar.s2 = n_elem;
+          enqueue_transfer (dtp->u.p.current_unit->au, &args,
+                            AIO_TRANSFER_SCALAR);
+          return;
+        }
+    }
 void
+transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind,
+                gfc_charlen_type charlen)
+{
+  if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK)
+    return;
+
+  if (dtp->u.p.current_unit && dtp->u.p.current_unit->au)
+    {
+      if (dtp->u.p.async)
+        {

Likewise: please move this second, nested if into the first one.

+          transfer_args args;
+          size_t sz = sizeof (gfc_array_char)
+                        + sizeof (descriptor_dimension)
+                        * GFC_DESCRIPTOR_RANK (desc);
+          args.array.desc = xmalloc (sz);
+          NOTE ("desc = %p", (void *) args.array.desc);
+          memcpy (args.array.desc, desc, sz);
+          args.array.kind = kind;
+          args.array.charlen = charlen;
+          enqueue_transfer (dtp->u.p.current_unit->au, &args,
+                            AIO_TRANSFER_ARRAY);
+          return;
+        }
+    }
+  /* Come here if there was no asynchronous I/O to be scheduled.
+     */
+  transfer_array_inner (dtp, desc, kind, charlen);
+}
@@ -2770,6 +2839,42 @@ data_transfer_init (st_parameter_dt *dtp, int read
   else if (dtp->u.p.current_unit->internal_unit_kind > 0)
     dtp->u.p.unit_is_internal = 1;

+  if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0)
+    {
+      int f;
+      f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len,
+                       async_opt, "Bad ASYNCHRONOUS in data transfer "
+                       "statement");
+      if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES)
+        {
+          generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT,
+                          "ASYNCHRONOUS transfer without "
+                          "ASYHCRONOUS='YES' in OPEN");

s/ASYHCRONOUS/ASYNCHRONOUS/

+          return;
+        }
+      dtp->u.p.async = f == ASYNC_YES;
+    }
[]
+void
+data_transfer_init_worker (st_parameter_dt *dtp, int read_flag)

Missing function comment.

+{
+  GFC_INTEGER_4 cf = dtp->common.flags;
+
+  NOTE ("starting worker...");
+
+  if (read_flag && dtp->u.p.current_unit->flags.form != FORM_UNFORMATTED
+      && ((cf & IOPARM_DT_LIST_FORMAT) != 0)

Excess parentheses.

+      && dtp->u.p.current_unit->child_dtio  == 0)

Surplus horizontal whitespace before ==.

+    dtp->u.p.current_unit->last_char = EOF - 1;
[]
+void
+st_read_done (st_parameter_dt *dtp)
+{
+  if (dtp->u.p.current_unit)
+    {
+      if (dtp->u.p.current_unit->au)
+        {
+          if (dtp->common.flags & IOPARM_DT_HAS_ID)
+            *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au, AIO_READ_DONE);

Surplus trailing whitespace.

+          else
+            {
+              enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE);
+              /* An asynchronous unit without ASYNCHRONOUS="YES" - make this
+                 synchronous by performing a wait operation. */
+              if (!dtp->u.p.async)
+                async_wait (&dtp->common, dtp->u.p.current_unit->au);

Don't we have to honour handled errors from async_wait? Same for
st_write_done ().
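To make the async_wait question concrete, here is a minimal sketch of one possible reading, under stated assumptions: when async_wait () reports an error, the statement skips any further work but still leaves through a common exit label, so the unit lock is always released, and io_lock is released before the unit lock (the order suggested for st_backspace). All types here (gfc_unit, async_unit, st_parameter_common) are hypothetical, heavily reduced stand-ins; plain pthread mutexes replace the __gthread wrappers and the LOCK/UNLOCK macros; statement_done () is an illustrative name, not a libgfortran function.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical, heavily reduced stand-ins for libgfortran's types;
   only what the control flow below needs is modelled.  */
typedef struct
{
  pthread_mutex_t io_lock;
  int pending_error;            /* Error left behind by the I/O thread.  */
} async_unit;

typedef struct
{
  pthread_mutex_t lock;         /* Per-unit lock, as taken by find_unit.  */
  async_unit *au;               /* NULL for a synchronous unit.  */
} gfc_unit;

typedef struct
{
  int iostat;                   /* What an IOSTAT= variable would see.  */
} st_parameter_common;

/* Stand-in for async_wait (): return true if an asynchronous error was
   collected, after recording it in CMP.  */
static bool
async_wait (st_parameter_common *cmp, async_unit *au)
{
  if (au->pending_error == 0)
    return false;
  cmp->iostat = au->pending_error;
  au->pending_error = 0;
  return true;
}

/* Hypothetical statement epilogue that honours async_wait ()'s result:
   on error it skips the remaining work but still takes the common exit
   path.  Locks are released in reverse order of acquisition.
   Returns 1 if the (simulated) work was done, 0 otherwise.  */
static int
statement_done (gfc_unit *u, st_parameter_common *cmp)
{
  bool needs_unlock = false;
  int did_work = 0;

  assert (u != NULL && cmp != NULL);
  pthread_mutex_lock (&u->lock);             /* Like find_unit ().  */

  if (u->au != NULL)
    {
      if (async_wait (cmp, u->au))
        goto done;                           /* Honour the reported error.  */
      pthread_mutex_lock (&u->au->io_lock);
      needs_unlock = true;
    }

  did_work = 1;                              /* The real work would go here.  */

 done:
  if (u->au != NULL && needs_unlock)
    pthread_mutex_unlock (&u->au->io_lock);  /* Acquired last, released first.  */
  pthread_mutex_unlock (&u->lock);           /* Like unlock_unit ().  */
  return did_work;
}
```

With a pending error, the first call reports it through iostat and skips the work; a second call on the same unit then proceeds normally. Whether libgfortran should instead rely solely on the error already being recorded in dtp->common is exactly the open question above.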
+            }
+        }
+      else
+        st_read_done_worker (dtp);
--- libgfortran/io/unit.c (revision 259739)
+++ libgfortran/io/unit.c (working copy)
@@ -922,7 +930,7 @@ newunit_alloc (void)
   memset (newunits + old_size, 0, old_size);
   newunits[old_size] = true;
   newunit_lwi = old_size + 1;
-  __gthread_mutex_unlock (&unit_lock);
+    UNLOCK (&unit_lock);

This should be indented by 2 spaces, not 4.

   return -old_size + NEWUNIT_START;
 }
--- libgfortran/libgfortran.h (revision 259739)
+++ libgfortran/libgfortran.h (working copy)
@@ -743,6 +743,9 @@ internal_proto(translate_error);
 extern void generate_error (st_parameter_common *, int, const char *);
 iexport_proto(generate_error);
+extern bool generate_error_common (st_parameter_common *, int, const char *);
+iexport_proto(generate_error_common);

Why is that exported and not just internal_proto()?

+
 extern void generate_warning (st_parameter_common *, const char *);
 internal_proto(generate_warning);
@@ -1748,5 +1751,7 @@ void cshift1_16_c16 (gfc_array_c16 * const restric
 internal_proto(cshift1_16_c16);
 #endif

+/* Define this if we support asynchronous I/O on this platform. This
+   currently requires weak symbols. */
 #endif /* LIBGFOR_H */

Obsolete comment, please remove.

Thanks for the async support!

cheers,