Add initial version (#1)
- Add generic bus app instance
- Add support for devs to define their own dataclass events
- Allow defining any function as a consumer of an event class (called agents)
- Add CLI interface for starting workers with agents
- Add unit tests
- Add basic linting and type checks
sidmitra authored Jun 14, 2021
1 parent 8b3050b commit 1b49e61
Showing 20 changed files with 2,285 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .flake8
@@ -0,0 +1,3 @@
[flake8]
format = pylint
max-line-length = 88
3 changes: 3 additions & 0 deletions .gitignore
@@ -127,3 +127,6 @@ dmypy.json

# Pyre type checker
.pyre/

# Misc
.DS_Store
2 changes: 2 additions & 0 deletions .isort.cfg
@@ -0,0 +1,2 @@
[settings]
known_third_party =click,confluent_kafka,cotyledon,pytest,pytest_mock
70 changes: 70 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,70 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks

repos:

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.0.1
    hooks:
      - id: end-of-file-fixer
      - id: trailing-whitespace
      - id: check-yaml

  - repo: https://github.com/asottile/pyupgrade
    rev: v2.19.4
    hooks:
      - id: pyupgrade
        args: ["--py38-plus", "--keep-runtime-typing"]

  - repo: https://github.com/asottile/seed-isort-config
    rev: v2.2.0
    hooks:
      - id: seed-isort-config

  - repo: https://github.com/pre-commit/mirrors-isort
    rev: v5.8.0
    hooks:
      - id: isort
        additional_dependencies:
          - toml

  - repo: https://github.com/ambv/black
    rev: 21.6b0
    hooks:
      - id: black
        args: [--line-length=88, --safe]

  - repo: https://github.com/myint/autoflake
    rev: v1.4
    hooks:
      - id: autoflake
        args: [--in-place, --remove-all-unused-imports]

  - repo: https://github.com/pycqa/flake8
    rev: 3.9.2
    hooks:
      - id: flake8

  - repo: https://github.com/pycqa/pylint
    rev: v2.8.3
    hooks:
      - id: pylint
        args: [--extension-pkg-whitelist=confluent_kafka]
        additional_dependencies: ["click", "confluent_kafka", "cotyledon", "pytest", "pytest_mock"]

  # - repo: https://github.com/pre-commit/mirrors-mypy
  #   rev: v0.902
  #   hooks:
  #     - id: mypy
  #       args: ["--config-file=pyproject.toml"]
  #       files: ^eventbusk/
  #       additional_dependencies: ["types-click"]

  - repo: local
    hooks:
      - id: pytest-check
        name: pytest-check
        entry: pytest
        language: system
        pass_filenames: false
        always_run: true
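
With this configuration in place, the hooks are activated locally via the standard `pre-commit` workflow (generic `pre-commit` CLI usage, not commands defined in this commit):

```bash
pip install pre-commit       # the hook runner itself
pre-commit install           # wire the hooks into .git/hooks
pre-commit run --all-files   # run every configured hook once across the repo
```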
50 changes: 48 additions & 2 deletions README.md
@@ -1,2 +1,48 @@
# eventbusk
Event bus with Kafka
# eventbusk - Event Bus Framework

- Version:
- [Download](https://github.com/Airbase/eventbusk/)
- [Source](https://github.com/Airbase/eventbusk/)
- Keywords: event-bus, distributed, stream, processing, data, queue, kafka, python

## Install

```bash
pip install git+https://github.com/Airbase/eventbusk.git@SM-event-bus
```

## Quick Start

```python
import logging
from dataclasses import dataclass

from eventbusk import Event, EventBus

logger = logging.getLogger(__name__)

# Create an app instance of the bus
bus = EventBus(broker="kafka://localhost:9092")


# Define an event as a dataclass
@dataclass
class Foo(Event):
    foo: int


# Register the event to a single topic
bus.register_event("topic_foo", Foo)


# Define a function that receives that event
@bus.receive(event_type=Foo)
def process_a(event):
    logger.info(f"Foo: {event}")


# Publish an event to the bus
foo = Foo(foo=1)
bus.send(foo)
```

## Examples

See `examples/eventbus.py` for a concrete example. You can run workers for all of the receivers with:

```bash
eventbusk worker -A eventbus:bus
```
8 changes: 8 additions & 0 deletions eventbusk/__init__.py
@@ -0,0 +1,8 @@
"""
Event Bus Framework
"""
from __future__ import annotations

from .bus import Event, EventBus

__all__ = ["EventBus", "Event"]
45 changes: 45 additions & 0 deletions eventbusk/brokers/__init__.py
@@ -0,0 +1,45 @@
"""
Generic interface for brokers
"""
from __future__ import annotations

import logging

from .base import BaseConsumer, BaseProducer, DeliveryCallBackT
from .dummy import Consumer as DummyConsumer
from .dummy import Producer as DummyProducer
from .kafka import Consumer as KafkaConsumer
from .kafka import Producer as KafkaProducer

logger = logging.getLogger(__name__)


__all__ = ["Consumer", "Producer", "DeliveryCallBackT"]


def consumer_factory(broker: str, topic: str, group: str) -> BaseConsumer:
    """
    Return a consumer instance for the specified broker URI.
    """
    if broker.startswith("kafka"):
        return KafkaConsumer(broker=broker, topic=topic, group=group)
    if broker.startswith("dummy"):
        return DummyConsumer(broker=broker, topic=topic, group=group)
    raise ValueError("Unsupported broker.")


Consumer = consumer_factory


def producer_factory(broker: str) -> BaseProducer:
    """
    Return a producer instance for the specified broker URI.
    """
    if broker.startswith("kafka"):
        return KafkaProducer(broker)
    if broker.startswith("dummy"):
        return DummyProducer(broker)
    raise ValueError("Unsupported broker.")


Producer = producer_factory
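
The two aliases make `Consumer` and `Producer` callable like classes while hiding the backend choice. A minimal usage sketch, assuming the dummy backend accepts a bare `dummy://` URI (the topic and group names here are hypothetical):

```python
from eventbusk.brokers import Consumer, Producer

# Each factory dispatches on the URI scheme and returns the matching backend.
producer = Producer(broker="dummy://")
consumer = Consumer(broker="dummy://", topic="topic_foo", group="group_a")
```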
123 changes: 123 additions & 0 deletions eventbusk/brokers/base.py
@@ -0,0 +1,123 @@
"""
Base interface for event consumers and producers.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from contextlib import ContextDecorator
from types import TracebackType
from typing import Callable, Optional, Type, Union

from confluent_kafka import cimpl # type: ignore

logger = logging.getLogger(__name__)


__all__ = [
    "BaseBrokerURI",
    "BaseConsumer",
    "BaseProducer",
]

# Type hints
# callback method `on_delivery` on the producer
DeliveryCallBackT = Callable[..., None]
MessageT = Union[str, bytes, cimpl.Message]


class BaseBrokerURI(ABC):
    """
    Base class that defines the interface for all broker URIs.
    """

    @classmethod
    @abstractmethod
    def from_uri(cls, uri: str) -> BaseBrokerURI:
        """
        Return an instance created from a URI.
        """


class BaseConsumer(ContextDecorator, ABC):
    """
    Base class for consumers.

    All event consumers are exposed as a ContextDecorator, so a consumer can be
    used via a `with` statement and any connections are automatically closed on
    exit.
    """

    broker: BaseBrokerURI
    topic: str
    group: str

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__}("
            f"broker=*, "
            f"topic={self.topic}, "
            f"group='{self.group}')>"
        )

    def __enter__(self) -> BaseConsumer:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        pass

    @abstractmethod
    def poll(self, timeout: int) -> Optional[MessageT]:  # type: ignore
        """
        Poll for new messages, waiting up to `timeout` seconds.
        """

    @abstractmethod
    def ack(self, message: str) -> None:
        """
        Acknowledge successful consumption of a message.
        """


class BaseProducer(ABC):
    """
    Base class for producers.
    """

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}(broker=*)>"

    @abstractmethod
    def __init__(self, broker: str):
        super().__init__()

    @abstractmethod
    def produce(  # type: ignore # pylint: disable=too-many-arguments
        self,
        topic: str,
        value: MessageT,
        flush: bool = True,
        on_delivery: DeliveryCallBackT = None,
        fail_silently: bool = False,
    ) -> None:
        """
        Send a message on the specified topic.

        Arguments
        ---------
        topic:
            The name of the topic.
        value:
            Serialized message to send.
        flush:
            Flush any pending messages after every send. Useful for brokers
            like Kafka, which batch messages.
        on_delivery:
            Callback function invoked on delivery of a message.
        fail_silently:
            If True, ignore all delivery errors.
        """