From b6ca74d0d286a816ab36bfa0931463515b8a89eb Mon Sep 17 00:00:00 2001 From: Pior Bastida <pior@pbastida.net> Date: Fri, 1 Jan 2016 18:01:00 -0500 Subject: [PATCH 1/2] Rename module caravan.models.decision_task --- caravan/__init__.py | 2 +- caravan/models/{decision.py => decision_task.py} | 0 caravan/tests/models/test_decision.py | 8 ++++---- caravan/workers/decider.py | 8 +++++--- 4 files changed, 10 insertions(+), 8 deletions(-) rename caravan/models/{decision.py => decision_task.py} (100%) diff --git a/caravan/__init__.py b/caravan/__init__.py index dd48923..c50ab70 100644 --- a/caravan/__init__.py +++ b/caravan/__init__.py @@ -1,4 +1,4 @@ from caravan.models.workflow import Workflow -from caravan.models.decision import DecisionDone, WorkflowFailure +from caravan.models.decision_task import DecisionDone, WorkflowFailure __all__ = ['Workflow', 'DecisionDone', 'WorkflowFailure'] diff --git a/caravan/models/decision.py b/caravan/models/decision_task.py similarity index 100% rename from caravan/models/decision.py rename to caravan/models/decision_task.py diff --git a/caravan/tests/models/test_decision.py b/caravan/tests/models/test_decision.py index 70f8450..8bfb1bc 100644 --- a/caravan/tests/models/test_decision.py +++ b/caravan/tests/models/test_decision.py @@ -4,13 +4,13 @@ class TestException(unittest.TestCase): def test_decision_done(self): - from caravan.models.decision import DecisionDone + from caravan.models.decision_task import DecisionDone exc = DecisionDone('STUFF') self.assertEqual(str(exc), 'STUFF') def test_workflow_failure(self): - from caravan.models.decision import WorkflowFailure + from caravan.models.decision_task import WorkflowFailure exc = WorkflowFailure('REASON', 'DETAILS') @@ -71,7 +71,7 @@ def decision_data(self): return decision_data def test_nominal(self): - from caravan.models.decision import DecisionTask + from caravan.models.decision_task import DecisionTask task = DecisionTask(self.decision_data) @@ -89,7 +89,7 @@ def test_nominal(self): self.assertEqual(task.decisions, []) def test_repr(self): - from caravan.models.decision import DecisionTask + from caravan.models.decision_task import DecisionTask task = DecisionTask(self.decision_data) self.assertEqual(repr(task), diff --git a/caravan/workers/decider.py b/caravan/workers/decider.py index f1487f8..4430b4c 100644 --- a/caravan/workers/decider.py +++ b/caravan/workers/decider.py @@ -3,7 +3,9 @@ from botocore.exceptions import ClientError from caravan.workers import get_default_identity -from caravan.models.decision import DecisionTask, DecisionDone, WorkflowFailure +from caravan.models.decision_task import (DecisionTask, + DecisionDone, + WorkflowFailure) log = logging.getLogger(__name__) @@ -14,7 +16,7 @@ def __init__(self, connection, domain, task_list, workflows): self.conn = connection self.domain = domain self.task_list = task_list - self.Workflows = {(w.name, w.version): w for w in workflows} + self.workflows = dict(((w.name, w.version), w) for w in workflows) self.identity = get_default_identity() def run(self): @@ -47,7 +49,7 @@ def poll(self): def run_task(self, task): log.info('Got a %r', task) workflow_key = (task.workflow_type, task.workflow_version) - workflow_class = self.Workflows.get(workflow_key) + workflow_class = self.workflows.get(workflow_key) if not workflow_class: log.warning('Unknown workflow %s', task) From c7a82c39810d6740d964c74486d6fc162de2e187 Mon Sep 17 00:00:00 2001 From: Pior Bastida <pior@pbastida.net> Date: Fri, 1 Jan 2016 19:05:02 -0500 Subject: [PATCH 2/2] Implement an activity model + worker --- README.rst | 4 +- caravan/__init__.py | 3 +- caravan/codecs/__init__.py | 0 caravan/codecs/gzipjson.py | 35 +++++++ caravan/commands/activity.py | 54 +++++++++++ caravan/commands/decider.py | 2 +- caravan/examples/demo.py | 41 ++++++++- caravan/models/activity.py | 82 +++++++++++++++++ caravan/models/activity_task.py | 34 +++++++ caravan/swf.py | 77 ++++++++++------ caravan/tests/codecs/__init__.py | 0 caravan/tests/codecs/test_gzipjson.py | 33 +++++++ caravan/tests/commands/test_activity.py | 61 ++++++++++++ caravan/tests/commands/test_decider.py | 2 +- caravan/tests/examples/test_demo.py | 4 +- caravan/tests/fixtures.py | 39 +++++++- caravan/tests/models/test_activity_task.py | 48 ++++++++++ caravan/tests/test_swf.py | 8 +- caravan/tests/util.py | 11 +++ caravan/tests/workers/__init__.py | 0 caravan/tests/workers/test_activity.py | 102 +++++++++++++++++++++ caravan/workers/__init__.py | 19 ++++ caravan/workers/activity.py | 69 ++++++++++++++ caravan/workers/decider.py | 26 ++---- setup.py | 2 + 25 files changed, 694 insertions(+), 62 deletions(-) create mode 100644 caravan/codecs/__init__.py create mode 100644 caravan/codecs/gzipjson.py create mode 100644 caravan/commands/activity.py create mode 100644 caravan/models/activity.py create mode 100644 caravan/models/activity_task.py create mode 100644 caravan/tests/codecs/__init__.py create mode 100644 caravan/tests/codecs/test_gzipjson.py create mode 100644 caravan/tests/commands/test_activity.py create mode 100644 caravan/tests/models/test_activity_task.py create mode 100644 caravan/tests/workers/__init__.py create mode 100644 caravan/tests/workers/test_activity.py create mode 100644 caravan/workers/activity.py diff --git a/README.rst b/README.rst index 7e76726..4fdbe28 100644 --- a/README.rst +++ b/README.rst @@ -15,7 +15,7 @@ Feedbacks, ideas and contributions are highly welcomed. (Just open a `Github issue <https://github.com/pior/caravan/issues>`_). - `Code on Github <https://github.com/pior/caravan>`_ -- `PyPi <https://pypi.python.org/pypi/caravan>`_ +- `PyPI <https://pypi.python.org/pypi/caravan>`_ - `Tests <https://travis-ci.org/pior/caravan>`_ |travis| |coveralls| - Doc: ``TODO`` @@ -42,7 +42,7 @@ Features ======== - Decider worker -- Activity task worker ``TODO`` +- Activity task worker - Commands to start/signal/terminate an arbitrary workflow execution - Command to list open workflow execution - Command to register a domain / list domains diff --git a/caravan/__init__.py b/caravan/__init__.py index c50ab70..020b19a 100644 --- a/caravan/__init__.py +++ b/caravan/__init__.py @@ -1,4 +1,5 @@ from caravan.models.workflow import Workflow +from caravan.models.activity import Activity from caravan.models.decision_task import DecisionDone, WorkflowFailure -__all__ = ['Workflow', 'DecisionDone', 'WorkflowFailure'] +__all__ = ['Workflow', 'Activity', 'DecisionDone', 'WorkflowFailure'] diff --git a/caravan/codecs/__init__.py b/caravan/codecs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/caravan/codecs/gzipjson.py b/caravan/codecs/gzipjson.py new file mode 100644 index 0000000..a80d73e --- /dev/null +++ b/caravan/codecs/gzipjson.py @@ -0,0 +1,35 @@ +import json +import zlib +import base64 +import logging + +log = logging.getLogger(__name__) + +# http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html +MAX_SWF_MESSAGE_SIZE = 32000 + + +def dumps(data): + """Encode data in JSON. Compress in zlib/base64 if too large for SWF. + + Boto3 expect any payload to be unicode and thus will crash on unicode errors + if we don't base64 encode all zlib compressed payloads. + """ + encoded_data = json.dumps(data) + + if len(encoded_data) > MAX_SWF_MESSAGE_SIZE: + encoded_data = encoded_data.encode('utf-8') + encoded_data = base64.b64encode(zlib.compress(encoded_data)) + + return encoded_data + + +def loads(data): + """Decode data in JSON, possibly zlib/base64 compressed.""" + try: + decompressed_data = zlib.decompress(base64.b64decode(data)) + data = decompressed_data.decode('utf-8') + except: + pass + + return json.loads(data) diff --git a/caravan/commands/activity.py b/caravan/commands/activity.py new file mode 100644 index 0000000..589be21 --- /dev/null +++ b/caravan/commands/activity.py @@ -0,0 +1,54 @@ +import logging + +from caravan import Activity +from caravan.commands.base import BaseCommand +from caravan.commands import ClassesLoaderFromModule +from caravan.swf import register_activity, get_connection +from caravan.workers.activity import Worker + +log = logging.getLogger(__name__) + + +class Command(BaseCommand): + + description = 'Activity worker' + + def setup_arguments(self, parser): + parser.add_argument('-m', '--modules', + type=ClassesLoaderFromModule(Activity), + nargs='+', + required=True) + parser.add_argument('-d', '--domain', + required=True) + parser.add_argument('-t', '--task-list', + required=True) + parser.add_argument('--register-activities', action='store_true') + + def run(self): + connection = get_connection() + activities = [a for module in self.args.modules for a in module] + + if self.args.register_activities: + log.info("Registering activity types") + for activity in activities: + created = register_activity(connection=connection, + domain=self.args.domain, + activity=activity) + if created: + log.info("Activity type %s(%s): registered.", + activity.name, activity.version) + else: + log.info("Activity type %s(%s): already registered.", + activity.name, activity.version) + + log.info("Start activity worker...") + worker = Worker(connection=connection, + domain=self.args.domain, + task_list=self.args.task_list, + entities=activities) + + while True: + try: + worker.run() + except Exception: # Doesn't catch KeyboardInterrupt + log.exception("Decider crashed!") diff --git a/caravan/commands/decider.py b/caravan/commands/decider.py index 19a0be1..a38a155 100644 --- a/caravan/commands/decider.py +++ b/caravan/commands/decider.py @@ -46,7 +46,7 @@ def run(self): worker = Worker(connection=connection, domain=self.args.domain, task_list=self.args.task_list, - workflows=workflows) + entities=workflows) while True: try: diff --git a/caravan/examples/demo.py b/caravan/examples/demo.py index e5f7ec0..1e9181d 100644 --- a/caravan/examples/demo.py +++ b/caravan/examples/demo.py @@ -1,9 +1,10 @@ from __future__ import print_function +import time -from caravan import Workflow +from caravan import Workflow, Activity -class Demo(Workflow): +class DemoWorkflow(Workflow): """Demo workflow using the bare caravan API.""" @@ -58,9 +59,43 @@ def run(self): self.task.add_decision('CompleteWorkflowExecution', result=workflow_result) + elif signal_name == 'ACTIVITY': + # Schedule a demo activity + activity_type = { + 'name': DemoActivity.name, + 'version': DemoActivity.version, + } + activity_id = str(time.time()) + self.task.add_decision('ScheduleActivityTask', + activityType=activity_type, + activityId=activity_id, + input='"DemoInput"') + else: print("Unknown signal '%s'" % signal_name) self.task.decision_done(msg='JustIgnoringThisUnknownSignal') else: - self.task.fail('unknown_event_type') + print('Unknown last event type') + + +class DemoActivity(Activity): + + name = 'DemoActivity' + version = '0.1.2' + + default_task_list = 'default' + + default_task_start_to_close_timeout = 60 + default_task_schedule_to_start_timeout = 10 + default_task_schedule_to_close_timeout = 70 + default_task_heartbeat_timeout = 'NONE' + + def run(self, input): + print('Received input: %s' % input) + self.do_stuff() + return {'input': input, 'output': 'YO'} + + def do_stuff(self): + print('Doing stuff (waiting 3 secs)') + time.sleep(2) diff --git a/caravan/models/activity.py b/caravan/models/activity.py new file mode 100644 index 0000000..76b7d04 --- /dev/null +++ b/caravan/models/activity.py @@ -0,0 +1,82 @@ +from caravan.codecs import gzipjson + + +class Activity(object): + + """Base implementation of an activity. + + To implement a activity, subclass this class and override the attributes + and the run method. + + Class attributes to override: + + name (str): Name of the workflow type + + version (str): Version of the workflow type + + description (str|optional): description of the workflow type + + default_task_start_to_close_timeout (optional): + maximum duration of decision tasks + + default_task_heartbeat_timeout (optional): + maximum time before which a worker processing a task of this type + must report progress by calling RecordActivityTaskHeartbeat + + default_task_list (optional): + default task list to use for scheduling decision tasks + + default_task_priority (optional): + task priority to assign to the activity type. (default to 0) + Valid values: -2147483648 to 2147483647. Higher numbers indicate + higher priority. + + default_task_schedule_to_start_timeout (optional): + maximum duration that a task of this activity type can wait before + being assigned to a worker. The value "NONE" can be used to specify + unlimited duration. + + default_task_schedule_to_close_timeout (optional): + maximum duration for a task of this activity type. The value "NONE" + can be used to specify unlimited duration. + + More information on the Boto3 documentation: + http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.register_activity_type + + Method to override: + + run() + + Example:: + + from caravan import Activity + + class DoStuff(Activity): + + name = 'DoStuff' + version = '1.0' + + def run(self, input): + result = do_stuff(input) + return result + """ + + name = None + version = None + description = None + + codec = gzipjson + + def __init__(self, task): + self.task = task + + def __repr__(self): + return '<Activity %s(%s)>' % (self.name, self.version) + + def _run(self): + task_input = self.codec.loads(self.task.task_input) + task_result = self.run(task_input) + self.task.set_result(self.codec.dumps(task_result)) + + def run(self): + raise NotImplementedError() diff --git a/caravan/models/activity_task.py b/caravan/models/activity_task.py new file mode 100644 index 0000000..9e038dc --- /dev/null +++ b/caravan/models/activity_task.py @@ -0,0 +1,34 @@ +class ActivityTaskFailure(Exception): + + def __init__(self, reason, details): + self.reason = reason + self.details = details + super(ActivityTaskFailure, self).__init__('%s: %s' % (reason, details)) + + +class ActivityTask(object): + + """An activity task.""" + + def __init__(self, data): + self.task_token = data['taskToken'] + self.activity_id = data['activityId'] + self.workflow_id = data['workflowExecution']['workflowId'] + self.workflow_run_id = data['workflowExecution']['runId'] + self.activity_name = data['activityType']['name'] + self.activity_version = data['activityType']['version'] + self.task_input = data.get('input') + self.result = None + + def __repr__(self): + return '<ActivityTask %s(%s) id=%s WorkflowId=%s>' % ( + self.activity_name, + self.activity_version, + self.activity_id, + self.workflow_id) + + def fail(self, reason, details): + raise ActivityTaskFailure(reason, details) + + def set_result(self, result): + self.result = result diff --git a/caravan/swf.py b/caravan/swf.py index cc777a3..ad856f3 100644 --- a/caravan/swf.py +++ b/caravan/swf.py @@ -9,32 +9,14 @@ class InvalidWorkflowError(Exception): pass -REGISTER_WORKFLOW_PARAMETERS = [ - 'name', - 'version', - 'description', - 'defaultTaskStartToCloseTimeout', - 'defaultExecutionStartToCloseTimeout', - 'defaultTaskList', - 'defaultTaskPriority', - 'defaultChildPolicy', - 'defaultLambdaRole', - ] +def get_swf_parameters(workflow, parameters): + param_dict = {} -REGISTER_WORKFLOW_REQUIRED_PARAMETERS = [ - 'name', - 'version', - ] - - -def get_workflow_registration_parameter(workflow): - args = {} - - for parameter in REGISTER_WORKFLOW_PARAMETERS: + for parameter in parameters: attr_name = inflection.underscore(parameter) attr_value = getattr(workflow, attr_name, None) - required = attr_name in REGISTER_WORKFLOW_REQUIRED_PARAMETERS + required = attr_name in ['name', 'version'] if attr_value is None: if required: @@ -54,9 +36,22 @@ def get_workflow_registration_parameter(workflow): elif not is_string: raise InvalidWorkflowError('invalid attribute %s' % attr_name) - args[parameter] = attr_value + param_dict[parameter] = attr_value - return args + return param_dict + + +WORKFLOW_PARAMETERS = [ + 'name', + 'version', + 'description', + 'defaultTaskStartToCloseTimeout', + 'defaultExecutionStartToCloseTimeout', + 'defaultTaskList', + 'defaultTaskPriority', + 'defaultChildPolicy', + 'defaultLambdaRole', + ] def register_workflow(connection, domain, workflow): @@ -64,7 +59,7 @@ def register_workflow(connection, domain, workflow): Return False if this workflow already registered (and True otherwise). """ - args = get_workflow_registration_parameter(workflow) + args = get_swf_parameters(workflow, WORKFLOW_PARAMETERS) try: connection.register_workflow_type(domain=domain, **args) @@ -76,8 +71,38 @@ def register_workflow(connection, domain, workflow): return True +ACTIVITY_PARAMETERS = [ + 'name', + 'version', + 'description', + 'defaultTaskStartToCloseTimeout', + 'defaultTaskHeartbeatTimeout', + 'defaultTaskList', + 'defaultTaskPriority', + 'defaultTaskScheduleToStartTimeout', + 'defaultTaskScheduleToCloseTimeout', + ] + + +def register_activity(connection, domain, activity): + """Register an activity type. + + Return False if this activity already registered (and True otherwise). + """ + args = get_swf_parameters(activity, ACTIVITY_PARAMETERS) + + try: + connection.register_activity_type(domain=domain, **args) + except ClientError as err: + if err.response['Error']['Code'] == 'TypeAlreadyExistsFault': + return False # Ignore this error + raise + + return True + + def get_connection(): - # Must increase the http timeout since SWF has a timeout of 60 sec + """Create and return a Boto3 connection for SWF (with read_timeout=70).""" config = Config(connect_timeout=50, read_timeout=70) connection = boto3.client("swf", config=config) return connection diff --git a/caravan/tests/codecs/__init__.py b/caravan/tests/codecs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/caravan/tests/codecs/test_gzipjson.py b/caravan/tests/codecs/test_gzipjson.py new file mode 100644 index 0000000..cac472b --- /dev/null +++ b/caravan/tests/codecs/test_gzipjson.py @@ -0,0 +1,33 @@ +# encoding: utf-8 +import unittest + +from caravan.codecs import gzipjson + +from nose_parameterized import parameterized + + +class Test(unittest.TestCase): + + @parameterized.expand([ + ('None', None, 'null'), + ('String', 'Yo', '"Yo"'), + ('Dict', {'k': 'v'}, '{"k": "v"}'), + ('VeryLong', 'x' * 32001, + b'eJztwSEBAAAAAqAvrnS+yRdACgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAHA' + b'ZPkWcIw=='), + ('Unicode', u'πΏπ±β', '"\\ud83d\\ude3f\\ud83d\\ude31\\u2211"'), + ('VeryLongUnicode', u'πΏ' * 8001, + b'eJztxjERACAIAMAuVGAxDCPawP5SwAa//H3U7ZU97jzu7u7u7u7u7u7u7u7u7u7u7u7' + b'u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u' + b'7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7' + b'u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u7u' + b'7u7u7u7u7u7u7u7u7u7u7u6/xwMOT8Lp'), + ]) + def test_dumps(self, name, data, encoded): + self.assertEqual(gzipjson.dumps(data), encoded) + + self.assertEqual(gzipjson.loads(encoded), data, + msg='Failed to decode data') + + self.assertEqual(gzipjson.loads(gzipjson.dumps(data)), data, + msg='Failed to encode/decode data') diff --git a/caravan/tests/commands/test_activity.py b/caravan/tests/commands/test_activity.py new file mode 100644 index 0000000..da9b15a --- /dev/null +++ b/caravan/tests/commands/test_activity.py @@ -0,0 +1,61 @@ +import unittest + +import mock + +from caravan.tests import fixtures +from caravan.tests.util import TestUtilMixin +from caravan.commands.activity import Command + + +@mock.patch('caravan.commands.activity.Worker') +class Test(TestUtilMixin, unittest.TestCase): + + def test_nominal(self, m_worker_cls): + m_worker = m_worker_cls.return_value + m_worker.run.side_effect = [None, None, KeyboardInterrupt('KILLTEST')] + + args = [ + '-d', 'DOMAIN', '-m', 'caravan.tests.fixtures', '-t', 'TASKLIST', + ] + + with self.mock_args(args): + with self.assertRaises(SystemExit) as exc: + Command.main() + + self.assertEqual(exc.exception.code, 1) + self.assertEqual(m_worker.run.call_count, 3) + + args, kwargs = m_worker_cls.call_args + self.assertEqual(kwargs['domain'], 'DOMAIN') + self.assertEqual(kwargs['task_list'], 'TASKLIST') + self.assertEqual(kwargs['entities'], + [fixtures.TestActivity1, fixtures.TestActivity2]) + + self.assertIsSwfConnection(kwargs['connection']) + + @mock.patch('caravan.commands.activity.register_activity', autospec=True) + @mock.patch('caravan.commands.activity.get_connection', autospec=True) + def test_register_activities(self, m_get_conn, m_register, m_worker_cls): + m_worker = m_worker_cls.return_value + m_worker.run.side_effect = KeyboardInterrupt('KILLTEST') + + m_conn = m_get_conn.return_value + + args = [ + '-d', 'DOMAIN', '-m', 'caravan.tests.fixtures', '-t', 'TASKLIST', + '--register-activities', + ] + + with self.mock_args(args): + with self.assertRaises(SystemExit): + Command.main() + + expected = [ + mock.call(connection=m_conn, + domain='DOMAIN', + activity=fixtures.TestActivity1), + mock.call(connection=m_conn, + domain='DOMAIN', + activity=fixtures.TestActivity2), + ] + self.assertEqual(m_register.call_args_list, expected) diff --git a/caravan/tests/commands/test_decider.py b/caravan/tests/commands/test_decider.py index 411cda4..177c1d3 100644 --- a/caravan/tests/commands/test_decider.py +++ b/caravan/tests/commands/test_decider.py @@ -30,7 +30,7 @@ def test_nominal(self, m_worker_cls): args, kwargs = m_worker_cls.call_args self.assertEqual(kwargs['domain'], 'DOMAIN') self.assertEqual(kwargs['task_list'], 'TASKLIST') - self.assertEqual(kwargs['workflows'], + self.assertEqual(kwargs['entities'], [fixtures.TestWorkflow1, fixtures.TestWorkflow2]) # Boto3 client object are dynamically forged... diff --git a/caravan/tests/examples/test_demo.py b/caravan/tests/examples/test_demo.py index ec927a1..bd60751 100644 --- a/caravan/tests/examples/test_demo.py +++ b/caravan/tests/examples/test_demo.py @@ -1,10 +1,10 @@ import unittest -from caravan.examples.demo import Demo +from caravan.examples.demo import DemoWorkflow from caravan.testing import valid_workflow_registration class Test(unittest.TestCase): def test_registration(self): - valid_workflow_registration(Demo) + valid_workflow_registration(DemoWorkflow) diff --git a/caravan/tests/fixtures.py b/caravan/tests/fixtures.py index e6d394e..d131f7f 100644 --- a/caravan/tests/fixtures.py +++ b/caravan/tests/fixtures.py @@ -1,4 +1,4 @@ -from caravan import Workflow +from caravan import Workflow, Activity class TestWorkflow1(Workflow): @@ -9,3 +9,40 @@ class TestWorkflow1(Workflow): class TestWorkflow2(Workflow): name = 'Workflow2' + + +class TestActivity1(Activity): + + name = 'Activity1' + version = '1' + + def run(self, input): + return 'DONE' + + +class TestActivity2(Activity): + + name = 'Activity2' + version = '2' + + def run(self, input): + self.task.fail('REASON', 'DETAILS') + + +def make_activity_task_data(name='NAME', version='VERSION', input='INPUT'): + data = { + 'taskToken': 'TASK_TOKEN', + 'activityId': 'ACTIVITY_ID', + 'startedEventId': 123, + 'workflowExecution': { + 'workflowId': 'WORKFLOW_ID', + 'runId': 'RUN_ID' + }, + 'activityType': { + 'name': name, + 'version': version, + }, + } + if input is not None: + data['input'] = input + return data diff --git a/caravan/tests/models/test_activity_task.py b/caravan/tests/models/test_activity_task.py new file mode 100644 index 0000000..234637b --- /dev/null +++ b/caravan/tests/models/test_activity_task.py @@ -0,0 +1,48 @@ +import unittest + +from caravan.models.activity_task import ActivityTask, ActivityTaskFailure + +from .. import fixtures + + +class Test(unittest.TestCase): + + def test_nominal(self): + data = fixtures.make_activity_task_data() + task = ActivityTask(data) + self.assertEqual(task.task_token, 'TASK_TOKEN') + self.assertEqual(task.activity_id, 'ACTIVITY_ID') + self.assertEqual(task.workflow_id, 'WORKFLOW_ID') + self.assertEqual(task.workflow_run_id, 'RUN_ID') + self.assertEqual(task.activity_name, 'NAME') + self.assertEqual(task.activity_version, 'VERSION') + self.assertEqual(task.task_input, 'INPUT') + + def test_no_input(self): + data = fixtures.make_activity_task_data(input=None) + task = ActivityTask(data) + self.assertEqual(task.task_input, None) + + def test_result(self): + data = fixtures.make_activity_task_data(input=None) + task = ActivityTask(data) + self.assertEqual(task.result, None) + + task.set_result('RESULT') + self.assertEqual(task.result, 'RESULT') + + def test_fail(self): + data = fixtures.make_activity_task_data() + task = ActivityTask(data) + + with self.assertRaises(ActivityTaskFailure) as assert_exc: + task.fail('reason', 'details') + + self.assertEqual(assert_exc.exception.reason, 'reason') + self.assertEqual(assert_exc.exception.details, 'details') + self.assertEqual(str(assert_exc.exception), 'reason: details') + + def test_repr(self): + data = fixtures.make_activity_task_data() + task = ActivityTask(data) + repr(task) diff --git a/caravan/tests/test_swf.py b/caravan/tests/test_swf.py index 7f92258..4c524d0 100644 --- a/caravan/tests/test_swf.py +++ b/caravan/tests/test_swf.py @@ -3,9 +3,7 @@ import mock from moto import mock_swf -from caravan.swf import (get_workflow_registration_parameter, - register_workflow, - get_connection) +from caravan.swf import register_workflow, get_connection class Test(unittest.TestCase): @@ -49,9 +47,9 @@ def _list_workflows(self): return response['typeInfos'] def test_example_demo(self): - from caravan.examples.demo import Demo + from caravan.examples.demo import DemoWorkflow - result = register_workflow(self.connection, 'TEST', Demo) + result = register_workflow(self.connection, 'TEST', DemoWorkflow) self.assertTrue(result) workflow = self._list_workflows()[0] diff --git a/caravan/tests/util.py b/caravan/tests/util.py index a35f0cf..f0dcaa5 100644 --- a/caravan/tests/util.py +++ b/caravan/tests/util.py @@ -18,3 +18,14 @@ def __exit__(self, *args): def mock_args(args): return mock.patch('sys.argv', ['PROG'] + args) + + +class TestUtilMixin(object): + + def assertIsSwfConnection(self, obj): + # Boto3 client object are dynamically built type... + self.assertEqual(obj.__class__.__name__, 'SWF') + self.assertEqual(obj.__class__.__module__, 'botocore.client') + + def mock_args(self, args): + return mock.patch('sys.argv', ['PROG'] + args) diff --git a/caravan/tests/workers/__init__.py b/caravan/tests/workers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/caravan/tests/workers/test_activity.py b/caravan/tests/workers/test_activity.py new file mode 100644 index 0000000..8e5ab19 --- /dev/null +++ b/caravan/tests/workers/test_activity.py @@ -0,0 +1,102 @@ +import unittest + +import mock + +from caravan.models.activity_task import ActivityTask +from caravan.workers.activity import Worker +from .. import fixtures + + +class Test(unittest.TestCase): + + def test_run(self): + data = fixtures.make_activity_task_data(name='Activity1', + version='1', + input='"IN"') + m_conn = mock.Mock() + m_conn.poll_for_activity_task.return_value = data + + activities = [fixtures.TestActivity1, fixtures.TestActivity2] + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=activities) + + worker.run() + + self.assertTrue(m_conn.respond_activity_task_completed.called) + self.assertFalse(m_conn.respond_activity_task_failed.called) + + def test_poll(self): + data = fixtures.make_activity_task_data() + m_conn = mock.Mock() + m_conn.poll_for_activity_task.return_value = data + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=[]) + + task = worker.poll() + + self.assertIsInstance(task, ActivityTask) + self.assertEqual(task.activity_name, 'NAME') + + def test_poll_timeout(self): + m_conn = mock.Mock() + m_conn.poll_for_activity_task.return_value = {} + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=[]) + + result = worker.poll() + self.assertIsNone(result) + + def test_run_task(self): + m_conn = mock.Mock() + + activities = [fixtures.TestActivity1, fixtures.TestActivity2] + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=activities) + + data = fixtures.make_activity_task_data(name='Activity1', + version='1', + input='"IN"') + task = ActivityTask(data) + worker.run_task(task) + + m_conn.respond_activity_task_completed.assert_called_once_with( + result='"DONE"', taskToken='TASK_TOKEN') + self.assertFalse(m_conn.respond_activity_task_failed.called) + + def test_run_task_failed(self): + m_conn = mock.Mock() + + activities = [fixtures.TestActivity1, fixtures.TestActivity2] + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=activities) + + data = fixtures.make_activity_task_data(name='Activity2', + version='2', + input='"IN"') + task = ActivityTask(data) + worker.run_task(task) + + m_conn.respond_activity_task_failed.assert_called_once_with( + details='DETAILS', reason='REASON', taskToken='TASK_TOKEN') + self.assertFalse(m_conn.respond_activity_task_completed.called) + + def test_run_task_unknown(self): + m_conn = mock.Mock() + + activities = [fixtures.TestActivity1, fixtures.TestActivity2] + + worker = Worker(connection=m_conn, domain='DOMAIN', + task_list='default', entities=activities) + + data = fixtures.make_activity_task_data(name='Activity3') + + task = ActivityTask(data) + worker.run_task(task) + + self.assertFalse(m_conn.respond_activity_task_completed.called) + self.assertFalse(m_conn.respond_activity_task_failed.called) diff --git a/caravan/workers/__init__.py b/caravan/workers/__init__.py index bc5a13f..311953b 100644 --- a/caravan/workers/__init__.py +++ b/caravan/workers/__init__.py @@ -1,6 +1,8 @@ import os import socket +import logging +log = logging.getLogger(__name__) IDENTITY_SIZE = 256 @@ -8,3 +10,20 @@ def get_default_identity(): identity = "%s-%s" % (socket.getfqdn(), os.getpid()) return identity[-IDENTITY_SIZE:] # keep the most important part + + +class BaseWorker(object): + + def __init__(self, connection, domain, task_list, entities): + self.conn = connection + self.domain = domain + self.task_list = task_list + self.entities = dict(((e.name, e.version), e) for e in entities) + self.identity = get_default_identity() + + def run(self): + log.info('Waiting for a task...') + task = self.poll() + if task: + log.info('Got a %r', task) + self.run_task(task) diff --git a/caravan/workers/activity.py b/caravan/workers/activity.py new file mode 100644 index 0000000..692949c --- /dev/null +++ b/caravan/workers/activity.py @@ -0,0 +1,69 @@ +import logging + +from botocore.exceptions import ClientError + +from caravan.workers import BaseWorker +from caravan.models.activity_task import ActivityTask, ActivityTaskFailure + +log = logging.getLogger(__name__) + + +class Worker(BaseWorker): + + def poll(self): + task_list = {'name': self.task_list} + resp = self.conn.poll_for_activity_task(domain=self.domain, + taskList=task_list, + identity=self.identity) + if 'taskToken' not in resp: + return # Polling timed out. + + return ActivityTask(resp) + + def run_task(self, task): + activity_key = (task.activity_name, task.activity_version) + activity_class = self.entities.get(activity_key) + + if not activity_class: + log.warning('Unknown activity %s', task) + return + + log.info('Running %r...', activity_class) + try: + activity_class(task)._run() + except ActivityTaskFailure as exc: + self.respond_failed(task, reason=exc.reason, details=exc.details) + except Exception as exc: + details = 'Exception %s: %s' % (type(exc), exc) + self.respond_failed(task, reason='unknown_error', details=details) + else: + self.respond_completed(task) + + def respond_failed(self, task, reason, details): + log.info('Respond task failed: %s (%s)', reason, details) + try: + self.conn.respond_activity_task_failed(taskToken=task.task_token, + reason=reason, + details=details) + except ClientError as exc: + log.error('respond_activity_task_failed failed for %s: %s\n%s', + task, exc.message, exc.response) + except Exception as exc: + log.error('respond_activity_task_failed failed for %s: %s', + task, exc) + + def respond_completed(self, task): + log.info('Respond task complete: %s', task.result) + args = {} + if task.result is not None: + args['result'] = task.result + + try: + self.conn.respond_activity_task_completed(taskToken=task.task_token, + **args) + except ClientError as exc: + log.error('respond_activity_task_completed failed for %s: %s\n%s', + task, exc.message, exc.response) + except Exception as exc: + log.error('respond_activity_task_completed failed for %s: %s', + task, exc) diff --git a/caravan/workers/decider.py b/caravan/workers/decider.py index 4430b4c..2513ba9 100644 --- a/caravan/workers/decider.py +++ b/caravan/workers/decider.py @@ -2,7 +2,7 @@ from botocore.exceptions import ClientError -from caravan.workers import get_default_identity +from caravan.workers import BaseWorker from caravan.models.decision_task import (DecisionTask, DecisionDone, WorkflowFailure) @@ -10,24 +10,10 @@ log = logging.getLogger(__name__) -class Worker(object): - - def __init__(self, connection, domain, task_list, workflows): - self.conn = connection - self.domain = domain - self.task_list = task_list - self.workflows = dict(((w.name, w.version), w) for w in workflows) - self.identity = get_default_identity() - - def run(self): - log.info('Waiting for a decision task...') - task = self.poll() - if task: - self.run_task(task) +class Worker(BaseWorker): def poll(self): - task_list = dict(name=self.task_list) - + task_list = {'name': self.task_list} resp = self.conn.poll_for_decision_task(domain=self.domain, taskList=task_list, identity=self.identity) @@ -35,21 +21,21 @@ def poll(self): return # Polling timed out. next_page_token = resp.get('nextPageToken') + while next_page_token: page = self.conn.poll_for_decision_task( domain=self.domain, taskList=task_list, identity=self.identity, nextPageToken=next_page_token) - next_page_token = page.get('nextPageToken') resp['events'].extend(page['events']) + next_page_token = page.get('nextPageToken') return DecisionTask(resp) def run_task(self, task): - log.info('Got a %r', task) workflow_key = (task.workflow_type, task.workflow_version) - workflow_class = self.workflows.get(workflow_key) + workflow_class = self.entities.get(workflow_key) if not workflow_class: log.warning('Unknown workflow %s', task) diff --git a/setup.py b/setup.py index c699a70..71acc7f 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ 'abduct', 'httpretty == 0.8.10', 'moto', + 'nose-parameterized', 'zest.releaser[recommended]', 'pylama', @@ -39,6 +40,7 @@ entry_points={ 'console_scripts': [ 'caravan-decider = caravan.commands.decider:Command.main', + 'caravan-activity = caravan.commands.activity:Command.main', 'caravan-start = caravan.commands.start:Command.main', 'caravan-signal = caravan.commands.signal:Command.main', 'caravan-terminate = caravan.commands.terminate:Command.main',