-
Notifications
You must be signed in to change notification settings - Fork 271
Add partial support for from_protobuf #14062
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
2ab5557
084e9c2
7606925
c6cde2d
6d4eb16
044ea96
6c1369e
a77d90e
2a66f9a
f175207
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -857,6 +857,116 @@ def gen_bytes(): | |||||
| return bytes([ rand.randint(0, 255) for _ in range(length) ]) | ||||||
| self._start(rand, gen_bytes) | ||||||
|
|
||||||
|
|
||||||
| # ----------------------------------------------------------------------------- | ||||||
| # Protobuf (simple types) generators/utilities (for from_protobuf/to_protobuf tests) | ||||||
| # ----------------------------------------------------------------------------- | ||||||
|
|
||||||
| _PROTOBUF_WIRE_VARINT = 0 | ||||||
| _PROTOBUF_WIRE_64BIT = 1 | ||||||
| _PROTOBUF_WIRE_LEN_DELIM = 2 | ||||||
| _PROTOBUF_WIRE_32BIT = 5 | ||||||
|
|
||||||
| def _encode_protobuf_uvarint(value): | ||||||
| """Encode a non-negative integer as protobuf varint.""" | ||||||
| if value is None: | ||||||
| raise ValueError("value must not be None") | ||||||
| if value < 0: | ||||||
| raise ValueError("uvarint only supports non-negative integers") | ||||||
| out = bytearray() | ||||||
| v = int(value) | ||||||
| while True: | ||||||
| b = v & 0x7F | ||||||
| v >>= 7 | ||||||
| if v: | ||||||
| out.append(b | 0x80) | ||||||
| else: | ||||||
| out.append(b) | ||||||
| break | ||||||
| return bytes(out) | ||||||
|
|
||||||
| def _encode_protobuf_key(field_number, wire_type): | ||||||
| return _encode_protobuf_uvarint((int(field_number) << 3) | int(wire_type)) | ||||||
|
|
||||||
| def _encode_protobuf_field(field_number, spark_type, value): | ||||||
| """ | ||||||
| Encode a single protobuf field for a subset of scalar types. | ||||||
| Notes on signed ints: | ||||||
| - Protobuf `int32`/`int64` use *varint* encoding of the two's-complement integer. | ||||||
| - Negative `int32` values are encoded as a 10-byte varint (because they are sign-extended to 64 bits). | ||||||
| """ | ||||||
| if value is None: | ||||||
| return b"" | ||||||
|
|
||||||
| if isinstance(spark_type, BooleanType): | ||||||
| return _encode_protobuf_key(field_number, _PROTOBUF_WIRE_VARINT) + _encode_protobuf_uvarint(1 if value else 0) | ||||||
| elif isinstance(spark_type, IntegerType): | ||||||
| # Match protobuf-java behavior for writeInt32NoTag: negative values are sign-extended and written as uint64. | ||||||
| u64 = int(value) & 0xFFFFFFFFFFFFFFFF | ||||||
| return _encode_protobuf_key(field_number, _PROTOBUF_WIRE_VARINT) + _encode_protobuf_uvarint(u64) | ||||||
| elif isinstance(spark_type, LongType): | ||||||
| u64 = int(value) & 0xFFFFFFFFFFFFFFFF | ||||||
| return _encode_protobuf_key(field_number, _PROTOBUF_WIRE_VARINT) + _encode_protobuf_uvarint(u64) | ||||||
| elif isinstance(spark_type, FloatType): | ||||||
| return _encode_protobuf_key(field_number, _PROTOBUF_WIRE_32BIT) + struct.pack("<f", float(value)) | ||||||
| elif isinstance(spark_type, DoubleType): | ||||||
| return _encode_protobuf_key(field_number, _PROTOBUF_WIRE_64BIT) + struct.pack("<d", float(value)) | ||||||
| elif isinstance(spark_type, StringType): | ||||||
| b = value.encode("utf-8") | ||||||
| return (_encode_protobuf_key(field_number, _PROTOBUF_WIRE_LEN_DELIM) + | ||||||
| _encode_protobuf_uvarint(len(b)) + b) | ||||||
| else: | ||||||
| raise ValueError("Unsupported type for protobuf simple generator: {}".format(spark_type)) | ||||||
|
|
||||||
|
|
||||||
| class ProtobufSimpleMessageRowGen(DataGen): | ||||||
|
||||||
| """ | ||||||
| Generates rows that include: | ||||||
| - one column per message field (Spark scalar types) | ||||||
| - a binary column containing a serialized protobuf message containing those fields | ||||||
|
|
||||||
| This is intentionally limited to the simple scalar types supported in Patch 1: | ||||||
|
||||||
| This is intentionally limited to the simple scalar types supported in Patch 1: | |
| This is intentionally limited to the simple scalar types currently supported by this implementation: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the trailing blank line at the end of the file. This follows standard code style guidelines and maintains consistency across the codebase.