Skip to content

Commit 002c998

Browse files
committed
change REDIS_CHUNK_SIZE to REDIS_READ_CHUNK_SIZE
1 parent a32f117 commit 002c998

File tree

5 files changed

+27
-23
lines changed

5 files changed

+27
-23
lines changed

.env.sample

+3-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@
7272
# REDIS_DB=0
7373
# REDIS_AUTH=PLEASE_REPLACE_ME
7474
# number of items to read from Redis at a time
75-
# REDIS_CHUNK_SIZE=1000
75+
# REDIS_READ_CHUNK_SIZE=1000
76+
# number of items to write from Redis at a time
77+
# REDIS_WRITE_CHUNK_SIZE=1000
7678
# redis socket connection timeout
7779
# REDIS_SOCKET_TIMEOUT=5
7880
# REDIS_POLL_INTERVAL=0.01

pgsync/redisqueue.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from redis import Redis
77
from redis.exceptions import ConnectionError
88

9-
from .settings import REDIS_CHUNK_SIZE, REDIS_SOCKET_TIMEOUT
9+
from .settings import REDIS_READ_CHUNK_SIZE, REDIS_SOCKET_TIMEOUT
1010
from .urls import get_redis_url
1111

1212
logger = logging.getLogger(__name__)
@@ -32,13 +32,14 @@ def __init__(self, name: str, namespace: str = "queue", **kwargs):
3232
logger.exception(f"Redis server is not running: {e}")
3333
raise
3434

35+
@property
3536
def qsize(self) -> int:
3637
"""Return the approximate size of the queue."""
3738
return self.__db.llen(self.key)
3839

3940
def empty(self) -> bool:
4041
"""Return True if the queue is empty, False otherwise."""
41-
return self.qsize() == 0
42+
return self.qsize == 0
4243

4344
def push(self, item) -> None:
4445
"""Push item into the queue."""
@@ -60,7 +61,7 @@ def pop(self, block: bool = True, timeout: int = None) -> dict:
6061

6162
def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]:
6263
"""Remove and return multiple items from the queue."""
63-
chunk_size: int = chunk_size or REDIS_CHUNK_SIZE
64+
chunk_size: int = chunk_size or REDIS_READ_CHUNK_SIZE
6465
pipeline = self.__db.pipeline()
6566
pipeline.lrange(self.key, 0, chunk_size - 1)
6667
pipeline.ltrim(self.key, chunk_size, -1)

pgsync/settings.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
REDIS_DB = env.int("REDIS_DB", default=0)
106106
REDIS_AUTH = env.str("REDIS_AUTH", default=None)
107107
# number of items to read from Redis at a time
108-
REDIS_CHUNK_SIZE = env.int("REDIS_CHUNK_SIZE", default=1000)
108+
REDIS_READ_CHUNK_SIZE = env.int("REDIS_READ_CHUNK_SIZE", default=1000)
109109
# number of items to write to Redis at a time
110110
REDIS_WRITE_CHUNK_SIZE = env.int("REDIS_WRITE_CHUNK_SIZE", default=1000)
111111
# redis socket connection timeout

pgsync/sync.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@
5252
CHECKPOINT_PATH,
5353
LOG_INTERVAL,
5454
POLL_TIMEOUT,
55-
REDIS_WRITE_CHUNK_SIZE,
5655
REDIS_POLL_INTERVAL,
56+
REDIS_WRITE_CHUNK_SIZE,
5757
REPLICATION_SLOT_CLEANUP_INTERVAL,
5858
)
5959
from .transform import get_private_keys, transform
@@ -992,14 +992,15 @@ def poll_db(self) -> None:
992992
channel: str = self.database
993993
cursor.execute(f'LISTEN "{channel}"')
994994
logger.debug(f'Listening for notifications on channel "{channel}"')
995-
item_queue = []
995+
items: list = []
996+
996997
while True:
997998
# NB: consider reducing POLL_TIMEOUT to increase throughout
998999
if select.select([conn], [], [], POLL_TIMEOUT) == ([], [], []):
9991000
# Catch any hanging items from the last poll
1000-
if len(item_queue)>0:
1001-
self.redis.bulk_push(item_queue)
1002-
item_queue = []
1001+
if items:
1002+
self.redis.bulk_push(items)
1003+
items = []
10031004
continue
10041005

10051006
try:
@@ -1009,13 +1010,13 @@ def poll_db(self) -> None:
10091010
os._exit(-1)
10101011

10111012
while conn.notifies:
1012-
if len(item_queue)>=REDIS_WRITE_CHUNK_SIZE:
1013-
self.redis.bulk_push(item_queue)
1014-
item_queue=[]
1013+
if len(items) >= REDIS_WRITE_CHUNK_SIZE:
1014+
self.redis.bulk_push(items)
1015+
items = []
10151016
notification: AnyStr = conn.notifies.pop(0)
10161017
if notification.channel == channel:
10171018
payload = json.loads(notification.payload)
1018-
item_queue.append(payload)
1019+
items.append(payload)
10191020
logger.debug(f"on_notify: {payload}")
10201021
self.count["db"] += 1
10211022

@@ -1094,7 +1095,7 @@ def status(self):
10941095
f"Xlog: [{self.count['xlog']:,}] => "
10951096
f"Db: [{self.count['db']:,}] => "
10961097
f"Redis: [total = {self.count['redis']:,} "
1097-
f"pending = {self.redis.qsize():,}] => "
1098+
f"pending = {self.redis.qsize:,}] => "
10981099
f"Elastic: [{self.es.doc_count:,}] ...\n"
10991100
)
11001101
sys.stdout.flush()

pgsync/utils.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828

2929
def timeit(func):
3030
def timed(*args, **kwargs):
31-
since = time()
31+
since: float = time()
3232
retval = func(*args, **kwargs)
33-
until = time()
33+
until: float = time()
3434
sys.stdout.write(
3535
f"{func.__name__} ({args}, {kwargs}) {until-since} secs\n"
3636
)
@@ -41,18 +41,18 @@ def timed(*args, **kwargs):
4141

4242
class Timer:
4343
def __init__(self, message: Optional[str] = None):
44-
self._message = message or ""
44+
self.message: str = message or ""
4545

4646
def __enter__(self):
4747
self.start = time()
4848
return self
4949

5050
def __exit__(self, *args):
51-
self.end = time()
52-
self.elapsed = self.end - self.start
51+
end: float = time()
52+
elapsed: float = end - self.start
5353
sys.stdout.write(
54-
f"{self._message} {str(timedelta(seconds=self.elapsed))} "
55-
f"({self.elapsed:2.2f} sec)\n"
54+
f"{self.message} {(timedelta(seconds=elapsed))} "
55+
f"({elapsed:2.2f} sec)\n"
5656
)
5757

5858

@@ -90,7 +90,7 @@ def show_settings(schema: str = None, **kwargs) -> None:
9090
def threaded(fn):
9191
"""Decorator for threaded code execution."""
9292

93-
def wrapper(*args, **kwargs):
93+
def wrapper(*args, **kwargs) -> Thread:
9494
thread: Thread = Thread(target=fn, args=args, kwargs=kwargs)
9595
thread.start()
9696
return thread

0 commit comments

Comments
 (0)