Python REST client to interact with the Confluent Schema Registry server and manage Avro and JSON schema resources.
python 3.8+
pip install python-schema-registry-client
If you want the Faust functionality:
pip install python-schema-registry-client[faust]
Note that this will automatically add a dependency on the faust-streaming fork of faust. If you want to use the
old faust version, simply install it manually and then install python-schema-registry-client without the faust
extra enabled; the functionality will be the same.
Documentation: https://marcosschroh.github.io/python-schema-registry-client.io
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = client.register("test-deployment", avro_schema)
Or, using the async client:
from schema_registry.client import AsyncSchemaRegistryClient, schema
async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
schema_id = await async_client.register("test-deployment", avro_schema)
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"definitions" : {
"JsonDeployment" : {
"type" : "object",
"required" : ["image", "replicas", "port"],
"properties" : {
"image" : {"type" : "string"},
"replicas" : {"type" : "integer"},
"port" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/JsonDeployment"
}
json_schema = schema.JsonSchema(deployment_schema)
schema_id = client.register("test-deployment", json_schema)
Or, using the async client:
from schema_registry.client import AsyncSchemaRegistryClient, schema
async_client = AsyncSchemaRegistryClient(url="http://127.0.0.1:8081")
deployment_schema = {
"definitions" : {
"JsonDeployment" : {
"type" : "object",
"required" : ["image", "replicas", "port"],
"properties" : {
"image" : {"type" : "string"},
"replicas" : {"type" : "integer"},
"port" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/JsonDeployment"
}
json_schema = schema.JsonSchema(deployment_schema)
schema_id = await async_client.register("test-deployment", json_schema)
You can generate the Avro schema directly from a Python class using dataclasses-avroschema
and use it in the API to register schemas, check versions and test compatibility:
import dataclasses
from dataclasses_avroschema import AvroModel, types
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
@dataclasses.dataclass
class UserAdvance(AvroModel):
name: str
age: int
pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
has_car: bool = False
favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
country: str = "Argentina"
address: str = None
# register the schema
schema_id = client.register(subject, UserAdvance.avro_schema())
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.avro_schema())
print(result)
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')
compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())
print(compatibility)
# >>> True
You can generate the JSON schema directly from a Python class using pydantic and use it in the API to register schemas, check versions and test compatibility:
import typing
from enum import Enum
from pydantic import BaseModel
from schema_registry.client import SchemaRegistryClient
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
class ColorEnum(str, Enum):
BLUE = "BLUE"
YELLOW = "YELLOW"
GREEN = "GREEN"
class UserAdvance(BaseModel):
name: str
age: int
pets: typing.List[str] = ["dog", "cat"]
accounts: typing.Dict[str, int] = {"key": 1}
has_car: bool = False
favorite_colors: ColorEnum = ColorEnum.BLUE
country: str = "Argentina"
address: str = None
# register the schema
schema_id = client.register(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(schema_id)
# >>> 12
result = client.check_version(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(result)
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)
compatibility = client.test_compatibility(subject, UserAdvance.model_json_schema(), schema_type="JSON")
print(compatibility)
# >>> True
You can use AvroMessageSerializer to encode/decode messages in Avro:
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer
client = SchemaRegistryClient("http://127.0.0.1:8081")
avro_message_serializer = AvroMessageSerializer(client)
avro_user_schema = schema.AvroSchema({
"type": "record",
"namespace": "com.example",
"name": "AvroUsers",
"fields": [
{"name": "first_name", "type": "string"},
{"name": "last_name", "type": "string"},
{"name": "age", "type": "int"},
],
})
# We want to encode the user_record with avro_user_schema
user_record = {
"first_name": "my_first_name",
"last_name": "my_last_name",
"age": 20,
}
# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
"user", avro_user_schema, user_record)
print(message_encoded)
# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
Or, with JSON schemas, use JsonMessageSerializer:
from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer
client = SchemaRegistryClient("http://127.0.0.1:8081")
json_message_serializer = JsonMessageSerializer(client)
json_schema = schema.JsonSchema({
"definitions" : {
"record:python.test.basic.basic" : {
"description" : "basic schema for tests",
"type" : "object",
"required" : [ "number", "name" ],
"properties" : {
"number" : {
"oneOf" : [ {
"type" : "integer"
}, {
"type" : "null"
} ]
},
"name" : {
"oneOf" : [ {
"type" : "string"
} ]
}
}
}
},
"$ref" : "#/definitions/record:python.test.basic.basic"
})
# Encode the record
basic_record = {
"number": 10,
"name": "a_name",
}
message_encoded = json_message_serializer.encode_record_with_schema(
"basic", json_schema, basic_record)
print(message_encoded)
# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
Usually, we have a situation like this:
So, our producers/consumers have to serialize/deserialize messages every time they send to or receive from Kafka topics. In this picture, we can imagine a Faust
application receiving messages (encoded with an Avro schema) that we want to deserialize, so we can ask the schema server
to do that for us. In this scenario, the MessageSerializer is perfect.
Another use case is an application dedicated only to administering Avro schemas
(registering, updating compatibilities, deleting old schemas, etc.); for that, the SchemaRegistryClient is perfect.
Poetry is needed to install the dependencies and develop locally.
- Install dependencies:
poetry install --all-extras
- Code linting:
./scripts/format
- Run tests:
./scripts/test
For commit messages we use commitizen in order to standardize our commit conventions.
Note: The tests are run against the Schema Server using docker compose, so you will need Docker and Docker Compose installed.
In a terminal, run docker-compose up. Then, in a different terminal, run the tests:
./scripts/test
All additional args will be passed to pytest, for example:
./scripts/test ./tests/client/
To perform tests using the Python shell, you can run the project using docker-compose.
- Execute docker-compose up. The schema registry server will then run on http://127.0.0.1:8081, and you can interact with it using the SchemaRegistryClient.
- Use the Python interpreter (get a Python shell by typing python in your command line).
- Play with the schema server:
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient(url="http://127.0.0.1:8081")
# do some operations with the client...
deployment_schema = {
"type": "record",
"namespace": "com.kubertenes",
"name": "AvroDeployment",
"fields": [
{"name": "image", "type": "string"},
{"name": "replicas", "type": "int"},
{"name": "port", "type": "int"},
],
}
avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1
Then, you can check the schema in your browser by going to the URL http://127.0.0.1:8081/schemas/ids/1