// calogDb.c -- calog database library (see calogDb.h). // // The four natives are thin dispatchers: dbOpen picks a backend by driver name, and // dbExec/dbQuery/dbClose recover the backend from the handle's type tag. Each backend // (SQLite, Postgres, MySQL) is compiled in behind CALOG_WITH_SQLITE / _PG / _MYSQL and // provides open/exec/query plus a marshalling of its rows to CalogValueT. #define _POSIX_C_SOURCE 200809L #include "calogDb.h" #include "calogHandle.h" #include #include #include #include #include #ifdef CALOG_WITH_SQLITE #include "sqlite3.h" #endif #ifdef CALOG_WITH_PG #include "libpq-fe.h" // A few stable pg_type OIDs so a typed column comes back typed (everything else, including // numeric/date/json, arrives as its text string). #define PG_OID_BOOL 16 #define PG_OID_BYTEA 17 #define PG_OID_INT8 20 #define PG_OID_INT2 21 #define PG_OID_INT4 23 #define PG_OID_FLOAT4 700 #define PG_OID_FLOAT8 701 #endif #ifdef CALOG_WITH_MYSQL #include "mysql.h" #endif // Handle type tags, distinct across the whole registry so a stray handle of the wrong // backend fails to resolve. #define DB_TYPE_SQLITE 1u #define DB_TYPE_PG 2u #define DB_TYPE_MYSQL 3u #define DB_NUM_SCRATCH 32 // Process-wide DB library state: one connection registry shared by every runtime that // registers the natives (handles are globally-unique ints, so sharing is safe). refCount is // one per registered runtime, so the singleton is freed only when the last one shuts down. 
typedef struct DbLibT { CalogHandleTableT *handles; int32_t refCount; } DbLibT; static pthread_mutex_t gDbLibMutex = PTHREAD_MUTEX_INITIALIZER; static DbLibT *gDbLib = NULL; static int32_t dbClose(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static void dbCloser(uint32_t type, void *resource); static int32_t dbExec(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t dbOpen(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static int32_t dbQuery(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData); static uint32_t dbResolve(DbLibT *lib, int64_t handle, void **connOut); #ifdef CALOG_WITH_MYSQL static void mysqlCleanup(MYSQL_BIND *binds, char **buffers, unsigned long *lengths, my_bool *isNull, my_bool *errors, unsigned int columnCount, MYSQL_RES *meta, MYSQL_STMT *stmt); static int32_t mysqlColumn(enum enum_field_types type, const char *bytes, unsigned long length, CalogValueT *out); static int32_t mysqlExec(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); static int32_t mysqlOpen(DbLibT *lib, const char *conninfo, CalogValueT *result); static MYSQL_STMT *mysqlPrepare(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); static int32_t mysqlQuery(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); #endif #ifdef CALOG_WITH_PG static int32_t pgColumn(PGresult *res, int row, int column, CalogValueT *out); static int32_t pgExec(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); static int32_t pgOpen(DbLibT *lib, const char *conninfo, CalogValueT *result); static int32_t pgQuery(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); static PGresult *pgRun(PGconn *conn, 
const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, int resultFormat, CalogValueT *result); #endif #ifdef CALOG_WITH_SQLITE static int32_t sqliteColumn(sqlite3_stmt *stmt, int column, CalogValueT *out); static int32_t sqliteExec(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); static int32_t sqliteOpen(DbLibT *lib, const char *path, CalogValueT *result); static int32_t sqlitePrepare(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, sqlite3_stmt **out, CalogValueT *result); static int32_t sqliteQuery(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result); #endif int32_t calogDbRegister(CalogT *calog) { pthread_mutex_lock(&gDbLibMutex); if (gDbLib == NULL) { DbLibT *lib; lib = (DbLibT *)calloc(1, sizeof(*lib)); if (lib == NULL) { pthread_mutex_unlock(&gDbLibMutex); return calogErrOomE; } lib->handles = calogHandleTableCreate(); if (lib->handles == NULL) { free(lib); pthread_mutex_unlock(&gDbLibMutex); return calogErrOomE; } gDbLib = lib; } gDbLib->refCount++; pthread_mutex_unlock(&gDbLibMutex); calogRegisterInline(calog, "dbOpen", dbOpen, gDbLib); calogRegisterInline(calog, "dbExec", dbExec, gDbLib); calogRegisterInline(calog, "dbQuery", dbQuery, gDbLib); calogRegisterInline(calog, "dbClose", dbClose, gDbLib); return calogOkE; } void calogDbShutdown(void) { pthread_mutex_lock(&gDbLibMutex); if (gDbLib == NULL) { pthread_mutex_unlock(&gDbLibMutex); return; } gDbLib->refCount--; if (gDbLib->refCount <= 0) { calogHandleTableDestroy(gDbLib->handles, dbCloser); free(gDbLib); gDbLib = NULL; } pthread_mutex_unlock(&gDbLibMutex); } static int32_t dbClose(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { DbLibT *lib; void *conn; uint32_t backend; lib = (DbLibT *)userData; calogValueNil(result); if (argCount != 1 || args[0].type != calogIntE) { return calogFail(result, 
calogErrArgE, "dbClose expects a connection handle"); } backend = dbResolve(lib, args[0].as.i, &conn); if (backend == 0) { return calogFail(result, calogErrArgE, "dbClose: invalid connection handle"); } // Close only the connection that this call atomically removed from the table -- a // concurrent dbClose of the same handle gets NULL here and must not double-close. conn = calogHandleRemove(lib->handles, args[0].as.i, backend); if (conn == NULL) { return calogFail(result, calogErrArgE, "dbClose: invalid connection handle"); } dbCloser(backend, conn); return calogOkE; } static void dbCloser(uint32_t type, void *resource) { switch (type) { #ifdef CALOG_WITH_SQLITE case DB_TYPE_SQLITE: sqlite3_close((sqlite3 *)resource); break; #endif #ifdef CALOG_WITH_PG case DB_TYPE_PG: PQfinish((PGconn *)resource); break; #endif #ifdef CALOG_WITH_MYSQL case DB_TYPE_MYSQL: mysql_close((MYSQL *)resource); break; #endif default: break; } } static int32_t dbExec(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { DbLibT *lib; void *conn; uint32_t backend; lib = (DbLibT *)userData; calogValueNil(result); if (argCount < 2 || args[0].type != calogIntE || args[1].type != calogStringE) { return calogFail(result, calogErrArgE, "dbExec expects (handle, sql, ...params)"); } backend = dbResolve(lib, args[0].as.i, &conn); switch (backend) { #ifdef CALOG_WITH_SQLITE case DB_TYPE_SQLITE: return sqliteExec((sqlite3 *)conn, &args[1], &args[2], argCount - 2, result); #endif #ifdef CALOG_WITH_PG case DB_TYPE_PG: return pgExec((PGconn *)conn, &args[1], &args[2], argCount - 2, result); #endif #ifdef CALOG_WITH_MYSQL case DB_TYPE_MYSQL: return mysqlExec((MYSQL *)conn, &args[1], &args[2], argCount - 2, result); #endif default: return calogFail(result, calogErrArgE, "dbExec: invalid connection handle"); } } static int32_t dbOpen(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { DbLibT *lib; const char *driver; lib = (DbLibT *)userData; 
calogValueNil(result); if (argCount != 2 || args[0].type != calogStringE || args[1].type != calogStringE) { return calogFail(result, calogErrArgE, "dbOpen expects (driver, connection)"); } driver = args[0].as.s.bytes; #ifdef CALOG_WITH_SQLITE if (strcmp(driver, "sqlite") == 0) { return sqliteOpen(lib, args[1].as.s.bytes, result); } #endif #ifdef CALOG_WITH_PG if (strcmp(driver, "postgres") == 0) { return pgOpen(lib, args[1].as.s.bytes, result); } #endif #ifdef CALOG_WITH_MYSQL if (strcmp(driver, "mysql") == 0) { return mysqlOpen(lib, args[1].as.s.bytes, result); } #endif return calogFail(result, calogErrUnsupportedE, "dbOpen: unknown or uncompiled driver"); } static int32_t dbQuery(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) { DbLibT *lib; void *conn; uint32_t backend; lib = (DbLibT *)userData; calogValueNil(result); if (argCount < 2 || args[0].type != calogIntE || args[1].type != calogStringE) { return calogFail(result, calogErrArgE, "dbQuery expects (handle, sql, ...params)"); } backend = dbResolve(lib, args[0].as.i, &conn); switch (backend) { #ifdef CALOG_WITH_SQLITE case DB_TYPE_SQLITE: return sqliteQuery((sqlite3 *)conn, &args[1], &args[2], argCount - 2, result); #endif #ifdef CALOG_WITH_PG case DB_TYPE_PG: return pgQuery((PGconn *)conn, &args[1], &args[2], argCount - 2, result); #endif #ifdef CALOG_WITH_MYSQL case DB_TYPE_MYSQL: return mysqlQuery((MYSQL *)conn, &args[1], &args[2], argCount - 2, result); #endif default: return calogFail(result, calogErrArgE, "dbQuery: invalid connection handle"); } } // Resolve a handle to its backend type tag and connection pointer, or 0 if not found. 
static uint32_t dbResolve(DbLibT *lib, int64_t handle, void **connOut) { void *conn; *connOut = NULL; #ifdef CALOG_WITH_SQLITE conn = calogHandleGet(lib->handles, handle, DB_TYPE_SQLITE); if (conn != NULL) { *connOut = conn; return DB_TYPE_SQLITE; } #endif #ifdef CALOG_WITH_PG conn = calogHandleGet(lib->handles, handle, DB_TYPE_PG); if (conn != NULL) { *connOut = conn; return DB_TYPE_PG; } #endif #ifdef CALOG_WITH_MYSQL conn = calogHandleGet(lib->handles, handle, DB_TYPE_MYSQL); if (conn != NULL) { *connOut = conn; return DB_TYPE_MYSQL; } #endif (void)conn; return 0; } #ifdef CALOG_WITH_MYSQL // Stable storage for a bound scalar parameter during mysql_stmt_execute. typedef union MysqlScalarT { long long i; double d; char b; } MysqlScalarT; // Free everything a query allocated: the per-column buffers, the four parallel arrays, the // result metadata, and the statement. Each argument may be NULL. static void mysqlCleanup(MYSQL_BIND *binds, char **buffers, unsigned long *lengths, my_bool *isNull, my_bool *errors, unsigned int columnCount, MYSQL_RES *meta, MYSQL_STMT *stmt) { unsigned int column; if (buffers != NULL) { for (column = 0; column < columnCount; column++) { free(buffers[column]); } } free(binds); free(buffers); free(lengths); free(isNull); free(errors); if (meta != NULL) { mysql_free_result(meta); } mysql_stmt_close(stmt); } // Marshal a MySQL cell (fetched as its string form) to a CalogValueT, typed by the field's // SQL type: integer types -> int, float/double -> real, everything else (text, blob, // decimal, dates) -> a binary-safe string. bytes is NUL-terminated at [length]. 
static int32_t mysqlColumn(enum enum_field_types type, const char *bytes, unsigned long length, CalogValueT *out) {
    int isInteger;
    int isReal;

    isInteger = (type == MYSQL_TYPE_TINY || type == MYSQL_TYPE_SHORT || type == MYSQL_TYPE_LONG ||
                 type == MYSQL_TYPE_LONGLONG || type == MYSQL_TYPE_INT24 || type == MYSQL_TYPE_YEAR);
    isReal = (type == MYSQL_TYPE_FLOAT || type == MYSQL_TYPE_DOUBLE);

    calogValueNil(out);
    if (isInteger) {
        // Base-10 parse of the cell's string form.
        calogValueInt(out, (int64_t)strtoll(bytes, NULL, 10));
        return calogOkE;
    }
    if (isReal) {
        calogValueReal(out, strtod(bytes, NULL));
        return calogOkE;
    }
    // Every other field type is passed through with its explicit byte length.
    return calogValueString(out, bytes, (int64_t)length);
}

// Run a statement for its side effects and store the affected-row count in result as an
// int. When preparation or execution fails the error is already in result and
// calogErrArgE is returned.
// NOTE(review): mysqlPrepare can also fail with calogErrOomE or calogErrTypeE; this path
// reports calogErrArgE for all of them.
static int32_t mysqlExec(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    MYSQL_STMT *executed;
    int64_t affected;

    executed = mysqlPrepare(conn, sqlValue, params, paramCount, result);
    if (executed == NULL) {
        return calogErrArgE;
    }

    affected = (int64_t)mysql_stmt_affected_rows(executed);
    calogValueInt(result, affected);
    mysql_stmt_close(executed);
    return calogOkE;
}

// Open a MySQL connection from a libpq-style "key=value key=value" conninfo, so the
// driver's connection string is uniform. Recognised keys: host, user, password, dbname,
// port, socket, sslmode.
static int32_t mysqlOpen(DbLibT *lib, const char *conninfo, CalogValueT *result) {
    MYSQL *conn;
    int64_t handle;
    char *buffer;
    size_t conninfoSize;
    char *host;
    char *user;
    char *password;
    char *dbname;
    char *socket;
    char *sslmode;
    char *token;
    char *saveptr;
    unsigned int port;
    my_bool sslEnforce;
    my_bool sslVerify;

    host = NULL;
    user = NULL;
    password = NULL;
    dbname = NULL;
    socket = NULL;
    sslmode = NULL;
    saveptr = NULL;
    port = 0;

    // Tokenise a private, full-length copy. A fixed-size buffer would silently truncate a
    // long conninfo, which could drop a trailing "sslmode=require" and fall back to the
    // unverified default -- exactly the downgrade the sslmode handling below must prevent.
    conninfoSize = strlen(conninfo) + 1;
    buffer = (char *)malloc(conninfoSize);
    if (buffer == NULL) {
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }
    memcpy(buffer, conninfo, conninfoSize);

    // Space-separated key=value pairs; the value pointers alias into buffer, so buffer must
    // outlive their last use (the mysql_real_connect call). Tokens without '=' and unknown
    // keys are ignored.
    token = strtok_r(buffer, " ", &saveptr);
    while (token != NULL) {
        char *equals;

        equals = strchr(token, '=');
        if (equals != NULL) {
            *equals = '\0';
            if (strcmp(token, "host") == 0) {
                host = equals + 1;
            } else if (strcmp(token, "user") == 0) {
                user = equals + 1;
            } else if (strcmp(token, "password") == 0) {
                password = equals + 1;
            } else if (strcmp(token, "dbname") == 0) {
                dbname = equals + 1;
            } else if (strcmp(token, "socket") == 0) {
                socket = equals + 1;
            } else if (strcmp(token, "sslmode") == 0) {
                sslmode = equals + 1;
            } else if (strcmp(token, "port") == 0) {
                port = (unsigned int)strtoul(equals + 1, NULL, 10);
            }
        }
        token = strtok_r(NULL, " ", &saveptr);
    }

    conn = mysql_init(NULL);
    if (conn == NULL) {
        free(buffer);
        return calogFail(result, calogErrOomE, "dbOpen: mysql_init failed");
    }

    // The connector otherwise defaults to require-TLS + verify, which fails against a plain
    // server. sslmode selects the posture (default "prefer"):
    //   "disable" -> plain, no TLS.
    //   "prefer"  -> use TLS if the server offers it, else fall back to plain; no certificate
    //                verification (matches libpq's default sslmode=prefer).
    //   "require" -> require TLS and verify the server certificate.
    // Any other value is REJECTED, so an explicit stronger request is never silently
    // downgraded to the unverified default.
    if (sslmode == NULL || strcmp(sslmode, "prefer") == 0) {
        sslEnforce = 1;
        sslVerify = 0;
    } else if (strcmp(sslmode, "disable") == 0) {
        sslEnforce = 0;
        sslVerify = 0;
    } else if (strcmp(sslmode, "require") == 0) {
        sslEnforce = 1;
        sslVerify = 1;
    } else {
        mysql_close(conn);
        free(buffer);
        return calogFail(result, calogErrArgE, "dbOpen: mysql sslmode must be 'disable', 'prefer', or 'require'");
    }
    mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &sslEnforce);
    mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &sslVerify);

    if (mysql_real_connect(conn, host, user, password, dbname, port, socket, 0) == NULL) {
        int32_t status;

        // Record the message before mysql_close releases the connection that owns it.
        status = calogFail(result, calogErrArgE, mysql_error(conn));
        mysql_close(conn);
        free(buffer);
        return status;
    }
    // The parsed values are not used past this point.
    free(buffer);

    handle = calogHandleAdd(lib->handles, DB_TYPE_MYSQL, conn);
    if (handle == 0) {
        mysql_close(conn);
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }
    calogValueInt(result, handle);
    return calogOkE;
}

// Init, prepare, bind params as ?, and execute. Returns the executed statement (the caller
// closes it) or NULL with the error set into result.
static MYSQL_STMT *mysqlPrepare(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { MYSQL_STMT *stmt; MYSQL_BIND *binds; MysqlScalarT *scalars; unsigned long *lengths; int32_t index; int rc; stmt = mysql_stmt_init(conn); if (stmt == NULL) { calogFail(result, calogErrOomE, "dbExec/dbQuery: mysql_stmt_init failed"); return NULL; } if (mysql_stmt_prepare(stmt, sqlValue->as.s.bytes, (unsigned long)sqlValue->as.s.length) != 0) { calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); mysql_stmt_close(stmt); return NULL; } if (paramCount == 0) { if (mysql_stmt_execute(stmt) != 0) { calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); mysql_stmt_close(stmt); return NULL; } return stmt; } binds = (MYSQL_BIND *)calloc((size_t)paramCount, sizeof(MYSQL_BIND)); scalars = (MysqlScalarT *)calloc((size_t)paramCount, sizeof(MysqlScalarT)); lengths = (unsigned long *)calloc((size_t)paramCount, sizeof(unsigned long)); if (binds == NULL || scalars == NULL || lengths == NULL) { free(binds); free(scalars); free(lengths); mysql_stmt_close(stmt); calogFail(result, calogErrOomE, "dbExec/dbQuery: out of memory"); return NULL; } for (index = 0; index < paramCount; index++) { const CalogValueT *param; param = ¶ms[index]; switch (param->type) { case calogNilE: binds[index].buffer_type = MYSQL_TYPE_NULL; break; case calogBoolE: scalars[index].b = param->as.b ? 
1 : 0; binds[index].buffer_type = MYSQL_TYPE_TINY; binds[index].buffer = &scalars[index].b; break; case calogIntE: scalars[index].i = (long long)param->as.i; binds[index].buffer_type = MYSQL_TYPE_LONGLONG; binds[index].buffer = &scalars[index].i; break; case calogRealE: scalars[index].d = param->as.r; binds[index].buffer_type = MYSQL_TYPE_DOUBLE; binds[index].buffer = &scalars[index].d; break; case calogStringE: lengths[index] = (unsigned long)param->as.s.length; binds[index].buffer_type = MYSQL_TYPE_STRING; binds[index].buffer = (void *)param->as.s.bytes; binds[index].buffer_length = lengths[index]; binds[index].length = &lengths[index]; break; default: free(binds); free(scalars); free(lengths); mysql_stmt_close(stmt); calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string"); return NULL; } } rc = mysql_stmt_bind_param(stmt, binds); if (rc == 0) { rc = mysql_stmt_execute(stmt); } free(binds); free(scalars); free(lengths); if (rc != 0) { calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); mysql_stmt_close(stmt); return NULL; } return stmt; } static int32_t mysqlQuery(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { MYSQL_STMT *stmt; MYSQL_RES *meta; MYSQL_FIELD *fields; MYSQL_BIND *binds; char **buffers; unsigned long *lengths; my_bool *isNull; my_bool *errors; my_bool updateMaxLength; CalogAggT *rows; unsigned int columnCount; unsigned int column; int32_t status; int rc; stmt = mysqlPrepare(conn, sqlValue, params, paramCount, result); if (stmt == NULL) { return calogErrArgE; } meta = mysql_stmt_result_metadata(stmt); if (meta == NULL) { // No result set; yield an empty list. 
status = calogAggCreate(&rows, calogListE); if (status != calogOkE) { mysql_stmt_close(stmt); return calogFail(result, status, "dbQuery: out of memory"); } calogValueAgg(result, rows); mysql_stmt_close(stmt); return calogOkE; } // Buffer the full result client-side with max_length computed, so each column can bind a // real, correctly-sized string buffer -- the connector then converts every value // (including numerics) to its string form in a single fetch. updateMaxLength = 1; mysql_stmt_attr_set(stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &updateMaxLength); if (mysql_stmt_store_result(stmt) != 0) { status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); mysqlCleanup(NULL, NULL, NULL, NULL, NULL, 0, meta, stmt); return status; } columnCount = mysql_num_fields(meta); fields = mysql_fetch_fields(meta); binds = (MYSQL_BIND *)calloc(columnCount, sizeof(MYSQL_BIND)); buffers = (char **)calloc(columnCount, sizeof(char *)); lengths = (unsigned long *)calloc(columnCount, sizeof(unsigned long)); isNull = (my_bool *)calloc(columnCount, sizeof(my_bool)); errors = (my_bool *)calloc(columnCount, sizeof(my_bool)); if (binds == NULL || buffers == NULL || lengths == NULL || isNull == NULL || errors == NULL) { mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, calogErrOomE, "dbQuery: out of memory"); } for (column = 0; column < columnCount; column++) { unsigned long size; size = (unsigned long)fields[column].max_length + 1; buffers[column] = (char *)malloc(size); if (buffers[column] == NULL) { mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, calogErrOomE, "dbQuery: out of memory"); } binds[column].buffer_type = MYSQL_TYPE_STRING; binds[column].buffer = buffers[column]; binds[column].buffer_length = size; binds[column].length = &lengths[column]; binds[column].is_null = &isNull[column]; binds[column].error = &errors[column]; } if (mysql_stmt_bind_result(stmt, binds) != 0) { 
status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return status; } status = calogAggCreate(&rows, calogListE); if (status != calogOkE) { mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, status, "dbQuery: out of memory"); } while ((rc = mysql_stmt_fetch(stmt)) == 0 || rc == MYSQL_DATA_TRUNCATED) { CalogAggT *rowMap; CalogValueT rowValue; status = calogAggCreate(&rowMap, calogMapE); if (status != calogOkE) { calogAggFree(rows); mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, status, "dbQuery: out of memory"); } for (column = 0; column < columnCount; column++) { CalogValueT key; CalogValueT value; calogValueNil(&value); status = calogValueString(&key, fields[column].name, (int64_t)strlen(fields[column].name)); if (status == calogOkE) { if (isNull[column]) { calogValueNil(&value); } else { buffers[column][lengths[column]] = '\0'; status = mysqlColumn(fields[column].type, buffers[column], lengths[column], &value); } } if (status == calogOkE) { status = calogAggSet(rowMap, &key, &value); } if (status != calogOkE) { calogValueFree(&key); calogValueFree(&value); calogAggFree(rowMap); calogAggFree(rows); mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, status, "dbQuery: failed to build a row"); } } calogValueAgg(&rowValue, rowMap); status = calogAggPush(rows, &rowValue); if (status != calogOkE) { calogValueFree(&rowValue); calogAggFree(rows); mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return calogFail(result, status, "dbQuery: out of memory"); } } if (rc != MYSQL_NO_DATA) { status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt)); calogAggFree(rows); mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt); return status; } mysqlCleanup(binds, 
buffers, lengths, isNull, errors, columnCount, meta, stmt); calogValueAgg(result, rows); return calogOkE; } #endif #ifdef CALOG_WITH_PG // Marshal a Postgres cell (text result format) to a CalogValueT: typed for the common // numeric/bool OIDs, bytea decoded from its "\x.." hex, everything else as its text bytes. static int32_t pgColumn(PGresult *res, int row, int column, CalogValueT *out) { const char *text; int length; Oid oid; calogValueNil(out); if (PQgetisnull(res, row, column)) { return calogOkE; } text = PQgetvalue(res, row, column); length = PQgetlength(res, row, column); oid = PQftype(res, column); switch (oid) { case PG_OID_INT2: case PG_OID_INT4: case PG_OID_INT8: calogValueInt(out, (int64_t)strtoll(text, NULL, 10)); return calogOkE; case PG_OID_FLOAT4: case PG_OID_FLOAT8: calogValueReal(out, strtod(text, NULL)); return calogOkE; case PG_OID_BOOL: calogValueBool(out, text[0] == 't'); return calogOkE; case PG_OID_BYTEA: if (length >= 2 && text[0] == '\\' && text[1] == 'x') { int32_t status; char *bytes; int count; int index; count = (length - 2) / 2; bytes = (char *)malloc((size_t)(count > 0 ? 
count : 1)); if (bytes == NULL) { return calogErrOomE; } for (index = 0; index < count; index++) { bytes[index] = (char)(strtol((char[]){ text[2 + index * 2], text[3 + index * 2], 0 }, NULL, 16)); } status = calogValueString(out, bytes, (int64_t)count); free(bytes); return status; } return calogValueString(out, text, (int64_t)length); default: return calogValueString(out, text, (int64_t)length); } } static int32_t pgExec(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { PGresult *res; res = pgRun(conn, sqlValue, params, paramCount, 0, result); if (res == NULL) { return calogErrArgE; } calogValueInt(result, (int64_t)strtoll(PQcmdTuples(res), NULL, 10)); PQclear(res); return calogOkE; } static int32_t pgOpen(DbLibT *lib, const char *conninfo, CalogValueT *result) { PGconn *conn; int64_t handle; conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK) { int32_t status; status = calogFail(result, calogErrArgE, PQerrorMessage(conn)); PQfinish(conn); return status; } handle = calogHandleAdd(lib->handles, DB_TYPE_PG, conn); if (handle == 0) { PQfinish(conn); return calogFail(result, calogErrOomE, "dbOpen: out of memory"); } calogValueInt(result, handle); return calogOkE; } static int32_t pgQuery(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { PGresult *res; CalogAggT *rows; int rowCount; int columnCount; int row; int32_t status; res = pgRun(conn, sqlValue, params, paramCount, 0, result); if (res == NULL) { return calogErrArgE; } status = calogAggCreate(&rows, calogListE); if (status != calogOkE) { PQclear(res); return calogFail(result, status, "dbQuery: out of memory"); } rowCount = PQntuples(res); columnCount = PQnfields(res); for (row = 0; row < rowCount; row++) { CalogAggT *rowMap; CalogValueT rowValue; int column; status = calogAggCreate(&rowMap, calogMapE); if (status != calogOkE) { calogAggFree(rows); PQclear(res); return 
calogFail(result, status, "dbQuery: out of memory"); } for (column = 0; column < columnCount; column++) { CalogValueT key; CalogValueT value; const char *name; name = PQfname(res, column); status = calogValueString(&key, name, (int64_t)strlen(name)); if (status == calogOkE) { status = pgColumn(res, row, column, &value); } if (status == calogOkE) { status = calogAggSet(rowMap, &key, &value); } if (status != calogOkE) { calogValueFree(&key); calogValueFree(&value); calogAggFree(rowMap); calogAggFree(rows); PQclear(res); return calogFail(result, status, "dbQuery: failed to build a row"); } } calogValueAgg(&rowValue, rowMap); status = calogAggPush(rows, &rowValue); if (status != calogOkE) { calogValueFree(&rowValue); calogAggFree(rows); PQclear(res); return calogFail(result, status, "dbQuery: out of memory"); } } calogValueAgg(result, rows); PQclear(res); return calogOkE; } // Run a parameterized statement ($1..$N): nil -> SQL NULL; bool as 't'/'f'; ints/reals // formatted as text; strings bound in BINARY format with an explicit length, so they are // binary-safe (embedded NULs survive, unlike NUL-terminated text params). On any error the // message is copied into result via calogFail and NULL is returned. 
static PGresult *pgRun(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, int resultFormat, CalogValueT *result) { const char **values; int *lengths; int *formats; char *scratch; PGresult *res; int32_t index; values = NULL; lengths = NULL; formats = NULL; scratch = NULL; if (paramCount > 0) { values = (const char **)calloc((size_t)paramCount, sizeof(char *)); lengths = (int *)calloc((size_t)paramCount, sizeof(int)); formats = (int *)calloc((size_t)paramCount, sizeof(int)); scratch = (char *)malloc((size_t)paramCount * DB_NUM_SCRATCH); if (values == NULL || lengths == NULL || formats == NULL || scratch == NULL) { free((void *)values); free(lengths); free(formats); free(scratch); calogFail(result, calogErrOomE, "dbExec/dbQuery: out of memory"); return NULL; } } for (index = 0; index < paramCount; index++) { const CalogValueT *param; char *buffer; param = ¶ms[index]; buffer = &scratch[index * DB_NUM_SCRATCH]; switch (param->type) { case calogNilE: values[index] = NULL; break; case calogBoolE: values[index] = param->as.b ? 
"t" : "f"; break; case calogIntE: snprintf(buffer, DB_NUM_SCRATCH, "%lld", (long long)param->as.i); values[index] = buffer; break; case calogRealE: snprintf(buffer, DB_NUM_SCRATCH, "%.17g", param->as.r); values[index] = buffer; break; case calogStringE: values[index] = param->as.s.bytes; lengths[index] = (int)param->as.s.length; formats[index] = 1; break; default: free((void *)values); free(lengths); free(formats); free(scratch); calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string"); return NULL; } } res = PQexecParams(conn, sqlValue->as.s.bytes, paramCount, NULL, values, lengths, formats, resultFormat); free((void *)values); free(lengths); free(formats); free(scratch); if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK) { calogFail(result, calogErrArgE, PQerrorMessage(conn)); PQclear(res); return NULL; } return res; } #endif #ifdef CALOG_WITH_SQLITE static int32_t sqliteColumn(sqlite3_stmt *stmt, int column, CalogValueT *out) { calogValueNil(out); switch (sqlite3_column_type(stmt, column)) { case SQLITE_INTEGER: calogValueInt(out, (int64_t)sqlite3_column_int64(stmt, column)); return calogOkE; case SQLITE_FLOAT: calogValueReal(out, sqlite3_column_double(stmt, column)); return calogOkE; case SQLITE_TEXT: return calogValueString(out, (const char *)sqlite3_column_text(stmt, column), (int64_t)sqlite3_column_bytes(stmt, column)); case SQLITE_BLOB: return calogValueString(out, (const char *)sqlite3_column_blob(stmt, column), (int64_t)sqlite3_column_bytes(stmt, column)); case SQLITE_NULL: default: return calogOkE; } } static int32_t sqliteExec(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { sqlite3_stmt *stmt; int32_t status; int rc; status = sqlitePrepare(db, sqlValue, params, paramCount, &stmt, result); if (status != calogOkE) { return status; } do { rc = sqlite3_step(stmt); } while (rc == SQLITE_ROW); if (rc != 
SQLITE_DONE) { status = calogFail(result, calogErrArgE, sqlite3_errmsg(db)); sqlite3_finalize(stmt); return status; } calogValueInt(result, (int64_t)sqlite3_changes(db)); sqlite3_finalize(stmt); return calogOkE; } static int32_t sqliteOpen(DbLibT *lib, const char *path, CalogValueT *result) { sqlite3 *db; int64_t handle; db = NULL; if (sqlite3_open(path, &db) != SQLITE_OK) { int32_t status; status = calogFail(result, calogErrArgE, sqlite3_errmsg(db)); sqlite3_close(db); return status; } handle = calogHandleAdd(lib->handles, DB_TYPE_SQLITE, db); if (handle == 0) { sqlite3_close(db); return calogFail(result, calogErrOomE, "dbOpen: out of memory"); } calogValueInt(result, handle); return calogOkE; } // Prepare sql and bind params[0..paramCount) as ?1..?N. Binary-safe strings bind as text // with an explicit length (preserving embedded NULs); an aggregate/function param errors. static int32_t sqlitePrepare(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, sqlite3_stmt **out, CalogValueT *result) { sqlite3_stmt *stmt; int32_t index; *out = NULL; if (sqlite3_prepare_v2(db, sqlValue->as.s.bytes, (int)sqlValue->as.s.length, &stmt, NULL) != SQLITE_OK) { return calogFail(result, calogErrArgE, sqlite3_errmsg(db)); } for (index = 0; index < paramCount; index++) { const CalogValueT *param; int rc; param = ¶ms[index]; switch (param->type) { case calogNilE: rc = sqlite3_bind_null(stmt, index + 1); break; case calogBoolE: rc = sqlite3_bind_int(stmt, index + 1, param->as.b ? 
1 : 0); break; case calogIntE: rc = sqlite3_bind_int64(stmt, index + 1, param->as.i); break; case calogRealE: rc = sqlite3_bind_double(stmt, index + 1, param->as.r); break; case calogStringE: rc = sqlite3_bind_text(stmt, index + 1, param->as.s.bytes, (int)param->as.s.length, SQLITE_TRANSIENT); break; default: sqlite3_finalize(stmt); return calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string"); } if (rc != SQLITE_OK) { int32_t status; status = calogFail(result, calogErrArgE, sqlite3_errmsg(db)); sqlite3_finalize(stmt); return status; } } *out = stmt; return calogOkE; } static int32_t sqliteQuery(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) { sqlite3_stmt *stmt; CalogAggT *rows; int32_t status; int rc; status = sqlitePrepare(db, sqlValue, params, paramCount, &stmt, result); if (status != calogOkE) { return status; } status = calogAggCreate(&rows, calogListE); if (status != calogOkE) { sqlite3_finalize(stmt); return calogFail(result, status, "dbQuery: out of memory"); } while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { CalogAggT *rowMap; CalogValueT rowValue; int columns; int column; status = calogAggCreate(&rowMap, calogMapE); if (status != calogOkE) { calogAggFree(rows); sqlite3_finalize(stmt); return calogFail(result, status, "dbQuery: out of memory"); } columns = sqlite3_column_count(stmt); for (column = 0; column < columns; column++) { CalogValueT key; CalogValueT value; const char *name; name = sqlite3_column_name(stmt, column); status = calogValueString(&key, name, (int64_t)strlen(name)); if (status == calogOkE) { status = sqliteColumn(stmt, column, &value); } if (status == calogOkE) { status = calogAggSet(rowMap, &key, &value); } if (status != calogOkE) { calogValueFree(&key); calogValueFree(&value); calogAggFree(rowMap); calogAggFree(rows); sqlite3_finalize(stmt); return calogFail(result, status, "dbQuery: failed to build a row"); } } 
calogValueAgg(&rowValue, rowMap); status = calogAggPush(rows, &rowValue); if (status != calogOkE) { calogValueFree(&rowValue); calogAggFree(rows); sqlite3_finalize(stmt); return calogFail(result, status, "dbQuery: out of memory"); } } if (rc != SQLITE_DONE) { status = calogFail(result, calogErrArgE, sqlite3_errmsg(db)); calogAggFree(rows); sqlite3_finalize(stmt); return status; } calogValueAgg(result, rows); sqlite3_finalize(stmt); return calogOkE; } #endif