Skip to content

Commit

Permalink
exclude non required schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Aug 13, 2024
1 parent d8855a5 commit d577c5b
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pgsync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

__author__ = "Tolu Aina"
__email__ = "[email protected]"
__version__ = "3.2.0"
__version__ = "3.2.1"
6 changes: 6 additions & 0 deletions pgsync/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class Tree(threading.local):
def __post_init__(self):
self.tables: t.Set[str] = set()
self.__nodes: t.Dict[Node] = {}
self.__schemas: t.Set[str] = set()
self.root: t.Optional[Node] = None
self.build(self.nodes)

Expand Down Expand Up @@ -334,6 +335,7 @@ def build(self, nodes: dict) -> Node:
node.add_child(self.build(child))

self.__nodes[key] = node
self.__schemas.add(schema)
return node

def get_node(self, table: str, schema: str) -> Node:
Expand All @@ -352,3 +354,7 @@ def get_node(self, table: str, schema: str) -> Node:
else:
raise RuntimeError(f"Node for {schema}.{table} not found")
return self.__nodes[key]

@property
def schemas(self) -> t.Set[str]:
return self.__schemas
22 changes: 19 additions & 3 deletions pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ def logical_slot_changes(
f"Error parsing row: {e}\nRow data: {row.data}"
)
raise

# filter out unknown schemas
if payload.schema not in self.tree.schemas:
continue

payloads.append(payload)

j: int = i + 1
Expand Down Expand Up @@ -811,7 +816,10 @@ def _payloads(self, payloads: t.List[Payload]) -> None:
# e.g a through table which we need to react to.
# in this case, we find the parent of the through
# table and force a re-sync.
if payload.table not in self.tree.tables:
if (
payload.table not in self.tree.tables
or payload.schema not in self.tree.schemas
):
return

node: Node = self.tree.get_node(payload.table, payload.schema)
Expand Down Expand Up @@ -1112,7 +1120,11 @@ def poll_db(self) -> None:
notification: t.AnyStr = conn.notifies.pop(0)
if notification.channel == self.database:
payload = json.loads(notification.payload)
if payload["indices"] and self.index in payload["indices"]:
if (
payload["indices"]
and self.index in payload["indices"]
and payload["schema"] in self.tree.schemas
):
payloads.append(payload)
logger.debug(f"poll_db: {payload}")
self.count["db"] += 1
Expand All @@ -1134,7 +1146,11 @@ def async_poll_db(self) -> None:
notification: t.AnyStr = self.conn.notifies.pop(0)
if notification.channel == self.database:
payload = json.loads(notification.payload)
if payload["indices"] and self.index in payload["indices"]:
if (
payload["indices"]
and self.index in payload["indices"]
and payload["schema"] in self.tree.schemas
):
self.redis.push([payload])
logger.debug(f"async_poll: {payload}")
self.count["db"] += 1
Expand Down
6 changes: 3 additions & 3 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
async-timeout==4.0.3
# via redis
boto3==1.34.158
boto3==1.34.159
# via -r requirements/base.in
botocore==1.34.158
botocore==1.34.159
# via
# boto3
# s3transfer
Expand All @@ -23,7 +23,7 @@ click==8.1.7
# via -r requirements/base.in
elastic-transport==8.15.0
# via elasticsearch
elasticsearch==8.14.0
elasticsearch==8.15.0
# via
# -r requirements/base.in
# elasticsearch-dsl
Expand Down
8 changes: 4 additions & 4 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ async-timeout==4.0.3
# via redis
black==24.8.0
# via -r requirements/dev.in
boto3==1.34.158
boto3==1.34.159
# via -r requirements/base.in
botocore==1.34.158
botocore==1.34.159
# via
# boto3
# s3transfer
Expand All @@ -35,7 +35,7 @@ distlib==0.3.8
# via virtualenv
elastic-transport==8.15.0
# via elasticsearch
elasticsearch==8.14.0
elasticsearch==8.15.0
# via
# -r requirements/base.in
# elasticsearch-dsl
Expand All @@ -47,7 +47,7 @@ events==0.5
# via opensearch-py
exceptiongroup==1.2.2
# via pytest
faker==26.3.0
faker==27.0.0
# via -r requirements/dev.in
filelock==3.15.4
# via virtualenv
Expand Down

0 comments on commit d577c5b

Please sign in to comment.