Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions openfl/pipelines/no_compression_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

"""NoCompressionPipeline module."""

from openfl.pipelines.pipeline import Float32NumpyArrayToBytes, TransformationPipeline
from openfl.pipelines.pipeline import NumpyArrayToBytes, TransformationPipeline


class NoCompressionPipeline(TransformationPipeline):
"""The data pipeline without any compression."""

def __init__(self, **kwargs):
"""Initialize."""
super().__init__(transformers=[Float32NumpyArrayToBytes()], **kwargs)
super().__init__(transformers=[NumpyArrayToBytes()], **kwargs)
38 changes: 38 additions & 0 deletions openfl/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,44 @@ def backward(self, data, metadata, **kwargs):
return np.reshape(flat_array, newshape=array_shape, order="C")


class NumpyArrayToBytes(Transformer):
    """Transformer for converting generic Numpy arrays to bytes."""

    def __init__(self):
        """Initialize the transformer as a lossless stage."""
        # Nothing is discarded: the raw buffer plus shape/dtype metadata
        # is sufficient to reconstruct the array exactly.
        self.lossy = False

    def forward(self, data: np.ndarray, **kwargs):
        """Serialize a Numpy array into raw bytes plus reconstruction metadata.

        Args:
            data: The Numpy array to be converted.
            **kwargs: Additional keyword arguments for the conversion.

        Returns:
            data_bytes: The array's contents as C-ordered raw bytes.
            metadata: Dict holding the shape (``int_list``) and dtype string
                that :meth:`backward` needs to rebuild the array.
        """
        metadata = {
            "int_list": list(data.shape),
            "dtype": str(data.dtype),
        }
        return data.tobytes(order="C"), metadata

    def backward(self, data, metadata, **kwargs):
        """Rebuild a Numpy array from bytes produced by :meth:`forward`.

        Args:
            data: The data in bytes.
            metadata: The metadata for the conversion.

        Returns:
            The data converted back to a Numpy array.
        """
        target_dtype = np.dtype(metadata["dtype"])
        target_shape = tuple(metadata["int_list"])
        # frombuffer yields a flat view over the bytes; reshape restores
        # the original dimensions in C order.
        flat = np.frombuffer(data, dtype=target_dtype)
        return np.reshape(flat, newshape=target_shape, order="C")


class TransformationPipeline:
"""Data Transformer Pipeline Class.

Expand Down
16 changes: 6 additions & 10 deletions openfl/pipelines/tensor_codec.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer(s): The changes in this file are unrelated to this PR's main goal. Variables are renamed for readability, and rebinding the `data` name lets Python's garbage collector reclaim the original buffer sooner.

Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs):
metadata: metadata associated with compressed tensor.
"""
if require_lossless:
compressed_nparray, metadata = self.lossless_pipeline.forward(data, **kwargs)
data, metadata = self.lossless_pipeline.forward(data, **kwargs)
else:
compressed_nparray, metadata = self.compression_pipeline.forward(data, **kwargs)
data, metadata = self.compression_pipeline.forward(data, **kwargs)
# Define the compressed tensorkey that should be
# returned ('trained.delta'->'trained.delta.lossy_compressed')
tensor_name, origin, round_number, report, tags = tensor_key
Expand All @@ -80,7 +80,7 @@ def compress(self, tensor_key, data, require_lossless=False, **kwargs):
else:
new_tags = change_tags(tags, add_field="lossy_compressed")
compressed_tensor_key = TensorKey(tensor_name, origin, round_number, report, new_tags)
return compressed_tensor_key, compressed_nparray, metadata
return compressed_tensor_key, data, metadata

def decompress(
self,
Expand Down Expand Up @@ -121,13 +121,9 @@ def decompress(
assert "compressed" in tags, "Cannot losslessly decompress lossy tensor"

if require_lossless or "compressed" in tags:
decompressed_nparray = self.lossless_pipeline.backward(
data, transformer_metadata, **kwargs
)
data = self.lossless_pipeline.backward(data, transformer_metadata, **kwargs)
else:
decompressed_nparray = self.compression_pipeline.backward(
data, transformer_metadata, **kwargs
)
data = self.compression_pipeline.backward(data, transformer_metadata, **kwargs)
# Define the decompressed tensorkey that should be returned
if "lossy_compressed" in tags:
new_tags = change_tags(
Expand All @@ -144,7 +140,7 @@ def decompress(
else:
raise NotImplementedError("Decompression is only supported on compressed data")

return decompressed_tensor_key, decompressed_nparray
return decompressed_tensor_key, data

@staticmethod
def generate_delta(tensor_key, nparray, base_model_nparray):
Expand Down
1 change: 1 addition & 0 deletions openfl/protocols/base.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message MetadataProto {
map<int32, float> int_to_float = 1;
repeated int32 int_list = 2;
repeated bool bool_list = 3;
repeated string dtype = 4;
}

// handles large size data
Expand Down
Loading