Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 93 additions & 6 deletions pgxn/neon/neon_ddl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ typedef struct DdlHashTable
size_t subtrans_level;
HTAB *db_table;
HTAB *role_table;
int other_ddl_count;
} DdlHashTable;

static DdlHashTable RootTable;
Expand Down Expand Up @@ -207,6 +208,25 @@ ConstructDeltaMessage()
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}

if (RootTable.other_ddl_count > 0)
{
JsonbValue other_key, other_value;
char count_str[32];

snprintf(count_str, sizeof(count_str), "%d", RootTable.other_ddl_count);

other_key.type = jbvString;
other_key.val.string.val = "other";
other_key.val.string.len = strlen("other");

other_value.type = jbvString;
other_value.val.string.val = count_str;
other_value.val.string.len = strlen(count_str);

pushJsonbValue(&state, WJB_KEY, &other_key);
pushJsonbValue(&state, WJB_VALUE, &other_value);
}
{
JsonbValue *result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
Jsonb *jsonb = JsonbValueToJsonb(result);
Expand Down Expand Up @@ -235,7 +255,7 @@ ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
if (str->size + nmemb + 1 >= ERROR_SIZE)
to_write = ERROR_SIZE - str->size - 1;

/* Ignore everyrthing past the first ERROR_SIZE bytes */
/* Ignore everything past the first ERROR_SIZE bytes */
if (to_write == 0)
return nmemb;
memcpy(str->str + str->size, ptr, to_write);
Expand All @@ -249,11 +269,12 @@ SendDeltasToControlPlane()
{
static CURL *handle = NULL;

if (!RootTable.db_table && !RootTable.role_table)
if (!RootTable.db_table && !RootTable.role_table && RootTable.other_ddl_count == 0)
return;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
if (!RegressTestMode)
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
if (!ForwardDDL)
Expand Down Expand Up @@ -346,6 +367,7 @@ InitCurrentDdlTableIfNeeded()
new_table->subtrans_level = SubtransLevel;
new_table->role_table = NULL;
new_table->db_table = NULL;
new_table->other_ddl_count = 0;
CurrentDdlTable = new_table;
}
}
Expand Down Expand Up @@ -493,6 +515,8 @@ MergeTable()
}
hash_destroy(old_table->role_table);
}

CurrentDdlTable->other_ddl_count += old_table->other_ddl_count;
}

static void
Expand Down Expand Up @@ -538,6 +562,7 @@ NeonXactCallback(XactEvent event, void *arg)
}
RootTable.role_table = NULL;
RootTable.db_table = NULL;
RootTable.other_ddl_count = 0;
Assert(CurrentDdlTable == &RootTable);
}

Expand All @@ -549,6 +574,13 @@ IsPrivilegedRole(const char *role_name)
return strcmp(role_name, privileged_role_name) == 0;
}

static void
HandleOtherDDLCommand()
{
InitCurrentDdlTableIfNeeded();
CurrentDdlTable->other_ddl_count++;
}

static void
HandleCreateDb(CreatedbStmt *stmt)
{
Expand Down Expand Up @@ -598,8 +630,11 @@ HandleAlterOwner(AlterOwnerStmt *stmt)
DbEntry *entry;
const char *new_owner;

if (stmt->objectType != OBJECT_DATABASE)
return;
if (stmt->objectType != OBJECT_DATABASE){
HandleOtherDDLCommand();
return ;
}

InitDbTableIfNeeded();

name = strVal(stmt->object);
Expand Down Expand Up @@ -826,6 +861,10 @@ HandleRename(RenameStmt *stmt)
return HandleDbRename(stmt);
else if (stmt->renameType == OBJECT_ROLE)
return HandleRoleRename(stmt);
else {
HandleOtherDDLCommand();
return;
}
}


Expand Down Expand Up @@ -1271,6 +1310,7 @@ NeonProcessUtility(
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
bool isCompleteQuery = (context != PROCESS_UTILITY_SUBCOMMAND);

/*
* The process utility hook for CREATE EVENT TRIGGER is its own
Expand Down Expand Up @@ -1323,7 +1363,54 @@ NeonProcessUtility(
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CREATE TABLESPACE is not supported on Neon")));
}
break;
break;

// all the other commands we are interested in for the purposes of
// tracking schema changes and forwarding to data-api
// Schema & Database Structure
case T_CreateSchemaStmt:
case T_CreateStmt: // CREATE TABLE
case T_CreateForeignTableStmt:
case T_AlterTableStmt:
case T_AlterDomainStmt:
case T_CreateTableAsStmt: // CREATE TABLE AS

// Views & Materialized Views
case T_ViewStmt: // CREATE VIEW

// Functions & Procedures
case T_CreateFunctionStmt:
case T_AlterFunctionStmt:

// Types & Domains
case T_CompositeTypeStmt: // CREATE TYPE (composite)
case T_CreateEnumStmt: // CREATE TYPE (enum)
case T_CreateRangeStmt: // CREATE TYPE (range)
case T_AlterEnumStmt:
case T_AlterTypeStmt:
case T_CreateDomainStmt:

// Policies
case T_CreatePolicyStmt:
case T_AlterPolicyStmt:

// Generic Operations (object type dependent)
case T_DropStmt: // DROP (tables, views, functions, etc.)
// case T_RenameStmt: // ALTER ... RENAME TO
// case T_AlterOwnerStmt: // ALTER ... OWNER TO
case T_AlterObjectDependsStmt: // ALTER ... DEPENDS ON
case T_AlterObjectSchemaStmt: // ALTER ... SET SCHEMA
case T_CommentStmt: // COMMENT ON
case T_SecLabelStmt: // SECURITY LABEL

// Grants (object type dependent)
case T_GrantStmt: // GRANT/REVOKE (if object supports event triggers)
case T_GrantRoleStmt: // GRANT/REVOKE role membership
if (isCompleteQuery)
{
HandleOtherDDLCommand();
}
break;
default:
break;
}
Expand Down
Loading