From 247b8de9b7971c959d532303c7a92ccb322e45eb Mon Sep 17 00:00:00 2001
From: Ricardo Vieira
Date: Sat, 8 Oct 2022 14:05:24 +0200
Subject: [PATCH] Extend RandomWalk to handle multivariate cases

Also create abstract class for predefined RandomWalks
---
 pymc/distributions/timeseries.py            | 179 ++++++---
 pymc/tests/distributions/test_timeseries.py | 402 ++++++++++++++------
 2 files changed, 404 insertions(+), 177 deletions(-)

diff --git a/pymc/distributions/timeseries.py b/pymc/distributions/timeseries.py
index b294d07613c..8d0d6847804 100644
--- a/pymc/distributions/timeseries.py
+++ b/pymc/distributions/timeseries.py
@@ -11,8 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import abc
 import warnings
 
+from abc import ABCMeta
 from typing import Optional
 
 import aesara
@@ -37,6 +39,7 @@
 from pymc.distributions.shape_utils import (
     _change_dist_size,
     change_dist_size,
+    get_support_shape,
     get_support_shape_1d,
     to_tuple,
 )
@@ -69,61 +72,108 @@ class RandomWalk(Distribution):
 
     rv_type = RandomWalkRV
 
-    def __new__(cls, *args, steps=None, **kwargs):
-        steps = get_support_shape_1d(
-            support_shape=steps,
+    def __new__(cls, *args, innovation_dist, steps=None, **kwargs):
+        steps = cls.get_steps(
+            innovation_dist=innovation_dist,
+            steps=steps,
             shape=None,  # Shape will be checked in `cls.dist`
-            dims=kwargs.get("dims", None),
-            observed=kwargs.get("observed", None),
-            support_shape_offset=1,
+            dims=kwargs.get("dims"),
+            observed=kwargs.get("observed"),
         )
-        return super().__new__(cls, *args, steps=steps, **kwargs)
+
+        return super().__new__(cls, *args, innovation_dist=innovation_dist, steps=steps, **kwargs)
 
     @classmethod
     def dist(cls, init_dist, innovation_dist, steps=None, **kwargs) -> at.TensorVariable:
-        steps = get_support_shape_1d(
-            support_shape=steps,
-            shape=kwargs.get("shape"),
-            support_shape_offset=1,
-        )
-        if steps is None:
-            raise ValueError("Must specify steps or shape parameter")
-        steps = at.as_tensor_variable(intX(steps))
-
         if not (
             isinstance(init_dist, at.TensorVariable)
             and init_dist.owner is not None
             and isinstance(init_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
-            # TODO: Lift univariate constraint on init_dist
-            and init_dist.owner.op.ndim_supp == 0
         ):
-            raise TypeError("init_dist must be a univariate distribution variable")
+            raise TypeError("init_dist must be a distribution variable")
         check_dist_not_registered(init_dist)
 
         if not (
             isinstance(innovation_dist, at.TensorVariable)
             and innovation_dist.owner is not None
             and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
-            and innovation_dist.owner.op.ndim_supp == 0
         ):
-            raise TypeError("innovation_dist must be a univariate distribution variable")
+            raise TypeError("innovation_dist must be a distribution variable")
         check_dist_not_registered(innovation_dist)
 
+        if init_dist.owner.op.ndim_supp != innovation_dist.owner.op.ndim_supp:
+            raise TypeError(
+                "init_dist and innovation_dist must have the same support dimensionality"
+            )
+
+        steps = cls.get_steps(
+            innovation_dist=innovation_dist,
+            steps=steps,
+            shape=kwargs.get("shape"),
+            dims=None,
+            observed=None,
+        )
+        if steps is None:
+            raise ValueError("Must specify steps or shape parameter")
+        steps = at.as_tensor_variable(intX(steps))
+
         return super().dist([init_dist, innovation_dist, steps], **kwargs)
 
+    @classmethod
+    def get_steps(cls, innovation_dist, steps, shape, dims, observed):
+        # We need to know the ndim_supp of the innovation_dist
+        if not (
+            isinstance(innovation_dist, at.TensorVariable)
+            and innovation_dist.owner is not None
+            and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
+        ):
+            raise TypeError("innovation_dist must be a distribution variable")
+
+        dist_ndim_supp = innovation_dist.owner.op.ndim_supp
+        dist_shape = tuple(innovation_dist.shape)
+        support_shape = None
+        if steps is not None:
+            support_shape = (steps,) + (dist_shape[len(dist_shape) - dist_ndim_supp :])
+        support_shape = get_support_shape(
+            support_shape=support_shape,
+            shape=shape,
+            dims=dims,
+            observed=observed,
+            support_shape_offset=1,
+            ndim_supp=dist_ndim_supp + 1,
+        )
+        if support_shape is not None:
+            steps = support_shape[-dist_ndim_supp - 1]
+        return steps
+
     @classmethod
     def rv_op(cls, init_dist, innovation_dist, steps, size=None):
         if not steps.ndim == 0 or not steps.dtype.startswith("int"):
             raise ValueError("steps must be an integer scalar (ndim=0).")
 
+        dist_ndim_supp = init_dist.owner.op.ndim_supp
+        init_dist_shape = tuple(init_dist.shape)
+        init_dist_batch_shape = init_dist_shape[: len(init_dist_shape) - dist_ndim_supp]
+        innovation_dist_shape = tuple(innovation_dist.shape)
+        innovation_batch_shape = innovation_dist_shape[
+            : len(innovation_dist_shape) - dist_ndim_supp
+        ]
+
+        ndim_supp = dist_ndim_supp + 1
+
+        # If not explicit, size is determined by the shapes of the input distributions
         if size is None:
-            size = at.broadcast_shape(init_dist, at.atleast_1d(innovation_dist)[..., 0])
-        innovation_size = tuple(size) + (steps,)
+            size = at.broadcast_shape(
+                init_dist_batch_shape, innovation_batch_shape, arrays_are_shapes=True
+            )
 
-        # Resize input distributions
-        init_dist = change_dist_size(init_dist, size)
-        innovation_dist = change_dist_size(innovation_dist, innovation_size)
+        # Resize input distributions. We size them to (T, B, S) in order to safely
+        # take random draws. We later swap the steps dimension so that the final
+        # distribution follows (B, T, S).
+        # init_dist must have shape (1, B, S)
+        init_dist = change_dist_size(init_dist, (1, *size))
+        # innovation_dist must have shape (T-1, B, S)
+        innovation_dist = change_dist_size(innovation_dist, (steps, *size))
 
         # Create SymbolicRV
         init_dist_, innovation_dist_, steps_ = (
@@ -131,14 +181,22 @@ def rv_op(cls, init_dist, innovation_dist, steps, size=None):
             innovation_dist.type(),
             steps.type(),
         )
-        grw_ = at.concatenate([init_dist_[..., None], innovation_dist_], axis=-1)
-        grw_ = at.cumsum(grw_, axis=-1)
+        # Aeppl can only infer the logp of dimshuffled variables if the dimshuffle is
+        # applied directly on top of a RandomVariable. Because of this we dimshuffle
+        # the distributions and only then concatenate them, instead of the other way around.
+        # shape = (B, 1, S)
+        init_dist_dimswapped_ = at.moveaxis(init_dist_, 0, -ndim_supp)
+        # shape = (B, T-1, S)
+        innovation_dist_dimswapped_ = at.moveaxis(innovation_dist_, 0, -ndim_supp)
+        # shape = (B, T, S)
+        grw_ = at.concatenate([init_dist_dimswapped_, innovation_dist_dimswapped_], axis=-ndim_supp)
+        grw_ = at.cumsum(grw_, axis=-ndim_supp)
         return RandomWalkRV(
             [init_dist_, innovation_dist_, steps_],
             # We pass steps_ through just so we can keep a reference to it, even though
             # it's no longer needed at this point
             [grw_, steps_],
-            ndim_supp=1,
+            ndim_supp=ndim_supp,
         )(init_dist, innovation_dist, steps)
@@ -146,17 +204,24 @@ def rv_op(cls, init_dist, innovation_dist, steps, size=None):
 def change_random_walk_size(op, dist, new_size, expand):
     init_dist, innovation_dist, steps = dist.owner.inputs
     if expand:
-        old_size = init_dist.shape
+        old_shape = tuple(dist.shape)
+        old_size = old_shape[: len(old_shape) - op.ndim_supp]
         new_size = tuple(new_size) + tuple(old_size)
     return RandomWalk.rv_op(init_dist, innovation_dist, steps, size=new_size)
 
 
 @_moment.register(RandomWalkRV)
 def random_walk_moment(op, rv, init_dist, innovation_dist, steps):
-    grw_moment = at.zeros_like(rv)
-    grw_moment = at.set_subtensor(grw_moment[..., 0], moment(init_dist))
-    grw_moment = at.set_subtensor(grw_moment[..., 1:], moment(innovation_dist))
-    return at.cumsum(grw_moment, axis=-1)
+    # shape = (1, B, S)
+    init_moment = moment(init_dist)
+    # shape = (T-1, B, S)
+    innovation_moment = moment(innovation_dist)
+    # shape = (T, B, S)
+    grw_moment = at.concatenate([init_moment, innovation_moment], axis=0)
+    grw_moment = at.cumsum(grw_moment, axis=0)
+    # shape = (B, T, S)
+    grw_moment = at.moveaxis(grw_moment, 0, -op.ndim_supp)
+    return grw_moment
 
 
 @_logprob.register(RandomWalkRV)
@@ -173,7 +238,25 @@ def random_walk_logp(op, values, *inputs, **kwargs):
     return logp(rv, value).sum(axis=-1)
 
 
-class GaussianRandomWalk:
+class PredefinedRandomWalk(ABCMeta):
+    """Base class for predefined RandomWalk distributions."""
+
+    def __new__(cls, name, *args, **kwargs):
+        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
+        return RandomWalk(name, init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)
+
+    @classmethod
+    def dist(cls, *args, **kwargs) -> at.TensorVariable:
+        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
+        return RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)
+
+    @classmethod
+    @abc.abstractmethod
+    def get_dists(cls, *args, **kwargs):
+        pass
+
+
+class GaussianRandomWalk(PredefinedRandomWalk):
     r"""Random Walk with Normal innovations.
 
     Parameters
@@ -186,32 +269,15 @@ class GaussianRandomWalk:
         Unnamed univariate distribution of the initial value. Unnamed refers to distributions
         created with the ``.dist()`` API.
 
-        .. warning:: init will be cloned, rendering them independent of the ones passed as input.
+        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.
 
     steps : int, optional
         Number of steps in Gaussian Random Walk (steps > 0). Only needed if shape is not provided.
""" - def __new__(cls, name, mu=0.0, sigma=1.0, *, init_dist=None, steps=None, **kwargs): - init_dist, innovation_dist, kwargs = cls.get_dists( - mu=mu, sigma=sigma, init_dist=init_dist, **kwargs - ) - return RandomWalk( - name, init_dist=init_dist, innovation_dist=innovation_dist, steps=steps, **kwargs - ) - - @classmethod - def dist(cls, mu=0.0, sigma=1.0, *, init_dist=None, steps=None, **kwargs) -> at.TensorVariable: - init_dist, innovation_dist, kwargs = cls.get_dists( - mu=mu, sigma=sigma, init_dist=init_dist, **kwargs - ) - return RandomWalk.dist( - init_dist=init_dist, innovation_dist=innovation_dist, steps=steps, **kwargs - ) - @classmethod - def get_dists(cls, *, mu, sigma, init_dist, **kwargs): + def get_dists(cls, mu=0.0, sigma=1.0, *, init_dist=None, **kwargs): if "init" in kwargs: warnings.warn( "init parameter is now called init_dist. Using init will raise an error in a future release.", @@ -219,7 +285,6 @@ def get_dists(cls, *, mu, sigma, init_dist, **kwargs): ) init_dist = kwargs.pop("init") - # If no scalar distribution is passed then initialize with a Normal of same mu and sigma if init_dist is None: warnings.warn( "Initial distribution not specified, defaulting to `Normal.dist(0, 100)`." @@ -228,11 +293,9 @@ def get_dists(cls, *, mu, sigma, init_dist, **kwargs): ) init_dist = Normal.dist(0, 100) - # Add one dimension to the right, so that mu and sigma broadcast safely along - # the steps dimension mu = at.as_tensor_variable(mu) sigma = at.as_tensor_variable(sigma) - innovation_dist = Normal.dist(mu=mu[..., None], sigma=sigma[..., None]) + innovation_dist = Normal.dist(mu=mu, sigma=sigma) return init_dist, innovation_dist, kwargs diff --git a/pymc/tests/distributions/test_timeseries.py b/pymc/tests/distributions/test_timeseries.py index aad537e7f5f..729991f1b61 100644 --- a/pymc/tests/distributions/test_timeseries.py +++ b/pymc/tests/distributions/test_timeseries.py @@ -12,20 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
 import itertools as it
-import warnings
 
 import aesara
 import numpy as np
 import pytest
 import scipy.stats as st
 
+from aesara.tensor.random.op import RandomVariable
+
 import pymc as pm
 
+from pymc import MutableData
 from pymc.aesaraf import floatX
-from pymc.distributions.continuous import Flat, HalfNormal, Normal
+from pymc.distributions.continuous import Flat, HalfNormal, Normal, Uniform
 from pymc.distributions.discrete import DiracDelta
 from pymc.distributions.logprob import logp
-from pymc.distributions.multivariate import Dirichlet
+from pymc.distributions.multivariate import Dirichlet, MvNormal, MvStudentT
 from pymc.distributions.shape_utils import change_dist_size, to_tuple
 from pymc.distributions.timeseries import (
     AR,
@@ -36,13 +38,7 @@
 )
 from pymc.model import Model
 from pymc.sampling import draw, sample, sample_posterior_predictive
-from pymc.tests.distributions.util import (
-    R,
-    Rplus,
-    Vector,
-    assert_moment_is_expected,
-    check_logp,
-)
+from pymc.tests.distributions.util import assert_moment_is_expected
 from pymc.tests.helpers import SeededTest, select_by_precision
 
 
@@ -53,16 +49,24 @@ def test_dists_types(self):
 
         with pytest.raises(
             TypeError,
-            match="init_dist must be a univariate distribution variable",
+            match="init_dist must be a distribution variable",
         ):
             RandomWalk.dist(init_dist=5, innovation_dist=innovation_dist, steps=5)
 
         with pytest.raises(
             TypeError,
-            match="innovation_dist must be a univariate distribution variable",
+            match="innovation_dist must be a distribution variable",
         ):
             RandomWalk.dist(init_dist=init_dist, innovation_dist=5, steps=5)
 
+        init_dist = MvNormal.dist([0], [[1]])
+        innovation_dist = Normal.dist(size=(1,))
+        with pytest.raises(
+            TypeError,
+            match="init_dist and innovation_dist must have the same support dimensionality",
+        ):
+            RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, steps=5)
+
     def test_dists_not_registered_check(self):
         with Model():
             init = Normal("init")
@@ -82,7 +86,98 @@ def test_dists_not_registered_check(self):
             ):
                 RandomWalk("rw", init_dist=init_dist, innovation_dist=innovation, steps=5)
 
-    def test_change_size(self):
+    @pytest.mark.parametrize(
+        "init_dist, innovation_dist, steps, size, shape, "
+        "init_dist_size, innovation_dist_size, rw_shape",
+        [
+            (Normal.dist(), Normal.dist(), 1, None, None, 1, 1, (2,)),
+            (Normal.dist(), Normal.dist(), 1, None, (2,), 1, 1, (2,)),
+            (Normal.dist(), Normal.dist(), 3, (5,), None, 5, 5 * 3, (5, 4)),
+            (Normal.dist(), Normal.dist(), 3, None, (5, 4), 5, 5 * 3, (5, 4)),
+            (Normal.dist(), Normal.dist(mu=np.zeros(3)), 1, None, None, 3, 3, (3, 2)),
+            (
+                Normal.dist(),
+                Normal.dist(mu=np.zeros((3, 1)), sigma=np.ones(2)),
+                4,
+                None,
+                None,
+                3 * 2,
+                3 * 2 * 4,
+                (3, 2, 5),
+            ),
+            (Normal.dist(size=(2, 3)), Normal.dist(), 4, None, None, 2 * 3, 2 * 3 * 4, (2, 3, 5)),
+            (Dirichlet.dist(np.ones(3)), Dirichlet.dist(np.ones(3)), 1, None, None, 3, 3, (2, 3)),
+            (Dirichlet.dist(np.ones(3)), Dirichlet.dist(np.ones(3)), 1, None, (2, 3), 3, 3, (2, 3)),
+            (
+                Dirichlet.dist(np.ones(3)),
+                Dirichlet.dist(np.ones(3)),
+                4,
+                (6,),
+                None,
+                6 * 3,
+                6 * 4 * 3,
+                (6, 5, 3),
+            ),
+            (
+                Dirichlet.dist(np.ones(3)),
+                Dirichlet.dist(np.ones(3)),
+                4,
+                None,
+                (6, 5, 3),
+                6 * 3,
+                6 * 4 * 3,
+                (6, 5, 3),
+            ),
+            (
+                Dirichlet.dist(np.ones(3)),
+                Dirichlet.dist(np.ones((4, 3))),
+                1,
+                None,
+                None,
+                4 * 3,
+                4 * 3,
+                (4, 2, 3),
+            ),
+            (
+                Dirichlet.dist(np.ones((2, 3))),
+                Dirichlet.dist(np.ones(3)),
+                4,
+                None,
+                None,
+                2 * 3,
+                2 * 4 * 3,
+                (2, 5, 3),
+            ),
+        ],
+    )
+    def test_dist_sizes(
+        self,
+        init_dist,
+        innovation_dist,
+        steps,
+        size,
+        shape,
+        init_dist_size,
+        innovation_dist_size,
+        rw_shape,
+    ):
+        """Test that init and innovation dists are properly resized"""
+        rw = RandomWalk.dist(
+            steps=steps,
+            init_dist=init_dist,
+            innovation_dist=innovation_dist,
+            size=size,
+            shape=shape,
+        )
+        init_dist, innovation_dist = rw.owner.inputs[:2]
+        # Check that the inputs are pure RandomVariables (and not e.g. Broadcasts)
+        assert isinstance(init_dist.owner.op, RandomVariable)
+        assert isinstance(innovation_dist.owner.op, RandomVariable)
+        assert init_dist.size.eval() == init_dist_size
+        assert innovation_dist.size.eval() == innovation_dist_size
+        assert tuple(rw.shape.eval()) == rw_shape
+
+    def test_change_size_univariate(self):
         init_dist = Normal.dist()
         innovation_dist = Normal.dist()
 
@@ -95,143 +190,212 @@ def test_change_size(self):
         new_rw = change_dist_size(rw, new_size=(4, 3), expand=True)
         assert tuple(new_rw.shape.eval()) == (4, 3, 5, 100)
 
+    def test_change_size_multivariate(self):
+        init_dist = Dirichlet.dist([1, 1, 1])
+        innovation_dist = Dirichlet.dist([1, 1, 1])
 
-class TestGaussianRandomWalk:
-    def test_logp(self):
-        def ref_logp(value, mu, sigma):
-            return (
-                st.norm.logpdf(value[0], 0, 100)  # default init_dist has a scale 100
-                + st.norm.logpdf(np.diff(value), mu, sigma).sum()
-            )
-
-        check_logp(
-            pm.GaussianRandomWalk,
-            Vector(R, 4),
-            {"mu": R, "sigma": Rplus},
-            ref_logp,
-            decimal=select_by_precision(float64=6, float32=1),
-            extra_args={"steps": 3, "init_dist": Normal.dist(0, 100)},
+        # size = 5
+        rw = RandomWalk.dist(
+            init_dist=init_dist, innovation_dist=innovation_dist, shape=(5, 100, 3)
         )
 
-    def test_gaussianrandomwalk_inference(self):
-        mu, sigma, steps = 2, 1, 1000
-        obs = np.concatenate([[0], np.random.normal(mu, sigma, size=steps)]).cumsum()
-
-        with pm.Model():
-            _mu = pm.Uniform("mu", -10, 10)
-            _sigma = pm.Uniform("sigma", 0, 10)
-
-            obs_data = pm.MutableData("obs_data", obs)
-            grw = GaussianRandomWalk(
-                "grw", _mu, _sigma, steps=steps, observed=obs_data, init_dist=Normal.dist(0, 100)
-            )
-
-            trace = pm.sample(chains=1)
-
-        recovered_mu = trace.posterior["mu"].mean()
-        recovered_sigma = trace.posterior["sigma"].mean()
-        np.testing.assert_allclose([mu, sigma], [recovered_mu, recovered_sigma], atol=0.2)
-
-    @pytest.mark.parametrize("init", [None, pm.Normal.dist()])
-    def test_gaussian_random_walk_init_dist_shape(self, init):
-        """Test that init_dist is properly resized"""
-        with warnings.catch_warnings():
-            warnings.filterwarnings("ignore", "Initial distribution not specified.*", UserWarning)
-            grw = pm.GaussianRandomWalk.dist(mu=0, sigma=1, steps=1, init_dist=init)
-            assert tuple(grw.owner.inputs[0].shape.eval()) == ()
-
-            grw = pm.GaussianRandomWalk.dist(mu=0, sigma=1, steps=1, init_dist=init, size=(5,))
-            assert tuple(grw.owner.inputs[0].shape.eval()) == (5,)
-
-            grw = pm.GaussianRandomWalk.dist(mu=0, sigma=1, steps=1, init_dist=init, shape=2)
-            assert tuple(grw.owner.inputs[0].shape.eval()) == ()
-
-            grw = pm.GaussianRandomWalk.dist(mu=0, sigma=1, steps=1, init_dist=init, shape=(5, 2))
-            assert tuple(grw.owner.inputs[0].shape.eval()) == (5,)
+        new_rw = change_dist_size(rw, new_size=(7,))
+        assert tuple(new_rw.shape.eval()) == (7, 100, 3)
 
-            grw = pm.GaussianRandomWalk.dist(mu=[0, 0], sigma=1, steps=1, init_dist=init)
-            assert tuple(grw.owner.inputs[0].shape.eval()) == (2,)
+        new_rw = change_dist_size(rw, new_size=(4, 3), expand=True)
+        assert tuple(new_rw.shape.eval()) == (4, 3, 5, 100, 3)
 
-            grw = pm.GaussianRandomWalk.dist(mu=0, sigma=[1, 1], steps=1, init_dist=init)
-            assert tuple(grw.owner.inputs[0].shape.eval()) == (2,)
+    @pytest.mark.parametrize(
+        "init_dist, innovation_dist, shape, steps",
+        [
+            (Normal.dist(), Normal.dist(), (5, 3), 2),
+            (Dirichlet.dist([1, 1, 1]), Dirichlet.dist([1, 1, 1]), (5, 3), 4),
+        ],
+    )
+    @pytest.mark.parametrize("steps_source", ("shape", "dims", "observed"))
+    def test_infer_steps(self, init_dist, innovation_dist, shape, steps, steps_source):
+        shape_source_kwargs = dict(shape=None, dims=None, observed=None)
+        if steps_source == "shape":
+            shape_source_kwargs["shape"] = shape
+        elif steps_source == "dims":
+            shape_source_kwargs["dims"] = [f"dim{i}" for i in range(len(shape))]
+        elif steps_source == "observed":
+            shape_source_kwargs["observed"] = np.zeros(shape)
+        else:
+            raise ValueError
 
-        grw = pm.GaussianRandomWalk.dist(
-            mu=np.zeros((3, 1)), sigma=[1, 1], steps=1, init_dist=init
+        coords = {f"dim{i}": range(s) for i, s in enumerate(shape)}
+        with Model(coords=coords):
+            x = RandomWalk(
+                "x", init_dist=init_dist, innovation_dist=innovation_dist, **shape_source_kwargs
             )
-        assert tuple(grw.owner.inputs[0].shape.eval()) == (3, 2)
-
-    def test_gaussianrandomwalk_broadcasted_by_init_dist(self):
-        grw = pm.GaussianRandomWalk.dist(
-            mu=0, sigma=1, steps=4, init_dist=pm.Normal.dist(size=(2, 3))
-        )
-        assert tuple(grw.shape.eval()) == (2, 3, 5)
-        assert grw.eval().shape == (2, 3, 5)
-
-    @pytest.mark.parametrize("shape", ((6,), (3, 6)))
-    def test_inferred_steps_from_shape(self, shape):
-        x = GaussianRandomWalk.dist(shape=shape, init_dist=Normal.dist(0, 100))
-        steps = x.owner.inputs[-1]
-        assert steps.eval() == 5
+        inferred_steps = x.owner.inputs[-1]
+        assert inferred_steps.eval().item() == steps
 
-    def test_missing_steps(self):
+    def test_infer_steps_error(self):
         with pytest.raises(ValueError, match="Must specify steps or shape parameter"):
-            GaussianRandomWalk.dist(shape=None, init_dist=Normal.dist(0, 100))
+            RandomWalk.dist(init_dist=Normal.dist(), innovation_dist=Normal.dist())
 
-    def test_inconsistent_steps_and_shape(self):
+    @pytest.mark.parametrize(
+        "init_dist, innovation_dist, steps, shape",
+        [
+            (Normal.dist(), Normal.dist(), 12, (13, 42)),
+            (Dirichlet.dist([1, 1, 1]), Dirichlet.dist([1, 1, 1]), 12, (13, 42, 3)),
+        ],
+    )
+    def test_inconsistent_steps_and_shape(self, init_dist, innovation_dist, steps, shape):
         with pytest.raises(
             AssertionError, match="support_shape does not match respective shape dimension"
         ):
-            x = GaussianRandomWalk.dist(steps=12, shape=45, init_dist=Normal.dist(0, 100))
-
-    def test_inferred_steps_from_dims(self):
-        with pm.Model(coords={"batch": range(5), "steps": range(20)}):
-            x = GaussianRandomWalk("x", dims=("batch", "steps"), init_dist=Normal.dist(0, 100))
-        steps = x.owner.inputs[-1]
-        assert steps.eval() == 19
-
-    def test_inferred_steps_from_observed(self):
-        with pm.Model():
-            x = GaussianRandomWalk("x", observed=np.zeros(10), init_dist=Normal.dist(0, 100))
-        steps = x.owner.inputs[-1]
-        assert steps.eval() == 9
+            RandomWalk.dist(
+                init_dist=init_dist,
+                innovation_dist=innovation_dist,
+                steps=steps,
+                shape=shape,
+            ).eval()
 
     @pytest.mark.parametrize(
-        "init",
+        "init_dist",
         [
             pm.HalfNormal.dist(sigma=2),
             pm.StudentT.dist(nu=4, mu=1, sigma=0.5),
         ],
     )
-    def test_gaussian_random_walk_init_dist_logp(self, init):
-        grw = pm.GaussianRandomWalk.dist(init_dist=init, steps=1)
+    def test_init_logp_univariate(self, init_dist):
+        rw = RandomWalk.dist(init_dist=init_dist, innovation_dist=Normal.dist(), steps=1)
+        assert np.isclose(
+            pm.logp(rw, [0, 0]).eval(),
+            pm.logp(init_dist, 0).eval() + st.norm.logpdf(0),
+        )
+
+    @pytest.mark.parametrize(
+        "init_dist",
+        [
+            MvNormal.dist(mu=[1, 2, 3], cov=np.eye(3) * 1.5),
+            MvStudentT.dist(nu=4, mu=[1, 2, 3], scale=np.eye(3) * 0.5),
+        ],
+    )
+    def test_init_logp_multivariate(self, init_dist):
+        rw = RandomWalk.dist(
+            init_dist=init_dist,
+            innovation_dist=MvNormal.dist(np.zeros(3), np.eye(3)),
+            steps=1,
+        )
+        assert np.isclose(
+            pm.logp(rw, np.zeros((2, 3))).eval(),
+            pm.logp(init_dist, np.zeros(3)).eval()
+            + st.multivariate_normal.logpdf(np.zeros(3)).sum(),
+        )
+
+    def test_innovation_logp_univariate(self):
+        steps = 5
+        dist = RandomWalk.dist(
+            init_dist=Normal.dist(0, 1),
+            innovation_dist=Normal.dist(1, 1),
+            shape=(steps,),
+        )
+        assert np.isclose(
+            logp(dist, np.arange(5)).eval(),
+            logp(Normal.dist(0, 1), 0).eval() * steps,
+        )
+
+    def test_innovation_logp_multivariate(self):
+        steps = 5
+        dist = RandomWalk.dist(
+            init_dist=MvNormal.dist(np.zeros(3), cov=np.eye(3)),
+            innovation_dist=MvNormal.dist(mu=np.ones(3), cov=np.eye(3)),
+            shape=(steps, 3),
+        )
         assert np.isclose(
-            pm.logp(grw, [0, 0]).eval(),
-            pm.logp(init, 0).eval() + st.norm.logpdf(0),
+            logp(dist, np.full((3, 5), np.arange(5)).T).eval(),
+            logp(MvNormal.dist(np.zeros(3), cov=np.eye(3)), np.zeros(3)).eval() * steps,
         )
 
     @pytest.mark.parametrize(
-        "mu, sigma, init_dist, steps, size, expected",
+        "init_dist, innovation_dist, steps, size, expected, check_finite_logp",
         [
-            (0, 1, Normal.dist(1), 10, None, np.ones((11,))),
-            (1, 1, Normal.dist(0), 10, (2,), np.full((2, 11), np.arange(11))),
-            (1, 1, Normal.dist([0, 1]), 10, None, np.vstack((np.arange(11), np.arange(11) + 1))),
-            (0, [1, 1], Normal.dist(0), 10, None, np.zeros((2, 11))),
+            (Normal.dist(-10), Normal.dist(1), 9, None, np.arange(-10, 0), True),
             (
-                [1, -1],
-                1,
-                Normal.dist(0),
-                10,
-                (4, 2),
-                np.full((4, 2, 11), np.vstack((np.arange(11), -np.arange(11)))),
+                Normal.dist(-10),
+                Normal.dist(1),
+                9,
+                (5, 3),
+                np.full((5, 3, 10), np.arange(-10, 0)),
+                True,
+            ),
+            (
+                Normal.dist([-10, 0]),
+                Normal.dist(1),
+                9,
+                None,
+                np.concatenate(
+                    [[np.arange(-10, 0)], [np.arange(0, 10)]],
+                    axis=0,
+                ),
+                True,
+            ),
+            (
+                MvNormal.dist([-10, 0, 10], np.eye(3)),
+                MvNormal.dist([1, 2, 3], np.eye(3)),
+                9,
+                None,
+                np.concatenate(
+                    [[np.arange(-10, 0, 1)], [np.arange(0, 20, 2)], [np.arange(10, 40, 3)]],
+                    axis=0,
+                ).T,
+                True,
+            ),
+            (
+                MvNormal.dist([-10, 0, 10], np.eye(3)),
+                MvNormal.dist([1, 2, 3], np.eye(3)),
+                9,
+                (5, 4),
+                np.full(
+                    (5, 4, 10, 3),
+                    np.concatenate(
+                        [[np.arange(-10, 0, 1)], [np.arange(0, 20, 2)], [np.arange(10, 40, 3)]],
+                        axis=0,
+                    ).T,
+                ),
+                False,  # MvNormal logp only supports 2D values
             ),
         ],
     )
-    def test_moment(self, mu, sigma, init_dist, steps, size, expected):
+    def test_moment(self, init_dist, innovation_dist, steps, size, expected, check_finite_logp):
         with Model() as model:
-            GaussianRandomWalk("x", mu=mu, sigma=sigma, init_dist=init_dist, steps=steps, size=size)
-        assert_moment_is_expected(model, expected)
+            RandomWalk(
+                "x", init_dist=init_dist, innovation_dist=innovation_dist, steps=steps, size=size
+            )
+        assert_moment_is_expected(model, expected, check_finite_logp=check_finite_logp)
 
-    def test_init_deprecated_arg(self):
+
+class TestPredefinedRandomWalk:
+    def test_gaussian(self):
+        x = GaussianRandomWalk.dist(mu=0, sigma=1, init_dist=Flat.dist(), steps=5)
+        init_dist, innovation_dist = x.owner.inputs[:2]
+        assert isinstance(init_dist.owner.op, Flat)
+        assert isinstance(innovation_dist.owner.op, Normal)
+
+    def test_gaussian_inference(self):
+        mu, sigma, steps = 2, 1, 1000
+        obs = np.concatenate([[0], np.random.normal(mu, sigma, size=steps)]).cumsum()
+
+        with Model():
+            _mu = Uniform("mu", -10, 10)
+            _sigma = Uniform("sigma", 0, 10)
+
+            obs_data = MutableData("obs_data", obs)
+            grw = GaussianRandomWalk(
+                "grw", _mu, _sigma, steps=steps, observed=obs_data, init_dist=Normal.dist(0, 100)
+            )
+
+            trace = sample(chains=1)
+
+        recovered_mu = trace.posterior["mu"].mean()
+        recovered_sigma = trace.posterior["sigma"].mean()
+        np.testing.assert_allclose([mu, sigma], [recovered_mu, recovered_sigma], atol=0.2)
+
+    def test_gaussian_init_deprecated_arg(self):
         with pytest.warns(FutureWarning, match="init parameter is now called init_dist"):
             pm.GaussianRandomWalk.dist(init=Normal.dist(), shape=(10,))
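
Usage sketch (editor's addition, not part of the patch): the snippet below exercises the API introduced above, assuming the patch is applied; the variable names urw and mrw are illustrative. With univariate inputs the walk behaves as before, while an innovation_dist with ndim_supp == 1 (for example MvNormal) now yields a walk whose last two dimensions are (time, support), as exercised by test_dist_sizes and test_infer_steps above.

    import numpy as np

    from pymc.distributions.continuous import Normal
    from pymc.distributions.multivariate import MvNormal
    from pymc.distributions.timeseries import RandomWalk
    from pymc.sampling import draw

    # Univariate walk: init_dist and innovation_dist have ndim_supp == 0,
    # so the walk adds a single support dimension of length steps + 1.
    urw = RandomWalk.dist(
        init_dist=Normal.dist(0, 100),
        innovation_dist=Normal.dist(mu=0.5, sigma=1.0),
        steps=9,
    )
    assert draw(urw).shape == (10,)

    # Multivariate walk: MvNormal has ndim_supp == 1, so the walk has
    # ndim_supp == 2. steps is inferred as 9 from the (time, support) shape.
    mrw = RandomWalk.dist(
        init_dist=MvNormal.dist(mu=np.zeros(3), cov=np.eye(3)),
        innovation_dist=MvNormal.dist(mu=np.zeros(3), cov=np.eye(3)),
        shape=(10, 3),
    )
    assert draw(mrw).shape == (10, 3)

Batch dimensions broadcast between init_dist and innovation_dist, and the two distributions must share the same support dimensionality or the new TypeError is raised.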