/*
 * condvar10 -- multi-threaded condition-variable test driver.
 *
 * REVIEW NOTE ON THE STATE OF THIS TEXT
 *   This copy appears to be damaged by extraction:
 *     - Line breaks have been collapsed, so each "//" comment now swallows
 *       the remainder of its physical line, and the backslash-continued
 *       macro bodies are no longer laid out one statement per line.
 *     - Text between a '<' and the following '>' appears to have been
 *       dropped: the #include directives carry no header names, and the
 *       monitor loop contains fragments such as "for (ii=0;ii (last...".
 *   Consequently part of main()'s monitor loop and the beginning of
 *   RecvData() are not present here, and no definitions of recvReq(),
 *   sendReq() or SendData() are visible. Restore the file from a clean
 *   copy before changing any code; nothing below the comment has been
 *   altered.
 *
 * WHAT IS VISIBLE
 *   Globals
 *     tid[2][MaxThreads], thrState[2][MaxThreads] -- indexed by
 *       Receiver (0) / Sender (1) and thread number.
 *     lock, sig   -- the mutex and condition variable used by RecvData().
 *     msg, counter, lastCount, received, TOs and the Thread*Count arrays
 *       -- shared counters; RecvData() updates msg, received, TOs,
 *       ThreadRecvCount[] and ThreadTOCount[].
 *     enum Operations -- bit flags stored in thrState[..].op via
 *       SetOp()/OrOp().
 *
 *   Logging macros
 *     OPENLOG(mode)  opens a brace scope, locks LOGX, fopen()s logFile into
 *                    a local LOGFP and, on success, opens a second scope.
 *     CLOSELOG(x)    fclose()s LOGFP, calls exit(1) when x is non-zero,
 *                    then closes both scopes and unlocks LOGX.
 *                    The two macros only balance when used as a pair.
 *     LOGERR         requires an int named "status" in scope; when it is
 *                    non-zero, appends an error line (with __LINE__) to the
 *                    log and exits through CLOSELOG(1).
 *
 *   Helpers
 *     PR(s)          when trace > 0, appends "TH-<thread id>:<s>" to the log.
 *     SetOp / OrOp / SetWatchdog
 *                    assign, OR into, or set the watchdog of
 *                    thrState[SR][threadNum] while holding its opLock.
 *     logPoint(inc)  returns TRUE and sets lastCount = counter once counter
 *                    has reached lastCount + inc; otherwise FALSE.
 *     PrintOptions(fp) prints the seven option values to fp.
 *
 *   main()
 *     Reads up to seven positional arguments with atoi() (sender threads,
 *     receiver threads, CV wait time, trace level, sender sleeps, receiver
 *     sleeps, monitor rate), initialises LOGX and "lock" with a
 *     PTHREAD_MUTEX_ERRORCHECK attribute and "sig" with default attributes,
 *     truncates the log with OPENLOG("w"), creates noRthr threads running
 *     recvReq and noSthr threads running sendReq, then enters an endless
 *     monitor loop that sleeps monitorInterval milliseconds per pass.
 *
 *   RecvData() (tail only)
 *     After pthread_cond_timedwait(): on ETIMEDOUT it counts the timeout,
 *     unlocks "lock" and returns; otherwise it counts the receive, clears
 *     msg, and unlocks "lock".
 *
 * NOTE(review): items to confirm against a clean copy of the source
 *   - PrintOptions() uses "%8ld" for noSthr, noRthr, trace, sendSleep,
 *     recvSleep and monitorRate, all of which are declared int.
 *   - putchar(rotor[r=((r++)&0x3)]) modifies r twice in one expression
 *     with no intervening sequence point.
 *   - monitorRate is rounded with ((monitorRate+5)/10)*10, so a requested
 *     rate of 0..4 becomes 0 and MILLISEC_PER_SEC/monitorRate then divides
 *     by zero.
 *   - The thread index is passed to pthread_create() as (void *)ii, an
 *     int-to-pointer cast.
 *   - The thread-count checks reject only values >= MaxThreads; there is
 *     no lower bound on what atoi() returns.
 *   - The sender-limit message spells "Sender" as "Secnder".
 */
/* Command line options are (in this order): Number of Sender threads Number of Receiver threads CV wait time (microsecs) Trace level Sender sleeps (bool) Receiver sleeps (bool) Monitor rate (per second) E.g. condvar10.exe 8 8 1000000 -10 1 0 10 Log info is to 'condvar10.log'. */ #include #include #include #include #include "condvar10.h" char * logFile = "condvar10.log"; const DWORD MILLISEC_PER_SEC = 1000L; const DWORD MICROSEC_PER_NANOSEC = 1000L; const DWORD NANOSEC_PER_MILLISEC = 1000000L; const DWORD MICROSEC_PER_SEC = 1000000L; const DWORD NANOSEC_PER_SEC = 1000000000L; enum { Receiver = 0, Sender, MaxThreads = 100 }; pthread_t tid[2][MaxThreads]; typedef struct thrState_t_ { int op; int watchdog; int signalled; pthread_mutex_t opLock; } thrState_t; thrState_t thrState[2][MaxThreads]; void* recvReq(void *arg); void* sendReq(void *arg); void SendData(); void RecvData(); int msg=0; int trace=1; int sendSleep=0; int recvSleep=0; int monitorRate=10; DWORD monitorInterval; DWORD counter=0; DWORD lastCount=0; DWORD received=0; DWORD TOs=0; pthread_mutex_t lock; pthread_cond_t sig; int noSthr = 1; int noRthr = 1; DWORD timeint = 5*MICROSEC_PER_SEC; // 5 sec int ThreadRecvCount[MaxThreads]; int ThreadTOCount[MaxThreads]; int ThreadSentCount[MaxThreads]; enum Operations { SLock = 0x00000001, ELock = 0x00000002, SUnlock = 0x00000010, EUnlock = 0x00000020, SWait = 0x00000100, EWait = 0x00000200, WaitTimeout = 0x00000400, SSignal = 0x00001000, ESignal = 0x00002000, MsgFalse = 0x40000000, MsgTrue = 0x80000000, }; void RecvData(int threadNum); void SendData(int threadNum); void * recvReq(void *arg); void * sendReq(void *arg); pthread_mutex_t LOGX; #define OPENLOG(_openMode) \ { \ FILE * LOGFP; \ (void)pthread_mutex_lock(&LOGX); \ if ((LOGFP=fopen(logFile, _openMode)) == NULL) \ { \ fprintf(stdout, "Line %d: Log open error\n", __LINE__); \ fflush(stdout); \ } \ else \ { #define CLOSELOG(_exitAfterClose) \ fclose(LOGFP); \ if(_exitAfterClose) exit(1); \ } \ 
(void)pthread_mutex_unlock(&LOGX); \ } #define LOGERR \ { \ if(status!=0) \ { \ OPENLOG("a"); \ fprintf(LOGFP,"Error at line %d, status %d\n",__LINE__, status); \ CLOSELOG(1); \ } \ } void PR (char *s) { long id; if(trace>0) { id=GetCurrentThreadId (); OPENLOG("a"); fprintf(LOGFP,"TH-%lx:%s\n",id,s); CLOSELOG(0); } } void SetOp(int SR, int threadNum, int op) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].op = op; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } void OrOp(int SR, int threadNum, int op) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].op |= op; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } void SetWatchdog(int SR, int threadNum, int woof) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].watchdog = woof; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } BOOL logPoint(long increment) { if (counter >= lastCount + increment) { lastCount = counter; return TRUE; } return FALSE; } void PrintOptions (FILE * fp) { fprintf(fp, "Options are (in this order):\n"); fprintf(fp, " %-30s: %8ld\n", "Number of Sender threads", noSthr); fprintf(fp, " %-30s: %8ld\n", "Number of Receiver threads", noRthr); fprintf(fp, " %-30s: %8ld\n", "CV wait time (microsecs)", timeint); fprintf(fp, " %-30s: %8ld\n", "Trace level", trace); fprintf(fp, " %-30s: %8ld\n", "Sender sleeps (bool)", sendSleep); fprintf(fp, " %-30s: %8ld\n", "Receiver sleeps (bool)", recvSleep); fprintf(fp, " %-30s: %8ld\n", "Monitor rate (per second)", monitorRate); putc('\n',fp); } int main(int argc , char * argv[]) { pthread_mutexattr_t la; int status; int ii; DWORD milliseconds = 0; DWORD lastSendWatch = 0; DWORD lastRecvWatch = 0; int r = 0; DWORD seconds = 0; DWORD lastLogSeconds = 0; char * rotor = "/-\\|"; if(argc>1) { noSthr=atoi(argv[1]); if (noSthr >= MaxThreads) { 
printf("Requested too many Secnder threads = %d. Max is %d\n", noSthr, MaxThreads); exit(1); } } if(argc>2) { noRthr=atoi(argv[2]); if (noRthr >= MaxThreads) { printf("Requested too many Receiver threads = %d. Max is %d\n", noRthr, MaxThreads); exit(1); } } if(argc>3) { timeint=atoi(argv[3]); } if(argc>4) { trace=atoi(argv[4]); } if(argc>5) { sendSleep=atoi(argv[5]); } if(argc>6) { recvSleep=atoi(argv[6]); } if(argc>7) { monitorRate=atoi(argv[7]); // Round to nearest 10 so that 1000ms (1 sec) is a multiple of the interval monitorRate=((monitorRate+5)/10)*10; } monitorInterval = MILLISEC_PER_SEC/monitorRate; if (pthread_mutexattr_init(&la) != 0 || pthread_mutexattr_settype(&la, PTHREAD_MUTEX_ERRORCHECK) != 0 || pthread_mutex_init(&LOGX, &la) != 0) { printf("Line %d: Error initialising log mutex.\n", __LINE__); exit(1); } status = pthread_mutex_init(&lock, &la); LOGERR; status = pthread_cond_init(&sig, NULL); LOGERR; PrintOptions(stdout); fflush(stdout); OPENLOG("w"); PrintOptions(LOGFP); CLOSELOG(0); for(ii = 0; ii < noRthr; ii++) { status = pthread_mutex_init(&thrState[Receiver][ii].opLock, &la); LOGERR; status = pthread_create(&tid[Receiver][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&recvReq, (void *)ii); LOGERR; } for(ii = 0; ii < noSthr; ii++) { status = pthread_mutex_init(&thrState[Sender][ii].opLock, &la); LOGERR; status = pthread_create(&tid[Sender][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&sendReq, (void *)ii); LOGERR; } status = pthread_mutexattr_destroy(&la); LOGERR; while(1) //Monitor threads until they hang { int stillRunning; BOOL newSecond; Sleep(monitorInterval); milliseconds+=monitorInterval; newSecond = (milliseconds >= MILLISEC_PER_SEC); putchar(rotor[r=((r++)&0x3)]); putchar('\b'); // Log Sends and Receives/Timeouts if (trace > 0 || (trace < 0 && trace >= -5000 && (logPoint(-trace) || seconds > lastLogSeconds + 1 /* At least 1 second */))) { int ii; lastLogSeconds = seconds; OPENLOG("a"); fprintf(LOGFP,"count=%010ld, Thr/Recvd/TOs", counter); for 
(ii=0;ii (lastSendWatch + (2*timeint/MICROSEC_PER_SEC))) { for (ii=0;ii (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC))) || (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { stillRunning=noRthr; for (ii=0;ii (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { lastSendWatch = seconds; for (ii=0;ii (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC))) || (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { lastRecvWatch = seconds; for (ii=0;ii= NANOSEC_PER_SEC) { abstime.tv_nsec -= NANOSEC_PER_SEC; abstime.tv_sec++; } abstime.tv_sec += timeint/MICROSEC_PER_SEC; // printf("TO : %ld.%ld\n", abstime.tv_sec, abstime.tv_nsec); // fflush(stdout); PR("wait/unlock -11"); OrOp(Receiver,threadNum,SWait); status = pthread_cond_timedwait(&sig, &lock, &abstime); OrOp(Receiver,threadNum,EWait); PR("lock/awake -11"); if (status == ETIMEDOUT) { ThreadTOCount[threadNum]++; TOs++; PR("timeout -11"); PR("unlock -11"); OrOp(Receiver,threadNum,WaitTimeout); OrOp(Receiver,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Receiver,threadNum,EUnlock); LOGERR; return ; } LOGERR; } if (msg==1) { OrOp(Receiver,threadNum,MsgTrue); } ThreadRecvCount[threadNum]++; msg=0; received++; PR("unlock -11"); OrOp(Receiver,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Receiver,threadNum,EUnlock); LOGERR; return ; }