From a546cf27e14c5ecfe31f54ce3eb113e42b1467c3 Mon Sep 17 00:00:00 2001 From: Luis Villanueva Date: Tue, 20 Sep 2022 12:15:09 -0500 Subject: [PATCH] add extra_kwargs property --- mockintosh/builders.py | 3 ++- mockintosh/config.py | 4 +++- mockintosh/definition.py | 3 ++- mockintosh/schema.json | 4 ++++ mockintosh/services/asynchronous/kafka.py | 7 +++++-- 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/mockintosh/builders.py b/mockintosh/builders.py index c8e06d204..359dd4cd6 100644 --- a/mockintosh/builders.py +++ b/mockintosh/builders.py @@ -144,7 +144,8 @@ def build_config_async_service(self, data: dict, internal_service_id: Union[int, actors=[], name=data.get('name', None), ssl=data.get('ssl', False), - internal_service_id=internal_service_id + internal_service_id=internal_service_id, + extra_kwargs=data.get('extra_kwargs', None) ) actors = [] diff --git a/mockintosh/config.py b/mockintosh/config.py index c726c75ae..03337546b 100644 --- a/mockintosh/config.py +++ b/mockintosh/config.py @@ -229,7 +229,8 @@ def __init__( actors: List[ConfigActor] = [], name: Union[str, None] = None, ssl: bool = False, - internal_service_id: Union[int, None] = None + internal_service_id: Union[int, None] = None, + extra_kwargs: Union[dict, None] = None ): super().__init__(_type, name, internal_service_id) ConfigAsyncService.services.append(self) @@ -237,6 +238,7 @@ def __init__( self.address = address self.actors = actors self.ssl = ssl + self.extra_kwargs = extra_kwargs def get_hint(self): return '%s://%s' % (self.type, self.address) if self.name is None else self.name diff --git a/mockintosh/definition.py b/mockintosh/definition.py index 3df52821a..8689d6a2b 100644 --- a/mockintosh/definition.py +++ b/mockintosh/definition.py @@ -399,7 +399,8 @@ def analyze_async_service( name=service.name, definition=self, _id=service.internal_service_id, - ssl=service.ssl + ssl=service.ssl, + extra_kwargs=service.extra_kwargs ) service._impl = async_service diff --git a/mockintosh/schema.json b/mockintosh/schema.json index 7cdbf3893..be61b1fe8 100644 --- a/mockintosh/schema.json +++ b/mockintosh/schema.json @@ -26,6 +26,10 @@ "mqtt" ] }, + "extra_kwargs": { + "type": "object", + "default": {} + }, "actors": { "type": "array", "items": { diff --git a/mockintosh/services/asynchronous/kafka.py b/mockintosh/services/asynchronous/kafka.py index fa06ab3fb..1f4799527 100644 --- a/mockintosh/services/asynchronous/kafka.py +++ b/mockintosh/services/asynchronous/kafka.py @@ -103,6 +103,7 @@ def consume(self) -> None: 'bootstrap.servers': first_actor.service.address, 'group.id': '0', 'auto.offset.reset': 'earliest' + **first_actor.service.extra_kwargs, } if first_actor.service.ssl: config['security.protocol'] = 'SSL' @@ -159,7 +160,7 @@ class KafkaProducerPayloadList(AsyncProducerPayloadList): class KafkaProducer(AsyncProducer): def _produce(self, key: str, value: str, headers: dict, payload: AsyncProducerPayload) -> None: - config = {'bootstrap.servers': self.actor.service.address} + config = {'bootstrap.servers': self.actor.service.address, **self.actor.service.extra_kwargs} if self.actor.service.ssl: config['security.protocol'] = 'SSL' producer = Producer(config) @@ -190,7 +191,8 @@ def __init__( name: Union[str, None] = None, definition=None, _id: Union[int, None] = None, - ssl: bool = False + ssl: bool = False, + extra_kwargs: Union[dict, None] = None ): super().__init__( address, @@ -200,6 +202,7 @@ def __init__( ssl=ssl ) self.type = 'kafka' + self.extra_kwargs = extra_kwargs or {} def build_single_payload_producer(