// testPubsub.c -- exercises the pubsub library. Two Lua contexts subscribe to topic "t"; a
// third Lua context publishes. publish reaches both subscribers (count 2); after one context
// unsubscribes, publish reaches only the other (count 1); a published table payload is
// deep-copied and read back by the surviving subscriber; publishing an unheard topic returns
// 0; and publishing after calogPubsubShutdown is safe (the static registry is empty).
//
// The publisher is deliberately a SEPARATE context that is not itself subscribed to "t":
// publish runs inline on the publisher's thread and calogFnInvoke marshals each delivery to
// the subscriber's own thread, so a publisher that was subscribed would block waiting for
// itself (synchronous delivery is cyclic-deadlock-prone -- see calogPubsub.h).
|
|
|
|
#define _POSIX_C_SOURCE 200809L
|
|
|
|
#include "calog.h"
|
|
#include "calogPubsub.h"
|
|
|
|
#include <stdatomic.h>
|
|
#include <stdio.h>
|
|
#include <time.h>
|
|
|
|
#define CHECK(cond, msg) checkImpl((cond), (msg), __FILE__, __LINE__)
|
|
|
|
#define PUMP_LIMIT 4000
|
|
#define RESULT_SLOTS 12
|
|
|
|
static CalogT *calog = NULL;
|
|
static _Atomic int64_t results[RESULT_SLOTS];
|
|
static _Atomic int32_t doneCount = 0;
|
|
static _Atomic int32_t errorCount = 0;
|
|
static int32_t testsRun = 0;
|
|
static int32_t testsFailed = 0;
|
|
|
|
static int64_t asInt(const CalogValueT *value);
|
|
static void checkImpl(bool condition, const char *message, const char *file, int32_t line);
|
|
static int32_t nativeDone(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static int32_t nativeReport(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
|
|
static void onError(uint64_t contextId, const char *message, void *userData);
|
|
static void pumpUntilDone(int32_t target);
|
|
|
|
|
|
static int64_t asInt(const CalogValueT *value) {
|
|
if (value->type == calogIntE) {
|
|
return value->as.i;
|
|
}
|
|
return (int64_t)value->as.r;
|
|
}
|
|
|
|
|
|
static void checkImpl(bool condition, const char *message, const char *file, int32_t line) {
|
|
testsRun++;
|
|
if (!condition) {
|
|
testsFailed++;
|
|
printf("FAIL %s:%d %s\n", file, line, message);
|
|
}
|
|
}
|
|
|
|
|
|
// Native registered in main under the script name "done". It ignores its arguments, bumps
// doneCount by one so pumpUntilDone can observe progress, sets the result to nil and
// returns calogOkE.
static int32_t nativeDone(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    (void)userData;
    (void)argCount;
    (void)args;

    // Same sequentially-consistent increment as the default atomic_fetch_add.
    atomic_fetch_add_explicit(&doneCount, 1, memory_order_seq_cst);
    calogValueNil(result);
    return calogOkE;
}
|
|
|
|
|
|
// Native registered in main under the script name "report": report(tag, value) stores value
// into results[tag].
//
// Both arguments must be calogIntE or calogRealE and tag must lie in 0..RESULT_SLOTS-1;
// otherwise the call fails through calogFail with calogErrArgE. On success the result is
// nil and calogOkE is returned.
static int32_t nativeReport(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    int64_t slot;
    int64_t reported;

    (void)userData;
    calogValueNil(result);

    // The count is tested on its own first so args[] is never read when it is wrong.
    if (argCount != 2) {
        return calogFail(result, calogErrArgE, "report expects (tag, value)");
    }
    if (!(args[0].type == calogIntE || args[0].type == calogRealE) ||
        !(args[1].type == calogIntE || args[1].type == calogRealE)) {
        return calogFail(result, calogErrArgE, "report expects (tag, value)");
    }

    slot = asInt(&args[0]);
    if (slot < 0 || slot >= RESULT_SLOTS) {
        return calogFail(result, calogErrArgE, "report: tag out of range");
    }

    reported = asInt(&args[1]);
    atomic_store(&results[slot], reported);
    return calogOkE;
}
|
|
|
|
|
|
static void onError(uint64_t contextId, const char *message, void *userData) {
|
|
(void)contextId;
|
|
(void)userData;
|
|
fprintf(stderr, " [script error] %s\n", (message != NULL) ? message : "(null)");
|
|
atomic_fetch_add(&errorCount, 1);
|
|
}
|
|
|
|
|
|
static void pumpUntilDone(int32_t target) {
|
|
struct timespec ts = { 0, 500000 };
|
|
int i;
|
|
|
|
for (i = 0; i < PUMP_LIMIT; i++) {
|
|
calogPump(calog);
|
|
if (atomic_load(&doneCount) >= target) {
|
|
calogPump(calog);
|
|
return;
|
|
}
|
|
nanosleep(&ts, NULL);
|
|
}
|
|
}
|
|
|
|
|
|
int main(void) {
|
|
CalogContextT *subA;
|
|
CalogContextT *subB;
|
|
CalogContextT *pub;
|
|
int32_t i;
|
|
|
|
calog = calogCreate();
|
|
if (calog == NULL) {
|
|
printf("calog create failed\n");
|
|
return 1;
|
|
}
|
|
calogSetErrorHandler(calog, onError, NULL);
|
|
calogRegister(calog, "report", nativeReport, NULL);
|
|
calogRegister(calog, "done", nativeDone, NULL);
|
|
if (calogPubsubRegister(calog) != calogOkE) {
|
|
printf("calogPubsubRegister failed\n");
|
|
return 1;
|
|
}
|
|
for (i = 0; i < RESULT_SLOTS; i++) {
|
|
atomic_store(&results[i], -1);
|
|
}
|
|
|
|
// Two independent Lua contexts each subscribe to "t". Each callback records the payload it
|
|
// received into its own result slot (unwrapping a table's .v field so report always gets a
|
|
// number). subA keeps its subscription id in a global so a later eval can unsubscribe it.
|
|
subA = calogContextOpen(calog, &calogLuaEngine);
|
|
calogContextEval(subA,
|
|
"subAId = psSubscribe('t', function(m)\n"
|
|
" if type(m) == 'table' then report(1, m.v) else report(1, m) end\n"
|
|
"end)\n"
|
|
"done()");
|
|
subB = calogContextOpen(calog, &calogLuaEngine);
|
|
calogContextEval(subB,
|
|
"psSubscribe('t', function(m)\n"
|
|
" if type(m) == 'table' then report(2, m.v) else report(2, m) end\n"
|
|
"end)\n"
|
|
"done()");
|
|
pumpUntilDone(2);
|
|
|
|
// A third context (NOT subscribed to "t") publishes a number: both subscribers run.
|
|
pub = calogContextOpen(calog, &calogLuaEngine);
|
|
calogContextEval(pub, "report(3, psPublish('t', 7))\n done()");
|
|
pumpUntilDone(3);
|
|
|
|
CHECK(atomic_load(&results[3]) == 2, "publish reached both subscribers (count 2)");
|
|
CHECK(atomic_load(&results[1]) == 7, "subscriber A received the published number");
|
|
CHECK(atomic_load(&results[2]) == 7, "subscriber B received the published number");
|
|
|
|
// Unsubscribe A, then publish again: only B is invoked, and A's slot stays untouched.
|
|
atomic_store(&results[1], -1);
|
|
atomic_store(&results[2], -1);
|
|
calogContextEval(subA, "psUnsubscribe(subAId)\n done()");
|
|
pumpUntilDone(4);
|
|
calogContextEval(pub, "report(4, psPublish('t', 9))\n done()");
|
|
pumpUntilDone(5);
|
|
|
|
CHECK(atomic_load(&results[4]) == 1, "after unsubscribe only one subscriber is invoked (count 1)");
|
|
CHECK(atomic_load(&results[2]) == 9, "the surviving subscriber received the number");
|
|
CHECK(atomic_load(&results[1]) == -1, "the unsubscribed subscriber was not invoked");
|
|
|
|
// Publish a table payload: publish deep-copies it, so B reads m.v out of its own copy.
|
|
atomic_store(&results[2], -1);
|
|
calogContextEval(pub, "report(6, psPublish('t', {v = 42}))\n done()");
|
|
pumpUntilDone(6);
|
|
|
|
CHECK(atomic_load(&results[6]) == 1, "publishing a table reached the surviving subscriber (count 1)");
|
|
CHECK(atomic_load(&results[2]) == 42, "the subscriber received the deep-copied table payload");
|
|
|
|
// A topic nobody subscribed to delivers to no one.
|
|
calogContextEval(pub, "report(7, psPublish('none', 5))\n done()");
|
|
pumpUntilDone(7);
|
|
CHECK(atomic_load(&results[7]) == 0, "publishing to a topic with no subscribers returns 0");
|
|
|
|
CHECK(atomic_load(&errorCount) == 0, "no errors during pubsub");
|
|
|
|
// Release every subscribed callback while the contexts are still alive, then confirm a
|
|
// publish after shutdown is safe and simply finds no subscribers.
|
|
calogPubsubShutdown();
|
|
calogContextEval(pub, "report(11, psPublish('t', 1))\n done()");
|
|
pumpUntilDone(8);
|
|
CHECK(atomic_load(&results[11]) == 0, "publishing after shutdown is safe and finds no subscribers");
|
|
|
|
calogContextClose(subA);
|
|
calogContextClose(subB);
|
|
calogContextClose(pub);
|
|
calogDestroy(calog);
|
|
|
|
printf("\n%d checks, %d failed\n", testsRun, testsFailed);
|
|
fflush(stdout);
|
|
if (testsFailed != 0) {
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|