Skip to content

Commit

Permalink
Merge branch 'master' into docs_changelog_20240530
Browse files Browse the repository at this point in the history
  • Loading branch information
Cisphyx authored Jun 3, 2024
2 parents e8fa2f2 + 3123889 commit e2f9ffa
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 29 deletions.
84 changes: 84 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4496,6 +4496,87 @@ async def _addView(self, vdef):
await self.feedBeholder('view:add', pack, gates=[iden])
return pack

async def delViewWithLayer(self, iden):
'''
Delete a Cortex View and its write Layer if not in use by other View stacks.
Note:
Any children of the View will have their parent View updated to
the deleted View's parent (if present). The deleted View's write Layer
will also be removed from any child Views which contain it in their
Layer stack. If the Layer is used in Views which are not children of
the deleted View, the Layer will be preserved, otherwise it will be
deleted as well.
'''
view = self.views.get(iden)
if view is None:
raise s_exc.NoSuchView(mesg=f'No such view {iden=}', iden=iden)

if view.info.get('protected'):
mesg = f'Cannot delete view ({iden}) that has protected set.'
raise s_exc.CantDelView(mesg=mesg)

layriden = view.layers[0].iden
pareiden = None
if view.parent is not None:
pareiden = view.parent.iden

return await self._push('view:delwithlayer', iden, layriden, newparent=pareiden)

@s_nexus.Pusher.onPush('view:delwithlayer', passitem=True)
async def _delViewWithLayer(self, viewiden, layriden, nexsitem, newparent=None):

if viewiden == self.view.iden:
raise s_exc.SynErr(mesg='Cannot delete the main view')

if (view := self.views.get(viewiden)) is not None:

await self.hive.pop(('cortex', 'views', viewiden))
await view.delete()

self._calcViewsByLayer()
await self.feedBeholder('view:del', {'iden': viewiden}, gates=[viewiden])
await self.auth.delAuthGate(viewiden)

if newparent is not None:
newview = self.views.get(newparent)

layrinuse = False
for view in self.viewsbylayer[layriden]:
if not view.isForkOf(viewiden):
layrinuse = True
continue

view.layers = [lyr for lyr in view.layers if lyr.iden != layriden]
await view.info.set('layers', [lyr.iden for lyr in view.layers])

if view.parent.iden == viewiden:
if newparent is None:
view.parent = None
await view.info.pop('parent')
else:
view.parent = newview
await view.info.set('parent', newparent)

if not layrinuse and (layr := self.layers.get(layriden)) is not None:
del self.layers[layriden]

for pdef in layr.layrinfo.get('pushs', {}).values():
await self.delActiveCoro(pdef.get('iden'))

for pdef in layr.layrinfo.get('pulls', {}).values():
await self.delActiveCoro(pdef.get('iden'))

await self.feedBeholder('layer:del', {'iden': layriden}, gates=[layriden])
await self.auth.delAuthGate(layriden)
self.dynitems.pop(layriden)

await self.hive.pop(('cortex', 'layers', layriden))

await layr.delete()

layr.deloffs = nexsitem[0]

async def delView(self, iden):
view = self.views.get(iden)
if view is None:
Expand Down Expand Up @@ -5158,12 +5239,15 @@ async def runStormDmon(self, iden, ddef):

return dmon

@s_cell.from_leader
async def getStormDmon(self, iden):
return self.stormdmons.getDmonDef(iden)

@s_cell.from_leader
async def getStormDmons(self):
return self.stormdmons.getDmonDefs()

@s_cell.from_leader
async def getStormDmonLog(self, iden):
return self.stormdmons.getDmonRunlog(iden)

Expand Down
19 changes: 19 additions & 0 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ def wrapped(*args, **kwargs):

return decrfunc

def from_leader(func):
'''
Decorator used to indicate that the decorated method must call up to the
leader to perform it's work.
This only works on Cell classes and subclasses. The decorated method name
MUST be the same as the telepath API name.
'''
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
if not self.isactive:
proxy = await self.nexsroot.client.proxy()
api = getattr(proxy, func.__name__)
return await api(*args, **kwargs)

return await func(self, *args, **kwargs)

return wrapper

async def _doIterBackup(path, chunksize=1024):
'''
Create tarball and stream bytes.
Expand Down
18 changes: 4 additions & 14 deletions synapse/lib/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ def hasKids(self):
async def _setMergeRequest(self, mergeinfo):
self.reqParentQuorum()

if self.hasKids():
mesg = 'Cannot add a merge request to a view with children.'
raise s_exc.BadState(mesg=mesg)

s_schemas.reqValidMerge(mergeinfo)
lkey = self.bidn + b'merge:req'
self.core.slab.put(lkey, s_msgpack.en(mergeinfo), db='view:meta')
Expand Down Expand Up @@ -450,8 +446,7 @@ async def chunked():
await self.core.feedBeholder('view:merge:fini', {'view': self.iden, 'merge': merge, 'merge': merge, 'votes': votes})

# remove the view and top layer
await self.core.delView(self.iden)
await self.core.delLayer(self.layers[0].iden)
await self.core.delViewWithLayer(self.iden)

except Exception as e: # pragma: no cover
logger.exception(f'Error while merging view: {self.iden}')
Expand Down Expand Up @@ -1112,10 +1107,6 @@ async def setViewInfo(self, name, valu):
mesg = 'Circular dependency of view parents is not supported.'
raise s_exc.BadArg(mesg=mesg)

if parent.getMergeRequest() is not None:
mesg = 'You may not set the parent to a view with a pending merge request.'
raise s_exc.BadState(mesg=mesg)

if self.parent is not None:
if self.parent.iden == parent.iden:
return valu
Expand Down Expand Up @@ -1299,6 +1290,9 @@ async def _insertParentFork(self, ldef, vdef, nexsitem):
s_layer.reqValidLdef(ldef)
s_schemas.reqValidView(vdef)

if self.getMergeRequest() is not None:
await self._delMergeRequest()

await self.core._addLayer(ldef, nexsitem)
await self.core._addView(vdef)

Expand Down Expand Up @@ -1355,10 +1349,6 @@ async def fork(self, ldef=None, vdef=None):
if vdef is None:
vdef = {}

if self.getMergeRequest() is not None:
mesg = 'Cannot fork a view which has a merge request.'
raise s_exc.BadState(mesg=mesg)

ldef = await self.core.addLayer(ldef)
layriden = ldef.get('iden')

Expand Down
64 changes: 64 additions & 0 deletions synapse/tests/test_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -6292,12 +6292,14 @@ async def test_cortex_delLayerView(self):

# Can't delete the default view
await self.asyncraises(s_exc.SynErr, core.delView(core.view.iden))
await self.asyncraises(s_exc.SynErr, core._delViewWithLayer(core.view.iden, None, None))

# Can't delete a layer in a view
await self.asyncraises(s_exc.SynErr, core.delLayer(core.view.layers[0].iden))

# Can't delete a nonexistent view
await self.asyncraises(s_exc.NoSuchView, core.delView('XXX'))
await self.asyncraises(s_exc.NoSuchView, core.delViewWithLayer('XXX'))

# Can't delete a nonexistent layer
await self.asyncraises(s_exc.NoSuchLayer, core.delLayer('XXX'))
Expand All @@ -6310,6 +6312,68 @@ async def test_cortex_delLayerView(self):
await core.delView(view2_iden)
await self.asyncraises(s_exc.NoSuchView, core.delView(view2_iden))

layr = await core.addLayer()
layriden = layr['iden']
vdef3 = {'layers': (layriden,)}
view3_iden = (await core.addView(vdef3)).get('iden')

opts = {'view': view3_iden}
await core.callStorm('$lib.view.get().set(protected, $lib.true)', opts=opts)

await self.asyncraises(s_exc.CantDelView, core.delViewWithLayer(view3_iden))

await core.callStorm('$lib.view.get().set(protected, $lib.false)', opts=opts)

view3 = core.getView(view3_iden)
vdef4 = await view3.fork()

deadlayr = view3.layers[0].iden
view4_iden = vdef4.get('iden')
view4 = core.getView(view4_iden)

self.eq(view4.parent, view3)
self.len(2, view4.layers)

await core.auth.rootuser.setPasswd('secret')
host, port = await core.dmon.listen('tcp://127.0.0.1:0/')
layr2 = await core.callStorm('$layer=$lib.layer.add() return($layer)')
varz = {'iden': layriden, 'tgt': layr2.get('iden'), 'port': port}
opts = {'vars': varz, 'view': view3_iden}

pullq = '$layer=$lib.layer.get($iden).addPull(`tcp://root:[email protected]:{$port}/*/layer/{$tgt}`)'
pushq = '$layer=$lib.layer.get($iden).addPush(`tcp://root:[email protected]:{$port}/*/layer/{$tgt}`)'
msgs = await core.stormlist(pullq, opts=opts)
self.stormHasNoWarnErr(msgs)

msgs = await core.stormlist(pushq, opts=opts)
self.stormHasNoWarnErr(msgs)

coros = len(core.activecoros)

await core.delViewWithLayer(view3_iden)

# push/pull activecoros have been deleted
self.len(coros - 2, core.activecoros)

self.none(view4.parent)
self.len(1, view4.layers)
self.none(core.getLayer(deadlayr))

vdef5 = await view4.fork()
view5 = core.getView(vdef5.get('iden'))

usedlayr = view4.layers[0].iden
vdef6 = {'layers': (usedlayr,)}
view6 = core.getView((await core.addView(vdef6)).get('iden'))

await core.delViewWithLayer(view4_iden)

self.none(view5.parent)
self.len(1, view5.layers)

self.nn(core.getLayer(usedlayr))
self.eq([usedlayr], [lyr.iden for lyr in view6.layers])

async def test_cortex_view_opts(self):
'''
Test that the view opts work
Expand Down
86 changes: 86 additions & 0 deletions synapse/tests/test_lib_storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import synapse.tests.utils as s_t_utils
from synapse.tests.utils import alist

import synapse.tools.backup as s_tools_backup

class StormTest(s_t_utils.SynTest):

async def test_lib_storm_jsonexpr(self):
Expand Down Expand Up @@ -2041,6 +2043,90 @@ async def test_storm_dmon_caching(self):

self.eq((1, 6), await core.callStorm('return($lib.queue.gen(foo).get(1))'))

async def test_storm_dmon_query_state(self):
with self.getTestDir() as dirn:
dirn00 = s_common.gendir(dirn, 'core00')
dirn01 = s_common.gendir(dirn, 'core01')
dirn02 = s_common.gendir(dirn, 'core02')

async with self.getTestCore(dirn=dirn00) as core00:

msgs = await core00.stormlist('[ inet:ipv4=1.2.3.4 ]')
self.stormHasNoWarnErr(msgs)

s_tools_backup.backup(dirn00, dirn01)
s_tools_backup.backup(dirn00, dirn02)

async with self.getTestCore(dirn=dirn00) as core00:
conf01 = {'mirror': core00.getLocalUrl()}

async with self.getTestCore(dirn=dirn01, conf=conf01) as core01:

conf02 = {'mirror': core01.getLocalUrl()}

async with self.getTestCore(dirn=dirn02, conf=conf02) as core02:

await core02.sync()

nodes = await core01.nodes('inet:ipv4')
self.len(1, nodes)
self.eq(nodes[0].ndef, ('inet:ipv4', 16909060))

q = '''
$lib.queue.gen(dmonloop)
return(
$lib.dmon.add(${
$queue = $lib.queue.get(dmonloop)
while $lib.true {
($offs, $mesg) = $queue.get()
switch $mesg.0 {
"print": { $lib.print($mesg.1) }
"warn": { $lib.warn($mesg.1) }
"leave": {
$lib.print(leaving)
break
}
*: { continue }
}
$queue.cull($offs)
}
}, name=dmonloop)
)
'''
ddef = await core02.callStorm(q)
self.nn(ddef['iden'])

dmons = await core02.getStormDmons()
self.len(1, dmons)
self.eq(dmons[0]['iden'], ddef['iden'])

info = await core02.getStormDmon(ddef['iden'])
self.eq(info['iden'], ddef['iden'])
self.eq(info['name'], 'dmonloop')
self.eq(info['status'], 'running')

await core02.callStorm('$lib.queue.get(dmonloop).put((print, printfoo))')
await core02.callStorm('$lib.queue.get(dmonloop).put((warn, warnfoo))')

info = await core02.getStormDmon(ddef['iden'])
self.eq(info['status'], 'running')

logs = await core02.getStormDmonLog(ddef['iden'])
msgs = [k[1] for k in logs]
self.stormIsInPrint('printfoo', msgs)
self.stormIsInWarn('warnfoo', msgs)

await core02.callStorm('$lib.queue.get(dmonloop).put((leave,))')

info = await core02.getStormDmon(ddef['iden'])
self.eq(info['status'], 'sleeping')

logs = await core02.getStormDmonLog(ddef['iden'])
msgs = [k[1] for k in logs]
self.stormIsInPrint('leaving', msgs)

async def test_storm_pipe(self):

async with self.getTestCore() as core:
Expand Down
Loading

0 comments on commit e2f9ffa

Please sign in to comment.