Skip to content

Commit

Permalink
Add schema resolution support
Browse files Browse the repository at this point in the history
Using AvroC's resolving writer/reader allows the implementation
of schema resolution. In the case where a new "reader schema"
is used while reading avro records, this schema will be applied
rather than the originating schema (confusingly referred to as
the "writer schema"). In cases when writing avro records the
so-called "reader schema" is used to resolve to the new schema,
allowing such things like promotion of types (where valid).
This ends up fixing #30 where the need is to skip fields that
are being resolved for performance reasons.
  • Loading branch information
ChrisRx committed May 3, 2018
1 parent 83cc46c commit 4b3421a
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 45 deletions.
2 changes: 1 addition & 1 deletion quickavro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

__title__ = 'quickavro'
__version__ = '0.1.22'
__version__ = '0.2.0'
__authors__ = ['Chris Marshall']
__license__ = 'Apache 2.0'
__all__ = ['BinaryEncoder', 'Enum', 'FileReader', 'FileWriter']
Expand Down
23 changes: 20 additions & 3 deletions quickavro/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def write_header(schema, sync_marker, codec="null"):
:param sync_marker: str used to verify blocks.
:param codec: (optional) Compression codec.
"""
with BinaryEncoder(HEADER_SCHEMA, codec) as encoder:
with BinaryEncoder(HEADER_SCHEMA, codec=codec) as encoder:
header = {
"magic": MAGIC,
"meta": {
Expand All @@ -43,6 +43,7 @@ def write_header(schema, sync_marker, codec="null"):
}
return encoder.write(header)

# class ResolvedWriter(BinaryEncoder):

class BinaryEncoder(Encoder):
"""
Expand Down Expand Up @@ -74,14 +75,17 @@ class BinaryEncoder(Encoder):
f.write(block)
"""

def __init__(self, schema=None, codec="null"):
def __init__(self, schema=None, reader_schema=None, codec="null"):
super(BinaryEncoder, self).__init__()
self._codec = None
self._schema = None
self._reader_schema = None
self.sync_marker = os.urandom(SYNC_SIZE)
self.codec = codec
if schema:
self.schema = schema
if reader_schema is not None:
self.reader_schema = reader_schema
self.block = []
self.block_count = 0
self.block_size = 0
Expand Down Expand Up @@ -130,6 +134,16 @@ def read_header(self, data):
data = data[offset:]
return header, data

@property
def reader_schema(self):
return self._reader_schema

@reader_schema.setter
def reader_schema(self, schema):
self._reader_schema = schema
if self._schema:
self.set_schema(json.dumps(self._schema), reader_schema=json.dumps(self._reader_schema))

@property
def schema(self):
if not self._schema:
Expand All @@ -139,7 +153,10 @@ def schema(self):
@schema.setter
def schema(self, schema):
self._schema = schema
self.set_schema(json.dumps(schema))
if self._reader_schema:
self.set_schema(json.dumps(self._schema), reader_schema=json.dumps(self._reader_schema))
return
self.set_schema(json.dumps(self._schema))

def write_block(self):
data = b"".join(self.block)
Expand Down
9 changes: 7 additions & 2 deletions quickavro/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from . import _quickavro


class FileReader(BinaryEncoder):
"""
The :class:`FileReader` object implements :class:`quickavro.BinaryEncoder`
Expand All @@ -29,15 +30,19 @@ class FileReader(BinaryEncoder):
print(record)
"""

def __init__(self, f, header_size=INITIAL_HEADER_SIZE):
def __init__(self, f, header_size=INITIAL_HEADER_SIZE, reader_schema=None):
super(FileReader, self).__init__()
if isinstance(f, basestring):
self.f = open(f, 'rb')
else:
self.f = f
header = self.read_header(header_size)
metadata = header.get('meta')
self.schema = json.loads(ensure_str(metadata.get('avro.schema')))
if reader_schema is not None:
self._schema = json.loads(ensure_str(metadata.get('avro.schema')))
self.reader_schema = reader_schema
else:
self.schema = json.loads(ensure_str(metadata.get('avro.schema')))
self.codec = ensure_str(metadata.get('avro.codec', 'null'))
self.sync_marker = header.get('sync')

Expand Down
205 changes: 168 additions & 37 deletions src/encoderobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ PyObject *SchemaError;
PyObject *WriteError;

static void Encoder_dealloc(Encoder* self) {
if (self->schema != NULL) {
avro_schema_decref(self->schema);
if (self->reader_iface != NULL) {
avro_value_iface_decref(self->reader_iface);
}
if (self->iface != NULL) {
avro_value_iface_decref(self->iface);
if (self->reader_schema != NULL) {
avro_schema_decref(self->reader_schema);
}
if (self->writer_schema != NULL) {
avro_schema_decref(self->writer_schema);
}
if (self->writer_iface != NULL) {
avro_value_iface_decref(self->writer_iface);
}
if (self->resolver != NULL) {
avro_value_iface_decref(self->resolver);
}
if (self->reader != NULL) {
avro_reader_free(self->reader);
Expand All @@ -51,10 +60,14 @@ static PyObject* Encoder_new(PyTypeObject* type, PyObject* args, PyObject* kwds)

self = (Encoder*)type->tp_alloc(type, 0);
self->buffer = NULL;
self->resolver = NULL;
self->reader = NULL;
self->writer = NULL;
self->iface = NULL;
self->schema = NULL;
self->reader_iface = NULL;
self->reader_schema = NULL;
self->writer_iface = NULL;
self->writer_schema = NULL;
self->reader_schema = NULL;
return (PyObject*)self;
}

Expand All @@ -68,22 +81,42 @@ static int Encoder_init(Encoder* self, PyObject* args, PyObject* kwds) {

static PyObject* Encoder_read(Encoder* self, PyObject* args) {
Py_buffer buffer;
int rval;

if (!PyArg_ParseTuple(args, "s*", &buffer)) {
Py_RETURN_NONE;
}
avro_value_t value;
avro_reader_memory_set_source(self->reader, buffer.buf, buffer.len);
avro_generic_value_new(self->iface, &value);
PyObject* values = PyList_New(0);
while ((rval = avro_value_read(self->reader, &value)) == 0) {
PyObject* item = avro_to_python(&value);
avro_value_t writer_value;
avro_value_t reader_value;
int rval;
avro_reader_memory_set_source(self->reader, buffer.buf, buffer.len);
if ((rval = avro_generic_value_new(self->reader_iface, &reader_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
};
if (self->writer_iface != NULL) {
if ((rval = avro_resolved_writer_new_value(self->writer_iface, &writer_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
avro_resolved_writer_set_dest(&writer_value, &reader_value);
while ((rval = avro_value_read(self->reader, &writer_value)) == 0) {
PyObject* item = avro_to_python(&reader_value);
PyList_Append(values, item);
avro_value_reset(&reader_value);
Py_DECREF(item);
}
avro_value_decref(&reader_value);
avro_value_decref(&writer_value);
PyBuffer_Release(&buffer);
return values;
}
while ((rval = avro_value_read(self->reader, &reader_value)) == 0) {
PyObject* item = avro_to_python(&reader_value);
PyList_Append(values, item);
avro_value_reset(&value);
avro_value_reset(&reader_value);
Py_DECREF(item);
}
avro_value_decref(&value);
avro_value_decref(&reader_value);
PyBuffer_Release(&buffer);
return values;
}
Expand Down Expand Up @@ -122,19 +155,48 @@ static PyObject* Encoder_read_record(Encoder* self, PyObject* args) {
if (!PyArg_ParseTuple(args, "s*", &buffer)) {
Py_RETURN_NONE;
}
avro_value_t value;
avro_value_t reader_value;
avro_value_t writer_value;
avro_reader_memory_set_source(self->reader, buffer.buf, buffer.len);
avro_generic_value_new(self->iface, &value);
if ((rval = avro_value_read(self->reader, &value)) != 0) {
if ((rval = avro_generic_value_new(self->reader_iface, &reader_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
};
if (self->writer_iface != NULL) {
if ((rval = avro_resolved_writer_new_value(self->writer_iface, &writer_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
avro_resolved_writer_set_dest(&writer_value, &reader_value);
if ((rval = avro_value_read(self->reader, &writer_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
obj = avro_to_python(&reader_value);
if ((rval = avro_value_sizeof(&reader_value, &record_size)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
avro_value_decref(&reader_value);
avro_value_decref(&writer_value);
PyBuffer_Release(&buffer);
// TODO: refcount wrong, there are others
// TODO: also check all the PyLong_ calls to make sure they
// have the correct size types
PyObject *ret = Py_BuildValue("(OO)", obj, PyLong_FromLong(record_size));
Py_XDECREF(obj);
return ret;
}
obj = avro_to_python(&value);
if ((rval = avro_value_sizeof(&value, &record_size)) != 0) {
if ((rval = avro_value_read(self->reader, &reader_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
avro_value_decref(&value);
obj = avro_to_python(&reader_value);
if ((rval = avro_value_sizeof(&reader_value, &record_size)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
}
avro_value_decref(&reader_value);
PyBuffer_Release(&buffer);
// TODO: refcount wrong, there are others
// TODO: also check all the PyLong_ calls to make sure they
Expand All @@ -144,21 +206,52 @@ static PyObject* Encoder_read_record(Encoder* self, PyObject* args) {
return ret;
}

static PyObject* Encoder_set_schema(Encoder* self, PyObject* args) {
char* json_str;
static PyObject* Encoder_set_schema(Encoder* self, PyObject* args, PyObject *kwargs) {
char* json_str = NULL;
char* reader_schema = NULL;
avro_schema_error_t error;

if (!PyArg_ParseTuple(args, "s", &json_str)) {
PyErr_SetString(SchemaError, "Not provided valid arguments");
static char *kwlist[] = {"json_str", "reader_schema", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|s", kwlist, &json_str, &reader_schema)) {
return NULL;
}
int r = avro_schema_from_json(json_str, 0, &self->schema, &error);
if (r != 0 || self->schema == NULL) {
self->schema = NULL;
int r = avro_schema_from_json(json_str, 0, &self->writer_schema, &error);
if (r != 0 || self->writer_schema == NULL) {
self->writer_schema = NULL;
PyErr_Format(SchemaError, "%s", avro_strerror());
return NULL;
}
self->iface = avro_generic_class_from_schema(self->schema);
if (reader_schema != NULL) {
int r = avro_schema_from_json(reader_schema, 0, &self->reader_schema, &error);
if (r != 0 || self->reader_schema == NULL) {
self->reader_schema = NULL;
PyErr_Format(SchemaError, "%s", avro_strerror());
return NULL;
}
self->writer_iface = avro_resolved_writer_new(self->writer_schema, self->reader_schema);
if (self->writer_iface == NULL) {
PyErr_Format(SchemaError, "%s", avro_strerror());
return NULL;
}
self->resolver = avro_resolved_reader_new(self->writer_schema, self->reader_schema);
if (self->resolver == NULL) {
PyErr_Format(SchemaError, "%s", avro_strerror());
return NULL;
}
self->reader_iface = avro_generic_class_from_schema(self->reader_schema);
} else {
self->reader_iface = avro_generic_class_from_schema(self->writer_schema);
// ensure writer_iface is cleaned up if set_schema is set without
// reader_schema
if (self->writer_iface != NULL) {
avro_value_iface_decref(self->writer_iface);
self->writer_iface = NULL;
}
if (self->resolver != NULL) {
avro_value_iface_decref(self->resolver);
self->resolver = NULL;
}
}
return Py_BuildValue("i", 0);
}

Expand All @@ -169,11 +262,49 @@ static PyObject* Encoder_write(Encoder* self, PyObject* args) {
if (!PyArg_ParseTuple(args, "O", &obj)) {
Py_RETURN_NONE;
}
avro_value_t value;
avro_generic_value_new(self->iface, &value);
rval = python_to_avro(obj, &value);
avro_value_t reader_value;
if ((rval = avro_generic_value_new(self->reader_iface, &reader_value)) != 0) {
PyErr_Format(ReadError, "%s", avro_strerror());
return NULL;
};
if (self->resolver != NULL) {
avro_value_t resolved;
if ((rval = python_to_avro(obj, &reader_value)) == 0) {
rval = avro_value_write(self->writer, &reader_value);
}
if ((rval = avro_resolved_reader_new_value(self->resolver, &resolved)) != 0) {
PyErr_Format(WriteError, "%s", avro_strerror());
return NULL;
}
avro_resolved_reader_set_source(&resolved, &reader_value);
size_t new_size;

while (rval == ENOSPC) {
new_size = self->buffer_length * 2;
self->buffer = (char*)avro_realloc(self->buffer, self->buffer_length, new_size);
if (!self->buffer) {
PyErr_NoMemory();
return NULL;
}
self->buffer_length = new_size;
avro_writer_memory_set_dest(self->writer, self->buffer, self->buffer_length);
rval = avro_value_write(self->writer, &resolved);
}

if (rval) {
avro_value_decref(&resolved);
avro_value_decref(&reader_value);
return NULL;
}
PyObject* s = PyBytes_FromStringAndSize(self->buffer, avro_writer_tell(self->writer));
avro_writer_reset(self->writer);
avro_value_decref(&resolved);
avro_value_decref(&reader_value);
return s;
}
rval = python_to_avro(obj, &reader_value);
if (rval == 0) {
rval = avro_value_write(self->writer, &value);
rval = avro_value_write(self->writer, &reader_value);
}
size_t new_size;

Expand All @@ -186,16 +317,16 @@ static PyObject* Encoder_write(Encoder* self, PyObject* args) {
}
self->buffer_length = new_size;
avro_writer_memory_set_dest(self->writer, self->buffer, self->buffer_length);
rval = avro_value_write(self->writer, &value);
rval = avro_value_write(self->writer, &reader_value);
}

if (rval) {
avro_value_decref(&value);
avro_value_decref(&reader_value);
return NULL;
}
PyObject* s = PyBytes_FromStringAndSize(self->buffer, avro_writer_tell(self->writer));
avro_writer_reset(self->writer);
avro_value_decref(&value);
avro_value_decref(&reader_value);
return s;
}

Expand Down Expand Up @@ -223,7 +354,7 @@ static PyMethodDef Encoder_methods[] = {
{"read", (PyCFunction)Encoder_read, METH_VARARGS, ""},
{"read_long", (PyCFunction)Encoder_read_long, METH_VARARGS, ""},
{"read_record", (PyCFunction)Encoder_read_record, METH_VARARGS, ""},
{"set_schema", (PyCFunction)Encoder_set_schema, METH_VARARGS, ""},
{"set_schema", (PyCFunction)Encoder_set_schema, METH_VARARGS|METH_KEYWORDS, ""},
{"write", (PyCFunction)Encoder_write, METH_VARARGS, ""},
{"write_long", (PyCFunction)Encoder_write_long, METH_VARARGS, ""},
{NULL} /* Sentinel */
Expand Down
Loading

0 comments on commit 4b3421a

Please sign in to comment.