drama is an asynchronous workflow executor engine supported by a command-line tool and a RESTful API. Proudly based on Dramatiq.
This project requires the following dependencies to work:
- Apache Kafka
- RabbitMQ
- MongoDB
- MinIO / HDFS
You can set up a minimal development environment using Docker Compose:
docker-compose [-f docker-compose-kafka.yml] up -d
To install drama from source using Poetry:
git clone https://github.com/KhaosResearch/drama.git
cd drama
poetry install
Before running drama, save a copy of .env.template as .env and insert your own values. drama will then look for a valid .env file in the current working directory. In its absence, it will attempt to determine the values from environment variables (with prefix DRAMA_).
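The lookup order described above can be sketched in a few lines of Python. This is only an illustration of the precedence rule, not drama's actual settings loader: the function names are hypothetical, the sketch assumes that keys inside .env also carry the DRAMA_ prefix, and DRAMA_API_PORT is a made-up variable name used for the example.

```python
import os
from pathlib import Path

PREFIX = "DRAMA_"


def parse_env_file(path):
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop optional surrounding quotes around the value.
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_settings(cwd=".", environ=None):
    """Prefer a .env file in cwd; otherwise fall back to environment variables."""
    environ = os.environ if environ is None else environ
    env_file = Path(cwd) / ".env"
    source = parse_env_file(env_file) if env_file.is_file() else environ
    # Assumption: only keys with the DRAMA_ prefix are relevant in both sources.
    return {
        key[len(PREFIX):].lower(): value
        for key, value in source.items()
        if key.startswith(PREFIX)
    }


if __name__ == "__main__":
    # Hypothetical variable name, for illustration only.
    print(load_settings(environ={"DRAMA_API_PORT": "8001"}))
```

Running the script from a directory without a .env file prints the values taken from the supplied mapping; once a .env file is present in that directory, its contents are used instead.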
After that you can run:
poetry run drama -h
The worker command spawns multiple concurrent worker processes to execute tasks:
poetry run drama worker --processes 4
For a full list of valid command-line arguments that can be passed to drama worker, check out dramatiq -h.
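When workers are started from a deployment script rather than by hand, the command can be assembled programmatically. The helper below is a hypothetical sketch: only the --processes flag comes from this README, and defaulting to the CPU count is an assumption of the example, not drama's documented behaviour.

```python
import os
import shlex


def worker_command(processes=None):
    """Build the argv list for starting drama workers via Poetry."""
    if processes is None:
        # Assumption: one worker process per CPU is a reasonable default.
        processes = os.cpu_count() or 1
    if processes < 1:
        raise ValueError("processes must be at least 1")
    return ["poetry", "run", "drama", "worker", "--processes", str(processes)]


if __name__ == "__main__":
    # Print the command instead of executing it.
    print(shlex.join(worker_command(4)))
```

The returned list can be passed directly to subprocess.run from the project directory.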
(Optional) The server can be deployed with uvicorn, a lightning-fast ASGI server, using the command-line client:
poetry run drama server
Alternatively, use the provided Dockerfile:
sudo docker build . -t drama-server
sudo docker run -p 8004:8004 drama-server
Online documentation is available at /api/docs.
Take a look at the catalog for some examples on how to develop your own components.
To run a workflow from Python, provide a list of TaskRequest objects and wrap them in a WorkflowRequest.
from drama.models.task import Task
from drama.models.workflow import Workflow
from drama.worker.scheduler import Scheduler

task_import_file = Task(
    name="LoadIrisDataset",
    module="drama.core.catalog.load.ImportFile",
    params={
        "url": "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv"
    }
)

workflow_request = Workflow(tasks=[task_import_file])

with Scheduler() as scheduler:
    scheduler.run(workflow_request)
(This script is complete; it should run "as is".)
Alternatively, a workflow can be run through the built-in server (adjust the host and port to match your deployment):
import requests

r = requests.post(
    url='http://localhost:8001/api/v2/workflow/run',
    json={
        "tasks": [
            {
                "name": "LoadIrisDataset",
                "module": "drama.core.catalog.load.ImportFile",
                "params": {
                    "url": "https://raw.githubusercontent.com/cs109/2014_data/master/countries.csv"
                }
            }
        ]
    })

print(r.text)
(This script is complete; it should run "as is".)
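For scripts that submit several workflows, it may help to separate building the request from sending it. The following is a minimal sketch using only the standard library in place of requests. The helper names are hypothetical; the endpoint path and payload shape are copied from the example above, and nothing is assumed about the format of the server's response.

```python
import json
import urllib.request


def make_task(name, module, params=None):
    """Build one task entry with the same keys as the example payload."""
    return {"name": name, "module": module, "params": params or {}}


def build_run_request(base_url, tasks):
    """Prepare (but do not send) a POST request to the workflow run endpoint."""
    payload = json.dumps({"tasks": tasks}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v2/workflow/run",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_workflow(base_url, tasks, timeout=10.0):
    """Send the request and return the raw response body as text."""
    request = build_run_request(base_url, tasks)
    # urlopen raises urllib.error.HTTPError on non-2xx responses.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

With a server listening on the address used above, calling run_workflow("http://localhost:8001", [make_task("LoadIrisDataset", "drama.core.catalog.load.ImportFile", {"url": "..."})]) with a real dataset URL would submit the same kind of workflow as the requests example.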