# from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Sequence, Tuple, TypeVar, Union, cast, overload
from typeguard import typechecked
from arkouda.numpy.dtypes import bool_scalars, int_scalars, numeric_scalars
from arkouda.numpy.pdarrayclass import create_pdarray, pdarray
from arkouda.pandas.categorical import Categorical
if TYPE_CHECKING:
from arkouda.numpy.strings import Strings
else:
Strings = TypeVar("Strings")
__all__ = ["flip", "repeat", "squeeze", "tile"]
# Typing-only overload: a pdarray argument yields a pdarray result.
# docstr-coverage:excused `overload-only, docs live on impl`
@overload
def flip(
    x: pdarray, /, *, axis: Optional[Union[int_scalars, Tuple[int_scalars, ...]]] = None
) -> pdarray: ...
# Typing-only overload: a Strings argument yields a Strings result.
# docstr-coverage:excused `overload-only, docs live on impl`
@overload
def flip(
    x: Strings, /, *, axis: Optional[Union[int_scalars, Tuple[int_scalars, ...]]] = None
) -> Strings: ...
# Typing-only overload: a Categorical argument yields a Categorical result.
# docstr-coverage:excused `overload-only, docs live on impl`
@overload
def flip(
    x: Categorical, /, *, axis: Optional[Union[int_scalars, Tuple[int_scalars, ...]]] = None
) -> Categorical: ...
[docs]
def flip(
    x: Union[pdarray, Strings, Categorical],
    /,
    *,
    axis: Optional[Union[int_scalars, Tuple[int_scalars, ...]]] = None,
) -> Union[pdarray, Strings, Categorical]:
    """
    Reverse an array's values along a particular axis or axes.

    Parameters
    ----------
    x : pdarray, Strings, or Categorical
        The array to reverse. The shape of the array is preserved, but the
        elements are reordered.
    axis : int or Tuple[int, ...], optional
        The axis or axes along which to flip the array. If None, flip the array along all axes.
        Only consulted when ``x`` is a pdarray; Strings and Categorical inputs are
        reversed as a whole and ``axis`` is not used for them.

    Returns
    -------
    pdarray, Strings, or Categorical
        An array of the same kind as ``x`` with its entries reversed.

    Raises
    ------
    IndexError
        Raised if flipping a pdarray fails server-side. The server's
        RuntimeError is attached as the cause.
    TypeError
        Raised if ``x`` is not a pdarray, Strings, or Categorical.

    Examples
    --------
    >>> import arkouda as ak
    >>> a = ak.arange(12)
    >>> ak.flip(a)
    array([11 10 9 8 7 6 5 4 3 2 1 0])

    Note
    ----
    This differs from numpy as it actually reverses the data, rather than presenting a view.
    """
    from arkouda.core.client import generic_msg
    from arkouda.numpy.strings import Strings

    if isinstance(x, pdarray):
        # Normalize axis to a list; an empty list accompanies the "flipAll" command.
        axis_list: list = []
        if axis is not None:
            axis_list = list(axis) if isinstance(axis, tuple) else [axis]
        try:
            cmd = f"flipAll<{x.dtype},{x.ndim}>" if axis is None else f"flip<{x.dtype},{x.ndim}>"
            rep_msg = generic_msg(
                cmd=cmd,
                args={
                    "name": x,
                    "nAxes": len(axis_list),
                    "axis": axis_list,
                },
            )
            return create_pdarray(cast(str, rep_msg))
        except RuntimeError as e:
            # Keep the server error as the explicit cause of the translated exception.
            raise IndexError(f"Failed to flip array: {e}") from e
    elif isinstance(x, Categorical):
        # The permutation is only flipped when it is present as a pdarray;
        # otherwise the result carries no permutation.
        permutation = x.permutation
        return Categorical.from_codes(
            codes=flip(x.codes),
            categories=x.categories,
            permutation=flip(permutation) if isinstance(permutation, pdarray) else None,
            segments=x.segments,
        )
    elif isinstance(x, Strings):
        rep_msg = generic_msg(
            cmd="flipString", args={"objType": x.objType, "obj": x.entry, "size": x.size}
        )
        return Strings.from_return_msg(cast(str, rep_msg))
    else:
        raise TypeError("flip only accepts type pdarray, Strings, or Categorical.")
[docs]
def repeat(
    a: Union[int, Sequence[int], pdarray],
    repeats: Union[int, Sequence[int], pdarray],
    axis: Union[None, int] = None,
) -> pdarray:
    """
    Repeat each element of an array after themselves.

    Parameters
    ----------
    a : int, Sequence of int, or pdarray
        Input array. An int or a Sequence is first converted to an int pdarray.
    repeats: int, Sequence of int, or pdarray
        The number of repetitions for each element.
        `repeats` is broadcasted to fit the shape of the given axis.
        An int or a Sequence is first converted to an int pdarray.
    axis : int, optional
        The axis along which to repeat values.
        By default, use the flattened input array, and return a flat output array.

    Returns
    -------
    pdarray
        Output array which has the same shape as `a`, except along the given axis.
        If no entry of `repeats` is positive, an empty array is returned (flat when
        `axis` is None, otherwise shaped like `a` with a zero extent along `axis`).

    Raises
    ------
    ValueError
        Raised if repeats is not an int or a 1-dimensional array, or if it contains
        negative values, if its size does not match the input arrays size along
        axis, or if the operation fails server-side (the server's RuntimeError
        is attached as the cause).
    TypeError
        Raised if axis anything but None or int, or if either a or repeats is invalid
        (the a and repeat cases should be impossible).
    IndexError
        Raised if axis is invalid for the given rank.

    Examples
    --------
    >>> import arkouda as ak
    >>> ak.repeat(3, 4)
    array([3 3 3 3])
    >>> x = ak.array([[1,2],[3,4]])
    >>> ak.repeat(x, 2)
    array([1 1 2 2 3 3 4 4])
    >>> ak.repeat(x, 3, axis=1)
    array([array([1 1 1 2 2 2]) array([3 3 3 4 4 4])])
    >>> ak.repeat(x, [1, 2], axis=0)
    array([array([1 2]) array([3 4]) array([3 4])])
    """
    from arkouda.core.client import generic_msg
    from arkouda.numpy.pdarrayclass import any as akany
    from arkouda.numpy.pdarraycreation import array as ak_array
    from arkouda.numpy.util import _integer_axis_validation

    def _as_int_pdarray(value, label: str):
        """Convert an int or Sequence to an int pdarray; pass any other value through."""
        if isinstance(value, int):
            converted, kind = ak_array([value], int), "an int"
        elif isinstance(value, Sequence):
            converted, kind = ak_array(value, int), "a Sequence of int"
        else:
            return value
        if not isinstance(converted, pdarray):
            raise TypeError(f"This should never happen because {label} was {kind}.")
        return converted

    def _request(cmd: str, args: dict) -> pdarray:
        """Send a repeat command and re-raise server failures as ValueError."""
        try:
            return create_pdarray(cast(str, generic_msg(cmd=cmd, args=args)))
        except RuntimeError as e:
            raise ValueError(f"Failed to repeat array: {e}") from e

    # repeats is converted before a, matching the original validation order.
    repeats = _as_int_pdarray(repeats, "repeats")
    a = _as_int_pdarray(a, "a")

    if repeats.ndim > 1:
        raise ValueError(
            f"Expected repeats to be a 1-dimensional array or constant, but "
            f"received {repeats.ndim}-dimensional array instead."
        )
    if akany(repeats < 0):
        raise ValueError("repeats may not contain negative values.")

    if not akany(repeats > 0):
        # Nothing is repeated at least once: the result is empty.
        temp = cast(pdarray, ak_array([], a.dtype))
        if axis is None:
            return temp
        if isinstance(axis, int):
            temp_shape = list(a.shape)
            temp_shape[axis] = 0
            return temp.reshape(temp_shape)
        raise TypeError("Axis should have been None or an int")

    valid, axis_ = _integer_axis_validation(axis, a.ndim)
    if not valid:
        raise IndexError(f"Cannot repeat array of rank {a.ndim} along axis {axis}")

    if axis_ is None:
        return _request(
            f"repeatFlat<{a.dtype},{a.ndim}>",
            {
                "name": a,
                "repeats": repeats,
            },
        )

    if repeats.size != 1 and repeats.size != a.shape[axis]:
        raise ValueError(
            f"repeats must either be a constant or match the length of a in axis. "
            f"Instead, repeats size of {repeats.size} != {a.shape[axis]}"
        )

    # NOTE(review): the last template argument is hard-coded to 1; presumably it is
    # the rank of repeats (already checked to be at most 1) — confirm against the server.
    return _request(
        f"repeat<{a.dtype},{a.ndim},{repeats.dtype},{1}>",
        {
            "eIn": a,
            "reps": repeats,
            "axis": axis_,
        },
    )
[docs]
@typechecked
def squeeze(
    x: Union[pdarray, numeric_scalars, bool_scalars],
    /,
    axis: Union[None, int_scalars, Tuple[int_scalars, ...]] = None,
) -> pdarray:
    """
    Remove degenerate (size one) dimensions from an array.

    Parameters
    ----------
    x : pdarray, numeric scalar, or bool scalar
        The array to squeeze. A scalar is wrapped in a one-element pdarray,
        which is returned as is.
    axis : int or Tuple[int, ...]
        The axis or axes to squeeze (must have a size of one).
        If axis = None, all dimensions of size 1 will be squeezed. When every
        dimension has size 1, one of them is kept so the result is not rank 0.

    Returns
    -------
    pdarray
        A copy of x with the dimensions specified in the axis argument removed.

    Raises
    ------
    ValueError
        Raised if operation fails server-side. The server's RuntimeError is
        attached as the cause.
    RuntimeError
        Raised if x could not be handled as either a scalar or a pdarray.

    Examples
    --------
    >>> import arkouda as ak
    >>> x = ak.arange(10).reshape((1, 10, 1))
    >>> x.shape
    (1, 10, 1)
    >>> ak.squeeze(x, axis=None).shape
    (10,)
    >>> ak.squeeze(x, axis=2).shape
    (1, 10)
    >>> ak.squeeze(x, axis=(0, 2)).shape
    (10,)
    """
    from arkouda.core.client import generic_msg
    from arkouda.numpy.dtypes import _val_isinstance_of_union
    from arkouda.numpy.pdarraycreation import array as ak_array

    if _val_isinstance_of_union(x, numeric_scalars) or _val_isinstance_of_union(x, bool_scalars):
        ret = ak_array([x])
        if isinstance(ret, pdarray):
            return ret

    if isinstance(x, pdarray):
        # Normalize axis to a plain list once; it drives both the command
        # template and the request arguments.
        if axis is None:
            axes: list = [i for i in range(x.ndim) if x.shape[i] == 1]
            # Can't squeeze over every dimension, so remove one if necessary
            if len(axes) == len(x.shape):
                axes.pop()
        elif isinstance(axis, tuple):
            axes = list(axis)
        else:
            axes = [axis]
        n_axes = len(axes)
        try:
            rep_msg = generic_msg(
                cmd=f"squeeze<{x.dtype},{x.ndim},{x.ndim - n_axes}>",
                args={
                    "name": x,
                    "nAxes": n_axes,
                    "axes": axes,
                },
            )
            return create_pdarray(cast(str, rep_msg))
        except RuntimeError as e:
            # Keep the server error as the explicit cause of the translated exception.
            raise ValueError(f"Failed to squeeze array: {e}") from e

    raise RuntimeError("Failed to squeeze array.")
[docs]
def tile(A: pdarray, /, reps: Union[int, Tuple[int, ...]]) -> pdarray:
    """
    Build an array by laying out copies of A as many times as reps specifies.

    With ``d = len(reps)``, the result has ``max(d, A.ndim)`` dimensions.

    When ``A.ndim < d``, A is first reshaped to d dimensions by prepending axes of
    length one: a shape (3,) array becomes (1, 3) for 2-D replication and (1, 1, 3)
    for 3-D replication. Reshape A yourself beforehand if a different promotion is wanted.

    When ``A.ndim > d``, reps is padded on the left with 1's up to A.ndim, so for an A
    of shape (2, 3, 4, 5) a reps of (2, 2) acts as (1, 1, 2, 2).

    Parameters
    ----------
    A : pdarray
        The input pdarray to be tiled
    reps : int or Tuple of int
        The number of repetitions of A along each axis.

    Returns
    -------
    pdarray
        A new pdarray with the tiled data.

    Examples
    --------
    >>> import arkouda as ak
    >>> a = ak.array([0, 1, 2])
    >>> ak.tile(a, 2)
    array([0 1 2 0 1 2])
    >>> ak.tile(a, (2, 2))
    array([array([0 1 2 0 1 2]) array([0 1 2 0 1 2])])
    >>> ak.tile(a, (2, 1, 2))
    array([array([array([0 1 2 0 1 2])]) array([array([0 1 2 0 1 2])])])
    >>> b = ak.array([[1, 2], [3, 4]])
    >>> ak.tile(b, 2)
    array([array([1 2 1 2]) array([3 4 3 4])])
    >>> ak.tile(b, (2, 1))
    array([array([1 2]) array([3 4]) array([1 2]) array([3 4])])
    >>> c = ak.array([1, 2, 3, 4])
    >>> ak.tile(c, (4, 1))
    array([array([1 2 3 4]) array([1 2 3 4]) array([1 2 3 4]) array([1 2 3 4])])
    """
    from arkouda.core.client import generic_msg

    # Normalize reps to a list with one count per axis.
    if isinstance(reps, int):
        counts = [reps]
        n_counts = 1
    else:
        counts = list(reps)
        n_counts = len(reps)

    shape = A.shape
    rank = len(shape)
    operand = A
    if rank < n_counts:
        # Promote the array: prepend unit axes until it has one axis per count.
        operand = A.reshape((1,) * (n_counts - rank) + shape)
    elif rank > n_counts:
        # Promote the counts: leading axes are repeated once.
        counts = [1] * (rank - n_counts) + counts

    # Ask the server to tile the (possibly reshaped) array and wrap its reply.
    reply = generic_msg(
        cmd=f"tile<{operand.dtype},{operand.ndim}>",
        args={"name": operand, "reps": counts},
    )
    return create_pdarray(cast(str, reply))