calog/tests/testActor.c

303 lines
9.8 KiB
C

// testActor.c -- the actor dispatch machinery, engine-free.
//
// Uses synthetic contexts (engine == NULL) and plain C callables (calogFnCreate) to
// exercise the cross-thread invoke/reply path, the always-live nested pump (the
// re-entrant A->B->A deadlock test), the generationed registry (a callable owned by
// a closed+recycled context is rejected as dead, not misrouted), and concurrent
// drivers. calogFnInvoke on the host thread blocks-and-pumps the host queue while it
// waits, so no explicit calogPump is needed here. Built under ASan+UBSan and, via
// `make tsan`, ThreadSanitizer.
#define _POSIX_C_SOURCE 200809L
#include "calogInternal.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__)
#define DRIVER_COUNT 4
#define STRESS_ITERATIONS 100
#define INNER_VALUE 100
#define REENTRANT_EXPECTED 101
static CalogT *calog = NULL;
static CalogContextT *ctx1 = NULL;
static CalogContextT *ctx2 = NULL;
static CalogFnT *innerA = NULL; // owned by ctx1: returns INNER_VALUE
static CalogFnT *relayB = NULL; // owned by ctx2: innerA() + 1
static CalogFnT *entryA = NULL; // owned by ctx1: relayB()
static CalogFnT *whoFn = NULL; // owned by ctx1: returns its running context id
static int32_t testsRun = 0;
static int32_t testsFailed = 0;
static void checkImpl(bool condition, const char *message, const char *file, int32_t line);
static void *driver(void *arg);
static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static void *runtimeThread(void *arg);
static void testConcurrentRuntimes(void);
static void testConcurrentStress(void);
static void testCrossThreadInvoke(void);
static void testDestroyClosesOpenContexts(void);
static void testGeneration(void);
static void testReentrant(void);
static void checkImpl(bool condition, const char *message, const char *file, int32_t line) {
testsRun++;
if (!condition) {
testsFailed++;
printf("FAIL %s:%d %s\n", file, line, message);
}
}
static void *driver(void *arg) {
int32_t *failures;
int32_t index;
failures = (int32_t *)arg;
for (index = 0; index < STRESS_ITERATIONS; index++) {
CalogValueT result;
int32_t status;
status = calogFnInvoke(whoFn, NULL, 0, &result);
if (status != calogOkE || result.type != calogIntE || result.as.i != (int64_t)calogContextId(ctx1)) {
(*failures)++;
}
calogValueFree(&result);
}
return NULL;
}
static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
CalogValueT inner;
int32_t status;
(void)args;
(void)argCount;
(void)userData;
// Runs on ctx1; calls relayB (ctx2), which calls back into ctx1 -- the deadlock
// test, resolved by ctx1's nested pump.
status = calogFnInvoke(relayB, NULL, 0, &inner);
if (status != calogOkE) {
calogValueFree(&inner);
return calogFail(result, status, "entryA: relayB failed");
}
calogValueMove(result, &inner);
return calogOkE;
}
static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
(void)args;
(void)argCount;
(void)userData;
calogValueInt(result, INNER_VALUE);
return calogOkE;
}
static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
CalogValueT inner;
int32_t status;
(void)args;
(void)argCount;
(void)userData;
status = calogFnInvoke(innerA, NULL, 0, &inner);
if (status != calogOkE) {
calogValueFree(&inner);
return calogFail(result, status, "relayB: innerA failed");
}
calogValueInt(result, inner.as.i + 1);
calogValueFree(&inner);
return calogOkE;
}
static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
(void)args;
(void)argCount;
(void)userData;
calogValueInt(result, (int64_t)calogCurrentId());
return calogOkE;
}
static void *runtimeThread(void *arg) {
int32_t *ok;
CalogT *rt;
CalogContextT *ctx;
CalogFnT *fn;
CalogValueT result;
int32_t status;
// Each thread creates, drives, and destroys its OWN independent runtime -- no
// process-global state is shared. A callable owned by this runtime's context must
// route to that context's thread and back, entirely within this runtime.
ok = (int32_t *)arg;
rt = calogCreate();
if (rt == NULL) {
return NULL;
}
ctx = calogContextOpen(rt, NULL);
if (ctx == NULL) {
calogDestroy(rt);
return NULL;
}
calogFnCreate(&fn, rt, fnWho, NULL, NULL, calogContextId(ctx));
status = calogFnInvoke(fn, NULL, 0, &result);
if (status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx)) {
*ok = 1;
}
calogValueFree(&result);
calogFnRelease(fn);
calogContextClose(ctx);
calogDestroy(rt);
return NULL;
}
static void testConcurrentRuntimes(void) {
pthread_t threads[DRIVER_COUNT];
int32_t oks[DRIVER_COUNT];
int32_t index;
for (index = 0; index < DRIVER_COUNT; index++) {
oks[index] = 0;
pthread_create(&threads[index], NULL, runtimeThread, &oks[index]);
}
for (index = 0; index < DRIVER_COUNT; index++) {
pthread_join(threads[index], NULL);
CHECK(oks[index] == 1, "an independent runtime on its own host thread dispatched correctly");
}
}
static void testConcurrentStress(void) {
pthread_t threads[DRIVER_COUNT];
int32_t failures[DRIVER_COUNT];
int32_t index;
int32_t total;
for (index = 0; index < DRIVER_COUNT; index++) {
failures[index] = 0;
pthread_create(&threads[index], NULL, driver, &failures[index]);
}
total = 0;
for (index = 0; index < DRIVER_COUNT; index++) {
pthread_join(threads[index], NULL);
total += failures[index];
}
CHECK(total == 0, "concurrent drivers: every callable invoke ran on ctx1 and returned its id");
}
static void testCrossThreadInvoke(void) {
CalogValueT result;
int32_t status;
// whoFn is owned by ctx1; invoking it from the host thread must run it on ctx1.
status = calogFnInvoke(whoFn, NULL, 0, &result);
CHECK(status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx1), "callable invoke routed to its owner context's thread");
calogValueFree(&result);
}
static void testGeneration(void) {
CalogContextT *tmp;
CalogFnT *orphan;
CalogValueT result;
uint64_t staleId;
int32_t status;
tmp = calogContextOpen(calog, NULL);
staleId = calogContextId(tmp);
status = calogFnCreate(&orphan, calog, fnWho, NULL, NULL, staleId);
CHECK(status == calogOkE, "created a callable owned by a soon-to-close context");
calogContextClose(tmp);
// A new context recycles tmp's slot with a bumped generation.
tmp = calogContextOpen(calog, NULL);
CHECK(calogContextId(tmp) != staleId, "recycled slot yields a new generationed id");
// Invoking the orphan (old id) must fail dead, not misroute to the recycler.
status = calogFnInvoke(orphan, NULL, 0, &result);
CHECK(status == calogErrDeadE, "callable of a recycled context is rejected as dead, not misrouted");
calogValueFree(&result);
calogFnRelease(orphan);
calogContextClose(tmp);
}
static void testReentrant(void) {
CalogValueT result;
int32_t status;
// host -> entryA(ctx1) -> relayB(ctx2) -> innerA(ctx1): the re-entrant call back
// into ctx1 must be serviced by its nested pump (no deadlock).
status = calogFnInvoke(entryA, NULL, 0, &result);
CHECK(status == calogOkE && result.type == calogIntE && result.as.i == REENTRANT_EXPECTED, "re-entrant A->B->A resolved without deadlock");
calogValueFree(&result);
}
static void testDestroyClosesOpenContexts(void) {
int32_t i;
// Open several contexts and deliberately DO NOT close them: because CalogT owns
// its active-context list, calogDestroy (in main) must close them all. ASan
// verifies there is no leak at exit -- this is the cleanup guarantee.
for (i = 0; i < 32; i++) {
CalogContextT *leaked;
leaked = calogContextOpen(calog, NULL);
CHECK(leaked != NULL, "opened a context left for calogDestroy to close");
}
}
int main(void) {
calog = calogCreate();
if (calog == NULL) {
printf("calog create failed\n");
return 1;
}
ctx1 = calogContextOpen(calog, NULL);
ctx2 = calogContextOpen(calog, NULL);
calogFnCreate(&whoFn, calog, fnWho, NULL, NULL, calogContextId(ctx1));
calogFnCreate(&innerA, calog, fnInnerA, NULL, NULL, calogContextId(ctx1));
calogFnCreate(&relayB, calog, fnRelayB, NULL, NULL, calogContextId(ctx2));
calogFnCreate(&entryA, calog, fnEntryA, NULL, NULL, calogContextId(ctx1));
testCrossThreadInvoke();
testReentrant();
testConcurrentStress();
testGeneration();
testDestroyClosesOpenContexts();
testConcurrentRuntimes();
calogFnRelease(whoFn);
calogFnRelease(innerA);
calogFnRelease(relayB);
calogFnRelease(entryA);
calogContextClose(ctx1);
calogContextClose(ctx2);
calogDestroy(calog);
printf("\n%d checks, %d failed\n", testsRun, testsFailed);
fflush(stdout);
if (testsFailed != 0) {
return 1;
}
return 0;
}