
Commit 64de542

support multiple threads for polldb
1 parent 7f8a181 commit 64de542

File tree

7 files changed: +27 -10 lines

pgsync/__init__.py

+1 -1

@@ -4,4 +4,4 @@
 
 __author__ = "Tolu Aina"
 __email__ = "[email protected]"
-__version__ = "2.2.0"
+__version__ = "2.2.1"

pgsync/elastichelper.py

+5

@@ -150,6 +150,10 @@ def _bulk(
         )
         raise_on_error: bool = raise_on_error or ELASTICSEARCH_RAISE_ON_ERROR
 
+        # when using multiple threads for poll_db we need to account for other
+        # threads performing deletions
+        ignore_status: Tuple[int] = (400, 404)
+
         if ELASTICSEARCH_STREAMING_BULK:
             for _ in helpers.streaming_bulk(
                 self.__es,
@@ -178,6 +182,7 @@ def _bulk(
                 refresh=refresh,
                 raise_on_exception=raise_on_exception,
                 raise_on_error=raise_on_error,
+                ignore_status=ignore_status,
             ):
                 self.doc_count += 1

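Why 400/404 are ignored here: with more than one poll_db producer thread, a delete for a given document may already have been applied by another thread by the time this batch is flushed, so Elasticsearch answers 404 (and a conflicting request can answer 400). Passing those statuses to the bulk helper keeps them from being reported as errors. A minimal sketch of the same idea, assuming a recent elasticsearch-py (roughly 7.13+, where the bulk helpers accept ignore_status); the client URL and index name are placeholders, not pgsync's:

# Sketch only, not pgsync's _bulk: tolerate 400/404 on bulk deletes that may
# race with deletions performed by other threads.
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")  # placeholder cluster address

actions = (
    {"_op_type": "delete", "_index": "my-index", "_id": doc_id}
    for doc_id in ("1", "2", "3")
)

for ok, item in helpers.streaming_bulk(
    es,
    actions,
    raise_on_error=False,      # report failures in `item` instead of raising
    ignore_status=(400, 404),  # already-deleted docs are not treated as errors
):
    print(ok, item)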
pgsync/redisqueue.py

+4 -4

@@ -28,7 +28,6 @@ def __init__(self, name: str, namespace: str = "queue", **kwargs):
         except ConnectionError as e:
             logger.exception(f"Redis server is not running: {e}")
             raise
-        self._pipeline = self.__db.pipeline()
 
     @property
     def qsize(self) -> int:
@@ -38,9 +37,10 @@ def qsize(self) -> int:
     def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]:
         """Remove and return multiple items from the queue."""
         chunk_size: int = chunk_size or REDIS_READ_CHUNK_SIZE
-        self._pipeline.lrange(self.key, 0, chunk_size - 1)
-        self._pipeline.ltrim(self.key, chunk_size, -1)
-        items: List[List[bytes], bool] = self._pipeline.execute()
+        pipeline = self.__db.pipeline()
+        pipeline.lrange(self.key, 0, chunk_size - 1)
+        pipeline.ltrim(self.key, chunk_size, -1)
+        items: List[List[bytes], bool] = pipeline.execute()
         logger.debug(f"bulk_pop nsize: {len(items[0])}")
         return list(map(lambda value: json.loads(value), items[0]))

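The pipeline is now created per call instead of being stored on the instance in __init__: a redis-py pipeline buffers commands client-side and is not safe to share, so a single pipeline used by several poll_db threads could interleave their LRANGE/LTRIM pairs. A self-contained sketch of the same pop pattern, assuming redis-py and a local Redis; the key name and chunk size are illustrative:

# Sketch of the bulk-pop pattern: read up to chunk_size items and trim them off
# in one round trip, using a pipeline created inside the call.
import json
import redis

db = redis.Redis()  # assumes a local Redis instance

def bulk_pop(key: str = "queue:default", chunk_size: int = 1000) -> list:
    pipeline = db.pipeline()                 # transactional (MULTI/EXEC) by default
    pipeline.lrange(key, 0, chunk_size - 1)  # read the head of the list
    pipeline.ltrim(key, chunk_size, -1)      # keep only the remainder
    items, _ = pipeline.execute()            # [lrange result, ltrim result]
    return [json.loads(value) for value in items]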
pgsync/settings.py

+2

@@ -29,6 +29,8 @@
 # page block size
 BLOCK_SIZE = env.int("BLOCK_SIZE", default=2048 * 10)
 QUERY_LITERAL_BINDS = env.bool("QUERY_LITERAL_BINDS", default=None)
+# number of threads to spawn for poll db
+NTHREADS_POLLDB = env.int("NTHREADS_POLLDB", default=1)
 
 # Elasticsearch:
 ELASTICSEARCH_SCHEME = env.str("ELASTICSEARCH_SCHEME", default="http")

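NTHREADS_POLLDB follows the same environs pattern as the surrounding settings, so the thread count can be driven from the environment. Illustrative resolution of the value (the setting name matches the diff; everything else here is an example):

# Example only: how an env-driven integer setting like NTHREADS_POLLDB resolves.
import os
from environs import Env

os.environ["NTHREADS_POLLDB"] = "4"  # e.g. four poll_db producer threads

env = Env()
NTHREADS_POLLDB = env.int("NTHREADS_POLLDB", default=1)
print(NTHREADS_POLLDB)  # 4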
pgsync/sync.py

+14 -3

@@ -44,6 +44,7 @@
 from .settings import (
     CHECKPOINT_PATH,
     LOG_INTERVAL,
+    NTHREADS_POLLDB,
     POLL_TIMEOUT,
     REDIS_POLL_INTERVAL,
     REDIS_WRITE_CHUNK_SIZE,
@@ -1107,7 +1108,7 @@ def status(self):
             sys.stdout.flush()
             time.sleep(LOG_INTERVAL)
 
-    def receive(self) -> None:
+    def receive(self, nthreads_polldb=None) -> None:
         """
         Receive events from db.
 
@@ -1119,7 +1120,9 @@ def receive(self) -> None:
         """
         # start a background worker producer thread to poll the db and populate
        # the Redis cache
-        self.poll_db()
+        nthreads_polldb = nthreads_polldb or NTHREADS_POLLDB
+        for _ in range(nthreads_polldb):
+            self.poll_db()
 
         # sync up to current transaction_id
         self.pull()
@@ -1187,6 +1190,13 @@ def receive(self) -> None:
     default=False,
     help="Analyse database",
 )
+@click.option(
+    "--nthreads_polldb",
+    "-n",
+    help="Number of threads to spawn for poll db",
+    type=int,
+    default=NTHREADS_POLLDB,
+)
 def main(
     config,
     daemon,
@@ -1199,6 +1209,7 @@ def main(
     verbose,
     version,
     analyze,
+    nthreads_polldb,
 ):
     """Main application syncer."""
     if version:
@@ -1237,7 +1248,7 @@ def main(
         sync: Sync = Sync(document, verbose=verbose, **kwargs)
         sync.pull()
         if daemon:
-            sync.receive()
+            sync.receive(nthreads_polldb)
 
 
 if __name__ == "__main__":

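Taken together, receive() now fans out into NTHREADS_POLLDB producer threads, each polling the database and feeding events toward the Redis cache, and the count can be set with the new -n/--nthreads_polldb option or the NTHREADS_POLLDB environment variable. A generic sketch of that fan-out shape (not pgsync's Sync class; poll_source and the in-process queue are stand-ins for poll_db and Redis):

# Generic producer fan-out sketch: N daemon threads feeding one shared queue.
import queue
import threading
import time

events: "queue.Queue[dict]" = queue.Queue()

def poll_source(worker_id: int) -> None:
    """Stand-in for poll_db: poll a source and enqueue whatever it finds."""
    while True:
        events.put({"worker": worker_id, "ts": time.time()})
        time.sleep(1)

def receive(nthreads: int = 1) -> None:
    for i in range(nthreads):
        threading.Thread(target=poll_source, args=(i,), daemon=True).start()

receive(nthreads=4)
print(events.get())  # consumers drain the queue regardless of which thread produced it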
setup.cfg

+1 -1

@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 2.1.11
+current_version = 2.2.0
 commit = True
 tag = True

tests/test_redisqueue.py

-1

@@ -23,7 +23,6 @@ def test_redis_conn(self, mock_logger, mocker):
         mock_get_redis_url.assert_called_once()
         mock_ping.assert_called_once()
         mock_logger.exception.assert_not_called()
-        assert queue._pipeline is not None
 
     @patch("pgsync.redisqueue.logger")
     def test_redis_conn_fail(self, mock_logger, mocker):
