From 94f834eb6177a36f64b7608ddb2ffc196e70aa63 Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Thu, 24 Aug 2023 10:25:53 +0100 Subject: [PATCH 1/6] refactor: separate cdf calculation in resampling separate calculation of cdf in get_num_copies to test separately add test (currently fails due to numerical inaccuracy) --- .../algorithms/smc_components/resampling.py | 7 ++++- tests/test_smc_components.py | 29 ++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/discretesampling/base/algorithms/smc_components/resampling.py b/discretesampling/base/algorithms/smc_components/resampling.py index 837ca4d..0e3648c 100755 --- a/discretesampling/base/algorithms/smc_components/resampling.py +++ b/discretesampling/base/algorithms/smc_components/resampling.py @@ -26,7 +26,7 @@ def check_stability(ncopies, exec=Executor()): def get_number_of_copies(logw, rng=RNG(), exec=Executor()): N = len(logw) * exec.P - cdf = exec.cumsum(np.exp(logw)*N) + cdf = calculate_cdf(logw, exec) cdf_of_i_minus_one = cdf - np.reshape(np.exp(logw) * N, newshape=cdf.shape) u = np.array(rng.uniform(0.0, 1.0), dtype=logw.dtype) @@ -37,6 +37,11 @@ def get_number_of_copies(logw, rng=RNG(), exec=Executor()): return ncopies # .astype(int) +def calculate_cdf(logw, exec=Executor()): + N = len(logw) * exec.P + return exec.cumsum(np.exp(logw)*N) + + def systematic_resampling(particles, logw, rng, exec=Executor()): loc_n = len(logw) N = loc_n * exec.P diff --git a/tests/test_smc_components.py b/tests/test_smc_components.py index 6b0efdb..d3a37fc 100644 --- a/tests/test_smc_components.py +++ b/tests/test_smc_components.py @@ -6,7 +6,7 @@ from discretesampling.base.types import DiscreteVariable from discretesampling.base.algorithms.smc_components.effective_sample_size import ess from discretesampling.base.algorithms.smc_components.normalisation import normalise -from discretesampling.base.algorithms.smc_components.resampling import systematic_resampling, check_stability +from discretesampling.base.algorithms.smc_components.resampling import systematic_resampling, check_stability, calculate_cdf class ExampleParticleClass(DiscreteVariable): @@ -127,5 +127,32 @@ def test_check_stability_MPI(ncopies, expected): local_expected = expected[first:(last+1)] assert all(local_check_ncopies == local_expected) + +@pytest.mark.parametrize( + "logw", + [(np.array([-np.log(8)]*8))] +) +def test_calculate_cdf(logw): + exec = Executor() + expected = np.exp(np.logaddexp.accumulate(logw))*len(logw) # numerically stable calculation + cdf = calculate_cdf(logw, exec) + np.testing.assert_array_equal(cdf, expected) + + +@pytest.mark.mpi +@pytest.mark.parametrize( + "logw", + [(np.array([-np.log(8)]*8))] +) +def test_calculate_cdf_MPI(logw): + exec = Executor_MPI() + expected = np.exp(np.logaddexp.accumulate(logw))*len(logw) # numerically stable calculation + first, last = np.array(split_across_cores(len(logw), exec.P, exec.rank)) + local_logw = logw[first:(last+1)] + local_check_cdf = calculate_cdf(local_logw, exec) + local_expected = expected[first:(last+1)] + np.testing.assert_array_equal(local_check_cdf, local_expected) + + # TODO: # get_number_of_copies From 91d38c5bc9dcb2eb079ef905eddbede540c50af5 Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Thu, 24 Aug 2023 10:35:59 +0100 Subject: [PATCH 2/6] feat: add logcumsumexp to executors Add logcumsumexp to Executor and Executor_MPI with accompanying tests Executor_MPI.logsumexp(x) currently implemented naively as log(exec.cumsum(log(x))) and thus fails tests due to numerical inaccuracy --- discretesampling/base/executor/executor.py | 3 +++ discretesampling/base/executor/executor_MPI.py | 3 +++ tests/test_executor.py | 11 +++++++++++ tests/test_executor_MPI.py | 16 ++++++++++++++++ 4 files changed, 33 insertions(+) diff --git a/discretesampling/base/executor/executor.py b/discretesampling/base/executor/executor.py index 085359b..56caa35 100644 --- a/discretesampling/base/executor/executor.py +++ b/discretesampling/base/executor/executor.py @@ -26,6 +26,9 @@ def logsumexp(self, x): def cumsum(self, x): return np.cumsum(x) + def logcumsumexp(self, x): + return np.logaddexp.accumulate(x) + def redistribute(self, particles, ncopies): particles = list(itertools.chain.from_iterable( [[particles[i]]*ncopies[i] for i in range(len(particles))] diff --git a/discretesampling/base/executor/executor_MPI.py b/discretesampling/base/executor/executor_MPI.py index 935a64f..f0909e6 100644 --- a/discretesampling/base/executor/executor_MPI.py +++ b/discretesampling/base/executor/executor_MPI.py @@ -59,5 +59,8 @@ def logsumexp(self, x): def cumsum(self, x): return inclusive_prefix_sum(x) + def logcumsumexp(self, x): + return np.log(self.cumsum(np.exp(x))) + def redistribute(self, particles, ncopies): return variable_size_redistribution(particles, ncopies, self) diff --git a/tests/test_executor.py b/tests/test_executor.py index cc41836..8a8e1fa 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -68,6 +68,17 @@ def test_cumsum(x, expected): assert all(calc == expected) +@pytest.mark.parametrize( + "x", + [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), + (np.array([-1., -2., -3., -4., -5, -6., -7., -8.]))] +) +def test_logcumsumexp(x): + calc = Executor().logcumsumexp(x) + expected = np.logaddexp.accumulate(x) + np.testing.assert_array_equal(calc, expected) + + @pytest.mark.parametrize( "particles,ncopies,expected", [([ExampleParticleClass(x) for x in ["a", "b", "c", "d", "e"]], np.array( diff --git a/tests/test_executor_MPI.py b/tests/test_executor_MPI.py index d26b325..19b2978 100644 --- a/tests/test_executor_MPI.py +++ b/tests/test_executor_MPI.py @@ -95,6 +95,22 @@ def test_cumsum(x, expected): assert all(calc == local_expected) +@pytest.mark.parametrize( + "x", + [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), + (np.array([-1., -2., -3., -4., -5, -6., -7., -8.]))] +) +def test_logcumsumexp(x): + exec = Executor_MPI() + expected = np.logaddexp.accumulate(x) + first, last = np.array(split_across_cores(len(x), exec.P, exec.rank)) + local_x = x[first:(last+1)] + + calc = exec.logcumsumexp(local_x) + local_expected = expected[first:(last+1)] + np.testing.assert_array_equal(calc, local_expected) + + @pytest.mark.mpi @pytest.mark.parametrize( "particles,ncopies,expected", # Only functions for equal numbers per core From 40046d3b1b2bae588fc2b4172f505d72bd94fb9a Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Thu, 24 Aug 2023 10:41:22 +0100 Subject: [PATCH 3/6] fix: use logcumsumexp in calculate cdf Use exec.logcumsumexp in calculate cdf for numerical stability (currently still unstable with MPI) --- discretesampling/base/algorithms/smc_components/resampling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discretesampling/base/algorithms/smc_components/resampling.py b/discretesampling/base/algorithms/smc_components/resampling.py index 0e3648c..5427a52 100755 --- a/discretesampling/base/algorithms/smc_components/resampling.py +++ b/discretesampling/base/algorithms/smc_components/resampling.py @@ -39,7 +39,7 @@ def get_number_of_copies(logw, rng=RNG(), exec=Executor()): def calculate_cdf(logw, exec=Executor()): N = len(logw) * exec.P - return exec.cumsum(np.exp(logw)*N) + return np.exp(exec.logcumsumexp(logw)) * N def systematic_resampling(particles, logw, rng, exec=Executor()): From 2c8cf732612a09bd1ffc93c053e83eb046f6b648 Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Thu, 24 Aug 2023 10:54:44 +0100 Subject: [PATCH 4/6] test: mark test_logcumsumexp as mpi Mark test_logcumsumexp in test_executor_MPI --- tests/test_executor_MPI.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_executor_MPI.py b/tests/test_executor_MPI.py index 19b2978..554abfd 100644 --- a/tests/test_executor_MPI.py +++ b/tests/test_executor_MPI.py @@ -95,6 +95,7 @@ def test_cumsum(x, expected): assert all(calc == local_expected) +@pytest.mark.mpi @pytest.mark.parametrize( "x", [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), From e6038fac74382c89f2a0a8197af655a84975501f Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Wed, 30 Aug 2023 14:02:59 +0100 Subject: [PATCH 5/6] fix: use inclusive_prefix_logsumexp For MPI implementation of logcumsumexp, use inclusive_prefix_logsumexp from redistribution submodule. Tests in tests/test_executor_MPI.py all pass including for large array of numbers for logcumsumexp, however running working example (e.g. examples/mpi/decision_tree_example.py) with MPI results in error when performing reduction operation: ``` Fatal Python error: exception in user-defined reduction operation Traceback (most recent call last): File "mpi4py\MPI\opimpl.pxi", line 99, in mpi4py.MPI.op_user_mpi File "mpi4py\MPI\opimpl.pxi", line 90, in mpi4py.MPI.op_user_py File "...\discretesampling\base\executor\MPI\distributed_fixed_size_redistribution\prefix_sum.py", line 7, in LSE~ x = np.frombuffer(xmem, dtype=np.float64) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ValueError: buffer size must be a multiple of element size ``` --- .../executor/MPI/distributed_fixed_size_redistribution | 2 +- discretesampling/base/executor/executor_MPI.py | 5 +++-- tests/test_executor_MPI.py | 7 ++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution index bca0d69..d48c92f 160000 --- a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution +++ b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution @@ -1 +1 @@ -Subproject commit bca0d698e7f7687633af71dd5a5bb552971d8f32 +Subproject commit d48c92f7bc515d71dd849df24e6f88040e6d7e87 diff --git a/discretesampling/base/executor/executor_MPI.py b/discretesampling/base/executor/executor_MPI.py index f0909e6..d661123 100644 --- a/discretesampling/base/executor/executor_MPI.py +++ b/discretesampling/base/executor/executor_MPI.py @@ -3,7 +3,8 @@ from scipy.special import logsumexp from discretesampling.base.executor.executor import Executor from discretesampling.base.executor.MPI.distributed_fixed_size_redistribution.prefix_sum import ( - inclusive_prefix_sum + inclusive_prefix_sum, + inclusive_prefix_logsumexp ) from discretesampling.base.executor.MPI.variable_size_redistribution import ( variable_size_redistribution @@ -60,7 +61,7 @@ def cumsum(self, x): return inclusive_prefix_sum(x) def logcumsumexp(self, x): - return np.log(self.cumsum(np.exp(x))) + return inclusive_prefix_logsumexp(x) def redistribute(self, particles, ncopies): return variable_size_redistribution(particles, ncopies, self) diff --git a/tests/test_executor_MPI.py b/tests/test_executor_MPI.py index 554abfd..f0f264d 100644 --- a/tests/test_executor_MPI.py +++ b/tests/test_executor_MPI.py @@ -77,7 +77,7 @@ def test_logsumexp(x, expected): first, last = np.array(split_across_cores(len(x), exec.P, exec.rank)) local_x = x[first:(last+1)] calc = exec.logsumexp(local_x) - np.testing.assert_almost_equal(calc, expected) # use almost_equal for numerical inaccuracy + np.testing.assert_almost_equal(calc, expected, 12) # use almost_equal for numerical inaccuracy @pytest.mark.mpi @@ -99,7 +99,8 @@ def test_cumsum(x, expected): @pytest.mark.parametrize( "x", [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), - (np.array([-1., -2., -3., -4., -5, -6., -7., -8.]))] + (np.array([-1., -2., -3., -4., -5, -6., -7., -8.])), + (np.array(range(1, 1025)) * 1.0)] ) def test_logcumsumexp(x): exec = Executor_MPI() @@ -109,7 +110,7 @@ def test_logcumsumexp(x): calc = exec.logcumsumexp(local_x) local_expected = expected[first:(last+1)] - np.testing.assert_array_equal(calc, local_expected) + np.testing.assert_array_almost_equal(calc, local_expected, 12) @pytest.mark.mpi From c45e8e39a39806c160499c10fd2ddc608f56790b Mon Sep 17 00:00:00 2001 From: Alex Phillips Date: Tue, 5 Sep 2023 14:32:01 +0100 Subject: [PATCH 6/6] chore: update distributed_fixed_size_redistribution submodule --- .../base/executor/MPI/distributed_fixed_size_redistribution | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution index d48c92f..dda2e96 160000 --- a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution +++ b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution @@ -1 +1 @@ -Subproject commit d48c92f7bc515d71dd849df24e6f88040e6d7e87 +Subproject commit dda2e96375ab36a5c0e8603ed7f9c1b6be3c2d29