Skip to content

Commit

Permalink
pg2arrow: add snapshot export/import for parallel consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
kaigai committed Mar 31, 2024
1 parent b3b5763 commit 6d0033a
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions arrow-tools/pgsql_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ sqldb_begin_query(void *sqldb_state,
ArrowFileInfo *af_info,
SQLdictionary *dictionary_list)
{
static char *snapshot_identifier = NULL;
PGSTATE *pgstate = sqldb_state;
PGconn *conn = pgstate->conn;
PGresult *res;
Expand All @@ -1017,6 +1018,38 @@ sqldb_begin_query(void *sqldb_state,
Elog("unable to begin transaction: %s", PQresultErrorMessage(res));
PQclear(res);

res = PQexec(conn, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
Elog("unable to switch transaction isolation level: %s",
PQresultErrorMessage(res));
PQclear(res);

/* export snaphot / import snapshot */
if (!snapshot_identifier)
{
res = PQexec(conn, "SELECT pg_catalog.pg_export_snapshot()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
Elog("unable to export the current transaction snapshot: %s",
PQresultErrorMessage(res));
if (PQntuples(res) != 1 || PQnfields(res) != 1)
Elog("unexpected result for pg_export_snapshot()");
snapshot_identifier = pstrdup(PQgetvalue(res, 0, 0));
printf("snapshot_identifier = [%s]\n", snapshot_identifier);
PQclear(res);
}
else
{
char query[200];

snprintf(query, sizeof(query),
"SET TRANSACTION SNAPSHOT '%s'",
snapshot_identifier);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
Elog("unable to import transaction shapshot: %s",
PQresultErrorMessage(res));
}

/* declare cursor */
query = palloc(strlen(sqldb_command) + 1024);
sprintf(query, "DECLARE " CURSOR_NAME " BINARY CURSOR FOR %s",
Expand Down

0 comments on commit 6d0033a

Please sign in to comment.