Skip to content

Commit

Permalink
SYN-7128: view.mergeAllowed/wipeAllowed to pull required perms from i…
Browse files Browse the repository at this point in the history
…ndex data (#3736)

feat: Update view merge logic to be more performant by querying the write layer for required permissions instead of iterating over nodeedits.
  • Loading branch information
MichaelSquires authored Jun 10, 2024
1 parent c898399 commit dfc97d0
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 145 deletions.
5 changes: 5 additions & 0 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -4142,6 +4142,11 @@ async def getSpooledSet(self):
async with await s_spooled.Set.anit(dirn=self.dirn, cell=self) as sset:
yield sset

@contextlib.asynccontextmanager
async def getSpooledDict(self):
async with await s_spooled.Dict.anit(dirn=self.dirn, cell=self) as sdict:
yield sdict

async def addSignalHandlers(self):
await s_base.Base.addSignalHandlers(self)

Expand Down
12 changes: 12 additions & 0 deletions synapse/lib/coro.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ async def agen(item):
for x in item:
yield x

async def pause(genr, iterations=1000):
idx = 0

async for out in agen(genr):
yield out
idx += 1

if idx % iterations == 0:
await asyncio.sleep(0)

return

def executor(func, *args, **kwargs):
'''
Execute a non-coroutine function in the ioloop executor pool.
Expand Down
163 changes: 87 additions & 76 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import synapse.lib.gis as s_gis
import synapse.lib.cell as s_cell
import synapse.lib.coro as s_coro
import synapse.lib.cache as s_cache
import synapse.lib.nexus as s_nexus
import synapse.lib.queue as s_queue
Expand Down Expand Up @@ -4093,6 +4094,92 @@ async def iterNodeDataKeys(self, buid):
prop = self.getAbrvProp(abrv)
yield prop[0]

async def iterLayerAddPerms(self):

# nodes & props
async for byts, abrv in s_coro.pause(self.propabrv.slab.scanByFull(db=self.propabrv.name2abrv)):
form, prop = s_msgpack.un(byts)
if form is None:
continue

if self.layrslab.prefexists(abrv, db=self.byprop):
if prop:
yield ('node', 'prop', 'set', f'{form}:{prop}')
else:
yield ('node', 'add', form)

# tagprops
async for byts, abrv in s_coro.pause(self.tagpropabrv.slab.scanByFull(db=self.tagpropabrv.name2abrv)):
info = s_msgpack.un(byts)
if None in info or len(info) != 3:
continue

if self.layrslab.prefexists(abrv, db=self.bytagprop):
yield ('node', 'tag', 'add', *info[1].split('.'))

# nodedata
async for abrv in s_coro.pause(self.dataslab.scanKeys(db=self.dataname)):
name, _ = self.getAbrvProp(abrv)
yield ('node', 'data', 'set', name)

# edges
async for verb in s_coro.pause(self.layrslab.scanKeys(db=self.byverb)):
yield ('node', 'edge', 'add', verb.decode())

# tags
# NB: tag perms should be yielded for every leaf on every node in the layer
async with self.core.getSpooledDict() as tags:

# Collect all tag abrvs for all nodes in the layer
async for lkey, buid in s_coro.pause(self.layrslab.scanByFull(db=self.bytag)):
abrv = lkey[:8]
abrvs = list(tags.get(buid, []))
abrvs.append(abrv)
await tags.set(buid, abrvs)

# Iterate over each node and it's tags
async for buid, abrvs in s_coro.pause(tags.items()):
seen = {}

if len(abrvs) == 1:
# Easy optimization: If there's only one tag abrv, then it's a
# leaf by default
name = self.tagabrv.abrvToName(abrv)
key = tuple(name.split('.'))
yield ('node', 'tag', 'add', *key)

else:
for abrv in abrvs:
name = self.tagabrv.abrvToName(abrv)
parts = tuple(name.split('.'))
for idx in range(1, len(parts) + 1):
key = tuple(parts[:idx])
seen.setdefault(key, 0)
seen[key] += 1

for key, count in seen.items():
if count == 1:
yield ('node', 'tag', 'add', *key)

async def iterLayerDelPerms(self):
async for perm in self.iterLayerAddPerms():
if perm[:2] == ('node', 'add'):
yield ('node', 'del', *perm[2:])
continue

match perm[:3]:
case ('node', 'prop', 'set'):
yield ('node', 'prop', 'del', *perm[3:])

case ('node', 'tag', 'add'):
yield ('node', 'tag', 'del', *perm[3:])

case ('node', 'data', 'set'):
yield ('node', 'data', 'pop', *perm[3:])

case ('node', 'edge', 'add'):
yield ('node', 'edge', 'del', *perm[3:])

async def iterLayerNodeEdits(self):
'''
Scan the full layer and yield artificial sets of nodeedits.
Expand Down Expand Up @@ -4450,79 +4537,3 @@ def addedits(buid, form, edits):
addedits(buid, form, edits)

return [(k[0], k[1], v) for (k, v) in editsbynode.items()]

def getNodeEditPerms(nodeedits):
'''
Yields (offs, perm) tuples that can be used in user.allowed()
'''
tags = []
tagadds = []

for nodeoffs, (buid, form, edits) in enumerate(nodeedits):

tags.clear()
tagadds.clear()

for editoffs, (edit, info, _) in enumerate(edits):

permoffs = (nodeoffs, editoffs)

if edit == EDIT_NODE_ADD:
yield (permoffs, ('node', 'add', form))
continue

if edit == EDIT_NODE_DEL:
yield (permoffs, ('node', 'del', form))
continue

if edit == EDIT_PROP_SET:
yield (permoffs, ('node', 'prop', 'set', f'{form}:{info[0]}'))
continue

if edit == EDIT_PROP_DEL:
yield (permoffs, ('node', 'prop', 'del', f'{form}:{info[0]}'))
continue

if edit == EDIT_TAG_SET:
if info[1] != (None, None):
tagadds.append(info[0])
yield (permoffs, ('node', 'tag', 'add', *info[0].split('.')))
else:
tags.append((len(info[0]), editoffs, info[0]))
continue

if edit == EDIT_TAG_DEL:
yield (permoffs, ('node', 'tag', 'del', *info[0].split('.')))
continue

if edit == EDIT_TAGPROP_SET:
yield (permoffs, ('node', 'tag', 'add', *info[0].split('.')))
continue

if edit == EDIT_TAGPROP_DEL:
yield (permoffs, ('node', 'tag', 'del', *info[0].split('.')))
continue

if edit == EDIT_NODEDATA_SET:
yield (permoffs, ('node', 'data', 'set', info[0]))
continue

if edit == EDIT_NODEDATA_DEL:
yield (permoffs, ('node', 'data', 'pop', info[0]))
continue

if edit == EDIT_EDGE_ADD:
yield (permoffs, ('node', 'edge', 'add', info[0]))
continue

if edit == EDIT_EDGE_DEL:
yield (permoffs, ('node', 'edge', 'del', info[0]))
continue

for _, editoffs, tag in sorted(tags, reverse=True):
look = tag + '.'
if any([tagadd.startswith(look) for tagadd in tagadds]):
continue

yield ((nodeoffs, editoffs), ('node', 'tag', 'add', *tag.split('.')))
tagadds.append(tag)
15 changes: 7 additions & 8 deletions synapse/lib/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,10 +1445,9 @@ async def mergeAllowed(self, user=None, force=False):
return

async with await self.parent.snap(user=user) as snap:
async for nodeedit in fromlayr.iterLayerNodeEdits():
for offs, perm in s_layer.getNodeEditPerms([nodeedit]):
self.parent._confirm(user, perm)
await asyncio.sleep(0)
async for perm in fromlayr.iterLayerAddPerms():
self.parent._confirm(user, perm)
await asyncio.sleep(0)

async def wipeAllowed(self, user=None):
'''
Expand All @@ -1457,10 +1456,10 @@ async def wipeAllowed(self, user=None):
if user is None or user.isAdmin():
return

async for nodeedit in self.layers[0].iterWipeNodeEdits():
for offs, perm in s_layer.getNodeEditPerms([nodeedit]):
self._confirm(user, perm)
await asyncio.sleep(0)
layer = self.layers[0]
async for perm in layer.iterLayerDelPerms():
self._confirm(user, perm)
await asyncio.sleep(0)

async def runTagAdd(self, node, tag, valu):

Expand Down
Loading

0 comments on commit dfc97d0

Please sign in to comment.