from __future__ import annotations
import itertools
import json
from collections import defaultdict
from typing import (
DefaultDict,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
cast,
)
import numpy as np
from pandas import Categorical as pd_Categorical
from typeguard import typechecked
from arkouda.client import generic_msg
from arkouda.groupbyclass import GroupBy, unique
from arkouda.infoclass import information
from arkouda.logger import getArkoudaLogger
from arkouda.numeric import cast as akcast
from arkouda.numeric import where
from arkouda.numpy.dtypes import bool_ as akbool
from arkouda.numpy.dtypes import dtype as akdtype
from arkouda.numpy.dtypes import int64 as akint64
from arkouda.numpy.dtypes import int_scalars, resolve_scalar_dtype, str_, str_scalars
from arkouda.pdarrayclass import RegistrationError
from arkouda.pdarrayclass import all as akall
from arkouda.pdarrayclass import create_pdarray, pdarray
from arkouda.pdarraycreation import arange, array, ones, zeros, zeros_like
from arkouda.pdarraysetops import concatenate, in1d
from arkouda.sorting import argsort
from arkouda.sorting import sort as pda_sort
from arkouda.strings import Strings
__all__ = ["Categorical"]
class Categorical:
"""
Represents an array of values belonging to named categories. Converting a
Strings object to Categorical often saves memory and speeds up operations,
especially if there are many repeated values, at the cost of some one-time
work in initialization.
Parameters
----------
values : Strings, Categorical, pd.Categorical
Values to convert to categories
NAvalue : str scalar
The value to use to represent missing/null data
Attributes
----------
categories : Strings
The set of category labels (determined automatically)
codes : pdarray, int64
The category index of each value; missing values are assigned the code of the NAvalue category
permutation : pdarray, int64
The permutation that groups the values in the same order as categories
segments : pdarray, int64
When values are grouped, the starting offset of each group
size : Union[int,np.int64]
The number of items in the array
nlevels : Union[int,np.int64]
The number of distinct categories
ndim : Union[int,np.int64]
The rank of the array (currently only rank 1 arrays supported)
shape : tuple
The sizes of each dimension of the array
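Examples
--------
A small usage sketch (assumes a running arkouda server; outputs are
representative and may vary slightly by version):
>>> import arkouda as ak
>>> ak.connect()
>>> cat = ak.Categorical(ak.array(["a", "b", "a", "c"]))
>>> cat
array(['a', 'b', 'a', 'c'])
>>> cat.nlevels  # includes the appended N/A category
4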
"""
BinOps = frozenset(["==", "!="])
RegisterablePieces = frozenset(["categories", "codes", "permutation", "segments", "_akNAcode"])
RequiredPieces = frozenset(["categories", "codes", "_akNAcode"])
permutation = None
segments = None
objType = "Categorical"
dtype = akdtype(str_)  # set for now because Categorical is currently only supported on Strings
def __init__(self, values, **kwargs) -> None:
self.logger = getArkoudaLogger(name=__class__.__name__) # type: ignore
if "codes" in kwargs and "categories" in kwargs:
# This initialization is called by Categorical.from_codes()
# The values arg is ignored
self.codes = kwargs["codes"]
self.categories = kwargs["categories"]
if (self.codes.min() < 0) or (self.codes.max() >= self.categories.size):
raise ValueError(
f"Codes out of bounds for categories: min = {self.codes.min()},"
f" max = {self.codes.max()}, categories = {self.categories.size}"
)
self.permutation = kwargs.get("permutation", None)
self.segments = kwargs.get("segments", None)
if self.permutation is not None and self.segments is not None:
# Permutation and segments should only ever be supplied together from
# the .from_codes() method, not user input
self.permutation = cast(pdarray, self.permutation)
self.segments = cast(pdarray, self.segments)
unique_codes = self.codes[self.permutation[self.segments]]
else:
unique_codes = unique(self.codes)
self._categories_used = self.categories[unique_codes]
else:
# Typical initialization, called with values
if isinstance(values, pd_Categorical):
self.categories = array(values.categories)
self.codes = array(values.codes.astype("int64"))
self._categories_used = self.categories[unique(self.codes)]
self.permutation = None
self.segments = None
elif isinstance(values, Categorical):
self.categories = values.categories
self.codes = values.codes
self._categories_used = values._categories_used
self.permutation = values.permutation
self.segments = values.segments
elif isinstance(values, Strings):
g = GroupBy(values)
self.categories = g.unique_keys
self.codes = g.broadcast(arange(self.categories.size), permute=True)
self.permutation = cast(pdarray, g.permutation)
self.segments = g.segments
# Make a copy because N/A value must be added below
self._categories_used = self.categories[:]
else:
raise ValueError(
"Categorical: inputs other than Strings, Categorical, or pd.Categorical are not yet supported"
)
# When read from file or attached, NA code will be passed as a pdarray
# Otherwise, the NA value is set to a string
if "_akNAcode" in kwargs and kwargs["_akNAcode"] is not None:
self._akNAcode = kwargs["_akNAcode"]
self._NAcode = int(self._akNAcode[0])
self.NAvalue = self.categories[self._NAcode]
else:
self.NAvalue = kwargs.get("NAvalue", "N/A")
findNA = self.categories == self.NAvalue
if findNA.any():
self._NAcode = int(akcast(findNA, akint64).argmax())
else:
# Append NA value
self.categories = concatenate((self.categories, array([self.NAvalue])))
self._NAcode = self.categories.size - 1
self._akNAcode = array([self._NAcode])
# Always set these values
self.size: int_scalars = self.codes.size
self.nlevels = self.categories.size
self.ndim = self.codes.ndim
self.shape = self.codes.shape
self.dtype = akdtype(str_)
self.registered_name: Optional[str] = None
@property
def nbytes(self):
"""
The size of the Categorical in bytes.
Returns
-------
int
The size of the Categorical in bytes.
"""
nbytes = 0
if self.categories is not None:
nbytes += self.categories.nbytes
if isinstance(self.codes, pdarray):
nbytes += self.codes.nbytes
elif isinstance(self.codes, akdtype("int64").type):
nbytes += 1
if isinstance(self.permutation, pdarray):
nbytes += self.permutation.nbytes
elif isinstance(self.permutation, akdtype("int64").type):
nbytes += 1
if isinstance(self.segments, pdarray):
nbytes += self.segments.nbytes
elif isinstance(self.segments, akdtype("int64").type):
nbytes += 1
return nbytes
@property
def inferred_type(self) -> str:
"""
Return a string of the type inferred from the values.
"""
return "categorical"
@classmethod
@typechecked
def from_codes(
cls, codes: pdarray, categories: Strings, permutation=None, segments=None, **kwargs
) -> Categorical:
"""
Make a Categorical from codes and categories arrays. If codes and
categories have already been pre-computed, this constructor saves
time. If not, please use the normal constructor.
Parameters
----------
codes : pdarray, int64
Category indices of each value
categories : Strings
Unique category labels
permutation : pdarray, int64
The permutation that groups the values in the same order
as categories
segments : pdarray, int64
When values are grouped, the starting offset of each group
Returns
-------
Categorical
The Categorical object created from the input parameters
Raises
------
TypeError
Raised if codes is not a pdarray of int64 objects or if
categories is not a Strings object
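Examples
--------
A minimal sketch (assumes a live arkouda connection; output is representative):
>>> import arkouda as ak
>>> ak.connect()
>>> codes = ak.array([1, 0, 1, 2])
>>> categories = ak.array(["a", "b", "c"])
>>> ak.Categorical.from_codes(codes, categories)
array(['b', 'a', 'b', 'c'])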
"""
if codes.dtype != akint64:
raise TypeError("Codes must be pdarray of int64")
return cls(
None,
codes=codes,
categories=categories,
permutation=permutation,
segments=segments,
**kwargs,
)
@classmethod
def from_return_msg(cls, rep_msg) -> Categorical:
"""
Create a Categorical from a return message from the server
Notes
------
This is currently only used when reading a Categorical from HDF5 files.
"""
# parse return json
eles = json.loads(rep_msg)
codes = create_pdarray(eles["codes"])
cats = Strings.from_return_msg(eles["categories"])
na_code = create_pdarray(eles["_akNAcode"])
segments = None
perm = None
if "segments" in eles and "permutation" in eles:
segments = create_pdarray(eles["segments"])
perm = create_pdarray(eles["permutation"])
return cls.from_codes(codes, cats, permutation=perm, segments=segments, _akNAcode=na_code)
@classmethod
def standardize_categories(cls, arrays, NAvalue="N/A"):
"""
Standardize an array of Categoricals so that they share the same categories.
Parameters
----------
arrays : sequence of Categoricals
The Categoricals to standardize
NAvalue : str scalar
The value to use to represent missing/null data
Returns
-------
List of Categoricals
A list of the original Categoricals remapped to the shared categories.
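Examples
--------
A sketch of the intended usage (assumes a live arkouda connection):
>>> import arkouda as ak
>>> ak.connect()
>>> a = ak.Categorical(ak.array(["x", "y"]))
>>> b = ak.Categorical(ak.array(["y", "z"]))
>>> a2, b2 = ak.Categorical.standardize_categories([a, b])
>>> (a2.categories == b2.categories).all()
True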
"""
for arr in arrays:
if not isinstance(arr, cls):
raise TypeError(f"All arguments must be {cls.__name__}")
new_categories = unique(concatenate([arr.categories for arr in arrays], ordered=False))
findNA = new_categories == NAvalue
if not findNA.any():
# Append NA value
new_categories = concatenate((new_categories, array([NAvalue])))
return [arr.set_categories(new_categories, NAvalue=NAvalue) for arr in arrays]
def equals(self, other) -> bool:
"""
Whether Categoricals are the same size and all entries are equal.
Parameters
----------
other : object
object to compare.
Returns
-------
bool
True if the Categoricals are the same size and all entries are equal, otherwise False.
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> c = Categorical(ak.array(["a", "b", "c"]))
>>> c_cpy = Categorical(ak.array(["a", "b", "c"]))
>>> c.equals(c_cpy)
True
>>> c2 = Categorical(ak.array(["a", "x", "c"]))
>>> c.equals(c2)
False
"""
if isinstance(other, Categorical):
if other.size != self.size:
return False
else:
return akall(self == other)
else:
return False
def set_categories(self, new_categories, NAvalue=None):
"""
Set categories to user-defined values.
Parameters
----------
new_categories : Strings
The array of new categories to use. Must be unique.
NAvalue : str scalar
The value to use to represent missing/null data
Returns
-------
Categorical
A new Categorical with the user-defined categories. Old values present
in new categories will appear unchanged. Old values not present will
be assigned the NA value.
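Examples
--------
A sketch (assumes a live arkouda connection); values absent from the new
categories map to the NA value:
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["a", "b", "a"]))
>>> c.set_categories(ak.array(["a", "c"]))
array(['a', 'N/A', 'a'])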
"""
if NAvalue is None:
NAvalue = self.NAvalue
findNA = new_categories == NAvalue
if not findNA.any():
# Append NA value
new_categories = concatenate((new_categories, array([NAvalue])))
NAcode = new_categories.size - 1
else:
NAcode = int(akcast(findNA, akint64).argmax())
code_mapping = zeros(self.categories.size, dtype=akint64)
code_mapping.fill(NAcode)
# Concatenate old and new categories and unique codes
bothcats = concatenate((self.categories, new_categories), ordered=False)
bothcodes = concatenate(
(arange(self.categories.size), arange(new_categories.size)), ordered=False
)
fromold = concatenate(
(ones(self.categories.size, dtype=akbool), zeros(new_categories.size, dtype=akbool)),
ordered=False,
)
# Group combined categories to find matches
g = GroupBy(bothcats)
ct = g.size()[1]
if (ct > 2).any():
raise ValueError("User-specified categories must be unique")
# Matches have two hits in concatenated array
present = g.segments[(ct == 2)]
firstinds = g.permutation[present]
firstcodes = bothcodes[firstinds]
firstisold = fromold[firstinds]
secondinds = g.permutation[present + 1]
secondcodes = bothcodes[secondinds]
# Matching pairs define a mapping of old codes to new codes
scatterinds = where(firstisold, firstcodes, secondcodes)
gatherinds = where(firstisold, secondcodes, firstcodes)
# Make a lookup table where old code at scatterind maps to new code at gatherind
code_mapping[scatterinds] = arange(new_categories.size)[gatherinds]
# Apply the lookup to map old codes to new codes
new_codes = code_mapping[self.codes]
return self.__class__.from_codes(new_codes, new_categories, NAvalue=NAvalue)
def to_ndarray(self) -> np.ndarray:
"""
Convert the array to a np.ndarray, transferring array data from
the arkouda server to Python. This conversion discards category
information and produces an ndarray of strings. If the array
exceeds a built-in size limit, a RuntimeError is raised.
Returns
-------
np.ndarray
A numpy ndarray of strings corresponding to the values in
this array
Notes
-----
The number of bytes in the array cannot exceed ``ak.client.maxTransferBytes``,
otherwise a ``RuntimeError`` will be raised. This is to protect the user
from overflowing the memory of the system on which the Python client
is running, under the assumption that the server is running on a
distributed system with much more memory than the client. The user
may override this limit by setting ak.client.maxTransferBytes to a larger
value, but proceed with caution.
"""
if self.categories.size > self.codes.size:
newcat = self.reset_categories()
idx = newcat.categories.to_ndarray()
valcodes = newcat.codes.to_ndarray()
else:
idx = self.categories.to_ndarray()
valcodes = self.codes.to_ndarray()
return idx[valcodes]
def to_pandas(self) -> pd_Categorical:
"""
Return the equivalent Pandas Categorical.
"""
return pd_Categorical.from_codes(
codes=self.codes.to_ndarray(), categories=self.categories.to_ndarray()
)
def to_list(self) -> List:
"""
Convert the Categorical to a list, transferring data from
the arkouda server to Python. This conversion discards category
information and produces a list of strings. If the array
exceeds a built-in size limit, a RuntimeError is raised.
Returns
-------
list
A list of strings corresponding to the values in
this Categorical
Notes
-----
The number of bytes in the Categorical cannot exceed ``ak.client.maxTransferBytes``,
otherwise a ``RuntimeError`` will be raised. This is to protect the user
from overflowing the memory of the system on which the Python client
is running, under the assumption that the server is running on a
distributed system with much more memory than the client. The user
may override this limit by setting ak.client.maxTransferBytes to a larger
value, but proceed with caution.
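Examples
--------
A small sketch (assumes a live arkouda connection):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["a", "b", "a"]))
>>> c.to_list()
['a', 'b', 'a']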
"""
return self.to_ndarray().tolist()
def to_strings(self) -> Strings:
"""
Convert the Categorical to Strings.
Returns
-------
arkouda.strings.Strings
A Strings object corresponding to the values in
this Categorical.
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> a = ak.array(["a","b","c"])
>>> a
array(['a', 'b', 'c'])
>>> c = ak.Categorical(a)
>>> c.to_strings()
array(['a', 'b', 'c'])
>>> isinstance(c.to_strings(), ak.Strings)
True
"""
return self.categories[self.codes]
def __iter__(self):
raise NotImplementedError(
"Categorical does not support iteration. To force data transfer from server, use to_ndarray"
)
def __len__(self):
return self.shape[0]
def __str__(self):
# limit scope of import to pick up changes to global variable
from arkouda.client import pdarrayIterThresh
if self.size <= pdarrayIterThresh:
vals = [f"'{self[i]}'" for i in range(self.size)]
else:
vals = [f"'{self[i]}'" for i in range(3)]
vals.append("... ")
vals.extend([f"'{self[i]}'" for i in range(self.size - 3, self.size)])
return "[{}]".format(", ".join(vals))
def __repr__(self):
return f"array({self.__str__()})"
@typechecked
def _binop(self, other: Union[Categorical, str_scalars], op: str_scalars) -> pdarray:
"""
Executes the requested binop on this Categorical instance and returns
the results within a pdarray object.
Parameters
----------
other : Union[Categorical,str_scalars]
the other object is a Categorical object or string scalar
op : str_scalars
name of the binary operation to be performed
Returns
-------
pdarray
encapsulating the results of the requested binop
Raises
------
ValueError
Raised if the sizes of this and the other instance don't match
NotImplementedError
Raised if the op is not in the self.BinOps set
RuntimeError
Raised if a server-side error is thrown while executing the
binary operation
"""
if op not in self.BinOps:
raise NotImplementedError(f"Categorical: unsupported operator: {op}")
if np.isscalar(other) and resolve_scalar_dtype(other) == "str":
idxresult = self.categories._binop(other, op)
return idxresult[self.codes]
if self.size != cast(Categorical, other).size:
raise ValueError(
f"Categorical {op}: size mismatch {self.size} {cast(Categorical, other).size}"
)
if isinstance(other, Categorical):
if (self.categories.size == other.categories.size) and (
self.categories == other.categories
).all():
# Because categories are identical, codes can be compared directly
return self.codes._binop(other.codes, op)
else:
tmpself, tmpother = self.standardize_categories((self, other))
return tmpself.codes._binop(tmpother.codes, op)
else:
raise NotImplementedError(
"Operations between Categorical and non-Categorical not yet implemented. "
"Consider converting operands to Categorical."
)
@typechecked
def _r_binop(self, other: Union[Categorical, str_scalars], op: str_scalars) -> pdarray:
"""
Executes the requested reverse binop on this Categorical instance and
returns the results within a pdarray object.
Parameters
----------
other : Union[Categorical,str_scalars]
the other object is a Categorical object or string scalar
op : str_scalars
name of the binary operation to be performed
Returns
-------
pdarray
encapsulating the results of the requested binop
Raises
------
ValueError
Raised if the sizes of this and the other instance don't match
NotImplementedError
Raised if the op is not in the self.BinOps set
RuntimeError
Raised if a server-side error is thrown while executing the
binary operation
"""
return self._binop(other, op)
def __eq__(self, other):
return self._binop(other, "==")
def __ne__(self, other):
return self._binop(other, "!=")
def __getitem__(self, key) -> Categorical:
if np.isscalar(key) and (resolve_scalar_dtype(key) in ["int64", "uint64"]):
return self.categories[self.codes[key]]
else:
# Don't reset categories because they might have been user-defined
# Initialization now determines which categories are used
return Categorical.from_codes(self.codes[key], self.categories)
def isna(self):
"""
Find where values are missing or null (as defined by self.NAvalue)
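Examples
--------
A sketch (assumes a live arkouda connection; "N/A" is the default NAvalue):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["a", "N/A", "b"]))
>>> c.isna()
array([False, True, False])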
"""
return self.codes == self._NAcode
def reset_categories(self) -> Categorical:
"""
Recompute the category labels, discarding any unused labels. This
method is often useful after slicing or indexing a Categorical array,
when the resulting array only contains a subset of the original
categories. In this case, eliminating unused categories can speed up
other operations.
Returns
-------
Categorical
A Categorical object generated from the current instance
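Examples
--------
A sketch (assumes a live arkouda connection):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["a", "b", "c"]))
>>> sliced = c[0:2]
>>> sliced.nlevels  # unused labels (and N/A) are retained after slicing
4
>>> sliced.reset_categories().nlevels
3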
"""
g = GroupBy(self.codes)
idx = self.categories[g.unique_keys]
newvals = g.broadcast(arange(idx.size), permute=True)
return Categorical.from_codes(
newvals, idx, permutation=g.permutation, segments=g.segments, NAvalue=self.NAvalue
)
@typechecked
def contains(self, substr: Union[bytes, str_scalars], regex: bool = False) -> pdarray:
"""
Check whether each element contains the given substring.
Parameters
----------
substr : Union[bytes, str_scalars]
The substring to search for
regex: bool
Indicates whether substr is a regular expression
Note: only handles regular expressions supported by re2
(does not support lookaheads/lookbehinds)
Returns
-------
pdarray, bool
True for elements that contain substr, False otherwise
Raises
------
TypeError
Raised if the substr parameter is not bytes or str_scalars
ValueError
Raised if substr is not a valid regex
RuntimeError
Raised if there is a server-side error thrown
See Also
--------
Categorical.startswith, Categorical.endswith
Notes
-----
This method can be significantly faster than the corresponding method
on Strings objects, because it searches the unique category labels
instead of the full array.
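Examples
--------
A sketch (assumes a live arkouda connection; output is representative):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["apple", "banana", "apple"]))
>>> c.contains("app")
array([True, False, True])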
"""
categories_contains = self.categories.contains(substr, regex)
return categories_contains[self.codes]
@typechecked
def startswith(self, substr: Union[bytes, str_scalars], regex: bool = False) -> pdarray:
"""
Check whether each element starts with the given substring.
Parameters
----------
substr : Union[bytes, str_scalars]
The substring to search for
regex: bool
Indicates whether substr is a regular expression
Note: only handles regular expressions supported by re2
(does not support lookaheads/lookbehinds)
Returns
-------
pdarray, bool
True for elements that start with substr, False otherwise
Raises
------
TypeError
Raised if the substr parameter is not bytes or str_scalars
ValueError
Raised if substr is not a valid regex
RuntimeError
Raised if there is a server-side error thrown
See Also
--------
Categorical.contains, Categorical.endswith
Notes
-----
This method can be significantly faster than the corresponding
method on Strings objects, because it searches the unique category
labels instead of the full array.
"""
categories_starts_with = self.categories.startswith(substr, regex)
return categories_starts_with[self.codes]
@typechecked
def endswith(self, substr: Union[bytes, str_scalars], regex: bool = False) -> pdarray:
"""
Check whether each element ends with the given substring.
Parameters
----------
substr : Union[bytes, str_scalars]
The substring to search for
regex: bool
Indicates whether substr is a regular expression
Note: only handles regular expressions supported by re2
(does not support lookaheads/lookbehinds)
Returns
-------
pdarray, bool
True for elements that end with substr, False otherwise
Raises
------
TypeError
Raised if the substr parameter is not bytes or str_scalars
ValueError
Raised if substr is not a valid regex
RuntimeError
Raised if there is a server-side error thrown
See Also
--------
Categorical.startswith, Categorical.contains
Notes
-----
This method can be significantly faster than the corresponding method
on Strings objects, because it searches the unique category labels
instead of the full array.
"""
categories_ends_with = self.categories.endswith(substr, regex)
return categories_ends_with[self.codes]
@typechecked
def in1d(self, test: Union[Strings, Categorical]) -> pdarray:
"""
Test whether each element of the Categorical object is
also present in the test Strings or Categorical object.
Returns a boolean array the same length as `self` that is True
where an element of `self` is in `test` and False otherwise.
Parameters
----------
test : Union[Strings,Categorical]
The values against which to test each value of `self`.
Returns
-------
pdarray, bool
The values `self[in1d]` are in the `test` Strings or Categorical object.
Raises
------
TypeError
Raised if test is not a Strings or Categorical object
See Also
--------
unique, intersect1d, union1d
Notes
-----
`in1d` can be considered as an element-wise function version of the
python keyword `in`, for 1-D sequences. ``in1d(a, b)`` is logically
equivalent to ``ak.array([item in b for item in a])``, but is much
faster and scales to arbitrarily large ``a``.
Examples
--------
>>> strings = ak.array([f'String {i}' for i in range(0,5)])
>>> cat = ak.Categorical(strings)
>>> ak.in1d(cat,strings)
array([True, True, True, True, True])
>>> strings = ak.array([f'String {i}' for i in range(5,9)])
>>> catTwo = ak.Categorical(strings)
>>> ak.in1d(cat,catTwo)
array([False, False, False, False, False])
"""
if isinstance(test, Categorical):
# Must use test._categories_used instead of test.categories to avoid
# falsely returning True when a value is present in test.categories
# but not used in the array. On the other hand, we don't need to use
# self._categories_used, because indexing with [self.codes] below ensures
# that only results for categories used in self.codes will be returned.
categoriesisin = in1d(self.categories, test._categories_used)
else:
categoriesisin = in1d(self.categories, test)
return categoriesisin[self.codes]
def unique(self) -> Categorical:
# __doc__ = unique.__doc__
return Categorical.from_codes(
arange(self._categories_used.size), self._categories_used, NAvalue=self.NAvalue
)
def hash(self) -> Tuple[pdarray, pdarray]:
"""
Compute a 128-bit hash of each element of the Categorical.
Returns
-------
Tuple[pdarray,pdarray]
A tuple of two int64 pdarrays. The ith hash value is the concatenation
of the ith values from each array.
Notes
-----
The implementation uses SipHash128, a fast and balanced hash function (used
by Python for dictionaries and sets). For realistic numbers of strings (up
to about 10**15), the probability of a collision between two 128-bit hash
values is negligible.
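Examples
--------
A sketch (assumes a live arkouda connection; the actual hash values are
deterministic but not reproduced here):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["a", "b", "a"]))
>>> upper, lower = c.hash()
>>> upper.size == c.size
True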
"""
rep_msg = generic_msg(
cmd="categoricalHash",
args={"objType": self.objType, "categories": self.categories, "codes": self.codes},
)
hashes = json.loads(rep_msg)
return create_pdarray(hashes["upperHash"]), create_pdarray(hashes["lowerHash"])
def group(self) -> pdarray:
"""
Return the permutation that groups the array, placing equivalent
categories together. All instances of the same category are guaranteed
to lie in one contiguous block of the permuted array, but the blocks
are not necessarily ordered.
Returns
-------
pdarray
The permutation that groups the array by value
See Also
--------
GroupBy, unique
Notes
-----
This method is faster than the corresponding Strings method. If the
Categorical was created from a Strings object, then this function
simply returns the cached permutation. Even if the Categorical was
created using from_codes(), this function will be faster than
Strings.group() because it sorts dense integer values, rather than
128-bit hash values.
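Examples
--------
A sketch (assumes a live arkouda connection):
>>> import arkouda as ak
>>> ak.connect()
>>> c = ak.Categorical(ak.array(["b", "a", "b"]))
>>> c[c.group()]  # equal categories are contiguous after permuting
array(['a', 'b', 'b'])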
"""
if self.permutation is None:
return argsort(self.codes)
else:
return self.permutation
def _get_grouping_keys(self):
"""
Private method for generating grouping keys used by GroupBy.
API: this method must be defined by all groupable arrays, and it
must return a list of arrays that can be (co)argsorted.
"""
return [self.codes]
def argsort(self):
# __doc__ = argsort.__doc__
idxperm = argsort(self.categories)
inverse = zeros_like(idxperm)
inverse[idxperm] = arange(idxperm.size)
newvals = inverse[self.codes]
return argsort(newvals)
def sort_values(self):
# __doc__ = sort.__doc__
idxperm = argsort(self.categories)
inverse = zeros_like(idxperm)
inverse[idxperm] = arange(idxperm.size)
newvals = inverse[self.codes]
return Categorical.from_codes(pda_sort(newvals), self.categories[idxperm])
@typechecked
def concatenate(self, others: Sequence[Categorical], ordered: bool = True) -> Categorical:
"""
Merge this Categorical with other Categorical objects,
concatenating the arrays and synchronizing the categories.
Parameters
----------
others : Sequence[Categorical]
The Categorical arrays to concatenate and merge with this one
ordered : bool
If True (default), the arrays will be appended in the
order given. If False, array data may be interleaved
in blocks, which can greatly improve performance but
results in non-deterministic ordering of elements.
Returns
-------
Categorical
The merged Categorical object
Raises
------
TypeError
Raised if any of the objects in others are not Categorical objects
Notes
-----
This operation can be expensive -- slower than concatenating Strings.
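Examples
--------
A sketch (assumes a live arkouda connection):
>>> import arkouda as ak
>>> ak.connect()
>>> c1 = ak.Categorical(ak.array(["a", "b"]))
>>> c2 = ak.Categorical(ak.array(["b", "c"]))
>>> c1.concatenate([c2])
array(['a', 'b', 'b', 'c'])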
"""
if isinstance(others, Categorical):
others = [others]
elif len(others) < 1:
return self
samecategories = True
for c in others:
if not isinstance(c, Categorical):
raise TypeError("Categorical: can only merge/concatenate with other Categoricals")
if (self.categories.size != c.categories.size) or not (
self.categories == c.categories
).all():
samecategories = False
if samecategories:
newvals = cast(
pdarray, concatenate([self.codes] + [o.codes for o in others], ordered=ordered)
)
return Categorical.from_codes(newvals, self.categories)
else:
new_arrays = self.standardize_categories([self] + list(others), NAvalue=self.NAvalue)
new_categories = new_arrays[0].categories
new_codes = cast(pdarray, concatenate([arr.codes for arr in new_arrays], ordered=ordered))
return Categorical.from_codes(new_codes, new_categories, NAvalue=self.NAvalue)
def to_hdf(
self,
prefix_path,
dataset="categorical_array",
mode="truncate",
file_type="distribute",
):
"""
Save the Categorical to HDF5. The result is a collection of HDF5 files, one file
per locale of the arkouda server, where each filename starts with prefix_path.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files will share
dataset : str
Name prefix for saved data within the HDF5 file
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', add data as a new column to existing files.
file_type: str ("single" | "distribute")
Default: "distribute"
When set to single, dataset is written to a single file.
When distribute, dataset is written on a file per locale.
Returns
-------
None
See Also
---------
load
"""
from arkouda.io import _file_type_to_int, _mode_str_to_int
args = {
"codes": self.codes,
"categories": self.categories,
"dset": dataset,
"write_mode": _mode_str_to_int(mode),
"filename": prefix_path,
"objType": "categorical",
"file_format": _file_type_to_int(file_type),
"NA_codes": self._akNAcode,
}
if self.permutation is not None and self.segments is not None:
args["permutation"] = self.permutation
args["segments"] = self.segments
generic_msg(
cmd="tohdf",
args=args,
)
def update_hdf(self, prefix_path, dataset="categorical_array", repack=True):
"""
Overwrite the dataset with the provided name with the data from this Categorical. If
the dataset does not exist, it is added.
Parameters
-----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files
repack: bool
Default: True
HDF5 does not release memory on delete. When True, the inaccessible
data (that was overwritten) is removed. When False, the data remains, but is
inaccessible. Setting to false will yield better performance, but will cause
file sizes to expand.
Returns
--------
None
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the Categorical
Notes
------
- If the file does not contain a File_Format attribute to indicate how it was saved,
the file name is checked for _LOCALE#### to determine whether it is distributed.
- If the dataset provided does not exist, it will be added
- Because HDF5 deletes do not release memory, the repack option allows for
automatic creation of a file without the inaccessible data.
"""
from arkouda.io import (
_file_type_to_int,
_get_hdf_filetype,
_mode_str_to_int,
_repack_hdf,
)
# determine the format (single/distribute) that the file was saved in
file_type = _get_hdf_filetype(prefix_path + "*")
args = {
"codes": self.codes,
"categories": self.categories,
"dset": dataset,
"write_mode": _mode_str_to_int("append"),
"filename": prefix_path,
"objType": "categorical",
"overwrite": True,
"file_format": _file_type_to_int(file_type),
"NA_codes": self._akNAcode,
}
if self.permutation is not None and self.segments is not None:
args["permutation"] = self.permutation
args["segments"] = self.segments
generic_msg(
cmd="tohdf",
args=args,
)
if repack:
_repack_hdf(prefix_path)
def to_parquet(
self,
prefix_path: str,
dataset: str = "categorical_array",
mode: str = "truncate",
compression: Optional[str] = None,
) -> str:
"""
This functionality is currently not supported and will raise a RuntimeError.
Support is in development.
Save the Categorical to Parquet. The result is a collection of files,
one file per locale of the arkouda server, where each filename starts
with prefix_path. Each locale saves its chunk of the array to its
corresponding file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in Parquet files (must not already exist)
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', create a new Categorical dataset within existing files.
compression : str (Optional)
Default None
Provide the compression type to use when writing the file.
Supported values: snappy, gzip, brotli, zstd, lz4
Returns
-------
String message indicating result of save operation
Raises
------
RuntimeError
Raised on every call due to compatibility issues of Categorical with Parquet.
Notes
-----
- The prefix_path must be visible to the arkouda server and the user must
have write permission.
- Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>``
ranges from 0 to ``numLocales`` for `file_type='distribute'`.
- 'append' write mode is supported, but is not efficient.
- If any of the output files already exist and
the mode is 'truncate', they will be overwritten. If the mode is 'append'
and the number of output files is less than the number of locales or a
dataset with the same name already exists, a ``RuntimeError`` will result.
- Any file extension can be used. The file I/O does not rely on the extension to
determine the file format.
See Also
--------
to_hdf
"""
# due to the possibility that components will be different sizes,
# writing to Parquet is not supported at this time
raise RuntimeError(
"Categorical cannot be written to Parquet at this time due to its components "
"potentially having different sizes."
)
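# NOTE: the code below is currently unreachable because of the raise above;
# it is retained for when Parquet support for Categorical is enabled.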
result = []
comp_dict = {k: v for k, v in self._get_components_dict().items() if v is not None}
if self.RequiredPieces.issubset(comp_dict.keys()):
# Honor the first mode but switch to append for all others
# since each following comp may wipe out the file
first = True
for k, v in comp_dict.items():
result.append(
v.to_parquet(
prefix_path,
dataset=f"{dataset}.{k}",
mode=(mode if first else "append"),
compression=compression,
)
)
first = False
else:
raise Exception(
"The required pieces of `categories` and `codes` were not populated on this Categorical"
)
return ";".join(result)
def save(
self,
prefix_path: str,
dataset: str = "categorical_array",
file_format: str = "HDF5",
mode: str = "truncate",
file_type: str = "distribute",
compression: Optional[str] = None,
) -> str:
"""
DEPRECATED
Save the Categorical object to HDF5 or Parquet. The result is a collection of HDF5/Parquet files,
one file per locale of the arkouda server, where each filename starts
with prefix_path and dataset. Each locale saves its chunk of the Categorical to its
corresponding file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in HDF5 files (must not already exist)
file_format: str {'HDF5' | 'Parquet'}
The format to save the file to.
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', create a new Categorical dataset within existing files.
file_type: str ("single" | "distribute")
Default: "distribute"
When set to single, dataset is written to a single file.
When distribute, dataset is written on a file per locale.
This is only supported by HDF5 files and will have no impact of Parquet Files.
compression: str (Optional)
{None | 'snappy' | 'gzip' | 'brotli' | 'zstd' | 'lz4'}
The compression type to use when writing.
This is only supported for Parquet files and will not be used with HDF5.
Returns
-------
String message indicating result of save operation
Raises
------
ValueError
Raised if the mode is neither 'truncate' nor 'append', or if
file_format is neither 'HDF5' nor 'Parquet'
TypeError
Raised if prefix_path, dataset, or mode is not a str
Notes
-----
Important implementation notes: (1) the Categorical's state is saved as multiple
datasets within an HDF5 group: codes, categories, NA codes, and (when present)
permutation and segments, (2) the HDF5 group is named
via the dataset parameter.
See Also
---------
- ak.Categorical.to_parquet
- ak.Categorical.to_hdf
"""
from warnings import warn
warn(
"ak.Categorical.save has been deprecated. "
"Please use ak.Categorical.to_parquet or ak.Categorical.to_hdf",
DeprecationWarning,
)
if mode.lower() not in ["append", "truncate"]:
raise ValueError("Allowed modes are 'truncate' and 'append'")
if file_format.lower() == "hdf5":
return self.to_hdf(prefix_path, dataset=dataset, mode=mode, file_type=file_type)
elif file_format.lower() == "parquet":
return self.to_parquet(prefix_path, dataset=dataset, mode=mode, compression=compression)
else:
raise ValueError("Valid file types are HDF5 or Parquet")
@typechecked()
def register(self, user_defined_name: str) -> Categorical:
"""
Register this Categorical object and underlying components with the Arkouda server
Parameters
----------
user_defined_name : str
user defined name the Categorical is to be registered under,
this will be the root name for underlying components
Returns
-------
Categorical
The same Categorical which is now registered with the arkouda server and has an updated name.
This is an in-place modification, the original is returned to support
a fluid programming style.
Please note you cannot register two different Categoricals with the same name.
Raises
------
TypeError
Raised if user_defined_name is not a str
RegistrationError
If the server was unable to register the Categorical with the user_defined_name
See also
--------
unregister, attach, unregister_categorical_by_name, is_registered
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
if self.registered_name is not None and self.is_registered():
raise RegistrationError(f"This object is already registered as {self.registered_name}")
generic_msg(
cmd="register",
args={
"name": user_defined_name,
"objType": self.objType,
"codes": self.codes,
"categories": self.categories,
"_akNAcode": self._akNAcode,
"segments": self.segments if self.segments is not None else "",
"permutation": self.permutation if self.permutation is not None else "",
},
)
self.registered_name = user_defined_name
return self
def unregister(self) -> None:
"""
Unregister this Categorical object in the arkouda server which was previously
registered using register() and/or attached to using attach()
Raises
------
RegistrationError
If the object is already unregistered or if there is a server error
when attempting to unregister
See also
--------
register, attach, unregister_categorical_by_name, is_registered
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
from arkouda.util import unregister
if not self.registered_name:
raise RegistrationError("This object is not registered")
unregister(self.registered_name)
self.registered_name = None
def is_registered(self) -> np.bool_:
"""
Return True iff the object is contained in the registry or is a component of a
registered object.
Returns
-------
numpy.bool_
Indicates if the object is contained in the registry
Raises
------
RegistrationError
Raised if there's a server-side error or a mis-match of registered components
See Also
--------
register, attach, unregister, unregister_categorical_by_name
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
from arkouda.util import is_registered
if self.registered_name is None:
result = True
result &= is_registered(self.codes.name, as_component=True)
result &= is_registered(self.categories.name, as_component=True)
result &= is_registered(self._akNAcode.name, as_component=True)
if self.permutation is not None and self.segments is not None:
result &= is_registered(self.permutation.name, as_component=True)
result &= is_registered(self.segments.name, as_component=True)
return np.bool_(result)
else:
return np.bool_(is_registered(self.registered_name))
def _get_components_dict(self) -> Dict:
"""
Internal function that returns a dictionary with all required or non-None components of self.
Required Categorical components (codes and categories) are always included in the
returned components_dict.
Optional Categorical components (permutation and segments) are only included if
they've been set (are not None).
Returns
-------
Dict
Dictionary of all required or non-None components of self
Keys: component names (codes, categories, permutation, segments)
Values: components of self
"""
return {
piece_name: getattr(self, piece_name)
for piece_name in Categorical.RegisterablePieces
if piece_name in Categorical.RequiredPieces or getattr(self, piece_name) is not None
}
def _list_component_names(self) -> List[str]:
"""
Internal function that returns a list of all component names
Parameters
----------
None
Returns
-------
List[str]
List of all component names
"""
return list(
itertools.chain.from_iterable(
[p._list_component_names() for p in Categorical._get_components_dict(self).values()]
)
)
def info(self) -> str:
"""
Returns a JSON formatted string containing information about all components of self
Parameters
----------
None
Returns
-------
str
JSON string containing information about all components of self
"""
return information(self._list_component_names())
def pretty_print_info(self) -> None:
"""
Prints information about all components of self in a human readable format
Parameters
----------
None
Returns
-------
None
"""
[p.pretty_print_info() for p in Categorical._get_components_dict(self).values()]
@staticmethod
@typechecked
def attach(user_defined_name: str) -> Categorical:
"""
DEPRECATED
Function to return a Categorical object attached to the registered name in the
arkouda server which was registered using register()
Parameters
----------
user_defined_name : str
user defined name which Categorical object was registered under
Returns
-------
Categorical
The Categorical object created by re-attaching to the corresponding server components
Raises
------
TypeError
if user_defined_name is not a string
See Also
--------
register, is_registered, unregister, unregister_categorical_by_name
"""
import warnings
from arkouda.util import attach
warnings.warn(
"ak.Categorical.attach() is deprecated. Please use ak.attach() instead.",
DeprecationWarning,
)
return attach(user_defined_name)
@staticmethod
@typechecked
def unregister_categorical_by_name(user_defined_name: str) -> None:
"""
Function to unregister Categorical object by name which was registered
with the arkouda server via register()
Parameters
----------
user_defined_name : str
Name under which the Categorical object was registered
Raises
-------
TypeError
if user_defined_name is not a string
RegistrationError
if there is an issue attempting to unregister any underlying components
See Also
--------
register, unregister, attach, is_registered
"""
import warnings
from arkouda.util import unregister
warnings.warn(
"ak.Categorical.unregister_categorical_by_name() is deprecated. "
"Please use ak.unregister() instead.",
DeprecationWarning,
)
return unregister(user_defined_name)
@staticmethod
@typechecked
def parse_hdf_categoricals(
d: Mapping[str, Union[pdarray, Strings]]
) -> Tuple[List[str], Dict[str, Categorical]]:
"""
This function should be used in conjunction with the load_all function which reads hdf5 files
and reconstitutes Categorical objects.
Categorical objects use a naming convention and HDF5 structure so they can be identified and
constructed for the user.
In general you should not call this method directly
Parameters
----------
d : Mapping[str, Union[pdarray, Strings]]
Dictionary mapping dataset names to their pdarray or Strings components
Returns
-------
Tuple[List[str], Dict[str, Categorical]]
A list of key names to remove from the input mapping, and a dictionary
mapping each base name to its reconstituted Categorical object
See Also
--------
Categorical.save, load_all
"""
removal_names: List[str] = []
groups: DefaultDict[str, List[str]] = defaultdict(list)
result_categoricals: Dict[str, Categorical] = {}
for k in d.keys(): # build dict of str->list[components]
if "." in k:
groups[k.split(".")[0]].append(k)
# for each of the groups, find categorical by testing values in the group for ".categories"
for k, v in groups.items(): # str->list[str]
if any(i.endswith(".categories") for i in v): # we have a categorical
# gather categorical pieces and replace the original mapping with the categorical object
cat_parts = {}
base_name = ""
for part in v:
removal_names.append(part) # flag it for removal from original
cat_parts[part.split(".")[-1]] = d[part] # put the part into our categorical parts
if part.endswith(".categories"):
base_name = ".".join(part.split(".categories")[0:-1])
# Construct categorical and add it to the return_categoricals under the parent name
result_categoricals[base_name] = Categorical.from_codes(**cat_parts)
return removal_names, result_categoricals
def transfer(self, hostname: str, port: int_scalars):
"""
Sends a Categorical object to a different Arkouda server
Parameters
----------
hostname : str
The hostname where the Arkouda server intended to
receive the Categorical is running.
port : int_scalars
The port to send the array over. This needs to be an
open port (i.e., not one that the Arkouda server is
running on). This will open up `numLocales` ports in
succession, so it will use ports in the range
{port..(port+numLocales)} (e.g., when running an
Arkouda server on 4 nodes and port 1234 is passed as
`port`, Arkouda will use ports 1234, 1235, 1236,
and 1237 to send the array data).
This port must match the port passed to the call to
`ak.receive_array()`.
Returns
-------
A message indicating a complete transfer
Raises
------
RuntimeError
Raised if a server-side error is thrown while transferring the data
"""
# hostname is the hostname to send to
args = {
"codes": self.codes,
"categories": self.categories,
"objType": self.objType,
"NA_codes": self._akNAcode,
"hostname": hostname,
"port": port,
}
return generic_msg(
cmd="sendArray",
args=args,
)