"""Utility helpers for arkouda.numpy: registration, broadcasting, mapping, and memory reporting."""

from __future__ import annotations

import builtins
import json
import sys

from math import prod as maprod
from typing import TYPE_CHECKING, List, Literal, Sequence, Tuple, TypeVar, Union, cast

from typeguard import typechecked

from arkouda.client_dtypes import BitVector, IPv4, bit_vectorizer
from arkouda.core.infoclass import list_registry
from arkouda.numpy.dtypes import (
    _is_dtype_in_union,
    dtype,
    float_scalars,
    int_scalars,
    numeric_scalars,
    resolve_scalar_dtype,
)
from arkouda.numpy.pdarrayclass import create_pdarray, pdarray
from arkouda.numpy.pdarraysetops import unique
from arkouda.numpy.sorting import coargsort
from arkouda.numpy.timeclass import Datetime, Timedelta
from arkouda.pandas.groupbyclass import GroupBy


# Public API of this module.
# NOTE(review): `_axis_validation` and `_integer_axis_validation` are exported
# despite their leading underscores, and `broadcast_to` is defined in this file
# but not listed here — confirm both are intentional before changing.
__all__ = [
    "attach",
    "attach_all",
    "_axis_validation",
    "broadcast_dims",
    "broadcast_shapes",
    "broadcast_arrays",
    "convert_bytes",
    "convert_if_categorical",
    "copy",
    "generic_concat",
    "get_callback",
    "identity",
    "_integer_axis_validation",
    "invert_permutation",
    "is_float",
    "is_int",
    "is_numeric",
    "is_registered",
    "map",
    "may_share_memory",
    "register",
    "register_all",
    "report_mem",
    "shares_memory",
    "sparse_sum_help",
    "unregister",
    "unregister_all",
]


if TYPE_CHECKING:
    # These imports exist only for static type checkers; they are not bound at
    # runtime.  NOTE(review): `get_config`, `get_mem_used`, and `arange` are
    # imported *only* here, yet `report_mem` and `invert_permutation` reference
    # them at runtime — as written those calls would raise NameError; confirm
    # whether the functions are expected to import them locally.
    from arkouda.core.client import get_config, get_mem_used
    from arkouda.numpy.pdarraycreation import arange
    from arkouda.numpy.segarray import SegArray
    from arkouda.numpy.strings import Strings
    from arkouda.pandas.categorical import Categorical
    from arkouda.pandas.index import Index
    from arkouda.pandas.series import Series
else:
    # Runtime placeholders so annotations that name these types still resolve
    # without importing the heavy modules at module-load time.
    Categorical = TypeVar("Categorical")
    SegArray = TypeVar("SegArray")
    Strings = TypeVar("Strings")
    Index = TypeVar("Index")
    Series = TypeVar("Series")

def identity(x):
    """
    Return the argument unchanged.

    Used as the fallback callback in :func:`get_callback` when no
    type-specific reconstruction is required.

    Parameters
    ----------
    x : object
        Any value.

    Returns
    -------
    object
        The same object that was passed in.
    """
    return x
def get_callback(x):
    """
    Return a callable that rebuilds an object "like" ``x`` from raw data.

    Parameters
    ----------
    x : object
        The object whose reconstruction callback is wanted.

    Returns
    -------
    callable
        - The class itself for ``Datetime``, ``Timedelta``, and ``IPv4``.
        - ``x._cast`` when the object exposes one.
        - A ``bit_vectorizer`` configured from ``x`` for ``BitVector``.
        - :func:`identity` otherwise.
    """
    # Guard-clause form: first match wins, mirroring the original branch order.
    if type(x) in {Datetime, Timedelta, IPv4}:
        return type(x)
    if hasattr(x, "_cast"):
        return x._cast
    if isinstance(x, BitVector):
        return bit_vectorizer(width=x.width, reverse=x.reverse)
    return identity
def generic_concat(items, ordered=True):
    """
    Concatenate a sequence of same-typed objects.

    Works for plain Python lists, for Arkouda classes that provide their own
    ``concat`` classmethod (e.g. DataFrame, Series), and for pdarray-like
    objects via ``pdarraysetops.concatenate``.

    Parameters
    ----------
    items : sequence
        Objects to concatenate; all must share one type.
    ordered : bool, default=True
        Passed through to the underlying concatenation routine.

    Returns
    -------
    object
        The concatenated result, of the same type as the inputs.

    Raises
    ------
    TypeError
        If the items are not all of a single type.
    """
    # this version can be called with Dataframe and Series (which have
    # Class.concat methods)
    from arkouda.numpy.pdarraysetops import concatenate as pdarrayconcatenate

    types = {type(x) for x in items}
    if len(types) != 1:
        raise TypeError(f"Items must all have same type: {types}")
    common_type = types.pop()
    # Plain Python lists are flattened directly.
    if common_type is list:
        flattened = []
        for sub in items:
            flattened.extend(sub)
        return flattened
    if hasattr(common_type, "concat"):
        return common_type.concat(items, ordered=ordered)
    return pdarrayconcatenate(items, ordered=ordered)
def report_mem(pre=""):
    """
    Write a one-line summary of server memory usage to stdout.

    Reports total memory used (in TB) and the fraction of the cluster's
    physical memory it represents, computed as
    ``mem_used / (numLocales * physicalMemory)``.

    Parameters
    ----------
    pre : str, default=""
        Prefix text printed before the memory figures.
    """
    # Bug fix: get_config/get_mem_used were only imported under TYPE_CHECKING
    # at module level, so calling this function raised NameError. Import them
    # for real here.
    from arkouda.core.client import get_config, get_mem_used

    cfg = get_config()
    # Query the server once so the TB figure and the percentage describe the
    # same snapshot (the original called get_mem_used() twice).
    mem = get_mem_used()
    used = mem / (cfg["numLocales"] * cfg["physicalMemory"])
    sys.stdout.write(f"{pre} mem use: {mem / (1024**4): .2f} TB ({used:.1%})")
@typechecked
def invert_permutation(perm: pdarray) -> pdarray:
    """
    Compute the inverse of a permutation array.

    The inverse permutation undoes the effect of the original permutation.
    For a valid permutation array `perm`, this function returns an array
    `inv` such that `inv[perm[i]] == i` for all `i`.

    Parameters
    ----------
    perm : pdarray
        A permutation of the integers `[0, N-1]`, where `N` is the length
        of the array.

    Returns
    -------
    pdarray
        The inverse of the input permutation.

    Raises
    ------
    ValueError
        If `perm` is not a valid permutation of the range `[0, N-1]`, such as
        containing duplicates or missing values.

    Examples
    --------
    >>> import arkouda as ak
    >>> from arkouda import array, invert_permutation
    >>> perm = array([2, 0, 3, 1])
    >>> inv = invert_permutation(perm)
    >>> print(inv)
    [1 3 0 2]
    """
    # Bug fix: `arange` was only imported under TYPE_CHECKING at module level,
    # which made this function raise NameError at runtime.
    from arkouda.numpy.pdarraycreation import arange

    # A valid permutation of [0, N-1] has exactly N distinct values.
    unique_vals = unique(perm)
    if (not isinstance(unique_vals, pdarray)) or unique_vals.size != perm.size:
        raise ValueError("The array is not a permutation.")
    # Sorting (perm, arange) by perm places i at position perm[i] -> inverse.
    return coargsort([perm, arange(0, perm.size)])
def convert_if_categorical(values):
    """
    Materialize a ``Categorical`` as its string labels; pass anything else through.

    Parameters
    ----------
    values : Categorical or any
        The input, which may be a ``Categorical``.

    Returns
    -------
    Strings or any
        ``values.categories[values.codes]`` when ``values`` is a
        ``Categorical``; otherwise ``values`` unchanged.

    Examples
    --------
    Convert a ``Categorical`` to its string labels:

    >>> import arkouda as ak
    >>> categories = ak.array(["apple", "banana", "cherry"])
    >>> cat = ak.Categorical(categories)
    >>> result = convert_if_categorical(cat)
    >>> print(result)
    ['apple', 'banana', 'cherry']

    Non-``Categorical`` inputs are returned unchanged:

    >>> values = ak.array([1, 2, 3])
    >>> result = convert_if_categorical(values)
    >>> print(result)
    [1 2 3]
    """
    from arkouda.pandas.categorical import Categorical

    if isinstance(values, Categorical):
        # Index the category labels by code to recover the per-element strings.
        return values.categories[values.codes]
    return values
def register(obj, name):
    """
    Register an Arkouda object under a user-specified name.

    Thin convenience wrapper around ``obj.register(name)``, kept for
    backwards compatibility with earlier versions of Arkouda.

    Parameters
    ----------
    obj : Arkouda object
        The object to register; must expose a ``register`` method.
    name : str
        The name to associate with the object.

    Returns
    -------
    object
        Whatever ``obj.register(name)`` returns (the registered object).

    Raises
    ------
    AttributeError
        Raised if ``obj`` does not have a ``register`` method.

    Examples
    --------
    >>> import arkouda as ak
    >>> from arkouda.numpy.util import register
    >>> obj = ak.array([1, 2, 3])
    >>> registered_obj = register(obj, "my_array")
    >>> print(registered_obj)
    [1 2 3]
    >>> registered_obj.unregister()
    """
    registered = obj.register(name)
    return registered
@typechecked
def attach(name: str):
    """
    Attach a previously created Arkouda object by its registered name.

    This function retrieves an Arkouda object (e.g., ``pdarray``, ``DataFrame``,
    ``Series``) associated with a given ``name``. The returned object type
    depends on the object stored under that name.

    Parameters
    ----------
    name : str
        The name of the object to attach.

    Returns
    -------
    object
        The Arkouda object associated with the given ``name``. The returned
        object could be any supported type, such as ``pdarray``, ``DataFrame``,
        or ``Series``.

    Raises
    ------
    ValueError
        Raised if the object type in the response message does not match any
        known Arkouda types.

    Examples
    --------
    Attach an existing ``pdarray``:

    >>> import arkouda as ak
    >>> obj = ak.array([1, 2, 3])
    >>> registered_obj = obj.register("my_array")
    >>> arr = ak.attach("my_array")
    >>> print(arr)
    [1 2 3]
    >>> registered_obj.unregister()
    """
    # Local imports avoid circular-import problems at module load time.
    from arkouda.core.client import generic_msg
    from arkouda.numpy.pdarrayclass import pdarray
    from arkouda.numpy.segarray import SegArray
    from arkouda.numpy.strings import Strings
    from arkouda.pandas.categorical import Categorical
    from arkouda.pandas.dataframe import DataFrame
    from arkouda.pandas.index import Index, MultiIndex
    from arkouda.pandas.series import Series

    # Runtime union of every type this function may return.
    attachable = Union[
        pdarray,
        Strings,
        Datetime,
        Timedelta,
        IPv4,
        SegArray,
        DataFrame,
        GroupBy,
        Categorical,
    ]
    # Ask the server for the registered object's metadata and creation payload.
    rep_msg = json.loads(cast(str, generic_msg(cmd="attach", args={"name": name})))
    rtn_obj: attachable | None = None
    # Dispatch on the (case-insensitive) object type reported by the server.
    if rep_msg["objType"].lower() == pdarray.objType.lower():
        rtn_obj = create_pdarray(rep_msg["create"])
    elif rep_msg["objType"].lower() == Strings.objType.lower():
        rtn_obj = Strings.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == Datetime.special_objType.lower():
        # Datetime/Timedelta/IPv4 wrap a plain pdarray payload.
        rtn_obj = Datetime(create_pdarray(rep_msg["create"]))
    elif rep_msg["objType"].lower() == Timedelta.special_objType.lower():
        rtn_obj = Timedelta(create_pdarray(rep_msg["create"]))
    elif rep_msg["objType"].lower() == IPv4.special_objType.lower():
        rtn_obj = IPv4(create_pdarray(rep_msg["create"]))
    elif rep_msg["objType"].lower() == SegArray.objType.lower():
        rtn_obj = SegArray.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == DataFrame.objType.lower():
        rtn_obj = DataFrame.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == GroupBy.objType.lower():
        rtn_obj = GroupBy.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == Categorical.objType.lower():
        rtn_obj = Categorical.from_return_msg(rep_msg["create"])
    elif (
        rep_msg["objType"].lower() == Index.objType.lower()
        or rep_msg["objType"].lower() == MultiIndex.objType.lower()
    ):
        # Both Index flavors reconstruct through Index.from_return_msg.
        rtn_obj = Index.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == Series.objType.lower():
        rtn_obj = Series.from_return_msg(rep_msg["create"])
    elif rep_msg["objType"].lower() == BitVector.special_objType.lower():
        rtn_obj = BitVector.from_return_msg(rep_msg["create"])
    # NOTE(review): an unrecognized objType falls through and returns None
    # rather than raising the ValueError described in the docstring — confirm.
    if rtn_obj is not None:
        rtn_obj.registered_name = name
    return rtn_obj
@typechecked
def unregister(name: str) -> str:
    """
    Unregister the Arkouda object associated with ``name``.

    Parameters
    ----------
    name : str
        The registered name of the object to unregister.

    Returns
    -------
    str
        The server's response message for the unregister operation.

    Raises
    ------
    RuntimeError
        If the object associated with the given `name` does not exist or
        cannot be unregistered.

    Examples
    --------
    >>> import arkouda as ak

    Unregister an existing object

    >>> obj = ak.array([1, 2, 3])
    >>> registered_obj = obj.register("my_array")
    >>> response = ak.unregister("my_array")
    >>> print(response)
    Unregistered PDARRAY my_array
    """
    from arkouda.core.client import generic_msg

    # Forward the request to the server and hand back its reply verbatim.
    return cast(str, generic_msg(cmd="unregister", args={"name": name}))
@typechecked
def is_registered(name: str, as_component: bool = False) -> bool:
    """
    Report whether ``name`` appears in the server-side registry.

    Parameters
    ----------
    name : str
        The name to look up.
    as_component : bool, default=False
        When ``True``, search the registry's component entries instead of the
        top-level object entries.

    Returns
    -------
    bool
        ``True`` if the name is found in the registry, ``False`` otherwise.

    Raises
    ------
    KeyError
        Raised if the registry query encounters an issue (e.g., invalid
        registry data or access problems).

    Examples
    --------
    Check whether a name is registered as an object:

    >>> import arkouda as ak
    >>> obj = ak.array([1, 2, 3])
    >>> registered_obj = obj.register("my_array")
    >>> result = ak.is_registered("my_array")
    >>> print(result)
    True
    >>> registered_obj.unregister()

    Check whether a name is registered as a component:

    >>> result = ak.is_registered("my_component", as_component=True)
    >>> print(result)
    False
    """
    # The registry listing is keyed by "Objects" and "Components" sections.
    section = "Components" if as_component else "Objects"
    return name in list_registry()[section]
def register_all(data: dict):
    """
    Register every object in ``data`` under its associated name.

    Parameters
    ----------
    data : dict
        Maps the registration name to the object itself, e.g.
        ``{"MyArray": ak.array([0, 1, 2])}``.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = {"array1": ak.array([0, 1, 2]), "array2": ak.array([3, 4, 5])}
    >>> ak.register_all(data)

    After calling this function, ``"array1"`` and ``"array2"`` are registered
    in Arkouda and can be accessed by their names.

    >>> ak.unregister_all(["array1", "array2"])
    """
    for registration_name, item in data.items():
        register(item, registration_name)
def unregister_all(names: List[str]):
    """
    Unregister every name in ``names`` from the Arkouda server.

    Parameters
    ----------
    names : list of str
        Registered names whose objects should be unregistered.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = {"array1": ak.array([0, 1, 2]), "array2": ak.array([3, 4, 5])}
    >>> ak.register_all(data)

    After calling this function, ``"array1"`` and ``"array2"`` are registered
    in Arkouda and can be accessed by their names.

    >>> ak.unregister_all(["array1", "array2"])

    The objects are now unregistered.
    """
    for registered_name in names:
        unregister(registered_name)
def attach_all(names: list):
    """
    Attach to every registered object named in ``names``.

    Parameters
    ----------
    names : list of str
        Names of registered Arkouda objects.

    Returns
    -------
    dict
        Maps each name to the object returned by :func:`attach`.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = {"arr1": ak.array([0, 1, 2]), "arr2": ak.array([3, 4, 5])}
    >>> ak.register_all(data)

    Assuming ``"arr1"`` and ``"arr2"`` were previously registered:

    >>> attached_objs = ak.attach_all(["arr1", "arr2"])
    >>> print(attached_objs["arr1"])
    [0 1 2]
    >>> print(type(attached_objs["arr2"]))
    <class 'arkouda.numpy.pdarrayclass.pdarray'>
    >>> ak.unregister_all(["arr1", "arr2"])
    """
    attached = {}
    for registered_name in names:
        attached[registered_name] = attach(registered_name)
    return attached
def sparse_sum_help(
    idx1: pdarray,
    idx2: pdarray,
    val1: pdarray,
    val2: pdarray,
    merge: bool = True,
    percent_transfer_limit: int = 100,
) -> Tuple[pdarray, pdarray]:
    """
    Sum two sparse matrices together.

    Equivalent to:

        ak.GroupBy(ak.concatenate([idx1, idx2])).sum(ak.concatenate((val1, val2)))

    but computed server-side in a single call.

    Parameters
    ----------
    idx1 : pdarray
        Indices for the first sparse matrix.
    idx2 : pdarray
        Indices for the second sparse matrix.
    val1 : pdarray
        Values for the first sparse matrix.
    val2 : pdarray
        Values for the second sparse matrix.
    merge : bool, default=True
        If True, the indices are combined using a merge-based workflow;
        if False, a sort-based workflow is used.
    percent_transfer_limit : int, default=100
        Only used when `merge` is True: the maximum percentage of data allowed
        to move between locales during the merge before falling back to the
        sort-based workflow.

    Returns
    -------
    Tuple[pdarray, pdarray]
        The indices of the resulting sparse matrix and the summed values
        associated with those indices.

    Examples
    --------
    >>> import arkouda as ak
    >>> idx1 = ak.array([0, 1, 3, 4, 7, 9])
    >>> idx2 = ak.array([0, 1, 3, 6, 9])
    >>> vals1 = idx1
    >>> vals2 = ak.array([10, 11, 13, 16, 19])
    >>> ak.util.sparse_sum_help(idx1, idx2, vals1, vals2)
    (array([0 1 3 4 6 7 9]), array([10 12 16 4 16 7 28]))

    >>> ak.GroupBy(ak.concatenate([idx1, idx2])).sum(ak.concatenate((vals1, vals2)))
    (array([0 1 3 4 6 7 9]), array([10 12 16 4 16 7 28]))
    """
    from arkouda.core.client import generic_msg

    response = cast(
        str,
        generic_msg(
            cmd="sparseSumHelp",
            args={
                "idx1": idx1,
                "idx2": idx2,
                "val1": val1,
                "val2": val2,
                "merge": merge,
                "percent_transfer_limit": percent_transfer_limit,
            },
        ),
    )
    # The server packs "indices+values"; split on the first '+' only, since
    # the payloads themselves may contain that character.
    idx_part, val_part = response.split("+", maxsplit=1)
    return create_pdarray(idx_part), create_pdarray(val_part)
@typechecked
def broadcast_shapes(*shapes: Tuple[int, ...]) -> Tuple[int, ...]:
    """
    Determine a broadcasted shape, given an arbitary number of shapes.

    This function implements the broadcasting rules from the Array API standard
    to compute the shape resulting from broadcasting two arrays together.
    See: https://data-apis.org/array-api/latest/API_specification/broadcasting.html#algorithm

    Parameters
    ----------
    shapes : Tuple[int, ...]
        a list or tuple of the shapes to be broadcast

    Returns
    -------
    Tuple[int, ...]
        The broadcasted shape

    Raises
    ------
    ValueError
        If the shapes are not compatible for broadcasting.

    Examples
    --------
    >>> import arkouda as ak
    >>> ak.broadcast_shapes((1,2,3),(4,1,3),(4,2,1))
    (4, 2, 3)
    """
    from numpy import broadcast_shapes as b_shapes

    try:
        return b_shapes(*shapes)
    except ValueError as e:
        # Chain numpy's original error so the specific incompatible
        # dimensions remain visible in the traceback.
        raise ValueError(f"Found no common broadcast shape for: {shapes}") from e
@typechecked
def broadcast_arrays(*arrays: pdarray) -> List[pdarray]:
    """
    Broadcast arrays to a common shape.

    Parameters
    ----------
    arrays : pdarray
        The arrays to broadcast. Must be broadcastable to a common shape.

    Returns
    -------
    List
        A list whose elements are the given Arrays broadcasted to the common shape.

    Raises
    ------
    ValueError
        Raised by broadcast_to if a common shape cannot be determined.

    Examples
    --------
    >>> import arkouda as ak
    >>> a = ak.arange(10).reshape(1,2,5)
    >>> b = ak.arange(20).reshape(4,1,5)
    >>> c = ak.broadcast_arrays(a,b)
    >>> c[0][0,:,:]
    array([array([0 1 2 3 4]) array([5 6 7 8 9])])
    >>> c[1][:,0,0]
    array([0 5 10 15])
    """
    # Resolve the common target shape once, then broadcast each input to it.
    common_shape = broadcast_shapes(*[arr.shape for arr in arrays])
    return [broadcast_to(arr, shape=common_shape) for arr in arrays]
@typechecked
def broadcast_to(x: Union[numeric_scalars, pdarray], shape: Union[int, Tuple[int, ...]]) -> pdarray:
    """
    Broadcast the array to the specified shape.

    Parameters
    ----------
    x: int, pdarray
        The int or array to be broadcast.
    shape: int, Tuple[int, ...]
        The shape to which the array is to be broadcast.

    Notes
    -----
    If x and shape are both integers, the result has shape (shape,).
    If x is an int and shape is a tuple, the result has shape (shape,).
    if x is a pdarray and shape is an int, then if x.shape == (shape,)
    x is unchanged. Otherwise a ValueError is raised.
    If x is a pdarray and shape is a tuple, then x is broadcast to shape,
    if possible.

    Returns
    -------
    pdarray
        A new array which is x broadcast to the provided shape.

    Raises
    ------
    ValueError
        Raised server-side if the broadcast fails, or client-side in the case
        where x is a pdarray, shape is an int, and x.shape != (shape,).

    Examples
    --------
    >>> import arkouda as ak
    >>> a = ak.arange(5)
    >>> ak.broadcast_to(a,(2,5))
    array([array([0 1 2 3 4]) array([0 1 2 3 4])])
    """
    from arkouda.core.client import generic_msg
    from arkouda.numpy.dtypes import _val_isinstance_of_union
    from arkouda.numpy.pdarraycreation import full as akfull

    # Scalar input: materialize a constant array of the requested shape.
    if _val_isinstance_of_union(x, numeric_scalars):
        assert not isinstance(x, pdarray)  # Required for mypy
        return akfull(shape, x, dtype=dtype(resolve_scalar_dtype(x)))

    if isinstance(x, pdarray) and isinstance(shape, int):
        # With an int target, only a no-op "broadcast" of a 1-D array to its
        # own length is legal; anything else is an error.
        if x.ndim == 1 and x.size == shape:
            return x
        raise ValueError(f"Operands could not be broadcast together: {x.shape} and {shape}")

    if isinstance(x, pdarray) and isinstance(shape, tuple):
        # Delegate the general case to the server.
        try:
            response = cast(
                str,
                generic_msg(
                    cmd=f"broadcast<{x.dtype},{x.ndim},{len(shape)}>",
                    args={
                        "name": x,
                        "shape": shape,
                    },
                ),
            )
            return create_pdarray(response)
        except RuntimeError as e:
            raise ValueError(f"Failed to broadcast array: {e}")

    raise ValueError("Operands could not be broadcast.")
@typechecked
def broadcast_dims(sa: Sequence[int], sb: Sequence[int]) -> Tuple[int, ...]:
    """
    Determine the broadcasted shape of two arrays given their shapes.

    Implements the broadcasting algorithm from the Array API standard.
    See: https://data-apis.org/array-api/latest/API_specification/broadcasting.html#algorithm

    Parameters
    ----------
    sa : Sequence[int]
        The shape of the first array.
    sb : Sequence[int]
        The shape of the second array.

    Returns
    -------
    Tuple[int, ...]
        The broadcasted shape resulting from combining `sa` and `sb`.

    Raises
    ------
    ValueError
        If the shapes are not compatible for broadcasting.

    Examples
    --------
    >>> import arkouda as ak
    >>> from arkouda.numpy.util import broadcast_dims
    >>> broadcast_dims((5, 1), (1, 3))
    (5, 3)

    >>> broadcast_dims((4,), (3, 1))
    (3, 4)
    """
    # Walk both shapes from the trailing dimension backwards, treating a
    # missing dimension as 1, then reverse the accumulated result.
    rank = max(len(sa), len(sb))
    dims_reversed = []
    for offset in range(1, rank + 1):
        d1 = sa[-offset] if offset <= len(sa) else 1
        d2 = sb[-offset] if offset <= len(sb) else 1
        if d1 == 1:
            dims_reversed.append(d2)
        elif d2 == 1 or d1 == d2:
            dims_reversed.append(d1)
        else:
            raise ValueError("Incompatible dimensions for broadcasting")
    return tuple(reversed(dims_reversed))
def convert_bytes(nbytes: int_scalars, unit: Literal["B", "KB", "MB", "GB"] = "B") -> numeric_scalars:
    """
    Convert a number of bytes to a larger unit: KB, MB, or GB.

    Parameters
    ----------
    nbytes : int_scalars
        The number of bytes to convert.
    unit : {"B", "KB", "MB", "GB"}, default="B"
        The unit to convert to.

    Returns
    -------
    numeric_scalars
        The converted value; ``nbytes`` unchanged for "B", a float otherwise.

    Raises
    ------
    ValueError
        If `unit` is not one of {"B", "KB", "MB", "GB"}.

    Examples
    --------
    >>> import arkouda as ak
    >>> from arkouda.numpy.util import convert_bytes
    >>> convert_bytes(2048, unit="KB")
    2.0
    >>> convert_bytes(1048576, unit="MB")
    1.0
    >>> convert_bytes(1073741824, unit="GB")
    1.0
    """
    # Powers of 1024 keyed by unit name.
    divisors = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3}
    if unit not in divisors:
        raise ValueError("Invalid unit. Must be one of {'B', 'KB', 'MB', 'GB'}")
    if unit == "B":
        # Bytes are returned as-is (no float conversion), matching callers
        # that expect the original integer back.
        return nbytes
    return float(nbytes / divisors[unit])
def is_numeric(arry: Union[pdarray, Strings, Categorical, Series, Index]) -> builtins.bool:
    """
    Check if the dtype of the given array-like object is numeric.

    Parameters
    ----------
    arry : pdarray, Strings, Categorical, Series, or Index
        The object to check.

    Returns
    -------
    bool
        True if the dtype of `arry` is numeric, False otherwise.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = ak.array([1, 2, 3, 4, 5])
    >>> ak.util.is_numeric(data)
    True

    >>> strings = ak.array(["a", "b", "c"])
    >>> ak.util.is_numeric(strings)
    False

    >>> from arkouda import Categorical
    >>> cat = Categorical(strings)
    >>> ak.util.is_numeric(cat)
    False
    """
    from arkouda.pandas.index import Index
    from arkouda.pandas.series import Series

    # Only dtype-bearing containers can be numeric; Strings/Categorical are not.
    if not isinstance(arry, (pdarray, Series, Index)):
        return False
    return _is_dtype_in_union(dtype(arry.dtype), numeric_scalars)
def is_float(arry: Union[pdarray, Strings, Categorical, Series, Index]) -> builtins.bool:
    """
    Check if the dtype of the given array-like object is a float type.

    Parameters
    ----------
    arry : pdarray, Strings, Categorical, Series, or Index
        The object to check.

    Returns
    -------
    bool
        True if the dtype of `arry` is a float type, False otherwise.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = ak.array([1.0, 2, 3, 4, float('nan')])
    >>> ak.util.is_float(data)
    True

    >>> data2 = ak.arange(5)
    >>> ak.util.is_float(data2)
    False

    >>> strings = ak.array(["1.0", "2.0"])
    >>> ak.util.is_float(strings)
    False
    """
    from arkouda.pandas.index import Index
    from arkouda.pandas.series import Series

    # Only dtype-bearing containers can hold floats; anything else is False.
    if not isinstance(arry, (pdarray, Series, Index)):
        return False
    return _is_dtype_in_union(dtype(arry.dtype), float_scalars)
def is_int(arry: Union[pdarray, Strings, Categorical, Series, Index]) -> builtins.bool:
    """
    Check if the dtype of the given array-like object is an integer type.

    Parameters
    ----------
    arry : pdarray, Strings, Categorical, Series, or Index
        The object to check.

    Returns
    -------
    bool
        True if the dtype of `arry` is an integer type, False otherwise.

    Examples
    --------
    >>> import arkouda as ak
    >>> data = ak.array([1.0, 2, 3, 4, float('nan')])
    >>> ak.util.is_int(data)
    False

    >>> data2 = ak.arange(5)
    >>> ak.util.is_int(data2)
    True

    >>> strings = ak.array(["1", "2"])
    >>> ak.util.is_int(strings)
    False
    """
    from arkouda.pandas.index import Index
    from arkouda.pandas.series import Series

    # Only dtype-bearing containers can hold ints; anything else is False.
    if not isinstance(arry, (pdarray, Series, Index)):
        return False
    return _is_dtype_in_union(dtype(arry.dtype), int_scalars)
def map(
    values: Union[pdarray, Strings, Categorical], mapping: Union[dict, Series]
) -> Union[pdarray, Strings]:
    """
    Map the values of an array according to an input mapping.

    Parameters
    ----------
    values : pdarray, Strings, or Categorical
        The values to be mapped.
    mapping : dict or Series
        The mapping correspondence. A dictionary or Series that defines how
        to map the `values` array.

    Returns
    -------
    Union[pdarray, Strings]
        A new array with the values mapped by the provided mapping. The
        return type matches the type of `values`. If the input `Series` has
        Categorical values, the return type will be `Strings`.

    Raises
    ------
    TypeError
        If `mapping` is not of type `dict` or `Series`.
        If `values` is not of type `pdarray`, `Categorical`, or `Strings`.
    ValueError
        If a mapping with tuple keys has inconsistent lengths, or if a
        MultiIndex mapping has a different number of levels than the
        GroupBy keys.

    Examples
    --------
    >>> import arkouda as ak
    >>> from arkouda.numpy.util import map
    >>> a = ak.array([2, 3, 2, 3, 4])
    >>> a
    array([2 3 2 3 4])
    >>> ak.util.map(a, {4: 25.0, 2: 30.0, 1: 7.0, 3: 5.0})
    array([30.00000000000000000 5.00000000000000000 30.00000000000000000 5.00000000000000000 25.00000000000000000])
    >>> s = ak.Series(ak.array(["a", "b", "c", "d"]), index=ak.array([4, 2, 1, 3]))
    >>> ak.util.map(a, s)
    array(['b', 'd', 'b', 'd', 'a'])
    """
    import numpy as np

    from arkouda import Series, array, broadcast, full
    from arkouda.numpy.pdarraysetops import in1d
    from arkouda.numpy.strings import Strings
    from arkouda.pandas.categorical import Categorical
    from arkouda.pandas.index import MultiIndex

    # Group the input so each distinct key is mapped once and then broadcast
    # back to the original positions.
    keys = values
    gb = GroupBy(keys, dropna=False)
    gb_keys = gb.unique_keys
    # helper: number of unique keys (works for single key or tuple-of-keys)
    nuniq = gb_keys[0].size if isinstance(gb_keys, tuple) else gb_keys.size
    # Fast-path: empty mapping => everything is missing
    if (isinstance(mapping, dict) and len(mapping) == 0) or (
        isinstance(mapping, Series) and len(mapping.index) == 0
    ):
        # Numeric outputs use NaN as the missing marker; string-like use "null".
        if not isinstance(values, (Strings, Categorical)):
            return broadcast(gb.segments, full(nuniq, np.nan, values.dtype), permutation=gb.permutation)
        else:
            return broadcast(gb.segments, full(nuniq, "null"), permutation=gb.permutation)
    if isinstance(mapping, dict):
        # Build mapping as a Series with an Index/MultiIndex (avoid rank>1 arrays)
        m_keys = list(mapping.keys())
        m_vals = list(mapping.values())
        k0 = m_keys[0]
        if isinstance(k0, tuple):
            # validate tuple keys
            if not all(isinstance(k, tuple) for k in m_keys):
                raise TypeError("Mixed key types in mapping dict (tuple and non-tuple).")
            n = len(k0)
            if not all(len(k) == n for k in m_keys):
                raise ValueError("All tuple keys in mapping dict must have the same length.")
            cols = list(zip(*m_keys))  # transpose list[tuple] -> list[level]
            idx = MultiIndex([array(col) for col in cols])
            mapping = Series(array(m_vals), index=idx)
        else:
            mapping = Series(array(m_vals), index=array(m_keys))
    if isinstance(mapping, Series):
        # Normalize mapping index keys into a "groupable" (single array OR tuple-of-arrays)
        mindex = mapping.index
        if isinstance(mindex, MultiIndex):
            mkeys = tuple(mindex.values)
        else:
            mkeys = mindex.values
        if isinstance(gb_keys, tuple) and isinstance(mkeys, tuple):
            if len(gb_keys) != len(mkeys):
                raise ValueError(
                    f"Mapping MultiIndex has {len(mkeys)} levels but GroupBy has {len(gb_keys)} keys"
                )
        # invert=True => mask is True for GroupBy unique keys that are *missing* from the mapping,
        # i.e., values that should be filled with NaN/"null".
        mask = in1d(gb_keys, mkeys, invert=True)
        # Compute extra keys + extra size without mixing tuple/non-tuple assignments
        if isinstance(gb_keys, tuple):
            xtra_keys_t = tuple(k[mask] for k in gb_keys)
            xtra_size = xtra_keys_t[0].size if len(xtra_keys_t) > 0 else 0
            if xtra_size > 0:
                nans: Union[pdarray, Strings]  # without this, mypy complains
                if not isinstance(mapping.values, (Strings, Categorical)):
                    nans = full(xtra_size, np.nan, mapping.values.dtype)
                else:
                    nans = full(xtra_size, "null")
                # Convert any categorical levels to strings, level-by-level
                xtra_keys_t = tuple(
                    k.to_strings() if isinstance(k, Categorical) else k for k in xtra_keys_t
                )
                # Append the missing keys (paired with fill values) so the
                # mapping covers every unique GroupBy key before alignment.
                xtra_series = Series(nans, index=MultiIndex(list(xtra_keys_t)))
                mapping = Series.concat([mapping, xtra_series])
        else:
            xtra_keys_s = gb_keys[mask]
            xtra_size = xtra_keys_s.size
            if xtra_size > 0:
                if not isinstance(mapping.values, (Strings, Categorical)):
                    nans = full(xtra_size, np.nan, mapping.values.dtype)
                else:
                    nans = full(xtra_size, "null")
                if isinstance(xtra_keys_s, Categorical):
                    xtra_keys_s = xtra_keys_s.to_strings()
                xtra_series = Series(nans, index=xtra_keys_s)
                mapping = Series.concat([mapping, xtra_series])
        # Align mapping to gb_keys
        if isinstance(gb_keys, Categorical):
            mapping = mapping[gb_keys.to_strings()]
        else:
            mapping = mapping[gb_keys]
        # Broadcast the per-unique-key mapped values back to every original
        # element position.
        if isinstance(mapping.values, (pdarray, Strings)):
            return broadcast(gb.segments, mapping.values, permutation=gb.permutation)
        else:
            raise TypeError("Map values must be castable to pdarray or Strings.")
    else:
        raise TypeError("Map must be dict or arkouda.Series.")
def _infer_shape_from_size(size):
    """
    Infer the shape, number of dimensions (ndim), and full size from a given
    size or shape.

    Used in pdarray creation functions that allow a size (1D) or shape
    (multi-dim). If the input is a tuple, it is treated as a multidimensional
    shape; if it is a single integer, it is treated as a 1D shape.

    Parameters
    ----------
    size : int or tuple of int
        The size (for 1D arrays) or shape (for multidimensional arrays) of
        the desired array.

    Returns
    -------
    tuple
        A tuple containing:
        - shape: the shape (an integer for 1D, a tuple for multidimensional).
        - ndim: the number of dimensions.
        - full_size: the total number of elements.

    Examples
    --------
    >>> import arkouda as ak
    >>> _infer_shape_from_size(5)
    (5, 1, 5)

    >>> _infer_shape_from_size((3, 4))
    ((3, 4), 2, 12)
    """
    shape: Union[int_scalars, Tuple[int_scalars, ...]] = 1
    if isinstance(size, tuple):
        shape = cast(Tuple, size)
        ndim = len(shape)
        # Use math.prod (imported as maprod, as in _generate_test_shape)
        # instead of a manual accumulation loop.
        full_size = maprod(cast(Tuple, shape))
    else:
        full_size = cast(int, size)
        shape = full_size
        ndim = 1
    return shape, ndim, full_size


def _generate_test_shape(rank, size):
    """
    Generate a shape for a multi-dimensional array that is close to a given
    size, while ensuring each dimension is at least 2.

    The first `rank-1` dimensions are set to 2, and the last dimension is
    adjusted so the product of all dimensions is close to the desired size.

    Parameters
    ----------
    rank : int
        The number of dimensions (rank) for the generated shape.
    size : int
        The desired total size of the multi-dimensional array.

    Returns
    -------
    tuple
        A tuple containing:
        - shape: the generated shape as a tuple of integers.
        - local_size: the product of the shape dimensions, i.e. the total size.

    Examples
    --------
    >>> import arkouda as ak
    >>> _generate_test_shape(3, 16)
    ((2, 2, 4), 16)

    >>> _generate_test_shape(4, 24)
    ((2, 2, 2, 3), 24)
    """
    # used to generate shapes of the form (2,2,...n) for testing multi-dim creation
    last_dim = max(2, size // (2 ** (rank - 1)))  # such that 2*2*..*n is close to size,
    dims = (rank - 1) * [2]  # and with the final dim at least 2.
    dims.append(last_dim)
    shape = tuple(dims)
    local_size = maprod(shape)
    return shape, local_size
def copy(a: Union[Strings, pdarray]) -> Union[Strings, pdarray]:
    """
    Return a deep copy of the given Arkouda object.

    Parameters
    ----------
    a : Union[Strings, pdarray]
        The object to copy.

    Returns
    -------
    Union[Strings, pdarray]
        A deep copy of the pdarray or Strings object.

    Raises
    ------
    TypeError
        If the input is not a Strings or pdarray instance.
    """
    from arkouda.numpy.strings import Strings

    # Reject unsupported inputs up front, then delegate to the object's copy().
    if not isinstance(a, (Strings, pdarray)):
        raise TypeError(f"Unsupported type for copy: {type(a)}")
    return a.copy()
def _ak_buffer_names(x): """ Return a set of server-side buffer names that back `x`. We try to be conservative: if we recognize a container, we pull out all of its backing pdarrays' `.name` values. Supported: - pdarray - Strings (offsets + values/bytes) - SegArray (segments + values) - Categorical (codes + categories Strings) - Nested containers of the above (tuples/lists/dicts) """ names = set() # Base case: pdarray if hasattr(x, "name") and isinstance(getattr(x, "name"), str): # Heuristic: Arkouda pdarray has `.name` referring to a server object names.add(x.name) return names # Strings: typically has .offsets and .values (or .offsets and .bytes) try: from arkouda.numpy.strings import Strings if isinstance(x, Strings): # Some versions expose .values, older expose .bytes _ak_buffer_names(x.get_offsets()) _ak_buffer_names(x.get_bytes()) if hasattr(x, "entry"): names |= _ak_buffer_names(x.entry) return names except Exception: pass # SegArray: segments + values try: from arkouda.numpy.segarray import SegArray if isinstance(x, SegArray): if hasattr(x, "segments"): names |= _ak_buffer_names(x.segments) if hasattr(x, "values"): names |= _ak_buffer_names(x.values) return names except Exception: pass # Categorical: codes + categories (Strings) try: from arkouda.pandas.categorical import Categorical if isinstance(x, Categorical): if hasattr(x, "codes"): names |= _ak_buffer_names(x.codes) if hasattr(x, "categories"): names |= _ak_buffer_names(x.categories) if hasattr(x, "segments"): names |= _ak_buffer_names(x.segments) if hasattr(x, "permutation"): names |= _ak_buffer_names(x.permutation) return names except Exception: pass # Compound structures: recurse if isinstance(x, (list, tuple, set)): for xi in x: names |= _ak_buffer_names(xi) return names if isinstance(x, dict): for xi in x.values(): names |= _ak_buffer_names(xi) return names # Unknown / unsupported type: returns empty => we assume no buffers known return names
def shares_memory(a, b):
    """
    Return True if `a` and `b` share any Arkouda server-side buffers.

    This is an Arkouda analogue of numpy.shares_memory with a simpler
    definition: it checks for identical backing buffer *identities*
    (same server object names).

    Notes
    -----
    - Because Arkouda commonly *materializes* results (rather than views),
      aliasing is rare and usually only true when objects literally
      reference the same backing buffers.
    - For compound containers (e.g., SegArray, Strings, Categorical),
      we check all of their component buffers.
    - If you introduce true view semantics in the future, teach
      `_ak_buffer_names` to surface the base buffer name(s) and view
      descriptors, and compare bases.
    """
    # The two objects alias iff their backing-buffer name sets overlap.
    return not _ak_buffer_names(a).isdisjoint(_ak_buffer_names(b))
def may_share_memory(a, b):
    """
    Conservative version akin to numpy.may_share_memory.

    For now it just defers to shares_memory.
    """
    left = _ak_buffer_names(a)
    right = _ak_buffer_names(b)
    # Example conservative policy:
    # if we fail to find any buffer names for either side but recognize
    # the object as Arkouda-ish, return True to be conservative.
    # Unknown types: be conservative if you wish; here we say False.
    if not left and not right:
        return False
    return not left.isdisjoint(right)
# bounds_check is just called on integers, to ensure they fit in the range def bounds_check(axis, rank): if axis < -rank or axis >= rank: return False else: return True # adjust_negs will only be called if bounds_check passes def adjust_negs(axis, rank): return axis if axis >= 0 else axis + rank # axis validation can be called in multiple conditions. # Some functions require the axis to be an integer (or None). For that, we have # _integer_axis_validation, which returns a boolean and an int (or None). def _integer_axis_validation(axis, rank): if axis is None: return True, None elif isinstance(axis, int): if bounds_check(axis, rank): axis = adjust_negs(axis, rank) return True, axis else: return False, axis else: return False, axis # Other functions allow the axis to be None, int, List, or Tuple. # For that, we have the more general _axis_validation, which returns # a boolean and a list. def _axis_validation(axis, rank): if axis is None: return True, None elif isinstance(axis, int): if bounds_check(axis, rank): axis = adjust_negs(axis, rank) return True, [axis] else: return False, [axis] else: if isinstance(axis, list): axis_ = axis.copy() elif isinstance(axis, tuple): axis_ = list(axis) else: return False, axis valid = True for i in range(len(axis_)): if bounds_check(axis_[i], rank): axis_[i] = adjust_negs(axis_[i], rank) else: valid = False return valid, axis_