Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPI mode #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion xpysom/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import pickle
import os
import mockmpi
import sys

class TestCupySom(unittest.TestCase):
def setUp(self, xp=cp):
Expand Down Expand Up @@ -303,4 +305,66 @@ def test_bubble(self):

class TestNumpySomHex(TestCupySomHex):
    """Run the hexagonal-grid SOM tests with the NumPy backend.

    Inherits all test methods from TestCupySomHex and only swaps the
    array module to numpy via TestCupySom.setUp.
    """

    def setUp(self):
        # Fixed: the diff rendering showed this call duplicated (old and
        # new version of the same changed line); a single call is correct.
        TestCupySom.setUp(self, xp=np)



def core_mpi_init_weight(comm, xp):
    """Worker run under mockmpi: check that freshly initialized SOM
    weights agree on every rank.

    ``xp`` is the string 'np' or 'cp' selecting the array backend
    (strings are used because the worker is shipped to mock processes).
    """
    backend = {'np': np, 'cp': cp}[xp]
    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1,
                 xp=backend)

    # Broadcast rank 0's weights; since every rank used the same fixed
    # seed, the broadcast values must match each rank's own weights.
    root_weights = som._weights.copy()
    comm.Bcast(root_weights)
    np.testing.assert_array_almost_equal(root_weights, som._weights)

def core_mpi_train(comm, xp):
    """Worker run under mockmpi: train two equivalent SOMs, one in
    parallel across the communicator and one serially, and compare.

    ``xp`` is the string 'np' or 'cp' selecting the array backend.
    """
    backend = {'np': np, 'cp': cp}[xp]

    # Make `import mpi4py.MPI` inside xpysom resolve to the mock
    # implementation while running under mockmpi.
    sys.modules['mpi4py'] = mockmpi
    sys.modules['mpi4py.MPI'] = mockmpi.comm
    mockmpi.MPI = mockmpi.comm

    nfeat = 5
    ndata = 100

    # Same seed on every rank, so all ranks generate identical data.
    rng = np.random.default_rng(12345)
    data = rng.uniform(size=(ndata, nfeat))

    # Each rank trains only on its own shard of the full data set.
    my_data = np.array_split(data, comm.size)[comm.rank]

    som1 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5,
                  random_seed=7, xp=backend)
    som1.train(my_data, 10, comm=comm)
    comm.Barrier()

    # The parallel result must equal a serial run over all the data.
    if comm.rank == 0:
        som2 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5,
                      random_seed=7, xp=backend)
        som2.train(data, 10, comm=None)
        np.testing.assert_array_almost_equal(som1._weights, som2._weights)




class TestMPINumpy(unittest.TestCase):
    """MPI-mode tests, run via mockmpi, using the NumPy backend."""

    def setUp(self):
        # Backend selector string forwarded to the worker functions.
        self.xp = 'np'

    def test_mpi_init_weight(self):
        # Renamed from test_pca_weights_init: the worker it runs
        # (core_mpi_init_weight) checks seeded random weight init
        # agreement across ranks, not PCA initialization.
        mockmpi.mock_mpiexec(2, core_mpi_init_weight, self.xp)
        mockmpi.mock_mpiexec(5, core_mpi_init_weight, self.xp)

    def test_mpi_train(self):
        # Exercise parallel training with 2 and 5 mock processes.
        mockmpi.mock_mpiexec(2, core_mpi_train, self.xp)
        mockmpi.mock_mpiexec(5, core_mpi_train, self.xp)

class TestMPICupy(TestMPINumpy):
    """Same MPI-mode tests as TestMPINumpy, with the CuPy backend."""

    def setUp(self):
        # Select the cupy backend string for the worker functions.
        self.xp = 'cp'


# Allow running this test module directly with `python tests.py`.
if __name__ == "__main__":
    unittest.main()
74 changes: 55 additions & 19 deletions xpysom/xpysom.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@
beginning = None
sec_left = None


def mpi_reduce(comm, data, xp):
    """Sum ``data`` in place across all ranks of ``comm`` (Allreduce).

    ``xp`` is the array module (numpy or cupy).  CuPy arrays are staged
    through a host copy before the reduction.
    """
    import mpi4py.MPI
    # TODO: bleeding edge mpi4py versions
    # can directly access cupy arrays. Once
    # that is mainstream then use it here

    if xp.__name__ != 'cupy':
        # Host array: reduce directly in place.
        comm.Allreduce(mpi4py.MPI.IN_PLACE, data)
    else:
        # Device array: copy to host, reduce, copy result back.
        host = xp.asnumpy(data)
        comm.Allreduce(mpi4py.MPI.IN_PLACE, host)
        data[:] = xp.asarray(host)


def print_progress(t, T):
digits = len(str(T))

Expand Down Expand Up @@ -415,19 +430,25 @@ def _update(self, x_gpu, wins, eta, sig):
self._denominator_gpu += sum_g_gpu[:,:,self.xp.newaxis]


def _merge_updates(self, comm=None):
    """
    Divides the numerator accumulator by the denominator accumulator
    to compute the new weights.

    If ``comm`` is given, the per-rank accumulators are first summed
    across all MPI ranks so every rank computes identical weights.
    (Fixed: the diff rendering left the old `def _merge_updates(self):`
    signature line in place above the new one; only one def belongs here.)
    """

    if comm is not None:
        comm.Barrier()
        mpi_reduce(comm, self._denominator_gpu, self.xp)
        mpi_reduce(comm, self._numerator_gpu, self.xp)

    # Where a cell accumulated no updates (denominator zero), keep its
    # old weight instead of dividing by zero.
    self._weights_gpu = self.xp.where(
        self._denominator_gpu != 0,
        self._numerator_gpu / self._denominator_gpu,
        self._weights_gpu
    )


def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False, comm=None):
"""Trains the SOM.

Parameters
Expand Down Expand Up @@ -508,7 +529,7 @@ def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
num_epochs*len(data)
)

self._merge_updates()
self._merge_updates(comm=comm)

# Copy back arrays to host
if self.xp.__name__ == 'cupy':
Expand Down Expand Up @@ -654,20 +675,29 @@ def topographic_error(self, data):
return (distance > 1.5).mean().item()


def random_weights_init(self, data, comm=None):
    """Initializes the weights of the SOM
    picking random samples from data.
    TODO: unoptimized

    If ``comm`` is given, only rank 0 draws the samples and then
    broadcasts the resulting weights so all ranks start identically.
    (Fixed: the diff rendering duplicated the old signature and the old
    unconditional sampling loop alongside the new code; also dropped the
    unused `import mpi4py.MPI` — Bcast is a method of ``comm``.)
    """
    self._check_input_len(data)

    # Only the root process needs to do this
    if (comm is None) or (comm.rank == 0):
        it = np.nditer(self._weights[:, :, 0], flags=['multi_index'])
        while not it.finished:
            rand_i = self._random_generator.randint(len(data))
            self._weights[it.multi_index] = data[rand_i]
            it.iternext()

    # Give the weights from the root process to everyone else.
    # These are host arrays so we do not need to move from device
    if comm is not None:
        comm.Bcast(self._weights)

def pca_weights_init(self, data):

def pca_weights_init(self, data, comm=None):
"""Initializes the weights to span the first two principal components.

This initialization doesn't depend on random processes and
Expand All @@ -682,15 +712,21 @@ def pca_weights_init(self, data):
msg = 'The data needs at least 2 features for pca initialization'
raise ValueError(msg)
self._check_input_len(data)
if len(self._neigx) == 1 or len(self._neigy) == 1:
msg = 'PCA initialization inappropriate:' + \
'One of the dimensions of the map is 1.'
warn(msg)
pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
pc_order = np.argsort(-pc_length)
for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]

if (comm is None) or (comm.rank == 0):
if len(self._neigx) == 1 or len(self._neigy) == 1:
msg = 'PCA initialization inappropriate:' + \
'One of the dimensions of the map is 1.'
warn(msg)
pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
pc_order = np.argsort(-pc_length)
for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]

if comm is not None:
import mpi4py.MPI
comm.Bcast(self._weights)


def distance_map(self):
Expand Down