Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start one background worker per database #545

Open
wants to merge 1 commit into
base: yl/md-bgw-per-db
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ extern bool duckdb_allow_unsigned_extensions;
extern bool duckdb_autoinstall_known_extensions;
extern bool duckdb_autoload_known_extensions;
extern int duckdb_max_workers_per_postgres_scan;
extern char *duckdb_motherduck_postgres_database;
extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
extern char *duckdb_postgres_role;
Expand Down
1 change: 0 additions & 1 deletion include/pgduckdb/pgduckdb_metadata_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ Oid ExtensionOid();
Oid DuckdbTableAmOid();
bool IsMotherDuckEnabled();
bool IsMotherDuckEnabledAnywhere();
bool IsMotherDuckPostgresDatabase();
Oid MotherDuckPostgresUser();
Oid IsDuckdbTable(Form_pg_class relation);
Oid IsDuckdbTable(Relation relation);
Expand Down
4 changes: 0 additions & 4 deletions src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ bool duckdb_force_execution = false;
int duckdb_max_workers_per_postgres_scan = 2;
int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO;
char *duckdb_motherduck_token = strdup("");
char *duckdb_motherduck_postgres_database = strdup("postgres");
char *duckdb_motherduck_default_database = strdup("");
char *duckdb_postgres_role = strdup("");

Expand Down Expand Up @@ -175,9 +174,6 @@ DuckdbInitGUC(void) {
DefineCustomVariable("duckdb.motherduck_token", "The token to use for MotherDuck", &duckdb_motherduck_token,
PGC_POSTMASTER, GUC_SUPERUSER_ONLY);

DefineCustomVariable("duckdb.motherduck_postgres_database", "Which database to enable MotherDuck support in",
&duckdb_motherduck_postgres_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY);

DefineCustomVariable("duckdb.motherduck_default_database",
"Which database in MotherDuck to designate as default (in place of my_db)",
&duckdb_motherduck_default_database, PGC_POSTMASTER, GUC_SUPERUSER_ONLY);
Expand Down
20 changes: 12 additions & 8 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include "miscadmin.h"
#include "pgstat.h"
#include "executor/spi.h"
#include "commands/dbcommands.h"
#include "common/file_utils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
Expand Down Expand Up @@ -65,17 +66,20 @@ bool CanTakeLockForDatabase(Oid database_oid);
extern "C" {

PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
elog(LOG, "started pg_duckdb background worker");
if (!CanTakeLockForDatabase(0)) {
elog(LOG, "pg_duckdb background worker: could not take lock for database '%u'. Will exit.", 0);
pgduckdb_background_worker_main(Datum main_arg) {
Oid database_oid = DatumGetObjectId(main_arg);
if (!CanTakeLockForDatabase(database_oid)) {
elog(LOG, "pg_duckdb background worker: could not take lock for database '%u'. Will exit.", database_oid);
return;
}

elog(LOG, "started pg_duckdb background worker for database %u -- MyDB: %u", database_oid, MyDatabaseId);

// Set up a signal handler for SIGTERM
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();

BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0);
BackgroundWorkerInitializeConnectionByOid(database_oid, InvalidOid, 0);

pgduckdb::doing_motherduck_sync = true;
is_background_worker = true;
Expand Down Expand Up @@ -221,7 +225,7 @@ StartBackgroundWorkerIfNeeded(void) {
snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgduckdb_background_worker_main");
snprintf(worker.bgw_name, BGW_MAXLEN, PGDUCKDB_SYNC_WORKER_NAME);
worker.bgw_restart_time = 1;
worker.bgw_main_arg = (Datum)0;
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);

RegisterDynamicBackgroundWorker(&worker, NULL);
}
Expand Down Expand Up @@ -671,7 +675,8 @@ SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
}

/* The catalog version has changed, we need to sync the catalog */
elog(LOG, "Syncing MotherDuck catalog for database %s: %s", motherduck_db.c_str(), catalog_version.c_str());
elog(LOG, "Syncing MotherDuck catalog for database '%s' in '%s': %s", motherduck_db.c_str(),
get_database_name(MyDatabaseId), catalog_version.c_str());

/*
* Because of our SPI_commit_that_works_in_bgworker() workaround we need to
Expand Down Expand Up @@ -717,7 +722,6 @@ SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
}

std::string postgres_schema_name = PgSchemaName(motherduck_db, schema.name, is_default_db);

if (!CreateSchemaIfNotExists(postgres_schema_name.c_str(), is_default_db)) {
/* We failed to create the schema, so we skip the tables in it */
continue;
Expand Down
12 changes: 1 addition & 11 deletions src/pgduckdb_metadata_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ struct {
Oid extension_oid;
/* The OID of the duckdb Table Access Method */
Oid table_am_oid;
/* The OID of the duckdb.motherduck_postgres_database */
Oid motherduck_postgres_database_oid;
/* The OID of the duckdb.postgres_role */
Oid postgres_role_oid;
/*
Expand Down Expand Up @@ -180,8 +178,6 @@ IsExtensionRegistered() {

cache.table_am_oid = GetSysCacheOid1(AMNAME, Anum_pg_am_oid, CStringGetDatum("duckdb"));

cache.motherduck_postgres_database_oid = get_database_oid(duckdb_motherduck_postgres_database, false);

if (duckdb_postgres_role[0] != '\0') {
cache.postgres_role_oid =
GetSysCacheOid1(AUTHNAME, Anum_pg_authid_oid, CStringGetDatum(duckdb_postgres_role));
Expand Down Expand Up @@ -258,7 +254,7 @@ IsMotherDuckTable(Relation relation) {

bool
IsMotherDuckEnabled() {
return IsMotherDuckEnabledAnywhere() && IsMotherDuckPostgresDatabase();
return IsMotherDuckEnabledAnywhere();
}

bool
Expand All @@ -270,12 +266,6 @@ IsMotherDuckEnabledAnywhere() {
return false;
}

bool
IsMotherDuckPostgresDatabase() {
Assert(cache.valid);
return MyDatabaseId == cache.motherduck_postgres_database_oid;
}

Oid
MotherDuckPostgresUser() {
Assert(cache.valid);
Expand Down
2 changes: 0 additions & 2 deletions src/pgduckdb_ruleutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ pgduckdb_get_tabledef(Oid relation_oid) {
elog(ERROR, "Only TEMP tables are supported in DuckDB if MotherDuck support is not enabled");
} else if (relation->rd_rel->relpersistence != RELPERSISTENCE_PERMANENT) {
elog(ERROR, "Only TEMP and non-UNLOGGED tables are supported in DuckDB");
} else if (!pgduckdb::IsMotherDuckPostgresDatabase()) {
elog(ERROR, "MotherDuck tables must be created in the duckb.motherduck_postgres_database");
} else if (relation->rd_rel->relowner != pgduckdb::MotherDuckPostgresUser()) {
elog(ERROR, "MotherDuck tables must be created by the duckb.motherduck_postgres_user");
}
Expand Down