Source code for arkouda.array_api.statistical_functions

from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Tuple, Union

from ._dtypes import (  # _complex_floating_dtypes,; complex128,
from .array_object import Array, implements_numpy
from .manipulation_functions import squeeze

    from ._typing import Dtype

import numpy as np

from arkouda.client import generic_msg
from arkouda.numpy.dtypes import dtype as akdtype
from arkouda.numeric import cast as akcast
from arkouda.pdarrayclass import create_pdarray, parse_single_value
from arkouda.pdarraycreation import scalar_array

def max(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    keepdims: bool = False,
) -> Array:
    """
    Return the maximum values of an array, reduced over the requested axes.

    Parameters
    ----------
    x : Array
        The array to reduce. Its dtype must be one of the real numeric dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes to reduce over. If None, the whole array is reduced
        and a scalar-array is returned.
    keepdims : bool, optional
        If True, the reduced array is returned without squeezing `axis`.
        Not consulted when a scalar-array is returned (``axis is None`` or
        a 1D input).

    Raises
    ------
    TypeError
        If the dtype of `x` is not a real numeric dtype.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _real_numeric_dtypes:
        raise TypeError("Only real numeric dtypes are allowed in max")

    # Normalize `axis` into a (possibly empty) list for the server message.
    if axis is None:
        reduce_axes = []
    elif isinstance(axis, tuple):
        reduce_axes = list(axis)
    else:
        reduce_axes = [axis]

    reply = generic_msg(
        cmd=f"reduce{x.ndim}D",
        args={
            "x": x._array,
            "op": "max",
            "nAxes": len(reduce_axes),
            "axis": reduce_axes,
            "skipNan": True,
        },
    )

    # Full reductions and 1D inputs are parsed as a single value.
    if axis is None or x.ndim == 1:
        return Array._new(scalar_array(parse_single_value(reply)))

    reduced = Array._new(create_pdarray(reply))
    return reduced if keepdims else squeeze(reduced, axis)
# This is a temporary fix to get mean working with XArray (until a
# counterpart to np.nanmean is added to the array API). The reference link
# that followed "see:" is missing from this copy of the source.
@implements_numpy(np.nanmean)
@implements_numpy(np.mean)
def mean_shim(x: Array, axis=None, dtype=None, out=None, keepdims=False):
    """
    Adapter decorated with `implements_numpy` for ``np.mean`` and ``np.nanmean``.

    Parameters
    ----------
    x : Array
        The array to compute the mean of
    axis : optional
        Forwarded unchanged to `mean`.
    dtype : optional
        Accepted but not used.
    out : optional
        Accepted but not used.
    keepdims : bool, optional
        Forwarded unchanged to `mean`.

    Returns
    -------
    Array
        Whatever `mean` returns for the forwarded arguments.
    """
    # Only `axis` and `keepdims` are passed on; `dtype` and `out` are dropped.
    forwarded = {"axis": axis, "keepdims": keepdims}
    return mean(x, **forwarded)
def mean(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    keepdims: bool = False,
) -> Array:
    """
    Compute the mean of an array along a given axis or axes.

    Parameters
    ----------
    x : Array
        The array to compute the mean of. Its dtype must be one of the real
        floating-point dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes along which to compute the mean. If None, the mean
        is taken over every axis of `x`.
    keepdims : bool, optional
        Whether to keep the singleton dimension(s) along `axis` in the result.
        When `axis` is None the server result is returned without squeezing,
        regardless of this flag.

    Raises
    ------
    TypeError
        If the dtype of `x` is not a real floating-point dtype.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _real_floating_dtypes:
        raise TypeError("Only real floating-point dtypes are allowed in mean")

    # The server command always receives an explicit list of axes.
    if axis is not None:
        axis_list = list(axis) if isinstance(axis, tuple) else [axis]
    else:
        axis_list = list(range(x.ndim))

    arr = Array._new(
        create_pdarray(
            generic_msg(
                cmd=f"meanReduce<{x.dtype},{x.ndim}>",
                args={
                    "x": x._array,
                    "axes": axis_list,
                    "skipNan": True,  # TODO: handle all-nan slices
                },
            )
        )
    )

    if keepdims or axis is None:
        return arr
    return squeeze(arr, axis)
def min(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    keepdims: bool = False,
) -> Array:
    """
    Compute the minimum values of an array along a given axis or axes.

    Parameters
    ----------
    x : Array
        The array to compute the minimum of. Its dtype must be one of the
        real numeric dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes along which to compute the minimum values. If None,
        the minimum of the entire array is computed (returning a scalar-array).
    keepdims : bool, optional
        Whether to keep the singleton dimension(s) along `axis` in the result.
        Not consulted when a scalar-array is returned (``axis is None`` or
        a 1D input).

    Raises
    ------
    TypeError
        If the dtype of `x` is not a real numeric dtype.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _real_numeric_dtypes:
        raise TypeError("Only real numeric dtypes are allowed in min")

    axis_list = []
    if axis is not None:
        axis_list = list(axis) if isinstance(axis, tuple) else [axis]

    resp = generic_msg(
        cmd=f"reduce{x.ndim}D",
        args={
            "x": x._array,
            "op": "min",
            "nAxes": len(axis_list),
            "axis": axis_list,
            "skipNan": True,
        },
    )

    # Full reductions and 1D inputs are parsed as a single value.
    if axis is None or x.ndim == 1:
        return Array._new(scalar_array(parse_single_value(resp)))

    arr = Array._new(create_pdarray(resp))
    if keepdims:
        return arr
    return squeeze(arr, axis)
def prod(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    dtype: Optional[Dtype] = None,
    keepdims: bool = False,
) -> Array:
    """
    Return the product of an array's elements over the requested axes.

    Parameters
    ----------
    x : Array
        The array to reduce. Its dtype must be one of the numeric dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes to reduce over. If None, the whole array is reduced
        and a scalar-array is returned.
    dtype : Dtype, optional
        The dtype to cast `x` to before reducing. If None, the dtype chosen
        by `_prod_sum_dtype` for the input dtype is used.
    keepdims : bool, optional
        If True, the reduced array is returned without squeezing `axis`.
        Not consulted when a scalar-array is returned (``axis is None`` or
        a 1D input).

    Raises
    ------
    TypeError
        If the dtype of `x` is not a numeric dtype.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _numeric_dtypes:
        raise TypeError("Only numeric dtypes are allowed in prod")

    # Normalize `axis` into a (possibly empty) list for the server message.
    if axis is None:
        reduce_axes = []
    elif isinstance(axis, tuple):
        reduce_axes = list(axis)
    else:
        reduce_axes = [axis]

    # Only cast when the target dtype differs from the input's dtype.
    target_dtype = dtype if dtype is not None else _prod_sum_dtype(x.dtype)
    operand = x._array if target_dtype == x.dtype else akcast(x._array, target_dtype)

    reply = generic_msg(
        cmd=f"reduce{x.ndim}D",
        args={
            "x": operand,
            "op": "prod",
            "nAxes": len(reduce_axes),
            "axis": reduce_axes,
            "skipNan": True,
        },
    )

    # Full reductions and 1D inputs are parsed as a single value.
    if axis is None or x.ndim == 1:
        return Array._new(scalar_array(parse_single_value(reply)))

    reduced = Array._new(create_pdarray(reply))
    return reduced if keepdims else squeeze(reduced, axis)
# Not working with XArray yet, pending an upstream fix (the issue link is
# missing from this copy of the source).
def std(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    correction: Union[int, float] = 0.0,
    keepdims: bool = False,
) -> Array:
    """
    Return the standard deviation of an array over the requested axes.

    Parameters
    ----------
    x : Array
        The array to reduce. Its dtype must be one of the real
        floating-point dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes to reduce over. If None, every axis of `x` is
        reduced.
    correction : int or float, optional
        The degrees of freedom correction, sent to the server as ``ddof``.
        Must be non-negative. The default is 0.
    keepdims : bool, optional
        If True, the reduced array is returned without squeezing `axis`.
        When `axis` is None the server result is returned without squeezing,
        regardless of this flag.

    Raises
    ------
    TypeError
        If the dtype of `x` is not a real floating-point dtype.
    ValueError
        If `correction` is negative.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _real_floating_dtypes:
        raise TypeError("Only real floating-point dtypes are allowed in std")
    if correction < 0:
        raise ValueError("Correction must be non-negative in std")

    # The server command always receives an explicit list of axes.
    if axis is None:
        reduce_axes = list(range(x.ndim))
    elif isinstance(axis, tuple):
        reduce_axes = list(axis)
    else:
        reduce_axes = [axis]

    reply = generic_msg(
        cmd=f"stdReduce<{x.dtype},{x.ndim}>",
        args={
            "x": x._array,
            "ddof": correction,
            "axes": reduce_axes,
            "skipNan": True,
        },
    )
    reduced = Array._new(create_pdarray(reply))

    if axis is not None and not keepdims:
        return squeeze(reduced, axis)
    return reduced
def sum(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    dtype: Optional[Dtype] = None,
    keepdims: bool = False,
) -> Array:
    """
    Return the sum of an array's elements over the requested axes.

    Parameters
    ----------
    x : Array
        The array to reduce. Its dtype must be one of the numeric dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes to reduce over. If None, the whole array is reduced
        and a scalar-array is returned.
    dtype : Dtype, optional
        The dtype to cast `x` to before reducing. If None, the dtype chosen
        by `_prod_sum_dtype` for the input dtype is used.
    keepdims : bool, optional
        If True, the reduced array is returned without squeezing `axis`.
        Not consulted when a scalar-array is returned (``axis is None`` or
        a 1D input).

    Raises
    ------
    TypeError
        If the dtype of `x` is not a numeric dtype.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _numeric_dtypes:
        raise TypeError("Only numeric dtypes are allowed in sum")

    # Normalize `axis` into a (possibly empty) list for the server message.
    if axis is None:
        reduce_axes = []
    elif isinstance(axis, tuple):
        reduce_axes = list(axis)
    else:
        reduce_axes = [axis]

    # Only cast when the target dtype differs from the input's dtype.
    target_dtype = dtype if dtype is not None else _prod_sum_dtype(x.dtype)
    operand = x._array if target_dtype == x.dtype else akcast(x._array, target_dtype)

    reply = generic_msg(
        cmd=f"reduce{x.ndim}D",
        args={
            "x": operand,
            "op": "sum",
            "nAxes": len(reduce_axes),
            "axis": reduce_axes,
            "skipNan": True,
        },
    )

    # Full reductions and 1D inputs are parsed as a single value.
    if axis is None or x.ndim == 1:
        return Array._new(scalar_array(parse_single_value(reply)))

    reduced = Array._new(create_pdarray(reply))
    return reduced if keepdims else squeeze(reduced, axis)
# Not working with XArray yet, pending an upstream fix (the issue link is
# missing from this copy of the source).
def var(
    x: Array,
    /,
    *,
    axis: Optional[Union[int, Tuple[int, ...]]] = None,
    correction: Union[int, float] = 0.0,
    keepdims: bool = False,
) -> Array:
    """
    Compute the variance of an array along a given axis or axes.

    Parameters
    ----------
    x : Array
        The array to compute the variance of. Its dtype must be one of the
        real floating-point dtypes.
    axis : int or Tuple[int, ...], optional
        The axis or axes along which to compute the variance. If None, the
        variance is taken over every axis of `x`.
    correction : int or float, optional
        The degrees of freedom correction to apply. Must be non-negative.
        The default is 0.
    keepdims : bool, optional
        Whether to keep the singleton dimension(s) along `axis` in the result.
        When `axis` is None the server result is returned without squeezing,
        regardless of this flag.

    Raises
    ------
    TypeError
        If the dtype of `x` is not a real floating-point dtype.
    ValueError
        If `correction` is negative.

    Notes
    -----
    The reduction request is sent with ``skipNan`` set to True.
    """
    if x.dtype not in _real_floating_dtypes:
        raise TypeError("Only real floating-point dtypes are allowed in var")
    if correction < 0:
        # The message names this function, not std, so callers can tell
        # which reduction rejected the argument.
        raise ValueError("Correction must be non-negative in var")

    # The server command always receives an explicit list of axes.
    if axis is not None:
        axis_list = list(axis) if isinstance(axis, tuple) else [axis]
    else:
        axis_list = list(range(x.ndim))

    arr = Array._new(
        create_pdarray(
            generic_msg(
                cmd=f"varReduce<{x.dtype},{x.ndim}>",
                args={
                    "x": x._array,
                    # The keyword is `correction` here; the message field is "ddof".
                    "ddof": correction,
                    "axes": axis_list,
                    "skipNan": True,
                },
            )
        )
    )

    if keepdims or axis is None:
        return arr
    return squeeze(arr, axis)
def _prod_sum_dtype(dtype: Dtype) -> Dtype:
    """
    Choose the dtype that `prod` and `sum` reduce in by default.

    The checks run in this order: uint64 is passed through unchanged, real
    floating-point dtypes map to float64, signed integer dtypes map to int64,
    and anything else maps to uint64. The chosen dtype is returned via
    `akdtype`.
    """
    if dtype == uint64:
        target = dtype
    elif dtype in _real_floating_dtypes:
        target = float64
    # Complex dtypes are not handled yet:
    # elif dtype in _complex_floating_dtypes:
    #     target = complex128
    elif dtype in _signed_integer_dtypes:
        target = int64
    else:
        target = uint64
    return akdtype(target)
def cumulative_sum(
    x: Array,
    /,
    *,
    axis: Optional[int] = None,
    dtype: Optional[Dtype] = None,
    include_initial: bool = False,
) -> Array:
    """
    Compute the cumulative sum of the elements of an array along a given axis.

    Parameters
    ----------
    x : Array
        The array to compute the cumulative sum of
    axis : int, optional
        The axis along which to compute the cumulative sum. If None, axis 0
        is sent to the server.
    dtype : Dtype, optional
        The dtype to cast `x` to before accumulating. If None, `x` is used
        without casting.
    include_initial : bool, optional
        Whether to include the initial value as the first element of the output.
    """
    if dtype is None:
        operand = x._array
        operand_dtype = x.dtype
    else:
        # Cast the underlying array and use the result directly, the same way
        # prod and sum in this module call akcast. The previous code passed
        # the Array wrapper to akcast and then read `._array` from its result.
        operand = akcast(x._array, dtype)
        operand_dtype = operand.dtype

    resp = generic_msg(
        cmd=f"cumSum<{operand_dtype},{x.ndim}>",
        args={
            "x": operand,
            "axis": axis if axis is not None else 0,
            "includeInitial": include_initial,
        },
    )

    return Array._new(create_pdarray(resp))