Skip to content

Commit cbb152e

Browse files
revert s_coro.pause
1 parent dfc97d0 commit cbb152e

File tree

2 files changed

+24
-18
lines changed

2 files changed

+24
-18
lines changed

synapse/lib/coro.py

-12
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,6 @@ async def agen(item):
3939
for x in item:
4040
yield x
4141

42-
async def pause(genr, iterations=1000):
43-
idx = 0
44-
45-
async for out in agen(genr):
46-
yield out
47-
idx += 1
48-
49-
if idx % iterations == 0:
50-
await asyncio.sleep(0)
51-
52-
return
53-
5442
def executor(func, *args, **kwargs):
5543
'''
5644
Execute a non-coroutine function in the ioloop executor pool.

synapse/lib/layer.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -4097,7 +4097,7 @@ async def iterNodeDataKeys(self, buid):
40974097
async def iterLayerAddPerms(self):
40984098

40994099
# nodes & props
4100-
async for byts, abrv in s_coro.pause(self.propabrv.slab.scanByFull(db=self.propabrv.name2abrv)):
4100+
for idx, (byts, abrv) in enumerate(self.propabrv.slab.scanByFull(db=self.propabrv.name2abrv)):
41014101
form, prop = s_msgpack.un(byts)
41024102
if form is None:
41034103
continue
@@ -4108,37 +4108,52 @@ async def iterLayerAddPerms(self):
41084108
else:
41094109
yield ('node', 'add', form)
41104110

4111+
if idx % 1000 == 0:
4112+
await asyncio.sleep(0)
4113+
41114114
# tagprops
4112-
async for byts, abrv in s_coro.pause(self.tagpropabrv.slab.scanByFull(db=self.tagpropabrv.name2abrv)):
4115+
for idx, (byts, abrv) in enumerate(self.tagpropabrv.slab.scanByFull(db=self.tagpropabrv.name2abrv)):
41134116
info = s_msgpack.un(byts)
41144117
if None in info or len(info) != 3:
41154118
continue
41164119

41174120
if self.layrslab.prefexists(abrv, db=self.bytagprop):
41184121
yield ('node', 'tag', 'add', *info[1].split('.'))
41194122

4123+
if idx % 1000 == 0:
4124+
await asyncio.sleep(0)
4125+
41204126
# nodedata
4121-
async for abrv in s_coro.pause(self.dataslab.scanKeys(db=self.dataname)):
4127+
for idx, abrv in enumerate(self.dataslab.scanKeys(db=self.dataname)):
41224128
name, _ = self.getAbrvProp(abrv)
41234129
yield ('node', 'data', 'set', name)
41244130

4131+
if idx % 1000 == 0:
4132+
await asyncio.sleep(0)
4133+
41254134
# edges
4126-
async for verb in s_coro.pause(self.layrslab.scanKeys(db=self.byverb)):
4135+
for idx, verb in enumerate(self.layrslab.scanKeys(db=self.byverb)):
41274136
yield ('node', 'edge', 'add', verb.decode())
41284137

4138+
if idx % 1000 == 0:
4139+
await asyncio.sleep(0)
4140+
41294141
# tags
41304142
# NB: tag perms should be yielded for every leaf on every node in the layer
41314143
async with self.core.getSpooledDict() as tags:
41324144

41334145
# Collect all tag abrvs for all nodes in the layer
4134-
async for lkey, buid in s_coro.pause(self.layrslab.scanByFull(db=self.bytag)):
4146+
for idx, (lkey, buid) in enumerate(self.layrslab.scanByFull(db=self.bytag)):
41354147
abrv = lkey[:8]
41364148
abrvs = list(tags.get(buid, []))
41374149
abrvs.append(abrv)
41384150
await tags.set(buid, abrvs)
41394151

4152+
if idx % 1000 == 0:
4153+
await asyncio.sleep(0)
4154+
41404155
# Iterate over each node and it's tags
4141-
async for buid, abrvs in s_coro.pause(tags.items()):
4156+
for idx, (buid, abrvs) in enumerate(tags.items()):
41424157
seen = {}
41434158

41444159
if len(abrvs) == 1:
@@ -4161,6 +4176,9 @@ async def iterLayerAddPerms(self):
41614176
if count == 1:
41624177
yield ('node', 'tag', 'add', *key)
41634178

4179+
if idx % 1000 == 0:
4180+
await asyncio.sleep(0)
4181+
41644182
async def iterLayerDelPerms(self):
41654183
async for perm in self.iterLayerAddPerms():
41664184
if perm[:2] == ('node', 'add'):

0 commit comments

Comments
 (0)