diff --git a/mockintosh/builders.py b/mockintosh/builders.py index c8e06d20..359dd4cd 100644 --- a/mockintosh/builders.py +++ b/mockintosh/builders.py @@ -144,7 +144,8 @@ def build_config_async_service(self, data: dict, internal_service_id: Union[int, actors=[], name=data.get('name', None), ssl=data.get('ssl', False), - internal_service_id=internal_service_id + internal_service_id=internal_service_id, + extra_kwargs=data.get('extra_kwargs', None) ) actors = [] diff --git a/mockintosh/config.py b/mockintosh/config.py index c726c75a..03337546 100644 --- a/mockintosh/config.py +++ b/mockintosh/config.py @@ -229,7 +229,8 @@ def __init__( actors: List[ConfigActor] = [], name: Union[str, None] = None, ssl: bool = False, - internal_service_id: Union[int, None] = None + internal_service_id: Union[int, None] = None, + extra_kwargs: Union[dict, None] = None ): super().__init__(_type, name, internal_service_id) ConfigAsyncService.services.append(self) @@ -237,6 +238,7 @@ def __init__( self.address = address self.actors = actors self.ssl = ssl + self.extra_kwargs = extra_kwargs def get_hint(self): return '%s://%s' % (self.type, self.address) if self.name is None else self.name diff --git a/mockintosh/definition.py b/mockintosh/definition.py index 3df52821..8689d6a2 100644 --- a/mockintosh/definition.py +++ b/mockintosh/definition.py @@ -399,7 +399,8 @@ def analyze_async_service( name=service.name, definition=self, _id=service.internal_service_id, - ssl=service.ssl + ssl=service.ssl, + extra_kwargs=service.extra_kwargs ) service._impl = async_service diff --git a/mockintosh/schema.json b/mockintosh/schema.json index 7cdbf389..be61b1fe 100644 --- a/mockintosh/schema.json +++ b/mockintosh/schema.json @@ -26,6 +26,10 @@ "mqtt" ] }, + "extra_kwargs": { + "type": "object", + "default": {} + }, "actors": { "type": "array", "items": { diff --git a/mockintosh/services/asynchronous/kafka.py b/mockintosh/services/asynchronous/kafka.py index fa06ab3f..1f479952 100644 --- a/mockintosh/services/asynchronous/kafka.py +++ b/mockintosh/services/asynchronous/kafka.py @@ -103,6 +103,7 @@ def consume(self) -> None: 'bootstrap.servers': first_actor.service.address, 'group.id': '0', 'auto.offset.reset': 'earliest' + **first_actor.service.extra_kwargs, } if first_actor.service.ssl: config['security.protocol'] = 'SSL' @@ -159,7 +160,7 @@ class KafkaProducerPayloadList(AsyncProducerPayloadList): class KafkaProducer(AsyncProducer): def _produce(self, key: str, value: str, headers: dict, payload: AsyncProducerPayload) -> None: - config = {'bootstrap.servers': self.actor.service.address} + config = {'bootstrap.servers': self.actor.service.address, **self.actor.service.extra_kwargs} if self.actor.service.ssl: config['security.protocol'] = 'SSL' producer = Producer(config) @@ -190,7 +191,8 @@ def __init__( name: Union[str, None] = None, definition=None, _id: Union[int, None] = None, - ssl: bool = False + ssl: bool = False, + extra_kwargs: Union[dict, None] = None ): super().__init__( address, @@ -200,6 +202,7 @@ def __init__( ssl=ssl ) self.type = 'kafka' + self.extra_kwargs = extra_kwargs or {} def build_single_payload_producer(