diff --git a/Dockerfile b/Dockerfile
index 804a4a57d..a35c0e9aa 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7-slim-bullseye
+FROM python:3.11-slim-bullseye
 
 # Install libvirt which requires system dependencies.
 RUN apt update && \
@@ -20,7 +20,7 @@ RUN ln -s /influxdb-1.8.4-1/influxd /usr/local/bin/influxd && \
 
 RUN pip install --no-cache-dir --upgrade pip && \
     pip install --no-cache-dir --upgrade setuptools && \
-    pip install libvirt-python==8.8.0 uwsgi==2.0.20 && \
+    pip install libvirt-python==8.8.0 uwsgi==2.0.21 && \
    pip install --no-cache-dir ipython ipdb flake8 pytest pytest-cov
 
 # Remove `-frozen` to build without strictly pinned dependencies.
@@ -63,4 +63,4 @@ ENV JS_BUILD=1 \
 
 RUN echo "{\"sha\":\"$VERSION_SHA\",\"name\":\"$VERSION_NAME\",\"repo\":\"$VERSION_REPO\",\"modified\":false}" \
-    > /mist-version.json
+    > /mist-version.json
diff --git a/lc b/lc
index d94bec4f5..5be25aaac 160000
--- a/lc
+++ b/lc
@@ -1 +1 @@
-Subproject commit d94bec4f5ba9ab603051eccbaee5d8fa4deeb058
+Subproject commit 5be25aaac8cf3091849947661266a87eed2d8c4d
diff --git a/migrations/0019-cleanup-keys-libvirt-locations.py b/migrations/0019-cleanup-keys-libvirt-locations.py
index e3e2e9a07..9903a4bd2 100644
--- a/migrations/0019-cleanup-keys-libvirt-locations.py
+++ b/migrations/0019-cleanup-keys-libvirt-locations.py
@@ -21,7 +21,7 @@ def cleanup_libvirt_cloud_locations():
     """
     from mist.api.models import Machine
     from mist.api.clouds.models import CloudLocation, LibvirtCloud
-    libvirt_cloud_ids = [l.id for l in LibvirtCloud.objects(
+    libvirt_cloud_ids = [loc.id for loc in LibvirtCloud.objects(
         deleted=None).only('id')]
 
     for loc in CloudLocation.objects(cloud__in=libvirt_cloud_ids):
diff --git a/migrations/0041-remove-control-plane-cost.py b/migrations/0041-remove-control-plane-cost.py
index a75cfbad2..24e0e6c50 100644
--- a/migrations/0041-remove-control-plane-cost.py
+++ b/migrations/0041-remove-control-plane-cost.py
@@ -23,7 +23,7 @@ def remove_control_plane_costs():
         '$unset': {"cost.control_plane_monthly": 1}
     })
     print(f'{res.modified_count} machines were modified.')
-    print(f'Removing control_plane_monthly from clusters ...')
+    print('Removing control_plane_monthly from clusters ...')
     res = db_clusters.update_many({}, {
         '$unset': {"cost.control_plane_monthly": 1}
     })
diff --git a/requirements-frozen.txt b/requirements-frozen.txt
index e862f5800..47efdb8ec 100644
--- a/requirements-frozen.txt
+++ b/requirements-frozen.txt
@@ -3,8 +3,9 @@
 ## direct dependencies and their dependencies and so on. This ensures that
 ## builds won't start failing just because of a new release of some dependency.
-amqp==2.6.1 # 5.1.1
+amqp==5.1.1
 apscheduler==3.9.1
+asgiref==3.5.2
 asn1crypto==1.5.1
 atomicwrites==1.4.1
 attrs==22.1.0
@@ -22,8 +23,7 @@ cryptography==38.0.1
 dateparser==1.1.1
 decorator==4.4.2
 dnspython==1.16.0
-elasticsearch==6.8.0
-elasticsearch-tornado==2.0.9
+elasticsearch[async]==7.10.1
 flake8==5.0.4
 future==0.18.2
 funcsigs==1.0.2
@@ -32,7 +32,7 @@ greenlet==1.1.3.post0
 idna==2.10
 ipaddress==1.0.23
 ipdb==0.13.9
-ipython==7.34.0
+ipython==8.7.0
 ipython-genutils==0.2.0
 iso8601==0.1.16
 jedi==0.18.1
@@ -40,7 +40,7 @@ Jinja2==2.11.3
 jsonpatch==1.32
 jsonpickle==2.2.0
 jsonpointer==2.3
-kombu==4.6.11
+kombu==5.2.4
 Logbook==1.5.3
 lxml==4.9.1
 Mako==1.2.3
@@ -63,7 +63,7 @@ pathlib2==2.3.7.post1
 pbr==5.10.0
 pexpect==4.8.0
 pickleshare==0.7.5
-pika==0.12.0 # 1.3.0
+pika==1.3.1
 pingparsing==1.4.0
 pluggy==0.13.1
 pretty==0.1
@@ -99,11 +99,10 @@ sentry-sdk==1.9.10
 simplegeneric==0.8.1
 singledispatch==3.7.0
 six==1.16.0
-sockjs-tornado==1.0.6
+sockjs-tornado==1.0.7
 subprocrunner==1.6.0
-tornado==5.1.1
+tornado==6.2
 troposphere==3.2.2
-#tornado-profile==1.2.0
 traitlets==5.4.0
 translationstring==1.4
 typepy==1.3.0
@@ -111,8 +110,8 @@ typing_extensions==4.4.0
 urllib3==1.26.12
 uwsgidecorators==1.1.0
 venusian==1.2.0
-vine==1.3.0
+vine==5.0.0
 wcwidth==0.2.5
 WebOb==1.8.7
 websocket-client==1.4.1
-yappi==1.3.6
+yappi==1.4.0
diff --git a/requirements.txt b/requirements.txt
index 60e24d6fa..b73a80671 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,15 +6,15 @@
 ## ensures that the build won't break because of a new release of some
 ## dependency.
 
-amqp<3.0
+amqp
 apscheduler
+asgiref
 beautifulsoup4
 boto3
 dnspython
 dateparser
 dramatiq
 elasticsearch
-elasticsearch_tornado
 flake8
 future
 gevent
@@ -47,6 +47,7 @@ pytest
 python3-openid
 pyvmomi
 requests
+rstream
 s3cmd
 scp
 sendgrid-python
@@ -57,4 +58,4 @@ tornado
 troposphere
 #tornado_profile
 uwsgidecorators
-websocket-client
+websocket-client
\ No newline at end of file
diff --git a/src/mist/api/amqp_tornado.py b/src/mist/api/amqp_tornado.py
index 16cd7d809..143dd94bb 100644
--- a/src/mist/api/amqp_tornado.py
+++ b/src/mist/api/amqp_tornado.py
@@ -9,8 +9,7 @@ import logging
 
 import pika
-from pika import adapters
-
+import pika.adapters.tornado_connection
 
 log = logging.getLogger(__name__)
 
@@ -61,8 +60,8 @@ def connect(self):
         """
         log.info('Connecting to %s', self.amqp_url)
-        return adapters.TornadoConnection(pika.URLParameters(self.amqp_url),
-                                          self.on_connection_open)
+        return pika.adapters.tornado_connection.TornadoConnection(
+            pika.URLParameters(self.amqp_url), self.on_connection_open)
 
     def close_connection(self):
         """This method closes the connection to RabbitMQ."""
@@ -77,7 +76,7 @@ def add_on_connection_close_callback(self):
         log.debug('Adding connection close callback')
         self._connection.add_on_close_callback(self.on_connection_closed)
 
-    def on_connection_closed(self, connection, reply_code, reply_text):
+    def on_connection_closed(self, connection, reply_code, reply_text=''):
         """This method is invoked by pika when the connection to RabbitMQ is
         closed unexpectedly. Since it is unexpected, we will reconnect to
         RabbitMQ if it disconnects.
@@ -125,7 +124,7 @@ def add_on_channel_close_callback(self):
         log.debug('Adding channel close callback')
         self._channel.add_on_close_callback(self.on_channel_closed)
 
-    def on_channel_closed(self, channel, reply_code, reply_text):
+    def on_channel_closed(self, channel, reply_code, reply_text=''):
         """Invoked by pika when RabbitMQ unexpectedly closes the channel.
 
         Channels are usually closed if you attempt to do something that
         violates the protocol, such as re-declare an exchange or queue with
@@ -164,10 +163,10 @@ def setup_exchange(self, exchange_name):
         """
         log.debug('Declaring exchange %s', exchange_name)
-        self._channel.exchange_declare(self.on_exchange_declareok,
-                                       exchange_name,
+        self._channel.exchange_declare(exchange_name,
                                        self.exchange_type,
-                                       **self.exchange_kwargs)
+                                       callback=self.on_exchange_declareok,
+                                       arguments=self.exchange_kwargs)
 
     def on_exchange_declareok(self, unused_frame):
         """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
@@ -189,8 +188,9 @@ def setup_queue(self, queue_name):
         """
         log.debug('Declaring queue %s', queue_name)
-        self._channel.queue_declare(self.on_queue_declareok, queue_name,
-                                    **self.queue_kwargs)
+        self._channel.queue_declare(queue_name,
+                                    callback=self.on_queue_declareok,
+                                    arguments=self.queue_kwargs)
 
     def on_queue_declareok(self, method_frame):
         """Method invoked by pika when the Queue.Declare RPC call made in
@@ -204,8 +204,8 @@ def on_queue_declareok(self, method_frame):
         """
         log.debug('Binding %s to %s with %s', self.exchange,
                   self.queue, self.routing_key)
-        self._channel.queue_bind(self.on_bindok, self.queue,
-                                 self.exchange, self.routing_key)
+        self._channel.queue_bind(self.queue, self.exchange, self.routing_key,
+                                 callback=self.on_bindok)
 
     def add_on_cancel_callback(self):
         """Add a callback that will be invoked if RabbitMQ cancels the consumer
@@ -276,7 +276,8 @@ def stop_consuming(self):
         """
         if self._channel:
             log.info('Sending a Basic.Cancel RPC command to RabbitMQ')
-            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+            self._channel.basic_cancel(
+                consumer_tag=self._consumer_tag, callback=self.on_cancelok)
 
     def start_consuming(self):
         """This method sets up the consumer by first calling
@@ -290,9 +291,9 @@ def start_consuming(self):
         """
         log.debug('Issuing consumer related RPC commands')
         self.add_on_cancel_callback()
-        self._consumer_tag = self._channel.basic_consume(self.on_message,
-                                                         self.queue,
-                                                         no_ack=not self.ack)
+        self._consumer_tag = self._channel.basic_consume(self.queue,
+                                                         self.on_message,
+                                                         auto_ack=not self.ack)
 
     def on_bindok(self, unused_frame):
         """Invoked by pika when the Queue.Bind method has completed. At this
diff --git a/src/mist/api/clouds/controllers/compute/base.py b/src/mist/api/clouds/controllers/compute/base.py
index 44b2b62fe..6983b7141 100644
--- a/src/mist/api/clouds/controllers/compute/base.py
+++ b/src/mist/api/clouds/controllers/compute/base.py
@@ -1466,18 +1466,24 @@ def list_locations(self, persist=True):
         task_key = 'cloud:list_locations:%s' % self.cloud.id
         task = PeriodicTaskInfo.get_or_add(task_key)
         with task.task_runner(persist=persist):
-            cached_locations = {'%s' % l.id: l.as_dict()
-                                for l in self.list_cached_locations()}
+            cached_locations = {'%s' % loc.id: loc.as_dict()
+                                for loc in self.list_cached_locations()}
 
             locations = self._list_locations()
 
         new_location_objects = [location for location in locations
                                 if location.id not in cached_locations.keys()]
 
-        if amqp_owner_listening(self.cloud.owner.id):
-            locations_dict = [l.as_dict() for l in locations]
+        try:
+            owner_listening = amqp_owner_listening(self.cloud.owner.id)
+        except Exception as e:
+            log.error('Exception raised during amqp owner lookup: %s', repr(e))
+            owner_listening = False
+        if owner_listening:
+            locations_dict = [loc.as_dict() for loc in locations]
             if cached_locations and locations_dict:
-                new_locations = {'%s' % l['id']: l for l in locations_dict}
+                new_locations = {
+                    '%s' % loc['id']: loc for loc in locations_dict}
                 # Pop extra to prevent weird patches
                 for loc in cached_locations:
                     cached_locations[loc].pop('extra')
@@ -1539,90 +1545,120 @@ def _list_locations(self):
                  len(fetched_locations), self.cloud)
 
         locations = []
+        try:
+            loop = asyncio.get_event_loop()
+            if loop.is_closed():
+                raise RuntimeError('loop is closed')
+        except RuntimeError:
+            asyncio.set_event_loop(asyncio.new_event_loop())
+            loop = asyncio.get_event_loop()
+        locations = loop.run_until_complete(
+            self._list_locations_populate_all_locations(
+                fetched_locations, loop
+            )
+        )
 
-        for loc in fetched_locations:
-            try:
-                _location = CloudLocation.objects.get(cloud=self.cloud,
-                                                      external_id=loc.id)
-            except CloudLocation.DoesNotExist:
-                _location = CloudLocation(cloud=self.cloud,
-                                          owner=self.cloud.owner,
-                                          external_id=loc.id)
-            try:
-                _location.country = loc.country
-            except AttributeError:
-                _location.country = None
-            _location.name = loc.name
-            _location.extra = copy.deepcopy(loc.extra)
-            _location.missing_since = None
-            _location.parent = self._list_locations__get_parent(_location, loc)
-            _location.location_type = self._list_locations__get_type(
-                _location, loc)
-            _location.images_location = self._list_locations__get_images_location(loc)  # noqa: E501
-            try:
-                created = self._list_locations__location_creation_date(loc)
-                if created:
-                    created = get_datetime(created)
-                    if _location.created != created:
-                        _location.created = created
-            except Exception as exc:
-                log.exception("Error finding creation date for %s in %s.\n%r",
-                              self.cloud, _location, exc)
-            try:
-                capabilities = self._list_locations__get_capabilities(loc)
-            except Exception as exc:
-                log.error(
-                    "Failed to get location capabilities for cloud: %s",
-                    self.cloud.id)
-            else:
-                _location.capabilities = capabilities
-
-            try:
-                available_sizes = self._list_locations__get_available_sizes(loc)  # noqa
-            except Exception as exc:
-                log.error('Error adding location-size constraint: %s'
-                          % repr(exc))
-            else:
-                if available_sizes:
-                    _location.available_sizes = available_sizes
-
-            try:
-                available_images = self._list_locations__get_available_images(loc)  # noqa
-            except Exception as exc:
-                log.error('Error adding location-image constraint: %s'
-                          % repr(exc))
-            else:
-                if available_images:
-                    _location.available_images = available_images
-
-            try:
-                _location.save()
-            except me.ValidationError as exc:
-                log.error("Error adding %s: %s", loc.name, exc.to_dict())
-                raise BadRequestError({"msg": str(exc),
-                                       "errors": exc.to_dict()})
-            locations.append(_location)
 
         now = datetime.datetime.utcnow()
         # update missing_since for locations not returned by libcloud
         CloudLocation.objects(cloud=self.cloud,
                               missing_since=None,
-                              external_id__nin=[l.external_id
-                                                for l in locations]).update(
+                              external_id__nin=[loc.external_id
+                                                for loc in locations]).update(
             missing_since=now)
         # update locations for locations seen for the first time
         CloudLocation.objects(cloud=self.cloud,
                               first_seen=None,
-                              external_id__in=[l.external_id
-                                               for l in locations]).update(
+                              external_id__in=[loc.external_id
+                                               for loc in locations]).update(
             first_seen=now)
         # update last_seen, unset missing_since for locations we just saw
         CloudLocation.objects(cloud=self.cloud,
-                              external_id__in=[l.external_id
-                                               for l in locations]).update(
+                              external_id__in=[loc.external_id
+                                               for loc in locations]).update(
             last_seen=now, missing_since=None)
 
         return locations
 
+    async def _list_locations_populate_all_locations(self, locations, loop):
+        result = [
+            loop.run_in_executor(
+                None,
+                self._list_locations__populate_location, libcloud_location
+            ) for libcloud_location in locations
+        ]
+        return await asyncio.gather(*result)
+
+    def _list_locations__populate_location(self, libcloud_location):
+        from mist.api.clouds.models import CloudLocation
+        try:
+            _location = CloudLocation.objects.get(
+                cloud=self.cloud, external_id=libcloud_location.id)
+        except CloudLocation.DoesNotExist:
+            _location = CloudLocation(
+                cloud=self.cloud, owner=self.cloud.owner,
+                external_id=libcloud_location.id)
+        try:
+            _location.country = libcloud_location.country
+        except AttributeError:
+            _location.country = None
+        _location.name = libcloud_location.name
+        _location.extra = copy.deepcopy(libcloud_location.extra)
+        _location.missing_since = None
+        _location.parent = self._list_locations__get_parent(
+            _location, libcloud_location)
+        _location.location_type = self._list_locations__get_type(
+            _location, libcloud_location)
+        _location.images_location = self._list_locations__get_images_location(
+            libcloud_location)
+        try:
+            created = self._list_locations__location_creation_date(
+                libcloud_location)
+            if created:
+                created = get_datetime(created)
+                if _location.created != created:
+                    _location.created = created
+        except Exception as exc:
+            log.exception("Error finding creation date for %s in %s.\n%r",
+                          self.cloud, _location, exc)
+        try:
+            capabilities = self._list_locations__get_capabilities(
+                libcloud_location)
+        except Exception as exc:
+            log.error(
+                "Failed to get location capabilities for cloud: %s",
+                self.cloud.id)
+        else:
+            _location.capabilities = capabilities
+
+        try:
+            available_sizes = self._list_locations__get_available_sizes(
+                libcloud_location)
+        except Exception as exc:
+            log.error('Error adding location-size constraint: %s'
+                      % repr(exc))
+        else:
+            if available_sizes:
+                _location.available_sizes = available_sizes
+
+        try:
+            available_images = self._list_locations__get_available_images(
+                libcloud_location)
+        except Exception as exc:
+            log.error('Error adding location-image constraint: %s'
+                      % repr(exc))
+        else:
+            if available_images:
+                _location.available_images = available_images
+
+        try:
+            _location.save()
+        except me.ValidationError as exc:
+            log.error(
+                "Error adding %s: %s", libcloud_location.name, exc.to_dict())
+            raise BadRequestError({"msg": str(exc),
+                                   "errors": exc.to_dict()})
+        return _location
+
     def _list_locations__fetch_locations(self):
         """Fetch location listing in a libcloud compatible format
 
@@ -3303,36 +3339,6 @@ def _generate_plan__parse_schedules(self, auth_context,
         """Parse & validate machine's schedules list from
         the create machine request.
 
-        Schedule attributes:
-            `schedule_type`: 'one_off', 'interval', 'crontab'
-            `action`: 'start' 'stop', 'reboot', 'destroy'
-            `script`: dictionary containing:
-                `script`: id or name of the script to run
-                `params`: optional script parameters
-
-        one_off schedule_type parameters:
-            `datetime`: when schedule should run,
-                        e.g '2020-12-15T22:00:00Z'
-        crontab schedule_type parameters:
-            `minute`: e.g '*/10',
-            `hour`
-            `day_of_month`
-            `month_of_year`
-            `day_of_week`
-
-        interval schedule_type parameters:
-            `every`: int ,
-            `period`: minutes,hours,days
-
-        `expires`: date when schedule should expire,
-                   e.g '2020-12-15T22:00:00Z'
-
-        `start_after`: date when schedule should start running,
-                       e.g '2020-12-15T22:00:00Z'
-
-        `max_run_count`: max number of times to run
-        description:
-
         Parameters:
             auth_context(AuthContext): The AuthContext object
                                        of the user making the request.
@@ -3345,48 +3351,57 @@ def _generate_plan__parse_schedules(self, auth_context,
             return None
         ret_schedules = []
         for schedule in schedules:
-            schedule_type = schedule.get('schedule_type')
+            when = schedule.get('when')
+            schedule_type = when.get('schedule_type')
             if schedule_type not in ['crontab', 'interval', 'one_off']:
                 raise BadRequestError('schedule type must be one of '
                                       'these (crontab, interval, one_off)')
             ret_schedule = {
-                'schedule_type': schedule_type,
                 'description': schedule.get('description', ''),
-                'task_enabled': True,
+                'when': when,
+                'task_enabled': schedule.get('task_enabled', True),
+                'actions': []
             }
-            action = schedule.get('action')
-            script = schedule.get('script')
-            if action is None and script is None:
-                raise BadRequestError('Schedule action or script not defined')
-            if action and script:
-                raise BadRequestError(
-                    'One of action or script should be defined')
-            if action:
-                if action not in ['reboot', 'destroy', 'start', 'stop']:
+            actions = schedule.get('actions', [])
+            for action in actions:
+                action_type = action.get('action_type')
+                if action_type is None:
+                    raise BadRequestError('Schedule action not defined')
+                if action_type not in [
+                    'reboot', 'destroy', 'start', 'stop', 'delete', 'webhook',
+                        'notify', 'undefine', 'resize', 'run_script']:
                     raise BadRequestError('Action is not correct')
-                ret_schedule['action'] = action
-            else:
-                from mist.api.methods import list_resources
-                script_search = script.get('script')
-                if not script_search:
-                    raise BadRequestError('script parameter is required')
-                try:
-                    [script_obj], _ = list_resources(auth_context, 'script',
-                                                     search=script_search,
-                                                     limit=1)
-                except ValueError:
-                    raise NotFoundError('Schedule script does not exist')
-                auth_context.check_perm('script', 'run', script_obj.id)
-                ret_schedule['script_id'] = script_obj.id
-                ret_schedule['script_name'] = script_obj.name
-                ret_schedule['params'] = script.get('params')
+                ret_action = {
+                    'action_type': action_type
+                }
+                if action_type == 'run_script':
+                    script_type = action.get('script_type')
+                    if script_type == 'existing':
+                        from mist.api.methods import list_resources
+                        script_search = action.get('script')
+                        if not script_search:
+                            raise BadRequestError(
+                                'script parameter is required')
+                        try:
+                            [script_obj], _ = list_resources(
+                                auth_context, 'script', search=script_search,
+                                limit=1)
+                        except ValueError:
+                            raise NotFoundError(
+                                'Schedule script does not exist')
+                        auth_context.check_perm('script', 'run', script_obj.id)
+                        ret_action['script'] = script_obj.id
+                        ret_action['script_name'] = script_obj.name
+                        ret_action['params'] = action.get('params')
+                ret_schedule['actions'].append(ret_action)
+
             if schedule_type == 'one_off':
                 # convert schedule_entry from ISO format
                 # to '%Y-%m-%d %H:%M:%S'
                 try:
-                    ret_schedule['schedule_entry'] = datetime.datetime.strptime(  # noqa
-                        schedule['datetime'], '%Y-%m-%dT%H:%M:%SZ'
+                    ret_schedule['when']['datetime'] = datetime.datetime.strptime(  # noqa
+                        when['datetime'], '%Y-%m-%dT%H:%M:%SZ'
                     ).strftime("%Y-%m-%d %H:%M:%S")
                 except KeyError:
                     raise BadRequestError(
@@ -3397,21 +3412,24 @@ def _generate_plan__parse_schedules(self, auth_context,
                         ' format %Y-%m-%dT%H:%M:%SZ')
             elif schedule_type == 'interval':
                 try:
-                    ret_schedule['schedule_entry'] = {
-                        'every': schedule['every'],
-                        'period': schedule['period']
+                    ret_schedule['when'] = {
+                        'schedule_type': 'interval',
+                        'every': when['every'],
+                        'period': when['period'],
+                        'max_run_count': when.get('max_run_count')
                     }
                 except KeyError:
                     raise BadRequestError(
                         'interval schedule parameter missing')
             elif schedule_type == 'crontab':
                 try:
-                    ret_schedule['schedule_entry'] = {
-                        'minute': schedule['minute'],
-                        'hour': schedule['hour'],
-                        'day_of_month': schedule['day_of_month'],
-                        'month_of_year': schedule['month_of_year'],
-                        'day_of_week': schedule['day_of_week']
+                    ret_schedule['when'] = {
+                        'schedule_type': 'crontab',
+                        'minute': when['minute'],
+                        'hour': when['hour'],
+                        'day_of_month': when['day_of_month'],
+                        'month_of_year': when['month_of_year'],
+                        'day_of_week': when['day_of_week']
                     }
                 except KeyError:
                     raise BadRequestError(
@@ -3430,6 +3448,7 @@ def _generate_plan__parse_schedules(self, auth_context,
                     'not match format '
                     '%Y-%m-%dT%H:%M:%SZ'
                 )
+
             if schedule.get('expires'):
                 # convert `expires` from ISO format
                 # to '%Y-%m-%d %H:%M:%S'
diff --git a/src/mist/api/clouds/controllers/compute/controllers.py b/src/mist/api/clouds/controllers/compute/controllers.py
index c385ce4cf..2c989107d 100644
--- a/src/mist/api/clouds/controllers/compute/controllers.py
+++ b/src/mist/api/clouds/controllers/compute/controllers.py
@@ -298,7 +298,7 @@ def _list_images__fetch_images(self, search=None):
             if 'UnauthorizedOperation' in str(e.message):
                 images = []
             else:
-                raise()
+                raise
         for image in images:
             if image.id in default_images:
                 image.name = default_images[image.id]
@@ -308,7 +308,7 @@ def _list_images__fetch_images(self, search=None):
             if 'UnauthorizedOperation' in str(e.message):
                 pass
             else:
-                raise()
+                raise
         else:
             # search on EC2.
             search = search.lstrip()
@@ -335,7 +335,7 @@ def _list_images__fetch_images(self, search=None):
                     if 'UnauthorizedOperation' in str(e.message):
                         break
                     else:
-                        raise()
+                        raise
                 else:
                     if images:
                         break
@@ -5565,14 +5565,14 @@ def _list_locations__fetch_locations(self):
         if locations:
             hypervisors = self.connection.connection.request(
                 "/settings/hypervisor_zones.json")
-            for l in locations:
+            for loc in locations:
                 for hypervisor in hypervisors.object:
                     h = hypervisor.get("hypervisor_group")
-                    if str(h.get("location_group_id")) == l.id:
+                    if str(h.get("location_group_id")) == loc.id:
                         # get max_memory/max_cpu
-                        l.extra["max_memory"] = h.get("max_host_free_memory")
-                        l.extra["max_cpu"] = h.get("max_host_cpu")
-                        l.extra["hypervisor_group_id"] = h.get("id")
+                        loc.extra["max_memory"] = h.get("max_host_free_memory")
+                        loc.extra["max_cpu"] = h.get("max_host_cpu")
+                        loc.extra["hypervisor_group_id"] = h.get("id")
                         break
 
         try:
@@ -5583,13 +5583,13 @@ def _list_locations__fetch_locations(self):
         except:
             pass
 
-        for l in locations:
+        for loc in locations:
             # get data store zones, and match with locations
             # through location_group_id
             # then calculate max_disk_size per data store,
             # by matching data store zones and data stores
             try:
-                store_zones = [dsg for dsg in data_store_zones if l.id is
+                store_zones = [dsg for dsg in data_store_zones if loc.id is
                                str(dsg['data_store_group']
                                    ['location_group_id'])]
                 for store_zone in store_zones:
                     stores = [store for store in data_stores if
                               store['data_store']['data_store_group_id'] is
                               store_zone['data_store_group']['id']]
                     for store in stores:
-                        l.extra['max_disk_size'] = store['data_store']
+                        loc.extra['max_disk_size'] = store['data_store']
                             ['data_store_size'] - store['data_store']['usage']
             except:
                 pass
 
@@ -5608,16 +5608,16 @@ def _list_locations__fetch_locations(self):
         except:
             pass
 
-        for l in locations:
+        for loc in locations:
             # match locations with network ids (through location_group_id)
-            l.extra['networks'] = []
+            loc.extra['networks'] = []
             try:
                 for network in networks:
                     net = network["network_group"]
-                    if str(net["location_group_id"]) == l.id:
-                        l.extra['networks'].append({'name': net['label'],
-                                                    'id': net['id']})
+                    if str(net["location_group_id"]) == loc.id:
+                        loc.extra['networks'].append({'name': net['label'],
+                                                      'id': net['id']})
             except:
                 pass
 
diff --git a/src/mist/api/clouds/controllers/container/base.py b/src/mist/api/clouds/controllers/container/base.py
index d5265caf1..be0ea13ca 100644
--- a/src/mist/api/clouds/controllers/container/base.py
+++ b/src/mist/api/clouds/controllers/container/base.py
@@ -489,10 +489,10 @@ def _update_from_libcloud_cluster(self,
             self._list_clusters__cost_nodes(cluster, libcloud_cluster)
         cph = nodes_cph + control_plane_cph
         cpm = nodes_cpm + control_plane_cpm
-        if(cluster.total_cost.hourly != round(cph, 2) or
-           cluster.total_cost.monthly != round(cpm, 2) or
-           cluster.cost.hourly != round(control_plane_cph, 2) or
-           cluster.cost.monthly != round(control_plane_cpm, 2)):
+        if cluster.total_cost.hourly != round(cph, 2) or \
+                cluster.total_cost.monthly != round(cpm, 2) or \
+                cluster.cost.hourly != round(control_plane_cph, 2) or \
+                cluster.cost.monthly != round(control_plane_cpm, 2):
             cluster.total_cost.hourly = round(cph, 2)
             cluster.total_cost.monthly = round(cpm, 2)
             cluster.cost.hourly = round(control_plane_cph, 2)
diff --git a/src/mist/api/clouds/controllers/main/base.py b/src/mist/api/clouds/controllers/main/base.py
index e6d9fe039..724c5d9cb 100644
--- a/src/mist/api/clouds/controllers/main/base.py
+++ b/src/mist/api/clouds/controllers/main/base.py
@@ -309,7 +309,8 @@ def update(self, user=None, fail_on_error=True,
             if secret:  # value will be obtained from vault
                 data = secret.data
                 if _key not in data.keys():
-                    raise BadRequestError('The key specified (%s) does not exist in \
+                    raise BadRequestError(
+                        'The key specified (%s) does not exist in \
                         secret `%s`' % (_key, secret.name))
 
             if key in self.cloud._private_fields:
diff --git a/src/mist/api/concurrency/models.py b/src/mist/api/concurrency/models.py
index 1d44b704a..844712d5f 100644
--- a/src/mist/api/concurrency/models.py
+++ b/src/mist/api/concurrency/models.py
@@ -6,6 +6,7 @@
 
 import mongoengine as me
 
+from mist.api.exceptions import MistError
 
 log = logging.getLogger(__name__)
 
@@ -18,8 +19,9 @@ class PeriodicTaskTooRecentLastRun(Exception):
     pass
 
 
-class PeriodicTaskLockTakenError(Exception):
-    pass
+class PeriodicTaskLockTakenError(MistError):
+    msg = "Periodic task lock taken"
+    http_code = 423
 
 
 class PeriodicTaskInfo(me.Document):
diff --git a/src/mist/api/config.py b/src/mist/api/config.py
index b0a6db473..824499c6a 100755
--- a/src/mist/api/config.py
+++ b/src/mist/api/config.py
@@ -1287,8 +1287,8 @@ def dirname(path, num=1):
 GRAPHITE_URI = "http://graphite"
 VICTORIAMETRICS_URI = "http://vmselect:8481/select//prometheus"
-VICTORIAMETRICS_WRITE_URI = (f"http://vminsert:8480/insert//"
-                             f"prometheus")
+VICTORIAMETRICS_WRITE_URI = ("http://vminsert:8480/insert//"
+                             "prometheus")
 GRAPHITE_TO_VICTORIAMETRICS_METRICS_MAP = {}
 
diff --git a/src/mist/api/helpers.py b/src/mist/api/helpers.py
index d8da4b97e..6379d6373 100644
--- a/src/mist/api/helpers.py
+++ b/src/mist/api/helpers.py
@@ -11,6 +11,10 @@
 """
 
+import asyncio
+from asgiref.sync import async_to_sync
+from rstream import Consumer, amqp_decoder, AMQPMessage
+from rstream.exceptions import StreamDoesNotExist
 from functools import reduce
 from mist.api import config
 from mist.api.exceptions import WorkflowExecutionError, BadRequestError
@@ -23,7 +27,6 @@
 from libcloud.container.providers import get_driver as get_container_driver
 from libcloud.container.drivers.docker import DockerException
 from libcloud.container.base import ContainerImage
-from elasticsearch_tornado import EsClient
 from elasticsearch import Elasticsearch
 from distutils.version import LooseVersion
 from amqp.exceptions import NotFound as AmqpNotFound
@@ -69,8 +72,6 @@
 import codecs
 import secrets
 import operator
-import websocket
-import _thread
 
 # Python 2 and 3 support
 from future.utils import string_types
@@ -643,7 +644,7 @@ def random_string(length=5, punc=False):
     When punc=True, the string will also contain punctuation apart
     from letters and digits
     """
-    _chars = string.letters + string.digits
+    _chars = string.ascii_letters + string.digits
     _chars += string.punctuation if punc else ''
     return ''.join(random.choice(_chars) for _ in range(length))
 
@@ -1203,24 +1204,6 @@ def view_config(*args, **kwargs):
                                       **kwargs)
 
 
-class AsyncElasticsearch(EsClient):
-    """Tornado-compatible Elasticsearch client."""
-
-    def mk_req(self, url, **kwargs):
-        """Update kwargs with authentication credentials."""
-        kwargs.update({
-            'auth_username': config.ELASTICSEARCH['elastic_username'],
-            'auth_password': config.ELASTICSEARCH['elastic_password'],
-            'validate_cert': config.ELASTICSEARCH['elastic_verify_certs'],
-            'ca_certs': None,
-
-        })
-        for param in ('connect_timeout', 'request_timeout'):
-            if param not in kwargs:
-                kwargs[param] = 30.0  # Increase default timeout by 10 sec.
-        return super(AsyncElasticsearch, self).mk_req(url, **kwargs)
-
-
 def es_client(asynchronous=False):
     """Returns an initialized Elasticsearch client."""
     if not asynchronous:
@@ -1234,6 +1217,7 @@ def es_client(asynchronous=False):
         )
     else:
         method = 'https' if config.ELASTICSEARCH['elastic_use_ssl'] else 'http'
+        from elasticsearch import AsyncElasticsearch
         return AsyncElasticsearch(
             config.ELASTICSEARCH['elastic_host'],
             port=config.ELASTICSEARCH['elastic_port'], method=method,
@@ -2012,44 +1996,6 @@ def create_helm_command(repo_url, release_name, chart_name, host, port, token,
     return helm_install_command
 
 
-class websocket_for_scripts(object):
-
-    def __init__(self, uri):
-        self.uri = uri
-        ws = websocket.WebSocketApp(self.uri,
-                                    on_message=self.on_message,
-                                    on_error=self.on_error,
-                                    on_close=self.on_close)
-        self.ws = ws
-        self.ws.on_open = self.on_open
-        self.buffer = ""
-
-    def on_message(self, message):
-        message = message.decode('utf-8')
-        if message.startswith('retval:'):
-            self.retval = message.replace('retval:', '', 1)
-        else:
-            self.buffer = self.buffer + message
-
-    def on_close(self):
-        self.ws.close()
-
-    def on_error(self, error):
-        self.ws.close()
-        log.error("Got Websocket error: %s" % error)
-
-    def on_open(self):
-        def run(*args):
-            self.ws.wait_command_to_finish()
-        _thread.start_new_thread(run, ())
-
-    def wait_command_to_finish(self):
-        self.ws.run_forever(ping_interval=9, ping_timeout=8)
-        self.retval = 0
-        output = self.buffer.split("\n")[0:-1]
-        return self.retval, "\n".join(output)
-
-
 def extract_selector_type(**kwargs):
     error_count = 0
     for selector in kwargs.get('selectors', []):
@@ -2066,3 +2012,47 @@ def extract_selector_type(**kwargs):
     if error_count == len(kwargs.get('selectors', [])):
         raise BadRequestError('selector_type')
     return selector_type
+
+
+class RabbitMQStreamConsumer:
+    def __init__(self, job_id):
+        self.stream_name = job_id
+        self.buffer = ""
+        self.exit_code = 1
+
+    def on_message(self, msg: AMQPMessage):
+        message = next(msg.data).decode('utf-8')
+        if message.startswith('retval:'):
+            self.exit_code = int(message.replace('retval:', '', 1))
+            asyncio.create_task(self.consumer.close())
+        else:
+            self.buffer = self.buffer + message
+
+    @async_to_sync
+    async def consume(self):
+        self.consumer = Consumer(
+            host=os.getenv("RABBITMQ_HOST", 'rabbitmq'),
+            port=5552,
+            vhost='/',
+            username=os.getenv("RABBITMQ_USERNAME", 'guest'),
+            password=os.getenv("RABBITMQ_PASSWORD", 'guest'),
+        )
+        await self.consumer.start()
+        sleep_time = 0
+        SLEEP_TIMEOUT = 30
+        SLEEP_INTERVAL = 3
+        while sleep_time < SLEEP_TIMEOUT:
+            try:
+                await self.consumer.subscribe(
+                    self.stream_name, self.on_message, decoder=amqp_decoder)
+                break
+            except StreamDoesNotExist:
+                # Sleep asynchronously; a blocking sleep() here would stall
+                # the event loop while waiting for the stream to appear.
+                await asyncio.sleep(SLEEP_INTERVAL)
+                sleep_time += SLEEP_INTERVAL
+
+        await self.consumer.run()
+        return self.exit_code, self.buffer
diff --git a/src/mist/api/hub/tornado_client.py b/src/mist/api/hub/tornado_client.py
index eacfbb4c4..56110fb79 100644
--- a/src/mist/api/hub/tornado_client.py
+++ b/src/mist/api/hub/tornado_client.py
@@ -98,10 +98,10 @@ def on_message(self, unused_channel, basic_deliver, properties, body):
         log.debug("%s: Will start listening for routing_key 'from_%s.#'.",
                   self.lbl, self.worker_id)
         self._channel.queue_bind(
-            self.ready_callback,
             self.queue,
             self.exchange,
             'from_%s.#' % self.worker_id,
+            callback=self.ready_callback,
         )
         return
 
diff --git a/src/mist/api/keys/base.py b/src/mist/api/keys/base.py
index a4b181e57..f9b9b26e2 100644
--- a/src/mist/api/keys/base.py
+++ b/src/mist/api/keys/base.py
@@ -65,7 +65,8 @@ def add(self, user=None, fail_on_invalid_params=True, **kwargs):
             if secret:
                 data = secret.data
                 if _key not in data.keys():
-                    raise BadRequestError('The key specified (%s) does not exist in \
+                    raise BadRequestError(
+                        'The key specified (%s) does not exist in \
                         secret `%s`' % (_key, secret.name))
 
                 secret_value = SecretValue(secret=secret,
@@ -82,10 +83,11 @@ def add(self, user=None, fail_on_invalid_params=True, **kwargs):
                 if user:
                     secret.assign_to(user)
             except me.NotUniqueError:
-                raise KeyExistsError("The path `%s%s` exists on Vault. \
-                    Try changing the name of the key" %
-                                     (config.VAULT_KEYS_PATH,
-                                      self.key.name))
+                raise KeyExistsError(
+                    "The path `%s%s` exists on Vault. \
+                    Try changing the name of the key" % (
+                        config.VAULT_KEYS_PATH,
+                        self.key.name))
             try:
                 secret.create_or_update({key: value})
             except Exception as exc:
diff --git a/src/mist/api/logs/helpers.py b/src/mist/api/logs/helpers.py
index d12601c98..798fd45a7 100644
--- a/src/mist/api/logs/helpers.py
+++ b/src/mist/api/logs/helpers.py
@@ -3,11 +3,6 @@
 
 from mist.api.helpers import es_client as es
 
-from mist.api.exceptions import NotFoundError
-from mist.api.exceptions import RateLimitError
-from mist.api.exceptions import BadRequestError
-from mist.api.exceptions import ServiceUnavailableError
-
 from mist.api.logs.constants import TYPES
 
 
@@ -16,7 +11,7 @@ def _filtered_query(owner_id, close=None, error=None, range=None, type=None,
-                    callback=None, tornado_async=False, **kwargs):
+                    callback=None, es_async=False, **kwargs):
     """Filter Elasticsearch documents.
 
     Executes a filtering aggregation on Elasticsearch documents in order to
@@ -94,37 +89,11 @@ def _filtered_query(owner_id, close=None, error=None, range=None, type=None,
                 {"term": {key: value}}
             )
     # Perform Elasticsearch request.
-    if not tornado_async:
+    if not es_async:
         result = es().search(index=index, doc_type=TYPES.get(type),
                              body=query)
         if callback:
             return callback(result)
         return result
     else:
-        es(tornado_async).search(index=index, doc_type=TYPES.get(type),
-                                 body=json.dumps(query), callback=callback)
-
-
-def _on_response_callback(response, tornado_async=False):
-    """HTTP Response-handling callback.
-
-    This method is meant to return HTTP Response objects generated either in a
-    Tornado or synchronous execution context.
-
-    Arguments:
-        - response: HTTP Response object.
-        - tornado_async: Denotes if a Tornado-safe HTTP request was issued.
-
-    """
-    if tornado_async:
-        if response.code != 200:
-            log.error('Error on Elasticsearch query in tornado_async mode. '
-                      'Got %d status code: %s', response.code, response.body)
-            if response.code == 400:
-                raise BadRequestError()
-            if response.code == 404:
-                raise NotFoundError()
-            if response.code == 429:
-                raise RateLimitError()
-            raise ServiceUnavailableError()
-        response = json.loads(response.body)
-    return response
+        es(es_async).search(index=index, doc_type=TYPES.get(type),
+                            body=json.dumps(query), callback=callback)
diff --git a/src/mist/api/logs/methods.py b/src/mist/api/logs/methods.py
index 2be8be918..60f21ca30 100644
--- a/src/mist/api/logs/methods.py
+++ b/src/mist/api/logs/methods.py
@@ -16,7 +16,6 @@
 from mist.api.users.models import User
 
 from mist.api.logs.helpers import _filtered_query
-from mist.api.logs.helpers import _on_response_callback
 
 from mist.api.logs.constants import FIELDS, JOBS
 from mist.api.logs.constants import EXCLUDED_BUCKETS, TYPES
@@ -398,7 +397,7 @@ def get_events(auth_context, owner_id='', user_id='', event_type='', action='',
 
 def get_stories(story_type='', owner_id='', user_id='', sort_order=-1, limit=0,
                 error=None, range=None, pending=None, expand=False,
-                tornado_callback=None, tornado_async=False, **kwargs):
+                callback=None, es_async=False, **kwargs):
     """Fetch stories.
 
     Query Elasticsearch for story documents based on the provided arguments.
@@ -431,7 +430,7 @@ def get_stories(story_type='', owner_id='', user_id='', sort_order=-1, limit=0,
         includes += list(FIELDS) + ["action", "extra"]
     else:
         includes = []
-        assert not tornado_async
+        assert not es_async
 
     if story_type:
         assert story_type in TYPES
@@ -513,28 +512,28 @@ def get_stories(story_type='', owner_id='', user_id='', sort_order=-1, limit=0,
 
     # Process returned stories.
     def _on_stories_callback(response):
-        result = _on_response_callback(response, tornado_async)
         return process_stories(
-            buckets=result["aggregations"]["stories"]["buckets"],
-            callback=tornado_callback, type=story_type
+            buckets=response["aggregations"]["stories"]["buckets"],
+            callback=callback, type=story_type
         )
 
     # Fetch stories. Invoke callback to process and return results.
     def _on_request_callback(query):
-        if not tornado_async:
+        if es_async is False:
             result = es().search(index=index,
                                  doc_type=TYPES.get(story_type),
                                  body=query)
             return _on_stories_callback(result)
         else:
-            es(tornado_async).search(index=index,
-                                     body=json.dumps(query),
-                                     doc_type=TYPES.get(story_type),
-                                     callback=_on_stories_callback)
+            async def search_async(query):
+                result = await es_async.search(index=index,
+                                               body=json.dumps(query),
+                                               doc_type=TYPES.get(story_type))
+                return _on_stories_callback(result)
+            return search_async(query)
 
     # Process aggregation results in order to be applied as filters.
     def _on_filters_callback(response):
-        results = _on_response_callback(response, tornado_async)
-        filters = results["aggregations"]["main_bucket"]["buckets"]
+        filters = response["aggregations"]["main_bucket"]["buckets"]
         process_filters(query, filters, pending, error)
         return _on_request_callback(query)
 
@@ -546,7 +545,7 @@ def _on_filters_callback(response):
         return _filtered_query(owner_id, close=pending, error=error,
                                range=range, type=story_type,
                                callback=_on_filters_callback,
-                               tornado_async=tornado_async, **kwargs)
+                               es_async=es_async, **kwargs)
     else:
         return _on_request_callback(query)
 
diff --git a/src/mist/api/machines/methods.py b/src/mist/api/machines/methods.py
index 4e1bed2c3..9bd82fa68 100644
--- a/src/mist/api/machines/methods.py
+++ b/src/mist/api/machines/methods.py
@@ -596,7 +596,7 @@ def create_machine(auth_context, cloud_id, key_id, machine_name, location_id,
                 cloud.ctl.compute.list_machines()
             except Exception as e:
                 if i > 8:
-                    raise(e)
+                    raise e
                 else:
                     continue
 
@@ -2615,7 +2615,54 @@ def find_best_ssh_params(machine, auth_context=None):
     raise MachineUnauthorizedError
 
 
+def prepare_ssh_dict(auth_context, machine,
+                     command):
+    key_association_id, hostname, user, port = find_best_ssh_params(
+        machine, auth_context=auth_context)
+    association = KeyMachineAssociation.objects.get(id=key_association_id)
+    key = association.key
+    key_path = key.private.secret.name
+    expiry = int(datetime.now().timestamp()) + 100
+    org = machine.owner
+    vault_token = org.vault_token if org.vault_token is not None else \
+        config.VAULT_TOKEN
+    vault_secret_engine_path = machine.owner.vault_secret_engine_path
+    vault_addr = org.vault_address if org.vault_address is not None else \
+        config.VAULT_ADDR
+    msg_to_encrypt = '%s,%s,%s,%s' % (
+        vault_token,
+        vault_addr,
+        vault_secret_engine_path,
+        key_path)
+    from mist.api.helpers import encrypt
+    # ENCRYPTION KEY AND HMAC KEY SHOULD BE DIFFERENT!
+    encrypted_msg = encrypt(msg_to_encrypt, segment_size=128)
+    command_encoded = base64.urlsafe_b64encode(
+        command.encode()).decode()
+    msg = '%s,%s,%s,%s,%s,%s' % (
+        user,
+        hostname,
+        port,
+        expiry,
+        command_encoded,
+        encrypted_msg)
+    mac = hmac.new(
+        config.SIGN_KEY.encode(),
+        msg=msg.encode(),
+        digestmod=hashlib.sha256).hexdigest()
+    ssh_dict = {
+        "user": user,
+        "hostname": hostname,
+        "port": str(port),
+        "expiry": str(expiry),
+        "command_encoded": command_encoded,
+        "encrypted_msg": encrypted_msg,
+        "mac": mac,
+    }
+    return ssh_dict, key.name  # SEC
+
+
 def prepare_ssh_uri(auth_context, machine,
                     command=config.DEFAULT_EXEC_TERMINAL, job_id=None):
diff --git a/src/mist/api/machines/views.py b/src/mist/api/machines/views.py
index fbaf36ef8..3aaf627a1 100644
--- a/src/mist/api/machines/views.py
+++ b/src/mist/api/machines/views.py
@@ -417,7 +417,7 @@ def create_machine(request):
         if not isinstance(mtags, dict):
             if not isinstance(mtags, list):
                 raise ValueError()
-            if not all((isinstance(t, dict) and len(t) is 1 for t in mtags)):
+            if not all((isinstance(t, dict) and len(t) == 1 for t in mtags)):
                 raise ValueError()
             mtags = {key: val for item in mtags
                      for key, val in list(item.items())}
diff --git a/src/mist/api/metering/methods.py b/src/mist/api/metering/methods.py
index 04420e67e..e4f71da9c 100644
--- a/src/mist/api/metering/methods.py
+++ b/src/mist/api/metering/methods.py
@@ -109,7 +109,7 @@ def _parse_checks_or_datapoints_series(results, field, owner_id=''):
     for start_iso, result in results:
         for series in result:
             values = series.get('values', [])
-            assert len(values) is 1, 'Expected a single value. Got %s' % values
+            assert len(values) == 1, 'Expected a single value. Got %s' % values
             value = values[0][-1]
             value = int(round(value)) if value else None
             owner = series.get('tags', {}).get('owner', owner_id)
diff --git a/src/mist/api/methods.py b/src/mist/api/methods.py
index 3bc582293..2ae521faf 100644
--- a/src/mist/api/methods.py
+++ b/src/mist/api/methods.py
@@ -372,8 +372,10 @@ def find_public_ips(ips):
 def notify_admin(title, message="", team="all"):
     """ This will only work on a multi-user setup configured to send emails """
     from mist.api.helpers import send_email
-    send_email(title, message,
-               config.NOTIFICATION_EMAIL.get(team, config.NOTIFICATION_EMAIL))
+    email = config.NOTIFICATION_EMAIL.get(team, config.NOTIFICATION_EMAIL)
+    if email:
+        send_email(title, message,
+                   email)
 
 
 def notify_user(owner, title, message="", email_notify=True, **kwargs):
diff --git a/src/mist/api/monitoring/influxdb/handlers.py b/src/mist/api/monitoring/influxdb/handlers.py
index c34db036b..29b416449 100644
--- a/src/mist/api/monitoring/influxdb/handlers.py
+++ b/src/mist/api/monitoring/influxdb/handlers.py
@@ -213,7 +213,7 @@ def parse_path(self, metric):
         if len(fields) > 1:
             for tag in fields[:-1]:
                 tag = tag.split('=')
-                if len(tag) is not 2:
+                if len(tag) != 2:
                     log.error('%s got unexpected tag: %s',
                               self.__class__.__name__, tag)
                     continue
diff --git a/src/mist/api/portal/methods.py b/src/mist/api/portal/methods.py
index 81c1c27d2..0e23499de 100644
--- a/src/mist/api/portal/methods.py
+++ b/src/mist/api/portal/methods.py
@@ -43,14 +43,15 @@ def should_task_exist_for_cloud(task, cloud):
     """
     Return whether a given cloud should have the specified scheduled task.
     """
-    if ((task == "list_zones" and cloud.dns_enabled is False) or
-        (task == "list_buckets" and cloud.object_storage_enabled is False) or
-        (task == "list_clusters" and cloud.container_enabled is False) or  # noqa: E501
-        (task == "list_networks" and getattr(cloud.ctl, "network", None) is None) or  # noqa: E501
-        (task == "list_volumes" and getattr(cloud.ctl, "storage", None) is None) or  # noqa: E501
-        (task == "list_sizes" and cloud._cls == "Cloud.LibvirtCloud") or
-        (task != "list machines" and cloud._cls == "Cloud.OtherCloud")
-            ):
+    if (task == "list_zones" and cloud.dns_enabled is False) or \
+       (task == "list_buckets" and cloud.object_storage_enabled is False) or \
+       (task == "list_clusters" and cloud.container_enabled is False) or \
+       (task == "list_networks" and getattr(
+           cloud.ctl, "network", None) is None) or \
+       (task == "list_volumes" and getattr(
+           cloud.ctl, "storage", None) is None) or \
+       (task == "list_sizes" and cloud._cls == "Cloud.LibvirtCloud") or \
+       (task != "list machines" and cloud._cls == "Cloud.OtherCloud"):
         return False
     return True
 
diff --git a/src/mist/api/portal/tasks.py b/src/mist/api/portal/tasks.py
index 0907aa8d7..6ba92eb63 100644
--- a/src/mist/api/portal/tasks.py
+++ b/src/mist/api/portal/tasks.py
@@ -367,8 +367,9 @@ def restore_backup(backup, portal=None, until=False, databases=[
     )
     result = subprocess.check_output(cmd, shell=True)
     available_backups = [
-        int(l.strip().split('/victoria/')[1].rstrip('/'))
-        for l in result.decode().split('\n') if '/victoria/' in l]
+        int(line.strip().split('/victoria/')[1].rstrip('/'))
+        for line in result.decode().split('\n')
+        if '/victoria/' in line]
     available_backups.sort(reverse=True)
     for b in available_backups:
         if b < int(backup) and b >= int(until or 0):
diff --git a/src/mist/api/rules/base.py b/src/mist/api/rules/base.py
index be2a4b271..34baa33b9 100644
--- a/src/mist/api/rules/base.py
+++ b/src/mist/api/rules/base.py
@@ -354,7 +354,7 @@ def includes_only(self, resource):
             return False
 
         # The rule contains multiple selectors.
-        if len(self.rule.selectors) is not 1:
+        if len(self.rule.selectors) != 1:
             return False
 
         # The rule does not refer to resources by their UUID.
@@ -362,7 +362,7 @@ def includes_only(self, resource):
             return False
 
         # The rule refers to multiple resources.
-        if len(self.rule.selectors[0].ids) is not 1:
+        if len(self.rule.selectors[0].ids) != 1:
             return False
 
         # The rule's single resource does not match `resource`.
diff --git a/src/mist/api/rules/models/conditions.py b/src/mist/api/rules/models/conditions.py
index 7f7194bc1..2b417e82c 100644
--- a/src/mist/api/rules/models/conditions.py
+++ b/src/mist/api/rules/models/conditions.py
@@ -186,8 +186,8 @@ def as_dict(self):
         return {'offset': self.offset, 'period': self.period}
 
     def __str__(self):
-        if self.offset is 0:
+        if self.offset == 0:
             return 'Trigger offset is 0'
-        if self.offset is 1:
+        if self.offset == 1:
             return 'Trigger offset of 1 %s' % self.period_singular
         return 'Trigger offset %s %s' % (self.offset, self.period)
diff --git a/src/mist/api/rules/models/main.py b/src/mist/api/rules/models/main.py
index 4e2f35ef0..1c2757f74 100644
--- a/src/mist/api/rules/models/main.py
+++ b/src/mist/api/rules/models/main.py
@@ -351,22 +351,22 @@ def as_dict(self):
 
     @property
     def metric(self):
-        assert len(self.queries) is 1
+        assert len(self.queries) == 1
         return self.queries[0].target
 
     @property
     def operator(self):
-        assert len(self.queries) is 1
+        assert len(self.queries) == 1
         return self.queries[0].operator
 
     @property
     def value(self):
-        assert len(self.queries) is 1
+        assert len(self.queries) == 1
         return self.queries[0].threshold
 
     @property
     def aggregate(self):
-        assert len(self.queries) is 1
+        assert len(self.queries) == 1
         return self.queries[0].aggregation
 
     @property
diff --git a/src/mist/api/scheduler.py b/src/mist/api/scheduler.py
index a505a1fa3..733dabd18 100644
--- a/src/mist/api/scheduler.py
+++ b/src/mist/api/scheduler.py
@@ -27,6 +27,7 @@
 
 def schedule_to_actor(schedule):
+    task_path = None
     if isinstance(schedule, PollingSchedule) or isinstance(schedule, Rule):
         task_path = schedule.task.split('.')
     else:
@@ -75,7 +76,7 @@ def add_job(scheduler, schedule, actor, first_run=False):
         if schedule_action._cls == 'ScriptAction':
             job['args'] = (
                 None,
-                schedule_action.script,
+                schedule_action.script.id,
                 schedule.name,
                 [r.id for r in schedule.get_resources()],
                 schedule_action.params,
@@ -157,7 +158,7 @@ def update_job(scheduler, schedule, actor, existing):
         if schedule_action._cls == 'ScriptAction':
             new_args = (
                 None,
-                schedule_action.script,
+                schedule_action.script.id,
                 schedule.name,
                 [r.id for r in schedule.get_resources()],
                 schedule_action.params,
diff --git a/src/mist/api/schedules/base.py b/src/mist/api/schedules/base.py
index ecf2987b1..2edc9c7d2 100644
--- a/src/mist/api/schedules/base.py
+++ b/src/mist/api/schedules/base.py
@@ -6,7 +6,6 @@
 """
 import logging
 import datetime
-import ast
 import mongoengine as me
 
 from mist.api.scripts.models import Script
@@ -142,7 +141,6 @@ def update(self, **kwargs):
                 script_type = kwargs.get('actions')[0].get('script_type')
                 if script_type == 'existing':
                     script = kwargs.get('actions')[0].get('script')
-                    script = ast.literal_eval(script)
                     script_id = script['script']
                     if script_id:
                         try:
@@ -238,9 +236,8 @@ def update(self, **kwargs):
                 # TODO Make possible to have notification actions on schedules
                 raise NotImplementedError()
             elif action == 'run_script':
-                script = ast.literal_eval(actions[0]['script'])
-                script_id = script['script']
-                params = script['params']
+                script_id = actions[0]['script']
+                params = actions[0]['params']
                 if script_id:
                     self.schedule.actions[0] = acts.ScriptAction(
                         script=script_id, params=params)
diff --git a/src/mist/api/schedules/models.py b/src/mist/api/schedules/models.py
index f6fa69e91..bb36d2721 100644
--- a/src/mist/api/schedules/models.py
+++ b/src/mist/api/schedules/models.py
@@ -336,10 +336,7 @@ def as_dict(self):
 
         selectors = [selector.as_dict() for selector in self.selectors]
 
-        if self.actions[0].__class__.__name__ == 'ScriptAction':
-            action = 'run script'
-        else:
-            action = self.actions[0].action
+        action = self.actions[0]
 
         sdict = {
             'id': self.id,
@@ -348,12 +345,10 @@ def as_dict(self):
             'schedule': str(self.when),
             'schedule_type': self.when.type,
             'schedule_entry': self.when.as_dict(),
-            'task_type': action,
             'expires': str(self.expires or ''),
             'start_after': str(self.start_after or ''),
             'task_enabled': self.task_enabled,
             'active': self.enabled,
-            'run_immediately': self.run_immediately or '',
             'last_run_at': last_run,
             'total_run_count': self.total_run_count,
             'max_run_count': self.max_run_count,
@@ -361,6 +356,14 @@ def as_dict(self):
             'owned_by': self.owned_by.id if self.owned_by else '',
             'created_by': self.created_by.id if self.created_by else '',
         }
+        task_type = {}
+        if action.__class__.__name__ == 'ScriptAction':
+            task_type['action'] = 'run script'
+            task_type['script_id'] = action.script
+            task_type['params'] = action.params
+        else:
+            task_type['action'] = self.actions[0].atype
+        sdict['task_type'] = task_type
 
         return sdict
 
diff --git a/src/mist/api/scripts/base.py b/src/mist/api/scripts/base.py
index 5ec0337a8..1d558fd21 100644
--- a/src/mist/api/scripts/base.py
+++ b/src/mist/api/scripts/base.py
@@ -8,14 +8,16 @@
 
 import mongoengine as me
 
+from time import time
 from mist.api.exceptions import BadRequestError
 from mist.api.helpers import trigger_session_update, mac_sign
-from mist.api.helpers import websocket_for_scripts
+from mist.api.helpers import RabbitMQStreamConsumer
 from mist.api.exceptions import ScriptNameExistsError
 
 from mist.api import config
 
+
 log = logging.getLogger(__name__)
 
 
@@ -230,9 +232,9 @@ def generate_signed_url_v2(self):
 
     def run(self, auth_context, machine, host=None, port=None, username=None,
             password=None, su=False, key_id=None, params=None, job_id=None,
-            env='', owner=None):
+            env='', owner=None, ret=None, action_prefix=None):
         from mist.api.users.models import Organization
-        from mist.api.machines.methods import prepare_ssh_uri
+        from mist.api.machines.methods import prepare_ssh_dict
         import re
         if auth_context:
             owner = auth_context.owner
@@ -268,19 +270,40 @@ def run(self, auth_context, machine, host=None, port=None, username=None,
             f'  return "$retval";'
             '} && fetchrun'
         )
-        ssh_user, key_name, ws_uri = prepare_ssh_uri(
-            auth_context=auth_context, machine=machine, job_id=job_id,
+        log.info('Preparing ssh dict')
+        ssh_dict, key_name = prepare_ssh_dict(
+            auth_context=auth_context, machine=machine,
             command=command)
-        exit_code, stdout = websocket_for_scripts(
-            ws_uri).wait_command_to_finish()
-
+        sendScriptURI = '%s/ssh/jobs/%s' % (
+            config.PORTAL_URI,
+            job_id
+        )
+        log.info('Sending request to sheller: %s' % sendScriptURI)
+        log.info(ssh_dict)
+        start = time()
+        resp = requests.post(sendScriptURI, json=ssh_dict)
+        log.info('Sheller returned %s in %d' % (
+            resp.status_code, time() - start))
+        exit_code, stdout = 1, ""
+        if resp.status_code == 200:
+            from mist.api.logs.methods import log_event
+            log_event(
+                event_type='job',
+                action=action_prefix + 'script_started',
+                **ret
+            )
+            log.info('Script started: %s' % ret)
+            # start reading from rabbitmq-stream
+            c = RabbitMQStreamConsumer(job_id)
+            log.info("reading logs from rabbitmq-stream of job_id:%s" % job_id)
+            exit_code, stdout = c.consume()
         return {
             'command': command,
             'exit_code': exit_code,
             'stdout': re.sub(r"(\n)\1+", r"\1", stdout.replace(
                 '\r\n', '\n').replace('\r', '\n')),
             'key_name': key_name,
-            'ssh_user': ssh_user
+            'ssh_user': ssh_dict["user"],
         }
 
     def _preparse_file(self):
diff --git a/src/mist/api/scripts/views.py b/src/mist/api/scripts/views.py
index 1610b4ae1..9a5a8bd7e 100644
--- a/src/mist/api/scripts/views.py
+++ b/src/mist/api/scripts/views.py
@@ -402,6 +402,7 @@ def run_script(request):
     su = params.get('su', False)
     env = params.get('env')
     job_id = params.get('job', params.get('job_id', None))
+    run_async = params.get('async', True)
     if not job_id:
         job = 'run_script'
         job_id = uuid.uuid4().hex
@@ -453,17 +454,24 @@ def run_script(request):
     except me.DoesNotExist:
         raise NotFoundError('Script id not found')
     job_id = job_id or uuid.uuid4().hex
-    tasks.run_script.send_with_options(
-        args=(auth_context.serialize(), script.id, machine.id),
-        kwargs={
-            "params": script_params,
-            "env": env,
-            "su": su,
-            "job_id": job_id,
-            "job": job
-        },
-        delay=1_000
-    )
+    if run_async:
+        tasks.run_script.send_with_options(
+            args=(auth_context.serialize(), script.id, machine.id),
+            kwargs={
+                "params": script_params,
+                "env": env,
+                "su": su,
+                "job_id": job_id,
+                "job": job
+            },
+            delay=1_000
+        )
+    else:
+        return tasks.run_script(
+            auth_context.serialize(),
+            script.id, machine.id,
+            params=script_params, env=env,
+            su=su, job=job, job_id=job_id)
 
     return {'job_id': job_id, 'job': job}
 
diff --git a/src/mist/api/shell.py b/src/mist/api/shell.py
index 236e6da5b..2b0d45ead 100644
--- a/src/mist/api/shell.py
+++ b/src/mist/api/shell.py
@@ -398,7 +398,7 @@ def disconnect(self, **kwargs):
         pass
 
     def _wrap_command(self, cmd):
-        if cmd[-1] is not "\n":
+        if cmd[-1] != "\n":
            cmd = cmd + "\n"
         return cmd
 
@@ -611,7 +611,7 @@ def set_ws_data(self, uuid, secret, control):
         self._control = control
 
     def _wrap_command(self, cmd):
-        if cmd[-1] is not "\r":
+        if cmd[-1] != "\r":
             cmd = cmd + "\r"
         return cmd
 
@@ -741,7 +741,7 @@ def disconnect(self, **kwargs):
         pass
 
     def _wrap_command(self, cmd):
-        if cmd[-1] is not "\n":
+        if cmd[-1] != "\n":
             cmd = cmd + "\n"
         return cmd
 
diff --git a/src/mist/api/sock.py b/src/mist/api/sock.py
index e0de04785..56c0629ba 100644
--- a/src/mist/api/sock.py
+++ b/src/mist/api/sock.py
@@ -20,6 +20,7 @@
 import tornado.httpclient
 
 from sockjs.tornado import SockJSConnection, SockJSRouter
+from mist.api.helpers import es_client
 from mist.api.sockjs_mux import MultiplexConnection
 
 from mist.api.logs.methods import log_event
@@ -128,6 +129,7 @@ def get_dict(self):
             'session_id': self.session_id,
         }
 
+    @tornado.gen.coroutine
     def internal_request(self, path, params=None, callback=None):
         if path.startswith('/'):
             path = path[1:]
@@ -150,14 +152,14 @@ def response_callback(resp):
         headers = {'Authorization': 'internal %s %s' % (
             Portal.get_singleton().internal_api_key,
             self.cookie_session_id)}
-
-        tornado.httpclient.AsyncHTTPClient(
-            force_instance=True, max_clients=100).fetch(
+        client = tornado.httpclient.AsyncHTTPClient(
+            force_instance=True, max_clients=100)
+        response = yield client.fetch(
             '%s/%s' % (config.INTERNAL_API_URL, path),
             headers=headers,
-            callback=response_callback,
             connect_timeout=600,
             request_timeout=600,
         )
+        response_callback(response)
 
     def __repr__(self):
         conn_dict = self.get_dict()
@@ -670,6 +672,7 @@ def on_open(self, conn_info):
         self.enabled = True
         self.consumer = None
         self.enforce_logs_for = self.auth_context.org.id
+        self.es_client = es_client(asynchronous=True)
 
     def on_ready(self):
         """Initiate the RabbitMQ Consumer."""
@@ -699,6 +702,7 @@ def emit_event(self, event):
             self.send('event', self.parse_log(event))
         self.patch_stories(event)
 
+    @tornado.gen.coroutine
     def send_stories(self, stype):
         """Send stories of the specified type."""
@@ -723,10 +727,11 @@ def callback(stories):
         }
         if self.enforce_logs_for is not None:
             kwargs['owner_id'] = self.enforce_logs_for
-        get_stories(tornado_async=True,
-                    tornado_callback=callback,
-                    limit=100,
-                    **kwargs)
+
+        yield get_stories(es_async=self.es_client,
+                          callback=callback,
+                          limit=100,
+                          **kwargs)
 
     def patch_stories(self, event):
         """Send a stories patch.
@@ -771,8 +776,10 @@ def filter_log(self, event):
             return filter_log_event(self.auth_context, event)
         return event
 
+    @tornado.gen.coroutine
     def on_close(self, stale=False):
         """Stop the Consumer and close the WebSocket."""
+        yield self.es_client.close()
         if self.consumer is not None:
             try:
                 self.consumer.stop()
diff --git a/src/mist/api/tasks.py b/src/mist/api/tasks.py
index 203344331..f3f047b94 100644
--- a/src/mist/api/tasks.py
+++ b/src/mist/api/tasks.py
@@ -1148,6 +1148,7 @@ def group_run_script(auth_context_serialized, script_id, name, machine_ids,
 
 @dramatiq.actor(queue_name='dramatiq_schedules',
                 time_limit=3_600_000,
                 store_results=True,
+                max_retries=0,
                 throws=(me.DoesNotExist,))
 def run_script(auth_context_serialized, script_id, machine_id, params='',
                host='', key_id='', username='', password='', port=22,
@@ -1184,12 +1185,9 @@ def run_script(auth_context_serialized, script_id, machine_id, params='',
         'port': port,
         'command': '',
         'stdout': '',
-        'exit_code': '',
-        'wrapper_stdout': '',
-        'extra_output': '',
+        'exit_code': -255,
         'error': False,
     }
-    started_at = time()
     machine_name = ''
     cloud_id = ''
 
@@ -1212,18 +1210,19 @@ def run_script(auth_context_serialized, script_id, machine_id, params='',
 
         if not host:
             raise MistError("No host provided and none could be discovered.")
-
+        started_at = time()
         result = script.ctl.run(
             auth_context, machine, host=host, port=port, username=username,
             password=password, su=su, key_id=key_id, params=params,
-            job_id=job_id, env=env, owner=owner
+            job_id=job_id, env=env, owner=owner, ret=ret,
+            action_prefix=action_prefix
         )
         ret.update(result)
-    except Exception as exc:
+        log.info("Script result: %s" % result)
+    except TypeError as exc:
         ret['error'] = str(exc)
-        log_event(event_type='job', action=action_prefix + 'script_started',
-                  **ret)
-        log.info('Script started: %s', ret)
+        log.error("Script error: %s" % exc)
     if not ret['error']:
         if ret['exit_code'] > 0:
             ret['error'] = 'Script exited with code %s' % ret['exit_code']
diff --git a/src/mist/api/users/models.py b/src/mist/api/users/models.py
index ba0443225..acb65838c 100644
--- a/src/mist/api/users/models.py
+++ b/src/mist/api/users/models.py
@@ -443,7 +443,7 @@ def as_dict_v2(self, deref='auto', only=''):
         }
         ret = prepare_dereferenced_dict(standard_fields, deref_map, self,
                                         deref, only)
-        if(ret.get('policy')):
+        if ret.get('policy'):
            ret['policy'] = ret['policy'].__str__()
         ret['members_count'] = len(ret.get('members', []))
         return ret
@@ -714,7 +714,7 @@ def clean(self):
             elif team.name == 'Owners':
                 raise me.ValidationError(
                     'RBAC Mappings are not intended for Team Owners')
-            elif len(mappings) is not 2:
+            elif len(mappings) != 2:
                 raise me.ValidationError(
                     'RBAC Mappings have not been properly initialized for '
                     'Team %s' % team)
diff --git a/src/mist/api/when/models.py b/src/mist/api/when/models.py
index 427b1772b..70e9d85f9 100644
--- a/src/mist/api/when/models.py
+++ b/src/mist/api/when/models.py
@@ -172,8 +172,8 @@ def as_dict(self):
         return {'offset': self.offset, 'period': self.period}
 
     def __str__(self):
-        if self.offset is 0:
+        if self.offset == 0:
             return 'Trigger offset is 0'
-        if self.offset is 1:
+        if self.offset == 1:
             return 'Trigger offset of 1 %s' % self.period_singular
         return 'Trigger offset %s %s' % (self.offset, self.period)
diff --git a/v2 b/v2
index da3954cef..6673b8a73 160000
--- a/v2
+++ b/v2
@@ -1 +1 @@
-Subproject commit da3954cef9a80de617ccfad4234f34d0656be38c
+Subproject commit 6673b8a7346f48ba64daa149a2e9c9182e991a0d
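
The amqp_tornado.py and hub/tornado_client.py changes above track pika's 0.x to 1.x API break: completion callbacks move from the first positional argument to a `callback=` keyword, and `no_ack` becomes `auto_ack`. A minimal sketch of the new calling convention, assuming an already-open channel; the exchange/queue names and the `on_done`/`handler` callbacks are illustrative, not part of the PR:

```python
import pika


def on_done(frame):
    # In pika 1.x, completion callbacks are passed via `callback=`.
    pass


def handler(channel, method, properties, body):
    print(body)


def on_channel_open(channel):
    # 0.x: channel.exchange_declare(on_done, 'logs', 'fanout')
    channel.exchange_declare('logs', exchange_type='fanout', callback=on_done)
    # 0.x: channel.queue_declare(on_done, 'tasks', durable=True)
    channel.queue_declare('tasks', durable=True, callback=on_done)
    # 0.x: channel.basic_consume(handler, 'tasks', no_ack=True)
    channel.basic_consume('tasks', handler, auto_ack=True)
```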
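The `_list_locations` refactor fans each libcloud location out to the default thread pool via `loop.run_in_executor` and awaits them together with `asyncio.gather`, mirroring `_list_locations_populate_all_locations`. A self-contained sketch of that pattern, with `populate` standing in for the blocking per-location work:

```python
import asyncio


def populate(item):
    # Stand-in for _list_locations__populate_location: blocking work
    # (DB lookups, saves) runs here, off the event loop thread.
    return item * 2


async def populate_all(items, loop):
    futures = [
        loop.run_in_executor(None, populate, item)  # None = default pool
        for item in items
    ]
    return await asyncio.gather(*futures)


loop = asyncio.new_event_loop()
try:
    print(loop.run_until_complete(populate_all([1, 2, 3], loop)))  # [2, 4, 6]
finally:
    loop.close()
```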
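Likewise, the logs/sock changes replace elasticsearch-tornado with the native async client shipped in `elasticsearch[async]` 7.x. A rough sketch of how the `search_async` path in `get_stories` drives it; the host and index names below are placeholders:

```python
import asyncio

from elasticsearch import AsyncElasticsearch  # elasticsearch[async]==7.10.1


async def fetch_total():
    es = AsyncElasticsearch(['http://localhost:9200'])
    try:
        # The coroutine is awaited instead of taking a callback= argument.
        result = await es.search(index='app-logs-*',
                                 body={'query': {'match_all': {}}})
        return result['hits']['total']
    finally:
        await es.close()  # mirrors the yield self.es_client.close() on_close


print(asyncio.run(fetch_total()))
```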