// testActor.c -- tests for the actor threading core using synthetic contexts. // // Synthetic contexts have no interpreter (engine == NULL); their "natives" are // plain C functions routed to the owning context's thread. This isolates the // threading machinery -- registry, queue, cross-context calls, the nested pump, // the reply token+stash, and shutdown -- from any engine. Built under ASan+UBSan // here and under ThreadSanitizer via `make tsan`. #include "calog.h" #include #include #include #include #define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__) #define DRIVER_COUNT 4 #define STRESS_ITERATIONS 100 #define RELAY_EXPECTED 6 // ctx ids 1 + 2 + 3 #define REENTRANT_EXPECTED 101 #define BOOM_MESSAGE "boom went off" typedef struct DriverArgT { CalogT *broker; int32_t iterations; int32_t failures; } DriverArgT; static CalogT *broker = NULL; static CalogContextT *ctx1 = NULL; static CalogContextT *ctx2 = NULL; static CalogContextT *ctx3 = NULL; static int32_t testsRun = 0; static int32_t testsFailed = 0; static void checkImpl(bool condition, const char *message, const char *file, int32_t line); static void *driver(void *arg); static int32_t nativeAEntry(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeAInner(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeBoom(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeBRelay(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeRelay(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeWhoAmI(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static void testConcurrentStress(void); static void testCrossContextCall(void); static void testErrorChannel(void); static void testGeneration(void); static void testReentrant(void); static void checkImpl(bool condition, const char *message, const char *file, int32_t line) { testsRun++; if (!condition) { testsFailed++; printf("FAIL %s:%d %s\n", file, line, message); } } static void *driver(void *arg) { DriverArgT *driverArg; int32_t index; driverArg = (DriverArgT *)arg; for (index = 0; index < driverArg->iterations; index++) { CalogValueT result; int32_t status; status = calogCall(driverArg->broker, "relay", NULL, 0, &result); if (status != calogOkE || result.type != calogIntE || result.as.i != RELAY_EXPECTED) { driverArg->failures++; } calogValueFree(&result); } return NULL; } static int32_t nativeAEntry(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogT *callBroker; CalogValueT inner; int32_t status; (void)args; (void)argCount; (void)userData; callBroker = calogCurrent(); status = calogCall(callBroker, "bRelay", NULL, 0, &inner); if (status != calogOkE) { calogValueFree(&inner); return calogFail(result, status, "aEntry: bRelay failed"); } calogValueMove(result, &inner); return calogOkE; } static int32_t nativeAInner(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; calogValueInt(result, 100); return calogOkE; } static int32_t nativeBoom(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; // Fails on a context other than the caller's: exercises the single error // channel -- the message rides back in result, status carries the code. return calogFail(result, calogErrArgE, BOOM_MESSAGE); } static int32_t nativeBRelay(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogT *callBroker; CalogValueT inner; int32_t status; (void)args; (void)argCount; (void)userData; // Calls back into context 1 while context 1 is blocked waiting on this call: // the deadlock test. Context 1 services it via the nested pump. callBroker = calogCurrent(); status = calogCall(callBroker, "aInner", NULL, 0, &inner); if (status != calogOkE) { calogValueFree(&inner); return calogFail(result, status, "bRelay: aInner failed"); } calogValueInt(result, inner.as.i + 1); calogValueFree(&inner); return calogOkE; } static int32_t nativeRelay(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogT *callBroker; CalogValueT v2; CalogValueT v3; int32_t status; (void)args; (void)argCount; (void)userData; callBroker = calogCurrent(); status = calogCall(callBroker, "who2", NULL, 0, &v2); if (status != calogOkE) { calogValueFree(&v2); return calogFail(result, status, "relay: who2 failed"); } status = calogCall(callBroker, "who3", NULL, 0, &v3); if (status != calogOkE) { calogValueFree(&v2); calogValueFree(&v3); return calogFail(result, status, "relay: who3 failed"); } calogValueInt(result, (int64_t)calogCurrentId() + v2.as.i + v3.as.i); calogValueFree(&v2); calogValueFree(&v3); return calogOkE; } static int32_t nativeWhoAmI(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; calogValueInt(result, (int64_t)calogCurrentId()); return calogOkE; } static void testConcurrentStress(void) { pthread_t threads[DRIVER_COUNT]; DriverArgT driverArgs[DRIVER_COUNT]; int32_t index; int32_t totalFailures; for (index = 0; index < DRIVER_COUNT; index++) { driverArgs[index].broker = broker; driverArgs[index].iterations = STRESS_ITERATIONS; driverArgs[index].failures = 0; pthread_create(&threads[index], NULL, driver, &driverArgs[index]); } totalFailures = 0; for (index = 0; index < DRIVER_COUNT; index++) { pthread_join(threads[index], NULL); totalFailures += driverArgs[index].failures; } CHECK(totalFailures == 0, "concurrent drivers: every fan-out call returned the expected sum"); } static void testCrossContextCall(void) { CalogValueT result; int32_t status; status = calogCall(broker, "who1", NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == 1, "call ran on context 1's thread"); calogValueFree(&result); status = calogCall(broker, "who2", NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == 2, "call ran on context 2's thread"); calogValueFree(&result); status = calogCall(broker, "who3", NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == 3, "call ran on context 3's thread"); calogValueFree(&result); } static void testErrorChannel(void) { CalogValueT result; int32_t status; // A cross-context call that fails must surface its message through result // (the single error channel) and the failure status -- freed once, no leak. status = calogCall(broker, "boom", NULL, 0, &result); CHECK(status == calogErrArgE && result.type == calogStringE && strcmp(result.as.s.bytes, BOOM_MESSAGE) == 0, "cross-context error rides back in result, freed once"); calogValueFree(&result); } static void testGeneration(void) { CalogContextT *tmpA; CalogContextT *tmpB; uint32_t staleId; CalogValueT result; int32_t status; calogContextCreate(broker, NULL, NULL, &tmpA); calogContextStart(tmpA); staleId = calogContextId(tmpA); calogContextDestroy(tmpA); // The next create recycles tmpA's slot but with a bumped generation, so its // id differs from the destroyed one. calogContextCreate(broker, NULL, NULL, &tmpB); CHECK(calogContextId(tmpB) != staleId, "recycled slot yields a new generationed id"); calogContextStart(tmpB); // A binding still naming the destroyed context's id must fail dead, never // misroute to the context that recycled the slot. calogRegister(broker, "ghost", nativeWhoAmI, NULL, staleId); status = calogCall(broker, "ghost", NULL, 0, &result); CHECK(status == calogErrDeadE, "call to a recycled id is rejected as dead, not misrouted"); calogValueFree(&result); calogContextDestroy(tmpB); } static void testReentrant(void) { CalogValueT result; int32_t status; // 1 -> 2 -> 1: context 1 calls context 2, which calls back into context 1 // while context 1 is blocked. The nested pump must service it (no deadlock). status = calogCall(broker, "aEntry", NULL, 0, &result); CHECK(status == calogOkE && result.type == calogIntE && result.as.i == REENTRANT_EXPECTED, "re-entrant A->B->A resolved without deadlock"); calogValueFree(&result); } int main(void) { broker = calogCreate(); if (broker == NULL) { printf("broker create failed\n"); return 1; } calogContextCreate(broker, NULL, NULL, &ctx1); calogContextCreate(broker, NULL, NULL, &ctx2); calogContextCreate(broker, NULL, NULL, &ctx3); calogRegister(broker, "who1", nativeWhoAmI, NULL, calogContextId(ctx1)); calogRegister(broker, "who2", nativeWhoAmI, NULL, calogContextId(ctx2)); calogRegister(broker, "who3", nativeWhoAmI, NULL, calogContextId(ctx3)); calogRegister(broker, "aEntry", nativeAEntry, NULL, calogContextId(ctx1)); calogRegister(broker, "bRelay", nativeBRelay, NULL, calogContextId(ctx2)); calogRegister(broker, "aInner", nativeAInner, NULL, calogContextId(ctx1)); calogRegister(broker, "relay", nativeRelay, NULL, calogContextId(ctx1)); calogRegister(broker, "boom", nativeBoom, NULL, calogContextId(ctx2)); calogContextStart(ctx1); calogContextStart(ctx2); calogContextStart(ctx3); testCrossContextCall(); testReentrant(); testConcurrentStress(); testErrorChannel(); testGeneration(); calogDestroy(broker); printf("\n%d checks, %d failed\n", testsRun, testsFailed); fflush(stdout); if (testsFailed != 0) { return 1; } return 0; }