// calogPubsub.c -- calog publish/subscribe library (see calogPubsub.h). A process-wide, // mutex-guarded list of {int64 id, topic bytes, topic length, callback}. subscribe/ // unsubscribe/publish are natives over that shared list; subscribe retains the callback // function value, and publish delivers a deep copy of the message to every matching // subscriber SYNCHRONOUSLY. // // The registry state is STATIC (not heap) and its bookkeeping is never freed: the three // natives stay reachable (via any context) right up until calogDestroy tears the broker // registry down, so freeing the state in calogPubsubShutdown -- which the contract requires // BEFORE calogDestroy -- would leave those natives dereferencing freed memory. Shutdown // instead releases every subscribed callback and empties the list; the small static // bookkeeping persists for the process (no per-cycle leak), and a post-shutdown publish // simply finds no subscribers. // // Delivery discipline: publish collects (and retains) the matching callbacks under the lock, // UNLOCKS, then invokes each -- the mutex is NEVER held across calogFnInvoke, and the // retained reference keeps a concurrently-unsubscribed callback alive across the call. Each // callback marshals to its owner context's thread; a subscriber that publishes back onto a // topic it is on can deadlock (delivery is synchronous), so keep publish graphs acyclic. #define _POSIX_C_SOURCE 200809L #include "calogPubsub.h" #include #include #include #include #define PUBSUB_INITIAL_CAP 8 #define PUBSUB_GROWTH 2 typedef struct SubEntryT { int64_t id; char *topic; int64_t topicLen; CalogFnT *callback; } SubEntryT; // The list (heap array), guarded by gListMutex. gNextId hands out monotonically increasing // subscription ids. refCount (guarded by gInitMutex) counts registered runtimes so the LAST // calogPubsubShutdown releases the callbacks and empties the list. static pthread_mutex_t gListMutex = PTHREAD_MUTEX_INITIALIZER; static SubEntryT *gEntries = NULL; static int32_t gCount = 0; static int32_t gCap = 0; static int64_t gNextId = 1; static pthread_mutex_t gInitMutex = PTHREAD_MUTEX_INITIALIZER; static int32_t gRefCount = 0; static int32_t pubsubFindByIdLocked(int64_t id); static int32_t pubsubPublish(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t pubsubSubscribe(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t pubsubUnsubscribe(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); int32_t calogPubsubRegister(CalogT *calog) { pthread_mutex_lock(&gInitMutex); gRefCount++; pthread_mutex_unlock(&gInitMutex); calogRegisterInline(calog, "subscribe", pubsubSubscribe, NULL); calogRegisterInline(calog, "unsubscribe", pubsubUnsubscribe, NULL); calogRegisterInline(calog, "publish", pubsubPublish, NULL); return calogOkE; } void calogPubsubShutdown(void) { pthread_mutex_lock(&gInitMutex); if (gRefCount > 0) { gRefCount--; } if (gRefCount <= 0) { int32_t index; pthread_mutex_lock(&gListMutex); for (index = 0; index < gCount; index++) { free(gEntries[index].topic); calogFnRelease(gEntries[index].callback); } free(gEntries); gEntries = NULL; gCount = 0; gCap = 0; pthread_mutex_unlock(&gListMutex); } pthread_mutex_unlock(&gInitMutex); } static int32_t pubsubFindByIdLocked(int64_t id) { int32_t index; for (index = 0; index < gCount; index++) { if (gEntries[index].id == id) { return index; } } return -1; } static int32_t pubsubPublish(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { CalogFnT **collected; int64_t topicLen; int32_t delivered; int32_t matched; int32_t index; (void)userData; calogValueNil(result); if (argCount != 2 || args[0].type != calogStringE) { return calogFail(result, calogErrArgE, "publish expects (topic, message)"); } topicLen = args[0].as.s.length; collected = NULL; matched = 0; delivered = 0; pthread_mutex_lock(&gListMutex); if (gCount > 0) { collected = (CalogFnT **)malloc((size_t)gCount * sizeof(CalogFnT *)); if (collected == NULL) { pthread_mutex_unlock(&gListMutex); return calogFail(result, calogErrOomE, "publish: out of memory"); } for (index = 0; index < gCount; index++) { if (gEntries[index].topicLen == topicLen && memcmp(gEntries[index].topic, args[0].as.s.bytes, (size_t)topicLen) == 0) { // Retain across the (unlocked) delivery so a concurrent unsubscribe cannot free it. calogFnRetain(gEntries[index].callback); collected[matched] = gEntries[index].callback; matched++; } } } pthread_mutex_unlock(&gListMutex); // Deliver OUTSIDE the lock: each callback marshals to its owner context's thread and may // re-enter subscribe/unsubscribe/publish. The mutex is never held across calogFnInvoke. for (index = 0; index < matched; index++) { CalogValueT messageCopy; CalogValueT callResult; int32_t status; status = calogValueCopy(&messageCopy, &args[1]); if (status != calogOkE) { calogValueFree(&messageCopy); calogFnRelease(collected[index]); continue; } calogValueNil(&callResult); status = calogFnInvoke(collected[index], &messageCopy, 1, &callResult); calogValueFree(&callResult); calogValueFree(&messageCopy); calogFnRelease(collected[index]); // A subscriber whose owner context is gone was not actually invoked, so it is not // counted (calogErrDeadE from the marshalled path, calogErrNotFoundE from the direct). if (status != calogErrDeadE && status != calogErrNotFoundE) { delivered++; } } free(collected); calogValueInt(result, (int64_t)delivered); return calogOkE; } static int32_t pubsubSubscribe(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { char *topicCopy; int64_t id; int64_t topicLen; (void)userData; calogValueNil(result); if (argCount != 2 || args[0].type != calogStringE || args[1].type != calogFnE) { return calogFail(result, calogErrArgE, "subscribe expects (topic, function)"); } topicLen = args[0].as.s.length; topicCopy = (char *)malloc((size_t)topicLen + 1); if (topicCopy == NULL) { return calogFail(result, calogErrOomE, "subscribe: out of memory"); } memcpy(topicCopy, args[0].as.s.bytes, (size_t)topicLen); topicCopy[topicLen] = '\0'; pthread_mutex_lock(&gListMutex); if (gCount == gCap) { int32_t newCap; SubEntryT *grown; newCap = (gCap == 0) ? PUBSUB_INITIAL_CAP : gCap * PUBSUB_GROWTH; grown = (SubEntryT *)realloc(gEntries, (size_t)newCap * sizeof(SubEntryT)); if (grown == NULL) { pthread_mutex_unlock(&gListMutex); free(topicCopy); return calogFail(result, calogErrOomE, "subscribe: out of memory"); } gEntries = grown; gCap = newCap; } id = gNextId; gNextId++; calogFnRetain(args[1].as.fn); gEntries[gCount].id = id; gEntries[gCount].topic = topicCopy; gEntries[gCount].topicLen = topicLen; gEntries[gCount].callback = args[1].as.fn; gCount++; pthread_mutex_unlock(&gListMutex); calogValueInt(result, id); return calogOkE; } static int32_t pubsubUnsubscribe(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { int64_t id; int32_t index; (void)userData; calogValueNil(result); if (argCount != 1 || (args[0].type != calogIntE && args[0].type != calogRealE)) { return calogFail(result, calogErrArgE, "unsubscribe expects (id)"); } id = (args[0].type == calogIntE) ? args[0].as.i : (int64_t)args[0].as.r; pthread_mutex_lock(&gListMutex); index = pubsubFindByIdLocked(id); if (index >= 0) { free(gEntries[index].topic); calogFnRelease(gEntries[index].callback); gEntries[index] = gEntries[gCount - 1]; gCount--; } pthread_mutex_unlock(&gListMutex); return calogOkE; }