Skip to content

Commit

Permalink
pydantic fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gitcarbs committed Sep 17, 2024
1 parent f594d69 commit 77b3ee3
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 32 deletions.
2 changes: 1 addition & 1 deletion examples/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class UserLog(BaseModel):
_is_log_model = True
user: Optional[str]
user: Optional[str] = None


async def test_log() -> None:
Expand Down
8 changes: 4 additions & 4 deletions kafkaesk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class Application(Router):
Application configuration
"""

_producer: Optional[aiokafka.AIOKafkaProducer]
_producer: Optional[aiokafka.AIOKafkaProducer] = None

def __init__(
self,
Expand Down Expand Up @@ -535,9 +535,9 @@ async def __aenter__(self) -> "Application":

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
exc_type: Optional[Type[BaseException]] = None,
exc: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
logger.info("Stopping application...", exc_info=exc)
await self.finalize()
Expand Down
2 changes: 1 addition & 1 deletion kafkaesk/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async def inner(record: aiokafka.ConsumerRecord, span: opentracing.Span) -> None

class BatchConsumer(aiokafka.ConsumerRebalanceListener):
_subscription: Subscription
_close: typing.Optional[asyncio.Future]
_close: typing.Optional[asyncio.Future] = None
_consumer: aiokafka.AIOKafkaConsumer
_offsets: typing.Dict[aiokafka.TopicPartition, int]
_message_handler: typing.Callable
Expand Down
8 changes: 4 additions & 4 deletions kafkaesk/ext/logging/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def __init__(
exc_info: Union[
Tuple[type, BaseException, Optional[TracebackType]], Tuple[None, None, None], None
],
func: Optional[str],
sinfo: Optional[str],
func: Optional[str] = None,
sinfo: Optional[str] = None,
pydantic_data: Optional[List[pydantic.BaseModel]] = None,
):
super().__init__(name, level, fn, lno, msg, args, exc_info, func, sinfo)
Expand All @@ -41,8 +41,8 @@ def factory(
exc_info: Union[
Tuple[type, BaseException, Optional[TracebackType]], Tuple[None, None, None], None
],
func: Optional[str],
sinfo: Optional[str],
func: Optional[str] = None,
sinfo: Optional[str] = None,
) -> PydanticLogRecord:
pydantic_data: List[pydantic.BaseModel] = []

Expand Down
6 changes: 3 additions & 3 deletions kafkaesk/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@


class KafkaTopicManager:
_admin_client: Optional[kafka.admin.client.KafkaAdminClient]
_client: Optional[kafka.KafkaClient]
_kafka_api_version: Optional[Tuple[int, ...]]
_admin_client: Optional[kafka.admin.client.KafkaAdminClient] = None
_client: Optional[kafka.KafkaClient] = None
_kafka_api_version: Optional[Tuple[int, ...]] = None

def __init__(
self,
Expand Down
6 changes: 3 additions & 3 deletions kafkaesk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ def __enter__(self) -> None:

def __exit__(
self,
exc_type: Optional[Type[Exception]],
exc_value: Optional[Exception],
exc_traceback: Optional[traceback.StackSummary],
exc_type: Optional[Type[Exception]] = None,
exc_value: Optional[Exception] = None,
exc_traceback: Optional[traceback.StackSummary] = None,
) -> None:
error = NOERROR
if self.histogram is not None:
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[tool.poetry]
name = "kafkaesk"
version = "0.8.4"
version = "0.8.5"
description = "Easy publish and subscribe to events with python and Kafka."
authors = ["vangheem <[email protected]>", "pfreixes <[email protected]>"]
classifiers = [
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Framework :: AsyncIO",
"License :: OSI Approved :: BSD License",
"Topic :: System :: Distributed Computing"
Expand Down
9 changes: 6 additions & 3 deletions stubs/aiokafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ class AIOKafkaConsumer:
_client: AIOKafkaClient
_coordinator: GroupCoordinator
_subscription: Subscription
_group_id: Optional[str]
_group_id: Optional[str] = None

def __init__(
self,
bootstrap_servers: List[str],
loop: AbstractEventLoop,
group_id: Optional[str],
group_id: Optional[str] = None,
api_version: str = "auto",
**kwargs: Any,
):
Expand All @@ -95,7 +95,10 @@ async def getone(self, *partitions: Optional[List[TopicPartition]]) -> ConsumerR
...

async def subscribe(
self, topics: Optional[List[str]] = None, pattern: Optional[str] = None, listener: Optional["ConsumerRebalanceListener"] = None
self,
topics: Optional[List[str]] = None,
pattern: Optional[str] = None,
listener: Optional["ConsumerRebalanceListener"] = None,
) -> None:
...

Expand Down
2 changes: 1 addition & 1 deletion stubs/kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ConsumerRecord:
topic: str
value: bytes
key: bytes
headers: Optional[List[Tuple[str, bytes]]]
headers: Optional[List[Tuple[str, bytes]]] = None


class TopicPartition:
Expand Down
11 changes: 2 additions & 9 deletions tests/acceptance/ext/logging/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def logger():

@pytest_asyncio.fixture(scope="function")
def stream_handler(logger):

stream = io.StringIO()
handler = PydanticStreamHandler(stream=stream)
logger.addHandler(handler)
Expand Down Expand Up @@ -76,7 +75,6 @@ async def consume(data: PydanticLogModel):

class TestPydanticStreamHandler:
async def test_stream_handler(self, stream_handler, logger):

logger.info("Test Message %s", "extra")

message = stream_handler.getvalue()
Expand All @@ -86,7 +84,7 @@ async def test_stream_handler(self, stream_handler, logger):
async def test_stream_handler_with_log_model(self, stream_handler, logger):
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

logger.info("Test Message %s", "extra", LogModel(foo="bar"))

Expand All @@ -112,7 +110,6 @@ class LogModel(pydantic.BaseModel):

class TestPydanticKafkaeskHandler:
async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):

async with app:
logger.info("Test Message %s", "extra")
await app.flush()
Expand All @@ -124,7 +121,7 @@ async def test_kafka_handler(self, app, kafakesk_handler, logger, log_consumer):
async def test_kafka_handler_with_log_model(self, app, kafakesk_handler, logger, log_consumer):
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

async with app:
logger.info("Test Message %s", "extra", LogModel(foo="bar"))
Expand Down Expand Up @@ -184,7 +181,6 @@ def test_emit_drops_message_on_runtime_error_start(self):
class TestKafkaeskQueue:
@pytest_asyncio.fixture(scope="function")
async def queue(self, request, app):

max_queue = 10000
for marker in request.node.iter_markers("with_max_queue"):
max_queue = marker.args[0]
Expand Down Expand Up @@ -214,7 +210,6 @@ async def consume(data: PydanticLogModel):
assert len(consumed) == 1

async def test_queue_flush(self, app, queue, log_consumer):

async with app:
queue.start()
for i in range(10):
Expand All @@ -228,7 +223,6 @@ async def test_queue_flush(self, app, queue, log_consumer):
assert len(log_consumer) == 10

async def test_queue_flush_on_close(self, app, queue, log_consumer):

async with app:
queue.start()
await asyncio.sleep(0.1)
Expand All @@ -245,7 +239,6 @@ async def test_queue_flush_on_close(self, app, queue, log_consumer):

@pytest.mark.with_max_queue(1)
async def test_queue_max_size(self, app, queue):

queue.start()
queue.put_nowait("log.test", PydanticLogModel())

Expand Down
4 changes: 2 additions & 2 deletions tests/acceptance/ext/logging/test_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_factory_return_type() -> None:
async def test_factory_adds_pydantic_models() -> None:
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

record = factory(
name="logger.test",
Expand Down Expand Up @@ -66,7 +66,7 @@ async def test_factory_formats_msg() -> None:
async def test_factory_formats_msg_and_adds_pydantic_model() -> None:
class LogModel(pydantic.BaseModel):
_is_log_model = True
foo: Optional[str]
foo: Optional[str] = None

record = factory(
name="logger.test",
Expand Down

0 comments on commit 77b3ee3

Please sign in to comment.