// testPubsub.c -- exercises the pubsub library. Two Lua contexts subscribe to topic "t"; a // third Lua context publishes. publish reaches both subscribers (count 2); after one context // unsubscribes, publish reaches only the other (count 1); a published table payload is // deep-copied and read back by the surviving subscriber; publishing an unheard topic returns // 0; and publishing after calogPubsubShutdown is safe (the static registry is empty). // // The publisher is deliberately a SEPARATE context that is not itself subscribed to "t": // publish runs inline on the publisher's thread and calogFnInvoke marshals each delivery to // the subscriber's own thread, so a publisher that was subscribed would block waiting for // itself (synchronous delivery is cyclic-deadlock-prone -- see calogPubsub.h). #define _POSIX_C_SOURCE 200809L #include "calog.h" #include "calogPubsub.h" #include #include #include #define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__) #define PUMP_LIMIT 4000 #define RESULT_SLOTS 12 static CalogT *calog = NULL; static _Atomic int64_t results[RESULT_SLOTS]; static _Atomic int32_t doneCount = 0; static _Atomic int32_t errorCount = 0; static int32_t testsRun = 0; static int32_t testsFailed = 0; static int64_t asInt(const CalogValueT *value); static void checkImpl(bool condition, const char *message, const char *file, int32_t line); static int32_t nativeDone(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t nativeReport(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static void onError(uint64_t contextId, const char *message, void *userData); static void pumpUntilDone(int32_t target); static int64_t asInt(const CalogValueT *value) { if (value->type == calogIntE) { return value->as.i; } return (int64_t)value->as.r; } static void checkImpl(bool condition, const char *message, const char *file, int32_t line) { testsRun++; if (!condition) { testsFailed++; printf("FAIL %s:%d %s\n", file, line, message); } } static int32_t nativeDone(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)args; (void)argCount; (void)userData; atomic_fetch_add(&doneCount, 1); calogValueNil(result); return calogOkE; } static int32_t nativeReport(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { int64_t tag; (void)userData; calogValueNil(result); if (argCount != 2 || (args[0].type != calogIntE && args[0].type != calogRealE) || (args[1].type != calogIntE && args[1].type != calogRealE)) { return calogFail(result, calogErrArgE, "report expects (tag, value)"); } tag = asInt(&args[0]); if (tag < 0 || tag >= RESULT_SLOTS) { return calogFail(result, calogErrArgE, "report: tag out of range"); } atomic_store(&results[tag], asInt(&args[1])); return calogOkE; } static void onError(uint64_t contextId, const char *message, void *userData) { (void)contextId; (void)userData; fprintf(stderr, " [script error] %s\n", (message != NULL) ? message : "(null)"); atomic_fetch_add(&errorCount, 1); } static void pumpUntilDone(int32_t target) { struct timespec ts = { 0, 500000 }; int i; for (i = 0; i < PUMP_LIMIT; i++) { calogPump(calog); if (atomic_load(&doneCount) >= target) { calogPump(calog); return; } nanosleep(&ts, NULL); } } int main(void) { CalogContextT *subA; CalogContextT *subB; CalogContextT *pub; int32_t i; calog = calogCreate(); if (calog == NULL) { printf("calog create failed\n"); return 1; } calogSetErrorHandler(calog, onError, NULL); calogRegister(calog, "report", nativeReport, NULL); calogRegister(calog, "done", nativeDone, NULL); if (calogPubsubRegister(calog) != calogOkE) { printf("calogPubsubRegister failed\n"); return 1; } for (i = 0; i < RESULT_SLOTS; i++) { atomic_store(&results[i], -1); } // Two independent Lua contexts each subscribe to "t". Each callback records the payload it // received into its own result slot (unwrapping a table's .v field so report always gets a // number). subA keeps its subscription id in a global so a later eval can unsubscribe it. subA = calogContextOpen(calog, &calogLuaEngine); calogContextEval(subA, "subAId = psSubscribe('t', function(m)\n" " if type(m) == 'table' then report(1, m.v) else report(1, m) end\n" "end)\n" "done()"); subB = calogContextOpen(calog, &calogLuaEngine); calogContextEval(subB, "psSubscribe('t', function(m)\n" " if type(m) == 'table' then report(2, m.v) else report(2, m) end\n" "end)\n" "done()"); pumpUntilDone(2); // A third context (NOT subscribed to "t") publishes a number: both subscribers run. pub = calogContextOpen(calog, &calogLuaEngine); calogContextEval(pub, "report(3, psPublish('t', 7))\n done()"); pumpUntilDone(3); CHECK(atomic_load(&results[3]) == 2, "publish reached both subscribers (count 2)"); CHECK(atomic_load(&results[1]) == 7, "subscriber A received the published number"); CHECK(atomic_load(&results[2]) == 7, "subscriber B received the published number"); // Unsubscribe A, then publish again: only B is invoked, and A's slot stays untouched. atomic_store(&results[1], -1); atomic_store(&results[2], -1); calogContextEval(subA, "psUnsubscribe(subAId)\n done()"); pumpUntilDone(4); calogContextEval(pub, "report(4, psPublish('t', 9))\n done()"); pumpUntilDone(5); CHECK(atomic_load(&results[4]) == 1, "after unsubscribe only one subscriber is invoked (count 1)"); CHECK(atomic_load(&results[2]) == 9, "the surviving subscriber received the number"); CHECK(atomic_load(&results[1]) == -1, "the unsubscribed subscriber was not invoked"); // Publish a table payload: publish deep-copies it, so B reads m.v out of its own copy. atomic_store(&results[2], -1); calogContextEval(pub, "report(6, psPublish('t', {v = 42}))\n done()"); pumpUntilDone(6); CHECK(atomic_load(&results[6]) == 1, "publishing a table reached the surviving subscriber (count 1)"); CHECK(atomic_load(&results[2]) == 42, "the subscriber received the deep-copied table payload"); // A topic nobody subscribed to delivers to no one. calogContextEval(pub, "report(7, psPublish('none', 5))\n done()"); pumpUntilDone(7); CHECK(atomic_load(&results[7]) == 0, "publishing to a topic with no subscribers returns 0"); CHECK(atomic_load(&errorCount) == 0, "no errors during pubsub"); // Release every subscribed callback while the contexts are still alive, then confirm a // publish after shutdown is safe and simply finds no subscribers. calogPubsubShutdown(); calogContextEval(pub, "report(11, psPublish('t', 1))\n done()"); pumpUntilDone(8); CHECK(atomic_load(&results[11]) == 0, "publishing after shutdown is safe and finds no subscribers"); calogContextClose(subA); calogContextClose(subB); calogContextClose(pub); calogDestroy(calog); printf("\n%d checks, %d failed\n", testsRun, testsFailed); fflush(stdout); if (testsFailed != 0) { return 1; } return 0; }