Index: gcc/fortran/trans-decl.c =================================================================== --- gcc/fortran/trans-decl.c (Revision 259739) +++ gcc/fortran/trans-decl.c (Arbeitskopie) @@ -699,7 +699,8 @@ gfc_finish_var_decl (tree decl, gfc_symbol * sym) && CLASS_DATA (sym)->ts.u.derived->attr.has_dtio_procs))) TREE_STATIC (decl) = 1; - if (sym->attr.volatile_) + /* Treat asynchronous variables the same as volatile, for now. */ + if (sym->attr.volatile_ || sym->attr.asynchronous) { TREE_THIS_VOLATILE (decl) = 1; TREE_SIDE_EFFECTS (decl) = 1; Index: gcc/fortran/trans-io.c =================================================================== --- gcc/fortran/trans-io.c (Revision 259739) +++ gcc/fortran/trans-io.c (Arbeitskopie) @@ -438,10 +438,9 @@ gfc_build_io_library_fndecls (void) get_identifier (PREFIX("st_iolength")), ".w", void_type_node, 1, dt_parm_type); - /* TODO: Change when asynchronous I/O is implemented. */ parm_type = build_pointer_type (st_parameter[IOPARM_ptype_wait].type); iocall[IOCALL_WAIT] = gfc_build_library_function_decl_with_spec ( - get_identifier (PREFIX("st_wait")), ".X", + get_identifier (PREFIX("st_wait_async")), ".w", void_type_node, 1, parm_type); parm_type = build_pointer_type (st_parameter[IOPARM_ptype_filepos].type); @@ -1527,7 +1526,7 @@ gfc_trans_wait (gfc_code * code) mask |= IOPARM_common_err; if (p->id) - mask |= set_parameter_value (&block, var, IOPARM_wait_id, p->id); + mask |= set_parameter_ref (&block, &post_block, var, IOPARM_wait_id, p->id); set_parameter_const (&block, var, IOPARM_common_flags, mask); Index: libgfortran/Makefile.am =================================================================== --- libgfortran/Makefile.am (Revision 259739) +++ libgfortran/Makefile.am (Arbeitskopie) @@ -100,7 +100,8 @@ io/transfer128.c \ io/unit.c \ io/unix.c \ io/write.c \ -io/fbuf.c +io/fbuf.c \ +io/async.c endif @@ -108,7 +109,8 @@ gfor_io_headers= \ io/io.h \ io/fbuf.h \ io/format.h \ -io/unix.h +io/unix.h \ +io/async.h gfor_helper_src= \ intrinsics/associated.c \ Index: libgfortran/Makefile.in =================================================================== --- libgfortran/Makefile.in (Revision 259739) +++ libgfortran/Makefile.in (Arbeitskopie) @@ -70,7 +70,8 @@ target_triplet = @target@ @LIBGFOR_MINIMAL_FALSE@io/unit.c \ @LIBGFOR_MINIMAL_FALSE@io/unix.c \ @LIBGFOR_MINIMAL_FALSE@io/write.c \ -@LIBGFOR_MINIMAL_FALSE@io/fbuf.c +@LIBGFOR_MINIMAL_FALSE@io/fbuf.c \ +@LIBGFOR_MINIMAL_FALSE@io/async.c @LIBGFOR_MINIMAL_FALSE@am__append_3 = \ @LIBGFOR_MINIMAL_FALSE@intrinsics/access.c \ @@ -352,7 +353,7 @@ am__objects_47 = $(am__objects_4) $(am__objects_5) @LIBGFOR_MINIMAL_FALSE@ inquire.lo intrinsics.lo list_read.lo \ @LIBGFOR_MINIMAL_FALSE@ lock.lo open.lo read.lo transfer.lo \ @LIBGFOR_MINIMAL_FALSE@ transfer128.lo unit.lo unix.lo write.lo \ -@LIBGFOR_MINIMAL_FALSE@ fbuf.lo +@LIBGFOR_MINIMAL_FALSE@ fbuf.lo async.lo am__objects_49 = size_from_kind.lo $(am__objects_48) @LIBGFOR_MINIMAL_FALSE@am__objects_50 = access.lo c99_functions.lo \ @LIBGFOR_MINIMAL_FALSE@ chdir.lo chmod.lo clock.lo cpu_time.lo \ @@ -650,7 +651,8 @@ gfor_io_headers = \ io/io.h \ io/fbuf.h \ io/format.h \ -io/unix.h +io/unix.h \ +io/async.h gfor_helper_src = intrinsics/associated.c intrinsics/abort.c \ intrinsics/args.c intrinsics/cshift0.c intrinsics/eoshift0.c \ @@ -1550,6 +1552,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/any_l8.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/args.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/associated.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/async.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/backtrace.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/bessel_r10.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/bessel_r16.Plo@am__quote@ @@ -5813,6 +5816,13 @@ fbuf.lo: io/fbuf.c @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o fbuf.lo `test -f 'io/fbuf.c' || echo '$(srcdir)/'`io/fbuf.c +async.lo: io/async.c +@am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT async.lo -MD -MP -MF $(DEPDIR)/async.Tpo -c -o async.lo `test -f 'io/async.c' || echo '$(srcdir)/'`io/async.c +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/async.Tpo $(DEPDIR)/async.Plo +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='io/async.c' object='async.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o async.lo `test -f 'io/async.c' || echo '$(srcdir)/'`io/async.c + associated.lo: intrinsics/associated.c @am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT associated.lo -MD -MP -MF $(DEPDIR)/associated.Tpo -c -o associated.lo `test -f 'intrinsics/associated.c' || echo '$(srcdir)/'`intrinsics/associated.c @am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/associated.Tpo $(DEPDIR)/associated.Plo Index: libgfortran/gfortran.map =================================================================== --- libgfortran/gfortran.map (Revision 259739) +++ libgfortran/gfortran.map (Arbeitskopie) @@ -1482,3 +1482,8 @@ GFORTRAN_C99_8 { y1f; ynf; }; + +GFORTRAN_9 { + global: + _gfortran_st_wait_async; +}; Index: libgfortran/io/async.c =================================================================== --- libgfortran/io/async.c (nicht existent) +++ libgfortran/io/async.c (Arbeitskopie) @@ -0,0 +1,401 @@ +/* Copyright (C) 2018 Free Software Foundation, Inc. + Contributed by Nicolas Koenig + +This file is part of the GNU Fortran runtime library (libgfortran). + +Libgfortran is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3, or (at your option) +any later version. + +Libgfortran is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +Under Section 7 of GPL version 3, you are granted additional +permissions described in the GCC Runtime Library Exception, version +3.1, as published by the Free Software Foundation. + +You should have received a copy of the GNU General Public License and +a copy of the GCC Runtime Library Exception along with this program; +see the files COPYING3 and COPYING.RUNTIME respectively. If not, see +. */ + +#include "libgfortran.h" + +#ifndef GTHREAD_USE_WEAK +#ifdef SUPPORTS_WEAK +#define GTHREAD_USE_WEAK 1 +#endif +#endif + +#define _GTHREAD_USE_COND_INIT_FUNC +#include "../../libgcc/gthr-posix.h" +#include "io.h" +#include "fbuf.h" +#include "format.h" +#include "unix.h" +#include +#include + +#include + +#include "async.h" + +DEBUG_LINE(__thread const char *aio_prefix = MPREFIX); + +DEBUG_LINE(__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;) +DEBUG_LINE(aio_lock_debug *aio_debug_head = NULL;) + +__thread gfc_unit *thread_unit = NULL; + +typedef struct transfer_queue +{ + enum aio_do type; + struct transfer_queue *next; + struct st_parameter_dt *new_pdt; + transfer_args arg; + _Bool has_id; +} transfer_queue; + +struct error { + st_parameter_dt *dtp; + int id; +}; + +static void +update_pdt(st_parameter_dt **old, st_parameter_dt *new) { + st_parameter_dt *temp; + NOTE("Changing pdts"); + temp = *old; + *old = new; + if(temp) + free(temp); +} + +static void +destroy_adv_cond (struct adv_cond * ac) +{ + T_ERROR (__gthread_mutex_destroy, &ac->lock); + T_ERROR (__gthread_cond_destroy, &ac->signal); +} + +static void * +async_io (void *arg) +{ + DEBUG_LINE(aio_prefix = TPREFIX); + transfer_queue *ctq = NULL, *prev = NULL; + gfc_unit *u = (gfc_unit *) arg; + async_unit *au = u->au; + thread_unit = u; + LOCK (&au->lock); + au->thread = __gthread_self (); + UNLOCK (&au->lock); + while (true) + { + WAIT_SIGNAL (&au->work, au->tail || au->finished); + LOCK (&au->lock); + ctq = au->head; + prev = NULL; + while (ctq) + { + if (prev) + free (prev); + prev = ctq; + if (!au->error.has_error) + { + UNLOCK (&au->lock); + + switch(ctq->type) + { + case AIO_WRITE_DONE: + NOTE("Finalizing write"); + st_write_done_worker (au->pdt); + break; + case AIO_READ_DONE: + NOTE("Finalizing read"); + st_read_done_worker (au->pdt); + break; + case AIO_CHANGE_PDT: + update_pdt(&au->pdt, ctq->new_pdt); + break; + case AIO_TRANSFER_SCALAR: + NOTE("Starting scalar transfer"); + ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt, + ctq->arg.scalar.data, + ctq->arg.scalar.i, + ctq->arg.scalar.s1, + ctq->arg.scalar.s2); + break; + case AIO_TRANSFER_ARRAY: + NOTE("Starting array transfer"); + NOTE("ctq->arg.array.desc = %p", (void *) (ctq->arg.array.desc)); + transfer_array_inner(au->pdt, ctq->arg.array.desc, + ctq->arg.array.kind, + ctq->arg.array.charlen); + NOTE("desc = %p", (void *) ctq->arg.array.desc); + free (ctq->arg.array.desc); + break; + default: + internal_error(NULL, "Invalid queue type"); + break; + } + LOCK (&au->lock); + if (unlikely (au->error.has_error)) + { + au->error.last_good_id = au->id.low - 1; + unlock_unit (au->pdt->u.p.current_unit); + } + } + NOTE("Current id: %d", au->id.low); + if (ctq->has_id && au->id.waiting == au->id.low++) + SIGNAL (&au->id.done); + ctq = ctq->next; + } + au->tail = NULL; + au->head = NULL; + SIGNAL (&au->emptysignal); + au->empty = 1; + if (au->finished) + break; + UNLOCK (&au->lock); + } + UNLOCK (&au->lock); + return NULL; +} + + +static void +free_async_unit (async_unit * au) +{ + if (au->tail) + internal_error(NULL, "Trying to free nonempty unit"); + destroy_adv_cond (&au->work); + destroy_adv_cond (&au->emptysignal); + destroy_adv_cond (&au->id.done); + T_ERROR (__gthread_mutex_destroy, &au->lock); + free (au); +} + +static void +init_adv_cond (struct adv_cond * ac) +{ + ac->pending = 0; + __GTHREAD_MUTEX_INIT_FUNCTION(&ac->lock); + __gthread_cond_init_function (&ac->signal); +} + +async_unit * +init_async_unit (gfc_unit *u) +{ + async_unit *au; + if (!__gthread_active_p ()) + return NULL; + + au = (async_unit *) xmalloc (sizeof (async_unit)); + u->au = au; + init_adv_cond (&au->work); + init_adv_cond (&au->emptysignal); + __GTHREAD_MUTEX_INIT_FUNCTION(&au->lock); + T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u); + au->pdt = NULL; + au->head = NULL; + au->tail = NULL; + au->empty = true; + au->finished = 0; + au->id.waiting = -1; + au->id.low = 0; + au->id.high = 0; + au->error.fatal_error = 0; + au->error.has_error = 0; + au->error.last_good_id = 0; + init_adv_cond (&au->id.done); + return au; +} + + +void +enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->arg = *arg; + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL(&au->work); +} + +int +enqueue_done_id (async_unit *au, enum aio_do type) { + int ret; + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + + tq->type = type; + tq->has_id = 1; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + ret = au->id.high++; + NOTE("Enqueue id: %d", ret) + UNLOCK(&au->lock); + SIGNAL(&au->work); + return ret; +} + +void +enqueue_done (async_unit *au, enum aio_do type) +{ + transfer_queue *tq = calloc (sizeof (transfer_queue), 1); + tq->type = type; + tq->has_id = 0; + LOCK (&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = false; + UNLOCK (&au->lock); + SIGNAL(&au->work); +} + +void +change_pdt (async_unit *au, st_parameter_dt *dt) { + st_parameter_dt *new = xmalloc(sizeof(st_parameter_dt)); + transfer_queue *tq = xmalloc(sizeof(transfer_queue)); + memcpy((void *) new, (void *) dt, sizeof(st_parameter_dt)); + tq->next = NULL; + tq->new_pdt = new; + tq->type = AIO_CHANGE_PDT; + tq->has_id = 0; + LOCK(&au->lock); + if (!au->tail) + au->head = tq; + else + au->tail->next = tq; + au->tail = tq; + REVOKE_SIGNAL (&(au->emptysignal)); + au->empty = 0; + UNLOCK(&au->lock); + SIGNAL(&au->work); +} + +/* Perform a wait operation on an asynchronous unit, which means collecting + the errors that may have happened asynchronously. Return true if an error + has been encountered. */ + +static bool +async_wait_operation (st_parameter_common *cmp, async_unit *au) +{ + bool has_error = au->error.has_error; + + if (has_error) + { + if (generate_error_common (cmp, au->error.family, au->error.message)) + { + au->error.has_error = 0; + au->error.cmp = NULL; + } + else + { + /* The program will exit later. */ + au->error.fatal_error = true; + } + } + return has_error; +} + +/* Perform a wait operation on an asynchronous unit with an ID specified, + which means collecting the errors that may have happened asynchronously. + Return true if an error has been encountered. */ + +bool +async_wait_id (st_parameter_common *cmp, async_unit * au, int i) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + if (au->error.has_error) + { + if (i <= au->error.last_good_id) + return false; + + return async_wait_operation (cmp, au); + } + + LOCK (&au->lock); + NOTE("Waiting for id %d", i); + if (au->id.waiting < i) + au->id.waiting = i; + UNLOCK (&au->lock); + SIGNAL (&(au->work)); + WAIT_SIGNAL (&(au->id.done), (au->id.low >= au->id.waiting || au->empty)); + LOCK (&au->lock); + ret = async_wait_operation (cmp, au); + UNLOCK(&au->lock); + return ret; +} + +bool +async_wait (st_parameter_common *cmp, async_unit * au) +{ + bool ret; + + if (au == NULL) + return false; + + if (cmp == NULL) + cmp = au->error.cmp; + + SIGNAL (&(au->work)); + LOCK(&au->lock); + if (au->empty) + { + ret = async_wait_operation (cmp, au); + UNLOCK(&au->lock); + return ret; + } + + UNLOCK(&au->lock); + WAIT_SIGNAL (&(au->emptysignal), (au->empty)); + LOCK (&au->lock); + ret = async_wait_operation (cmp, au); + UNLOCK(&au->lock); + return ret; +} + +void +async_close (async_unit * au) +{ + NOTE("Closing async unit"); + if (!au->error.fatal_error) + { + LOCK (&au->lock); + au->finished = 1; + UNLOCK (&au->lock); + NOTE ("About to wait in async_close"); + async_wait (NULL, au); + T_ERROR (__gthread_join, au->thread, NULL); + } + free_async_unit(au); +} Index: libgfortran/io/async.h =================================================================== --- libgfortran/io/async.h (nicht existent) +++ libgfortran/io/async.h (Arbeitskopie) @@ -0,0 +1,332 @@ +/* Copyright (C) 2018 Free Software Foundation, Inc. + Contributed by Nicolas Koenig + +This file is part of the GNU Fortran runtime library (libgfortran). + +Libgfortran is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3, or (at your option) +any later version. + +Libgfortran is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +Under Section 7 of GPL version 3, you are granted additional +permissions described in the GCC Runtime Library Exception, version +3.1, as published by the Free Software Foundation. + +You should have received a copy of the GNU General Public License and +a copy of the GCC Runtime Library Exception along with this program; +see the files COPYING3 and COPYING.RUNTIME respectively. If not, see +. */ + +#ifndef ASYNC_H +#define ASYNC_H + +#define DEBUG_ASYNC +#undef DEBUG_ASYNC + +#ifdef DEBUG_ASYNC + +#define DEBUG_PRINTF(...) fprintf(stderr,__VA_ARGS__) + +#define IN_DEBUG_QUEUE(mutex) ({\ + __label__ end;\ + aio_lock_debug *curr = aio_debug_head;\ + while(curr) { \ + if (curr->m == mutex) {\ + goto end;\ + }\ + curr = curr->next;\ + }\ + end:;\ + curr;\ + }) + +#define TAIL_DEBUG_QUEUE ({\ + aio_lock_debug *curr = aio_debug_head; \ + while(curr && curr->next) { \ + curr = curr->next;\ + }\ + curr;\ + }) + +#define CHECK_LOCK(mutex, status) do {\ + aio_lock_debug *curr;\ + INTERN_LOCK(&debug_queue_lock); \ + if (__gthread_mutex_trylock(mutex)) {\ + if ((curr = IN_DEBUG_QUEUE(mutex))) {\ + sprintf(status, "\033[31m%s():%d\033[0m", curr->func, curr->line);\ + } else\ + sprintf(status, "\033[31munknown\033[0m");\ + }\ + else {\ + __gthread_mutex_unlock(mutex);\ + sprintf(status, "\033[32munlocked\033[0m");\ + }\ + INTERN_UNLOCK(&debug_queue_lock);\ + }while(0) + +#define T_ERROR(func, ...) do {\ + int t_error_temp; \ + t_error_temp = func(__VA_ARGS__);\ + if(t_error_temp)\ + ERROR(t_error_temp, "args: " #__VA_ARGS__ "\n");\ + } while(0) + +#define NOTE(str, ...) do{\ + char note_str[200];\ + sprintf(note_str, "%s\033[35mNOTE: \033[0m" str, aio_prefix, ##__VA_ARGS__);\ + DEBUG_PRINTF("%-90s %20s():5%d\n", note_str, __FUNCTION__, __LINE__);\ + }while(0); + +#define ERROR(errnum, str, ...) do{\ + char note_str[200];\ + sprintf(note_str, "%s\033[41;37mERROR:\033[0m [%d] " str, aio_prefix, \ + errnum, ##__VA_ARGS__);\ + DEBUG_PRINTF("%-68s %s():%d\n", note_str, __FUNCTION__, __LINE__);\ + }while(0) + +#define MUTEX_DEBUG_ADD(mutex) do {\ + aio_lock_debug *n;\ + n = malloc(sizeof(aio_lock_debug));\ + n->prev = TAIL_DEBUG_QUEUE;\ + if (n->prev) \ + n->prev->next = n; \ + n->next = NULL;\ + n->line = __LINE__;\ + n->func = __FUNCTION__;\ + n->m = mutex;\ + if (!aio_debug_head) {\ + aio_debug_head = n;\ + }\ + } while(0) + +#define UNLOCK(mutex) do {\ + aio_lock_debug *curr;\ + DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[32mUNLOCK: \033[0m" #mutex, \ + __FUNCTION__, __LINE__, (void *) mutex); \ + INTERN_LOCK(&debug_queue_lock);\ + curr = IN_DEBUG_QUEUE(mutex);\ + if (curr) \ + {\ + if (curr->prev)\ + curr->prev->next = curr->next;\ + if (curr->next) {\ + curr->next->prev = curr->prev;\ + if (curr == aio_debug_head)\ + aio_debug_head = curr->next;\ + } else {\ + if (curr == aio_debug_head)\ + aio_debug_head = NULL;\ + }\ + free(curr);\ + }\ + INTERN_UNLOCK(&debug_queue_lock);\ + INTERN_UNLOCK(mutex);\ + }while(0) + +#define TRYLOCK(mutex) ({\ + char status[200];\ + int res;\ + aio_lock_debug *curr;\ + res = __gthread_mutex_trylock(mutex);\ + INTERN_LOCK(&debug_queue_lock); \ + if (res) {\ + if ((curr = IN_DEBUG_QUEUE(mutex))) {\ + sprintf(status, "\033[31m%s():%d\033[0m", curr->func, curr->line);\ + } else\ + sprintf(status, "\033[31munknown\033[0m");\ + }\ + else {\ + sprintf(status, "\033[32munlocked\033[0m");\ + MUTEX_DEBUG_ADD(mutex);\ + }\ + DEBUG_PRINTF("%s%-44s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \ + "\033[31;2mTRYLOCK: \033[0m" #mutex, status, __FUNCTION__, __LINE__,\ + (void *) mutex);\ + INTERN_UNLOCK(&debug_queue_lock);\ + res;\ + }) + +#define LOCK(mutex) do {\ + char status[200];\ + CHECK_LOCK(mutex, status);\ + DEBUG_PRINTF("%s%-42s prev: %-35s %20s():%-5d %18p\n", aio_prefix, \ + "\033[31mLOCK: \033[0m" #mutex, status, __FUNCTION__, __LINE__, (void *) mutex); \ + INTERN_LOCK(mutex);\ + INTERN_LOCK(&debug_queue_lock); \ + MUTEX_DEBUG_ADD(mutex);\ + INTERN_UNLOCK(&debug_queue_lock);\ + DEBUG_PRINTF("%s\033[31mACQ:\033[0m %-30s %78p\n", aio_prefix, #mutex, mutex); \ + }while(0) + +#define DEBUG_LINE(...) __VA_ARGS__ + +#else +#define DEBUG_PRINTF(...) {} +#define CHECK_LOCK(au, mutex, status) {} +#define NOTE(str, ...) {} +#define DEBUG_LINE(...) +#define T_ERROR(func, ...) func(__VA_ARGS__) +#define LOCK(mutex) INTERN_LOCK(mutex) +#define UNLOCK(mutex) INTERN_UNLOCK(mutex) +#define TRYLOCK(mutex) (__gthread_mutex_trylock(mutex)) +#endif + +#define MPREFIX "\033[30;46mM:\033[0m " +#define TPREFIX "\033[37;44mT:\033[0m " +#define RPREFIX "\033[37;41mR:\033[0m " + +#define INTERN_LOCK(mutex) T_ERROR(__gthread_mutex_lock, mutex); + +#define INTERN_UNLOCK(mutex) T_ERROR(__gthread_mutex_unlock, mutex); + +#define SIGNAL(advcond) do{\ + INTERN_LOCK(&(advcond)->lock);\ + (advcond)->pending = 1;\ + INTERN_UNLOCK(&(advcond)->lock);\ + DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[33mSIGNAL: \033[0m" \ + #advcond, __FUNCTION__, __LINE__, (void *) advcond);\ + T_ERROR(__gthread_cond_broadcast, &(advcond)->signal);\ + }while(0) + +#define WAIT_SIGNAL(advcond, condition) do{\ + __label__ finish;\ + INTERN_LOCK(&((advcond)->lock));\ + DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[34mWAITING: \033[0m" \ + #advcond, __FUNCTION__, __LINE__, (void *) advcond);\ + if ((advcond)->pending){\ + goto finish;\ + }\ + if ( condition ) {\ + goto finish; \ + }\ + while(!__gthread_cond_wait(&(advcond)->signal, &(advcond)->lock)) {\ + if ( condition ){\ + DEBUG_PRINTF("%s%-75s %20s():%-5d %18p\n", aio_prefix, "\033[33mREC: \033[0m" \ + #advcond, __FUNCTION__, __LINE__, (void *)advcond);\ + break;\ + }\ + }\ + finish: \ + (advcond)->pending = 0;\ + INTERN_UNLOCK(&((advcond)->lock));\ + }while(0) + +#define REVOKE_SIGNAL(advcond) do{\ + INTERN_LOCK(&(advcond)->lock);\ + (advcond)->pending = 0;\ + INTERN_UNLOCK(&(advcond)->lock);\ + }while(0) + +DEBUG_LINE(extern __thread const char *aio_prefix); + +DEBUG_LINE(typedef struct aio_lock_debug{ + __gthread_mutex_t *m; + int line; + const char *func; + struct aio_lock_debug *next; + struct aio_lock_debug *prev; +} aio_lock_debug;) + +DEBUG_LINE(extern aio_lock_debug *aio_debug_head;) +DEBUG_LINE(extern __gthread_mutex_t debug_queue_lock;) + +extern __thread gfc_unit *thread_unit; + +enum aio_do { + AIO_INVALID = 0, + AIO_CHANGE_PDT, + AIO_TRANSFER_SCALAR, + AIO_TRANSFER_ARRAY, + AIO_WRITE_DONE, + AIO_READ_DONE +}; + +typedef union transfer_args +{ + struct + { + void (*transfer) (struct st_parameter_dt *, bt, void *, int, size_t, size_t); + bt arg_bt; + void *data; + int i; + size_t s1; + size_t s2; + } scalar; + struct + { + gfc_array_char *desc; + int kind; + gfc_charlen_type charlen; + } array; +} transfer_args; + +struct adv_cond +{ + int pending; + __gthread_mutex_t lock; + __gthread_cond_t signal; +}; + +typedef struct async_unit +{ + pthread_mutex_t lock; + struct adv_cond work; + struct adv_cond emptysignal; + struct st_parameter_dt *pdt; + pthread_t thread; + struct transfer_queue *head; + struct transfer_queue *tail; + struct + { + int waiting; + int low; + int high; + struct adv_cond done; + } id; + + int finished; + bool empty; + + struct { + const char *message; + st_parameter_common *cmp; + bool has_error; + int last_good_id; + int family; + bool fatal_error; + } error; + +} async_unit; + +async_unit *init_async_unit(gfc_unit *); +internal_proto (init_async_unit); + +bool async_wait (st_parameter_common *, async_unit *); +internal_proto(async_wait); + +bool async_wait_id (st_parameter_common *, async_unit *, int); +internal_proto(async_wait_id); + +void async_close (async_unit *); +internal_proto(async_close); + +void enqueue_transfer (async_unit * au, transfer_args * arg, enum aio_do); +internal_proto(enqueue_transfer); + +void enqueue_done (async_unit *, enum aio_do type); +internal_proto(enqueue_done); + +int enqueue_done_id (async_unit *, enum aio_do type); +internal_proto(enqueue_done_id); + +void enqueue_init (async_unit *); +internal_proto(enqueue_init); + +void change_pdt(async_unit *, st_parameter_dt *); +internal_proto(change_pdt); +#endif Index: libgfortran/io/close.c =================================================================== --- libgfortran/io/close.c (Revision 259739) +++ libgfortran/io/close.c (Arbeitskopie) @@ -24,6 +24,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "io.h" #include "unix.h" +#include "async.h" #include typedef enum @@ -57,6 +58,15 @@ st_close (st_parameter_close *clp) find_option (&clp->common, clp->status, clp->status_len, status_opt, "Bad STATUS parameter in CLOSE statement"); + u = find_unit (clp->common.unit); + + if (u && u->au) + if (async_wait (&(clp->common), u->au)) + { + library_end(); + return; + } + if ((clp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) { library_end (); @@ -63,7 +73,6 @@ st_close (st_parameter_close *clp) return; } - u = find_unit (clp->common.unit); if (u != NULL) { if (close_share (u) < 0) Index: libgfortran/io/file_pos.c =================================================================== --- libgfortran/io/file_pos.c (Revision 259739) +++ libgfortran/io/file_pos.c (Arbeitskopie) @@ -25,6 +25,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "io.h" #include "fbuf.h" #include "unix.h" +#include "async.h" #include /* file_pos.c-- Implement the file positioning statements, i.e. BACKSPACE, @@ -214,6 +215,9 @@ st_backspace (st_parameter_filepos *fpp) goto done; } + if (u->au && async_wait (&(fpp->common), u->au)) + return; + /* Make sure format buffer is flushed and reset. */ if (u->flags.form == FORM_FORMATTED) { @@ -294,6 +298,9 @@ st_endfile (st_parameter_filepos *fpp) goto done; } + if (u->au && async_wait (&(fpp->common), u->au)) + return; + if (u->flags.access == ACCESS_SEQUENTIAL && u->endfile == AFTER_ENDFILE) { @@ -401,6 +408,10 @@ st_rewind (st_parameter_filepos *fpp) "Cannot REWIND a file opened for DIRECT access"); else { + + if (u->au && async_wait (&(fpp->common), u->au)) + return; + /* If there are previously written bytes from a write with ADVANCE="no", add a record marker before performing the ENDFILE. */ @@ -456,6 +467,9 @@ st_flush (st_parameter_filepos *fpp) u = find_unit (fpp->common.unit); if (u != NULL) { + if (u->au && async_wait (&(fpp->common), u->au)) + return; + /* Make sure format buffer is flushed. */ if (u->flags.form == FORM_FORMATTED) fbuf_flush (u, u->mode); Index: libgfortran/io/inquire.c =================================================================== --- libgfortran/io/inquire.c (Revision 259739) +++ libgfortran/io/inquire.c (Arbeitskopie) @@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect /* Implement the non-IOLENGTH variant of the INQUIRY statement */ #include "io.h" +#include "async.h" #include "unix.h" #include @@ -281,12 +282,6 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_u { GFC_INTEGER_4 cf2 = iqp->flags2; - if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0) - *iqp->pending = 0; - - if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0) - *iqp->id = 0; - if ((cf2 & IOPARM_INQUIRE_HAS_ENCODING) != 0) { if (u == NULL || u->flags.form != FORM_FORMATTED) @@ -347,6 +342,27 @@ inquire_via_unit (st_parameter_inquire *iqp, gfc_u cf_strcpy (iqp->asynchronous, iqp->asynchronous_len, p); } + if ((cf2 & IOPARM_INQUIRE_HAS_PENDING) != 0) + { + if (u->au == NULL) + *(iqp->pending) = 0; + else + { + LOCK (&(u->au->lock)); + if ((cf2 & IOPARM_INQUIRE_HAS_ID) != 0) + { + int id; + id = *(iqp->id); + *(iqp->pending) = id > u->au->id.low; + } + else + { + *(iqp->pending) = ! u->au->empty; + } + UNLOCK (&(u->au->lock)); + } + } + if ((cf2 & IOPARM_INQUIRE_HAS_SIGN) != 0) { if (u == NULL) Index: libgfortran/io/io.h =================================================================== --- libgfortran/io/io.h (Revision 259739) +++ libgfortran/io/io.h (Arbeitskopie) @@ -531,7 +531,9 @@ typedef struct st_parameter_dt /* A flag used to identify when a non-standard expanded namelist read has occurred. */ unsigned expanded_read : 1; - /* 13 unused bits. */ + /* Flag to indicate if the statement has async="YES". */ + unsigned async : 1; + /* 12 unused bits. */ int child_saved_iostat; int nml_delim; @@ -590,7 +592,7 @@ extern char check_st_parameter_dt[sizeof (((st_par typedef struct { st_parameter_common common; - CHARACTER1 (id); + GFC_INTEGER_4 *id; } st_parameter_wait; @@ -659,6 +661,9 @@ typedef struct gfc_unit int continued; + /* contains the pointer to the async unit */ + struct async_unit *au; + __gthread_mutex_t lock; /* Number of threads waiting to acquire this unit's lock. When non-zero, close_unit doesn't only removes the unit @@ -817,9 +822,16 @@ internal_proto(next_record); extern void st_wait (st_parameter_wait *); export_proto(st_wait); +extern void st_wait_async (st_parameter_wait *); +export_proto(st_wait_async); + extern void hit_eof (st_parameter_dt *); internal_proto(hit_eof); +extern void transfer_array_inner (st_parameter_dt *, gfc_array_char *, int, + gfc_charlen_type); +internal_proto(transfer_array_inner); + /* read.c */ extern void set_integer (void *, GFC_INTEGER_LARGEST, int); @@ -988,3 +1000,10 @@ memset4 (gfc_char4_t *p, gfc_char4_t c, int k) #endif +extern void +st_write_done_worker (st_parameter_dt *dtp); +internal_proto (st_write_done_worker); + +extern void +st_read_done_worker (st_parameter_dt *dtp); +internal_proto (st_read_done_worker); Index: libgfortran/io/open.c =================================================================== --- libgfortran/io/open.c (Revision 259739) +++ libgfortran/io/open.c (Arbeitskopie) @@ -26,6 +26,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "io.h" #include "fbuf.h" #include "unix.h" +#include "async.h" #ifdef HAVE_UNISTD_H #include @@ -651,8 +652,12 @@ new_unit (st_parameter_open *opp, gfc_unit *u, uni else u->fbuf = NULL; - - + /* Check if asynchrounous. */ + if (flags->async == ASYNC_YES) + u->au = init_async_unit (u); + else + u->au = NULL; + return u; cleanup: Index: libgfortran/io/transfer.c =================================================================== --- libgfortran/io/transfer.c (Revision 259739) +++ libgfortran/io/transfer.c (Arbeitskopie) @@ -31,6 +31,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "fbuf.h" #include "format.h" #include "unix.h" +#include "async.h" #include #include @@ -184,6 +185,12 @@ static const st_option pad_opt[] = { {NULL, 0} }; +static const st_option async_opt[] = { + {"yes", ASYNC_YES}, + {"no", ASYNC_NO}, + {NULL, 0} +}; + typedef enum { FORMATTED_SEQUENTIAL, UNFORMATTED_SEQUENTIAL, FORMATTED_DIRECT, UNFORMATTED_DIRECT, FORMATTED_STREAM, UNFORMATTED_STREAM @@ -2281,7 +2288,38 @@ formatted_transfer (st_parameter_dt *dtp, bt type, } } +/* Wrapper function for I/O of scalar types. If this should be an async I/O + request, queue it. For a synchronous write on an async unit, perform the + wait operation and return an error. For all synchronous writes, call the + right transfer function. */ +static void +wrap_scalar_transfer (st_parameter_dt *dtp, bt type, void *p, int kind, + size_t size, size_t n_elem) +{ + if (dtp->u.p.current_unit && dtp->u.p.current_unit->au) + { + if (dtp->u.p.async) + { + transfer_args args; + args.scalar.transfer = dtp->u.p.transfer; + args.scalar.arg_bt = type; + args.scalar.data = p; + args.scalar.i = kind; + args.scalar.s1 = size; + args.scalar.s2 = n_elem; + enqueue_transfer(dtp->u.p.current_unit->au, &args, AIO_TRANSFER_SCALAR); + return; + } + } + /* Come here if there was no asynchronous I/O to be scheduled. */ + if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) + return; + + dtp->u.p.transfer (dtp, type, p, kind, size, 1); +} + + /* Data transfer entry points. The type of the data entity is implicit in the subroutine call. This prevents us from having to share a common enum with the compiler. */ @@ -2289,9 +2327,7 @@ formatted_transfer (st_parameter_dt *dtp, bt type, void transfer_integer (st_parameter_dt *dtp, void *p, int kind) { - if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) - return; - dtp->u.p.transfer (dtp, BT_INTEGER, p, kind, kind, 1); + wrap_scalar_transfer (dtp, BT_INTEGER, p, kind, kind, 1); } void @@ -2307,7 +2343,7 @@ transfer_real (st_parameter_dt *dtp, void *p, int if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) return; size = size_from_real_kind (kind); - dtp->u.p.transfer (dtp, BT_REAL, p, kind, size, 1); + wrap_scalar_transfer(dtp, BT_REAL, p, kind, size, 1); } void @@ -2319,9 +2355,7 @@ transfer_real_write (st_parameter_dt *dtp, void *p void transfer_logical (st_parameter_dt *dtp, void *p, int kind) { - if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) - return; - dtp->u.p.transfer (dtp, BT_LOGICAL, p, kind, kind, 1); + wrap_scalar_transfer(dtp, BT_LOGICAL, p, kind, kind, 1); } void @@ -2345,7 +2379,7 @@ transfer_character (st_parameter_dt *dtp, void *p, p = empty_string; /* Set kind here to 1. */ - dtp->u.p.transfer (dtp, BT_CHARACTER, p, 1, len, 1); + wrap_scalar_transfer(dtp, BT_CHARACTER, p, 1, len, 1); } void @@ -2369,7 +2403,7 @@ transfer_character_wide (st_parameter_dt *dtp, voi p = empty_string; /* Here we pass the actual kind value. */ - dtp->u.p.transfer (dtp, BT_CHARACTER, p, kind, len, 1); + wrap_scalar_transfer(dtp, BT_CHARACTER, p, kind, len, 1); } void @@ -2385,7 +2419,7 @@ transfer_complex (st_parameter_dt *dtp, void *p, i if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) return; size = size_from_complex_kind (kind); - dtp->u.p.transfer (dtp, BT_COMPLEX, p, kind, size, 1); + wrap_scalar_transfer(dtp, BT_COMPLEX, p, kind, size, 1); } void @@ -2395,8 +2429,8 @@ transfer_complex_write (st_parameter_dt *dtp, void } void -transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, - gfc_charlen_type charlen) +transfer_array_inner (st_parameter_dt *dtp, gfc_array_char *desc, int kind, + gfc_charlen_type charlen) { index_type count[GFC_MAX_DIMENSIONS]; index_type extent[GFC_MAX_DIMENSIONS]; @@ -2471,6 +2505,34 @@ void } void +transfer_array (st_parameter_dt *dtp, gfc_array_char *desc, int kind, + gfc_charlen_type charlen) +{ + if ((dtp->common.flags & IOPARM_LIBRETURN_MASK) != IOPARM_LIBRETURN_OK) + return; + + if (dtp->u.p.current_unit && dtp->u.p.current_unit->au) + { + if (dtp->u.p.async) + { + transfer_args args; + size_t sz = sizeof (gfc_array_char) + + sizeof (descriptor_dimension) * GFC_DESCRIPTOR_RANK (desc); + args.array.desc = xmalloc (sz); + NOTE("desc = %p", (void *) args.array.desc); + memcpy (args.array.desc, desc, sz); + args.array.kind = kind; + args.array.charlen = charlen; + enqueue_transfer(dtp->u.p.current_unit->au, &args, AIO_TRANSFER_ARRAY); + return; + } + } + /* Come here if there was no asynchronous I/O to be scheduled. */ + transfer_array_inner (dtp, desc, kind, charlen); +} + + +void transfer_array_write (st_parameter_dt *dtp, gfc_array_char *desc, int kind, gfc_charlen_type charlen) { @@ -2492,7 +2554,7 @@ transfer_derived (st_parameter_dt *parent, void *d else parent->u.p.fdtio_ptr = (formatted_dtio) dtio_proc; } - parent->u.p.transfer (parent, BT_CLASS, dtio_source, 0, 0, 1); + wrap_scalar_transfer(parent, BT_CLASS, dtio_source, 0, 0, 1); } @@ -2770,6 +2832,29 @@ data_transfer_init (st_parameter_dt *dtp, int read else if (dtp->u.p.current_unit->internal_unit_kind > 0) dtp->u.p.unit_is_internal = 1; + if ((cf & IOPARM_DT_HAS_ASYNCHRONOUS) != 0) + { + int f; + f = find_option (&dtp->common, dtp->asynchronous, dtp->asynchronous_len, + async_opt, "Bad ASYNCHRONOUS in data transfer statement"); + if (f == ASYNC_YES && dtp->u.p.current_unit->flags.async != ASYNC_YES) + { + generate_error (&dtp->common, LIBERROR_OPTION_CONFLICT, + "ASYNCHRONOUS transfer without ASYHCRONOUS='YES' in OPEN"); + return; + } + dtp->u.p.async = f == ASYNC_YES; + } + + /* Perform a wait operation for any pending asynchronous I/O. This needs to + be done before all other error checks. See F2008, 9.6.4.1. */ + + if (dtp->u.p.current_unit->au && !dtp->u.p.async) + { + if (async_wait (&(dtp->common), dtp->u.p.current_unit->au)) + return; + } + /* Check the action. */ if (read_flag && dtp->u.p.current_unit->flags.action == ACTION_WRITE) @@ -3184,6 +3269,9 @@ data_transfer_init (st_parameter_dt *dtp, int read dtp->u.p.current_unit->read_bad = 1; } + if (dtp->u.p.current_unit->au) + change_pdt(dtp->u.p.current_unit->au, dtp); + if (dtp->u.p.current_unit->flags.form == FORM_FORMATTED) { #ifdef HAVE_USELOCALE @@ -4099,7 +4187,7 @@ extern void st_read_done (st_parameter_dt *); export_proto(st_read_done); void -st_read_done (st_parameter_dt *dtp) +st_read_done_worker (st_parameter_dt *dtp) { finalize_transfer (dtp); @@ -4127,6 +4215,24 @@ void free_format_data (dtp->u.p.fmt); free_format (dtp); } + } +} + +void +st_read_done(st_parameter_dt *dtp) +{ + if (dtp->u.p.current_unit) + { + if (dtp->u.p.current_unit->au) + { + if (dtp->common.flags & IOPARM_DT_HAS_ID) + *dtp->id = enqueue_done_id (dtp->u.p.current_unit->au, AIO_READ_DONE); + else + enqueue_done (dtp->u.p.current_unit->au, AIO_READ_DONE); + } + else + st_read_done_worker (dtp); + unlock_unit (dtp->u.p.current_unit); } @@ -4143,11 +4249,9 @@ st_write (st_parameter_dt *dtp) data_transfer_init (dtp, 0); } -extern void st_write_done (st_parameter_dt *); -export_proto(st_write_done); void -st_write_done (st_parameter_dt *dtp) +st_write_done_worker (st_parameter_dt *dtp) { finalize_transfer (dtp); @@ -4196,19 +4300,61 @@ void free_format_data (dtp->u.p.fmt); free_format (dtp); } + } +} + +extern void st_write_done (st_parameter_dt *); +export_proto(st_write_done); + +void +st_write_done (st_parameter_dt *dtp) +{ + if (dtp->u.p.current_unit) + { + if (dtp->u.p.current_unit->au) + { + if (dtp->common.flags & IOPARM_DT_HAS_ID) + *dtp->id = enqueue_done_id(dtp->u.p.current_unit->au, AIO_WRITE_DONE); + else + enqueue_done (dtp->u.p.current_unit->au, AIO_WRITE_DONE); + } + else + st_write_done_worker (dtp); + unlock_unit (dtp->u.p.current_unit); } + library_end (); } +/* Wait operation. We need to keep around the do-nothing version + of st_wait for compatibility with previous versions, which had marked + the argument as unused (and thus liable to be removed). -/* F2003: This is a stub for the runtime portion of the WAIT statement. */ + TODO: remove at next bump in version number. */ + void st_wait (st_parameter_wait *wtp __attribute__((unused))) { + return; } +void +st_wait_async (st_parameter_wait *wtp) +{ + gfc_unit *u = find_unit(wtp->common.unit); + if (u->au) + { + if (wtp->common.flags & IOPARM_WAIT_HAS_ID) + async_wait_id (&(wtp->common), u->au, *wtp->id); + else + async_wait (&(wtp->common), u->au); + } + unlock_unit(u); +} + + /* Receives the scalar information for namelist objects and stores it in a linked list of namelist_info types. */ Index: libgfortran/io/unit.c =================================================================== --- libgfortran/io/unit.c (Revision 259739) +++ libgfortran/io/unit.c (Arbeitskopie) @@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "fbuf.h" #include "format.h" #include "unix.h" +#include "async.h" #include #include @@ -240,7 +241,7 @@ insert_unit (int n) #else __GTHREAD_MUTEX_INIT_FUNCTION (&u->lock); #endif - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); u->priority = pseudo_random (); unit_root = insert (u, unit_root); return u; @@ -327,7 +328,9 @@ get_gfc_unit (int n, int do_create) gfc_unit *p; int c, created = 0; - __gthread_mutex_lock (&unit_lock); + NOTE("Unit n=%d, do_create = %d", n, do_create); + LOCK(&unit_lock); + retry: for (c = 0; c < CACHE_SIZE; c++) if (unit_cache[c] != NULL && unit_cache[c]->unit_number == n) @@ -366,7 +369,7 @@ retry: { /* Newly created units have their lock held already from insert_unit. Just unlock UNIT_LOCK and return. */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK(&unit_lock); return p; } @@ -374,10 +377,10 @@ found: if (p != NULL && (p->child_dtio == 0)) { /* Fast path. */ - if (! __gthread_mutex_trylock (&p->lock)) + if (! TRYLOCK (&p->lock)) { /* assert (p->closed == 0); */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK(&unit_lock); return p; } @@ -385,15 +388,15 @@ found: } - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (p != NULL && (p->child_dtio == 0)) { - __gthread_mutex_lock (&p->lock); + LOCK (&p->lock); if (p->closed) { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&p->lock); + LOCK (&unit_lock); + UNLOCK (&p->lock); if (predec_waiting_locked (p) == 0) destroy_unit_mutex (p); goto retry; @@ -640,7 +643,7 @@ init_units (void) fbuf_init (u, 0); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } if (options.stdout_unit >= 0) @@ -671,7 +674,7 @@ init_units (void) fbuf_init (u, 0); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } if (options.stderr_unit >= 0) @@ -702,13 +705,13 @@ init_units (void) fbuf_init (u, 256); /* 256 bytes should be enough, probably not doing any kind of exotic formatting to stderr. */ - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } /* The default internal units. */ u = insert_unit (GFC_INTERNAL_UNIT); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); u = insert_unit (GFC_INTERNAL_UNIT4); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } @@ -717,6 +720,9 @@ close_unit_1 (gfc_unit *u, int locked) { int i, rc; + if(u->au) + async_close (u->au); + /* If there are previously written bytes from a write with ADVANCE="no" Reposition the buffer before closing. */ if (u->previous_nonadvancing_write) @@ -723,10 +729,10 @@ close_unit_1 (gfc_unit *u, int locked) finish_last_advance_record (u); rc = (u->s == NULL) ? 0 : sclose (u->s) == -1; - + u->closed = 1; if (!locked) - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); for (i = 0; i < CACHE_SIZE; i++) if (unit_cache[i] == u) @@ -744,7 +750,7 @@ close_unit_1 (gfc_unit *u, int locked) newunit_free (u->unit_number); if (!locked) - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); /* If there are any threads waiting in find_unit for this unit, avoid freeing the memory, the last such thread will free it @@ -753,7 +759,7 @@ close_unit_1 (gfc_unit *u, int locked) destroy_unit_mutex (u); if (!locked) - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return rc; } @@ -761,7 +767,9 @@ close_unit_1 (gfc_unit *u, int locked) void unlock_unit (gfc_unit *u) { - __gthread_mutex_unlock (&u->lock); + NOTE ("unlock_unit = %d", u->unit_number); + UNLOCK (&u->lock); + NOTE ("unlock_unit done"); } /* close_unit()-- Close a unit. The stream is closed, and any memory @@ -785,10 +793,10 @@ close_unit (gfc_unit *u) void close_units (void) { - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); while (unit_root != NULL) close_unit_1 (unit_root, 1); - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); free (newunits); @@ -895,7 +903,7 @@ finish_last_advance_record (gfc_unit *u) int newunit_alloc (void) { - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); if (!newunits) { newunits = xcalloc (16, 1); @@ -909,7 +917,7 @@ newunit_alloc (void) { newunits[ii] = true; newunit_lwi = ii + 1; - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return -ii + NEWUNIT_START; } } @@ -922,7 +930,7 @@ newunit_alloc (void) memset (newunits + old_size, 0, old_size); newunits[old_size] = true; newunit_lwi = old_size + 1; - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); return -old_size + NEWUNIT_START; } Index: libgfortran/io/unix.c =================================================================== --- libgfortran/io/unix.c (Revision 259739) +++ libgfortran/io/unix.c (Arbeitskopie) @@ -27,6 +27,7 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "io.h" #include "unix.h" +#include "async.h" #include #ifdef HAVE_UNISTD_H @@ -1742,7 +1743,7 @@ find_file (const char *file, gfc_charlen_type file id = id_from_path (path); #endif - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); retry: u = find_file0 (unit_root, FIND_FILE0_ARGS); if (u != NULL) @@ -1751,20 +1752,20 @@ retry: if (! __gthread_mutex_trylock (&u->lock)) { /* assert (u->closed == 0); */ - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); goto done; } inc_waiting_locked (u); } - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (u != NULL) { - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); if (u->closed) { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); if (predec_waiting_locked (u) == 0) free (u); goto retry; @@ -1794,7 +1795,7 @@ flush_all_units_1 (gfc_unit *u, int min_unit) return u; if (u->s) sflush (u->s); - __gthread_mutex_unlock (&u->lock); + UNLOCK (&u->lock); } u = u->right; } @@ -1807,17 +1808,17 @@ flush_all_units (void) gfc_unit *u; int min_unit = 0; - __gthread_mutex_lock (&unit_lock); + LOCK (&unit_lock); do { u = flush_all_units_1 (unit_root, min_unit); if (u != NULL) inc_waiting_locked (u); - __gthread_mutex_unlock (&unit_lock); + UNLOCK (&unit_lock); if (u == NULL) return; - __gthread_mutex_lock (&u->lock); + LOCK (&u->lock); min_unit = u->unit_number + 1; @@ -1824,14 +1825,14 @@ flush_all_units (void) if (u->closed == 0) { sflush (u->s); - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); (void) predec_waiting_locked (u); } else { - __gthread_mutex_lock (&unit_lock); - __gthread_mutex_unlock (&u->lock); + LOCK (&unit_lock); + UNLOCK (&u->lock); if (predec_waiting_locked (u) == 0) free (u); } Index: libgfortran/libgfortran.h =================================================================== --- libgfortran/libgfortran.h (Revision 259739) +++ libgfortran/libgfortran.h (Arbeitskopie) @@ -743,6 +743,9 @@ internal_proto(translate_error); extern void generate_error (st_parameter_common *, int, const char *); iexport_proto(generate_error); +extern bool generate_error_common (st_parameter_common *, int, const char *); +iexport_proto(generate_error_common); + extern void generate_warning (st_parameter_common *, const char *); internal_proto(generate_warning); @@ -1748,5 +1751,7 @@ void cshift1_16_c16 (gfc_array_c16 * const restric internal_proto(cshift1_16_c16); #endif +/* Define this if we support asynchronous I/O on this platform. This + currently requires weak symbols. */ #endif /* LIBGFOR_H */ Index: libgfortran/runtime/error.c =================================================================== --- libgfortran/runtime/error.c (Revision 259739) +++ libgfortran/runtime/error.c (Arbeitskopie) @@ -24,6 +24,9 @@ see the files COPYING3 and COPYING.RUNTIME respect #include "libgfortran.h" +#include "io.h" +#include "async.h" + #include #include #include @@ -526,24 +529,30 @@ translate_error (int code) } -/* generate_error()-- Come here when an error happens. This - * subroutine is called if it is possible to continue on after the error. - * If an IOSTAT or IOMSG variable exists, we set it. If IOSTAT or - * ERR labels are present, we return, otherwise we terminate the program - * after printing a message. The error code is always required but the - * message parameter can be NULL, in which case a string describing - * the most recent operating system error is used. */ +/* Worker function for generate_error and generate_error_async. Return true + if a straight return is to be done, zero if the program should abort. */ -void -generate_error (st_parameter_common *cmp, int family, const char *message) +bool +generate_error_common (st_parameter_common *cmp, int family, const char *message) { char errmsg[STRERR_MAXSZ]; + gfc_unit *u; + u = thread_unit; + if (u && u->au && __gthread_equal (u->au->thread, __gthread_self ())) + { + u->au->error.has_error = 1; + u->au->error.cmp = cmp; + u->au->error.family = family; + u->au->error.message = message; + return true; + } + /* If there was a previous error, don't mask it with another error message, EOF or EOR condition. */ if ((cmp->flags & IOPARM_LIBRETURN_MASK) == IOPARM_LIBRETURN_ERROR) - return; + return true; /* Set the error status. */ if ((cmp->flags & IOPARM_HAS_IOSTAT)) @@ -564,27 +573,28 @@ translate_error (int code) case LIBERROR_EOR: cmp->flags |= IOPARM_LIBRETURN_EOR; if ((cmp->flags & IOPARM_EOR)) - return; + return true; break; case LIBERROR_END: cmp->flags |= IOPARM_LIBRETURN_END; if ((cmp->flags & IOPARM_END)) - return; + return true; break; default: cmp->flags |= IOPARM_LIBRETURN_ERROR; if ((cmp->flags & IOPARM_ERR)) - return; + return true; break; } /* Return if the user supplied an iostat variable. */ if ((cmp->flags & IOPARM_HAS_IOSTAT)) - return; + return true; - /* Terminate the program */ + /* Return code, caller is responsible for terminating + the program if necessary. */ recursion_check (); show_locus (cmp); @@ -591,8 +601,27 @@ translate_error (int code) estr_write ("Fortran runtime error: "); estr_write (message); estr_write ("\n"); - exit_error (2); + return false; } + +/* generate_error()-- Come here when an error happens. This + * subroutine is called if it is possible to continue on after the error. + * If an IOSTAT or IOMSG variable exists, we set it. If IOSTAT or + * ERR labels are present, we return, otherwise we terminate the program + * after printing a message. The error code is always required but the + * message parameter can be NULL, in which case a string describing + * the most recent operating system error is used. + * If the error is for an asynchronous unit and if the program is currently + * executing the asynchronous thread, just mark the error and return. */ + +void +generate_error (st_parameter_common *cmp, int family, const char *message) +{ + if (generate_error_common (cmp, family, message)) + return; + + exit_error(2); +} iexport(generate_error);