303 lines
9.8 KiB
C
303 lines
9.8 KiB
C
// testActor.c -- the actor dispatch machinery, engine-free.
|
|
//
|
|
// Uses synthetic contexts (engine == NULL) and plain C callables (calogFnCreate) to
|
|
// exercise the cross-thread invoke/reply path, the always-live nested pump (the
|
|
// re-entrant A->B->A deadlock test), the generationed registry (a callable owned by
|
|
// a closed+recycled context is rejected as dead, not misrouted), and concurrent
|
|
// drivers. calogFnInvoke on the host thread blocks-and-pumps the host queue while it
|
|
// waits, so no explicit calogPump is needed here. Built under ASan+UBSan and, via
|
|
// `make tsan`, ThreadSanitizer.
|
|
|
|
#define _POSIX_C_SOURCE 200809L
|
|
|
|
#include "calogInternal.h"
|
|
|
|
#include <pthread.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__)
|
|
|
|
#define DRIVER_COUNT 4
|
|
#define STRESS_ITERATIONS 100
|
|
#define INNER_VALUE 100
|
|
#define REENTRANT_EXPECTED 101
|
|
|
|
static CalogT *calog = NULL;
|
|
static CalogContextT *ctx1 = NULL;
|
|
static CalogContextT *ctx2 = NULL;
|
|
static CalogFnT *innerA = NULL; // owned by ctx1: returns INNER_VALUE
|
|
static CalogFnT *relayB = NULL; // owned by ctx2: innerA() + 1
|
|
static CalogFnT *entryA = NULL; // owned by ctx1: relayB()
|
|
static CalogFnT *whoFn = NULL; // owned by ctx1: returns its running context id
|
|
static int32_t testsRun = 0;
|
|
static int32_t testsFailed = 0;
|
|
|
|
static void checkImpl(bool condition, const char *message, const char *file, int32_t line);
|
|
static void *driver(void *arg);
|
|
static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static void *runtimeThread(void *arg);
|
|
static void testConcurrentRuntimes(void);
|
|
static void testConcurrentStress(void);
|
|
static void testCrossThreadInvoke(void);
|
|
static void testDestroyClosesOpenContexts(void);
|
|
static void testGeneration(void);
|
|
static void testReentrant(void);
|
|
|
|
|
|
static void checkImpl(bool condition, const char *message, const char *file, int32_t line) {
|
|
testsRun++;
|
|
if (!condition) {
|
|
testsFailed++;
|
|
printf("FAIL %s:%d %s\n", file, line, message);
|
|
}
|
|
}
|
|
|
|
|
|
static void *driver(void *arg) {
|
|
int32_t *failures;
|
|
int32_t index;
|
|
|
|
failures = (int32_t *)arg;
|
|
for (index = 0; index < STRESS_ITERATIONS; index++) {
|
|
CalogValueT result;
|
|
int32_t status;
|
|
status = calogFnInvoke(whoFn, NULL, 0, &result);
|
|
if (status != calogOkE || result.type != calogIntE || result.as.i != (int64_t)calogContextId(ctx1)) {
|
|
(*failures)++;
|
|
}
|
|
calogValueFree(&result);
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
|
|
CalogValueT inner;
|
|
int32_t status;
|
|
|
|
(void)args;
|
|
(void)argCount;
|
|
(void)userData;
|
|
// Runs on ctx1; calls relayB (ctx2), which calls back into ctx1 -- the deadlock
|
|
// test, resolved by ctx1's nested pump.
|
|
status = calogFnInvoke(relayB, NULL, 0, &inner);
|
|
if (status != calogOkE) {
|
|
calogValueFree(&inner);
|
|
return calogFail(result, status, "entryA: relayB failed");
|
|
}
|
|
calogValueMove(result, &inner);
|
|
return calogOkE;
|
|
}
|
|
|
|
|
|
static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
|
|
(void)args;
|
|
(void)argCount;
|
|
(void)userData;
|
|
calogValueInt(result, INNER_VALUE);
|
|
return calogOkE;
|
|
}
|
|
|
|
|
|
static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
|
|
CalogValueT inner;
|
|
int32_t status;
|
|
|
|
(void)args;
|
|
(void)argCount;
|
|
(void)userData;
|
|
status = calogFnInvoke(innerA, NULL, 0, &inner);
|
|
if (status != calogOkE) {
|
|
calogValueFree(&inner);
|
|
return calogFail(result, status, "relayB: innerA failed");
|
|
}
|
|
calogValueInt(result, inner.as.i + 1);
|
|
calogValueFree(&inner);
|
|
return calogOkE;
|
|
}
|
|
|
|
|
|
static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
|
|
(void)args;
|
|
(void)argCount;
|
|
(void)userData;
|
|
calogValueInt(result, (int64_t)calogCurrentId());
|
|
return calogOkE;
|
|
}
|
|
|
|
|
|
static void *runtimeThread(void *arg) {
|
|
int32_t *ok;
|
|
CalogT *rt;
|
|
CalogContextT *ctx;
|
|
CalogFnT *fn;
|
|
CalogValueT result;
|
|
int32_t status;
|
|
|
|
// Each thread creates, drives, and destroys its OWN independent runtime -- no
|
|
// process-global state is shared. A callable owned by this runtime's context must
|
|
// route to that context's thread and back, entirely within this runtime.
|
|
ok = (int32_t *)arg;
|
|
rt = calogCreate();
|
|
if (rt == NULL) {
|
|
return NULL;
|
|
}
|
|
ctx = calogContextOpen(rt, NULL);
|
|
if (ctx == NULL) {
|
|
calogDestroy(rt);
|
|
return NULL;
|
|
}
|
|
calogFnCreate(&fn, rt, fnWho, NULL, NULL, calogContextId(ctx));
|
|
status = calogFnInvoke(fn, NULL, 0, &result);
|
|
if (status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx)) {
|
|
*ok = 1;
|
|
}
|
|
calogValueFree(&result);
|
|
calogFnRelease(fn);
|
|
calogContextClose(ctx);
|
|
calogDestroy(rt);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static void testConcurrentRuntimes(void) {
|
|
pthread_t threads[DRIVER_COUNT];
|
|
int32_t oks[DRIVER_COUNT];
|
|
int32_t index;
|
|
|
|
for (index = 0; index < DRIVER_COUNT; index++) {
|
|
oks[index] = 0;
|
|
pthread_create(&threads[index], NULL, runtimeThread, &oks[index]);
|
|
}
|
|
for (index = 0; index < DRIVER_COUNT; index++) {
|
|
pthread_join(threads[index], NULL);
|
|
CHECK(oks[index] == 1, "an independent runtime on its own host thread dispatched correctly");
|
|
}
|
|
}
|
|
|
|
|
|
static void testConcurrentStress(void) {
|
|
pthread_t threads[DRIVER_COUNT];
|
|
int32_t failures[DRIVER_COUNT];
|
|
int32_t index;
|
|
int32_t total;
|
|
|
|
for (index = 0; index < DRIVER_COUNT; index++) {
|
|
failures[index] = 0;
|
|
pthread_create(&threads[index], NULL, driver, &failures[index]);
|
|
}
|
|
total = 0;
|
|
for (index = 0; index < DRIVER_COUNT; index++) {
|
|
pthread_join(threads[index], NULL);
|
|
total += failures[index];
|
|
}
|
|
CHECK(total == 0, "concurrent drivers: every callable invoke ran on ctx1 and returned its id");
|
|
}
|
|
|
|
|
|
static void testCrossThreadInvoke(void) {
|
|
CalogValueT result;
|
|
int32_t status;
|
|
|
|
// whoFn is owned by ctx1; invoking it from the host thread must run it on ctx1.
|
|
status = calogFnInvoke(whoFn, NULL, 0, &result);
|
|
CHECK(status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx1), "callable invoke routed to its owner context's thread");
|
|
calogValueFree(&result);
|
|
}
|
|
|
|
|
|
static void testGeneration(void) {
|
|
CalogContextT *tmp;
|
|
CalogFnT *orphan;
|
|
CalogValueT result;
|
|
uint64_t staleId;
|
|
int32_t status;
|
|
|
|
tmp = calogContextOpen(calog, NULL);
|
|
staleId = calogContextId(tmp);
|
|
status = calogFnCreate(&orphan, calog, fnWho, NULL, NULL, staleId);
|
|
CHECK(status == calogOkE, "created a callable owned by a soon-to-close context");
|
|
|
|
calogContextClose(tmp);
|
|
|
|
// A new context recycles tmp's slot with a bumped generation.
|
|
tmp = calogContextOpen(calog, NULL);
|
|
CHECK(calogContextId(tmp) != staleId, "recycled slot yields a new generationed id");
|
|
|
|
// Invoking the orphan (old id) must fail dead, not misroute to the recycler.
|
|
status = calogFnInvoke(orphan, NULL, 0, &result);
|
|
CHECK(status == calogErrDeadE, "callable of a recycled context is rejected as dead, not misrouted");
|
|
calogValueFree(&result);
|
|
|
|
calogFnRelease(orphan);
|
|
calogContextClose(tmp);
|
|
}
|
|
|
|
|
|
static void testReentrant(void) {
|
|
CalogValueT result;
|
|
int32_t status;
|
|
|
|
// host -> entryA(ctx1) -> relayB(ctx2) -> innerA(ctx1): the re-entrant call back
|
|
// into ctx1 must be serviced by its nested pump (no deadlock).
|
|
status = calogFnInvoke(entryA, NULL, 0, &result);
|
|
CHECK(status == calogOkE && result.type == calogIntE && result.as.i == REENTRANT_EXPECTED, "re-entrant A->B->A resolved without deadlock");
|
|
calogValueFree(&result);
|
|
}
|
|
|
|
|
|
static void testDestroyClosesOpenContexts(void) {
|
|
int32_t i;
|
|
|
|
// Open several contexts and deliberately DO NOT close them: because CalogT owns
|
|
// its active-context list, calogDestroy (in main) must close them all. ASan
|
|
// verifies there is no leak at exit -- this is the cleanup guarantee.
|
|
for (i = 0; i < 32; i++) {
|
|
CalogContextT *leaked;
|
|
leaked = calogContextOpen(calog, NULL);
|
|
CHECK(leaked != NULL, "opened a context left for calogDestroy to close");
|
|
}
|
|
}
|
|
|
|
|
|
int main(void) {
|
|
calog = calogCreate();
|
|
if (calog == NULL) {
|
|
printf("calog create failed\n");
|
|
return 1;
|
|
}
|
|
ctx1 = calogContextOpen(calog, NULL);
|
|
ctx2 = calogContextOpen(calog, NULL);
|
|
|
|
calogFnCreate(&whoFn, calog, fnWho, NULL, NULL, calogContextId(ctx1));
|
|
calogFnCreate(&innerA, calog, fnInnerA, NULL, NULL, calogContextId(ctx1));
|
|
calogFnCreate(&relayB, calog, fnRelayB, NULL, NULL, calogContextId(ctx2));
|
|
calogFnCreate(&entryA, calog, fnEntryA, NULL, NULL, calogContextId(ctx1));
|
|
|
|
testCrossThreadInvoke();
|
|
testReentrant();
|
|
testConcurrentStress();
|
|
testGeneration();
|
|
testDestroyClosesOpenContexts();
|
|
testConcurrentRuntimes();
|
|
|
|
calogFnRelease(whoFn);
|
|
calogFnRelease(innerA);
|
|
calogFnRelease(relayB);
|
|
calogFnRelease(entryA);
|
|
calogContextClose(ctx1);
|
|
calogContextClose(ctx2);
|
|
calogDestroy(calog);
|
|
|
|
printf("\n%d checks, %d failed\n", testsRun, testsFailed);
|
|
fflush(stdout);
|
|
if (testsFailed != 0) {
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|