diff --git a/discretesampling/base/algorithms/smc_components/resampling.py b/discretesampling/base/algorithms/smc_components/resampling.py index 837ca4d..5427a52 100755 --- a/discretesampling/base/algorithms/smc_components/resampling.py +++ b/discretesampling/base/algorithms/smc_components/resampling.py @@ -26,7 +26,7 @@ def check_stability(ncopies, exec=Executor()): def get_number_of_copies(logw, rng=RNG(), exec=Executor()): N = len(logw) * exec.P - cdf = exec.cumsum(np.exp(logw)*N) + cdf = calculate_cdf(logw, exec) cdf_of_i_minus_one = cdf - np.reshape(np.exp(logw) * N, newshape=cdf.shape) u = np.array(rng.uniform(0.0, 1.0), dtype=logw.dtype) @@ -37,6 +37,11 @@ def get_number_of_copies(logw, rng=RNG(), exec=Executor()): return ncopies # .astype(int) +def calculate_cdf(logw, exec=Executor()): + N = len(logw) * exec.P + return np.exp(exec.logcumsumexp(logw)) * N + + def systematic_resampling(particles, logw, rng, exec=Executor()): loc_n = len(logw) N = loc_n * exec.P diff --git a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution index bca0d69..dda2e96 160000 --- a/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution +++ b/discretesampling/base/executor/MPI/distributed_fixed_size_redistribution @@ -1 +1 @@ -Subproject commit bca0d698e7f7687633af71dd5a5bb552971d8f32 +Subproject commit dda2e96375ab36a5c0e8603ed7f9c1b6be3c2d29 diff --git a/discretesampling/base/executor/executor.py b/discretesampling/base/executor/executor.py index 085359b..56caa35 100644 --- a/discretesampling/base/executor/executor.py +++ b/discretesampling/base/executor/executor.py @@ -26,6 +26,9 @@ def logsumexp(self, x): def cumsum(self, x): return np.cumsum(x) + def logcumsumexp(self, x): + return np.logaddexp.accumulate(x) + def redistribute(self, particles, ncopies): particles = list(itertools.chain.from_iterable( [[particles[i]]*ncopies[i] for i in range(len(particles))] diff --git a/discretesampling/base/executor/executor_MPI.py b/discretesampling/base/executor/executor_MPI.py index 935a64f..d661123 100644 --- a/discretesampling/base/executor/executor_MPI.py +++ b/discretesampling/base/executor/executor_MPI.py @@ -3,7 +3,8 @@ from scipy.special import logsumexp from discretesampling.base.executor.executor import Executor from discretesampling.base.executor.MPI.distributed_fixed_size_redistribution.prefix_sum import ( - inclusive_prefix_sum + inclusive_prefix_sum, + inclusive_prefix_logsumexp ) from discretesampling.base.executor.MPI.variable_size_redistribution import ( variable_size_redistribution @@ -59,5 +60,8 @@ def logsumexp(self, x): def cumsum(self, x): return inclusive_prefix_sum(x) + def logcumsumexp(self, x): + return inclusive_prefix_logsumexp(x) + def redistribute(self, particles, ncopies): return variable_size_redistribution(particles, ncopies, self) diff --git a/tests/test_executor.py b/tests/test_executor.py index cc41836..8a8e1fa 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -68,6 +68,17 @@ def test_cumsum(x, expected): assert all(calc == expected) +@pytest.mark.parametrize( + "x", + [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), + (np.array([-1., -2., -3., -4., -5, -6., -7., -8.]))] +) +def test_logcumsumexp(x): + calc = Executor().logcumsumexp(x) + expected = np.logaddexp.accumulate(x) + np.testing.assert_array_equal(calc, expected) + + @pytest.mark.parametrize( "particles,ncopies,expected", [([ExampleParticleClass(x) for x in ["a", "b", "c", "d", "e"]], np.array( diff --git a/tests/test_executor_MPI.py b/tests/test_executor_MPI.py index d26b325..f0f264d 100644 --- a/tests/test_executor_MPI.py +++ b/tests/test_executor_MPI.py @@ -77,7 +77,7 @@ def test_logsumexp(x, expected): first, last = np.array(split_across_cores(len(x), exec.P, exec.rank)) local_x = x[first:(last+1)] calc = exec.logsumexp(local_x) - np.testing.assert_almost_equal(calc, expected) # use almost_equal for numerical inaccuracy + np.testing.assert_almost_equal(calc, expected, 12) # use almost_equal for numerical inaccuracy @pytest.mark.mpi @@ -95,6 +95,24 @@ def test_cumsum(x, expected): assert all(calc == local_expected) +@pytest.mark.mpi +@pytest.mark.parametrize( + "x", + [(np.array([1., 2., 3., 4., 5., 6., 7., 8.])), + (np.array([-1., -2., -3., -4., -5, -6., -7., -8.])), + (np.array(range(1, 1025)) * 1.0)] +) +def test_logcumsumexp(x): + exec = Executor_MPI() + expected = np.logaddexp.accumulate(x) + first, last = np.array(split_across_cores(len(x), exec.P, exec.rank)) + local_x = x[first:(last+1)] + + calc = exec.logcumsumexp(local_x) + local_expected = expected[first:(last+1)] + np.testing.assert_array_almost_equal(calc, local_expected, 12) + + @pytest.mark.mpi @pytest.mark.parametrize( "particles,ncopies,expected", # Only functions for equal numbers per core diff --git a/tests/test_smc_components.py b/tests/test_smc_components.py index 6b0efdb..d3a37fc 100644 --- a/tests/test_smc_components.py +++ b/tests/test_smc_components.py @@ -6,7 +6,7 @@ from discretesampling.base.types import DiscreteVariable from discretesampling.base.algorithms.smc_components.effective_sample_size import ess from discretesampling.base.algorithms.smc_components.normalisation import normalise -from discretesampling.base.algorithms.smc_components.resampling import systematic_resampling, check_stability +from discretesampling.base.algorithms.smc_components.resampling import systematic_resampling, check_stability, calculate_cdf class ExampleParticleClass(DiscreteVariable): @@ -127,5 +127,32 @@ def test_check_stability_MPI(ncopies, expected): local_expected = expected[first:(last+1)] assert all(local_check_ncopies == local_expected) + +@pytest.mark.parametrize( + "logw", + [(np.array([-np.log(8)]*8))] +) +def test_calculate_cdf(logw): + exec = Executor() + expected = np.exp(np.logaddexp.accumulate(logw))*len(logw) # numerically stable calculation + cdf = calculate_cdf(logw, exec) + np.testing.assert_array_equal(cdf, expected) + + +@pytest.mark.mpi +@pytest.mark.parametrize( + "logw", + [(np.array([-np.log(8)]*8))] +) +def test_calculate_cdf_MPI(logw): + exec = Executor_MPI() + expected = np.exp(np.logaddexp.accumulate(logw))*len(logw) # numerically stable calculation + first, last = np.array(split_across_cores(len(logw), exec.P, exec.rank)) + local_logw = logw[first:(last+1)] + local_check_cdf = calculate_cdf(local_logw, exec) + local_expected = expected[first:(last+1)] + np.testing.assert_array_equal(local_check_cdf, local_expected) + + # TODO: # get_number_of_copies