From 7c01c529040a276e8be5f76b2931669a4da88f72 Mon Sep 17 00:00:00 2001
From: Joe Zuntz
Date: Mon, 26 Jul 2021 13:41:25 +0100
Subject: [PATCH 1/3] Add MPI to training

---
 xpysom/tests.py  | 50 ++++++++++++++++++++++++++++++++-
 xpysom/xpysom.py | 73 +++++++++++++++++++++++++++++++++++-------------
 2 files changed, 103 insertions(+), 20 deletions(-)

diff --git a/xpysom/tests.py b/xpysom/tests.py
index e4d2d68..43612d3 100644
--- a/xpysom/tests.py
+++ b/xpysom/tests.py
@@ -12,6 +12,7 @@
 import pickle
 import os
+import mockmpi
 
 class TestCupySom(unittest.TestCase):
     def setUp(self, xp=cp):
@@ -303,4 +304,51 @@ def test_bubble(self):
 
 class TestNumpySomHex(TestCupySomHex):
     def setUp(self):
-        TestCupySom.setUp(self, xp=np)
\ No newline at end of file
+        TestCupySom.setUp(self, xp=np)
+
+
+
+def core_mpi_init_weight(comm):
+    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
+
+    wcheck = som._weights.copy()
+
+    wcheck = comm.Bcast(wcheck)
+    np.testing.assert_array_almost_equal(wcheck, som._weights)
+
+def core_mpi_train(comm):
+    # train two equivalent SOMs, one in parallel.
+    nfeat = 5
+    ndata = 200
+
+    # data at root
+    data = np.random.uniform((nfeat, ndata))
+    data = comm.Bcast(data)
+    my_data = np.array_split(data, comm.size)[comm.rank]
+
+    som1 = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
+    som2 = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
+
+    som1.train(my_data, 10, comm=comm)
+
+    # results should be the same as a serial test
+    if comm.rank == 0:
+        som2.train(data)
+        np.testing.assert_array_almost_equal(som1._weights, som2._weights)
+
+
+
+
+class TestMPI(unittest.TestCase):
+
+    def test_mpi_init_weight(self):
+        # train two equivalent SOMs, one in parallel.
+
+        def test_pca_weights_init(self):
+            mockmpi.mock_mpiexec(core_mpi_init_weight, 2)
+            mockmpi.mock_mpiexec(core_mpi_init_weight, 5)
+
+
+    def test_mpi_train(self):
+        mockmpi.mock_mpiexec(core_mpi_train, 2)
+        mockmpi.mock_mpiexec(core_mpi_train, 5)
diff --git a/xpysom/xpysom.py b/xpysom/xpysom.py
index c91c46a..2548a4e 100644
--- a/xpysom/xpysom.py
+++ b/xpysom/xpysom.py
@@ -31,6 +31,21 @@
 beginning = None
 sec_left = None
 
+
+def mpi_reduce(comm, data, xp):
+    import mpi4py.MPI
+    # TODO: bleeding edge mpi4py versions
+    # can directly access cupy arrays. Once
+    # that is mainstream then use it here
+
+    if xp.__name__ == 'cupy':
+        tmp = self.xp.asnumpy(data)
+        comm.Allreduce(mpi4py.MPI.IN_PLACE, tmp)
+        data[:] = xp.asarray(tmp)
+    else:
+        comm.Allreduce(mpi4py.MPI.IN_PLACE, data)
+
+
 def print_progress(t, T):
     digits = len(str(T))
 
@@ -415,11 +430,16 @@ def _update(self, x_gpu, wins, eta, sig):
 
         self._denominator_gpu += sum_g_gpu[:,:,self.xp.newaxis]
 
-    def _merge_updates(self):
+    def _merge_updates(self, comm=None):
         """
         Divides the numerator accumulator by the denominator accumulator
         to compute the new weights.
         """
+
+        if comm is not None:
+            mpi_reduce(comm, self._denominator_gpu, self.xp)
+            mpi_reduce(comm, self._numerator_gpu, self.xp)
+
         self._weights_gpu = self.xp.where(
             self._denominator_gpu != 0,
             self._numerator_gpu / self._denominator_gpu,
@@ -427,7 +447,7 @@ def _merge_updates(self):
         )
 
 
-    def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
+    def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False, comm=None):
         """Trains the SOM.
 
         Parameters
@@ -508,7 +528,7 @@ def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
                 num_epochs*len(data)
             )
 
-        self._merge_updates()
+        self._merge_updates(comm=comm)
 
         # Copy back arrays to host
         if self.xp.__name__ == 'cupy':
@@ -654,20 +674,29 @@ def topographic_error(self, data):
 
         return (distance > 1.5).mean().item()
 
-    def random_weights_init(self, data):
+    def random_weights_init(self, data, comm=None):
         """Initializes the weights of the SOM picking random samples from data.
         TODO: unoptimized
         """
         self._check_input_len(data)
-        it = np.nditer(self._weights[:,:,0], flags=['multi_index'])
-        while not it.finished:
-            rand_i = self._random_generator.randint(len(data))
-            self._weights[it.multi_index] = data[rand_i]
-            it.iternext()
+        # Only the root process needs to do this
+        if (comm is None) or (comm.rank == 0):
+            it = np.nditer(self._weights[:,:,0], flags=['multi_index'])
+            while not it.finished:
+                rand_i = self._random_generator.randint(len(data))
+                self._weights[it.multi_index] = data[rand_i]
+                it.iternext()
+
+        # Give the weights from the root process to everyone else.
+        # These are host arrays so we do not need to move from device
+        if comm is not None:
+            import mpi4py.MPI
+            self._weights = comm.Bcast(self._weights)
 
-    def pca_weights_init(self, data):
+
+    def pca_weights_init(self, data, comm=None):
         """Initializes the weights to span the first two principal components.
 
         This initialization doesn't depend on random processes and
@@ -682,15 +711,21 @@ def pca_weights_init(self, data):
             msg = 'The data needs at least 2 features for pca initialization'
             raise ValueError(msg)
         self._check_input_len(data)
-        if len(self._neigx) == 1 or len(self._neigy) == 1:
-            msg = 'PCA initialization inappropriate:' + \
-                'One of the dimensions of the map is 1.'
-            warn(msg)
-        pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
-        pc_order = np.argsort(-pc_length)
-        for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
-            for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
-                self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]
+
+        if (comm is None) or (comm.rank == 0):
+            if len(self._neigx) == 1 or len(self._neigy) == 1:
+                msg = 'PCA initialization inappropriate:' + \
+                    'One of the dimensions of the map is 1.'
+                warn(msg)
+            pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
+            pc_order = np.argsort(-pc_length)
+            for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
+                for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
+                    self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]
+
+        if comm is not None:
+            import mpi4py.MPI
+            self._weights = comm.Bcast(self._weights)
 
 
     def distance_map(self):

From 9da453c4f07793b2648884b62ccc5c2f296a1d89 Mon Sep 17 00:00:00 2001
From: joezuntz
Date: Mon, 26 Jul 2021 06:30:04 -0700
Subject: [PATCH 2/3] Fix MPI tests and broadcast handling

---
 xpysom/tests.py  | 46 ++++++++++++++++++++++++++++++------------------
 xpysom/xpysom.py |  5 +++--
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/xpysom/tests.py b/xpysom/tests.py
index 43612d3..a9ffba3 100644
--- a/xpysom/tests.py
+++ b/xpysom/tests.py
@@ -13,6 +13,7 @@
 import pickle
 import os
 import mockmpi
+import sys
 
 class TestCupySom(unittest.TestCase):
     def setUp(self, xp=cp):
@@ -309,31 +310,39 @@ def setUp(self):
 
 
 def core_mpi_init_weight(comm):
-    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
-
+    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1, xp=np)
     wcheck = som._weights.copy()
+    comm.Bcast(wcheck)
 
-    wcheck = comm.Bcast(wcheck)
     np.testing.assert_array_almost_equal(wcheck, som._weights)
 
 def core_mpi_train(comm):
     # train two equivalent SOMs, one in parallel.
+    sys.modules['mpi4py'] = mockmpi
+    sys.modules['mpi4py.MPI'] = mockmpi.comm
+    mockmpi.MPI = mockmpi.comm
+
     nfeat = 5
-    ndata = 200
+    ndata = 100
 
     # data at root
-    data = np.random.uniform((nfeat, ndata))
-    data = comm.Bcast(data)
+    rng = np.random.default_rng(12345)
+    # should ensure all rng values the same
+    data = rng.uniform(size=(ndata, nfeat))
+
+    # split data among processors
     my_data = np.array_split(data, comm.size)[comm.rank]
+    print(comm.rank, my_data.shape)
 
-    som1 = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
-    som2 = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1)
+    som1 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=np)
 
     som1.train(my_data, 10, comm=comm)
-
-    # results should be the same as a serial test
+    comm.Barrier()
+
+    # results should be the same as a serial test using all the data
     if comm.rank == 0:
-        som2.train(data)
+        som2 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=np)
+        som2.train(data, 10, comm=None)
         np.testing.assert_array_almost_equal(som1._weights, som2._weights)
 
@@ -341,14 +350,15 @@ def core_mpi_train(comm):
 
 class TestMPI(unittest.TestCase):
 
-    def test_mpi_init_weight(self):
-        # train two equivalent SOMs, one in parallel.
-        def test_pca_weights_init(self):
-            mockmpi.mock_mpiexec(core_mpi_init_weight, 2)
-            mockmpi.mock_mpiexec(core_mpi_init_weight, 5)
+    def test_pca_weights_init(self):
+        mockmpi.mock_mpiexec(2, core_mpi_init_weight)
+        mockmpi.mock_mpiexec(5, core_mpi_init_weight)
 
     def test_mpi_train(self):
-        mockmpi.mock_mpiexec(core_mpi_train, 2)
-        mockmpi.mock_mpiexec(core_mpi_train, 5)
+        mockmpi.mock_mpiexec(2, core_mpi_train)
+        mockmpi.mock_mpiexec(5, core_mpi_train)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/xpysom/xpysom.py b/xpysom/xpysom.py
index 2548a4e..2ade547 100644
--- a/xpysom/xpysom.py
+++ b/xpysom/xpysom.py
@@ -437,6 +437,7 @@ def _merge_updates(self, comm=None):
         """
 
         if comm is not None:
+            comm.Barrier()
             mpi_reduce(comm, self._denominator_gpu, self.xp)
             mpi_reduce(comm, self._numerator_gpu, self.xp)
 
@@ -693,7 +694,7 @@ def random_weights_init(self, data, comm=None):
         # These are host arrays so we do not need to move from device
         if comm is not None:
             import mpi4py.MPI
-            self._weights = comm.Bcast(self._weights)
+            comm.Bcast(self._weights)
 
 
     def pca_weights_init(self, data, comm=None):
@@ -725,7 +726,7 @@ def pca_weights_init(self, data, comm=None):
 
         if comm is not None:
             import mpi4py.MPI
-            self._weights = comm.Bcast(self._weights)
+            comm.Bcast(self._weights)
 
 
     def distance_map(self):

From 9232eb239499fbbe038c310441977c03c12cab87 Mon Sep 17 00:00:00 2001
From: joezuntz
Date: Mon, 26 Jul 2021 06:56:08 -0700
Subject: [PATCH 3/3] cupy tests

---
 xpysom/tests.py  | 30 ++++++++++++++++++------------
 xpysom/xpysom.py |  2 +-
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/xpysom/tests.py b/xpysom/tests.py
index a9ffba3..caf3405 100644
--- a/xpysom/tests.py
+++ b/xpysom/tests.py
@@ -309,14 +309,16 @@ def setUp(self):
 
 
-def core_mpi_init_weight(comm):
-    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1, xp=np)
+def core_mpi_init_weight(comm, xp):
+    xp = {'np': np, 'cp': cp}[xp]
+    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1, xp=xp)
     wcheck = som._weights.copy()
     comm.Bcast(wcheck)
 
     np.testing.assert_array_almost_equal(wcheck, som._weights)
 
-def core_mpi_train(comm):
+def core_mpi_train(comm, xp):
+    xp = {'np': np, 'cp': cp}[xp]
     # train two equivalent SOMs, one in parallel.
     sys.modules['mpi4py'] = mockmpi
     sys.modules['mpi4py.MPI'] = mockmpi.comm
     mockmpi.MPI = mockmpi.comm
@@ -332,32 +334,36 @@ def core_mpi_train(comm):
 
     # split data among processors
     my_data = np.array_split(data, comm.size)[comm.rank]
-    print(comm.rank, my_data.shape)
 
-    som1 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=np)
+    som1 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=xp)
 
     som1.train(my_data, 10, comm=comm)
     comm.Barrier()
 
     # results should be the same as a serial test using all the data
     if comm.rank == 0:
-        som2 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=np)
+        som2 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=xp)
         som2.train(data, 10, comm=None)
         np.testing.assert_array_almost_equal(som1._weights, som2._weights)
 
 
-class TestMPI(unittest.TestCase):
-
+class TestMPINumpy(unittest.TestCase):
+    def setUp(self):
+        self.xp = 'np'
 
     def test_pca_weights_init(self):
-        mockmpi.mock_mpiexec(2, core_mpi_init_weight)
-        mockmpi.mock_mpiexec(5, core_mpi_init_weight)
+        mockmpi.mock_mpiexec(2, core_mpi_init_weight, self.xp)
+        mockmpi.mock_mpiexec(5, core_mpi_init_weight, self.xp)
 
     def test_mpi_train(self):
-        mockmpi.mock_mpiexec(2, core_mpi_train)
-        mockmpi.mock_mpiexec(5, core_mpi_train)
+        mockmpi.mock_mpiexec(2, core_mpi_train, self.xp)
+        mockmpi.mock_mpiexec(5, core_mpi_train, self.xp)
+
+
+class TestMPICupy(TestMPINumpy):
+    def setUp(self):
+        self.xp = 'cp'
 
 
 if __name__ == "__main__":
diff --git a/xpysom/xpysom.py b/xpysom/xpysom.py
index 2ade547..51e1f5b 100644
--- a/xpysom/xpysom.py
+++ b/xpysom/xpysom.py
@@ -39,7 +39,7 @@ def mpi_reduce(comm, data, xp):
     # that is mainstream then use it here
 
     if xp.__name__ == 'cupy':
-        tmp = self.xp.asnumpy(data)
+        tmp = xp.asnumpy(data)
         comm.Allreduce(mpi4py.MPI.IN_PLACE, tmp)
         data[:] = xp.asarray(tmp)
     else:
         comm.Allreduce(mpi4py.MPI.IN_PLACE, data)
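
Usage sketch: how the comm argument added by these patches would be driven with
real mpi4py rather than mockmpi. This is a minimal sketch, not part of the
patches; the script name, process count, map size, and random data below are
illustrative assumptions. Only the XPySom constructor arguments and the
train(..., comm=...) signature come from the code above.

    # mpi_example.py (hypothetical name) -- run with e.g.:
    #   mpiexec -n 4 python mpi_example.py
    import numpy as np
    from mpi4py import MPI
    from xpysom import XPySom

    comm = MPI.COMM_WORLD

    # Stand-in for each rank loading its own shard of a larger dataset;
    # shapes here are arbitrary.
    rng = np.random.default_rng(seed=comm.rank)
    my_data = rng.uniform(size=(1000, 5))

    # Construct with identical arguments on every rank: the same random_seed
    # gives each rank the same initial weights, which is what the
    # core_mpi_init_weight test checks.
    som = XPySom(10, 10, 5, sigma=1.0, learning_rate=0.5, random_seed=7)

    # Passing comm makes _merge_updates() Allreduce the numerator and
    # denominator accumulators across ranks, so every rank finishes each
    # epoch with identical weights.
    som.train(my_data, 10, comm=comm)

    if comm.rank == 0:
        print("trained weights shape:", som._weights.shape)

Note that with the cupy backend, mpi_reduce currently copies device arrays to
the host before the Allreduce and back afterwards, per the TODO comment in
patch 1.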