diff --git a/synapse/common.py b/synapse/common.py index 1796a4140c7..95222fde7c7 100644 --- a/synapse/common.py +++ b/synapse/common.py @@ -1383,6 +1383,29 @@ def _timeout(delay): """ loop = asyncio.get_running_loop() return _Timeout(loop.time() + delay if delay is not None else None) +# End - Vendored Code from Python 3.12+ + +async def waitretn(futu, timeout): + try: + valu = await wait_for(futu, timeout) + return (True, valu) + except Exception as e: + return (False, excinfo(e)) + +async def waitgenr(genr, timeout): + + async with contextlib.aclosing(genr.__aiter__()) as genr: + + while True: + retn = await waitretn(genr.__anext__(), timeout) + + if not retn[0] and retn[1]['err'] == 'StopAsyncIteration': + return + + yield retn + + if not retn[0]: + return def format(text, **kwargs): ''' diff --git a/synapse/cortex.py b/synapse/cortex.py index 3cae3b4f228..81ba8cb9887 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -83,6 +83,7 @@ import synapse.lib.stormlib.oauth as s_stormlib_oauth # NOQA import synapse.lib.stormlib.stats as s_stormlib_stats # NOQA import synapse.lib.stormlib.storm as s_stormlib_storm # NOQA +import synapse.lib.stormlib.utils as s_stormlib_utils # NOQA import synapse.lib.stormlib.vault as s_stormlib_vault # NOQA import synapse.lib.stormlib.backup as s_stormlib_backup # NOQA import synapse.lib.stormlib.cortex as s_stormlib_cortex # NOQA diff --git a/synapse/daemon.py b/synapse/daemon.py index 822eae06eec..b212f9f4aba 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -462,6 +462,7 @@ async def sessfini(): link.set('sess', sess) if isinstance(item, s_telepath.Aware): + reply[1]['features'] = await item.getTeleFeats() item = await s_coro.ornot(item.getTeleApi, link, mesg, path) if isinstance(item, s_base.Base): link.onfini(item) diff --git a/synapse/lib/aha.py b/synapse/lib/aha.py index 9b2e93ea5a4..95f4c8a11d4 100644 --- a/synapse/lib/aha.py +++ b/synapse/lib/aha.py @@ -132,10 +132,18 @@ async def addAhaClone(self, host, port=27492, conf=None): async def getAhaUrls(self, user='root'): ahaurls = await self.cell.getAhaUrls(user=user) - if ahaurls is None: - return () return ahaurls + @s_cell.adminapi() + async def callAhaPeerApi(self, iden, todo, timeout=None, skiprun=None): + async for item in self.cell.callAhaPeerApi(iden, todo, timeout=timeout, skiprun=skiprun): + yield item + + @s_cell.adminapi() + async def callAhaPeerGenr(self, iden, todo, timeout=None, skiprun=None): + async for item in self.cell.callAhaPeerGenr(iden, todo, timeout=timeout, skiprun=skiprun): + yield item + async def getAhaSvc(self, name, filters=None): ''' Return an AHA service description dictionary for a service name. @@ -366,7 +374,6 @@ async def clearAhaClones(self): ''' return await self.cell.clearAhaClones() - class ProvDmon(s_daemon.Daemon): async def __anit__(self, aha): @@ -571,7 +578,8 @@ async def _initCellBoot(self): async def initServiceStorage(self): - # TODO plumb using a remote jsonstor? + self.features['callpeers'] = 1 + dirn = s_common.gendir(self.dirn, 'slabs', 'jsonstor') slab = await s_lmdbslab.Slab.anit(dirn) @@ -682,8 +690,126 @@ def _initCellHttpApis(self): self.addHttpApi('/api/v1/aha/services', AhaServicesV1, {'cell': self}) self.addHttpApi('/api/v1/aha/provision/service', AhaProvisionServiceV1, {'cell': self}) + async def callAhaSvcApi(self, name, todo, timeout=None): + name = self._getAhaName(name) + svcdef = await self._getAhaSvc(name) + return self._callAhaSvcApi(svcdef, todo, timeout=timeout) + + async def _callAhaSvcApi(self, svcdef, todo, timeout=None): + try: + proxy = await self.getAhaSvcProxy(svcdef, timeout=timeout) + meth = getattr(proxy, todo[0]) + return await s_common.waitretn(meth(*todo[1], **todo[2]), timeout=timeout) + except Exception as e: + # in case proxy construction fails + return (False, s_common.excinfo(e)) + + async def _callAhaSvcGenr(self, svcdef, todo, timeout=None): + try: + proxy = await self.getAhaSvcProxy(svcdef, timeout=timeout) + meth = getattr(proxy, todo[0]) + async for item in s_common.waitgenr(meth(*todo[1], **todo[2]), timeout=timeout): + yield item + except Exception as e: + # in case proxy construction fails + yield (False, s_common.excinfo(e)) + + async def getAhaSvcsByIden(self, iden, online=True, skiprun=None): + + runs = set() + async for svcdef in self.getAhaSvcs(): + await asyncio.sleep(0) + + # TODO services by iden indexes (SYN-8467) + if svcdef['svcinfo'].get('iden') != iden: + continue + + if online and svcdef['svcinfo'].get('online') is None: + continue + + svcrun = svcdef['svcinfo'].get('run') + if svcrun in runs: + continue + + if skiprun == svcrun: + continue + + runs.add(svcrun) + yield svcdef + + def getAhaSvcUrl(self, svcdef, user='root'): + svcfull = svcdef.get('name') + svcnetw = svcdef.get('svcnetw') + host = svcdef['svcinfo']['urlinfo']['host'] + port = svcdef['svcinfo']['urlinfo']['port'] + return f'ssl://{host}:{port}?hostname={svcfull}&certname={user}@{svcnetw}' + + async def callAhaPeerApi(self, iden, todo, timeout=None, skiprun=None): + + if not self.isactive: + proxy = await self.nexsroot.client.proxy(timeout=timeout) + async for item in proxy.callAhaPeerApi(iden, todo, timeout=timeout, skiprun=skiprun): + yield item + + queue = asyncio.Queue() + async with await s_base.Base.anit() as base: + + async def call(svcdef): + svcfull = svcdef.get('name') + await queue.put((svcfull, await self._callAhaSvcApi(svcdef, todo, timeout=timeout))) + + count = 0 + async for svcdef in self.getAhaSvcsByIden(iden, skiprun=skiprun): + count += 1 + base.schedCoro(call(svcdef)) + + for i in range(count): + yield await queue.get() + + async def callAhaPeerGenr(self, iden, todo, timeout=None, skiprun=None): + + if not self.isactive: + proxy = await self.nexsroot.client.proxy(timeout=timeout) + async for item in proxy.callAhaPeerGenr(iden, todo, timeout=timeout, skiprun=skiprun): + yield item + + queue = asyncio.Queue() + async with await s_base.Base.anit() as base: + + async def call(svcdef): + svcfull = svcdef.get('name') + try: + async for item in self._callAhaSvcGenr(svcdef, todo, timeout=timeout): + await queue.put((svcfull, item)) + finally: + await queue.put(None) + + count = 0 + async for svcdef in self.getAhaSvcsByIden(iden, skiprun=skiprun): + count += 1 + base.schedCoro(call(svcdef)) + + while count > 0: + + item = await queue.get() + if item is None: + count -= 1 + continue + + yield item + + async def _finiSvcClients(self): + for client in list(self.clients.values()): + await client.fini() + + async def initServicePassive(self): + await self._finiSvcClients() + async def initServiceRuntime(self): + self.clients = {} + self.onfini(self._finiSvcClients) + self.addActiveCoro(self._clearInactiveSessions) if self.isactive: @@ -897,6 +1023,31 @@ async def addAhaSvc(self, name, info, network=None): await self.fire('aha:svcadd', svcinfo=svcinfo) await self.fire(f'aha:svcadd:{svcfull}', svcinfo=svcinfo) + async def getAhaSvcProxy(self, svcdef, timeout=None): + + client = await self.getAhaSvcClient(svcdef) + if client is None: + return None + + return await client.proxy(timeout=timeout) + + async def getAhaSvcClient(self, svcdef): + + svcfull = svcdef.get('name') + + client = self.clients.get(svcfull) + if client is not None: + return client + + svcurl = self.getAhaSvcUrl(svcdef) + + client = self.clients[svcfull] = await s_telepath.ClientV2.anit(svcurl) + async def fini(): + self.clients.pop(svcfull, None) + + client.onfini(fini) + return client + def _getAhaName(self, name): # the modern version of names is absolute or ... if name.endswith('...'): @@ -1066,6 +1217,10 @@ async def _setAhaSvcDown(self, name, linkiden, network=None): logger.info(f'Set [{svcfull}] offline.', extra=await self.getLogExtra(name=svcname, netw=svcnetw)) + client = self.clients.pop(svcfull, None) + if client is not None: + await client.fini() + async def getAhaSvc(self, name, filters=None): name = self._getAhaName(name) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index adcd1dc5b76..1012342ed33 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -83,6 +83,8 @@ PERM_ADMIN: 'admin', } +feat_aha_callpeers_v1 = ('callpeers', 1) + diskspace = "Insufficient free space on disk." def adminapi(log=False): @@ -435,6 +437,19 @@ async def ps(self): async def kill(self, iden): return await self.cell.kill(self.user, iden) + @adminapi() + async def getTasks(self, peers=True, timeout=None): + async for task in self.cell.getTasks(peers=peers, timeout=timeout): + yield task + + @adminapi() + async def getTask(self, iden, peers=True, timeout=None): + return await self.cell.getTask(iden, peers=peers, timeout=timeout) + + @adminapi() + async def killTask(self, iden, peers=True, timeout=None): + return await self.cell.killTask(iden, peers=peers, timeout=timeout) + @adminapi(log=True) async def behold(self): ''' @@ -765,8 +780,8 @@ async def saveHiveTree(self, path=()): return await self.cell.saveHiveTree(path=path) @adminapi() - async def getNexusChanges(self, offs, tellready=False): - async for item in self.cell.getNexusChanges(offs, tellready=tellready): + async def getNexusChanges(self, offs, tellready=False, wait=True): + async for item in self.cell.getNexusChanges(offs, tellready=tellready, wait=wait): yield item @adminapi() @@ -1165,6 +1180,11 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): self.netready = asyncio.Event() self.conf = self._initCellConf(conf) + self.features = { + 'tellready': 1, + 'dynmirror': 1, + 'tasks': 1, + } self.minfree = self.conf.get('limit:disk:free') if self.minfree is not None: @@ -2382,8 +2402,8 @@ async def initServiceActive(self): # pragma: no cover async def initServicePassive(self): # pragma: no cover pass - async def getNexusChanges(self, offs, tellready=False): - async for item in self.nexsroot.iter(offs, tellready=tellready): + async def getNexusChanges(self, offs, tellready=False, wait=True): + async for item in self.nexsroot.iter(offs, tellready=tellready, wait=wait): yield item def _reqBackDirn(self, name): @@ -4414,6 +4434,111 @@ async def ps(self, user): return retn + async def getAhaProxy(self, timeout=None, feats=None): + + if self.ahaclient is None: + return + + proxy = await self.ahaclient.proxy(timeout=timeout) + if proxy is None: + logger.warning('AHA client connection failed.') + return + + if feats is not None: + for name, vers in feats: + if not proxy._hasTeleFeat(name, vers): + logger.warning(f'AHA server does not support feature: {name} >= {vers}') + return None + + return proxy + + async def callPeerApi(self, todo, timeout=None): + ''' + Yield responses from our peers via the AHA gather call API. + ''' + proxy = await self.getAhaProxy(timeout=timeout, feats=(feat_aha_callpeers_v1,)) + if proxy is None: # pragma: no cover + return + + async for item in proxy.callAhaPeerApi(self.iden, todo, timeout=timeout, skiprun=self.runid): + yield item + + async def callPeerGenr(self, todo, timeout=None): + ''' + Yield responses from invoking a generator via the AHA gather API. + ''' + proxy = await self.getAhaProxy(timeout=timeout, feats=(feat_aha_callpeers_v1,)) + if proxy is None: # pragma: no cover + return + + async for item in proxy.callAhaPeerGenr(self.iden, todo, timeout=timeout, skiprun=self.runid): + yield item + + async def getTasks(self, peers=True, timeout=None): + + for task in self.boss.ps(): + + item = task.pack() + item['service'] = self.ahasvcname + + yield item + + if not peers: + return + + todo = s_common.todo('getTasks', peers=False) + # we can ignore the yielded aha names because we embed it in the task + async for (ahasvc, (ok, retn)) in self.callPeerGenr(todo, timeout=timeout): + + if not ok: # pragma: no cover + logger.warning(f'getTasks() on {ahasvc} failed: {retn}') + continue + + yield retn + + async def getTask(self, iden, peers=True, timeout=None): + + task = self.boss.get(iden) + if task is not None: + item = task.pack() + item['service'] = self.ahasvcname + return item + + if not peers: + return + + todo = s_common.todo('getTask', iden, peers=False, timeout=timeout) + async for ahasvc, (ok, retn) in self.callPeerApi(todo, timeout=timeout): + + if not ok: # pragma: no cover + logger.warning(f'getTask() on {ahasvc} failed: {retn}') + continue + + if retn is not None: + return retn + + async def killTask(self, iden, peers=True, timeout=None): + + task = self.boss.get(iden) + if task is not None: + await task.kill() + return True + + if not peers: + return False + + todo = s_common.todo('killTask', iden, peers=False, timeout=timeout) + async for ahasvc, (ok, retn) in self.callPeerApi(todo, timeout=timeout): + + if not ok: # pragma: no cover + logger.warning(f'killTask() on {ahasvc} failed: {retn}') + continue + + if retn: + return True + + return False + async def kill(self, user, iden): perm = ('task', 'del') isallowed = await self.isUserAllowed(user.iden, perm) @@ -4488,13 +4613,13 @@ async def getCellInfo(self): 'https': self.https_listeners, } }, - 'features': { - 'tellready': True, - 'dynmirror': True, - }, + 'features': self.features, } return ret + async def getTeleFeats(self): + return dict(self.features) + async def getSystemInfo(self): ''' Get info about the system in which the cell is running diff --git a/synapse/lib/nexus.py b/synapse/lib/nexus.py index 063a34bf12f..1fd837663ca 100644 --- a/synapse/lib/nexus.py +++ b/synapse/lib/nexus.py @@ -409,7 +409,7 @@ async def _apply(self, indx, mesg): return await func(nexus, *args, **kwargs) - async def iter(self, offs: int, tellready=False) -> AsyncIterator[Any]: + async def iter(self, offs: int, tellready=False, wait=True) -> AsyncIterator[Any]: ''' Returns an iterator of change entries in the log ''' @@ -430,6 +430,9 @@ async def iter(self, offs: int, tellready=False) -> AsyncIterator[Any]: if tellready: yield None + if not wait: + return + async with self.getChangeDist(maxoffs) as dist: async for item in dist: if self.isfini: diff --git a/synapse/lib/reflect.py b/synapse/lib/reflect.py index 2897bc5a414..dfc00fd12de 100644 --- a/synapse/lib/reflect.py +++ b/synapse/lib/reflect.py @@ -87,17 +87,16 @@ def getShareInfo(item): if not callable(attr): continue + meths[name] = meth = {} + # We know we can cleanly unwrap these functions # for asyncgenerator inspection. wrapped = getattr(attr, '__syn_wrapped__', None) if wrapped in unwraps: - real = inspect.unwrap(attr) - if inspect.isasyncgenfunction(real): - meths[name] = {'genr': True} - continue + attr = inspect.unwrap(attr) if inspect.isasyncgenfunction(attr): - meths[name] = {'genr': True} + meth['genr'] = True try: setattr(item, key, info) diff --git a/synapse/lib/stormlib/aha.py b/synapse/lib/stormlib/aha.py index d5f060ef20a..43f105e2647 100644 --- a/synapse/lib/stormlib/aha.py +++ b/synapse/lib/stormlib/aha.py @@ -1,4 +1,7 @@ +import textwrap + import synapse.exc as s_exc +import synapse.common as s_common import synapse.lib.stormtypes as s_stormtypes @s_stormtypes.registry.registerLib @@ -49,6 +52,80 @@ class AhaLib(s_stormtypes.Lib): 'type': {'type': 'function', '_funcname': '_methAhaList', 'args': (), 'returns': {'name': 'Yields', 'type': 'list', 'desc': 'The AHA service dictionaries.', }}}, + {'name': 'callPeerApi', 'desc': '''Call an API on all peers (leader and mirrors) of an AHA service and yield the responses from each. + + Examples: + Call getCellInfo on an AHA service:: + + $todo = $lib.utils.todo('getCellInfo') + for $info in $lib.aha.callPeerApi(cortex..., $todo) { + $lib.print($info) + } + + Call getCellInfo on an AHA service, skipping the invoking service:: + + $todo = $lib.utils.todo('getCellInfo') + for $info in $lib.aha.callPeerApi(cortex..., $todo, skiprun=$lib.cell.getCellInfo().cell.run) { + $lib.print($info) + } + + Call method with arguments:: + + $todo = $lib.utils.todo(('method', ([1, 2]), ({'foo': 'bar'}))) + for $info in $lib.aha.callPeerApi(cortex..., $todo) { + $lib.print($info) + } + + ''', + 'type': {'type': 'function', '_funcname': '_methCallPeerApi', + 'args': ( + {'name': 'svcname', 'type': 'str', + 'desc': 'The name of the AHA service to call. It is easiest to use the relative name of a service, ending with "...".', }, + {'name': 'todo', 'type': 'list', + 'desc': 'The todo tuple (name, args, kwargs).'}, + {'name': 'timeout', 'type': 'int', 'default': None, + 'desc': 'Optional timeout in seconds.'}, + {'name': 'skiprun', 'type': 'str', 'default': None, + 'desc': '''Optional run ID argument that allows skipping results from a specific service run ID. + This is most often used to omit the invoking service from the results, ensuring that only responses from other services are included. + '''}, + ), + 'returns': {'name': 'yields', 'type': 'list', + 'desc': 'Yields the results of the API calls as tuples of (svcname, (ok, info)).', }}}, + {'name': 'callPeerGenr', 'desc': '''Call a generator API on all peers (leader and mirrors) of an AHA service and yield the responses from each. + + Examples: + Call getNexusChanges on an AHA service:: + + $todo = $lib.utils.todo('getNexusChanges', (0), wait=$lib.false) + for $info in $lib.aha.callPeerGenr(cortex..., $todo) { + $lib.print($info) + } + + Call getNexusChanges on an AHA service, skipping the invoking service:: + + $todo = $lib.utils.todo('getNexusChanges', (0), wait=$lib.false) + for $info in $lib.aha.callPeerGenr(cortex..., $todo, skiprun=$lib.cell.getCellInfo().cell.run) { + $lib.print($info) + } + + ''', + 'type': {'type': 'function', '_funcname': '_methCallPeerGenr', + 'args': ( + {'name': 'svcname', 'type': 'str', + 'desc': 'The name of the AHA service to call. It is easiest to use the relative name of a service, ending with "...".', }, + {'name': 'todo', 'type': 'list', + 'desc': 'The todo tuple (name, args, kwargs).'}, + {'name': 'timeout', 'type': 'int', 'default': None, + 'desc': 'Optional timeout in seconds.'}, + {'name': 'skiprun', 'type': 'str', 'default': None, + 'desc': '''Optional run ID argument that allows skipping results from a specific service run ID. + This is most often used to omit the invoking service from the results, ensuring that only responses from other services are included. + '''}, + ), + 'returns': {'name': 'yields', 'type': 'list', + 'desc': 'Yields the results of the API calls as tuples containing (svcname, (ok, info)).', }}} + ) _storm_lib_path = ('aha',) def getObjLocals(self): @@ -56,6 +133,8 @@ def getObjLocals(self): 'del': self._methAhaDel, 'get': self._methAhaGet, 'list': self._methAhaList, + 'callPeerApi': self._methCallPeerApi, + 'callPeerGenr': self._methCallPeerGenr, } @s_stormtypes.stormfunc(readonly=True) @@ -85,6 +164,62 @@ async def _methAhaGet(self, svcname, filters=None): proxy = await self.runt.snap.core.reqAhaProxy() return await proxy.getAhaSvc(svcname, filters=filters) + async def _methCallPeerApi(self, svcname, todo, timeout=None, skiprun=None): + ''' + Call an API on an AHA service. + + Args: + svcname (str): The name of the AHA service to call. + todo (list): The todo tuple from $lib.utils.todo(). + timeout (int): Optional timeout in seconds. + skiprun (str): Optional run ID argument allows skipping self-enumeration. + ''' + svcname = await s_stormtypes.tostr(svcname) + todo = await s_stormtypes.toprim(todo) + timeout = await s_stormtypes.toint(timeout, noneok=True) + skiprun = await s_stormtypes.tostr(skiprun, noneok=True) + + proxy = await self.runt.snap.core.reqAhaProxy() + svc = await proxy.getAhaSvc(svcname) + if svc is None: + raise s_exc.NoSuchName(mesg=f'No AHA service found for {svcname}') + + svcinfo = svc.get('svcinfo') + svciden = svcinfo.get('iden') + if svciden is None: + raise s_exc.NoSuchName(mesg=f'Service {svcname} has no iden') + + async for svcname, (ok, info) in proxy.callAhaPeerApi(svciden, todo, timeout=timeout, skiprun=skiprun): + yield (svcname, (ok, info)) + + async def _methCallPeerGenr(self, svcname, todo, timeout=None, skiprun=None): + ''' + Call a generator API on an AHA service. + + Args: + svcname (str): The name of the AHA service to call. + todo (list): The todo tuple from $lib.utils.todo(). + timeout (int): Optional timeout in seconds. + skiprun (str): Optional run ID argument allows skipping self-enumeration. + ''' + svcname = await s_stormtypes.tostr(svcname) + todo = await s_stormtypes.toprim(todo) + timeout = await s_stormtypes.toint(timeout, noneok=True) + skiprun = await s_stormtypes.tostr(skiprun, noneok=True) + + proxy = await self.runt.snap.core.reqAhaProxy() + svc = await proxy.getAhaSvc(svcname) + if svc is None: + raise s_exc.NoSuchName(mesg=f'No AHA service found for {svcname}') + + svcinfo = svc.get('svcinfo') + svciden = svcinfo.get('iden') + if svciden is None: + raise s_exc.NoSuchName(mesg=f'Service {svcname} has no iden') + + async for svcname, (ok, info) in proxy.callAhaPeerGenr(svciden, todo, timeout=timeout, skiprun=skiprun): + yield (svcname, (ok, info)) + @s_stormtypes.registry.registerLib class AhaPoolLib(s_stormtypes.Lib): ''' @@ -531,5 +666,220 @@ async def _methPoolSvcDel(self, svcname): } } ''' - } + }, + { + 'name': 'aha.svc.mirror', + 'descr': textwrap.dedent('''\ + Query the AHA services and their mirror relationships. + + Note: non-mirror services are not displayed. + '''), + 'cmdargs': ( + ('--timeout', {'help': 'The timeout in seconds for individual service API calls.', + 'default': 10, 'type': 'int'}), + ('--wait', {'help': 'Whether to wait for the mirrors to sync.', + 'action': 'store_true'}), + ), + 'storm': ''' + init { + $conf = ({ + "columns": [ + {"name": "name", "width": 40}, + {"name": "role", "width": 9}, + {"name": "online", "width": 7}, + {"name": "ready", "width": 6}, + {"name": "host", "width": 16}, + {"name": "port", "width": 8}, + {"name": "version", "width": 12}, + {"name": "nexus idx", "width": 10}, + ], + "separators": { + "row:outline": false, + "column:outline": false, + "header:row": "#", + "data:row": "", + "column": "", + }, + }) + $printer = $lib.tabular.printer($conf) + $timeout = $cmdopts.timeout + $wait = $cmdopts.wait + } + + function get_cell_infos(vname, timeout) { + $cell_infos = ({}) + $todo = $lib.utils.todo('getCellInfo') + for $info in $lib.aha.callPeerApi($vname, $todo, timeout=$timeout) { + $svcname = $info.0 + ($ok, $info) = $info.1 + if $ok { + $cell_infos.$svcname = $info + } + } + return($cell_infos) + } + + function build_status_list(members, cell_infos) { + $group_status = () + for $svc in $members { + $svcinfo = $svc.svcinfo + $svcname = $svc.name + $status = ({ + 'name': $svcname, + 'role': '', + 'online': $lib.dict.has($svcinfo, 'online'), + 'ready': $svcinfo.ready, + 'host': $svcinfo.urlinfo.host, + 'port': $svcinfo.urlinfo.port, + 'version': '', + 'nexs_indx': (0) + }) + if ($cell_infos.$svcname) { + $info = $cell_infos.$svcname + $cell_info = $info.cell + $status.nexs_indx = $cell_info.nexsindx + if ($cell_info.uplink) { + $status.role = 'follower' + } else { + $status.role = 'leader' + } + $status.version = $info.synapse.verstring + } + $group_status.append($status) + } + return($group_status) + } + + function check_sync_status(group_status) { + $indices = $lib.set() + $known_count = (0) + for $status in $group_status { + $indices.add($status.nexs_indx) + $known_count = ($known_count + (1)) + } + if ($lib.len($indices) = 1) { + if ($known_count = $lib.len($group_status)) { + return(true) + } + } + } + + function output_status(vname, group_status, printer) { + $lib.print($printer.header()) + $lib.print($vname) + for $status in $group_status { + if ($status.nexs_indx = 0) { + $status.nexs_indx = '' + } + $row = ( + $status.name, + $status.role, + $status.online, + $status.ready, + $status.host, + $status.port, + $status.version, + $status.nexs_indx + ) + $lib.print($printer.row($row)) + } + } + + $virtual_services = ({}) + $member_servers = ({}) + + for $svc in $lib.aha.list() { + $name = $svc.name + $svcinfo = $svc.svcinfo + $urlinfo = $svcinfo.urlinfo + $hostname = $urlinfo.hostname + + if ($name != $hostname) { + $virtual_services.$name = $svc + } else { + $member_servers.$name = $svc + } + } + + $mirror_groups = ({}) + for ($vname, $vsvc) in $virtual_services { + $vsvc_info = $vsvc.svcinfo + $vsvc_iden = $vsvc_info.iden + $vsvc_leader = $vsvc_info.leader + $vsvc_hostname = $vsvc_info.urlinfo.hostname + + if (not $vsvc_iden or not $vsvc_hostname or not $vsvc_leader) { + continue + } + + $primary_member = $member_servers.$vsvc_hostname + if (not $primary_member) { + continue + } + + $members = ([$primary_member]) + for ($mname, $msvc) in $member_servers { + if ($mname != $vsvc_hostname) { + $msvc_info = $msvc.svcinfo + if ($msvc_info.iden = $vsvc_iden and $msvc_info.leader = $vsvc_leader) { + $members.append($msvc) + } + } + } + + if ($lib.len($members) > 1) { + $mirror_groups.$vname = $members + } + } + + for ($vname, $members) in $mirror_groups { + $cell_infos = $get_cell_infos($vname, $timeout) + $group_status = $build_status_list($members, $cell_infos) + $lib.print('Service Mirror Groups:') + $output_status($vname, $group_status, $printer) + + if $check_sync_status($group_status) { + $lib.print('Group Status: In Sync') + } else { + $lib.print(`Group Status: Out of Sync`) + if $wait { + $leader_nexs = (0) + for $status in $group_status { + if (($status.role = 'leader') and ($status.nexs_indx > 0)) { + $leader_nexs = $status.nexs_indx + } + } + if ($leader_nexs > 0) { + while (true) { + $responses = () + $todo = $lib.utils.todo(waitNexsOffs, ($leader_nexs - 1), timeout=$timeout) + for $info in $lib.aha.callPeerApi($vname, $todo, timeout=$timeout) { + $svcname = $info.0 + ($ok, $info) = $info.1 + if ($ok and $info) { + $responses.append(($svcname, $info)) + } + } + + if ($lib.len($responses) = $lib.len($members)) { + $cell_infos = $get_cell_infos($vname, $timeout) + $group_status = $build_status_list($members, $cell_infos) + + $lib.print('') + $lib.print('Updated status:') + $output_status($vname, $group_status, $printer) + + if $check_sync_status($group_status) { + $lib.print('Group Status: In Sync') + break + } + } + } + } + } + } + $lib.print('') + } + ''' + }, ) diff --git a/synapse/lib/stormlib/utils.py b/synapse/lib/stormlib/utils.py new file mode 100644 index 00000000000..b0ed1f280ad --- /dev/null +++ b/synapse/lib/stormlib/utils.py @@ -0,0 +1,37 @@ +import synapse.lib.stormtypes as s_stormtypes + +@s_stormtypes.registry.registerLib +class LibUtils(s_stormtypes.Lib): + ''' + A Storm library for working with utility functions. + ''' + _storm_locals = ( + {'name': 'todo', + 'desc': ''' + Create a todo tuple of (name, args, kwargs). + ''', + 'type': {'type': 'function', '_funcname': '_todo', + 'args': ( + {'name': '_todoname', 'type': 'str', + 'desc': 'The todo name.'}, + {'name': '*args', 'type': 'any', + 'desc': 'Positional arguments for the todo.'}, + {'name': '**kwargs', 'type': 'any', + 'desc': 'Keyword arguments for the todo.'}, + ), + 'returns': {'type': 'list', 'desc': 'A todo tuple of (name, args, kwargs).'}, + }}, + ) + _storm_lib_path = ('utils',) + + def getObjLocals(self): + return { + 'todo': self._todo, + } + + @s_stormtypes.stormfunc(readonly=True) + async def _todo(self, _todoname, *args, **kwargs): + _todoname = await s_stormtypes.tostr(_todoname) + args = await s_stormtypes.toprim(args) + kwargs = await s_stormtypes.toprim(kwargs) + return (_todoname, args, kwargs) diff --git a/synapse/telepath.py b/synapse/telepath.py index ea9efe107fe..7b355058b8b 100644 --- a/synapse/telepath.py +++ b/synapse/telepath.py @@ -321,6 +321,9 @@ async def getTeleApi(self, link, mesg, path): ''' return self + async def getTeleFeats(self): + return {} + def onTeleShare(self, dmon, name): pass @@ -599,6 +602,8 @@ async def __anit__(self, link, name): self.shares = {} self._ahainfo = {} + self._features = {} + self.sharinfo = {} self.methinfo = {} @@ -634,6 +639,12 @@ async def fini(): self.onfini(fini) self.link.onfini(self.fini) + def _hasTeleFeat(self, name, vers=1): + return self._features.get(name, 0) >= vers + + def _hasTeleMeth(self, name): + return self.methinfo.get(name) is not None + def _getSynVers(self): ''' Helper method to retrieve the remote Synapse version from Proxy. @@ -906,6 +917,7 @@ async def handshake(self, auth=None): self.sess = self.synack[1].get('sess') self._ahainfo = self.synack[1].get('ahainfo', {}) + self._features = self.synack[1].get('features', {}) self.sharinfo = self.synack[1].get('sharinfo', {}) self.methinfo = self.sharinfo.get('meths', {}) diff --git a/synapse/tests/test_common.py b/synapse/tests/test_common.py index f1c258f71a6..a0a6307f86f 100644 --- a/synapse/tests/test_common.py +++ b/synapse/tests/test_common.py @@ -12,6 +12,24 @@ logger = logging.getLogger(__name__) class CommonTest(s_t_utils.SynTest): + + async def test_waitgenr(self): + + async def genr(): + yield 10 + raise Exception('omg') + + rets = [retn async for retn in s_common.waitgenr(genr(), 10)] + + self.true(rets[0][0]) + self.false(rets[1][0]) + + async def one(): + yield 'item' + + rets = [retn async for retn in s_common.waitgenr(one(), timeout=1.0)] + self.eq(rets, [(True, 'item')]) + def test_tuplify(self): tv = ['node', [['test:str', 'test'], {'tags': { diff --git a/synapse/tests/test_lib_aha.py b/synapse/tests/test_lib_aha.py index 9f26a6314fc..cb09c42c142 100644 --- a/synapse/tests/test_lib_aha.py +++ b/synapse/tests/test_lib_aha.py @@ -828,6 +828,33 @@ async def test_lib_aha_provision(self): async with await s_telepath.openurl(url) as prox: self.fail(f'Connected to an expired clone URL {url}') # pragma: no cover + async def test_lib_aha_mirrors(self): + + async with self.getTestAha() as aha: + async with await s_base.Base.anit() as base: + with self.getTestDir() as dirn: + user = 'synuser' + dirn00 = s_common.genpath(dirn, 'cell00') + dirn01 = s_common.genpath(dirn, 'cell01') + + core00 = await base.enter_context(self.addSvcToAha(aha, '00.cortex', s_cortex.Cortex, dirn=dirn00, + provinfo={'conf': {'aha:user': user}})) + self.eq(core00.conf.get('aha:user'), user) + + core01 = await base.enter_context(self.addSvcToAha(aha, '01.cortex', s_cortex.Cortex, dirn=dirn01, + conf={'axon': 'aha://cortex...'}, + provinfo={'conf': {'aha:user': user}})) + self.eq(core01.conf.get('aha:user'), user) + + async with aha.getLocalProxy() as ahaproxy: + self.eq(None, await ahaproxy.getAhaSvcMirrors('99.bogus')) + self.len(1, await ahaproxy.getAhaSvcMirrors('00.cortex.synapse')) + self.nn(await ahaproxy.getAhaServer(host=aha._getDnsName(), port=aha.sockaddr[1])) + + todo = s_common.todo('getCellInfo') + res = await asyncio.wait_for(await aha.callAhaSvcApi('00.cortex.synapse', todo, timeout=3), 3) + self.nn(res) + async def test_aha_httpapi(self): async with self.getTestAha() as aha: @@ -905,6 +932,10 @@ async def test_aha_httpapi(self): info = await resp.json() self.eq(info.get('status'), 'err') self.eq(info.get('code'), 'SchemaViolation') + async with sess.post(url, json={'name': 'doom' * 16}) as resp: + info = await resp.json() + self.eq(info.get('status'), 'err') + self.eq(info.get('code'), 'BadArg') # Not an admin await aha.addUser('lowuser', passwd='lowuser') @@ -1093,6 +1124,35 @@ async def test_aha_service_pools(self): self.stormHasNoWarnErr(msgs) self.stormIsInPrint('Removed AHA service pool: pool00.synapse', msgs) + async def test_aha_svc_api_exception(self): + + async with self.getTestAha() as aha: + + async def mockGetAhaSvcProxy(*args, **kwargs): + raise s_exc.SynErr(mesg='proxy error') + + aha.getAhaSvcProxy = mockGetAhaSvcProxy + name = '00.cortex.synapse' + todo = ('bogus', (), {}) + retn = await asyncio.wait_for(await aha.callAhaSvcApi(name, todo), 3) + + self.false(retn[0]) + self.eq('SynErr', retn[1].get('err')) + self.eq('proxy error', retn[1].get('errinfo').get('mesg')) + + bad_info = { + 'urlinfo': { + 'host': 'nonexistent.host', + 'port': 12345, + 'scheme': 'ssl' + } + } + + await aha.addAhaSvc(name, bad_info) + async for ok, info in aha.callAhaPeerGenr(name, ('nonexistent.method', (), {})): + self.false(ok) + self.isin('err', info) + async def test_aha_reprovision(self): with self.withNexusReplay() as stack: @@ -1317,3 +1377,184 @@ def _getProvListen(_self): async with await s_telepath.openurl(provurl) as prox: info = await prox.getUserInfo() self.eq(info.get('aha:user'), 'bob.grey') + + async def test_aha_gather(self): + + async with self.getTestAha() as aha: + + async with aha.waiter(3, 'aha:svcadd', timeout=10): + + conf = {'aha:provision': await aha.addAhaSvcProv('00.cell')} + cell00 = await aha.enter_context(self.getTestCell(conf=conf)) + + conf = {'aha:provision': await aha.addAhaSvcProv('01.cell', {'mirror': 'cell'})} + cell01 = await aha.enter_context(self.getTestCell(conf=conf)) + + await cell01.sync() + + async with cell01.getLocalProxy() as proxy: + self.true(proxy._hasTeleFeat('dynmirror')) + self.true(proxy._hasTeleMeth('getNexsIndx')) + + nexsindx = await cell00.getNexsIndx() + + # test the call endpoint + todo = s_common.todo('getCellInfo') + items = dict([item async for item in aha.callAhaPeerApi(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) + self.true(all(item[0] for item in items.values())) + self.eq(cell00.runid, items['00.cell.synapse'][1]['cell']['run']) + self.eq(cell01.runid, items['01.cell.synapse'][1]['cell']['run']) + + todo = s_common.todo('newp') + items = dict([item async for item in aha.callAhaPeerApi(cell00.iden, todo, timeout=3)]) + self.false(any(item[0] for item in items.values())) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) + + # test the genr endpoint + reals = [item async for item in cell00.getNexusChanges(0, wait=False)] + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = [item async for item in aha.callAhaPeerGenr(cell00.iden, todo, timeout=3) if item[1]] + self.len(nexsindx * 2, items) + + # ensure we handle down services correctly + async with aha.waiter(1, 'aha:svcdown', timeout=10): + await cell01.fini() + + # test the call endpoint + todo = s_common.todo('getCellInfo') + items = dict([item async for item in aha.callAhaPeerApi(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse',)) + self.true(all(item[0] for item in items.values())) + self.eq(cell00.runid, items['00.cell.synapse'][1]['cell']['run']) + + # test the genr endpoint + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = [item async for item in aha.callAhaPeerGenr(cell00.iden, todo, timeout=3) if item[1]] + self.len(nexsindx, items) + self.true(all(item[1][0] for item in items)) + + # test some of the gather API implementations... + purl00 = await aha.addAhaSvcProv('0.cell') + purl01 = await aha.addAhaSvcProv('1.cell', provinfo={'mirror': '0.cell'}) + + cell00 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl00})) + cell01 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl01})) + + await cell01.sync() + + async def sleep99(cell): + await cell.boss.promote('sleep99', cell.auth.rootuser) + await cell00.fire('sleep99') + await asyncio.sleep(99) + + async with cell00.waiter(2, 'sleep99', timeout=2): + task00 = cell00.schedCoro(sleep99(cell00)) + task01 = cell01.schedCoro(sleep99(cell01)) + + proxy = await aha.enter_context(aha.getLocalProxy()) + tasks = [task async for task in cell00.getTasks(timeout=3)] + self.len(2, tasks) + self.eq(tasks[0]['service'], '0.cell.synapse') + self.eq(tasks[1]['service'], '1.cell.synapse') + + self.eq(tasks[0], await cell00.getTask(tasks[0].get('iden'))) + self.eq(tasks[1], await cell00.getTask(tasks[1].get('iden'))) + self.none(await cell00.getTask(tasks[1].get('iden'), peers=False)) + + self.true(await cell00.killTask(tasks[0].get('iden'))) + + task01 = tasks[1].get('iden') + self.false(await cell00.killTask(task01, peers=False)) + + self.true(await cell00.killTask(task01)) + + self.none(await cell00.getTask(task01)) + self.false(await cell00.killTask(task01)) + + self.none(await cell00.getAhaProxy(feats=(('newp', 9),))) + + async def test_lib_aha_peer_api(self): + + async with self.getTestAha() as aha: + + purl00 = await aha.addAhaSvcProv('0.cell') + purl01 = await aha.addAhaSvcProv('1.cell', provinfo={'mirror': '0.cell'}) + purl02 = await aha.addAhaSvcProv('2.cell', provinfo={'mirror': '0.cell'}) + + cell00 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl00})) + cell01 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl01})) + cell02 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl02})) + + await cell01.sync() + await cell02.sync() + + todo = s_common.todo('getCellInfo') + items = [item async for item in cell00.callPeerApi(todo)] + self.len(2, items) + + async def test_lib_aha_peer_genr(self): + + async with self.getTestAha() as aha: + + purl00 = await aha.addAhaSvcProv('0.cell') + purl01 = await aha.addAhaSvcProv('1.cell', provinfo={'mirror': '0.cell'}) + + cell00 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl00})) + cell01 = await aha.enter_context(self.getTestCell(conf={'aha:provision': purl01})) + + await cell01.sync() + + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = dict([item async for item in cell00.callPeerGenr(todo)]) + self.len(1, items) + + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = dict([item async for item in cell00.callPeerGenr(todo, timeout=2)]) + self.len(1, items) + + async def test_lib_aha_call_aha_peer_api_isactive(self): + + async with self.getTestAha() as aha0: + + async with aha0.waiter(3, 'aha:svcadd', timeout=10): + + conf = {'aha:provision': await aha0.addAhaSvcProv('00.cell')} + cell00 = await aha0.enter_context(self.getTestCell(conf=conf)) + + conf = {'aha:provision': await aha0.addAhaSvcProv('01.cell', {'mirror': 'cell'})} + cell01 = await aha0.enter_context(self.getTestCell(conf=conf)) + + await cell01.sync() + + # test active AHA peer + todo = s_common.todo('getCellInfo') + items = dict([item async for item in aha0.callAhaPeerApi(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) + + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = dict([item async for item in aha0.callAhaPeerGenr(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) + + async with aha0.getLocalProxy() as proxy0: + purl = await proxy0.addAhaClone('01.aha.loop.vertex.link') + + conf1 = {'clone': purl} + async with self.getTestAha(conf=conf1) as aha1: + + await aha1.sync() + + self.eq(aha0.iden, aha1.iden) + self.nn(aha1.conf.get('mirror')) + + self.true(aha0.isactive) + self.false(aha1.isactive) + + # test non-active AHA peer + todo = s_common.todo('getCellInfo') + items = dict([item async for item in aha1.callAhaPeerApi(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) + + todo = s_common.todo('getNexusChanges', 0, wait=False) + items = dict([item async for item in aha1.callAhaPeerGenr(cell00.iden, todo, timeout=3)]) + self.sorteq(items.keys(), ('00.cell.synapse', '01.cell.synapse')) diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index 08f9b541a34..88b753515ce 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -3294,6 +3294,51 @@ async def test_lib_cell_promote_schism_prevent(self): await cell00.promote(graceful=True) self.isin('02.cell is not the current leader', cm.exception.get('mesg')) + async def test_cell_get_aha_proxy(self): + + async with self.getTestCell() as cell: + + self.none(await cell.getAhaProxy()) + + class MockAhaClient: + def __init__(self, proxy=None): + self._proxy = proxy + + async def proxy(self, timeout=None): + return self._proxy + + with self.getAsyncLoggerStream('synapse.lib.cell', 'AHA client connection failed.') as stream: + cell.ahaclient = MockAhaClient() + self.none(await cell.getAhaProxy()) + self.true(await stream.wait(timeout=1)) + + class MockProxyHasNot: + def _hasTeleFeat(self, name, vers): + return False + + cell.ahaclient = MockAhaClient(proxy=MockProxyHasNot()) + self.none(await cell.getAhaProxy(feats=(('test', 1),))) + + class MockProxyHas: + def _hasTeleFeat(self, name, vers): + return True + + mock_proxy = MockProxyHas() + cell.ahaclient = MockAhaClient(proxy=mock_proxy) + self.eq(await cell.getAhaProxy(), mock_proxy) + self.eq(await cell.getAhaProxy(feats=(('test', 1),)), mock_proxy) + + async def test_lib_cell_sadaha(self): + + async with self.getTestCell() as cell: + + self.none(await cell.getAhaProxy()) + cell.ahaclient = await s_telepath.Client.anit('cell:///tmp/newp') + + # coverage for failure of aha client to connect + with self.raises(TimeoutError): + self.none(await cell.getAhaProxy(timeout=0.1)) + async def test_stream_backup_exception(self): with self.getTestDir() as dirn: diff --git a/synapse/tests/test_lib_stormlib_aha.py b/synapse/tests/test_lib_stormlib_aha.py index 47a5edde125..7a8c4f9424f 100644 --- a/synapse/tests/test_lib_stormlib_aha.py +++ b/synapse/tests/test_lib_stormlib_aha.py @@ -6,6 +6,8 @@ import synapse.tests.utils as s_test +import unittest.mock as mock + class AhaLibTest(s_test.SynTest): async def test_stormlib_aha_basics(self): @@ -193,3 +195,189 @@ async def test_stormlib_aha_basics(self): with self.raises(s_exc.NoSuchName): await core00.callStorm('$lib.aha.del(axon...)') + + async def test_stormlib_aha_mirror(self): + + async with self.getTestAha() as aha: + + with self.getTestDir() as dirn: + + dirn00 = s_common.genpath(dirn, 'cell00') + dirn01 = s_common.genpath(dirn, 'cell01') + dirn02 = s_common.genpath(dirn, 'cell02') + + async with aha.waiter(3, 'aha:svcadd', timeout=10): + + cell00 = await aha.enter_context(self.addSvcToAha(aha, '00.cell', s_cell.Cell, dirn=dirn00)) + cell01 = await aha.enter_context(self.addSvcToAha(aha, '01.cell', s_cell.Cell, dirn=dirn01, + provinfo={'mirror': 'cell'})) + core00 = await aha.enter_context(self.addSvcToAha(aha, 'core', s_cortex.Cortex, dirn=dirn02)) + await cell01.sync() + + # PeerGenr + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo('getTasks') + for ($name, $info) in $lib.aha.callPeerGenr(cell..., $todo) { + $resps.append(($name, $info)) + } + return($resps) + ''') + self.len(0, resp) + + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo('getNexusChanges', (0), wait=(false)) + for ($name, $info) in $lib.aha.callPeerGenr(cell..., $todo) { + $resps.append(($name, $info)) + } + return($resps) + ''') + self.len(4, resp) + + cell00_rid = (await cell00.getCellInfo())['cell']['run'] + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo('getNexusChanges', (0), wait=(false)) + for $info in $lib.aha.callPeerGenr(cell..., $todo, skiprun=$skiprun) { + $resps.append($info) + } + return($resps) + ''', opts={'vars': {'skiprun': cell00_rid}}) + self.len(2, resp) + + await self.asyncraises(s_exc.NoSuchName, core00.callStorm(''' + $todo = $lib.utils.todo('getTasks') + $lib.aha.callPeerGenr(null, $todo) + ''')) + + # PeerApi + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo('getCellInfo') + for ($name, $info) in $lib.aha.callPeerApi(cell..., $todo) { + $resps.append(($name, $info)) + } + return($resps) + ''') + self.len(2, resp) + self.eq(resp[0][0], '00.cell.synapse') + self.eq(resp[0][1][0], True) + self.isinstance(resp[0][1][1], dict) + + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo(getCellInfo) + for ($name, $info) in $lib.aha.callPeerApi(cell..., $todo) { + $resps.append(($name, $info)) + } + return($resps) + ''') + self.len(2, resp) + + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo(getCellInfo) + for ($name, $info) in $lib.aha.callPeerApi(cell..., $todo, skiprun=$skiprun) { + $resps.append(($name, $info)) + } + return($resps) + ''', opts={'vars': {'skiprun': cell00_rid}}) + self.len(1, resp) + + resp = await core00.callStorm(''' + $resps = $lib.list() + $todo = $lib.utils.todo('getCellInfo') + for ($name, $info) in $lib.aha.callPeerApi(cell..., $todo, timeout=(10)) { + $resps.append(($name, $info)) + } + return($resps) + ''') + self.len(2, resp) + + await self.asyncraises(s_exc.NoSuchName, core00.callStorm(''' + $todo = $lib.utils.todo('getCellInfo') + $lib.aha.callPeerApi(newp..., $todo) + ''')) + + await self.asyncraises(s_exc.NoSuchName, core00.callStorm(''' + $todo = $lib.utils.todo('getCellInfo') + $lib.aha.callPeerApi(null, $todo) + ''')) + + await self.asyncraises(s_exc.NoSuchMeth, core00.callStorm(''' + $todo = $lib.utils.todo('bogusMethod') + for $info in $lib.aha.callPeerApi(cell..., $todo) { + ($ok, $info) = $info.1 + if (not $ok) { + $lib.raise($info.err, $info.errmsg) + } + } + ''')) + + await aha.addAhaSvc('noiden.cell', info={'urlinfo': {'scheme': 'tcp', + 'host': '0.0.0.0', + 'port': '3030'}}, + network='synapse') + await self.asyncraises(s_exc.NoSuchName, core00.callStorm(''' + $todo = $lib.utils.todo('getTasks') + $lib.aha.callPeerGenr(noiden.cell..., $todo) + ''')) + await self.asyncraises(s_exc.NoSuchName, core00.callStorm(''' + $todo = $lib.utils.todo('getCellInfo') + $lib.aha.callPeerApi(noiden.cell..., $todo) + ''')) + + msgs = await core00.stormlist('aha.svc.mirror') + self.stormIsInPrint('Service Mirror Groups:', msgs) + self.stormIsInPrint('cell.synapse', msgs) + self.stormIsInPrint('00.cell.synapse', msgs) + self.stormIsInPrint('01.cell.synapse', msgs) + self.stormIsInPrint('Group Status: In Sync', msgs) + + msgs = await core00.stormlist('aha.svc.mirror --timeout 30') + self.stormIsInPrint('Service Mirror Groups:', msgs) + self.stormIsInPrint('Group Status: In Sync', msgs) + + async def mockCellInfo(): + return { + 'cell': {'ready': True, 'nexsindx': 10, 'uplink': None}, + 'synapse': {'verstring': '2.190.0'}, + } + + async def mockOutOfSyncCellInfo(): + return { + 'cell': {'ready': True, 'nexsindx': 5, 'uplink': cell00.iden}, + 'synapse': {'verstring': '2.190.0'}, + } + + with mock.patch.object(cell00, 'getCellInfo', mockCellInfo): + with mock.patch.object(cell01, 'getCellInfo', mockOutOfSyncCellInfo): + async def mock_call_aha(*args, **kwargs): + todo = args[1] + if todo[0] == 'waitNexsOffs': + yield ('00.cell.synapse', (True, True)) + yield ('01.cell.synapse', (True, True)) + elif todo[0] == 'getCellInfo': + if not hasattr(mock_call_aha, 'called'): + mock_call_aha.called = True + yield ('00.cell.synapse', (True, await mockCellInfo())) + yield ('01.cell.synapse', (True, await mockOutOfSyncCellInfo())) + else: + yield ('00.cell.synapse', (True, await mockCellInfo())) + yield ('01.cell.synapse', (True, await mockCellInfo())) + + with mock.patch.object(aha, 'callAhaPeerApi', mock_call_aha): + msgs = await core00.stormlist('aha.svc.mirror --wait') + self.stormIsInPrint('Group Status: Out of Sync', msgs) + self.stormIsInPrint('Updated status:', msgs) + self.stormIsInPrint('Group Status: In Sync', msgs) + + with mock.patch.object(cell00, 'getCellInfo', mockCellInfo): + with mock.patch.object(cell01, 'getCellInfo', mockOutOfSyncCellInfo): + msgs = await core00.stormlist('aha.svc.mirror --timeout 1') + self.stormIsInPrint('Group Status: Out of Sync', msgs) + + await aha.delAhaSvc('00.cell', network='synapse') + msgs = await core00.stormlist('aha.svc.mirror') + self.stormNotInPrint('Service Mirror Groups:', msgs) diff --git a/synapse/tests/test_lib_stormlib_utils.py b/synapse/tests/test_lib_stormlib_utils.py new file mode 100644 index 00000000000..68d3a316961 --- /dev/null +++ b/synapse/tests/test_lib_stormlib_utils.py @@ -0,0 +1,14 @@ +import synapse.exc as s_exc +import synapse.tests.utils as s_test + +class UtilsTest(s_test.SynTest): + + async def test_lib_stormlib_utils_todo(self): + + async with self.getTestCore() as core: + + valu = await core.callStorm('return($lib.utils.todo(foo))') + self.eq(valu, ('foo', (), {})) + + valu = await core.callStorm('return($lib.utils.todo(fooName, arg1, arg2, keyword=bar, anotherkeyword=hehe))') + self.eq(valu, ('fooName', ('arg1', 'arg2'), {'keyword': 'bar', 'anotherkeyword': 'hehe'})) diff --git a/synapse/tests/test_tools_aha.py b/synapse/tests/test_tools_aha.py index 8938a276a38..b7b866a581e 100644 --- a/synapse/tests/test_tools_aha.py +++ b/synapse/tests/test_tools_aha.py @@ -3,14 +3,17 @@ from unittest import mock +import synapse.exc as s_exc import synapse.common as s_common import synapse.lib.cell as s_cell +import synapse.lib.version as s_version import synapse.tests.utils as s_t_utils import synapse.tools.aha.list as s_a_list import synapse.tools.aha.clone as s_a_clone import synapse.tools.aha.enroll as s_a_enroll +import synapse.tools.aha.mirror as s_a_mirror import synapse.tools.aha.easycert as s_a_easycert import synapse.tools.aha.provision.user as s_a_provision_user @@ -170,3 +173,192 @@ async def test_aha_enroll(self): teleyaml = s_common.yamlload(syndir, 'telepath.yaml') self.eq(teleyaml.get('version'), 1) + + async def test_aha_mirror(self): + + async with self.getTestAha() as aha: + + base_svcinfo = { + 'iden': 'test_iden', + 'leader': 'leader', + 'urlinfo': { + 'scheme': 'tcp', + 'host': '127.0.0.1', + 'port': 0, + 'hostname': 'test.host' + } + } + + conf_no_iden = {'aha:provision': await aha.addAhaSvcProv('no.iden')} + async with self.getTestCell(s_cell.Cell, conf=conf_no_iden) as cell_no_iden: + svcinfo = {k: v for k, v in base_svcinfo.items() if k != 'iden'} + await aha.addAhaSvc('no.iden', svcinfo) + + argv = ['--url', aha.getLocalUrl()] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Service Mirror Groups:') + self.notin('no.iden', str(outp)) + + conf_no_host = {'aha:provision': await aha.addAhaSvcProv('no.host')} + async with self.getTestCell(s_cell.Cell, conf=conf_no_host) as cell_no_host: + svcinfo = dict(base_svcinfo) + svcinfo['urlinfo'] = {k: v for k, v in base_svcinfo['urlinfo'].items() if k != 'hostname'} + await aha.addAhaSvc('no.host', svcinfo) + + argv = ['--url', aha.getLocalUrl()] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Service Mirror Groups:') + self.notin('no.host', str(outp)) + + conf_no_leader = {'aha:provision': await aha.addAhaSvcProv('no.leader')} + async with self.getTestCell(s_cell.Cell, conf=conf_no_leader) as cell_no_leader: + svcinfo = {k: v for k, v in base_svcinfo.items() if k != 'leader'} + await aha.addAhaSvc('no.leader', svcinfo) + + argv = ['--url', aha.getLocalUrl()] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Service Mirror Groups:') + self.notin('no.leader', str(outp)) + + conf_no_primary = {'aha:provision': await aha.addAhaSvcProv('no.primary')} + async with self.getTestCell(s_cell.Cell, conf=conf_no_primary) as cell_no_primary: + svcinfo = dict(base_svcinfo) + svcinfo['urlinfo']['hostname'] = 'nonexistent.host' + await aha.addAhaSvc('no.primary', svcinfo) + + async with aha.waiter(3, 'aha:svcadd', timeout=10): + + conf = {'aha:provision': await aha.addAhaSvcProv('00.cell')} + cell00 = await aha.enter_context(self.getTestCell(conf=conf)) + + conf = {'aha:provision': await aha.addAhaSvcProv('01.cell', {'mirror': 'cell'})} + cell01 = await aha.enter_context(self.getTestCell(conf=conf)) + + await cell01.sync() + + ahaurl = aha.getLocalUrl() + + argv = ['--url', ahaurl] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Service Mirror Groups:') + outp.expect('00.cell.synapse') + outp.expect('01.cell.synapse') + outp.expect('Group Status: In Sync') + + argv = ['--url', ahaurl, '--timeout', '30'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + + with mock.patch('synapse.telepath.Proxy._hasTeleFeat', + return_value=False): + argv = ['--url', ahaurl] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 1) + outp.expect(f'Service at {ahaurl} does not support the required callpeers feature.') + + with mock.patch('synapse.telepath.Proxy._hasTeleFeat', + side_effect=s_exc.NoSuchMeth(name='_hasTeleFeat')): + argv = ['--url', ahaurl] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 1) + outp.expect(f'Service at {ahaurl} does not support the required callpeers feature.') + + argv = ['--url', 'tcp://newp:1234/'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 1) + outp.expect('ERROR:') + + async def mockCellInfo(): + return { + 'cell': {'ready': True, 'nexsindx': 10, 'uplink': None}, + 'synapse': {'verstring': s_version.verstring}, + } + + async def mockOutOfSyncCellInfo(): + return { + 'cell': {'ready': True, 'nexsindx': 5, 'uplink': cell00.iden}, + 'synapse': {'verstring': s_version.verstring}, + } + + with mock.patch.object(cell00, 'getCellInfo', mockCellInfo): + with mock.patch.object(cell01, 'getCellInfo', mockOutOfSyncCellInfo): + async def mock_call_aha(*args, **kwargs): + todo = args[1] + if todo[0] == 'waitNexsOffs': + yield ('00.cell.synapse', (True, True)) + yield ('01.cell.synapse', (True, True)) + elif todo[0] == 'getCellInfo': + if not hasattr(mock_call_aha, 'called'): + mock_call_aha.called = True + yield ('00.cell.synapse', (True, await mockCellInfo())) + yield ('01.cell.synapse', (True, await mockOutOfSyncCellInfo())) + else: + yield ('00.cell.synapse', (True, await mockCellInfo())) + yield ('01.cell.synapse', (True, await mockCellInfo())) + + with mock.patch.object(aha, 'callAhaPeerApi', mock_call_aha): + argv = ['--url', ahaurl, '--wait'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Group Status: Out of Sync') + outp.expect('Updated status:') + outp.expect('Group Status: In Sync') + + with mock.patch.object(cell00, 'getCellInfo', mockCellInfo): + with mock.patch.object(cell01, 'getCellInfo', mockOutOfSyncCellInfo): + argv = ['--url', ahaurl, '--timeout', '1'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 0) + outp.expect('Group Status: Out of Sync') + + async with self.getTestCore() as core: + curl = core.getLocalUrl() + argv = ['--url', curl] + with mock.patch('synapse.telepath.Proxy._hasTeleFeat', + return_value=True): + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(1, retn) + outp.expect(f'Service at {curl} is not an Aha server') + + async with aha.waiter(1, 'aha:svcadd', timeout=10): + + conf = {'aha:provision': await aha.addAhaSvcProv('02.cell', {'mirror': 'cell'})} + cell02 = await aha.enter_context(self.getTestCell(conf=conf)) + await cell02.sync() + + async def mock_failed_api(*args, **kwargs): + yield ('00.cell.synapse', (True, {'cell': {'ready': True, 'nexsindx': 10}})) + yield ('01.cell.synapse', (False, 'error')) + yield ('02.cell.synapse', (True, {'cell': {'ready': True, 'nexsindx': 12}})) + + with mock.patch.object(aha, 'callAhaPeerApi', mock_failed_api): + argv = ['--url', ahaurl, '--timeout', '1'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + outp.expect('00.cell.synapse leader True True 127.0.0.1', whitespace=False) + outp.expect('nexsindx 10', whitespace=False) + outp.expect('02.cell.synapse leader True True 127.0.0.1', whitespace=False) + outp.expect('nexsindx 12', whitespace=False) + outp.expect('01.cell.synapse True True', whitespace=False) + outp.expect(' ', whitespace=False) + + self.eq(s_a_mirror.timeout_type('30'), 30) + self.eq(s_a_mirror.timeout_type('0'), 0) + + with self.raises(s_exc.BadArg) as cm: + s_a_mirror.timeout_type('-1') + self.isin('is not a valid non-negative integer', cm.exception.get('mesg')) + + with self.raises(s_exc.BadArg) as cm: + s_a_mirror.timeout_type('foo') + self.isin('is not a valid non-negative integer', cm.exception.get('mesg')) + + synerr = s_exc.SynErr(mesg='Oof') + with mock.patch('synapse.telepath.openurl', side_effect=synerr): + argv = ['--url', 'tcp://test:1234/'] + retn, outp = await self.execToolMain(s_a_mirror.main, argv) + self.eq(retn, 1) + outp.expect('ERROR: Oof') diff --git a/synapse/tools/aha/mirror.py b/synapse/tools/aha/mirror.py new file mode 100644 index 00000000000..77f5fa3c5b1 --- /dev/null +++ b/synapse/tools/aha/mirror.py @@ -0,0 +1,193 @@ +import sys +import asyncio +import argparse + +import synapse.exc as s_exc +import synapse.common as s_common +import synapse.telepath as s_telepath + +import synapse.lib.output as s_output +import synapse.lib.version as s_version + +descr = ''' +Query the Aha server for the service cluster status of mirrors. + +Examples: + + python -m synapse.tools.aha.mirror --timeout 30 + +''' + +async def get_cell_infos(prox, iden, members, timeout): + cell_infos = {} + if iden is not None: + todo = s_common.todo('getCellInfo') + async for svcname, (ok, info) in prox.callAhaPeerApi(iden, todo, timeout=timeout): + if not ok: + continue + cell_infos[svcname] = info + return cell_infos + +def build_status_list(members, cell_infos): + group_status = [] + for svc in members: + svcname = svc.get('name') + svcinfo = svc.get('svcinfo', {}) + status = { + 'name': svcname, + 'role': '', + 'online': str('online' in svcinfo), + 'ready': 'True', + 'host': svcinfo.get('urlinfo', {}).get('host', ''), + 'port': str(svcinfo.get('urlinfo', {}).get('port', '')), + 'version': '', + 'nexs_indx': 0 + } + if svcname in cell_infos: + info = cell_infos[svcname] + cell_info = info.get('cell', {}) + status.update({ + 'nexs_indx': cell_info.get('nexsindx', 0), + 'role': 'follower' if cell_info.get('uplink') else 'leader', + 'version': str(info.get('synapse', {}).get('verstring', '')), + 'online': 'True', + 'ready': str(cell_info.get('ready', False)) + }) + group_status.append(status) + return group_status + +def output_status(outp, vname, group_status): + header = ' {:<40} {:<10} {:<8} {:<7} {:<16} {:<9} {:<12} {:<10}'.format( + 'name', 'role', 'online', 'ready', 'host', 'port', 'version', 'nexus idx') + outp.printf(header) + outp.printf('#' * 120) + outp.printf(vname) + for status in group_status: + if status['nexs_indx'] == 0: + status['nexs_indx'] = '' + line = ' {name:<40} {role:<10} {online:<8} {ready:<7} {host:<16} {port:<9} {version:<12} {nexs_indx:<10}'.format(**status) + outp.printf(line) + +def check_sync_status(group_status): + indices = {status['nexs_indx'] for status in group_status} + known_count = sum(1 for status in group_status) + return len(indices) == 1 and known_count == len(group_status) + +def timeout_type(valu): + try: + ivalu = int(valu) + if ivalu < 0: + raise ValueError + except ValueError: + raise s_exc.BadArg(mesg=f"{valu} is not a valid non-negative integer") + return ivalu + +async def main(argv, outp=s_output.stdout): + + pars = argparse.ArgumentParser(prog='synapse.tools.aha.mirror', description=descr, + formatter_class=argparse.RawDescriptionHelpFormatter) + + pars.add_argument('--url', default='cell:///vertex/storage', help='The telepath URL to connect to the AHA service.') + pars.add_argument('--timeout', type=timeout_type, default=10, help='The timeout in seconds for individual service API calls') + pars.add_argument('--wait', action='store_true', help='Whether to wait for the mirrors to sync.') + opts = pars.parse_args(argv) + + async with s_telepath.withTeleEnv(): + try: + async with await s_telepath.openurl(opts.url) as prox: + try: + if not prox._hasTeleFeat('callpeers', vers=1): + outp.printf(f'Service at {opts.url} does not support the required callpeers feature.') + return 1 + except s_exc.NoSuchMeth: + outp.printf(f'Service at {opts.url} does not support the required callpeers feature.') + return 1 + classes = prox._getClasses() + if 'synapse.lib.aha.AhaApi' not in classes: + outp.printf(f'Service at {opts.url} is not an Aha server') + return 1 + + virtual_services, member_servers = {}, {} + async for svc in prox.getAhaSvcs(): + name = svc.get('name', '') + svcinfo = svc.get('svcinfo', {}) + urlinfo = svcinfo.get('urlinfo', {}) + hostname = urlinfo.get('hostname', '') + + if name != hostname: + virtual_services[name] = svc + else: + member_servers[name] = svc + + mirror_groups = {} + for vname, vsvc in virtual_services.items(): + vsvc_info = vsvc.get('svcinfo', {}) + vsvc_iden = vsvc_info.get('iden') + vsvc_leader = vsvc_info.get('leader') + vsvc_hostname = vsvc_info.get('urlinfo', {}).get('hostname', '') + + if not vsvc_iden or not vsvc_hostname or not vsvc_leader: + continue + + primary_member = member_servers.get(vsvc_hostname) + if not primary_member: + continue + + members = [primary_member] + [ + msvc for mname, msvc in member_servers.items() + if mname != vsvc_hostname and + msvc.get('svcinfo', {}).get('iden') == vsvc_iden and + msvc.get('svcinfo', {}).get('leader') == vsvc_leader + ] + + if len(members) > 1: + mirror_groups[vname] = members + + outp.printf('Service Mirror Groups:') + for vname, members in mirror_groups.items(): + iden = members[0].get('svcinfo', {}).get('iden') + + cell_infos = await get_cell_infos(prox, iden, members, opts.timeout) + group_status = build_status_list(members, cell_infos) + output_status(outp, vname, group_status) + + if check_sync_status(group_status): + outp.printf('Group Status: In Sync') + else: + outp.printf(f'Group Status: Out of Sync') + if opts.wait: + leader_nexs = None + for status in group_status: + if status['role'] == 'leader' and isinstance(status['nexs_indx'], int): + leader_nexs = status['nexs_indx'] + + if leader_nexs is not None: + while True: + responses = [] + todo = s_common.todo('waitNexsOffs', leader_nexs - 1, timeout=opts.timeout) + async for svcname, (ok, info) in prox.callAhaPeerApi(iden, todo, timeout=opts.timeout): + if ok and info: + responses.append((svcname, info)) + + if len(responses) == len(members): + cell_infos = await get_cell_infos(prox, iden, members, opts.timeout) + group_status = build_status_list(members, cell_infos) + + outp.printf('\nUpdated status:') + output_status(outp, vname, group_status) + + if check_sync_status(group_status): + outp.printf('Group Status: In Sync') + break + + return 0 + + except Exception as e: + mesg = repr(e) + if isinstance(e, s_exc.SynErr): + mesg = e.errinfo.get('mesg', repr(e)) + outp.printf(f'ERROR: {mesg}') + return 1 + +if __name__ == '__main__': # pragma: no cover + sys.exit(asyncio.run(main(sys.argv[1:])))