From 26a1f8c8bca6f31efc49100528f3dd25e075f583 Mon Sep 17 00:00:00 2001 From: EricDinging Date: Sun, 17 Dec 2023 11:19:37 -0500 Subject: [PATCH 1/3] Fix processing client weights --- fedscale/cloud/aggregation/optimizers.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/fedscale/cloud/aggregation/optimizers.py b/fedscale/cloud/aggregation/optimizers.py index 201ffccc..2e927b27 100644 --- a/fedscale/cloud/aggregation/optimizers.py +++ b/fedscale/cloud/aggregation/optimizers.py @@ -75,11 +75,9 @@ def update_round_gradient( update_weights = result["update_weight"] if type(update_weights) is dict: update_weights = [x for x in update_weights.values()] + weights = [ - torch.from_numpy(np.asarray(x, dtype=np.float32)).to( - device=self.device - ) - for x in update_weights + torch.tensor(x).to(device=self.device) for x in update_weights ] grads = [ (u - v) * 1.0 / learning_rate for u, v in zip(last_model, weights) From c3a940aff65b96349220523b368f4e8073da69c2 Mon Sep 17 00:00:00 2001 From: EricDinging Date: Sun, 17 Dec 2023 11:34:10 -0500 Subject: [PATCH 2/3] Fix processing client weights --- fedscale/cloud/aggregation/optimizers.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fedscale/cloud/aggregation/optimizers.py b/fedscale/cloud/aggregation/optimizers.py index 2e927b27..9e14dbe3 100644 --- a/fedscale/cloud/aggregation/optimizers.py +++ b/fedscale/cloud/aggregation/optimizers.py @@ -98,8 +98,10 @@ def update_round_gradient( ) + (1.0 / learning_rate) * np.float_power(loss + 1e-10, qfedq) # update global model - for idx, param in enumerate(target_model.parameters()): - param.data = last_model[idx] - Deltas[idx] / (hs + 1e-10) + new_state_dict = { + name: last_model[idx] - Deltas[idx] / (hs + 1e-10) for idx, name in enumerate(target_model.state_dict().keys()) + } + target_model.load_state_dict(new_state_dict) else: # The default optimizer, FedAvg, has been applied in aggregator.py on the fly From 292f3061a401e56f4c4a1b60e3ed4ed5ade23168 Mon Sep 17 00:00:00 2001 From: EricDinging Date: Sun, 17 Dec 2023 13:48:14 -0500 Subject: [PATCH 3/3] Pass unit test --- fedscale/cloud/internal/model_adapter_base.py | 7 ++++++- .../internal/tensorflow_model_adapter.py | 19 +++++++++++++++++-- .../cloud/aggregation/test_aggregator.py | 14 ++++++++++---- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/fedscale/cloud/internal/model_adapter_base.py b/fedscale/cloud/internal/model_adapter_base.py index 067ef91c..18beab77 100644 --- a/fedscale/cloud/internal/model_adapter_base.py +++ b/fedscale/cloud/internal/model_adapter_base.py @@ -7,11 +7,16 @@ class ModelAdapterBase(abc.ABC): """ Represents an adapter that operates on a framework-specific model. """ + @abc.abstractmethod - def set_weights(self, weights: np.ndarray): + def set_weights( + self, weights: np.ndarray, is_aggregator=True, client_training_results=None + ): """ Set the model's weights to the numpy weights array. :param weights: numpy weights array + :param is_aggregator: boolean indicating whether the caller is the aggregator + :param client_training_results: list of gradients from every clients, for q-fedavg """ pass diff --git a/fedscale/cloud/internal/tensorflow_model_adapter.py b/fedscale/cloud/internal/tensorflow_model_adapter.py index 5c5785a5..360fc031 100644 --- a/fedscale/cloud/internal/tensorflow_model_adapter.py +++ b/fedscale/cloud/internal/tensorflow_model_adapter.py @@ -10,13 +10,28 @@ class TensorflowModelAdapter(ModelAdapterBase): def __init__(self, model: tf.keras.Model): self.model = model - def set_weights(self, weights: List[np.ndarray]): + def set_weights( + self, + weights: List[np.ndarray], + is_aggregator=True, + client_training_results=None, + ): + """ + Set the model's weights to the numpy weights array. + :param weights: numpy weights array + :param is_aggregator: boolean indicating whether the caller is the aggregator + :param client_training_results: list of gradients from every clients, for q-fedavg + """ for i, layer in enumerate(self.model.layers): if layer.trainable: layer.set_weights(weights[i]) def get_weights(self) -> List[np.ndarray]: - return [np.asarray(layer.get_weights()) for layer in self.model.layers if layer.trainable] + return [ + np.asarray(layer.get_weights()) + for layer in self.model.layers + if layer.trainable + ] def get_model(self): return self.model diff --git a/fedscale/tests/cloud/aggregation/test_aggregator.py b/fedscale/tests/cloud/aggregation/test_aggregator.py index 4bd81d55..58c739f1 100644 --- a/fedscale/tests/cloud/aggregation/test_aggregator.py +++ b/fedscale/tests/cloud/aggregation/test_aggregator.py @@ -14,6 +14,7 @@ def __init__(self, model_wrapper): self.model_in_update = 1 self.tasks_round = 3 self.model_wrapper = model_wrapper + self.client_training_results = None def multiply_weights(weights, factor): @@ -23,8 +24,9 @@ def multiply_weights(weights, factor): class TestAggregator: def test_update_weight_aggregation_for_keras_model(self): x = tf.keras.Input(shape=(2,)) - y = tf.keras.layers.Dense(2, activation='softmax')( - tf.keras.layers.Dense(4, activation='softmax')(x)) + y = tf.keras.layers.Dense(2, activation="softmax")( + tf.keras.layers.Dense(4, activation="softmax")(x) + ) model = tf.keras.Model(x, y) model_adapter = TensorflowModelAdapter(model) aggregator = MockAggregator(model_adapter) @@ -34,7 +36,9 @@ def test_update_weight_aggregation_for_keras_model(self): aggregator.update_weight_aggregation(multiply_weights(weights, 2)) aggregator.model_in_update += 1 aggregator.update_weight_aggregation(multiply_weights(weights, 5)) - np.array_equal(aggregator.model_wrapper.get_weights(), multiply_weights(weights, 3)) + np.array_equal( + aggregator.model_wrapper.get_weights(), multiply_weights(weights, 3) + ) def test_update_weight_aggregation_for_torch_model(self): model = torch.nn.Linear(3, 2) @@ -46,4 +50,6 @@ def test_update_weight_aggregation_for_torch_model(self): aggregator.update_weight_aggregation(multiply_weights(weights, 2)) aggregator.model_in_update += 1 aggregator.update_weight_aggregation(multiply_weights(weights, 5)) - np.array_equal(aggregator.model_wrapper.get_weights(), multiply_weights(weights, 3)) + np.array_equal( + aggregator.model_wrapper.get_weights(), multiply_weights(weights, 3) + )