// calogTimer.c -- calog timer library (see calogTimer.h). ONE background thread owns a
// mutex-guarded list of timers and, for each one that comes due, retains its callback, drops
// the list lock, and invokes it (which marshals to the callback's owning context thread). The
// list is scheduled on CLOCK_MONOTONIC; the thread sleeps on a condition variable (given the
// MONOTONIC clock via a condattr) until the earliest nextFire, or until it is signalled when a
// timer is added or cancelled.
//
// The registry state is STATIC and its small bookkeeping is never freed: the three natives
// stay reachable (via any context) right up until calogDestroy tears the broker registry down,
// so freeing the state in calogTimerShutdown -- which the contract requires BEFORE calogDestroy
// -- would leave those natives dereferencing freed memory. Shutdown instead stops the thread,
// releases every remaining callback, and empties the list; the mutexes/counters persist for the
// process (no per-cycle leak), so the library re-registers cleanly across runtimes.
//
// Locking discipline: the list mutex is NEVER held across calogFnInvoke (a blocking, cross-
// thread call) -- the callback is retained, the lock dropped, then the callback invoked and
// released. calogFnRelease is only ever held under the list mutex to post a non-blocking
// finalize (as in calogExport); it never re-enters the timer library.

#define _POSIX_C_SOURCE 200809L

#include "calogTimer.h"

// NOTE(review): the system header names were lost from this file (bare "#include" tokens);
// the list below is reconstructed from what the code uses -- confirm against the original.
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

#define TIMER_INITIAL_CAP 8
#define TIMER_GROWTH      2

// One scheduled timer. intervalNs == 0 marks a one-shot; a periodic timer keeps a non-zero
// interval. A cancelled timer is tombstoned here and pruned by the thread at the top of its
// loop, which keeps cancellation safe against a fire that is already in flight.
typedef struct TimerEntryT { int64_t id; CalogFnT *callback; int64_t nextFireMonoNs; int64_t intervalNs; bool cancelled; } TimerEntryT; // The timer list (heap array) plus its background thread, guarded by gTimerMutex/gTimerCond. // refCount (guarded by gInitMutex) counts registered runtimes so the LAST calogTimerShutdown // stops the thread and frees the list. gTimerCond is initialised (with CLOCK_MONOTONIC) when // the thread starts and destroyed when it stops, so every cond operation is guarded by // gThreadStarted. static pthread_mutex_t gTimerMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t gTimerCond; static TimerEntryT *gTimers = NULL; static int32_t gCount = 0; static int32_t gCap = 0; static int64_t gNextId = 1; static pthread_t gThread; static bool gThreadStarted = false; static bool gShutdown = false; static pthread_mutex_t gInitMutex = PTHREAD_MUTEX_INITIALIZER; static int32_t gRefCount = 0; static int32_t timerAfterNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t timerCancelNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t timerEnsureThreadLocked(void); static int32_t timerEveryNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t timerFindByIdLocked(int64_t id); static int64_t timerNowNs(void); static void timerPruneLocked(void); static int32_t timerSchedule(CalogValueT *args, int32_t argCount, CalogValueT *result, bool periodic); static void *timerThreadMain(void *arg); int32_t calogTimerRegister(CalogT *calog) { pthread_mutex_lock(&gInitMutex); gRefCount++; pthread_mutex_unlock(&gInitMutex); calogRegisterInline(calog, "timerAfter", timerAfterNative, NULL); calogRegisterInline(calog, "timerEvery", timerEveryNative, NULL); calogRegisterInline(calog, "timerCancel", timerCancelNative, NULL); return calogOkE; } void calogTimerShutdown(void) { pthread_t threadHandle; bool joinThread; int32_t index; joinThread = 
false; pthread_mutex_lock(&gInitMutex); if (gRefCount > 0) { gRefCount--; } if (gRefCount > 0) { pthread_mutex_unlock(&gInitMutex); return; } // Last runtime is gone: stop the thread (if it ever started), then release every remaining // callback and free the list. Contexts are still alive here, so a callback release routes a // finalize to its owner thread just like calogExport. pthread_mutex_lock(&gTimerMutex); gShutdown = true; threadHandle = gThread; if (gThreadStarted) { joinThread = true; pthread_cond_signal(&gTimerCond); } pthread_mutex_unlock(&gTimerMutex); if (joinThread) { pthread_join(threadHandle, NULL); } pthread_mutex_lock(&gTimerMutex); for (index = 0; index < gCount; index++) { calogFnRelease(gTimers[index].callback); } free(gTimers); gTimers = NULL; gCount = 0; gCap = 0; if (joinThread) { pthread_cond_destroy(&gTimerCond); } gThreadStarted = false; gShutdown = false; pthread_mutex_unlock(&gTimerMutex); pthread_mutex_unlock(&gInitMutex); } static int32_t timerAfterNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)userData; return timerSchedule(args, argCount, result, false); } static int32_t timerCancelNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { int32_t index; (void)userData; calogValueNil(result); if (argCount != 1 || args[0].type != calogIntE) { return calogFail(result, calogErrArgE, "timerCancel expects (id)"); } pthread_mutex_lock(&gTimerMutex); index = timerFindByIdLocked(args[0].as.i); if (index >= 0) { // Tombstone it; the thread prunes it (and stops firing it) at the top of its loop. gTimers[index].cancelled = true; pthread_cond_signal(&gTimerCond); } pthread_mutex_unlock(&gTimerMutex); return calogOkE; } static int32_t timerEnsureThreadLocked(void) { pthread_condattr_t attr; // Caller holds gTimerMutex. Start the single background thread lazily, on the first timer. 
if (gThreadStarted) { return calogOkE; } if (pthread_condattr_init(&attr) != 0) { return calogErrUnsupportedE; } pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); if (pthread_cond_init(&gTimerCond, &attr) != 0) { pthread_condattr_destroy(&attr); return calogErrUnsupportedE; } pthread_condattr_destroy(&attr); gShutdown = false; if (pthread_create(&gThread, NULL, timerThreadMain, NULL) != 0) { pthread_cond_destroy(&gTimerCond); return calogErrUnsupportedE; } gThreadStarted = true; return calogOkE; } static int32_t timerEveryNative(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { (void)userData; return timerSchedule(args, argCount, result, true); } static int32_t timerFindByIdLocked(int64_t id) { int32_t index; for (index = 0; index < gCount; index++) { if (gTimers[index].id == id) { return index; } } return -1; } static int64_t timerNowNs(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (int64_t)ts.tv_sec * 1000000000 + (int64_t)ts.tv_nsec; } static void timerPruneLocked(void) { int32_t index; index = 0; while (index < gCount) { if (gTimers[index].cancelled) { calogFnRelease(gTimers[index].callback); gTimers[index] = gTimers[gCount - 1]; gCount--; } else { index++; } } } static int32_t timerSchedule(CalogValueT *args, int32_t argCount, CalogValueT *result, bool periodic) { const char *label; double ms; int64_t delayNs; int64_t id; int32_t status; label = periodic ? "timerEvery expects (ms, function)" : "timerAfter expects (ms, function)"; calogValueNil(result); if (argCount != 2 || (args[0].type != calogIntE && args[0].type != calogRealE) || args[1].type != calogFnE) { return calogFail(result, calogErrArgE, label); } ms = (args[0].type == calogIntE) ? (double)args[0].as.i : args[0].as.r; // Bound ms so ms*1e6 stays well within int64 nanoseconds. This also rejects +Inf and NaN // (both fail the comparison); the cast below is otherwise undefined once the product // exceeds INT64_MAX. 
9.0e12 ms is ~285 years -- far beyond any real timer. if (!(ms >= 0.0 && ms <= 9.0e12)) { return calogFail(result, calogErrRangeE, "timer: milliseconds out of range"); } delayNs = (int64_t)(ms * 1000000.0); pthread_mutex_lock(&gTimerMutex); if (gCount == gCap) { int32_t newCap; TimerEntryT *grown; newCap = (gCap == 0) ? TIMER_INITIAL_CAP : gCap * TIMER_GROWTH; grown = (TimerEntryT *)realloc(gTimers, (size_t)newCap * sizeof(TimerEntryT)); if (grown == NULL) { pthread_mutex_unlock(&gTimerMutex); return calogFail(result, calogErrOomE, "timer: out of memory"); } gTimers = grown; gCap = newCap; } status = timerEnsureThreadLocked(); if (status != calogOkE) { pthread_mutex_unlock(&gTimerMutex); return calogFail(result, status, "timer: could not start the timer thread"); } id = gNextId++; // Retain the callback for as long as the list holds it; released on cancel/fire/shutdown. calogFnRetain(args[1].as.fn); gTimers[gCount].id = id; gTimers[gCount].callback = args[1].as.fn; gTimers[gCount].nextFireMonoNs = timerNowNs() + delayNs; gTimers[gCount].intervalNs = periodic ? ((delayNs > 0) ? delayNs : 1) : 0; gTimers[gCount].cancelled = false; gCount++; pthread_cond_signal(&gTimerCond); pthread_mutex_unlock(&gTimerMutex); calogValueInt(result, id); return calogOkE; } static void *timerThreadMain(void *arg) { (void)arg; pthread_mutex_lock(&gTimerMutex); for (;;) { int64_t nowNs; int32_t index; int32_t minIndex; // Drop tombstoned timers first, so the scan below only sees live entries. 
timerPruneLocked(); if (gShutdown) { break; } if (gCount == 0) { pthread_cond_wait(&gTimerCond, &gTimerMutex); continue; } nowNs = timerNowNs(); minIndex = 0; for (index = 1; index < gCount; index++) { if (gTimers[index].nextFireMonoNs < gTimers[minIndex].nextFireMonoNs) { minIndex = index; } } if (gTimers[minIndex].nextFireMonoNs > nowNs) { struct timespec target; int64_t fireAt; fireAt = gTimers[minIndex].nextFireMonoNs; target.tv_sec = (time_t)(fireAt / 1000000000); target.tv_nsec = (long)(fireAt % 1000000000); pthread_cond_timedwait(&gTimerCond, &gTimerMutex, &target); continue; } { CalogFnT *cb; CalogValueT res; int64_t id; int32_t status; bool oneShot; cb = gTimers[minIndex].callback; id = gTimers[minIndex].id; oneShot = (gTimers[minIndex].intervalNs == 0); // Retain across the unlocked invoke so a concurrent cancel/shutdown can't free it. calogFnRetain(cb); if (oneShot) { gTimers[minIndex].cancelled = true; } else { gTimers[minIndex].nextFireMonoNs += gTimers[minIndex].intervalNs; if (gTimers[minIndex].nextFireMonoNs <= nowNs) { // Fell behind (a slow callback): resync rather than fire in a burst. gTimers[minIndex].nextFireMonoNs = nowNs + gTimers[minIndex].intervalNs; } } pthread_mutex_unlock(&gTimerMutex); status = calogFnInvoke(cb, NULL, 0, &res); calogValueFree(&res); if (status == calogErrDeadE) { // The owning context is gone: auto-cancel the timer (find it again by id, since // the list may have changed while unlocked). pthread_mutex_lock(&gTimerMutex); index = timerFindByIdLocked(id); if (index >= 0) { gTimers[index].cancelled = true; } pthread_mutex_unlock(&gTimerMutex); } calogFnRelease(cb); pthread_mutex_lock(&gTimerMutex); } } pthread_mutex_unlock(&gTimerMutex); return NULL; }