Extend RandomWalk to handle multivariate cases
Also create abstract class for predefined RandomWalks
ricardoV94 committed Oct 9, 2022
1 parent 0e159a6 commit 247b8de
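
For context, here is a minimal sketch of what this change enables: multivariate innovation distributions in `RandomWalk`. It is not part of the commit and assumes the post-commit API, with `RandomWalk` imported from `pymc.distributions.timeseries`; the shapes and parameters are illustrative.

```python
import numpy as np
import pymc as pm

from pymc.distributions.timeseries import RandomWalk

# Before this commit, init_dist and innovation_dist had to be univariate
# (ndim_supp == 0). Multivariate innovations such as MvNormal now work too.
init = pm.MvNormal.dist(mu=np.zeros(3), cov=np.eye(3))
innovations = pm.MvNormal.dist(mu=np.zeros(3), cov=np.eye(3))

# steps counts the innovations, so the walk has steps + 1 time points.
rw = RandomWalk.dist(init_dist=init, innovation_dist=innovations, steps=9)

print(pm.draw(rw).shape)  # (10, 3): batch/time dimensions first, support last
```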
Showing 2 changed files with 404 additions and 177 deletions.
pymc/distributions/timeseries.py (179 changes: 121 additions & 58 deletions)
@@ -11,8 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import abc
 import warnings
 
+from abc import ABCMeta
 from typing import Optional
 
 import aesara
@@ -37,6 +39,7 @@
 from pymc.distributions.shape_utils import (
     _change_dist_size,
     change_dist_size,
+    get_support_shape,
     get_support_shape_1d,
     to_tuple,
 )
@@ -69,94 +72,156 @@ class RandomWalk(Distribution):
 
     rv_type = RandomWalkRV
 
-    def __new__(cls, *args, steps=None, **kwargs):
-        steps = get_support_shape_1d(
-            support_shape=steps,
+    def __new__(cls, *args, innovation_dist, steps=None, **kwargs):
+        steps = cls.get_steps(
+            innovation_dist=innovation_dist,
+            steps=steps,
             shape=None,  # Shape will be checked in `cls.dist`
-            dims=kwargs.get("dims", None),
-            observed=kwargs.get("observed", None),
-            support_shape_offset=1,
+            dims=kwargs.get("dims"),
+            observed=kwargs.get("observed"),
         )
-        return super().__new__(cls, *args, steps=steps, **kwargs)
+
+        return super().__new__(cls, *args, innovation_dist=innovation_dist, steps=steps, **kwargs)
 
     @classmethod
     def dist(cls, init_dist, innovation_dist, steps=None, **kwargs) -> at.TensorVariable:
-        steps = get_support_shape_1d(
-            support_shape=steps,
-            shape=kwargs.get("shape"),
-            support_shape_offset=1,
-        )
-        if steps is None:
-            raise ValueError("Must specify steps or shape parameter")
-        steps = at.as_tensor_variable(intX(steps))
-
         if not (
             isinstance(init_dist, at.TensorVariable)
             and init_dist.owner is not None
             and isinstance(init_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
-            # TODO: Lift univariate constraint on init_dist
-            and init_dist.owner.op.ndim_supp == 0
         ):
-            raise TypeError("init_dist must be a univariate distribution variable")
+            raise TypeError("init_dist must be a distribution variable")
         check_dist_not_registered(init_dist)
 
         if not (
             isinstance(innovation_dist, at.TensorVariable)
             and innovation_dist.owner is not None
             and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
-            and innovation_dist.owner.op.ndim_supp == 0
         ):
-            raise TypeError("innovation_dist must be a univariate distribution variable")
+            raise TypeError("innovation_dist must be a distribution variable")
         check_dist_not_registered(innovation_dist)
 
+        if init_dist.owner.op.ndim_supp != innovation_dist.owner.op.ndim_supp:
+            raise TypeError(
+                "init_dist and innovation_dist must have the same support dimensionality"
+            )
+
+        steps = cls.get_steps(
+            innovation_dist=innovation_dist,
+            steps=steps,
+            shape=kwargs.get("shape"),
+            dims=None,
+            observed=None,
+        )
+        if steps is None:
+            raise ValueError("Must specify steps or shape parameter")
+        steps = at.as_tensor_variable(intX(steps))
+
         return super().dist([init_dist, innovation_dist, steps], **kwargs)
 
     @classmethod
+    def get_steps(cls, innovation_dist, steps, shape, dims, observed):
+        # We need to know the ndim_supp of the innovation_dist
+        if not (
+            isinstance(innovation_dist, at.TensorVariable)
+            and innovation_dist.owner is not None
+            and isinstance(innovation_dist.owner.op, (RandomVariable, SymbolicRandomVariable))
+        ):
+            raise TypeError("innovation_dist must be a distribution variable")
+
+        dist_ndim_supp = innovation_dist.owner.op.ndim_supp
+        dist_shape = tuple(innovation_dist.shape)
+        support_shape = None
+        if steps is not None:
+            support_shape = (steps,) + (dist_shape[len(dist_shape) - dist_ndim_supp :])
+        support_shape = get_support_shape(
+            support_shape=support_shape,
+            shape=shape,
+            dims=dims,
+            observed=observed,
+            support_shape_offset=1,
+            ndim_supp=dist_ndim_supp + 1,
+        )
+        if support_shape is not None:
+            steps = support_shape[-dist_ndim_supp - 1]
+        return steps
+
+    @classmethod
     def rv_op(cls, init_dist, innovation_dist, steps, size=None):
         if not steps.ndim == 0 or not steps.dtype.startswith("int"):
             raise ValueError("steps must be an integer scalar (ndim=0).")
 
+        dist_ndim_supp = init_dist.owner.op.ndim_supp
+        init_dist_shape = tuple(init_dist.shape)
+        init_dist_batch_shape = init_dist_shape[: len(init_dist_shape) - dist_ndim_supp]
+        innovation_dist_shape = tuple(innovation_dist.shape)
+        innovation_batch_shape = innovation_dist_shape[
+            : len(innovation_dist_shape) - dist_ndim_supp
+        ]
+
+        ndim_supp = dist_ndim_supp + 1
+
         # If not explicit, size is determined by the shapes of the input distributions
         if size is None:
-            size = at.broadcast_shape(init_dist, at.atleast_1d(innovation_dist)[..., 0])
-        innovation_size = tuple(size) + (steps,)
+            size = at.broadcast_shape(
+                init_dist_batch_shape, innovation_batch_shape, arrays_are_shapes=True
+            )
 
-        # Resize input distributions
-        init_dist = change_dist_size(init_dist, size)
-        innovation_dist = change_dist_size(innovation_dist, innovation_size)
+        # Resize input distributions. We will size them to (T, B, S) in order
+        # to safely take random draws. We later swap the steps dimension so
+        # that the final distribution will follow (B, T, S)
+        # init_dist must have shape (1, B, S)
+        init_dist = change_dist_size(init_dist, (1, *size))
+        # innovation_dist must have shape (T-1, B, S)
+        innovation_dist = change_dist_size(innovation_dist, (steps, *size))
 
         # Create SymbolicRV
         init_dist_, innovation_dist_, steps_ = (
             init_dist.type(),
             innovation_dist.type(),
             steps.type(),
         )
-        grw_ = at.concatenate([init_dist_[..., None], innovation_dist_], axis=-1)
-        grw_ = at.cumsum(grw_, axis=-1)
+        # Aeppl can only infer the logp of a dimshuffled variables, if the dimshuffle is
+        # done directly on top of a RandomVariable. Because of this we dimshuffle the
+        # distributions and only then concatenate them, instead of the other way around.
+        # shape = (B, 1, S)
+        init_dist_dimswapped_ = at.moveaxis(init_dist_, 0, -ndim_supp)
+        # shape = (B, T-1, S)
+        innovation_dist_dimswapped_ = at.moveaxis(innovation_dist_, 0, -ndim_supp)
+        # shape = (B, T, S)
+        grw_ = at.concatenate([init_dist_dimswapped_, innovation_dist_dimswapped_], axis=-ndim_supp)
+        grw_ = at.cumsum(grw_, axis=-ndim_supp)
         return RandomWalkRV(
             [init_dist_, innovation_dist_, steps_],
             # We pass steps_ through just so we can keep a reference to it, even though
             # it's no longer needed at this point
             [grw_, steps_],
-            ndim_supp=1,
+            ndim_supp=ndim_supp,
         )(init_dist, innovation_dist, steps)
 
 
 @_change_dist_size.register(RandomWalkRV)
 def change_random_walk_size(op, dist, new_size, expand):
     init_dist, innovation_dist, steps = dist.owner.inputs
     if expand:
-        old_size = init_dist.shape
+        old_shape = tuple(dist.shape)
+        old_size = old_shape[: len(old_shape) - op.ndim_supp]
         new_size = tuple(new_size) + tuple(old_size)
     return RandomWalk.rv_op(init_dist, innovation_dist, steps, size=new_size)
 
 
 @_moment.register(RandomWalkRV)
 def random_walk_moment(op, rv, init_dist, innovation_dist, steps):
-    grw_moment = at.zeros_like(rv)
-    grw_moment = at.set_subtensor(grw_moment[..., 0], moment(init_dist))
-    grw_moment = at.set_subtensor(grw_moment[..., 1:], moment(innovation_dist))
-    return at.cumsum(grw_moment, axis=-1)
+    # shape = (1, B, S)
+    init_moment = moment(init_dist)
+    # shape = (T-1, B, S)
+    innovation_moment = moment(innovation_dist)
+    # shape = (T, B, S)
+    grw_moment = at.concatenate([init_moment, innovation_moment], axis=0)
+    grw_moment = at.cumsum(grw_moment, axis=0)
+    # shape = (B, T, S)
+    grw_moment = at.moveaxis(grw_moment, 0, -op.ndim_supp)
+    return grw_moment
 
 
 @_logprob.register(RandomWalkRV)
@@ -173,7 +238,25 @@ def random_walk_logp(op, values, *inputs, **kwargs):
     return logp(rv, value).sum(axis=-1)
 
 
-class GaussianRandomWalk:
+class PredefinedRandomWalk(ABCMeta):
+    """Base class for predefined RandomWalk distributions"""
+
+    def __new__(cls, name, *args, **kwargs):
+        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
+        return RandomWalk(name, init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)
+
+    @classmethod
+    def dist(cls, *args, **kwargs) -> at.TensorVariable:
+        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
+        return RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)
+
+    @classmethod
+    @abc.abstractmethod
+    def get_dists(cls, *args, **kwargs):
+        pass
+
+
+class GaussianRandomWalk(PredefinedRandomWalk):
     r"""Random Walk with Normal innovations.
 
     Parameters
@@ -186,40 +269,22 @@ class GaussianRandomWalk:
         Unnamed univariate distribution of the initial value. Unnamed refers to distributions
         created with the ``.dist()`` API.
 
-        .. warning:: init will be cloned, rendering them independent of the ones passed as input.
+        .. warning:: init_dist will be cloned, rendering them independent of the ones passed as input.
 
     steps : int, optional
         Number of steps in Gaussian Random Walk (steps > 0). Only needed if shape is not
         provided.
     """
 
-    def __new__(cls, name, mu=0.0, sigma=1.0, *, init_dist=None, steps=None, **kwargs):
-        init_dist, innovation_dist, kwargs = cls.get_dists(
-            mu=mu, sigma=sigma, init_dist=init_dist, **kwargs
-        )
-        return RandomWalk(
-            name, init_dist=init_dist, innovation_dist=innovation_dist, steps=steps, **kwargs
-        )
-
-    @classmethod
-    def dist(cls, mu=0.0, sigma=1.0, *, init_dist=None, steps=None, **kwargs) -> at.TensorVariable:
-        init_dist, innovation_dist, kwargs = cls.get_dists(
-            mu=mu, sigma=sigma, init_dist=init_dist, **kwargs
-        )
-        return RandomWalk.dist(
-            init_dist=init_dist, innovation_dist=innovation_dist, steps=steps, **kwargs
-        )
-
     @classmethod
-    def get_dists(cls, *, mu, sigma, init_dist, **kwargs):
+    def get_dists(cls, mu=0.0, sigma=1.0, *, init_dist=None, **kwargs):
         if "init" in kwargs:
             warnings.warn(
                 "init parameter is now called init_dist. Using init will raise an error in a future release.",
                 FutureWarning,
             )
             init_dist = kwargs.pop("init")
 
-        # If no scalar distribution is passed then initialize with a Normal of same mu and sigma
         if init_dist is None:
             warnings.warn(
                 "Initial distribution not specified, defaulting to `Normal.dist(0, 100)`."
@@ -228,11 +293,9 @@ def get_dists(cls, *, mu, sigma, init_dist, **kwargs):
             )
             init_dist = Normal.dist(0, 100)
 
-        # Add one dimension to the right, so that mu and sigma broadcast safely along
-        # the steps dimension
         mu = at.as_tensor_variable(mu)
         sigma = at.as_tensor_variable(sigma)
-        innovation_dist = Normal.dist(mu=mu[..., None], sigma=sigma[..., None])
+        innovation_dist = Normal.dist(mu=mu, sigma=sigma)
 
         return init_dist, innovation_dist, kwargs

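A standalone sketch (not from the commit) of the axis bookkeeping described in the new `rv_op` comments: random draws are stacked with the steps axis first, shape (T, B, S), then that axis is moved in front of the support axis to give (B, T, S), and the walk is accumulated along it. The concrete shapes below are made up; the `at.moveaxis` and `at.cumsum` calls are the same ones the commit uses.

```python
import aesara.tensor as at
import numpy as np

ndim_supp = 2  # one support dim (S) plus the steps dim (T)

x = at.tensor3("x")  # draws with shape (T, B, S)
x_swapped = at.moveaxis(x, 0, -ndim_supp)  # shape (B, T, S)
walk = at.cumsum(x_swapped, axis=-ndim_supp)  # cumulative sum along T

out = walk.eval({x: np.ones((4, 2, 3))})
print(out.shape)  # (2, 4, 3)
print(out[:, -1, :])  # all 4.0: four accumulated unit innovations
```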

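With the new `PredefinedRandomWalk` base class, wrappers like `GaussianRandomWalk` only have to supply `get_dists`; `__new__` and `dist` dispatch into `RandomWalk`. A sketch of a hypothetical subclass (`StudentTRandomWalk` is not part of this commit; the init default mirrors the Gaussian case above):

```python
import pymc as pm

from pymc.distributions.timeseries import PredefinedRandomWalk


class StudentTRandomWalk(PredefinedRandomWalk):
    """Hypothetical random walk with StudentT innovations."""

    @classmethod
    def get_dists(cls, nu, *, init_dist=None, **kwargs):
        if init_dist is None:
            init_dist = pm.Normal.dist(0, 100)
        innovation_dist = pm.StudentT.dist(nu=nu)
        return init_dist, innovation_dist, kwargs


with pm.Model():
    # Dispatches through PredefinedRandomWalk.__new__ into RandomWalk
    rw = StudentTRandomWalk("rw", nu=4, steps=20)
```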