Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Activity worker #12

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Feedbacks, ideas and contributions are highly welcomed. (Just open a
`Github issue <https://github.com/pior/caravan/issues>`_).

- `Code on Github <https://github.com/pior/caravan>`_
- `PyPi <https://pypi.python.org/pypi/caravan>`_
- `PyPI <https://pypi.python.org/pypi/caravan>`_
- `Tests <https://travis-ci.org/pior/caravan>`_ |travis| |coveralls|
- Doc: ``TODO``

Expand All @@ -42,7 +42,7 @@ Features
========

- Decider worker
- Activity task worker ``TODO``
- Activity task worker
- Commands to start/signal/terminate an arbitrary workflow execution
- Command to list open workflow execution
- Command to register a domain / list domains
Expand Down
5 changes: 3 additions & 2 deletions caravan/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from caravan.models.workflow import Workflow
from caravan.models.decision import DecisionDone, WorkflowFailure
from caravan.models.activity import Activity
from caravan.models.decision_task import DecisionDone, WorkflowFailure

__all__ = ['Workflow', 'DecisionDone', 'WorkflowFailure']
__all__ = ['Workflow', 'Activity', 'DecisionDone', 'WorkflowFailure']
Empty file added caravan/codecs/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions caravan/codecs/gzipjson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json
import zlib
import base64
import logging

log = logging.getLogger(__name__)

# http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html
MAX_SWF_MESSAGE_SIZE = 32000


def dumps(data):
"""Encode data in JSON. Compress in zlib/base64 if too large for SWF.

Boto3 expect any payload to be unicode and thus will crash on unicode errors
if we don't base64 encode all zlib compressed payloads.
"""
encoded_data = json.dumps(data)

if len(encoded_data) > MAX_SWF_MESSAGE_SIZE:
encoded_data = encoded_data.encode('utf-8')
encoded_data = base64.b64encode(zlib.compress(encoded_data))

return encoded_data


def loads(data):
"""Decode data in JSON, possibly zlib/base64 compressed."""
try:
decompressed_data = zlib.decompress(base64.b64decode(data))
data = decompressed_data.decode('utf-8')
except:
pass

return json.loads(data)
54 changes: 54 additions & 0 deletions caravan/commands/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging

from caravan import Activity
from caravan.commands.base import BaseCommand
from caravan.commands import ClassesLoaderFromModule
from caravan.swf import register_activity, get_connection
from caravan.workers.activity import Worker

log = logging.getLogger(__name__)


class Command(BaseCommand):

description = 'Activity worker'

def setup_arguments(self, parser):
parser.add_argument('-m', '--modules',
type=ClassesLoaderFromModule(Activity),
nargs='+',
required=True)
parser.add_argument('-d', '--domain',
required=True)
parser.add_argument('-t', '--task-list',
required=True)
parser.add_argument('--register-activities', action='store_true')

def run(self):
connection = get_connection()
activities = [a for module in self.args.modules for a in module]

if self.args.register_activities:
log.info("Registering activity types")
for activity in activities:
created = register_activity(connection=connection,
domain=self.args.domain,
activity=activity)
if created:
log.info("Activity type %s(%s): registered.",
activity.name, activity.version)
else:
log.info("Activity type %s(%s): already registered.",
activity.name, activity.version)

log.info("Start activity worker...")
worker = Worker(connection=connection,
domain=self.args.domain,
task_list=self.args.task_list,
entities=activities)

while True:
try:
worker.run()
except Exception: # Doesn't catch KeyboardInterrupt
log.exception("Decider crashed!")
2 changes: 1 addition & 1 deletion caravan/commands/decider.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def run(self):
worker = Worker(connection=connection,
domain=self.args.domain,
task_list=self.args.task_list,
workflows=workflows)
entities=workflows)

while True:
try:
Expand Down
41 changes: 38 additions & 3 deletions caravan/examples/demo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import print_function
import time

from caravan import Workflow
from caravan import Workflow, Activity


class Demo(Workflow):
class DemoWorkflow(Workflow):

"""Demo workflow using the bare caravan API."""

Expand Down Expand Up @@ -58,9 +59,43 @@ def run(self):
self.task.add_decision('CompleteWorkflowExecution',
result=workflow_result)

elif signal_name == 'ACTIVITY':
# Schedule a demo activity
activity_type = {
'name': DemoActivity.name,
'version': DemoActivity.version,
}
activity_id = str(time.time())
self.task.add_decision('ScheduleActivityTask',
activityType=activity_type,
activityId=activity_id,
input='"DemoInput"')

else:
print("Unknown signal '%s'" % signal_name)
self.task.decision_done(msg='JustIgnoringThisUnknownSignal')

else:
self.task.fail('unknown_event_type')
print('Unknown last event type')


class DemoActivity(Activity):

name = 'DemoActivity'
version = '0.1.2'

default_task_list = 'default'

default_task_start_to_close_timeout = 60
default_task_schedule_to_start_timeout = 10
default_task_schedule_to_close_timeout = 70
default_task_heartbeat_timeout = 'NONE'

def run(self, input):
print('Received input: %s' % input)
self.do_stuff()
return {'input': input, 'output': 'YO'}

def do_stuff(self):
print('Doing stuff (waiting 3 secs)')
time.sleep(2)
82 changes: 82 additions & 0 deletions caravan/models/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from caravan.codecs import gzipjson


class Activity(object):

"""Base implementation of an activity.

To implement a activity, subclass this class and override the attributes
and the run method.

Class attributes to override:

name (str): Name of the workflow type

version (str): Version of the workflow type

description (str|optional): description of the workflow type

default_task_start_to_close_timeout (optional):
maximum duration of decision tasks

default_task_heartbeat_timeout (optional):
maximum time before which a worker processing a task of this type
must report progress by calling RecordActivityTaskHeartbeat

default_task_list (optional):
default task list to use for scheduling decision tasks

default_task_priority (optional):
task priority to assign to the activity type. (default to 0)
Valid values: -2147483648 to 2147483647. Higher numbers indicate
higher priority.

default_task_schedule_to_start_timeout (optional):
maximum duration that a task of this activity type can wait before
being assigned to a worker. The value "NONE" can be used to specify
unlimited duration.

default_task_schedule_to_close_timeout (optional):
maximum duration for a task of this activity type. The value "NONE"
can be used to specify unlimited duration.

More information on the Boto3 documentation:
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.register_activity_type

Method to override:

run()

Example::

from caravan import Activity

class DoStuff(Activity):

name = 'DoStuff'
version = '1.0'

def run(self, input):
result = do_stuff(input)
return result
"""

name = None
version = None
description = None

codec = gzipjson

def __init__(self, task):
self.task = task

def __repr__(self):
return '<Activity %s(%s)>' % (self.name, self.version)

def _run(self):
task_input = self.codec.loads(self.task.task_input)
task_result = self.run(task_input)
self.task.set_result(self.codec.dumps(task_result))

def run(self):
raise NotImplementedError()
34 changes: 34 additions & 0 deletions caravan/models/activity_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
class ActivityTaskFailure(Exception):

def __init__(self, reason, details):
self.reason = reason
self.details = details
super(ActivityTaskFailure, self).__init__('%s: %s' % (reason, details))


class ActivityTask(object):

"""An activity task."""

def __init__(self, data):
self.task_token = data['taskToken']
self.activity_id = data['activityId']
self.workflow_id = data['workflowExecution']['workflowId']
self.workflow_run_id = data['workflowExecution']['runId']
self.activity_name = data['activityType']['name']
self.activity_version = data['activityType']['version']
self.task_input = data.get('input')
self.result = None

def __repr__(self):
return '<ActivityTask %s(%s) id=%s WorkflowId=%s>' % (
self.activity_name,
self.activity_version,
self.activity_id,
self.workflow_id)

def fail(self, reason, details):
raise ActivityTaskFailure(reason, details)

def set_result(self, result):
self.result = result
File renamed without changes.
Loading