calog/libs/calogDb.c

1099 lines
40 KiB
C

// calogDb.c -- calog database library (see calogDb.h).
//
// The four natives are thin dispatchers: dbOpen picks a backend by driver name, and
// dbExec/dbQuery/dbClose recover the backend from the handle's type tag. Each backend
// (SQLite, Postgres, MySQL) is compiled in behind CALOG_WITH_SQLITE / _PG / _MYSQL and
// provides open/exec/query plus a marshalling of its rows to CalogValueT.
#define _POSIX_C_SOURCE 200809L
#include "calogDb.h"
#include "calogHandle.h"
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef CALOG_WITH_SQLITE
#include "sqlite3.h"
#endif
#ifdef CALOG_WITH_PG
#include "libpq-fe.h"
// A few stable pg_type OIDs so a typed column comes back typed (everything else, including
// numeric/date/json, arrives as its text string).
#define PG_OID_BOOL 16
#define PG_OID_BYTEA 17
#define PG_OID_INT8 20
#define PG_OID_INT2 21
#define PG_OID_INT4 23
#define PG_OID_FLOAT4 700
#define PG_OID_FLOAT8 701
#endif
#ifdef CALOG_WITH_MYSQL
#include "mysql.h"
#endif
// Handle type tags, distinct across the whole registry so a stray handle of the wrong
// backend fails to resolve.
#define DB_TYPE_SQLITE 1u
#define DB_TYPE_PG 2u
#define DB_TYPE_MYSQL 3u
#define DB_NUM_SCRATCH 32
// Process-wide DB library state: one connection registry shared by every runtime that
// registers the natives (handles are globally-unique ints, so sharing is safe). refCount is
// one per registered runtime, so the singleton is freed only when the last one shuts down.
typedef struct DbLibT {
// Handle table holding every open connection, each stored under a DB_TYPE_* tag.
CalogHandleTableT *handles;
// Incremented by calogDbRegister and decremented by calogDbShutdown; the singleton is
// destroyed when it drops to zero or below.
int32_t refCount;
} DbLibT;
static pthread_mutex_t gDbLibMutex = PTHREAD_MUTEX_INITIALIZER;
static DbLibT *gDbLib = NULL;
static int32_t dbClose(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static void dbCloser(uint32_t type, void *resource);
static int32_t dbExec(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static int32_t dbOpen(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static int32_t dbQuery(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData);
static uint32_t dbResolve(DbLibT *lib, int64_t handle, void **connOut);
#ifdef CALOG_WITH_MYSQL
static void mysqlCleanup(MYSQL_BIND *binds, char **buffers, unsigned long *lengths, my_bool *isNull, my_bool *errors, unsigned int columnCount, MYSQL_RES *meta, MYSQL_STMT *stmt);
static int32_t mysqlColumn(enum enum_field_types type, const char *bytes, unsigned long length, CalogValueT *out);
static int32_t mysqlExec(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
static int32_t mysqlOpen(DbLibT *lib, const char *conninfo, CalogValueT *result);
static MYSQL_STMT *mysqlPrepare(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
static int32_t mysqlQuery(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
#endif
#ifdef CALOG_WITH_PG
static int32_t pgColumn(PGresult *res, int row, int column, CalogValueT *out);
static int32_t pgExec(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
static int32_t pgOpen(DbLibT *lib, const char *conninfo, CalogValueT *result);
static int32_t pgQuery(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
static PGresult *pgRun(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, int resultFormat, CalogValueT *result);
#endif
#ifdef CALOG_WITH_SQLITE
static int32_t sqliteColumn(sqlite3_stmt *stmt, int column, CalogValueT *out);
static int32_t sqliteExec(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
static int32_t sqliteOpen(DbLibT *lib, const char *path, CalogValueT *result);
static int32_t sqlitePrepare(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, sqlite3_stmt **out, CalogValueT *result);
static int32_t sqliteQuery(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result);
#endif
// Register the dbOpen/dbExec/dbQuery/dbClose natives on a runtime.
//
// The process-wide library state is created lazily on the first call and shared by every
// later one; each successful call takes one reference that calogDbShutdown releases.
// Returns calogOkE, or calogErrOomE when the shared state cannot be allocated (in which
// case nothing is registered and no reference is taken).
int32_t calogDbRegister(CalogT *calog) {
    DbLibT *shared;

    pthread_mutex_lock(&gDbLibMutex);
    shared = gDbLib;
    if (shared == NULL) {
        // First registration: build the singleton and its handle table together.
        shared = (DbLibT *)calloc(1, sizeof(*shared));
        if (shared != NULL) {
            shared->handles = calogHandleTableCreate();
            if (shared->handles == NULL) {
                free(shared);
                shared = NULL;
            }
        }
        if (shared == NULL) {
            pthread_mutex_unlock(&gDbLibMutex);
            return calogErrOomE;
        }
        gDbLib = shared;
    }
    shared->refCount++;
    pthread_mutex_unlock(&gDbLibMutex);

    // Every native receives the shared library state as its userData.
    calogRegisterInline(calog, "dbOpen", dbOpen, shared);
    calogRegisterInline(calog, "dbExec", dbExec, shared);
    calogRegisterInline(calog, "dbQuery", dbQuery, shared);
    calogRegisterInline(calog, "dbClose", dbClose, shared);
    return calogOkE;
}
// Release one reference on the shared library state. When the count reaches zero (or
// below) the handle table is destroyed -- dbCloser is passed to it as the per-connection
// closer -- and the singleton is freed. A call with no live singleton is a no-op.
void calogDbShutdown(void) {
    pthread_mutex_lock(&gDbLibMutex);
    if (gDbLib != NULL) {
        gDbLib->refCount -= 1;
        if (gDbLib->refCount <= 0) {
            calogHandleTableDestroy(gDbLib->handles, dbCloser);
            free(gDbLib);
            gDbLib = NULL;
        }
    }
    pthread_mutex_unlock(&gDbLibMutex);
}
// Native dbClose(handle): close one connection and retire its handle.
//
// args must be exactly one int handle. result is set to nil on success; on failure it
// carries the message and calogErrArgE is returned.
static int32_t dbClose(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    DbLibT  *registry = (DbLibT *)userData;
    void    *found = NULL;
    void    *owned = NULL;
    uint32_t kind;

    calogValueNil(result);
    if (argCount != 1 || args[0].type != calogIntE) {
        return calogFail(result, calogErrArgE, "dbClose expects a connection handle");
    }

    // Resolve first to learn the backend tag, then remove under that tag. Only the call
    // whose removal actually yields the connection closes it: a concurrent dbClose of the
    // same handle gets NULL from the removal and must not double-close.
    kind = dbResolve(registry, args[0].as.i, &found);
    if (kind != 0) {
        owned = calogHandleRemove(registry->handles, args[0].as.i, kind);
    }
    if (owned == NULL) {
        return calogFail(result, calogErrArgE, "dbClose: invalid connection handle");
    }

    dbCloser(kind, owned);
    return calogOkE;
}
// Close a connection according to its backend tag. Used directly by dbClose and passed to
// calogHandleTableDestroy as the closer. A tag whose backend is not compiled in (or is
// unknown) is ignored.
static void dbCloser(uint32_t type, void *resource) {
#ifdef CALOG_WITH_SQLITE
    if (type == DB_TYPE_SQLITE) {
        sqlite3_close((sqlite3 *)resource);
        return;
    }
#endif
#ifdef CALOG_WITH_PG
    if (type == DB_TYPE_PG) {
        PQfinish((PGconn *)resource);
        return;
    }
#endif
#ifdef CALOG_WITH_MYSQL
    if (type == DB_TYPE_MYSQL) {
        mysql_close((MYSQL *)resource);
        return;
    }
#endif
    // Nothing to do; also keeps both parameters referenced when no backend is compiled in.
    (void)type;
    (void)resource;
}
// Native dbExec(handle, sql, ...params): run a statement on the connection behind handle.
//
// args[0] must be an int handle and args[1] the SQL string; everything after that is
// forwarded to the backend as bound parameters. The backend's status and result are
// returned as-is; a handle that resolves to no compiled-in backend fails with calogErrArgE.
static int32_t dbExec(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    DbLibT            *registry = (DbLibT *)userData;
    const CalogValueT *sql;
    const CalogValueT *bound;
    int32_t            boundCount;
    void              *conn;
    uint32_t           kind;

    calogValueNil(result);
    if (!(argCount >= 2 && args[0].type == calogIntE && args[1].type == calogStringE)) {
        return calogFail(result, calogErrArgE, "dbExec expects (handle, sql, ...params)");
    }

    // Only formed after validation: &args[2] is at worst one-past-the-end when argCount is 2.
    sql        = &args[1];
    bound      = &args[2];
    boundCount = argCount - 2;

    kind = dbResolve(registry, args[0].as.i, &conn);
#ifdef CALOG_WITH_SQLITE
    if (kind == DB_TYPE_SQLITE) {
        return sqliteExec((sqlite3 *)conn, sql, bound, boundCount, result);
    }
#endif
#ifdef CALOG_WITH_PG
    if (kind == DB_TYPE_PG) {
        return pgExec((PGconn *)conn, sql, bound, boundCount, result);
    }
#endif
#ifdef CALOG_WITH_MYSQL
    if (kind == DB_TYPE_MYSQL) {
        return mysqlExec((MYSQL *)conn, sql, bound, boundCount, result);
    }
#endif
    (void)kind;
    (void)sql;
    (void)bound;
    (void)boundCount;
    return calogFail(result, calogErrArgE, "dbExec: invalid connection handle");
}
// Native dbOpen(driver, connection): open a connection with the named backend.
//
// Both arguments must be strings. driver is compared against "sqlite", "postgres" and
// "mysql" (each only when its backend is compiled in); the connection string is handed to
// that backend's open routine unchanged. Any other driver name fails with
// calogErrUnsupportedE.
static int32_t dbOpen(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    DbLibT     *registry = (DbLibT *)userData;
    const char *driver;
    const char *target;

    calogValueNil(result);
    if (argCount != 2 || args[0].type != calogStringE || args[1].type != calogStringE) {
        return calogFail(result, calogErrArgE, "dbOpen expects (driver, connection)");
    }
    driver = args[0].as.s.bytes;
    target = args[1].as.s.bytes;

#ifdef CALOG_WITH_SQLITE
    if (strcmp(driver, "sqlite") == 0) {
        return sqliteOpen(registry, target, result);
    }
#endif
#ifdef CALOG_WITH_PG
    if (strcmp(driver, "postgres") == 0) {
        return pgOpen(registry, target, result);
    }
#endif
#ifdef CALOG_WITH_MYSQL
    if (strcmp(driver, "mysql") == 0) {
        return mysqlOpen(registry, target, result);
    }
#endif
    (void)registry;
    (void)driver;
    (void)target;
    return calogFail(result, calogErrUnsupportedE, "dbOpen: unknown or uncompiled driver");
}
// Native dbQuery(handle, sql, ...params): run a query on the connection behind handle.
//
// args[0] must be an int handle and args[1] the SQL string; everything after that is
// forwarded to the backend as bound parameters. The backend's status and result are
// returned as-is; a handle that resolves to no compiled-in backend fails with calogErrArgE.
static int32_t dbQuery(CalogValueT *args, int32_t argCount, CalogValueT *result, void *userData) {
    DbLibT            *registry = (DbLibT *)userData;
    const CalogValueT *sql;
    const CalogValueT *bound;
    int32_t            boundCount;
    void              *conn;
    uint32_t           kind;

    calogValueNil(result);
    if (!(argCount >= 2 && args[0].type == calogIntE && args[1].type == calogStringE)) {
        return calogFail(result, calogErrArgE, "dbQuery expects (handle, sql, ...params)");
    }

    // Only formed after validation: &args[2] is at worst one-past-the-end when argCount is 2.
    sql        = &args[1];
    bound      = &args[2];
    boundCount = argCount - 2;

    kind = dbResolve(registry, args[0].as.i, &conn);
#ifdef CALOG_WITH_SQLITE
    if (kind == DB_TYPE_SQLITE) {
        return sqliteQuery((sqlite3 *)conn, sql, bound, boundCount, result);
    }
#endif
#ifdef CALOG_WITH_PG
    if (kind == DB_TYPE_PG) {
        return pgQuery((PGconn *)conn, sql, bound, boundCount, result);
    }
#endif
#ifdef CALOG_WITH_MYSQL
    if (kind == DB_TYPE_MYSQL) {
        return mysqlQuery((MYSQL *)conn, sql, bound, boundCount, result);
    }
#endif
    (void)kind;
    (void)sql;
    (void)bound;
    (void)boundCount;
    return calogFail(result, calogErrArgE, "dbQuery: invalid connection handle");
}
// Look a handle up under each compiled-in backend tag in turn (SQLite, then Postgres, then
// MySQL). On a hit, *connOut receives the connection and the matching DB_TYPE_* tag is
// returned; otherwise *connOut is NULL and 0 is returned.
static uint32_t dbResolve(DbLibT *lib, int64_t handle, void **connOut) {
    // Probe order; the trailing 0 terminates the list and keeps the array non-empty even
    // when no backend is compiled in (0 is never a valid tag).
    static const uint32_t probeOrder[] = {
#ifdef CALOG_WITH_SQLITE
        DB_TYPE_SQLITE,
#endif
#ifdef CALOG_WITH_PG
        DB_TYPE_PG,
#endif
#ifdef CALOG_WITH_MYSQL
        DB_TYPE_MYSQL,
#endif
        0u
    };
    size_t slot;

    *connOut = NULL;
    for (slot = 0; probeOrder[slot] != 0u; slot++) {
        void *found = calogHandleGet(lib->handles, handle, probeOrder[slot]);
        if (found != NULL) {
            *connOut = found;
            return probeOrder[slot];
        }
    }
    return 0;
}
#ifdef CALOG_WITH_MYSQL
// Stable storage for a bound scalar parameter during mysql_stmt_execute: mysqlPrepare
// points each MYSQL_BIND.buffer at the member that matches the parameter's type.
typedef union MysqlScalarT {
// Holds a calogIntE parameter (bound as MYSQL_TYPE_LONGLONG).
long long i;
// Holds a calogRealE parameter (bound as MYSQL_TYPE_DOUBLE).
double d;
// Holds a calogBoolE parameter as 0 or 1 (bound as MYSQL_TYPE_TINY).
char b;
} MysqlScalarT;
// Free everything a query allocated: the per-column buffers, the four parallel arrays, the
// result metadata, and the statement. Each argument may be NULL (columnCount is only used
// when buffers is non-NULL).
//
// binds/buffers/lengths/isNull/errors -- arrays of columnCount entries, released with free.
// meta -- result metadata, released with mysql_free_result.
// stmt -- prepared statement, released with mysql_stmt_close.
static void mysqlCleanup(MYSQL_BIND *binds, char **buffers, unsigned long *lengths, my_bool *isNull, my_bool *errors, unsigned int columnCount, MYSQL_RES *meta, MYSQL_STMT *stmt) {
    unsigned int column;

    if (buffers != NULL) {
        // Entries that were never allocated are still NULL from calloc, so free is a no-op.
        for (column = 0; column < columnCount; column++) {
            free(buffers[column]);
        }
    }
    free(binds);
    free(buffers);
    free(lengths);
    free(isNull);
    free(errors);
    if (meta != NULL) {
        mysql_free_result(meta);
    }
    // Guarded so the "may be NULL" contract really holds for the statement as well; the
    // client library is not relied on to tolerate a NULL statement.
    if (stmt != NULL) {
        mysql_stmt_close(stmt);
    }
}
// Convert one MySQL cell, already fetched in string form, into a CalogValueT according to
// the column's SQL type:
//   TINY/SHORT/LONG/LONGLONG/INT24/YEAR -> int  (parsed with strtoll, base 10)
//   FLOAT/DOUBLE                        -> real (parsed with strtod)
//   anything else                       -> string of exactly `length` bytes
// bytes must be NUL-terminated at [length] so the numeric parsers stop there. Returns
// calogOkE, or the status of calogValueString for the string case.
static int32_t mysqlColumn(enum enum_field_types type, const char *bytes, unsigned long length, CalogValueT *out) {
    calogValueNil(out);

    if (type == MYSQL_TYPE_TINY || type == MYSQL_TYPE_SHORT || type == MYSQL_TYPE_LONG ||
        type == MYSQL_TYPE_LONGLONG || type == MYSQL_TYPE_INT24 || type == MYSQL_TYPE_YEAR) {
        calogValueInt(out, (int64_t)strtoll(bytes, NULL, 10));
        return calogOkE;
    }
    if (type == MYSQL_TYPE_FLOAT || type == MYSQL_TYPE_DOUBLE) {
        calogValueReal(out, strtod(bytes, NULL));
        return calogOkE;
    }
    return calogValueString(out, bytes, (int64_t)length);
}
// Execute a statement on a MySQL connection and store the affected-row count (as reported
// by mysql_stmt_affected_rows) in result.
//
// Returns calogOkE on success. When mysqlPrepare fails, result already holds its message
// and calogErrArgE is returned regardless of which status mysqlPrepare passed to calogFail.
static int32_t mysqlExec(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    MYSQL_STMT *executed;
    int64_t     affected;

    executed = mysqlPrepare(conn, sqlValue, params, paramCount, result);
    if (executed == NULL) {
        return calogErrArgE;
    }
    affected = (int64_t)mysql_stmt_affected_rows(executed);
    calogValueInt(result, affected);
    mysql_stmt_close(executed);
    return calogOkE;
}
// Open a MySQL connection from a libpq-style "key=value key=value" conninfo, so the
// driver's connection string is uniform: host, user, password, dbname, port, socket,
// sslmode. Pairs are separated by spaces (values cannot contain a space); unknown keys and
// tokens without '=' are ignored.
//
// On success result holds the new connection handle and calogOkE is returned. Failures:
// calogErrOomE (allocation, mysql_init, handle table), calogErrArgE (bad sslmode, or the
// connect error text from mysql_error).
static int32_t mysqlOpen(DbLibT *lib, const char *conninfo, CalogValueT *result) {
    MYSQL *conn;
    int64_t handle;
    size_t infoSize;
    char *buffer;
    char *host;
    char *user;
    char *password;
    char *dbname;
    char *socketPath;
    char *sslmode;
    char *token;
    char *saveptr;
    unsigned int port;
    my_bool sslEnforce;
    my_bool sslVerify;
    int32_t status;

    host = NULL;
    user = NULL;
    password = NULL;
    dbname = NULL;
    socketPath = NULL;
    sslmode = NULL;
    saveptr = NULL;
    port = 0;

    // Tokenising writes NULs into the text, so work on a private copy. The copy is sized to
    // the input: a fixed-size buffer would silently truncate a long conninfo and drop or cut
    // short whatever key=value pairs fell past the limit.
    infoSize = strlen(conninfo) + 1;
    buffer = (char *)malloc(infoSize);
    if (buffer == NULL) {
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }
    memcpy(buffer, conninfo, infoSize);

    token = strtok_r(buffer, " ", &saveptr);
    while (token != NULL) {
        char *equals;
        equals = strchr(token, '=');
        if (equals != NULL) {
            // Split "key=value" in place; the value pointers below alias into buffer.
            *equals = '\0';
            if (strcmp(token, "host") == 0) {
                host = equals + 1;
            } else if (strcmp(token, "user") == 0) {
                user = equals + 1;
            } else if (strcmp(token, "password") == 0) {
                password = equals + 1;
            } else if (strcmp(token, "dbname") == 0) {
                dbname = equals + 1;
            } else if (strcmp(token, "socket") == 0) {
                socketPath = equals + 1;
            } else if (strcmp(token, "sslmode") == 0) {
                sslmode = equals + 1;
            } else if (strcmp(token, "port") == 0) {
                port = (unsigned int)strtoul(equals + 1, NULL, 10);
            }
        }
        token = strtok_r(NULL, " ", &saveptr);
    }

    conn = mysql_init(NULL);
    if (conn == NULL) {
        free(buffer);
        return calogFail(result, calogErrOomE, "dbOpen: mysql_init failed");
    }

    // The connector otherwise defaults to require-TLS + verify, which fails against a plain
    // server. sslmode selects the posture (default "prefer"):
    //   "disable" -> plain, no TLS.
    //   "prefer"  -> use TLS if the server offers it, else fall back to plain; no certificate
    //                verification (matches libpq's default sslmode=prefer).
    //   "require" -> require TLS and verify the server certificate.
    // Any other value is REJECTED, so an explicit stronger request is never silently
    // downgraded to the unverified default.
    if (sslmode == NULL || strcmp(sslmode, "prefer") == 0) {
        sslEnforce = 1;
        sslVerify = 0;
    } else if (strcmp(sslmode, "disable") == 0) {
        sslEnforce = 0;
        sslVerify = 0;
    } else if (strcmp(sslmode, "require") == 0) {
        sslEnforce = 1;
        sslVerify = 1;
    } else {
        mysql_close(conn);
        free(buffer);
        return calogFail(result, calogErrArgE, "dbOpen: mysql sslmode must be 'disable', 'prefer', or 'require'");
    }
    mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &sslEnforce);
    mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &sslVerify);

    if (mysql_real_connect(conn, host, user, password, dbname, port, socketPath, 0) == NULL) {
        status = calogFail(result, calogErrArgE, mysql_error(conn));
        mysql_close(conn);
        free(buffer);
        return status;
    }
    // The parsed values are not needed past the connect call (the connection has always
    // outlived the parse buffer in this function), so release the copy now.
    free(buffer);

    handle = calogHandleAdd(lib->handles, DB_TYPE_MYSQL, conn);
    if (handle == 0) {
        mysql_close(conn);
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }
    calogValueInt(result, handle);
    return calogOkE;
}
// Init, prepare, bind params as ?, and execute. Returns the executed statement (the caller
// closes it) or NULL with the error set into result.
//
// conn       -- open MySQL connection.
// sqlValue   -- string value holding the SQL (bytes + length are passed to the server).
// params     -- paramCount values bound to the ? placeholders in order: nil -> NULL,
//               bool -> TINY, int -> LONGLONG, real -> DOUBLE, string -> STRING with an
//               explicit length. Any other type fails with calogErrTypeE.
// paramCount -- number of entries in params; must cover every placeholder in the statement.
//
// Only the message reaches the caller through result; the status passed to calogFail here
// is not returned (the return value is just the statement or NULL).
static MYSQL_STMT *mysqlPrepare(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    MYSQL_STMT *stmt;
    MYSQL_BIND *binds;
    MysqlScalarT *scalars;
    unsigned long *lengths;
    int32_t index;
    int rc;

    stmt = mysql_stmt_init(conn);
    if (stmt == NULL) {
        calogFail(result, calogErrOomE, "dbExec/dbQuery: mysql_stmt_init failed");
        return NULL;
    }
    if (mysql_stmt_prepare(stmt, sqlValue->as.s.bytes, (unsigned long)sqlValue->as.s.length) != 0) {
        calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return NULL;
    }

    // mysql_stmt_bind_param reads one MYSQL_BIND per placeholder in the statement, not per
    // supplied value. The bind array below holds paramCount entries, so a statement with
    // more placeholders than values would make the library read past the end of it.
    // Reject that case up front. (Surplus values are still tolerated, as before.)
    if (mysql_stmt_param_count(stmt) > (unsigned long)paramCount) {
        mysql_stmt_close(stmt);
        calogFail(result, calogErrArgE, "dbExec/dbQuery: too few bound parameters for the statement's placeholders");
        return NULL;
    }

    if (paramCount == 0) {
        // Nothing to bind: execute directly.
        if (mysql_stmt_execute(stmt) != 0) {
            calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
            mysql_stmt_close(stmt);
            return NULL;
        }
        return stmt;
    }

    // Three parallel arrays: the bind descriptors, stable storage for scalar values, and
    // stable storage for string lengths. All must stay alive until execute returns.
    binds = (MYSQL_BIND *)calloc((size_t)paramCount, sizeof(MYSQL_BIND));
    scalars = (MysqlScalarT *)calloc((size_t)paramCount, sizeof(MysqlScalarT));
    lengths = (unsigned long *)calloc((size_t)paramCount, sizeof(unsigned long));
    if (binds == NULL || scalars == NULL || lengths == NULL) {
        free(binds);
        free(scalars);
        free(lengths);
        mysql_stmt_close(stmt);
        calogFail(result, calogErrOomE, "dbExec/dbQuery: out of memory");
        return NULL;
    }

    for (index = 0; index < paramCount; index++) {
        const CalogValueT *param;
        param = &params[index];
        switch (param->type) {
            case calogNilE:
                binds[index].buffer_type = MYSQL_TYPE_NULL;
                break;
            case calogBoolE:
                scalars[index].b = param->as.b ? 1 : 0;
                binds[index].buffer_type = MYSQL_TYPE_TINY;
                binds[index].buffer = &scalars[index].b;
                break;
            case calogIntE:
                scalars[index].i = (long long)param->as.i;
                binds[index].buffer_type = MYSQL_TYPE_LONGLONG;
                binds[index].buffer = &scalars[index].i;
                break;
            case calogRealE:
                scalars[index].d = param->as.r;
                binds[index].buffer_type = MYSQL_TYPE_DOUBLE;
                binds[index].buffer = &scalars[index].d;
                break;
            case calogStringE:
                // Bound straight from the caller's bytes with an explicit length, so
                // embedded NULs are sent as data.
                lengths[index] = (unsigned long)param->as.s.length;
                binds[index].buffer_type = MYSQL_TYPE_STRING;
                binds[index].buffer = (void *)param->as.s.bytes;
                binds[index].buffer_length = lengths[index];
                binds[index].length = &lengths[index];
                break;
            default:
                free(binds);
                free(scalars);
                free(lengths);
                mysql_stmt_close(stmt);
                calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string");
                return NULL;
        }
    }

    rc = mysql_stmt_bind_param(stmt, binds);
    if (rc == 0) {
        rc = mysql_stmt_execute(stmt);
    }
    // The bind storage is released once execute has returned, whether or not it succeeded.
    free(binds);
    free(scalars);
    free(lengths);
    if (rc != 0) {
        calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
        mysql_stmt_close(stmt);
        return NULL;
    }
    return stmt;
}
// Run a query on a MySQL connection and return its rows as a list of maps (column name ->
// value) in result. A statement that produces no result set yields an empty list.
//
// Every column is fetched in string form into a buffer sized from the column's max_length
// and then typed by mysqlColumn; SQL NULL becomes nil.
//
// Returns calogOkE on success. On failure result carries the message and the status is
// calogErrOomE / calogErrArgE / whatever the value or aggregate helper reported. When
// mysqlPrepare fails, calogErrArgE is returned regardless of the status it passed to
// calogFail.
static int32_t mysqlQuery(MYSQL *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    MYSQL_STMT *stmt;
    MYSQL_RES *meta;
    MYSQL_FIELD *fields;
    MYSQL_BIND *binds;
    char **buffers;
    unsigned long *lengths;
    my_bool *isNull;
    my_bool *errors;
    my_bool updateMaxLength;
    CalogAggT *rows;
    unsigned int columnCount;
    unsigned int column;
    int32_t status;
    int rc;

    stmt = mysqlPrepare(conn, sqlValue, params, paramCount, result);
    if (stmt == NULL) {
        return calogErrArgE;
    }
    meta = mysql_stmt_result_metadata(stmt);
    if (meta == NULL) {
        // No result set; yield an empty list.
        status = calogAggCreate(&rows, calogListE);
        if (status != calogOkE) {
            mysql_stmt_close(stmt);
            return calogFail(result, status, "dbQuery: out of memory");
        }
        calogValueAgg(result, rows);
        mysql_stmt_close(stmt);
        return calogOkE;
    }

    // Buffer the full result client-side with max_length computed, so each column can bind a
    // real, correctly-sized string buffer -- the connector then converts every value
    // (including numerics) to its string form in a single fetch.
    updateMaxLength = 1;
    mysql_stmt_attr_set(stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &updateMaxLength);
    if (mysql_stmt_store_result(stmt) != 0) {
        status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
        mysqlCleanup(NULL, NULL, NULL, NULL, NULL, 0, meta, stmt);
        return status;
    }

    columnCount = mysql_num_fields(meta);
    fields = mysql_fetch_fields(meta);
    binds = (MYSQL_BIND *)calloc(columnCount, sizeof(MYSQL_BIND));
    buffers = (char **)calloc(columnCount, sizeof(char *));
    lengths = (unsigned long *)calloc(columnCount, sizeof(unsigned long));
    isNull = (my_bool *)calloc(columnCount, sizeof(my_bool));
    errors = (my_bool *)calloc(columnCount, sizeof(my_bool));
    if (binds == NULL || buffers == NULL || lengths == NULL || isNull == NULL || errors == NULL) {
        mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
        return calogFail(result, calogErrOomE, "dbQuery: out of memory");
    }
    for (column = 0; column < columnCount; column++) {
        unsigned long size;
        // One extra byte so the cell can be NUL-terminated for the numeric parsers.
        size = (unsigned long)fields[column].max_length + 1;
        buffers[column] = (char *)malloc(size);
        if (buffers[column] == NULL) {
            mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
            return calogFail(result, calogErrOomE, "dbQuery: out of memory");
        }
        binds[column].buffer_type = MYSQL_TYPE_STRING;
        binds[column].buffer = buffers[column];
        binds[column].buffer_length = size;
        binds[column].length = &lengths[column];
        binds[column].is_null = &isNull[column];
        binds[column].error = &errors[column];
    }
    if (mysql_stmt_bind_result(stmt, binds) != 0) {
        status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
        mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
        return status;
    }

    status = calogAggCreate(&rows, calogListE);
    if (status != calogOkE) {
        mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
        return calogFail(result, status, "dbQuery: out of memory");
    }
    while ((rc = mysql_stmt_fetch(stmt)) == 0 || rc == MYSQL_DATA_TRUNCATED) {
        CalogAggT *rowMap;
        CalogValueT rowValue;
        status = calogAggCreate(&rowMap, calogMapE);
        if (status != calogOkE) {
            calogAggFree(rows);
            mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
            return calogFail(result, status, "dbQuery: out of memory");
        }
        for (column = 0; column < columnCount; column++) {
            CalogValueT key;
            CalogValueT value;
            // Both start as nil so the failure path below can free them unconditionally,
            // even when key creation itself is what failed.
            calogValueNil(&key);
            calogValueNil(&value);
            status = calogValueString(&key, fields[column].name, (int64_t)strlen(fields[column].name));
            if (status == calogOkE) {
                if (isNull[column]) {
                    calogValueNil(&value);
                } else {
                    unsigned long length;
                    // On MYSQL_DATA_TRUNCATED the reported length is the full value length
                    // and can exceed what the buffer holds. Clamp it so the terminator and
                    // the marshalled bytes stay inside the buffer (buffer_length is always
                    // at least 1).
                    length = lengths[column];
                    if (length >= binds[column].buffer_length) {
                        length = binds[column].buffer_length - 1;
                    }
                    buffers[column][length] = '\0';
                    status = mysqlColumn(fields[column].type, buffers[column], length, &value);
                }
            }
            if (status == calogOkE) {
                status = calogAggSet(rowMap, &key, &value);
            }
            if (status != calogOkE) {
                calogValueFree(&key);
                calogValueFree(&value);
                calogAggFree(rowMap);
                calogAggFree(rows);
                mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
                return calogFail(result, status, "dbQuery: failed to build a row");
            }
        }
        calogValueAgg(&rowValue, rowMap);
        status = calogAggPush(rows, &rowValue);
        if (status != calogOkE) {
            calogValueFree(&rowValue);
            calogAggFree(rows);
            mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
            return calogFail(result, status, "dbQuery: out of memory");
        }
    }
    // Anything other than a clean end-of-data is a fetch error.
    if (rc != MYSQL_NO_DATA) {
        status = calogFail(result, calogErrArgE, mysql_stmt_error(stmt));
        calogAggFree(rows);
        mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
        return status;
    }
    mysqlCleanup(binds, buffers, lengths, isNull, errors, columnCount, meta, stmt);
    calogValueAgg(result, rows);
    return calogOkE;
}
#endif
#ifdef CALOG_WITH_PG
// Convert one cell of a text-format Postgres result into a CalogValueT, keyed on the
// column's type OID:
//   SQL NULL          -> nil
//   int2/int4/int8    -> int  (strtoll, base 10)
//   float4/float8     -> real (strtod)
//   bool              -> bool (true when the text starts with 't')
//   bytea in "\x.."   -> string of the decoded bytes (two hex digits per byte)
//   anything else     -> string of the cell's text bytes (including bytea without "\x")
// Returns calogOkE, calogErrOomE if the decode buffer cannot be allocated, or the status
// of calogValueString.
static int32_t pgColumn(PGresult *res, int row, int column, CalogValueT *out) {
    const char *cell;
    int         cellLength;
    char       *decoded;
    int         decodedLength;
    int         position;
    char        pair[3];
    int32_t     status;

    calogValueNil(out);
    if (PQgetisnull(res, row, column)) {
        return calogOkE;
    }
    cell       = PQgetvalue(res, row, column);
    cellLength = PQgetlength(res, row, column);

    switch (PQftype(res, column)) {
        case PG_OID_INT2:
        case PG_OID_INT4:
        case PG_OID_INT8:
            calogValueInt(out, (int64_t)strtoll(cell, NULL, 10));
            return calogOkE;

        case PG_OID_FLOAT4:
        case PG_OID_FLOAT8:
            calogValueReal(out, strtod(cell, NULL));
            return calogOkE;

        case PG_OID_BOOL:
            calogValueBool(out, cell[0] == 't');
            return calogOkE;

        case PG_OID_BYTEA:
            if (cellLength < 2 || cell[0] != '\\' || cell[1] != 'x') {
                // Not hex-encoded: hand the text back verbatim.
                break;
            }
            decodedLength = (cellLength - 2) / 2;
            // Never ask malloc for zero bytes.
            decoded = (char *)malloc((size_t)(decodedLength > 0 ? decodedLength : 1));
            if (decoded == NULL) {
                return calogErrOomE;
            }
            pair[2] = 0;
            for (position = 0; position < decodedLength; position++) {
                pair[0] = cell[2 + position * 2];
                pair[1] = cell[3 + position * 2];
                decoded[position] = (char)(strtol(pair, NULL, 16));
            }
            status = calogValueString(out, decoded, (int64_t)decodedLength);
            free(decoded);
            return status;

        default:
            break;
    }
    return calogValueString(out, cell, (int64_t)cellLength);
}
// Execute a statement on a Postgres connection and store the affected-row count in result,
// parsed from PQcmdTuples with strtoll.
//
// Returns calogOkE on success. When pgRun fails, result already holds its message and
// calogErrArgE is returned regardless of which status pgRun passed to calogFail.
static int32_t pgExec(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    PGresult   *outcome;
    const char *tuples;

    outcome = pgRun(conn, sqlValue, params, paramCount, 0, result);
    if (outcome == NULL) {
        return calogErrArgE;
    }
    tuples = PQcmdTuples(outcome);
    calogValueInt(result, (int64_t)strtoll(tuples, NULL, 10));
    PQclear(outcome);
    return calogOkE;
}
// Open a Postgres connection from a libpq conninfo string and register it in the handle
// table. On success result holds the new handle. A connection that does not reach
// CONNECTION_OK fails with calogErrArgE and libpq's error text; a failed handle
// registration fails with calogErrOomE. The connection is finished on every failure path.
static int32_t pgOpen(DbLibT *lib, const char *conninfo, CalogValueT *result) {
    PGconn *link;
    int64_t handle;
    int32_t status;

    link = PQconnectdb(conninfo);
    if (PQstatus(link) == CONNECTION_OK) {
        handle = calogHandleAdd(lib->handles, DB_TYPE_PG, link);
        if (handle != 0) {
            calogValueInt(result, handle);
            return calogOkE;
        }
        PQfinish(link);
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }

    // Copy the message into result before the connection (which owns it) is finished.
    status = calogFail(result, calogErrArgE, PQerrorMessage(link));
    PQfinish(link);
    return status;
}
// Run a query on a Postgres connection and return its rows as a list of maps (column name
// -> value) in result. Cells are requested in text format and typed by pgColumn.
//
// Returns calogOkE on success. On failure result carries the message and the status is the
// one reported by the failing value/aggregate helper. When pgRun fails, calogErrArgE is
// returned regardless of the status it passed to calogFail.
static int32_t pgQuery(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    PGresult *res;
    CalogAggT *rows;
    int rowCount;
    int columnCount;
    int row;
    int32_t status;

    res = pgRun(conn, sqlValue, params, paramCount, 0, result);
    if (res == NULL) {
        return calogErrArgE;
    }
    status = calogAggCreate(&rows, calogListE);
    if (status != calogOkE) {
        PQclear(res);
        return calogFail(result, status, "dbQuery: out of memory");
    }
    rowCount = PQntuples(res);
    columnCount = PQnfields(res);
    for (row = 0; row < rowCount; row++) {
        CalogAggT *rowMap;
        CalogValueT rowValue;
        int column;
        status = calogAggCreate(&rowMap, calogMapE);
        if (status != calogOkE) {
            calogAggFree(rows);
            PQclear(res);
            return calogFail(result, status, "dbQuery: out of memory");
        }
        for (column = 0; column < columnCount; column++) {
            CalogValueT key;
            CalogValueT value;
            const char *name;
            // Both start as nil so the failure path below can free them unconditionally.
            // Without this, a failed key creation would free a `value` that was never
            // initialised (pgColumn, which sets it, is skipped in that case).
            calogValueNil(&key);
            calogValueNil(&value);
            name = PQfname(res, column);
            status = calogValueString(&key, name, (int64_t)strlen(name));
            if (status == calogOkE) {
                status = pgColumn(res, row, column, &value);
            }
            if (status == calogOkE) {
                status = calogAggSet(rowMap, &key, &value);
            }
            if (status != calogOkE) {
                calogValueFree(&key);
                calogValueFree(&value);
                calogAggFree(rowMap);
                calogAggFree(rows);
                PQclear(res);
                return calogFail(result, status, "dbQuery: failed to build a row");
            }
        }
        calogValueAgg(&rowValue, rowMap);
        status = calogAggPush(rows, &rowValue);
        if (status != calogOkE) {
            calogValueFree(&rowValue);
            calogAggFree(rows);
            PQclear(res);
            return calogFail(result, status, "dbQuery: out of memory");
        }
    }
    calogValueAgg(result, rows);
    PQclear(res);
    return calogOkE;
}
// Execute a parameterized statement ($1..$N) with PQexecParams and return its PGresult, or
// NULL after copying an error message into result via calogFail.
//
// Parameter encoding: nil -> SQL NULL; bool -> "t"/"f"; int and real -> decimal text
// formatted into a per-parameter scratch slot of DB_NUM_SCRATCH bytes; string -> bound in
// BINARY format with an explicit length, so it is binary-safe (embedded NULs survive,
// unlike NUL-terminated text params). Any other parameter type is a calogErrTypeE error.
// resultFormat is passed through to PQexecParams. Only PGRES_COMMAND_OK and
// PGRES_TUPLES_OK count as success.
static PGresult *pgRun(PGconn *conn, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, int resultFormat, CalogValueT *result) {
    const char **values  = NULL;
    int         *lengths = NULL;
    int         *formats = NULL;
    char        *scratch = NULL;
    PGresult    *outcome = NULL;
    int32_t      slot;

    if (paramCount > 0) {
        values  = (const char **)calloc((size_t)paramCount, sizeof(char *));
        lengths = (int *)calloc((size_t)paramCount, sizeof(int));
        formats = (int *)calloc((size_t)paramCount, sizeof(int));
        scratch = (char *)malloc((size_t)paramCount * DB_NUM_SCRATCH);
        if (values == NULL || lengths == NULL || formats == NULL || scratch == NULL) {
            calogFail(result, calogErrOomE, "dbExec/dbQuery: out of memory");
            goto done;
        }
    }

    for (slot = 0; slot < paramCount; slot++) {
        const CalogValueT *bound = &params[slot];
        char              *text;

        switch (bound->type) {
            case calogNilE:
                values[slot] = NULL;
                break;
            case calogBoolE:
                values[slot] = bound->as.b ? "t" : "f";
                break;
            case calogIntE:
                text = &scratch[slot * DB_NUM_SCRATCH];
                snprintf(text, DB_NUM_SCRATCH, "%lld", (long long)bound->as.i);
                values[slot] = text;
                break;
            case calogRealE:
                text = &scratch[slot * DB_NUM_SCRATCH];
                snprintf(text, DB_NUM_SCRATCH, "%.17g", bound->as.r);
                values[slot] = text;
                break;
            case calogStringE:
                // Binary format (1) with an explicit byte count.
                values[slot]  = bound->as.s.bytes;
                lengths[slot] = (int)bound->as.s.length;
                formats[slot] = 1;
                break;
            default:
                calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string");
                goto done;
        }
    }

    outcome = PQexecParams(conn, sqlValue->as.s.bytes, paramCount, NULL, values, lengths, formats, resultFormat);
    if (PQresultStatus(outcome) != PGRES_COMMAND_OK && PQresultStatus(outcome) != PGRES_TUPLES_OK) {
        calogFail(result, calogErrArgE, PQerrorMessage(conn));
        PQclear(outcome);
        outcome = NULL;
    }

done:
    // Single release point for the parameter arrays; outcome is NULL on every error path.
    free((void *)values);
    free(lengths);
    free(formats);
    free(scratch);
    return outcome;
}
#endif
#ifdef CALOG_WITH_SQLITE
// Convert the current row's value in `column` of a stepped SQLite statement into a
// CalogValueT, following the value's storage class:
//   INTEGER -> int, FLOAT -> real, TEXT and BLOB -> string of sqlite3_column_bytes bytes,
//   NULL (or any other class) -> nil.
// Returns calogOkE, or the status of calogValueString for TEXT/BLOB.
static int32_t sqliteColumn(sqlite3_stmt *stmt, int column, CalogValueT *out) {
    const char *data;
    int size;

    calogValueNil(out);
    switch (sqlite3_column_type(stmt, column)) {
        case SQLITE_INTEGER:
            calogValueInt(out, (int64_t)sqlite3_column_int64(stmt, column));
            return calogOkE;
        case SQLITE_FLOAT:
            calogValueReal(out, sqlite3_column_double(stmt, column));
            return calogOkE;
        case SQLITE_TEXT:
            // Fetch the pointer first and the byte count second, as separate statements:
            // as two arguments of one call their evaluation order would be unspecified.
            data = (const char *)sqlite3_column_text(stmt, column);
            size = sqlite3_column_bytes(stmt, column);
            break;
        case SQLITE_BLOB:
            data = (const char *)sqlite3_column_blob(stmt, column);
            size = sqlite3_column_bytes(stmt, column);
            break;
        case SQLITE_NULL:
        default:
            return calogOkE;
    }
    // SQLite returns a NULL pointer for a zero-length BLOB (and when an allocation fails).
    // Never hand NULL to calogValueString; present it as an empty string instead.
    if (data == NULL) {
        data = "";
        size = 0;
    }
    return calogValueString(out, data, (int64_t)size);
}
// Execute a statement on a SQLite connection, stepping past (and discarding) any rows it
// produces, and store sqlite3_changes(db) in result.
//
// Returns calogOkE on success, the status from sqlitePrepare if preparing/binding failed,
// or calogErrArgE with SQLite's error text if stepping did not end in SQLITE_DONE.
static int32_t sqliteExec(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    sqlite3_stmt *prepared;
    int32_t       status;
    int           rc;

    status = sqlitePrepare(db, sqlValue, params, paramCount, &prepared, result);
    if (status != calogOkE) {
        return status;
    }

    // Drain the statement: keep stepping for as long as it yields rows.
    rc = sqlite3_step(prepared);
    while (rc == SQLITE_ROW) {
        rc = sqlite3_step(prepared);
    }

    // Read the outcome (change count or error text) before the statement is finalized.
    if (rc == SQLITE_DONE) {
        calogValueInt(result, (int64_t)sqlite3_changes(db));
        status = calogOkE;
    } else {
        status = calogFail(result, calogErrArgE, sqlite3_errmsg(db));
    }
    sqlite3_finalize(prepared);
    return status;
}
// Open the SQLite database at path and register it in the handle table. On success result
// holds the new handle. A failed open reports SQLite's error text with calogErrArgE; a
// failed handle registration reports calogErrOomE. The database handle is closed on every
// failure path.
static int32_t sqliteOpen(DbLibT *lib, const char *path, CalogValueT *result) {
    sqlite3 *database = NULL;
    int64_t  handle;
    int32_t  status;

    if (sqlite3_open(path, &database) == SQLITE_OK) {
        handle = calogHandleAdd(lib->handles, DB_TYPE_SQLITE, database);
        if (handle != 0) {
            calogValueInt(result, handle);
            return calogOkE;
        }
        sqlite3_close(database);
        return calogFail(result, calogErrOomE, "dbOpen: out of memory");
    }

    // Copy the message out before closing the handle it is read from.
    status = calogFail(result, calogErrArgE, sqlite3_errmsg(database));
    sqlite3_close(database);
    return status;
}
// Compile sql and bind params[0..paramCount) to ?1..?N.
//
// nil -> NULL, bool -> int 0/1, int -> int64, real -> double, string -> text with an
// explicit length (so embedded NULs are preserved; SQLite copies the bytes). Any other
// parameter type fails with calogErrTypeE.
//
// On success *out receives the prepared statement (the caller finalizes it) and calogOkE
// is returned. On failure *out is NULL, the statement (if any) has been finalized, and
// result carries the message: calogErrArgE with SQLite's error text for prepare/bind
// failures.
static int32_t sqlitePrepare(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, sqlite3_stmt **out, CalogValueT *result) {
    sqlite3_stmt *prepared;
    int32_t       slot;
    int32_t       status;

    *out = NULL;
    if (sqlite3_prepare_v2(db, sqlValue->as.s.bytes, (int)sqlValue->as.s.length, &prepared, NULL) != SQLITE_OK) {
        return calogFail(result, calogErrArgE, sqlite3_errmsg(db));
    }

    // SQLite placeholders are 1-based, so iterate over slot numbers directly.
    for (slot = 1; slot <= paramCount; slot++) {
        const CalogValueT *bound = &params[slot - 1];
        int                rc;

        if (bound->type == calogNilE) {
            rc = sqlite3_bind_null(prepared, slot);
        } else if (bound->type == calogBoolE) {
            rc = sqlite3_bind_int(prepared, slot, bound->as.b ? 1 : 0);
        } else if (bound->type == calogIntE) {
            rc = sqlite3_bind_int64(prepared, slot, bound->as.i);
        } else if (bound->type == calogRealE) {
            rc = sqlite3_bind_double(prepared, slot, bound->as.r);
        } else if (bound->type == calogStringE) {
            rc = sqlite3_bind_text(prepared, slot, bound->as.s.bytes, (int)bound->as.s.length, SQLITE_TRANSIENT);
        } else {
            sqlite3_finalize(prepared);
            return calogFail(result, calogErrTypeE, "dbExec/dbQuery: a bound parameter must be nil, bool, int, real, or string");
        }

        if (rc != SQLITE_OK) {
            // Capture the error text before finalizing the statement.
            status = calogFail(result, calogErrArgE, sqlite3_errmsg(db));
            sqlite3_finalize(prepared);
            return status;
        }
    }

    *out = prepared;
    return calogOkE;
}
// Run a query on a SQLite connection and return its rows as a list of maps (column name ->
// value) in result. Each cell is typed by sqliteColumn.
//
// Returns calogOkE on success, the status from sqlitePrepare if preparing/binding failed,
// calogErrArgE with SQLite's error text if stepping did not end in SQLITE_DONE, or the
// status of the failing value/aggregate helper while building a row.
static int32_t sqliteQuery(sqlite3 *db, const CalogValueT *sqlValue, const CalogValueT *params, int32_t paramCount, CalogValueT *result) {
    sqlite3_stmt *stmt;
    CalogAggT *rows;
    int32_t status;
    int rc;

    status = sqlitePrepare(db, sqlValue, params, paramCount, &stmt, result);
    if (status != calogOkE) {
        return status;
    }
    status = calogAggCreate(&rows, calogListE);
    if (status != calogOkE) {
        sqlite3_finalize(stmt);
        return calogFail(result, status, "dbQuery: out of memory");
    }
    while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
        CalogAggT *rowMap;
        CalogValueT rowValue;
        int columns;
        int column;
        status = calogAggCreate(&rowMap, calogMapE);
        if (status != calogOkE) {
            calogAggFree(rows);
            sqlite3_finalize(stmt);
            return calogFail(result, status, "dbQuery: out of memory");
        }
        columns = sqlite3_column_count(stmt);
        for (column = 0; column < columns; column++) {
            CalogValueT key;
            CalogValueT value;
            const char *name;
            // Both start as nil so the failure path below can free them unconditionally.
            // Without this, a failed key creation would free a `value` that was never
            // initialised (sqliteColumn, which sets it, is skipped in that case).
            calogValueNil(&key);
            calogValueNil(&value);
            name = sqlite3_column_name(stmt, column);
            if (name == NULL) {
                // sqlite3_column_name returns NULL when it cannot allocate the name;
                // do not pass that to strlen.
                status = calogErrOomE;
            } else {
                status = calogValueString(&key, name, (int64_t)strlen(name));
            }
            if (status == calogOkE) {
                status = sqliteColumn(stmt, column, &value);
            }
            if (status == calogOkE) {
                status = calogAggSet(rowMap, &key, &value);
            }
            if (status != calogOkE) {
                calogValueFree(&key);
                calogValueFree(&value);
                calogAggFree(rowMap);
                calogAggFree(rows);
                sqlite3_finalize(stmt);
                return calogFail(result, status, "dbQuery: failed to build a row");
            }
        }
        calogValueAgg(&rowValue, rowMap);
        status = calogAggPush(rows, &rowValue);
        if (status != calogOkE) {
            calogValueFree(&rowValue);
            calogAggFree(rows);
            sqlite3_finalize(stmt);
            return calogFail(result, status, "dbQuery: out of memory");
        }
    }
    // Anything other than a clean SQLITE_DONE is a step error; read the text before
    // finalizing.
    if (rc != SQLITE_DONE) {
        status = calogFail(result, calogErrArgE, sqlite3_errmsg(db));
        calogAggFree(rows);
        sqlite3_finalize(stmt);
        return status;
    }
    calogValueAgg(result, rows);
    sqlite3_finalize(stmt);
    return calogOkE;
}
#endif