/* Stress test for a two-copy, version-counter based transactional
   read scheme.  One writer thread keeps publishing fresh random
   permutations while verifier threads read the published copy and
   check that every read validated by the version counter is
   consistent.  */

/* srand48_r, lrand48_r and struct drand48_data are GNU extensions.  */
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif

#include <err.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Terminate the process with exit status 1 if RET, the return value
   of the pthread function named FUNC, is a non-zero error code.  */
static void
check_pthread (const char *func, int ret)
{
  if (ret != 0)
    {
      errno = ret;
      err (1, "%s", func);
    }
}

/* Parallel arrays of keys and values.  */
enum { key_count = 1024 };
struct storage
{
  unsigned long int keys[key_count];
  unsigned long int values[key_count];
};

/* Use a fixed offset from keys to values, to enable easy checking.  */
enum { value_offset = 0x12345 };

/* stm_version & 1 is the index into storage_versions.  */
static unsigned long int stm_version;
static struct storage storage_versions[2];

enum mismatch_kind
{
  none,          /* No consistency problem detected.  */
  key_range,     /* Key is >= key_count.  */
  duplicate_key, /* The same key is present multiple times.  */
  wrong_value,   /* A key has an unexpected value.  */
  missing_key,   /* A key is missing (not a permutation).  */
};

/* These counters are just for statistics.  */
static unsigned long int verifier_read_success;
static unsigned long int verifier_read_failure;
static unsigned long int mismatch_counters[missing_key + 1];

/* Thread that continuously reads the TM storage and checks for
   consistent read results.  CLOSURE is unused.  The thread never
   returns; it terminates the process with exit status 2 if a read
   that was validated by the version check turns out to be
   inconsistent.  */
static void *
verifier_thread (void *closure)
{
  (void) closure;

  while (true)
    {
      /* Start of the transactional read region.  The acquire load
         pairs with the writer's release updates of stm_version, so
         the selected copy is read as it was published.  */
      unsigned long int start_version
        = __atomic_load_n (&stm_version, __ATOMIC_ACQUIRE);

      bool seen[key_count] = { 0 };
      struct storage *st = &storage_versions[start_version & 1];
      enum mismatch_kind mismatch = none;

      for (int i = 0; i < key_count; ++i)
        {
          unsigned long int key
            = __atomic_load_n (&st->keys[i], __ATOMIC_RELAXED);
          if (key >= key_count)
            {
              mismatch = key_range;
              break;
            }
          if (seen[key])
            {
              mismatch = duplicate_key;
              break;
            }
          seen[key] = true;
          if (key + value_offset
              != __atomic_load_n (&st->values[i], __ATOMIC_RELAXED))
            {
              mismatch = wrong_value;
              break;
            }
        }

      if (mismatch == none)
        for (int i = 0; i < key_count; ++i)
          if (!seen[i])
            {
              mismatch = missing_key;
              break;
            }

      /* End of the transactional read region.  An acquire *load* of
         the version would only order later accesses after it; the
         relaxed data loads above could still be reordered past the
         version check.  The acquire fence keeps them before the
         re-read and pairs with the writer's release fence.  */
      __atomic_thread_fence (__ATOMIC_ACQUIRE);
      unsigned long int end_version
        = __atomic_load_n (&stm_version, __ATOMIC_RELAXED);

      if (start_version == end_version)
        {
          /* If the version is unchanged, the read was successful.
             Any inconsistency is an algorithmic failure.  */
          __atomic_fetch_add (&verifier_read_success, 1, __ATOMIC_RELAXED);
          if (mismatch != none)
            errx (2, "mismatch: %d", (int) mismatch);
        }
      else
        __atomic_fetch_add (&verifier_read_failure, 1, __ATOMIC_RELAXED);

      /* Counted for every read, validated or not (index 0 is none).  */
      __atomic_fetch_add (&mismatch_counters[mismatch], 1, __ATOMIC_RELAXED);
    }

  return NULL;
}

/* Thread that prints the version and the statistics counters to
   standard output once per second.  CLOSURE is unused.  */
static void *
reporter_thread (void *closure)
{
  (void) closure;

  while (true)
    {
      /* sleep rather than usleep (1000 * 1000): POSIX requires the
         usleep argument to be less than one million.  */
      sleep (1);
      printf ("stm_version=%lu success=%lu failure=%lu\n"
              " range=%lu duplicate=%lu value=%lu missing=%lu\n",
              __atomic_load_n (&stm_version, __ATOMIC_RELAXED),
              __atomic_load_n (&verifier_read_success, __ATOMIC_RELAXED),
              __atomic_load_n (&verifier_read_failure, __ATOMIC_RELAXED),
              __atomic_load_n (&mismatch_counters[key_range],
                               __ATOMIC_RELAXED),
              __atomic_load_n (&mismatch_counters[duplicate_key],
                               __ATOMIC_RELAXED),
              __atomic_load_n (&mismatch_counters[wrong_value],
                               __ATOMIC_RELAXED),
              __atomic_load_n (&mismatch_counters[missing_key],
                               __ATOMIC_RELAXED));
      /* The process never exits normally, so push the report out even
         when standard output is fully buffered.  */
      fflush (stdout);
    }

  return NULL;
}

/* State of the random number generator; used by the writer only.  */
static struct drand48_data rand_state;

/* Write a random permutation to *ST.  This simulates a workload that
   produces inconsistency during updates.  */
static void
update_storage (struct storage *st)
{
  /* Fisher-Yates shuffle.  Very slight bias due to %.  */
  unsigned long int permutation[key_count];
  for (int i = 0; i < key_count; ++i)
    permutation[i] = i;
  for (int i = 0; i < key_count - 1; ++i)
    {
      long int r;
      lrand48_r (&rand_state, &r);
      int j = i + (r % (key_count - i));
      unsigned long int tmp = permutation[i];
      permutation[i] = permutation[j];
      permutation[j] = tmp;
    }

  /* Verifier threads may load these objects concurrently, so the
     stores have to be atomic as well; plain stores would be a data
     race.  Ordering is provided by the caller.  */
  for (int i = 0; i < key_count; ++i)
    {
      __atomic_store_n (&st->keys[i], permutation[i], __ATOMIC_RELAXED);
      __atomic_store_n (&st->values[i], permutation[i] + value_offset,
                        __ATOMIC_RELAXED);
    }
}

/* Usage: PROGRAM THREAD-COUNT.  Starts THREAD-COUNT verifier threads
   and one reporter thread, then acts as the single writer forever.
   Returns 1 on a usage error.  */
int
main (int argc, char **argv)
{
  long int thread_count = 0;
  bool args_ok = argc == 2;
  if (args_ok)
    {
      /* strtol instead of atoi: reject trailing garbage and values
         that are out of range.  */
      char *end;
      errno = 0;
      thread_count = strtol (argv[1], &end, 10);
      args_ok = (end != argv[1] && *end == '\0' && errno != ERANGE
                 && thread_count > 0);
    }
  if (!args_ok)
    {
      const char *progname
        = (argc > 0 && argv[0] != NULL) ? argv[0] : "PROGRAM";
      fprintf (stderr, "usage: %s THREAD-COUNT\n", progname);
      return 1;
    }

  /* Version 0 selects storage_versions[0]; fill it before any reader
     exists.  */
  srand48_r (1, &rand_state);
  update_storage (&storage_versions[0]);

  for (long int i = 0; i < thread_count; ++i)
    {
      pthread_t thr;
      check_pthread ("pthread_create",
                     pthread_create (&thr, NULL, verifier_thread, NULL));
    }

  {
    pthread_t thr;
    check_pthread ("pthread_create",
                   pthread_create (&thr, NULL, reporter_thread, NULL));
  }

  while (true)
    {
      /* Start new STM round, but do not switch versions yet.  Adding
         2 keeps the low bit, so readers keep using the same copy.  */
      unsigned long int ver
        = __atomic_fetch_add (&stm_version, 2, __ATOMIC_RELEASE);

      /* A release operation does not keep *later* stores behind it.
         The fence makes the version bumps visible to any reader that
         observes one of the data stores below and then executes its
         acquire fence, so such a reader fails its version check.  */
      __atomic_thread_fence (__ATOMIC_RELEASE);

      update_storage (&storage_versions[(ver + 1) & 1]);

      /* Commit and switch versions.  The release ordering publishes
         the data stores above to readers that acquire the new
         version.  */
      __atomic_fetch_add (&stm_version, 1, __ATOMIC_RELEASE);
    }
}