Rainer Weikusat
2010-Apr-14 18:56 UTC
[Dovecot] PostgreSQL driver supporting [round-robin] load balancing and redundancy [LONG]
One of the things my employer uses dovecot for is as the mail download server for an 'e-mail purification service' (AV/anti-spam) for smartphones. The service itself currently runs on a rented server somewhere in the UK, while the corresponding 'web service' front-end and the user account/mail account database reside on a server in Germany. The UK dovecot server uses the PostgreSQL server on the German machine for user authentication. The latter is reachable via two entirely different 'internet paths', and one of them suffered an outage of several hours a couple of weeks ago. This prompted some frantic network reconfiguration to get the service going again. It also left me convinced that, ideally, the dovecot server should be able to use connections to multiple PostgreSQL servers (or to a single server reachable via several IPs) simultaneously, distribute requests among them, detect a problem on any one of the database connections, and keep operating on the ones that still work.

After spending about eight hours reading through the code of the existing pgsql driver, I concluded that modifying it to achieve these goals would require major brain surgery with a high chance of causing a lot of collateral damage, and that writing a new driver providing the sought-after feature set was the better idea. Which is the point of this e-mail.

Example Usage
-------------

Multiple connections can be specified in an ordinary connect string by separating the parameters for each connection with a ;;; sequence. For the server described above, this looks like:

    connect = host=1.1.1.1 dbname=mailgate user=mailgate_user password=secret sslmode=require ;;; \
              host=2.2.2.2 dbname=mailgate user=mailgate_user password=secret sslmode=require

Leading and trailing whitespace in a connect sub-string is ignored.
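A minimal sketch of the splitting rule just described, assuming the connect string sits in a writable buffer and is split in place. `split_connect_string()` is a hypothetical name and not a function from the patch; skipping sub-strings that are empty after trimming is also an assumption.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not from the patch): splits `conn` in place at every
 * ";;;" separator, strips leading and trailing whitespace from each
 * sub-string and stores up to `max` pointers into `out`. Sub-strings that
 * are empty after trimming are skipped (an assumption). Returns the number
 * of sub-strings stored.
 */
static size_t split_connect_string(char *conn, char **out, size_t max)
{
	static const char sep[] = ";;;";
	char *start = conn;
	size_t n = 0;

	assert(conn != NULL);

	for (;;) {
		char *end = strstr(start, sep);
		char *last;

		if (end != NULL)
			*end = '\0';	/* terminate the current sub-string */

		/* strip leading whitespace */
		while (isspace((unsigned char)*start))
			start++;

		/* strip trailing whitespace */
		last = start + strlen(start);
		while (last > start && isspace((unsigned char)last[-1]))
			*--last = '\0';

		if (*start != '\0' && n < max)
			out[n++] = start;

		if (end == NULL)
			break;
		start = end + (sizeof(sep) - 1);	/* skip past ";;;" */
	}

	return n;
}
```

Each resulting sub-string is an ordinary libpq conninfo string, so it could be passed unchanged to `PQconnectStart()`, one per connection.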
Basic Operation
---------------

After splitting the connect string, the driver creates a connection structure (struct multi_pgsql_pgc) for each connection specification it contains and initiates an asynchronous connect for each. As the actual connections become ready, they are put onto a FIFO queue (a linked list) of 'connections which are ready'. When a query is submitted to the driver, the first connection structure is removed from the ready list and used to execute the query. After the query has finished, the connection is appended to the end of the ready list. If no connection is ready when a query is submitted, the query is put on a FIFO queue of queries, which are executed as connections become available.

Each query has a lifetime of 60 seconds (arbitrary). A query which couldn't be started before its lifetime was over is aborted with a timeout error. A connection currently being used to execute a query is considered dead if no forward progress of any kind has happened for 20 seconds (also arbitrary). If this happens (or if an I/O error occurs), the connection is closed and an asynchronous connect on a new one is started. Provided the lifetime of the query isn't over yet, the query is either restarted on another available connection or pushed back onto the front of the query queue, so that it starts as soon as a connection becomes available. Attempts to re-establish the failed connection continue in the background, with a delay of 5 seconds between attempts, until one succeeds.

Design Overview
---------------

struct multi_pgsql_db and the routines working on it provide both the 'simple' query interface and transaction support. struct multi_pgsql_pgc represents a database connection. The result interface is implemented as struct multi_pgsql_result and assorted subroutines.
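The queueing policy described under Basic Operation can be modelled in a few lines of C. This is a minimal, single-threaded sketch under stated assumptions: `struct conn`, `struct query`, `struct dispatcher` and every function name are hypothetical, and there is no ioloop, so "running" a query only records which connection it was assigned to and the caller reports a free connection by calling `connection_ready()`. It is not the patch's implementation, which does the equivalent bookkeeping asynchronously (add_ready_pgc, requeue_query_to_pgdb); the FIFOs use the same head plus `**tail` idiom as the patch's `first, **link_to`.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical stand-ins; only the fields needed for dispatching exist. */
struct conn {
	struct conn *next;
	int id;
};

struct query {
	struct query *next;
	time_t eol;	/* absolute end of the query's lifetime */
	int ran_on;	/* id of the connection it was started on, or -1 */
	int timed_out;	/* set if its lifetime ended while it was queued */
};

/* Two FIFOs: a singly linked list plus a pointer to the last `next`. */
struct dispatcher {
	struct conn *ready, **ready_tail;
	struct query *pending, **pending_tail;
};

static void dispatcher_init(struct dispatcher *d)
{
	d->ready = NULL;
	d->ready_tail = &d->ready;
	d->pending = NULL;
	d->pending_tail = &d->pending;
}

static void query_init(struct query *q, time_t now, unsigned lifetime)
{
	q->next = NULL;
	q->eol = now + lifetime;
	q->ran_on = -1;
	q->timed_out = 0;
}

/* Stand-in for sending the query over the connection. */
static void run_query(struct query *q, struct conn *c)
{
	q->ran_on = c->id;
}

/* Take the connection at the head of the ready FIFO, else queue the query. */
static void submit_query(struct dispatcher *d, struct query *q)
{
	struct conn *c = d->ready;

	assert(q != NULL);
	if (c != NULL) {
		d->ready = c->next;
		if (d->ready == NULL)
			d->ready_tail = &d->ready;
		run_query(q, c);
		return;
	}
	q->next = NULL;
	*d->pending_tail = q;
	d->pending_tail = &q->next;
}

/* A query interrupted by a dead connection goes back to the front. */
static void requeue_query_front(struct dispatcher *d, struct query *q)
{
	q->next = d->pending;
	if (d->pending == NULL)
		d->pending_tail = &q->next;
	d->pending = q;
}

/* A connection is (again) free: start the oldest live pending query,
 * dropping expired ones, or append the connection to the ready FIFO. */
static void connection_ready(struct dispatcher *d, struct conn *c, time_t now)
{
	struct query *q;

	while ((q = d->pending) != NULL) {
		d->pending = q->next;
		if (d->pending == NULL)
			d->pending_tail = &d->pending;
		if (now >= q->eol) {
			q->timed_out = 1;	/* would be aborted with a timeout error */
			continue;
		}
		run_query(q, c);
		return;
	}
	c->next = NULL;
	*d->ready_tail = c;
	d->ready_tail = &c->next;
}
```

In this model, appending a connection that has just become free to the tail and always taking the next one from the head is what rotates work round-robin across the configured servers. A connection that has died simply never re-enters the ready FIFO until its reconnect succeeds, so traffic shifts to the remaining connections without any extra logic.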
Transactions are provided by struct multi_pgsql_transaction_context, which contains a linked list of transaction subqueries (struct multi_pgsql_transaction_subquery). A 'class hierarchy' of 'query classes' provides the different kinds of queries supported by the driver (simple queries, simple synchronous queries, transaction queries and synchronous transaction queries). The complete hierarchy looks like this:

    multi_pgsql_query
     |
     +-- multi_pgsql_simple_query
     |    |
     |    +-- multi_pgsql_user_query
     |    +-- multi_pgsql_sync_query
     |
     +-- multi_pgsql_transaction_query
          |
          +-- multi_pgsql_async_transaction_query
          +-- multi_pgsql_sync_transaction_query

Notable Differences from the pgsql Driver
-----------------------------------------

sql_exec

    This interface is not implemented. As far as I can tell, it is not used anywhere except inside the SQLite driver. It cannot be used for queries which return information, because there is no way to access that information, and it cannot generally be used for queries with side effects either, since there is no way to determine whether execution succeeded (the only possible use I can think of is a rollback after a failed transaction).

sql_result_get_field_value_binary

    Not implemented. It appears to be unused, and as I understand the libpq documentation, the only way to retrieve a binary result for a query is to use PQsendQueryParams or PQsendQueryPrepared and request that some columns be returned in binary format. Nothing like this is supported by the present abstract SQL database interface.

synchronous query execution

    Because background connection re-establishment may still be in progress after a synchronous query has been executed, any timeouts/io operations registered with the ioloop used for synchronous query execution must be moved back to the main ioloop.
    The code implements this by using io_loop_set_current to temporarily restore the original ioloop, unregistering and re-registering any timeouts/io requests possibly associated with one of the existing database connections, switching back to the synchronous execution ioloop afterwards, and then destroying it via io_loop_destroy. This works, but is probably not exactly how the existing interfaces were meant to be used.

error messages for synchronously committed transactions

    These are put onto the data stack by the commit_s subroutine. Judging from the dict code, this should hopefully be sufficient for them to survive until someone higher up in the call chain wants to look at them.

Remarks on the Patch
--------------------

Created against dovecot-1-2-cf3fe573a560. Applies cleanly to 1.2.11. Applies to 2.0.beta4 with a rejected hunk, because of a feature which no longer exists; the result compiles but hasn't been tested.

----
Index: dovecot/configure.in
===================================================================
RCS file: /sysdata/cvs/dovecot/configure.in,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.6.9
diff -u -r1.1.1.2 -r1.1.1.2.6.9
--- dovecot/configure.in	15 Mar 2010 18:18:14 -0000	1.1.1.2
+++ dovecot/configure.in	5 Apr 2010 16:16:03 -0000	1.1.1.2.6.9
@@ -159,6 +159,11 @@
 [  --with-sqlite           Build with SQLite3 driver support],
   TEST_WITH(sqlite, $withval),
   want_sqlite=no)
+
+AC_ARG_WITH(multi-pgsql,
+[  --with-multi-pgsql      Build with PostgreSQL driver supporting multiple connections],
+  TEST_WITH(multi-pgsql, $withval),
+  want_multi_pgsql=no)
 
 AC_ARG_WITH(lucene,
 [  --with-lucene           Build with CLucene full text search support],
@@ -1969,7 +1974,12 @@
   LIBS=$old_LIBS
 fi
 
-if test $want_pgsql != no; then
+want_pgsql_driver=no
+test "$want_pgsql" != no && want_pgsql_driver=yes
+test "$want_multi_pgsql" != no && want_pgsql_driver=yes
+
+
+if test $want_pgsql_driver != no; then
   AC_CHECK_PROG(PG_CONFIG, pg_config, YES, NO)
   if test $PG_CONFIG = NO; then
 	# based on code from PHP
@@ -2014,19 +2024,20 @@
 		fi
 		PGSQL_LIBS="$PGSQL_LIBS -lpq"
 		AC_DEFINE(HAVE_PGSQL,, Build with PostgreSQL support)
-		found_sql_drivers="$found_sql_drivers pgsql"
+		test "$want_pgsql" != no && found_sql_drivers="$found_sql_drivers pgsql"
+		test "$want_multi_pgsql" != no && found_sql_drivers="$found_sql_drivers multi_pgsql"
 		if test "$all_sql_drivers" = "yes"; then
-			sql_drivers="$sql_drivers pgsql"
+			sql_drivers="$sql_drivers pgsql multi_pgsql"
 		fi
 	], [
-	  if test $want_pgsql = yes; then
+	  if test $want_pgsql_driver = yes; then
 		AC_ERROR([Can't build with PostgreSQL support: libpq-fe.h not found])
 	  fi
 	])
 	CPPFLAGS=$old_CPPFLAGS
   ], [
-    if test $want_pgsql = yes; then
+    if test $want_pgsql_driver = yes; then
       AC_ERROR([Can't build with PostgreSQL support: libpq not found])
     fi
   ])
@@ -2323,6 +2334,7 @@
 build_pgsql=no
 build_mysql=no
 build_sqlite=no
+build_multi_pgsql=no
 for driver in $sql_drivers; do
   if test "$driver" = "pgsql"; then
     AC_DEFINE(BUILD_PGSQL,, Built-in PostgreSQL support)
@@ -2333,7 +2345,10 @@
   elif test "$driver" = "sqlite"; then
     AC_DEFINE(BUILD_SQLITE,, Built-in SQLite support)
     build_sqlite=yes
-  fi
+  elif test "$driver" = "multi_pgsql"; then
+    AC_DEFINE(BUILD_MULTI_PGSQL,, Built-in PostgreSQL support with multiple connection support)
+    build_multi_pgsql=yes
+  fi
 done
 if test $build_pgsql = no; then
   not_sql_drivers="$not_sql_drivers pgsql"
@@ -2344,11 +2359,15 @@
 if test $build_sqlite = no; then
   not_sql_drivers="$not_sql_drivers sqlite"
 fi
+if test $build_multi_pgsql = no; then
+  not_sql_drivers="$not_sql_drivers multi_pgsql"
+fi
 
 AC_SUBST(sql_drivers)
 AM_CONDITIONAL(BUILD_PGSQL, test "$build_pgsql" = "yes")
 AM_CONDITIONAL(BUILD_MYSQL, test "$build_mysql" = "yes")
 AM_CONDITIONAL(BUILD_SQLITE, test "$build_sqlite" = "yes")
+AM_CONDITIONAL(BUILD_MULTI_PGSQL, test "$build_multi_pgsql" = "yes")
 AM_CONDITIONAL(SQL_PLUGINS, test "$want_sql" = "plugin")
 
 dnl **
Index: dovecot/src/lib-sql/Makefile.am
===================================================================
RCS file: /sysdata/cvs/dovecot/src/lib-sql/Makefile.am,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.6.3
diff -u -r1.1.1.1 -r1.1.1.1.6.3
--- dovecot/src/lib-sql/Makefile.am	28 Dec 2009 13:52:04 -0000	1.1.1.1
+++ dovecot/src/lib-sql/Makefile.am	5 Apr 2010 15:05:54 -0000	1.1.1.1.6.3
@@ -15,11 +15,16 @@
 SQLITE_LIB = libdriver_sqlite.la
 SQL_DRIVER_PLUGINS += sqlite
 endif
+if BUILD_MULTI_PGSQL
+MULTI_PGSQL_LIB = libdriver_multi_pgsql.la
+SQL_DRIVER_PLUGINS += multi_pgsql
+endif
 
 sql_module_LTLIBRARIES = \
 	$(MYSQL_LIB) \
 	$(PGSQL_LIB) \
-	$(SQLITE_LIB)
+	$(SQLITE_LIB) \
+	$(MULTI_PGSQL_LIB)
 
 sql_moduledir = $(moduledir)/sql
 endif
@@ -39,7 +44,8 @@
 driver_sources = \
 	driver-mysql.c \
 	driver-pgsql.c \
-	driver-sqlite.c
+	driver-sqlite.c \
+	driver-multi-pgsql.c
 endif
 
 libsql_a_SOURCES = \
@@ -62,6 +68,11 @@
 libdriver_sqlite_la_LIBADD = $(SQLITE_LIBS)
 libdriver_sqlite_la_CPPFLAGS = -I$(top_srcdir)/src/lib $(SQLITE_CFLAGS)
 libdriver_sqlite_la_SOURCES = driver-sqlite.c
+
+libdriver_multi_pgsql_la_LDFLAGS = -module -avoid-version
+libdriver_multi_pgsql_la_LIBADD = $(PGSQL_LIBS)
+libdriver_multi_pgsql_la_CPPFLAGS = -I$(top_srcdir)/src/lib $(PGSQL_CFLAGS)
+libdriver_multi_pgsql_la_SOURCES = driver-multi-pgsql.c
 endif
 
 headers = \
Index: dovecot/src/lib-sql/driver-multi-pgsql.c
===================================================================
RCS file: dovecot/src/lib-sql/driver-multi-pgsql.c
diff -N dovecot/src/lib-sql/driver-multi-pgsql.c
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ dovecot/src/lib-sql/driver-multi-pgsql.c	14 Apr 2010 15:31:48 -0000	1.1.2.134
@@ -0,0 +1,1764 @@
+/*
+  Copyright (c) MAD Partners, Ltd 2010 (rweikusat at madpartnerltd.com)
+  Portions Copyright (c) 2004-2010 Dovecot authors, see the included COPYING file
+*/
+
+/* includes */
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "ioloop-internal.h" /* kind of dirty, but it should be fine.. */
+#include "sql-api-private.h"
+
+#ifdef BUILD_MULTI_PGSQL
+#include <ctype.h>
+#include <stdlib.h>
+#include <time.h>
+#include <libpq-fe.h>
+
+/* macros */
+/* #define DEBUG */
+#ifdef DEBUG
+#  define dprintf(args) i_info args
+#else
+#  define dprintf(args)
+#endif
+
+#define DEINIT_ERROR	"db driver deinit"
+#define TIMEOUT_ERROR	"query timed out"
+
+/* constants */
+enum {
+	MULTI_PGSQL_POOL = 512,
+	MULTI_PGSQL_XACT_POOL = 1024,
+
+	TIMEOUT_UNIT = 1000,
+
+	RECONNECT_DELAY = 5 * TIMEOUT_UNIT,
+	USER_QUERY_LIFETIME = 60,
+
+	PGC_IO_TIMEOUT = 20,
+	MIN_QUERY_TIMEOUT = 10
+};
+
+/** general helper routines */
+static char *kill_pg_errmsg_newline(char *s)
+{
+	char *r;
+	unsigned c;
+
+	r = s;
+	while ((c = *r) && c != '\n') ++r;
+	if (c) *r = 0;
+
+	return s;
+}
+
+/** error result */
+struct multi_pgsql_error_result {
+	struct sql_result api;
+	char *msg;
+};
+
+static void error_result_free(struct sql_result *r)
+{
+	struct multi_pgsql_error_result *result;
+
+	result = (void *)r;
+	dprintf(("%s: %p", __func__, result));
+
+	i_free(result->msg);
+	i_free(result);
+}
+
+static void error_result_nop(struct sql_result *r)
+{
+	dprintf(("%s: %p", __func__, r));
+}
+
+static int error_result_next_row(struct sql_result *r)
+{
+	dprintf(("%s: %p", __func__, r));
+	return -1;
+}
+
+static char const *error_result_get_error(struct sql_result *r)
+{
+	struct multi_pgsql_error_result *result;
+
+	result = (void *)r;
+	dprintf(("%s: %p", __func__, result));
+
+	return result->msg;
+}
+
+static void init_error_result(struct multi_pgsql_error_result *result,
+			      char const *msg)
+{
+	struct sql_result_vfuncs *v;
+
+	memset(&result->api, 0, sizeof(result->api));
+
+	v = &result->api.v;
+	v->free = error_result_nop;
+	v->next_row = error_result_next_row;
+	v->get_error = error_result_get_error;
+
+	result->msg = (char *)msg;
+
+	dprintf(("%s: %p", __func__, result));
+}
+
+/** multi_pgsql_result */
+struct multi_pgsql_result {
+	struct sql_result api;
+	struct multi_pgsql_pgc *pgc;
+
+	PGresult *pgr;
+	unsigned row, n_rows;
+	char const **values, *errmsg;
+};
+
+/*** methods/ subroutines */
+static void multi_pgsql_result_free(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+
+	result = (void *)r;
+
+	PQclear(result->pgr);
+	if (result->values) i_free(result->values);
+	i_free(result);
+}
+
+static void multi_pgsql_result_nop(struct sql_result *r)
+{
+	(void)r;
+}
+
+
+static int multi_pgsql_result_next_row(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+	unsigned row;
+
+	dprintf(("%s: %p", __func__, r));
+
+	result = (void *)r;
+	row = result->row;
+	if (++row < result->n_rows) return result->row = row;
+	return 0;
+}
+
+static int multi_pgsql_result_first_row(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+	PGresult *pgr;
+	ExecStatusType status;
+	unsigned n_rows;
+
+	result = (void *)r;
+	pgr = result->pgr;
+
+	status = PQresultStatus(pgr);
+	dprintf(("%s: %p: result status %s(%d)",
+		 __func__, r, PQresStatus(status), status));
+
+	switch (status) {
+	case PGRES_COMMAND_OK:
+	case PGRES_COPY_OUT:
+	case PGRES_COPY_IN:
+		n_rows = 0;
+		break;
+
+	case PGRES_TUPLES_OK:
+		n_rows = PQntuples(pgr);
+
+		dprintf(("%s: %p: %u rows", __func__, result, n_rows));
+		break;
+
+	default:
+		return -1;
+	}
+
+	result->n_rows = n_rows;
+	result->api.v.next_row = multi_pgsql_result_next_row;
+	return n_rows;
+}
+
+static unsigned multi_pgsql_result_get_fields_count(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+	unsigned count;
+
+	result = (void *)r;
+	count = PQnfields(result->pgr);
+
+	dprintf(("%s: %p: %u", __func__, result, count));
+	return count;
+}
+
+static char const *multi_pgsql_result_get_field_name(struct sql_result *r,
+						      unsigned i)
+{
+	struct multi_pgsql_result *result;
+	char const *name;
+
+	result = (void *)r;
+	name = PQfname(result->pgr, i);
+
+	dprintf(("%s: %p: %u: '%s'", __func__, result, i, name));
+	return name;
+}
+
+static int multi_pgsql_result_find_field(struct sql_result *r,
+					 char const *fname)
+{
+	struct multi_pgsql_result *result;
+	int i;
+
+	result = (void *)r;
+	i = PQfnumber(result->pgr, fname);
+
+	if (i == -1) {
+		i_info("%s: %p: no field named '%s'",
+		       __func__, result, fname);
+		return -1;
+	}
+
+	dprintf(("%s: %p: '%s' <-> %d",
+		 __func__, result, fname, i));
+	return i;
+}
+
+static char const *multi_pgsql_result_get_field_value(struct sql_result *r,
+						       unsigned i)
+{
+	struct multi_pgsql_result *result;
+	char const *value;
+	unsigned row;
+
+	result = (void *)r;
+	row = result->row;
+	value = PQgetvalue(result->pgr, row, i);
+	if (!*value && PQgetisnull(result->pgr, row, i)) value = NULL;
+
+	dprintf(("%s: %p: %u,%u: '%s'",
+		 __func__, result, row, i, value ? value : "NULL"));
+	return value;
+}
+
+static unsigned char const *
+multi_pgsql_result_get_field_value_binary(struct sql_result *r,
+					  unsigned i, size_t *size)
+{
+	(void)i;
+	i_info("%s: %p: not implemented", __func__, r);
+
+	*size = 0;
+	return NULL;
+}
+
+static char const *multi_pgsql_result_find_field_value(struct sql_result *r,
+							char const *fname)
+{
+	int i;
+
+	dprintf(("%s: %p", __func__, r));
+
+	i = r->v.find_field(r, fname);
+	return i != -1 ? r->v.get_field_value(r, i) : NULL;
+}
+
+static char const * const *multi_pgsql_result_get_values(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+	char const **values;
+	unsigned n;
+
+	dprintf(("%s: %p", __func__, r));
+	result = (void *)r;
+
+	values = result->values;
+	if (values) return values;
+
+	n = r->v.get_fields_count(r);
+	values = result->values = i_new(char const *, n);
+	do {
+		--n;
+		values[n] = r->v.get_field_value(r, n);
+	} while (n);
+
+	return values;
+}
+
+static char const *
+multi_pgsql_result_get_error(struct sql_result *r)
+{
+	struct multi_pgsql_result *result;
+	char const *errmsg;
+
+	result = (void *)r;
+
+	errmsg = result->errmsg;
+	if (errmsg) return errmsg;
+
+	errmsg = result->errmsg =
+		kill_pg_errmsg_newline(PQresultErrorMessage(result->pgr));
+	return errmsg;
+}
+
+/*** vtable/ init */
+static struct sql_result multi_pgsql_result = {
+	MEMBER(v) {
+		multi_pgsql_result_nop,
+		multi_pgsql_result_first_row,
+		multi_pgsql_result_get_fields_count,
+		multi_pgsql_result_get_field_name,
+		multi_pgsql_result_find_field,
+		multi_pgsql_result_get_field_value,
+		multi_pgsql_result_get_field_value_binary,
+		multi_pgsql_result_find_field_value,
+		multi_pgsql_result_get_values,
+		multi_pgsql_result_get_error
+	}
+};
+
+static void init_multi_pgsql_result(struct multi_pgsql_result *result,
+				    PGresult *pgr)
+{
+	result->api = multi_pgsql_result;
+	result->pgr = pgr;
+	result->row = 0;
+	result->values = NULL;
+	result->errmsg = NULL;
+}
+
+/** query classes */
+struct multi_pgsql_query {
+	struct multi_pgsql_query *p;
+	struct multi_pgsql_query_vtable *vtable;
+};
+
+struct multi_pgsql_query_vtable {
+	void (*start)(struct multi_pgsql_query *, struct multi_pgsql_pgc *);
+	void (*result)(struct multi_pgsql_query *, struct multi_pgsql_pgc *,
+		       PGresult **);
+	void (*abort)(struct multi_pgsql_query *, struct sql_result *);
+	void (*dtor)(struct multi_pgsql_query *);
+	time_t (*eol)(struct multi_pgsql_query *);
+	void (*set_sync_ioloop)(struct multi_pgsql_query *, struct ioloop *);
+	char const *(*get_query)(struct multi_pgsql_query *);
+};
+
+static void start_new_query_on_pgc(struct multi_pgsql_query *, struct multi_pgsql_pgc *);
+static void start_query_on_pgc(struct multi_pgsql_pgc *);
+static void done_with_query(struct multi_pgsql_pgc *);
+
+/*** method invocation wrappers */
+#define the_qry(q) ((struct multi_pgsql_query *)q)
+
+static inline void start_query(void *q, struct multi_pgsql_pgc *pgc)
+{
+	dprintf(("%s: %p", __func__, q));
+	the_qry(q)->vtable->start(the_qry(q), pgc);
+}
+
+static inline void query_result(void *q, struct multi_pgsql_pgc *pgc,
+				PGresult **pgr)
+{
+	dprintf(("%s: %p", __func__, q));
+	the_qry(q)->vtable->result(the_qry(q), pgc, pgr);
+}
+
+static inline void abort_query(void *q, struct sql_result *r)
+{
+	dprintf(("%s: %p", __func__, q));
+
+	the_qry(q)->vtable->abort(the_qry(q), r);
+	the_qry(q)->vtable->dtor(the_qry(q));
+}
+
+static inline void destroy_query(void *q)
+{
+	dprintf(("%s: %p", __func__, q));
+	the_qry(q)->vtable->dtor(the_qry(q));
+}
+
+static inline time_t query_eol(void *q)
+{
+	dprintf(("%s: %p", __func__, q));
+	return the_qry(q)->vtable->eol(the_qry(q));
+}
+
+static inline void set_query_sync_ioloop(void *q, struct ioloop *ioloop)
+{
+	dprintf(("%s: %p", __func__, q));
+	the_qry(q)->vtable->set_sync_ioloop(the_qry(q), ioloop);
+}
+
+static inline char const *get_query(void *q)
+{
+	return
+		the_qry(q)->vtable->get_query(the_qry(q));
+}
+
+#undef the_qry
+
+/*** query */
+static void init_multi_pgsql_query(struct multi_pgsql_query *qry,
+				   struct multi_pgsql_query_vtable *vtable)
+{
+	qry->p = NULL;
+	qry->vtable = vtable;
+}
+
+/*** simple_query */
+struct multi_pgsql_simple_query {
+	struct multi_pgsql_query super;
+
+	time_t eol_at;
+	char *query;
+};
+
+static void start_simple_query(struct multi_pgsql_query *q,
+			       struct multi_pgsql_pgc *pgc)
+{
+	start_new_query_on_pgc(q, pgc);
+}
+
+static time_t simple_query_eol(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_simple_query *qry;
+
+	qry = (void *)q;
+	dprintf(("%s: %p", __func__, qry));
+	return qry->eol_at;
+}
+
+static char const *simple_query_get_query(struct multi_pgsql_query *q)
+{
+	return ((struct multi_pgsql_simple_query *)q)->query;
+}
+
+static void init_simple_query(struct multi_pgsql_simple_query *qry,
+			      struct multi_pgsql_query_vtable *vtable,
+			      unsigned query_lifetime, char const *query)
+{
+	init_multi_pgsql_query(&qry->super, vtable);
+
+	qry->query = (void *)query;
+	qry->eol_at = time(NULL) + query_lifetime;
+}
+
+/*** user_query */
+struct multi_pgsql_user_query {
+	struct multi_pgsql_simple_query super;
+
+	sql_query_callback_t *cb;
+	void *ctx;
+};
+
+static void abort_user_query(struct multi_pgsql_query *q,
+			     struct sql_result *r)
+{
+	struct multi_pgsql_user_query *qry;
+
+	qry = (void *)q;
+
+	T_BEGIN {
+		qry->cb(r, qry->ctx);
+	} T_END;
+}
+
+static void user_query_result(struct multi_pgsql_query *q,
+			      struct multi_pgsql_pgc *pgc,
+			      PGresult **pgr)
+{
+	struct multi_pgsql_result result;
+	struct multi_pgsql_user_query *qry;
+
+	qry = (void *)q;
+	init_multi_pgsql_result(&result, *pgr);
+
+	T_BEGIN {
+		qry->cb(&result.api, qry->ctx);
+	} T_END;
+
+	if (result.values) i_free(result.values);
+
+	done_with_query(pgc);
+}
+
+static void user_query_dtor(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_user_query *qry;
+
+	qry = (void *)q;
+
+	i_free(qry->super.query);
+	i_free(qry);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_query_vtable user_query_vtable = {
+	start_simple_query,
+	user_query_result,
+	abort_user_query,
+	user_query_dtor,
+	simple_query_eol,
+	NULL,
+	simple_query_get_query
+};
+
+static struct multi_pgsql_query *create_user_query(char const *query,
+						   sql_query_callback_t *cb, void *ctx)
+{
+	struct multi_pgsql_user_query *qry;
+
+	qry = i_new(struct multi_pgsql_user_query, 1);
+
+	init_simple_query(&qry->super, &user_query_vtable,
+			  USER_QUERY_LIFETIME, i_strdup(query));
+
+	qry->cb = cb;
+	qry->ctx = ctx;
+	return (void *)qry;
+}
+
+/*** sync_query */
+struct multi_pgsql_sync_query {
+	struct multi_pgsql_simple_query super;
+
+	struct ioloop *ioloop;
+	PGresult *pgr;
+	char *errmsg;
+};
+
+static void sync_query_result(struct multi_pgsql_query *q,
+			      struct multi_pgsql_pgc *pgc,
+			      PGresult **pgr)
+{
+	struct multi_pgsql_sync_query *qry;
+
+	qry = (void *)q;
+
+	qry->errmsg = NULL;
+	qry->pgr = *pgr;
+	*pgr = NULL;
+
+	done_with_query(pgc);
+}
+
+static void abort_sync_query(struct multi_pgsql_query *q,
+			     struct sql_result *r)
+{
+	struct multi_pgsql_sync_query *qry;
+
+	qry = (void *)q;
+
+	qry->pgr = NULL;
+	qry->errmsg = i_strdup(r->v.get_error(r));
+}
+
+static void sync_query_dtor(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_sync_query *qry;
+
+	qry = (void *)q;
+	io_loop_stop(qry->ioloop);
+}
+
+static void set_sync_query_sync_ioloop(struct multi_pgsql_query *q,
+				       struct ioloop *ioloop)
+{
+	struct multi_pgsql_sync_query *qry;
+
+	qry = (void *)q;
+	qry->ioloop = ioloop;
+}
+
+static struct sql_result *
+result_from_sync_query(struct multi_pgsql_sync_query *qry)
+{
+	struct sql_result *result;
+	PGresult *pgr;
+
+	pgr = qry->pgr;
+
+	if (!pgr) {
+		result = (void *)i_new(struct multi_pgsql_error_result, 1);
+		init_error_result((void *)result, qry->errmsg);
+		result->v.free = error_result_free;
+
+		return result;
+	}
+
+	result = (void *)i_new(struct multi_pgsql_result, 1);
+	init_multi_pgsql_result((void *)result, pgr);
+	result->v.free = multi_pgsql_result_free;
+
+	return result;
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_query_vtable sync_query_vtable = {
+	start_simple_query,
+	sync_query_result,
+	abort_sync_query,
+	sync_query_dtor,
+	simple_query_eol,
+	set_sync_query_sync_ioloop,
+	simple_query_get_query
+};
+
+static inline void init_sync_query(struct multi_pgsql_sync_query *qry,
+				   char const *query)
+{
+	init_simple_query(&qry->super, &sync_query_vtable,
+			  USER_QUERY_LIFETIME, query);
+}
+
+/*** transaction query */
+struct multi_pgsql_transaction_subquery {
+	struct multi_pgsql_transaction_subquery *p;
+
+	char *query;
+	unsigned *affected_rows;
+};
+
+struct multi_pgsql_transaction_context {
+	struct sql_transaction_context super;
+	struct multi_pgsql_transaction_subquery *first, **link_to;
+	pool_t pool;
+};
+
+struct multi_pgsql_transaction_query {
+	struct multi_pgsql_query super;
+	struct multi_pgsql_transaction_context *x_ctx;
+
+	struct multi_pgsql_transaction_subquery *sub;
+	time_t cur_eol;
+
+	/*
+	  If set, points to the PGresult corresponding
+	  with the subquery which caused the transaction
+	  to fail.
+	*/
+	PGresult *pgr;
+};
+
+struct multi_pgsql_transaction_query_vtable {
+	struct multi_pgsql_query_vtable super;
+	void (*xact_done)(struct multi_pgsql_transaction_query *, char const *);
+};
+
+static inline void transaction_query_xact_done(void *q, char const *errmsg)
+{
+	struct multi_pgsql_transaction_query *qry;
+	struct multi_pgsql_transaction_query_vtable *vtable;
+
+	dprintf(("%s: %p", __func__, q));
+
+	qry = q;
+	vtable = (void *)qry->super.vtable;
+	vtable->xact_done(q, errmsg);
+}
+
+
+static void start_transaction_query(struct multi_pgsql_query *q,
+				    struct multi_pgsql_pgc *pgc)
+{
+	struct multi_pgsql_transaction_query *qry;
+
+	qry = (void *)q;
+	qry->sub = qry->x_ctx->first;
+	start_new_query_on_pgc(q, pgc);
+}
+
+static void transaction_query_result(struct multi_pgsql_query *q,
+				     struct multi_pgsql_pgc *pgc,
+				     PGresult **ppgr)
+{
+	struct multi_pgsql_transaction_query *qry;
+	struct multi_pgsql_transaction_subquery *sub;
+	PGresult *pgr;
+	char const *errmsg;
+	ExecStatusType status;
+
+	qry = (void *)q;
+
+	pgr = qry->pgr;
+	if (pgr) {
+		errmsg = kill_pg_errmsg_newline(PQresultErrorMessage(pgr));
+
+	query_done:
+		transaction_query_xact_done(qry, errmsg);
+		done_with_query(pgc);
+		return;
+	}
+
+	sub = qry->sub;
+
+	pgr = *ppgr;
+	status = PQresultStatus(pgr);
+	dprintf(("%s: %p: status %s(%d)",
+		 __func__, qry, PQresStatus(status), status));
+
+	switch (status) {
+	case PGRES_COMMAND_OK:
+	case PGRES_TUPLES_OK:
+	case PGRES_COPY_OUT:
+	case PGRES_COPY_IN:
+		if (sub->affected_rows)
+			*sub->affected_rows = atoi(PQcmdTuples(pgr));
+
+		sub = sub->p;
+		if (!sub) {
+			errmsg = NULL;
+			goto query_done;
+		}
+
+		qry->sub = sub;
+		qry->cur_eol = time(NULL) + USER_QUERY_LIFETIME;
+		break;
+
+	default:
+		qry->pgr = pgr;
+		*ppgr = NULL;
+
+		qry->cur_eol = 0; /* don't restart failed transactions */
+		sub->query = "rollback";
+	}
+
+	start_query_on_pgc(pgc);
+}
+
+static void abort_transaction_query(struct multi_pgsql_query *q,
+				    struct sql_result *r)
+{
+	char const *errmsg;
+	PGresult *pgr;
+
+	pgr = ((struct multi_pgsql_transaction_query *)q)->pgr;
+	errmsg = pgr ?
+		kill_pg_errmsg_newline(PQresultErrorMessage(pgr)) : r->v.get_error(r);
+	transaction_query_xact_done(q, errmsg);
+}
+
+static time_t transaction_query_eol(struct multi_pgsql_query *q)
+{
+	return ((struct multi_pgsql_transaction_query *)q)->cur_eol;
+}
+
+static void transaction_query_dtor(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_transaction_query *qry;
+	PGresult *pgr;
+	pool_t pool;
+
+	qry = (void *)q;
+
+	pgr = qry->pgr;
+	if (pgr) PQclear(pgr);
+
+	pool = qry->x_ctx->pool;
+	pool_unref(&pool);
+}
+
+static char const *transaction_query_get_query(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_transaction_query *qry;
+
+	qry = (void *)q;
+	return qry->sub->query;
+}
+
+static void init_transaction_query(struct multi_pgsql_transaction_query *qry,
+				   struct multi_pgsql_transaction_query_vtable *vtable,
+				   struct multi_pgsql_transaction_context *x_ctx)
+{
+	init_multi_pgsql_query(&qry->super, &vtable->super);
+
+	qry->x_ctx = x_ctx;
+	qry->cur_eol = time(NULL) + USER_QUERY_LIFETIME;
+	qry->pgr = NULL;
+}
+
+/*** async transaction query */
+struct multi_pgsql_async_transaction_query {
+	struct multi_pgsql_transaction_query super;
+
+	sql_commit_callback_t *cb;
+	void *ctx;
+};
+
+static void async_transaction_query_xact_done(struct multi_pgsql_transaction_query *q,
+					      char const *errmsg)
+{
+	struct multi_pgsql_async_transaction_query *qry;
+
+	qry = (void *)q;
+	qry->cb(errmsg, qry->ctx);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_transaction_query_vtable async_transaction_query_vtable = {
+	{
+		start_transaction_query,
+		transaction_query_result,
+		abort_transaction_query,
+		transaction_query_dtor,
+		transaction_query_eol,
+		NULL,
+		transaction_query_get_query },
+
+	async_transaction_query_xact_done
+};
+
+static void
+init_async_transaction_query(struct multi_pgsql_async_transaction_query *qry,
+			     struct multi_pgsql_transaction_context *x_ctx,
+			     sql_commit_callback_t *cb, void *cb_ctx)
+{
+	init_transaction_query(&qry->super,
+			       &async_transaction_query_vtable, x_ctx);
+
+	qry->cb = cb;
+	qry->ctx = cb_ctx;
+}
+
+/*** sync transaction query */
+struct multi_pgsql_sync_transaction_query {
+	struct multi_pgsql_transaction_query super;
+
+	struct ioloop *ioloop;
+	char const *errmsg;
+};
+
+static void sync_transaction_query_dtor(struct multi_pgsql_query *q)
+{
+	struct multi_pgsql_sync_transaction_query *qry;
+
+	qry = (void *)q;
+	io_loop_stop(qry->ioloop);
+	transaction_query_dtor(q);
+}
+
+static void set_sync_transaction_query_sync_ioloop(struct multi_pgsql_query *q,
+						   struct ioloop *ioloop)
+{
+	struct multi_pgsql_sync_transaction_query *qry;
+
+	qry = (void *)q;
+	qry->ioloop = ioloop;
+}
+
+static void sync_transaction_query_xact_done(struct multi_pgsql_transaction_query *q,
+					     char const *errmsg)
+{
+	struct multi_pgsql_sync_transaction_query *qry;
+
+	qry = (void *)q;
+	qry->errmsg = i_strdup(errmsg);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_transaction_query_vtable sync_transaction_query_vtable = {
+	{
+		start_transaction_query,
+		transaction_query_result,
+		abort_transaction_query,
+		sync_transaction_query_dtor,
+		transaction_query_eol,
+		set_sync_transaction_query_sync_ioloop,
+		transaction_query_get_query },
+
+	sync_transaction_query_xact_done
+};
+
+static void init_sync_transaction_query(struct multi_pgsql_sync_transaction_query *qry,
+					struct multi_pgsql_transaction_context *x_ctx)
+{
+	init_transaction_query(&qry->super,
+			       &sync_transaction_query_vtable, x_ctx);
+}
+
+/** pgc (connection) code */
+struct multi_pgsql_pgc {
+	struct multi_pgsql_pgc *p;
+
+	char *connect_string;
+	PGconn *pgc;
+	PGresult *pgr;
+	struct multi_pgsql_db *pgdb;
+	struct multi_pgsql_query *qry;
+	struct io *io;
+	struct timeout *timeout;
+};
+
+/*** misc */
+static void start_pgc_connect(struct multi_pgsql_pgc *);
+
+static void init_pgc(struct multi_pgsql_pgc *pgc,
+		     struct multi_pgsql_db *pgdb, char *connect_string)
+{
+	pgc->connect_string = connect_string;
+	pgc->pgdb = pgdb;
+
+	dprintf(("%s: pgc %p, '%s'", __func__, pgc, connect_string));
+
+	start_pgc_connect(pgc);
+}
+
+static void reset_pgc(struct multi_pgsql_pgc *pgc)
+{
+	dprintf(("%s: %p", __func__, pgc));
+
+	if (pgc->io) io_remove(&pgc->io);
+	if (pgc->timeout) timeout_remove(&pgc->timeout);
+
+	if (pgc->pgr) {
+		PQclear(pgc->pgr);
+		pgc->pgr = NULL;
+	}
+
+	if (pgc->pgc) {
+		PQfinish(pgc->pgc);
+		pgc->pgc = NULL;
+	}
+}
+
+static void pgc_ioloop_switch(struct multi_pgsql_pgc *pgc)
+{
+	union {
+		struct io io;
+		struct timeout timeout;
+	} old;
+
+	if (pgc->io) {
+		old.io = *pgc->io;
+		io_remove(&pgc->io);
+
+		pgc->io = io_add(PQsocket(pgc->pgc),
+				 old.io.condition, old.io.callback, old.io.context);
+	}
+
+	if (pgc->timeout) {
+		old.timeout = *pgc->timeout;
+		timeout_remove(&pgc->timeout);
+
+		pgc->timeout = timeout_add(old.timeout.msecs,
+					   old.timeout.callback, old.timeout.context);
+	}
+}
+
+static void log_pg_error(struct multi_pgsql_pgc *pgc, char const *fnc, char const *pg_call)
+{
+	i_error("%s: %p: %s: %s",
+		fnc, pgc, pg_call, kill_pg_errmsg_newline(PQerrorMessage(pgc->pgc)));
+}
+
+/*** connection establishment */
+static void add_ready_pgc(struct multi_pgsql_db *, struct multi_pgsql_pgc *);
+
+static void restart_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+	timeout_remove(&pgc->timeout);
+	start_pgc_connect(pgc);
+}
+
+static void continue_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+	char const *pg_call;
+	PostgresPollingStatusType status;
+	int io_dir, rc;
+
+	io_remove(&pgc->io);
+	the_pgc = pgc->pgc;
+
+	status = PQconnectPoll(the_pgc);
+	dprintf(("%s: %p: PQconnectPoll returned %d",
+		 __func__, pgc, status));
+
+	switch (status) {
+	/*
+	  According to the PostgreSQL source, this is an
+	  unused legacy constant. Here to prevent gcc from
+	  complaining about it.
+	*/
+	case PGRES_POLLING_ACTIVE:
+	case PGRES_POLLING_OK:
+		rc = PQsetnonblocking(the_pgc, 1);
+		if (rc == -1) {
+			pg_call = "PQsetnonblocking";
+			goto error;
+		}
+
+		i_info("%s: %p: connected", __func__, pgc);
+		add_ready_pgc(pgc->pgdb, pgc);
+		return;
+
+	case PGRES_POLLING_FAILED:
+		pg_call = "PQconnectPoll";
+		goto error;
+
+	case PGRES_POLLING_READING:
+		io_dir = IO_READ;
+		break;
+
+	case PGRES_POLLING_WRITING:
+		io_dir = IO_WRITE;
+		break;
+	}
+
+	pgc->io = io_add(PQsocket(the_pgc), io_dir, continue_pgc_connect, pgc);
+	return;
+
+error:
+	log_pg_error(pgc, __func__, pg_call);
+
+	PQfinish(the_pgc);
+	pgc->pgc = NULL;
+
+	pgc->timeout = timeout_add(RECONNECT_DELAY,
+				   restart_pgc_connect, pgc);
+}
+
+static void start_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+
+	dprintf(("%s: %p", __func__, pgc));
+
+	pgc->pgc = the_pgc = PQconnectStart(pgc->connect_string);
+	if (!the_pgc)
+		i_fatal("%s: %p: out of memory",
+			__func__, pgc);
+
+	if (PQstatus(the_pgc) == CONNECTION_BAD) {
+		log_pg_error(pgc, __func__, "PQconnectStart");
+
+		/* a failed PGconn must still be released with PQfinish */
+		PQfinish(the_pgc);
+		pgc->pgc = NULL;
+
+		pgc->timeout = timeout_add(RECONNECT_DELAY,
+					   restart_pgc_connect, pgc);
+		return;
+	}
+
+	pgc->io = io_add(PQsocket(the_pgc), IO_WRITE,
+			 continue_pgc_connect, pgc);
+}
+
+/*** query processing */
+/**** error handling */
+static void requeue_query_to_pgdb(struct multi_pgsql_query *, struct multi_pgsql_db *);
+
+static void pgc_query_io_timeout(struct multi_pgsql_pgc *pgc)
+{
+	i_info("%s: %p", __func__, pgc);
+
+	requeue_query_to_pgdb(pgc->qry, pgc->pgdb);
+	pgc->qry = NULL;
+
+	reset_pgc(pgc);
+	start_pgc_connect(pgc);
+}
+
+static void pgc_query_processing_failure(struct multi_pgsql_pgc *pgc,
+					 char const *fnc,
+					 char const *pg_call)
+{
+	log_pg_error(pgc, fnc, pg_call);
+
+	requeue_query_to_pgdb(pgc->qry, pgc->pgdb);
+	pgc->qry = NULL;
+
+	reset_pgc(pgc);
+	start_pgc_connect(pgc);
+}
+
+/**** result processing */
+static void done_with_query(struct multi_pgsql_pgc *pgc)
+{
+	struct multi_pgsql_query *qry;
+
+	qry = pgc->qry;
+	pgc->qry = NULL;
+
+	destroy_query(qry);
+}
+
+static inline int got_result(PGconn *pgc)
+{
+	PGresult *pgr;
+
+	pgr = PQgetResult(pgc);
+	if (!pgr) return 0;
+
+	PQclear(pgr);
+	return 1;
+}
+
+static void eat_results(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+	PGresult *pgr;
+	int rc;
+
+	dprintf(("%s: %p", __func__, pgc));
+
+	the_pgc = pgc->pgc;
+	do {
+		rc = PQconsumeInput(the_pgc);
+		if (rc == 0) {
+			pgc_query_processing_failure(pgc, __func__,
+						     "PQconsumeInput");
+			return;
+		}
+
+		if (PQisBusy(the_pgc)) {
+			timeout_reset(pgc->timeout);
+			return;
+		}
+	} while (got_result(the_pgc));
+
+	io_remove(&pgc->io);
+	timeout_remove(&pgc->timeout);
+
+	pgr = pgc->pgr;
+	pgc->pgr = NULL;
+	query_result(pgc->qry, pgc, &pgr);
+	if (pgr) PQclear(pgr);
+
+	if (!pgc->qry) add_ready_pgc(pgc->pgdb, pgc);
+}
+
+static void get_first_result(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+	PGresult *pgr;
+	int rc;
+
+	dprintf(("%s: %p", __func__, pgc));
+	the_pgc = pgc->pgc;
+
+	rc = PQconsumeInput(the_pgc);
+	if (rc == 0) {
+		pgc_query_processing_failure(pgc, __func__,
+					     "PQconsumeInput");
+		return;
+	}
+
+	if (PQisBusy(the_pgc)) {
+		dprintf(("%s: %p: still busy", __func__, pgc));
+
+		timeout_reset(pgc->timeout);
+		return;
+	}
+
+	io_remove(&pgc->io);
+
+	pgr = pgc->pgr = PQgetResult(the_pgc);
+	if (pgr) {
+		pgc->io = io_add(PQsocket(the_pgc), IO_READ, eat_results, pgc);
+		timeout_reset(pgc->timeout);
+
+		eat_results(pgc);
+		return;
+	}
+
+	timeout_remove(&pgc->timeout);
+
+	query_result(pgc->qry, pgc, NULL);
+	if (!pgc->qry) add_ready_pgc(pgc->pgdb, pgc);
+}
+
+/**** query transmission */
+static void flush_query(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+	int rc;
+
+	dprintf(("%s: %p", __func__, pgc));
+	the_pgc = pgc->pgc;
+
+	rc = PQflush(the_pgc);
+	switch (rc) {
+	case -1:
+		pgc_query_processing_failure(pgc, __func__,
+					     "PQflush");
+		return;
+
+	case 0:
+		dprintf(("%s: %p: query sent", __func__, pgc));
+
+		io_remove(&pgc->io);
+		pgc->io = io_add(PQsocket(the_pgc), IO_READ, get_first_result, pgc);
+	}
+
+	timeout_reset(pgc->timeout);
+}
+
+static void start_query_on_pgc(struct multi_pgsql_pgc *pgc)
+{
+	PGconn *the_pgc;
+	char const *pg_call, *query;
+	void (*io_cb)(struct multi_pgsql_pgc *);
+	int rc, io_dir;
+
+	query = get_query(pgc->qry);
+	dprintf(("%s: %p: %s", __func__, pgc, query));
+
+	the_pgc = pgc->pgc;
+
+	rc = PQsendQuery(the_pgc, query);
+	if (rc == 0) {
+		pg_call = "PQsendQuery";
+		goto error;
+	}
+
+	rc = PQflush(the_pgc);
+	switch (rc) {
+	case -1:
+		pg_call = "PQflush";
+		goto error;
+
+	case 0:
+		io_cb = get_first_result;
+		io_dir = IO_READ;
+		break;
+
+	default:
+		io_cb = flush_query;
+		io_dir = IO_WRITE;
+	}
+
+	pgc->io = io_add(PQsocket(the_pgc), io_dir, io_cb, pgc);
+	pgc->timeout = timeout_add(PGC_IO_TIMEOUT * TIMEOUT_UNIT,
+				   pgc_query_io_timeout, pgc);
+	return;
+
+error:
+	/* report the call that actually failed */
+	pgc_query_processing_failure(pgc, __func__, pg_call);
+}
+
+static void start_new_query_on_pgc(struct multi_pgsql_query *qry,
+				   struct multi_pgsql_pgc *pgc)
+{
+	dprintf(("%s: %p -> %p", __func__, qry, pgc));
+
+	pgc->qry = qry;
+	start_query_on_pgc(pgc);
+}
+
+/** pgdb */
+struct multi_pgsql_db {
+	struct sql_db api;
+
+	unsigned n_pgcs;
+	struct multi_pgsql_pgc *pgcs;
+
+	struct {
+		struct multi_pgsql_pgc *first, **link_to;
+	} pgc_q;
+
+	struct {
+		struct timeout *timeout;
+		struct multi_pgsql_query *first, **link_to;
+	} query_q;
+
+	pool_t pool;
+};
+
+static struct multi_pgsql_query *dequeue_query_from_pgdb(struct multi_pgsql_db *);
+
+/*** pgc support code */
+static void do_init_pgcs(struct multi_pgsql_db *pgdb, char const *s, unsigned ofs)
+{
+	char const *r, *lws_start;
+	unsigned c, semi_count;
+
+	/* unsigned char: isspace() is undefined for negative values */
+	while ((c = (unsigned char)*s) && isspace(c)) ++s;
+
+	if (!c) {
+		if (!ofs) i_fatal("%s: empty connect string", __func__);
+
+		dprintf(("%s: end of connect_string, %u pgcs",
+			 __func__, ofs));
+
+		pgdb->n_pgcs = ofs;
+		pgdb->pgcs = p_new(pgdb->pool, struct multi_pgsql_pgc, ofs);
+		return;
+	}
+
+	semi_count = 0;
+	lws_start = NULL;
+	r = s;
+	do {
+		if (c == ';') {
+			if (++semi_count == 3) break;
+			continue;
+		}
+
+		semi_count = 0;
+
+		if (isspace(c)) {
+			if (!lws_start) lws_start = r;
+			continue;
+		}
+
+		lws_start = NULL;
+	} while ((c = (unsigned char)*++r));
+
+	do_init_pgcs(pgdb, c ? r + 1 : r, ofs + 1);
+
+	if (lws_start) r = lws_start;
+	else if (c) r -= 2;
+
+	init_pgc(pgdb->pgcs + ofs, pgdb, p_strndup(pgdb->pool, s, r - s));
+}
+
+static inline void init_pgcs(struct multi_pgsql_db *pgdb, char const *connect_string)
+{
+	do_init_pgcs(pgdb, connect_string, 0);
+}
+
+static void add_ready_pgc(struct multi_pgsql_db *pgdb, struct multi_pgsql_pgc *pgc)
+{
+	struct multi_pgsql_query *qry;
+
+	if (!pgdb->pgc_q.first) {
+		qry = dequeue_query_from_pgdb(pgdb);
+
+		if (qry) {
+			start_query(qry, pgc);
+			return;
+		}
+	}
+
+	*pgdb->pgc_q.link_to = pgc;
+	pgdb->pgc_q.link_to = &pgc->p;
+
+	dprintf(("%s: %p -> %p", __func__, pgc, pgdb));
+}
+
+static struct multi_pgsql_pgc *get_ready_pgc(struct multi_pgsql_db *pgdb)
+{
+	struct multi_pgsql_pgc *pgc, *next_pgc;
+
+	pgc = pgdb->pgc_q.first;
+	if (!pgc) return NULL;
+
+	next_pgc = pgc->p;
+	if (next_pgc) pgc->p = NULL;
+	else pgdb->pgc_q.link_to = &pgdb->pgc_q.first;
+	pgdb->pgc_q.first = next_pgc;
+
+	dprintf(("%s: %p -> %p", __func__, pgdb, pgc));
+	return pgc;
+}
+
+static void switch_pgdb_pgcs(struct multi_pgsql_db *pgdb)
+{
+	struct multi_pgsql_pgc *pgcs;
+	unsigned n;
+
+	pgcs = pgdb->pgcs;
+	n = pgdb->n_pgcs;
+
+	do pgc_ioloop_switch(pgcs + --n); while (n);
+}
+
+/*** query timeout */
+static void start_pgdb_query_timeout(struct multi_pgsql_db *,
+				     struct multi_pgsql_query *);
+
+static void pgdb_query_timeout(struct multi_pgsql_db *pgdb)
+{
+	struct multi_pgsql_query *qry, *next_qry;
+	struct multi_pgsql_error_result timeout;
+	time_t now;
+
+	dprintf(("%s: %p", __func__, pgdb));
+
+	timeout_remove(&pgdb->query_q.timeout);
+	qry = pgdb->query_q.first;
+	now = time(NULL);
+	init_error_result(&timeout, TIMEOUT_ERROR);
+
+	/* abort the expired head and every following expired query */
+	do {
+		next_qry = qry->p;
+		abort_query(qry, &timeout.api);
+	} while ((qry = next_qry) && now > query_eol(qry));
+
+	pgdb->query_q.first = next_qry;
+
+	if (next_qry) {
+		start_pgdb_query_timeout(pgdb, next_qry);
+		return;
+	}
+
+	pgdb->query_q.link_to = &pgdb->query_q.first;
+}
+
+static void start_pgdb_query_timeout(struct multi_pgsql_db *pgdb,
+				     struct multi_pgsql_query *qry)
+{
+	time_t now, timeout, eol;
+
+	now = time(NULL);
+	eol = query_eol(qry);
+	timeout = now < eol ? eol - now : 0;
+	if (timeout < MIN_QUERY_TIMEOUT) timeout = MIN_QUERY_TIMEOUT;
+
+	pgdb->query_q.timeout = timeout_add(timeout * TIMEOUT_UNIT,
+					    pgdb_query_timeout, pgdb);
+	dprintf(("%s: %p: timeout in %us", __func__, pgdb, (unsigned)timeout));
+}
+
+/*** query queueing */
+static void queue_query_to_pgdb(struct multi_pgsql_query *qry, struct multi_pgsql_db *pgdb)
+{
+	if (!pgdb->query_q.first) start_pgdb_query_timeout(pgdb, qry);
+
+	*pgdb->query_q.link_to = qry;
+	pgdb->query_q.link_to = &qry->p;
+}
+
+static struct multi_pgsql_query *dequeue_query_from_pgdb(struct multi_pgsql_db *pgdb)
+{
+	struct multi_pgsql_query *qry, *next_qry;
+
+	qry = pgdb->query_q.first;
+	if (!qry) return NULL;
+
+	timeout_remove(&pgdb->query_q.timeout);
+
+	next_qry = qry->p;
+	if (next_qry) start_pgdb_query_timeout(pgdb, next_qry);
+	else pgdb->query_q.link_to = &pgdb->query_q.first;
+	pgdb->query_q.first = next_qry;
+
+	qry->p = NULL;
+	return qry;
+}
+
+static void requeue_query_to_pgdb(struct multi_pgsql_query *qry,
+				  struct multi_pgsql_db *pgdb)
+{
+	struct multi_pgsql_error_result timeout;
+	struct multi_pgsql_pgc *pgc;
+
+	if (time(NULL) >= query_eol(qry)) {
+		init_error_result(&timeout, TIMEOUT_ERROR);
+		abort_query(qry, &timeout.api);
+		return;
+	}
+
+	pgc = get_ready_pgc(pgdb);
+	if (pgc) {
+		start_query(qry, pgc);
+		return;
+	}
+
+	if (pgdb->query_q.timeout) {
+		timeout_remove(&pgdb->query_q.timeout);
+		qry->p = pgdb->query_q.first;
+	} else
+		pgdb->query_q.link_to = &qry->p;
+
+	pgdb->query_q.first = qry;
+	start_pgdb_query_timeout(pgdb, qry);
+}
+
+/*** query submission */
+static void pgdb_async_query(struct multi_pgsql_db *pgdb,
+			     struct multi_pgsql_query *qry)
+{
+	struct multi_pgsql_pgc *pgc;
+
+	dprintf(("%s: %p: %p", __func__, pgdb, qry));
+
+	pgc = get_ready_pgc(pgdb);
+	if (pgc) {
+		start_query(qry, pgc);
+		return;
+	}
+
+	dprintf(("%s: %p: no pgcs ready, queueing", __func__, pgdb));
+	queue_query_to_pgdb(qry, pgdb);
+}
+
+static void pgdb_sync_query(struct multi_pgsql_db *pgdb,
+			    struct multi_pgsql_query *qry)
+{
+	struct ioloop *old_ioloop, *sync_ioloop;
+
+	dprintf(("%s: %p: %p", __func__, pgdb, qry));
+
+	old_ioloop = current_ioloop;
+	sync_ioloop = io_loop_create();
+	switch_pgdb_pgcs(pgdb);
+
+	set_query_sync_ioloop(qry, sync_ioloop);
+	pgdb_async_query(pgdb, qry);
+
+	io_loop_run(sync_ioloop);
+
+	io_loop_set_current(old_ioloop);
+	switch_pgdb_pgcs(pgdb);
+	io_loop_set_current(sync_ioloop);
+	io_loop_destroy(&sync_ioloop);
+}
+
+/*** non-transaction SQL driver methods */
+extern struct sql_db driver_multi_pgsql_db;
+
+static struct sql_db *multi_pgsql_init_v(char const *connect_string)
+{
+	struct multi_pgsql_db *pgdb;
+	pool_t pool;
+
+	dprintf(("%s: %s", __func__, connect_string));
+
+	pool = pool_alloconly_create("multi_pgsql_pool", MULTI_PGSQL_POOL);
+
+	pgdb = p_new(pool, struct multi_pgsql_db, 1);
+	pgdb->api = driver_multi_pgsql_db;
+	pgdb->pool = pool;
+
+	pgdb->query_q.link_to = &pgdb->query_q.first;
+	pgdb->pgc_q.link_to = &pgdb->pgc_q.first;
+
+	init_pgcs(pgdb, connect_string);
+	return (void *)pgdb;
+}
+
+static void multi_pgsql_deinit_v(struct sql_db *db)
+{
+	struct multi_pgsql_error_result deinit;
+	struct multi_pgsql_db *pgdb;
+	struct multi_pgsql_pgc *pgc;
+	struct multi_pgsql_query *qry, *next_qry;
+	pool_t pool;
+	unsigned n;
+
+	dprintf(("%s: %p", __func__, db));
+
+	array_free(&db->module_contexts);
+
+	pgdb = (void *)db;
+	init_error_result(&deinit, DEINIT_ERROR);
+
+	n = pgdb->n_pgcs;
+	do {
+		pgc = pgdb->pgcs + --n;
+
+		qry = pgc->qry;
+		if (qry) abort_query(qry, &deinit.api);
+
+		reset_pgc(pgc);
+	} while (n);
+
+	qry = pgdb->query_q.first;
+	if (qry) {
+		timeout_remove(&pgdb->query_q.timeout);
+
+		do {
+			next_qry = qry->p;
+			abort_query(qry, &deinit.api);
+		} while ((qry = next_qry));
+	}
+
+	pool = pgdb->pool;
+	pool_unref(&pool);
+}
+
+static int multi_pgsql_connect(struct sql_db *db)
+{
+	dprintf(("%s: %p", __func__, db));
+	return 1;
+}
+
+static enum sql_db_flags
+multi_pgsql_get_flags(struct sql_db *db)
+{
+	dprintf(("%s: %p", __func__, db));
+	return 0;
+}
+
+static const char *
+multi_pgsql_escape_string(struct sql_db *db, const char *in)
+{
+	struct multi_pgsql_db *pgdb;
+	char *out;
+	size_t len;
+
+	pgdb = (void *)db;
+	len = strlen(in);
+	out = t_buffer_get(len * 2 + 1);
+	len = PQescapeStringConn(pgdb->pgcs->pgc, out, in, len,
+				 NULL);
+	t_buffer_alloc(len + 1);
+
+	dprintf(("%s: %p: '%s' -> '%s'",
+		 __func__, pgdb, in, out));
+	return out;
+}
+
+static void multi_pgsql_query(struct sql_db *db, const char *query,
+			      sql_query_callback_t *cb, void *ctx)
+{
+	struct multi_pgsql_query *qry;
+
+	dprintf(("%s: %p: %s", __func__, db, query));
+
+	qry = create_user_query(query, cb, ctx);
+	pgdb_async_query((void *)db, qry);
+}
+
+static void multi_pgsql_exec(struct sql_db *db, const char *query)
+{
+	i_info("%s: %p: %s", __func__, db, query);
+	i_info("%s: not implemented", __func__);
+}
+
+static struct sql_result *
+multi_pgsql_query_s(struct sql_db *db, const char *query)
+{
+	struct multi_pgsql_db *pgdb;
+	struct multi_pgsql_sync_query sync_qry;
+
+	pgdb = (void *)db;
+	dprintf(("%s: %p: %s", __func__, pgdb, query));
+
+	init_sync_query(&sync_qry, query);
+	pgdb_sync_query(pgdb, (void *)&sync_qry);
+	return result_from_sync_query(&sync_qry);
+}
+
+/*** transaction support code */
+static void add_subquery_to(struct multi_pgsql_transaction_context *x_ctx,
+			    char const *query, unsigned *affected_rows)
+{
+	struct multi_pgsql_transaction_subquery *sub;
+	pool_t pool;
+
+	pool = x_ctx->pool;
+
+	sub = p_new(pool, struct multi_pgsql_transaction_subquery, 1);
+	sub->query = p_strdup(pool, query);
+	sub->affected_rows = affected_rows;
+
+	*x_ctx->link_to = sub;
+	x_ctx->link_to = &sub->p;
+
+	dprintf(("%s: %p: '%s'", __func__, x_ctx, query));
+}
+
+static struct sql_transaction_context *
+multi_pgsql_transaction_begin(struct sql_db *db)
+{
+	struct multi_pgsql_transaction_context *x_ctx;
+	pool_t pool;
+
+	pool = pool_alloconly_create("multi_pgsql_xact_pool",
+				     MULTI_PGSQL_XACT_POOL);
+	x_ctx = p_new(pool, struct multi_pgsql_transaction_context, 1);
+	x_ctx->super.db = db;
+	x_ctx->link_to = &x_ctx->first;
+	x_ctx->pool = pool;
+
+	add_subquery_to(x_ctx, "begin", NULL);
+
+	dprintf(("%s: db %p: %p", __func__, db, x_ctx));
+	return (void *)x_ctx;
+}
+
+static void
+multi_pgsql_transaction_commit(struct sql_transaction_context *ctx,
+			       sql_commit_callback_t *cb, void *cb_ctx)
+{
+	struct multi_pgsql_transaction_context *x_ctx;
+	struct multi_pgsql_async_transaction_query *qry;
+
+	x_ctx = (void *)ctx;
+	dprintf(("%s: %p", __func__, x_ctx));
+
+	add_subquery_to(x_ctx, "commit", NULL);
+
+	qry = p_new(x_ctx->pool, struct multi_pgsql_async_transaction_query, 1);
+	init_async_transaction_query(qry, x_ctx, cb, cb_ctx);
+	pgdb_async_query((void *)ctx->db, (void *)qry);
+}
+
+static int
+multi_pgsql_transaction_commit_s(struct sql_transaction_context *ctx,
+				 const char **error_r)
+{
+	struct multi_pgsql_transaction_context *x_ctx;
+	struct multi_pgsql_sync_transaction_query sync_qry;
+	char *errmsg;
+
+	x_ctx = (void *)ctx;
+	dprintf(("%s: %p", __func__, x_ctx));
+
+	add_subquery_to(x_ctx, "commit", NULL);
+
+	init_sync_transaction_query(&sync_qry, x_ctx);
+	pgdb_sync_query((void *)x_ctx->super.db, &sync_qry.super.super);
+
+	errmsg = (char *)sync_qry.errmsg;
+	if (errmsg) {
+		*error_r = t_strdup(errmsg);
+		i_free(errmsg);
+
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+multi_pgsql_transaction_rollback(struct sql_transaction_context *ctx)
+{
+	struct multi_pgsql_transaction_context *x_ctx;
+
+	x_ctx = (void *)ctx;
+	dprintf(("%s: %p", __func__, x_ctx));
+
+	pool_unref(&x_ctx->pool);
+}
+
+static void
+multi_pgsql_transaction_update(struct sql_transaction_context *ctx, const char *query,
+			       unsigned int *affected_rows)
+{
+	struct multi_pgsql_transaction_context *x_ctx;
+
+	x_ctx = (void *)ctx;
+	add_subquery_to(x_ctx, query, affected_rows);
+}
+
+/*** db driver vtable */
+struct sql_db driver_multi_pgsql_db = {
+	"multi-pgsql",
+
+	MEMBER(v) {
+		multi_pgsql_init_v,
+		multi_pgsql_deinit_v,
+		multi_pgsql_get_flags,
+		multi_pgsql_connect,
+		multi_pgsql_escape_string,
+		multi_pgsql_exec,
+		multi_pgsql_query,
+		multi_pgsql_query_s,
+
+		multi_pgsql_transaction_begin,
+		multi_pgsql_transaction_commit,
+		multi_pgsql_transaction_commit_s,
+		multi_pgsql_transaction_rollback,
+		multi_pgsql_transaction_update
+	}
+};
+#endif
Timo Sirainen
2010-Apr-14 19:20 UTC
On Wed, 2010-04-14 at 20:56 +0200, Rainer Weikusat wrote:
> Multiple connections can be specified in an ordinary connect-string by
> separating the necessary parameters with a ;;;-sequence. For the
> server I was writing about above, this looks like:
>
> connect = host=1.1.1.1 dbname=mailgate user=mailgate_user password=secret sslmode=require ;;; \
> host=2.2.2.2 dbname=mailgate user=mailgate_user password=secret sslmode=require

I'll look at this patch more closely later, but is it really necessary
to support completely different connect strings for different servers?
MySQL code also already supports round-robin connections and it works
simply by giving multiple host= parameters. I'd guess in most
installations that would be enough..

At some point it probably would be better to abstract out the round
robin code so all SQL drivers could use the same common code for it.
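The single-string alternative Timo describes (several host= parameters, used in round-robin order) can be sketched as follows. This is only an illustration of the idea and not Dovecot's MySQL driver code: `struct rr_hosts`, `rr_parse`, `rr_next` and the size limits are hypothetical, and the parser assumes unquoted, blank-separated key=value pairs (libpq connect strings may also contain quoted values, which this ignores).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_HOSTS	8	/* hypothetical limits */
#define MAX_HOST_LEN	64

/* Hypothetical round-robin state over the host= values of one string. */
struct rr_hosts {
	char hosts[MAX_HOSTS][MAX_HOST_LEN];
	size_t count;
	size_t next;
};

/* Collect every host=VALUE token; assumes unquoted, blank-separated pairs. */
static void rr_parse(struct rr_hosts *rr, const char *cs)
{
	size_t tok_len;

	rr->count = 0;
	rr->next = 0;

	while (*cs != '\0') {
		while (*cs == ' ' || *cs == '\t')
			++cs;
		tok_len = strcspn(cs, " \t");
		if (tok_len > 5 && strncmp(cs, "host=", 5) == 0 &&
		    rr->count < MAX_HOSTS && tok_len - 5 < MAX_HOST_LEN) {
			memcpy(rr->hosts[rr->count], cs + 5, tok_len - 5);
			rr->hosts[rr->count][tok_len - 5] = '\0';
			rr->count++;
		}
		cs += tok_len;
	}
}

/* Return the next host in round-robin order, or NULL if there is none. */
static const char *rr_next(struct rr_hosts *rr)
{
	const char *host;

	if (rr->count == 0)
		return NULL;

	assert(rr->next < rr->count);
	host = rr->hosts[rr->next];
	rr->next = (rr->next + 1) % rr->count;
	return host;
}
```

The trade-off with this scheme is that every server must share the remaining parameters (dbname, user, password, sslmode); only the host varies.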
Rainer Weikusat
2010-Apr-14 19:48 UTC
Timo Sirainen <tss at iki.fi> writes:
> On Wed, 2010-04-14 at 20:56 +0200, Rainer Weikusat wrote:
>> Multiple connections can be specified in an ordinary connect-string by
>> separating the necessary parameters with a ;;;-sequence. For the
>> server I was writing about above, this looks like:
>>
>> connect = host=1.1.1.1 dbname=mailgate user=mailgate_user password=secret sslmode=require ;;; \
>> host=2.2.2.2 dbname=mailgate user=mailgate_user password=secret sslmode=require
>
> I'll look at this patch more closely later, but is it really necessary
> to support completely different connect strings for different
> servers?

Given that the connect string is essentially an opaque value which is
passed uninterpreted to the DBMS interface library, I'd answer this
question with yes.

[...]

> At some point it probably would be better to abstract out the round
> robin code so all SQL drivers could use the same common code for it.

My first hope was that just creating a virtual driver which forwards
queries to n 'real drivers' would be sufficient. Ideally, there would
be an abstract query queueing and scheduling layer which passes
queries to 'database connection objects' of some kind, possibly even
referring to different kinds of databases. But this would require a
wholesale replacement of all existing drivers and I am already happy
that I was allowed to work a whole ten (nine, actually) days on this ...
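The queueing and scheduling idea above is, in reduced form, what the patch builds from its pgc_q (idle connections) and query_q (waiting queries) lists. The following simplified model is hypothetical: `struct conn`, `struct query`, `struct sched` and the `sched_*` functions are invented for illustration. It uses the same pointer-to-last-link (`link_to`) FIFO idiom as the patch and only records which query was dispatched to which connection instead of talking to a database.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the patch's pgc and query objects. */
struct conn {
	struct conn *next;
	int id;
	int last_query;		/* id of the last dispatched query, -1 if none */
};

struct query {
	struct query *next;
	int id;
};

/* Two FIFOs, each with a pointer to its last link (like pgc_q/query_q). */
struct sched {
	struct conn *conn_first, **conn_link_to;
	struct query *query_first, **query_link_to;
};

static void sched_init(struct sched *s)
{
	s->conn_first = NULL;
	s->conn_link_to = &s->conn_first;
	s->query_first = NULL;
	s->query_link_to = &s->query_first;
}

static void dispatch(struct query *q, struct conn *c)
{
	assert(q != NULL && c != NULL);
	c->last_query = q->id;	/* a real driver would send the query here */
}

/* A query arrives: use an idle connection if any, else queue the query. */
static void sched_submit(struct sched *s, struct query *q)
{
	struct conn *c = s->conn_first;

	if (c != NULL) {
		s->conn_first = c->next;
		if (s->conn_first == NULL)
			s->conn_link_to = &s->conn_first;
		c->next = NULL;
		dispatch(q, c);
		return;
	}

	q->next = NULL;
	*s->query_link_to = q;
	s->query_link_to = &q->next;
}

/* A connection becomes idle: serve the oldest waiting query, else park it. */
static void sched_conn_ready(struct sched *s, struct conn *c)
{
	struct query *q = s->query_first;

	if (q != NULL) {
		s->query_first = q->next;
		if (s->query_first == NULL)
			s->query_link_to = &s->query_first;
		q->next = NULL;
		dispatch(q, c);
		return;
	}

	c->next = NULL;
	*s->conn_link_to = c;
	s->conn_link_to = &c->next;
}
```

Because a query is only queued when no connection is idle, and a connection is only parked when no query is waiting, at most one of the two FIFOs is non-empty at any time in this model. The tail pointer makes both enqueue and dequeue O(1) without a special case for the empty list.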
Marshal Newrock
2010-Apr-18 15:57 UTC
On Wed, 14 Apr 2010 20:56:13 +0200
Rainer Weikusat <rweikusat at mssgmbh.com> wrote:

> One of the things my employer uses dovecot for is as mail download
> server for an 'e-mail purification service' (AV/ anti-spam) for
> smartphones. The service itself presently runs on a rented server
> somewhere in the UK and the corresponding 'web service' front-end and
> user account/ mail account database resides on a server in
> Germany. The UK dovecot server uses the PostgreSQL server on the
> German machine for user authentication. The latter is reachable using
> two entirely different 'internet paths' and there was an outage of
> several hours on one of them a couple of weeks ago. This prompted
> some frantic network reconfiguration efforts in order to get the
> abovementioned service going again and resulted in the conviction
> that - ideally - the dovecot server should be capable of using
> connections to multiple PostgreSQL servers (or a single server
> reachable via several IPs) simultaneoulsy, distributing requests
> among them, and should be capable of detecting a possible problem on
> one of the db server connections and use the still functioning ones
> to continue operations.

Have you looked at anything like pgpool? I looked at this a while
back, and from what I could see, it sits between the client app and
PostgreSQL, and works fairly invisibly. It does appear to have a
failover mode which will do what you want. I haven't yet implemented
pgpool anywhere, so I can't give details or opinions.

--
Marshal Newrock
Zordio, LLC - http://www.zordio.com
Rainer Weikusat
2010-Apr-18 18:40 UTC
Marshal Newrock <marshal at zordio.com> writes:
> On Wed, 14 Apr 2010 20:56:13 +0200
> Rainer Weikusat <rweikusat at mssgmbh.com> wrote:
>> One of the things my employer uses dovecot for is as mail download
>> server for an 'e-mail purification service' (AV/ anti-spam) for
>> smartphones.

[...]

> Have you looked at anything like pgpool?

Have you read enough of the message you were replying to to actually
find its content?