"""Compute metrics related to the length, straightness and complexity of paths.
By 'path' we refer to the spatial trajectory of an individual over the
time span of the data. While these metrics can be computed based on any
set of keypoints, they are most meaningful when applied to a single
keypoint representing the individual's overall position (e.g., centroid).
"""
import warnings
from typing import Literal
import numpy as np
import xarray as xr
from movement.kinematics.kinematics import compute_backward_displacement
from movement.utils.logging import logger
from movement.utils.reports import report_nan_values
from movement.utils.vector import compute_norm, compute_signed_angle_2d
from movement.validators.arrays import validate_dims_coords
[docs]
def compute_path_length(
data: xr.DataArray,
nan_policy: Literal["ffill", "scale"] = "ffill",
nan_warn_threshold: float = 0.2,
) -> xr.DataArray:
r"""Compute the length of a path travelled.
The path length is defined as the sum of the norms (magnitudes) of the
displacement vectors between consecutive time points in the data.
Parameters
----------
data
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
nan_policy
Policy to handle NaN (missing) values. Can be one of the ``"ffill"``
or ``"scale"``. Defaults to ``"ffill"`` (forward fill).
See Notes for more details on the two policies.
nan_warn_threshold
If any point track in the data has at least (:math:`\ge`)
this proportion of values missing, a warning will be emitted.
Defaults to 0.2 (20%).
Returns
-------
xarray.DataArray
An xarray DataArray containing the computed path length,
with dimensions matching those of the input data,
except ``time`` and ``space`` are removed.
Notes
-----
Choosing ``nan_policy="ffill"`` will use :meth:`xarray.DataArray.ffill`
to forward-fill missing segments (NaN values) across time.
This equates to assuming that a track remains stationary for
the duration of the missing segment and then instantaneously moves to
the next valid position, following a straight line. This approach tends
to underestimate the path length, and the error increases with the number
of missing values.
Choosing ``nan_policy="scale"`` will adjust the path length based on the
the proportion of valid segments per point track. For example, if only
80% of segments are present, the path length will be computed based on
these and the result will be divided by 0.8. This approach assumes
that motion dynamics are similar across observed and missing time
segments, which may not accurately reflect actual conditions.
**Sampling rate sensitivity ('coastline paradox'):**
The measured path length is sensitive to the temporal sampling rate
(i.e., frames per second) of the tracking data. Higher sampling rates
capture finer micro-movements and tracking jitter, which inherently
increases the total measured path length. Exercise caution when comparing
path lengths across datasets with different temporal resolutions.
See Also
--------
compute_path_straightness : A related metric that quantifies
the straightness of a path.
Examples
--------
>>> from movement.kinematics import compute_path_length
Compute the path length from the centroid trajectory of a poses
dataset ``ds``:
>>> centroid = ds.position.mean(dim="keypoint")
>>> length = compute_path_length(centroid)
Compute path length over a specific time window:
>>> length = compute_path_length(centroid.sel(time=slice(0, 100)))
Use the scale policy to handle missing values:
>>> length = compute_path_length(centroid, nan_policy="scale")
"""
data = _validate_time_points(data, "path length")
return _path_length(data, nan_policy, nan_warn_threshold)
[docs]
def compute_path_straightness(
data: xr.DataArray,
nan_policy: Literal["ffill", "scale"] = "ffill",
nan_warn_threshold: float = 0.2,
) -> xr.DataArray:
r"""Compute the straightness index of a path :math:`(D/L)`.
The straightness index is the ratio of the Euclidean distance :math:`D`
between the first and last valid positions of a trajectory to the
total path length :math:`L`. Values range from 0 to 1, where 1
indicates a perfectly straight path and 0 indicates the animal
returned to its starting point. Returns NaN if the path length is zero.
Parameters
----------
data
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
nan_policy
Policy to handle NaN (missing) values for the path length computation.
Can be one of ``"ffill"`` or ``"scale"``. Defaults to ``"ffill"``
(forward fill). See :func:`compute_path_length` for more details on
the two policies.
nan_warn_threshold
If any point track in the data has at least (:math:`\ge`)
this proportion of values missing, a warning will be emitted.
Defaults to 0.2 (20%). Directly passed to :func:`compute_path_length`.
Returns
-------
xarray.DataArray
An xarray DataArray containing the computed straightness index,
with dimensions matching those of the input data,
except ``time`` and ``space`` are removed.
Notes
-----
The Euclidean distance :math:`D`, also known as the "straight-line" or
"beeline" distance, is calculated using the first and last valid (non-NaN)
spatial coordinates in the data. This ensures that missing data at the
first or last time points do not nullify the result, provided there are
valid observed positions in between.
Note that the total path length (L), and therefore the straightness index,
is sensitive to the temporal sampling rate (i.e. frames per second),
as described in the Notes of :func:`compute_path_length`.
See Also
--------
compute_path_length : The underlying function used to
compute the path length :math:`L`.
Examples
--------
>>> from movement.kinematics import compute_path_straightness
Compute the straightness index from the centroid trajectory of a
poses dataset ``ds``:
>>> centroid = ds.position.mean(dim="keypoint")
>>> si = compute_path_straightness(centroid)
Compute straightness over a specific time window:
>>> si = compute_path_straightness(centroid.sel(time=slice(0, 100)))
"""
data = _validate_time_points(data, "path straightness")
path_length = _path_length(data, nan_policy, nan_warn_threshold)
# Compute D/L ratio, avoiding division by zero
result = _path_distance(data) / path_length.where(path_length > 0)
result.name = "straightness_index"
result.attrs["long_name"] = "Path Straightness Index"
return result
[docs]
def compute_turning_angle(
data: xr.DataArray,
in_degrees: bool = False,
min_step_length: float = 0.0,
) -> xr.DataArray:
r"""Compute the turning angles between consecutive steps in a path.
The turning angle at time ``t`` is the :func:`signed angle\
<movement.utils.vector.compute_signed_angle_2d>` between two
consecutive :func:`backward displacement\
<movement.kinematics.compute_backward_displacement>` vectors
at times ``t-1`` and ``t``.
The returned angles are in radians, spanning the range :math:`(-\pi, \pi]`,
unless ``in_degrees`` is set to ``True``.
Parameters
----------
data
The input position data. Must contain ``time`` and ``space``
dimensions. The ``space`` dimension must contain exactly the
coordinates ``["x", "y"]`` (2D spatial data only).
in_degrees
If ``True``, return turning angles in degrees. Default is
``False`` (radians).
min_step_length
The minimum step length to consider for computing the turning
angle. Any turning angle involving an incoming or outgoing step
shorter than or equal to this value is set to ``NaN``. The
default ``0.0`` only masks steps with exactly zero length,
which means steps with near-zero lengths may still produce
spurious angles. See Note 2 below.
Returns
-------
xarray.DataArray
Turning angles with the same shape as the input ``data``, but
with the ``space`` dimension dropped.
Notes
-----
1. **Time dimension length:** This function uses a ``shift``
operation to preserve the original ``time`` dimension length.
The first two time steps are always ``NaN``: the first because
no previous step exists, and the second because a turning angle
requires two steps (three positions). In other words, the turning angle
at time step ``t`` is computed as the angle between the steps
from ``t-2`` to ``t-1`` and from ``t-1`` to ``t``.
2. **Positional jitter and small steps:** Tracking data
often contains positional jitter, meaning a stationary animal
may appear to make microscopic movements. With default parameters
(``min_step_length=0.0``), these tiny, noisy movements will
produce spurious, meaningless turning angles. It is highly
recommended to set ``min_step_length`` to an appropriate threshold
based on the tracking resolution and the animal's size in the scene.
The value should be in the same units as the input position data
(e.g. pixels, mm, etc.). Pre-smoothing the trajectory
can also help reduce positional jitter.
3. **NaN propagation:** ``NaN`` positions in the input propagate
to ``NaN`` turning angles. A single missing position affects
up to two turning angles (the incoming and outgoing steps).
Use :func:`movement.filtering.interpolate_over_time` to fill
positional gaps before computing turning angles if continuity
is important.
See Also
--------
movement.kinematics.compute_backward_displacement :
The underlying function used to compute the displacement vectors.
movement.utils.vector.compute_signed_angle_2d :
The underlying function used to compute the signed angle
between two consecutive displacement vectors.
Examples
--------
>>> from movement.kinematics import compute_turning_angle
Compute turning angles from the centroid trajectory of a poses
dataset ``ds``:
>>> centroid = ds.position.mean(dim="keypoint")
>>> angles = compute_turning_angle(centroid)
Compute in degrees, with a minimum step length of 3 pixels to filter out
pose estimation jitter:
>>> angles = compute_turning_angle(
... centroid, in_degrees=True, min_step_length=3
... )
"""
validate_dims_coords(
data, {"time": [], "space": ["x", "y"]}, exact_coords=True
)
# Displacement arriving at each time step t.
disp = compute_backward_displacement(data)
# Turning angle at t = rotation needed to align step[t-1] onto step[t].
turning = compute_signed_angle_2d(disp.shift(time=1), disp)
# Mask turning angles involving steps smaller than min_step_length
step_lengths = compute_norm(disp)
invalid_steps = (step_lengths <= min_step_length) | (
step_lengths.shift(time=1) <= min_step_length
)
turning = xr.where(invalid_steps, np.nan, turning)
turning.attrs["units"] = "radians"
if in_degrees:
turning = np.rad2deg(turning)
turning.attrs["units"] = "degrees"
turning.name = "turning_angle"
return turning
[docs]
def compute_directional_change(
data: xr.DataArray,
in_degrees: bool = False,
min_step_length: float = 0.0,
) -> xr.DataArray:
r"""Compute the directional change (DC) per time step.
The directional change at step :math:`i` is the absolute turning
angle divided by the temporal interval spanning the two steps that
define it [1]_:
.. math::
\mathrm{DC}_i = \frac{|\theta_i|}{\Delta t_i}
where :math:`\theta_i` is the signed turning angle at step :math:`i`
and :math:`\Delta t_i = t_i - t_{i-2}`.
Parameters
----------
data
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
in_degrees
If ``True``, the turning angles (and hence the directional
change) are expressed in degrees rather than radians. Defaults
to ``False``.
min_step_length
The minimum step length used when computing turning angles.
Steps shorter than or equal to this value produce ``NaN``
turning angles, which propagate to ``NaN`` directional change
values. The default ``0.0`` only masks steps with exactly
zero length; near-zero steps from positional jitter may still
produce spurious turning angles. See
:func:`compute_turning_angle` for details.
Returns
-------
xarray.DataArray
Directional change values with the same dimensions as the
input, except ``space`` is removed. Values are in radians per
``time`` unit (e.g. radians/second if ``time`` is in seconds),
or degrees per ``time`` unit if ``in_degrees`` is ``True``.
Notes
-----
**Boundary behaviour:** The first two time steps of the output are
always ``NaN``, because a turning angle requires three consecutive
positions (see :func:`compute_turning_angle`).
References
----------
.. [1] Kitamura, T. & Imafuku, M. (2015). Behavioural mimicry in
flight path of Batesian intraspecific polymorphic butterfly
*Papilio polytes*. *Proc. R. Soc. B* 282(1809).
https://doi.org/10.1098/rspb.2015.0483
See Also
--------
compute_turning_angle :
The underlying function used to compute turning angles.
Examples
--------
>>> from movement.kinematics import compute_directional_change
Compute directional change from the centroid trajectory of a
poses dataset ``ds``:
>>> centroid = ds.position.mean(dim="keypoint")
>>> dc = compute_directional_change(centroid)
Compute over a specific time window:
>>> dc = compute_directional_change(centroid.sel(time=slice(0, 100)))
Filter out pose-estimation jitter by setting ``min_step_length``:
>>> dc = compute_directional_change(centroid, min_step_length=3)
"""
data = _validate_time_points(data, "directional change")
theta = compute_turning_angle(
data, in_degrees=in_degrees, min_step_length=min_step_length
)
time_coord = data["time"]
# Span the two steps that define theta_i (positions i-2..i), matching
# compute_turning_angle's support so DC is correct for non-uniform time.
dt = time_coord - time_coord.shift(time=2)
dc = abs(theta) / dt
dc.name = "directional_change"
dc.attrs["long_name"] = "Directional Change"
return dc
[docs]
def compute_path_deviation(
data: xr.DataArray,
) -> xr.DataArray:
r"""Compute deviation from the straight-line path at each time point.
For each time point :math:`t`, the path deviation is the perpendicular
(unsigned) distance between the position :math:`P(t)` and the infinite
straight line passing through the first and last valid positions in
the data, denoted :math:`A` and :math:`B` respectively. Zero means the
position lies exactly on the straight line; larger values indicate greater
lateral excursion.
Formally, let :math:`\mathbf{u} = B - A` be the chord vector and
:math:`\hat{\mathbf{u}} = \mathbf{u} / \|\mathbf{u}\|` its unit vector.
The deviation at time :math:`t` is:
.. math::
d(t) = \left\|
(P(t) - A) -
\left[(P(t) - A) \cdot \hat{\mathbf{u}}\right] \hat{\mathbf{u}}
\right\|
This is the norm of the component of :math:`P(t) - A` that is
perpendicular to the chord, and is equivalent to the distance from
:math:`P(t)` to the infinite line through :math:`A` and :math:`B`.
The formulation is dimension-agnostic (works for 2D and 3D data).
Parameters
----------
data
The input data containing position information, with ``time``
and ``space`` (in Cartesian coordinates) as required dimensions.
Returns
-------
xarray.DataArray
An xarray DataArray containing the perpendicular deviation from
the chord at each time point, with dimensions matching those of
the input data, except ``space`` is removed. Values are in the
same spatial units as the input.
Raises
------
ValueError
If fewer than 2 time points exist in the data, or if
the chord length is zero (i.e. ``A == B``) for *all* tracks.
If the chord length is zero for *some* (but not all) tracks, a
warning is issued and those tracks will have ``NaN`` deviation.
See Also
--------
compute_path_length : Total distance travelled along the path.
compute_path_straightness : Ratio of chord length to path length.
Examples
--------
>>> from movement.kinematics import compute_path_deviation
Compute per-frame path deviation from the centroid trajectory of a
poses dataset ``ds``:
>>> centroid = ds.position.mean(dim="keypoint")
>>> deviation = compute_path_deviation(centroid)
Compute the maximum lateral excursion over the trajectory:
>>> max_deviation = deviation.max(dim="time")
Compute the mean deviation over a specific time window:
>>> mean_deviation = compute_path_deviation(
... centroid.sel(time=slice(10, 50))
... ).mean(dim="time")
"""
data = _validate_time_points(data, "path deviation")
anchored = data.ffill(dim="time").bfill(dim="time")
A = anchored.isel(time=0)
B = anchored.isel(time=-1)
chord = B - A
chord_length = compute_norm(chord)
degenerate = chord_length == 0
if degenerate.all():
raise logger.error(
ValueError(
"Path deviation is undefined because the start and end "
"positions are identical for all tracks."
)
)
if degenerate.any():
stacked = degenerate.stack(tracks=list(degenerate.dims))
bad_tracks = stacked.sel(tracks=stacked).coords["tracks"].values
warnings.warn(
"Path deviation is undefined for tracks where the start and end "
"positions are identical. The following tracks will return NaN: "
f"{bad_tracks}",
UserWarning,
stacklevel=2,
)
chord_unit = chord / chord_length
p_minus_a = data - A
scalar_proj = (p_minus_a * chord_unit).sum(dim="space")
vector_proj = scalar_proj * chord_unit
rejection = p_minus_a - vector_proj
deviation = compute_norm(rejection)
deviation.name = "path_deviation"
deviation.attrs["long_name"] = "Path Deviation"
return deviation
def _validate_time_points(
data: xr.DataArray,
metric_name: str,
) -> xr.DataArray:
"""Validate dims/coords and require at least 2 time points.
Parameters
----------
data : xarray.DataArray
Position data with ``time`` and ``space`` dimensions.
metric_name : str
Used in the error message when there are fewer than 2 time points.
Returns
-------
xarray.DataArray
The validated data.
"""
validate_dims_coords(data, {"time": [], "space": []})
n_time = data.sizes["time"]
if n_time < 2:
raise logger.error(
ValueError(
"At least 2 time points are required to compute "
f"{metric_name}, but {n_time} were found."
)
)
return data
def _segment_lengths(data: xr.DataArray) -> xr.DataArray:
"""Compute Euclidean distances between consecutive time points.
The first entry of backward displacement is always zero (no previous
point), so it is dropped before computing the norm.
"""
segments = compute_backward_displacement(data).isel(time=slice(1, None))
return compute_norm(segments)
def _path_distance(data: xr.DataArray) -> xr.DataArray:
"""Compute Euclidean distance between the first and last valid positions.
Also known as the "straight-line" or "beeline" distance.
Uses forward and backward filling along the time dimension to ensure
the distance is calculated between the first and last observed locations,
preventing NaNs at the first or last time points from nullifying the
entire calculation.
"""
anchored_data = data.ffill(dim="time").bfill(dim="time")
distance = compute_norm(
anchored_data.isel(time=-1) - anchored_data.isel(time=0)
)
return distance
def _path_length(
data: xr.DataArray,
nan_policy: Literal["ffill", "scale"],
nan_warn_threshold: float,
) -> xr.DataArray:
"""Compute path length on already-validated data.
See :func:`compute_path_length` for parameter details.
"""
_warn_about_nan_proportion(data, nan_warn_threshold)
if nan_policy == "ffill":
result = _segment_lengths(data.ffill(dim="time")).sum(
dim="time", min_count=1
)
elif nan_policy == "scale":
lengths = _segment_lengths(data)
valid_segments = (~lengths.isnull()).sum(dim="time")
valid_proportion = valid_segments / (data.sizes["time"] - 1)
result = lengths.sum(dim="time") / valid_proportion
else:
raise logger.error(
ValueError(
f"Invalid value for nan_policy: {nan_policy}. "
"Must be one of 'ffill' or 'scale'."
)
)
result.name = "path_length"
result.attrs["long_name"] = "Path Length"
return result
def _warn_about_nan_proportion(
data: xr.DataArray, nan_warn_threshold: float
) -> None:
"""Issue warning if the proportion of NaN values exceeds a threshold.
The NaN proportion is evaluated per point track, and a given point is
considered NaN if any of its ``space`` coordinates are NaN. The warning
specifically lists the point tracks with at least (>=)
``nan_warn_threshold`` proportion of NaN values.
Parameters
----------
data : xarray.DataArray
The input data array.
nan_warn_threshold : float
The threshold for the proportion of NaN values. Must be a number
between 0 and 1.
"""
nan_warn_threshold = float(nan_warn_threshold)
if not 0 <= nan_warn_threshold <= 1:
raise logger.error(
ValueError("nan_warn_threshold must be between 0 and 1.")
)
n_nans = data.isnull().any(dim="space").sum(dim="time")
exceeds_threshold = n_nans >= data.sizes["time"] * nan_warn_threshold
if not exceeds_threshold.any():
return
track_dims = [d for d in data.dims if d not in ("time", "space")]
stacked = data.stack(tracks=track_dims)
mask = exceeds_threshold.stack(tracks=track_dims)
data_to_warn_about = stacked.sel(tracks=mask).unstack("tracks")
warnings.warn(
"The result may be unreliable for point tracks with many "
"missing values. The following tracks have at least "
f"{nan_warn_threshold * 100:.3} % NaN values:\n"
f"{report_nan_values(data_to_warn_about)}",
UserWarning,
stacklevel=2,
)