// testActor.c -- the actor dispatch machinery, engine-free. // // Uses synthetic contexts (engine == NULL) and plain C callables (calogFnCreate) to // exercise the cross-thread invoke/reply path, the always-live nested pump (the // re-entrant A->B->A deadlock test), the generationed registry (a callable owned by // a closed+recycled context is rejected as dead, not misrouted), and concurrent // drivers. calogFnInvoke on the host thread blocks-and-pumps the host queue while it // waits, so no explicit calogPump is needed here. Built under ASan+UBSan and, via // `make tsan`, ThreadSanitizer. #define _POSIX_C_SOURCE 200809L #include "calogInternal.h" #include #include #include #include #define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__) #define DRIVER_COUNT 4 #define STRESS_ITERATIONS 100 #define INNER_VALUE 100 #define REENTRANT_EXPECTED 101 static CalogT *calog = NULL; static CalogContextT *ctx1 = NULL; static CalogContextT *ctx2 = NULL; static CalogFnT *innerA = NULL; // owned by ctx1: returns INNER_VALUE static CalogFnT *relayB = NULL; // owned by ctx2: innerA() + 1 static CalogFnT *entryA = NULL; // owned by ctx1: relayB() static CalogFnT *whoFn = NULL; // owned by ctx1: returns its running context id static int32_t testsRun = 0; static int32_t testsFailed = 0; static void checkImpl(bool condition, const char *message, const char *file, int32_t line); static void *driver(void *arg); static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static void *runtimeThread(void *arg); static void testConcurrentRuntimes(void); static void testConcurrentStress(void); static void testCrossThreadInvoke(void); static void testDestroyClosesOpenContexts(void); static void testGeneration(void); static void testReentrant(void); static void checkImpl(bool condition, const char *message, const char *file, int32_t line) { testsRun++; if (!condition) { testsFailed++; printf("FAIL %s:%d %s\n", file, line, message); } } static void *driver(void *arg) { int32_t *failures; int32_t index; failures = (int32_t *)arg; for (index = 0; index < STRESS_ITERATIONS; index++) { CalogValueT result; int32_t status; status = calogFnInvoke(whoFn, NULL, 0, &result); if (status != calogOkE || result.type != calogIntE || result.as.i != (int64_t)calogContextId(ctx1)) { (*failures)++; } calogValueFree(&result); } return NULL; } static int32_t fnEntryA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogValueT inner; int32_t status; (void)args; (void)argCount; (void)userData; // Runs on ctx1; calls relayB (ctx2), which calls back into ctx1 -- the deadlock // test, resolved by ctx1's nested pump. status = calogFnInvoke(relayB, NULL, 0, &inner); if (status != calogOkE) { calogValueFree(&inner); return calogFail(result, status, "entryA: relayB failed"); } calogValueMove(result, &inner); return calogOkE; } static int32_t fnInnerA(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; calogValueInt(result, INNER_VALUE); return calogOkE; } static int32_t fnRelayB(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogValueT inner; int32_t status; (void)args; (void)argCount; (void)userData; status = calogFnInvoke(innerA, NULL, 0, &inner); if (status != calogOkE) { calogValueFree(&inner); return calogFail(result, status, "relayB: innerA failed"); } calogValueInt(result, inner.as.i + 1); calogValueFree(&inner); return calogOkE; } static int32_t fnWho(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; calogValueInt(result, (int64_t)calogCurrentId()); return calogOkE; } static void *runtimeThread(void *arg) { int32_t *ok; CalogT *rt; CalogContextT *ctx; CalogFnT *fn; CalogValueT result; int32_t status; // Each thread creates, drives, and destroys its OWN independent runtime -- no // process-global state is shared. A callable owned by this runtime's context must // route to that context's thread and back, entirely within this runtime. ok = (int32_t *)arg; rt = calogCreate(); if (rt == NULL) { return NULL; } ctx = calogContextOpen(rt, NULL); if (ctx == NULL) { calogDestroy(rt); return NULL; } calogFnCreate(&fn, rt, fnWho, NULL, NULL, calogContextId(ctx)); status = calogFnInvoke(fn, NULL, 0, &result); if (status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx)) { *ok = 1; } calogValueFree(&result); calogFnRelease(fn); calogContextClose(ctx); calogDestroy(rt); return NULL; } static void testConcurrentRuntimes(void) { pthread_t threads[DRIVER_COUNT]; int32_t oks[DRIVER_COUNT]; int32_t index; for (index = 0; index < DRIVER_COUNT; index++) { oks[index] = 0; pthread_create(&threads[index], NULL, runtimeThread, &oks[index]); } for (index = 0; index < DRIVER_COUNT; index++) { pthread_join(threads[index], NULL); CHECK(oks[index] == 1, "an independent runtime on its own host thread dispatched correctly"); } } static void testConcurrentStress(void) { pthread_t threads[DRIVER_COUNT]; int32_t failures[DRIVER_COUNT]; int32_t index; int32_t total; for (index = 0; index < DRIVER_COUNT; index++) { failures[index] = 0; pthread_create(&threads[index], NULL, driver, &failures[index]); } total = 0; for (index = 0; index < DRIVER_COUNT; index++) { pthread_join(threads[index], NULL); total += failures[index]; } CHECK(total == 0, "concurrent drivers: every callable invoke ran on ctx1 and returned its id"); } static void testCrossThreadInvoke(void) { CalogValueT result; int32_t status; // whoFn is owned by ctx1; invoking it from the host thread must run it on ctx1. status = calogFnInvoke(whoFn, NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == (int64_t)calogContextId(ctx1), "callable invoke routed to its owner context's thread"); calogValueFree(&result); } static void testGeneration(void) { CalogContextT *tmp; CalogFnT *orphan; CalogValueT result; uint64_t staleId; int32_t status; tmp = calogContextOpen(calog, NULL); staleId = calogContextId(tmp); status = calogFnCreate(&orphan, calog, fnWho, NULL, NULL, staleId); CHECK(status == calogOkE, "created a callable owned by a soon-to-close context"); calogContextClose(tmp); // A new context recycles tmp's slot with a bumped generation. tmp = calogContextOpen(calog, NULL); CHECK(calogContextId(tmp) != staleId, "recycled slot yields a new generationed id"); // Invoking the orphan (old id) must fail dead, not misroute to the recycler. status = calogFnInvoke(orphan, NULL, 0, &result); CHECK(status == calogErrDeadE, "callable of a recycled context is rejected as dead, not misrouted"); calogValueFree(&result); calogFnRelease(orphan); calogContextClose(tmp); } static void testReentrant(void) { CalogValueT result; int32_t status; // host -> entryA(ctx1) -> relayB(ctx2) -> innerA(ctx1): the re-entrant call back // into ctx1 must be serviced by its nested pump (no deadlock). status = calogFnInvoke(entryA, NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == REENTRANT_EXPECTED, "re-entrant A->B->A resolved without deadlock"); calogValueFree(&result); } static void testDestroyClosesOpenContexts(void) { int32_t i; // Open several contexts and deliberately DO NOT close them: because CalogT owns // its active-context list, calogDestroy (in main) must close them all. ASan // verifies there is no leak at exit -- this is the cleanup guarantee. for (i = 0; i < 32; i++) { CalogContextT *leaked; leaked = calogContextOpen(calog, NULL); CHECK(leaked != NULL, "opened a context left for calogDestroy to close"); } } int main(void) { calog = calogCreate(); if (calog == NULL) { printf("calog create failed\n"); return 1; } ctx1 = calogContextOpen(calog, NULL); ctx2 = calogContextOpen(calog, NULL); calogFnCreate(&whoFn, calog, fnWho, NULL, NULL, calogContextId(ctx1)); calogFnCreate(&innerA, calog, fnInnerA, NULL, NULL, calogContextId(ctx1)); calogFnCreate(&relayB, calog, fnRelayB, NULL, NULL, calogContextId(ctx2)); calogFnCreate(&entryA, calog, fnEntryA, NULL, NULL, calogContextId(ctx1)); testCrossThreadInvoke(); testReentrant(); testConcurrentStress(); testGeneration(); testDestroyClosesOpenContexts(); testConcurrentRuntimes(); calogFnRelease(whoFn); calogFnRelease(innerA); calogFnRelease(relayB); calogFnRelease(entryA); calogContextClose(ctx1); calogContextClose(ctx2); calogDestroy(calog); printf("\n%d checks, %d failed\n", testsRun, testsFailed); fflush(stdout); if (testsFailed != 0) { return 1; } return 0; }