Source code for arkouda.segarray

from __future__ import annotations

import json
import warnings
from typing import Optional, Sequence, Tuple
from typing import cast as type_cast
from warnings import warn

import numpy as np  # type: ignore

from arkouda.client import generic_msg
from arkouda.dtypes import bool as akbool
from arkouda.dtypes import int64 as akint64
from arkouda.dtypes import int_scalars, isSupportedInt, str_
from arkouda.dtypes import uint64 as akuint64
from arkouda.groupbyclass import GroupBy, broadcast
from arkouda.join import gen_ranges
from arkouda.logger import getArkoudaLogger
from arkouda.numeric import cumsum
from arkouda.pdarrayclass import RegistrationError, create_pdarray, is_sorted, pdarray
from arkouda.pdarraycreation import arange, array, ones, zeros
from arkouda.pdarraysetops import concatenate
from arkouda.strings import Strings

SEG_SUFFIX = "_segments"
VAL_SUFFIX = "_values"
LEN_SUFFIX = "_lengths"


def _aggregator(func):
    aggdoc = """
        Aggregate values over each sub-array.

        Parameters
        ----------
        x : pdarray
        The values to aggregate. By default, the values of the sub-arrays
        themselves are used, but the user may supply an array of values
        corresponding to the flattened values of all sub-arrays.

        Returns
        -------
        pdarray
        Array of one aggregated value per sub-array.
        """

    def update_doc():
        func.__doc__ = aggdoc
        return func

    return update_doc


[docs] def segarray(segments: pdarray, values: pdarray, lengths=None, grouping=None): """ Alias for the from_parts function. Prevents user from needing to call `ak.SegArray` constructor DEPRECATED """ warn( "ak.segarray has been deprecated. Please use ak.SegArray constructor moving forward", DeprecationWarning, ) return SegArray(segments, values, lengths, grouping)
[docs] class SegArray: objType = "SegArray" def __init__(self, segments, values, lengths=None, grouping=None): self.logger = getArkoudaLogger(name=__class__.__name__) # type: ignore self.registered_name: Optional[str] = None # validate inputs if not isinstance(segments, pdarray) or segments.dtype != akint64: raise TypeError("Segments must be int64 pdarray") if not isinstance(values, pdarray) and not isinstance(values, Strings): raise TypeError("Values must be a pdarray or Strings.") if not is_sorted(segments): raise ValueError("Segments must be unique and in sorted order") if segments.size > 0: if segments[0] != 0: raise ValueError("Segments must start at zero.") elif values.size > 0: raise ValueError("Cannot have non-empty values with empty segments") # references to supporting pdarrays self.values = values self.segments = segments self.size = segments.size self.valsize = values.size self.dtype = values.dtype if lengths is None: self.lengths = self._get_lengths() else: self.lengths = lengths self._non_empty = self.lengths > 0 self._non_empty_count = self._non_empty.sum() # grouping object computation. (This will need to be moved to the server) # GroupBy computation left here because of lack of server obj. May need to move in Future if grouping is None: if self.size == 0 or self._non_empty_count == 0: self._grouping = GroupBy(zeros(0, dtype=akint64)) else: # Treat each sub-array as a group, for grouped aggregations self._grouping = GroupBy( broadcast(self.segments[self.non_empty], arange(self._non_empty_count), self.valsize) ) else: self._grouping = grouping
[docs] @classmethod def from_return_msg(cls, rep_msg) -> SegArray: # parse return json eles = json.loads(rep_msg) # parse the create for the values pdarray values = ( Strings.from_return_msg(eles["values"]) if eles["values"].split()[2] == "str" else create_pdarray(eles["values"]) ) segments = create_pdarray(eles["segments"]) lengths = create_pdarray(eles["lengths"]) if "lengths" in eles else None return cls(segments, values, lengths=lengths)
[docs] @classmethod def from_parts(cls, segments, values, lengths=None, grouping=None) -> SegArray: """ DEPRECATED Construct a SegArray object from its parts Parameters ---------- segments : pdarray, int64 Start index of each sub-array in the flattened values array values : pdarray The flattened values of all sub-arrays lengths: pdarray The length of each segment grouping: GroupBy grouping of segments Returns ------- SegArray Data structure representing an array whose elements are variable-length arrays. Notes ----- Keyword args 'lengths' and 'grouping' are not user-facing. They are used by the attach method. """ warn( "ak.SegArray.from_parts has been deprecated. Please use ak.SegArray constructor to " "generate SegArray objects.", DeprecationWarning, ) return cls(segments, values, lengths=lengths, grouping=grouping)
[docs] @classmethod def from_multi_array(cls, m): """ Construct a SegArray from a list of columns. This essentially transposes the input, resulting in an array of rows. Parameters ---------- m : list of pdarray or Strings List of columns, the rows of which will form the sub-arrays of the output Returns ------- SegArray Array of rows of input """ if isinstance(m, pdarray): return cls(arange(m.size), m) else: sizes = np.array([mi.size for mi in m]) dtypes = {mi.dtype for mi in m} if len(dtypes) != 1: raise ValueError("All values must have same dtype") n = len(m) offsets = np.cumsum(sizes) - sizes newvals = zeros(sum(sizes), dtype=dtypes.pop()) for j in range(n): newvals[offsets[j] : (offsets[j] + sizes[j])] = m[j] return cls(array(offsets), newvals)
@property def non_empty(self): from arkouda.infoclass import list_symbol_table if self._non_empty.name not in list_symbol_table(): self._non_empty = self.lengths > 0 self._non_empty_count = self._non_empty.sum() return self._non_empty @property def grouping(self): if self._grouping is not None: return self._grouping if self.size == 0 or self._non_empty_count == 0: self._grouping = GroupBy(zeros(0, dtype=akint64)) else: # Treat each sub-array as a group, for grouped aggregations self._grouping = GroupBy( broadcast(self.segments[self.non_empty], arange(self._non_empty_count), self.valsize) ) def _get_lengths(self): if self.size == 0: return zeros(0, dtype=akint64) elif self.size == 1: return array([self.valsize]) else: return concatenate((self.segments[1:], array([self.valsize]))) - self.segments def __getitem__(self, i): if isSupportedInt(i): start = self.segments[i] end = self.segments[i] + self.lengths[i] return self.values[start:end] elif (isinstance(i, pdarray) and i.dtype in [akint64, akuint64, akbool]) or isinstance(i, slice): starts = self.segments[i] ends = starts + self.lengths[i] newsegs, inds, lengths = gen_ranges(starts, ends, return_lengths=True) return SegArray(newsegs, self.values[inds], lengths) else: raise TypeError(f"Invalid index type: {type(i)}")
[docs] @classmethod def concat(cls, x, axis=0, ordered=True): """ Concatenate a sequence of SegArrays Parameters ---------- x : sequence of SegArray The SegArrays to concatenate axis : 0 or 1 Select vertical (0) or horizontal (1) concatenation. If axis=1, all SegArrays must have same size. ordered : bool Must be True. This option is present for compatibility only, because unordered concatenation is not yet supported. Returns ------- SegArray The input arrays joined into one SegArray """ if not ordered: raise ValueError("Unordered concatenation not yet supported on SegArray; use ordered=True.") if len(x) == 0: raise ValueError("Empty sequence passed to concat") for xi in x: if not isinstance(xi, cls): return NotImplemented if len({xi.dtype for xi in x}) != 1: raise ValueError("SegArrays must all have same dtype to concatenate") if axis == 0: ctr = 0 segs = [] vals = [] for xi in x: # Segment offsets need to be raised by length of previous values segs.append(xi.segments + ctr) ctr += xi.valsize # Values can just be concatenated vals.append(xi.values) return cls(concatenate(segs), concatenate(vals)) elif axis == 1: sizes = {xi.size for xi in x} if len(sizes) != 1: raise ValueError("SegArrays must all have same size to concatenate with axis=1") if sizes.pop() == 0: return x[0] dt = list(x)[0].dtype newlens = sum(xi.lengths for xi in x) newsegs = cumsum(newlens) - newlens # Ignore sub-arrays that are empty in all arrays nonzero = concatenate((newsegs[:-1] < newsegs[1:], array([True]))) nzsegs = newsegs[nonzero] newvals = zeros(newlens.sum(), dtype=dt) for xi in x: # Set up fromself for a scan, so that it steps up at the start of a segment # from the current array, and steps back down at the end fromself = zeros(newvals.size + 1, dtype=akint64) fromself[nzsegs] += 1 nzlens = xi.lengths[nonzero] fromself[nzsegs + nzlens] -= 1 fromself = cumsum(fromself[:-1]) == 1 newvals[fromself] = xi.values nzsegs += nzlens return cls(newsegs, newvals) else: raise ValueError( "Supported values for axis are 0 (vertical concat) or 1 (horizontal concat)" )
[docs] def copy(self): """ Return a deep copy. """ return SegArray(self.segments[:], self.values[:])
def __eq__(self, other): if not isinstance(other, SegArray): return NotImplemented if self.size != other.size: raise ValueError("Segarrays must have same size to compare") eq = zeros(self.size, dtype=akbool) leneq = self.lengths == other.lengths if leneq.sum() > 0: selfcmp = self[leneq] othercmp = other[leneq] intersection = selfcmp.all(selfcmp.values == othercmp.values) eq[leneq & (self.lengths != 0)] = intersection eq[leneq & (self.lengths == 0)] = True return eq def __len__(self) -> int: return self.size def __str__(self): if self.size <= 6: rows = list(range(self.size)) else: rows = [0, 1, 2, None, self.size - 3, self.size - 2, self.size - 1] outlines = ["SegArray(["] for r in rows: if r is None: outlines.append("...") else: outlines.append(str(self[r])) outlines.append("])") return "\n".join(outlines) def __repr__(self): return self.__str__()
[docs] def get_suffixes(self, n, return_origins=True, proper=True): """ Return the n-long suffix of each sub-array, where possible Parameters ---------- n : int Length of suffix return_origins : bool If True, return a logical index indicating which sub-arrays were long enough to return an n-suffix proper : bool If True, only return proper suffixes, i.e. from sub-arrays that are at least n+1 long. If False, allow the entire sub-array to be returned as a suffix. Returns ------- suffixes : list of pdarray An n-long list of pdarrays, essentially a table where each row is an n-suffix. The number of rows is the number of True values in the returned mask. origin_indices : pdarray, bool Boolean array that is True where the sub-array was long enough to return an n-suffix, False otherwise. """ if proper: longenough = self.lengths > n else: longenough = self.lengths >= n suffixes = [] for i in range(n): ind = (self.segments + self.lengths - (n - i))[longenough] suffixes.append(self.values[ind]) if return_origins: return suffixes, longenough else: return suffixes
[docs] def get_prefixes(self, n, return_origins=True, proper=True): """ Return all sub-array prefixes of length n (for sub-arrays that are at least n+1 long) Parameters ---------- n : int Length of suffix return_origins : bool If True, return a logical index indicating which sub-arrays were long enough to return an n-prefix proper : bool If True, only return proper prefixes, i.e. from sub-arrays that are at least n+1 long. If False, allow the entire sub-array to be returned as a prefix. Returns ------- prefixes : list of pdarray An n-long list of pdarrays, essentially a table where each row is an n-prefix. The number of rows is the number of True values in the returned mask. origin_indices : pdarray, bool Boolean array that is True where the sub-array was long enough to return an n-suffix, False otherwise. """ if proper: longenough = self.lengths > n else: longenough = self.lengths >= n prefixes = [] for i in range(n): ind = (self.segments + i)[longenough] prefixes.append(self.values[ind]) if return_origins: return prefixes, longenough else: return prefixes
[docs] def get_ngrams(self, n, return_origins=True): """ Return all n-grams from all sub-arrays. Parameters ---------- n : int Length of n-gram return_origins : bool If True, return an int64 array indicating which sub-array each returned n-gram came from. Returns ------- ngrams : list of pdarray An n-long list of pdarrays, essentially a table where each row is an n-gram. origin_indices : pdarray, int The index of the sub-array from which the corresponding n-gram originated """ if n > self.lengths.max(): raise ValueError("n must be <= the maximum length of the sub-arrays") ngrams = [] notsegstart = ones(self.valsize, dtype=akbool) notsegstart[self.segments[self.non_empty]] = False valid = ones(self.valsize - n + 1, dtype=akbool) for i in range(n): end = self.valsize - n + i + 1 ngrams.append(self.values[i:end]) if i > 0: valid &= notsegstart[i:end] ngrams = [char[valid] for char in ngrams] if return_origins: # set the proper indexes for broadcasting. Needed to alot for empty segments seg_idx = arange(self.size)[self.non_empty] origin_indices = self.grouping.broadcast(seg_idx, permute=True)[: valid.size][valid] return ngrams, origin_indices else: return ngrams
def _normalize_index(self, j): if not isSupportedInt(j): raise TypeError(f"index must be integer, not {type(j)}") if j >= 0: longenough = self.lengths > j else: j = self.lengths + j longenough = j >= 0 return longenough, j
[docs] def get_jth(self, j, return_origins=True, compressed=False, default=0): """ Select the j-th element of each sub-array, where possible. Parameters ---------- j : int The index of the value to get from each sub-array. If j is negative, it counts backwards from the end of each sub-array. return_origins : bool If True, return a logical index indicating where j is in bounds compressed : bool If False, return array is same size as self, with default value where j is out of bounds. If True, the return array only contains values where j is in bounds. default : scalar When compressed=False, the value to return when j is out of bounds for the sub-array Returns ------- val : pdarray compressed=False: The j-th value of each sub-array where j is in bounds and the default value where j is out of bounds. compressed=True: The j-th values of only the sub-arrays where j is in bounds origin_indices : pdarray, bool A Boolean array that is True where j is in bounds for the sub-array. Notes ------ If values are Strings, only the compressed format is supported. """ longenough, newj = self._normalize_index(j) ind = (self.segments + newj)[longenough] if compressed or self.dtype == str_: # Strings not supported by uncompressed version res = self.values[ind] else: res = zeros(self.size, dtype=self.dtype) res.fill(default) res[longenough] = self.values[ind] if return_origins: return res, longenough else: return res
[docs] def set_jth(self, i, j, v): """ Set the j-th element of each sub-array in a subset. Parameters ---------- i : pdarray, int Indices of sub-arrays to set j-th element j : int Index of value to set in each sub-array. If j is negative, it counts backwards from the end of the sub-array. v : pdarray or scalar The value(s) to set. If v is a pdarray, it must have same length as i. Raises ----- ValueError If j is out of bounds in any of the sub-arrays specified by i. """ if self.dtype == str_: raise TypeError("String elements are immutable") longenough, newj = self._normalize_index(j) if not longenough[i].all(): raise ValueError("Not all (i, j) in bounds") ind = (self.segments + newj)[i] self.values[ind] = v
[docs] def get_length_n(self, n, return_origins=True): """ Return all sub-arrays of length n, as a list of columns. Parameters ---------- n : int Length of sub-arrays to select return_origins : bool Return a logical index indicating which sub-arrays are length n Returns ------- columns : list of pdarray An n-long list of pdarray, where each row is one of the n-long sub-arrays from the SegArray. The number of rows is the number of True values in the returned mask. origin_indices : pdarray, bool Array of bool for each element of the SegArray, True where sub-array has length n. """ mask = self.lengths == n elem = [] for i in range(n): ind = (self.segments + self.lengths - (n - i))[mask] elem.append(self.values[ind]) if return_origins: return elem, mask else: return elem
[docs] def append(self, other, axis=0): """ Append other to self, either vertically (axis=0, length of resulting SegArray increases), or horizontally (axis=1, each sub-array of other appends to the corresponding sub-array of self). Parameters ---------- other : SegArray Array of sub-arrays to append axis : 0 or 1 Whether to append vertically (0) or horizontally (1). If axis=1, other must be same size as self. Returns ------- SegArray axis=0: New SegArray containing all sub-arrays axis=1: New SegArray of same length, with pairs of sub-arrays concatenated """ if not isinstance(other, SegArray): return NotImplemented if self.dtype != other.dtype: raise TypeError("SegArrays must have same value type to append") return self.__class__.concat((self, other), axis=axis)
[docs] def append_single(self, x, prepend=False): """ Append a single value to each sub-array. Parameters ---------- x : pdarray or scalar Single value to append to each sub-array Returns ------- SegArray Copy of original SegArray with values from x appended to each sub-array """ if self.dtype == str_: raise TypeError("String elements are immutable and cannot accept a single value") if hasattr(x, "size"): if x.size != self.size: raise ValueError("Argument must be scalar or same size as SegArray") if not isinstance(x, type(self.values)) or x.dtype != self.dtype: raise TypeError("Argument type must match value type of SegArray") newlens = self.lengths + 1 newsegs = cumsum(newlens) - newlens newvals = zeros(newlens.sum(), dtype=self.dtype) if prepend: lastscatter = newsegs else: lastscatter = newsegs + newlens - 1 newvals[lastscatter] = x origscatter = arange(self.valsize) + self.grouping.broadcast( arange(self.size)[self.non_empty], permute=True ) if prepend: origscatter += 1 newvals[origscatter] = self.values return SegArray(newsegs, newvals)
[docs] def prepend_single(self, x): return self.append_single(x, prepend=True)
[docs] def remove_repeats(self, return_multiplicity=False): """ Condense sequences of repeated values within a sub-array to a single value. Parameters ---------- return_multiplicity : bool If True, also return the number of times each value was repeated. Returns ------- norepeats : SegArray Sub-arrays with runs of repeated values replaced with single value multiplicity : SegArray If return_multiplicity=True, this array contains the number of times each value in the returned SegArray was repeated in the original SegArray. """ isrepeat = zeros(self.values.size, dtype=akbool) isrepeat[1:] = self.values[:-1] == self.values[1:] isrepeat[self.segments[self.non_empty]] = False truepaths = self.values[~isrepeat] nhops = self.grouping.sum(~isrepeat)[1] # Correct segments to properly assign empty lists - prevents dropping empty segments lens = self.lengths[:] lens[self.non_empty] = nhops truesegs = cumsum(lens) - lens norepeats = SegArray(truesegs, truepaths) if return_multiplicity: truehopinds = arange(self.valsize)[~isrepeat] multiplicity = zeros(truepaths.size, dtype=akint64) multiplicity[:-1] = truehopinds[1:] - truehopinds[:-1] multiplicity[-1] = self.valsize - truehopinds[-1] return norepeats, SegArray(truesegs, multiplicity) else: return norepeats
[docs] def to_ndarray(self): """ Convert the array into a numpy.ndarray containing sub-arrays Returns ------- np.ndarray A numpy ndarray with the same sub-arrays (also numpy.ndarray) as this array See Also -------- array() to_list() Examples -------- >>> segarr = ak.SegArray(ak.array([0, 4, 7]), ak.arange(12)) >>> segarr.to_ndarray() array([array([1, 2, 3, 4]), array([5, 6, 7]), array([8, 9, 10, 11, 12])]) >>> type(segarr.to_ndarray()) numpy.ndarray """ ndvals = self.values.to_ndarray() ndsegs = self.segments.to_ndarray() arr = [ndvals[start:end] for start, end in zip(ndsegs, ndsegs[1:])] if self.size > 0: arr.append(ndvals[ndsegs[-1] :]) return np.array(arr, dtype=object)
[docs] def to_list(self): """ Convert the segarray into a list containing sub-arrays Returns ------- list A list with the same sub-arrays (also list) as this segarray See Also -------- to_ndarray() Examples -------- >>> segarr = ak.SegArray(ak.array([0, 4, 7]), ak.arange(12)) >>> segarr.to_list() [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9, 10, 11]] >>> type(segarr.to_list()) list """ return [arr.tolist() for arr in self.to_ndarray()]
[docs] def sum(self, x=None): if x is None: x = self.values return self.grouping.sum(x)[1]
[docs] def prod(self, x=None): if x is None: x = self.values return self.grouping.prod(x)[1]
[docs] def min(self, x=None): if x is None: x = self.values return self.grouping.min(x)[1]
[docs] def max(self, x=None): if x is None: x = self.values return self.grouping.max(x)[1]
[docs] def argmin(self, x=None): if x is None: x = self.values return self.grouping.argmin(x)[1]
[docs] def argmax(self, x=None): if x is None: x = self.values return self.grouping.argmax(x)[1]
[docs] def any(self, x=None): if x is None: x = self.values return self.grouping.any(x)[1]
[docs] def all(self, x=None): if x is None: x = self.values return self.grouping.all(x)[1]
[docs] def OR(self, x=None): if x is None: x = self.values return self.grouping.OR(x)[1]
[docs] def AND(self, x=None): if x is None: x = self.values return self.grouping.AND(x)[1]
[docs] def XOR(self, x=None): if x is None: x = self.values return self.grouping.XOR(x)[1]
[docs] def nunique(self, x=None): if x is None: x = self.values return self.grouping.nunique(x)[1]
[docs] def mean(self, x=None): if x is None: x = self.values return self.grouping.mean(x)[1]
[docs] def aggregate(self, op, x=None): if x is None: x = self.values return self.grouping.aggregate(x, op)
[docs] def unique(self, x=None): """ Return sub-arrays of unique values. Parameters ---------- x : pdarray The values to unique, per group. By default, the values of this SegArray's sub-arrays. Returns ------- SegArray Same number of sub-arrays as original SegArray, but elements in sub-array are unique and in sorted order. """ if x is None: x = self.values keyidx = self.grouping.broadcast(arange(self.size), permute=True) ukey, uval = GroupBy([keyidx, x]).unique_keys g = GroupBy(ukey, assume_sorted=True) _, lengths = g.count() return SegArray(g.segments, uval, grouping=g, lengths=lengths)
[docs] def hash(self) -> Tuple[pdarray, pdarray]: """ Compute a 128-bit hash of each segment. Returns ------- Tuple[pdarray,pdarray] A tuple of two int64 pdarrays. The ith hash value is the concatenation of the ith values from each array. """ repMsg = type_cast( str, generic_msg( cmd="segmentedHash", args={ "objType": self.objType, "values": self.values, "segments": self.segments, "valObjType": self.values.objType, }, ), ) h1, h2 = repMsg.split("+") return create_pdarray(h1), create_pdarray(h2)
[docs] def to_hdf( self, prefix_path, dataset="segarray", mode="truncate", file_type="distribute", ): """ Save the SegArray to HDF5. The result is a collection of HDF5 files, one file per locale of the arkouda server, where each filename starts with prefix_path. Parameters ---------- prefix_path : str Directory and filename prefix that all output files will share dataset : str Name prefix for saved data within the HDF5 file mode : str {'truncate' | 'append'} By default, truncate (overwrite) output files, if they exist. If 'append', add data as a new column to existing files. file_type: str ("single" | "distribute") Default: "distribute" When set to single, dataset is written to a single file. When distribute, dataset is written on a file per locale. This is only supported by HDF5 files and will have no impact of Parquet Files. Returns ------- None See Also --------- load """ from arkouda.io import _file_type_to_int, _mode_str_to_int return type_cast( str, generic_msg( cmd="tohdf", args={ "values": self.values.name, "segments": self.segments.name, "dset": dataset, "write_mode": _mode_str_to_int(mode), "filename": prefix_path, "dtype": self.dtype, "objType": self.objType, "file_format": _file_type_to_int(file_type), }, ), )
[docs] def update_hdf( self, prefix_path: str, dataset: str = "segarray", repack: bool = True, ): """ Overwrite the dataset with the name provided with this SegArray object. If the dataset does not exist it is added. Parameters ----------- prefix_path : str Directory and filename prefix that all output files share dataset : str Name of the dataset to create in files repack: bool Default: True HDF5 does not release memory on delete. When True, the inaccessible data (that was overwritten) is removed. When False, the data remains, but is inaccessible. Setting to false will yield better performance, but will cause file sizes to expand. Returns -------- None Raises ------- RuntimeError Raised if a server-side error is thrown saving the SegArray Notes ------ - If file does not contain File_Format attribute to indicate how it was saved, the file name is checked for _LOCALE#### to determine if it is distributed. - If the dataset provided does not exist, it will be added - Because HDF5 deletes do not release memory, this will create a copy of the file with the new data """ from arkouda.io import ( _file_type_to_int, _get_hdf_filetype, _mode_str_to_int, _repack_hdf, ) if self.dtype == str_: # Support will be added by Issue #2443 raise TypeError("SegArrays with Strings values are not yet supported by HDF5") # determine the format (single/distribute) that the file was saved in file_type = _get_hdf_filetype(prefix_path + "*") generic_msg( cmd="tohdf", args={ "values": self.values.name, "segments": self.segments.name, "dset": dataset, "write_mode": _mode_str_to_int("append"), "filename": prefix_path, "dtype": self.dtype, "objType": self.objType, "file_format": _file_type_to_int(file_type), "overwrite": True, }, ) if repack: _repack_hdf(prefix_path)
[docs] def to_parquet( self, prefix_path, dataset="segarray", mode: str = "truncate", compression: Optional[str] = None ): """ Save the SegArray object to Parquet. The result is a collection of files, one file per locale of the arkouda server, where each filename starts with prefix_path. Each locale saves its chunk of the object to its corresponding file. Parameters ---------- prefix_path : str Directory and filename prefix that all output files share dataset : str Name of the dataset to create in files (must not already exist) mode : str {'truncate' | 'append'} Deprecated. Parameter kept to maintain functionality of other calls. Only Truncate supported. By default, truncate (overwrite) output files, if they exist. If 'append', attempt to create new dataset in existing files. compression : str (Optional) (None | "snappy" | "gzip" | "brotli" | "zstd" | "lz4") Sets the compression type used with Parquet files Returns ------- string message indicating result of save operation Raises ------ RuntimeError Raised if a server-side error is thrown saving the pdarray ValueError If write mode is not Truncate. Notes ----- - Append mode for Parquet has been deprecated. It was not implemented for SegArray. - The prefix_path must be visible to the arkouda server and the user must have write permission. - Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>`` ranges from 0 to ``numLocales`` for `file_type='distribute'`. - If any of the output files already exist and the mode is 'truncate', they will be overwritten. If the mode is 'append' and the number of output files is less than the number of locales or a dataset with the same name already exists, a ``RuntimeError`` will result. - Any file extension can be used.The file I/O does not rely on the extension to determine the file format. """ from arkouda.io import _mode_str_to_int if mode.lower() == "append": raise ValueError("Append mode is not supported for SegArray.") return type_cast( str, generic_msg( "writeParquet", { "values": self.values.name, "segments": self.segments.name, "dset": dataset, "mode": _mode_str_to_int(mode), "prefix": prefix_path, "objType": self.objType, "compression": compression, }, ), )
[docs] def save( self, prefix_path, dataset="segarray", mode="truncate", file_type="distribute", ): """ DEPRECATED Save the SegArray to HDF5. The object can be saved to a collection of files or single file. Parameters ---------- prefix_path : str Directory and filename prefix that all output files share dataset : str Name of the dataset to create in files (must not already exist) mode : str {'truncate' | 'append'} By default, truncate (overwrite) output files, if they exist. If 'append', attempt to create new dataset in existing files. file_type: str ("single" | "distribute") Default: "distribute" When set to single, dataset is written to a single file. When distribute, dataset is written on a file per locale. This is only supported by HDF5 files and will have no impact of Parquet Files. Returns ------- string message indicating result of save operation Raises ------- RuntimeError Raised if a server-side error is thrown saving the pdarray Notes ----- - The prefix_path must be visible to the arkouda server and the user must have write permission. - Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>`` ranges from 0 to ``numLocales`` for `file_type='distribute'`. Otherwise, the file name will be `prefix_path`. - If any of the output files already exist and the mode is 'truncate', they will be overwritten. If the mode is 'append' and the number of output files is less than the number of locales or a dataset with the same name already exists, a ``RuntimeError`` will result. - Any file extension can be used.The file I/O does not rely on the extension to determine the file format. See Also -------- to_hdf, load """ from warnings import warn warn( "ak.SegArray.save has been deprecated. Please use ak.SegArray.to_hdf", DeprecationWarning, ) return self.to_hdf( prefix_path, dataset, mode=mode, file_type=file_type, )
[docs] @classmethod def read_hdf(cls, prefix_path, dataset="segarray"): """ Load a saved SegArray from HDF5. All arguments must match what was supplied to SegArray.save() Parameters ---------- prefix_path : str Directory and filename prefix dataset : str Name prefix for saved data within the HDF5 files Returns ------- SegArray """ from arkouda.io import read_hdf return read_hdf(prefix_path, datasets=dataset)
[docs] @classmethod def load(cls, prefix_path, dataset="segarray", segment_name="segments", value_name="values"): warnings.warn( "ak.SegArray.load() is deprecated. Please use ak.SegArray.read_hdf() instead.", DeprecationWarning, ) if segment_name != "segments" or value_name != "values": dataset = [dataset + "_" + value_name, dataset + "_" + segment_name] return cls.read_hdf(prefix_path, dataset)
[docs] def intersect(self, other): """ Computes the intersection of 2 SegArrays. Parameters ---------- other : SegArray SegArray to compute against Returns ------- SegArray Segments are the 1d intersections of the segments of self and other See Also -------- pdarraysetops.intersect1d Examples -------- >>> a = [1, 2, 3, 1, 4] >>> b = [3, 1, 4, 5] >>> c = [1, 3, 3, 5] >>> d = [2, 2, 4] >>> seg_a = ak.segarray(ak.array([0, len(a)]), ak.array(a+b)) >>> seg_b = ak.segarray(ak.array([0, len(c)]), ak.array(c+d)) >>> seg_a.intersect(seg_b) SegArray([ [1, 3], [4] ]) """ from arkouda.pdarraysetops import intersect1d a_seg_inds = self.grouping.broadcast(arange(self.size)[self.non_empty]) b_seg_inds = other.grouping.broadcast(arange(other.size)[other.non_empty]) (new_seg_inds, new_values) = intersect1d([a_seg_inds, self.values], [b_seg_inds, other.values]) g = GroupBy(new_seg_inds) # This method does not return any empty resulting segments # We need to add these if they are missing if g.segments.size == self.size: return SegArray(g.segments, new_values[g.permutation]) else: segments = zeros(self.size, dtype=akint64) truth = ones(self.size, dtype=akbool) k, ct = g.count() segments[k] = g.segments truth[k] = zeros(k.size, dtype=akbool) if truth[-1]: segments[-1] = g.permutation.size truth[-1] = False segments[truth] = segments[arange(self.size)[truth] + 1] return SegArray(segments, new_values[g.permutation])
[docs] def union(self, other): """ Computes the union of 2 SegArrays. Parameters ---------- other : SegArray SegArray to compute against Returns ------- SegArray Segments are the 1d union of the segments of self and other See Also -------- pdarraysetops.union1d Examples -------- >>> a = [1, 2, 3, 1, 4] >>> b = [3, 1, 4, 5] >>> c = [1, 3, 3, 5] >>> d = [2, 2, 4] >>> seg_a = ak.segarray(ak.array([0, len(a)]), ak.array(a+b)) >>> seg_b = ak.segarray(ak.array([0, len(c)]), ak.array(c+d)) >>> seg_a.union(seg_b) SegArray([ [1, 2, 3, 4, 5], [1, 2, 3, 4, 5] ]) """ from arkouda.pdarraysetops import union1d a_seg_inds = self.grouping.broadcast(arange(self.size)[self.non_empty]) b_seg_inds = other.grouping.broadcast(arange(other.size)[other.non_empty]) (new_seg_inds, new_values) = union1d([a_seg_inds, self.values], [b_seg_inds, other.values]) g = GroupBy(new_seg_inds) # This method does not return any empty resulting segments # We need to add these if they are missing if g.segments.size == self.size: return SegArray(g.segments, new_values[g.permutation]) else: segments = zeros(self.size, dtype=akint64) truth = ones(self.size, dtype=akbool) k, ct = g.count() segments[k] = g.segments truth[k] = zeros(k.size, dtype=akbool) if truth[-1]: segments[-1] = g.permutation.size truth[-1] = False segments[truth] = segments[arange(self.size)[truth] + 1] return SegArray(segments, new_values[g.permutation])
[docs] def setdiff(self, other): """ Computes the set difference of 2 SegArrays. Parameters ---------- other : SegArray SegArray to compute against Returns ------- SegArray Segments are the 1d set difference of the segments of self and other See Also -------- pdarraysetops.setdiff1d Examples -------- >>> a = [1, 2, 3, 1, 4] >>> b = [3, 1, 4, 5] >>> c = [1, 3, 3, 5] >>> d = [2, 2, 4] >>> seg_a = ak.segarray(ak.array([0, len(a)]), ak.array(a+b)) >>> seg_b = ak.segarray(ak.array([0, len(c)]), ak.array(c+d)) >>> seg_a.setdiff(seg_b) SegArray([ [2, 4], [1, 3, 5] ]) """ from arkouda.pdarraysetops import setdiff1d a_seg_inds = self.grouping.broadcast(arange(self.size)[self.non_empty]) b_seg_inds = other.grouping.broadcast(arange(other.size)[other.non_empty]) (new_seg_inds, new_values) = setdiff1d([a_seg_inds, self.values], [b_seg_inds, other.values]) g = GroupBy(new_seg_inds) # This method does not return any empty resulting segments # We need to add these if they are missing if g.segments.size == self.size: return SegArray(g.segments, new_values[g.permutation]) else: segments = zeros(self.size, dtype=akint64) truth = ones(self.size, dtype=akbool) k, ct = g.count() segments[k] = g.segments truth[k] = zeros(k.size, dtype=akbool) if truth[-1]: segments[-1] = g.permutation.size truth[-1] = False segments[truth] = segments[arange(self.size)[truth] + 1] return SegArray(segments, new_values[g.permutation])
[docs] def setxor(self, other): """ Computes the symmetric difference of 2 SegArrays. Parameters ---------- other : SegArray SegArray to compute against Returns ------- SegArray Segments are the 1d symmetric difference of the segments of self and other See Also -------- pdarraysetops.setxor1d Examples -------- >>> a = [1, 2, 3, 1, 4] >>> b = [3, 1, 4, 5] >>> c = [1, 3, 3, 5] >>> d = [2, 2, 4] >>> seg_a = ak.segarray(ak.array([0, len(a)]), ak.array(a+b)) >>> seg_b = ak.segarray(ak.array([0, len(c)]), ak.array(c+d)) >>> seg_a.setxor(seg_b) SegArray([ [2, 4, 5], [1, 3, 5, 2] ]) """ from arkouda.pdarraysetops import setxor1d a_seg_inds = self.grouping.broadcast(arange(self.size)[self.non_empty]) b_seg_inds = other.grouping.broadcast(arange(other.size)[other.non_empty]) (new_seg_inds, new_values) = setxor1d([a_seg_inds, self.values], [b_seg_inds, other.values]) g = GroupBy(new_seg_inds) # This method does not return any empty resulting segments # We need to add these if they are missing if g.segments.size == self.size: return SegArray(g.segments, new_values[g.permutation]) else: segments = zeros(self.size, dtype=akint64) truth = ones(self.size, dtype=akbool) k, ct = g.count() segments[k] = g.segments truth[k] = zeros(k.size, dtype=akbool) if truth[-1]: segments[-1] = g.permutation.size truth[-1] = False segments[truth] = segments[arange(self.size)[truth] + 1] return SegArray(segments, new_values[g.permutation])
[docs] def filter(self, filter, discard_empty: bool = False): """ Filter values out of the SegArray object Parameters ---------- filter: pdarray, list, or value The value/s to be filtered out of the SegArray discard_empty: bool Defaults to False. When True, empty segments are removed from the return SegArray Returns -------- SegArray """ from arkouda.pdarraysetops import in1d # convert to pdarray if more than 1 element if isinstance(filter, Sequence): filter = array(filter) # create boolean index for values to keep keep = ( in1d(self.values, filter, invert=True) if isinstance(filter, pdarray) or isinstance(filter, Strings) else self.values != filter ) new_vals = self.values[keep] lens = self.lengths[:] # recreate the segment boundaries seg_cts = self.grouping.sum(keep)[1] lens[self.non_empty] = seg_cts new_segs = cumsum(lens) - lens new_segarray = SegArray(new_segs, new_vals) return new_segarray[new_segarray.non_empty] if discard_empty else new_segarray
[docs] def register(self, user_defined_name): """ Register this SegArray object and underlying components with the Arkouda server Parameters ---------- user_defined_name : str user defined name which this SegArray object will be registered under Returns ------- SegArray The same SegArray which is now registered with the arkouda server and has an updated name. This is an in-place modification, the original is returned to support a fluid programming style. Please note you cannot register two different SegArrays with the same name. Raises ------ RegistrationError Raised if the server could not register the SegArray object Notes ----- Objects registered with the server are immune to deletion until they are unregistered. See Also -------- unregister, attach, is_registered """ if self.registered_name is not None and self.is_registered(): raise RegistrationError(f"This object is already registered as {self.registered_name}") generic_msg( cmd="register", args={ "name": user_defined_name, "objType": self.objType, "segments": self.segments, "values": self.values, "val_type": self.values.objType, }, ) self.registered_name = user_defined_name return self
[docs] def unregister(self): """ Unregister this SegArray object in the arkouda server which was previously registered using register() and/or attached to using attach() Returns ------- None Raises ------ RuntimeError Raised if the server could not unregister the SegArray object from the Symbol Table Notes ----- Objects registered with the server are immune to deletion until they are unregistered. See Also -------- register, attach, is_registered """ from arkouda.util import unregister if not self.registered_name: raise RegistrationError("This object is not registered") unregister(self.registered_name) self.registered_name = None
[docs] @staticmethod def unregister_segarray_by_name(user_defined_name): """ Using the defined name, remove the registered SegArray object from the Symbol Table Parameters ---------- user_defined_name : str user defined name which the SegArray object was registered under Returns ------- None Raises ------ RuntimeError Raised if the server could not unregister the SegArray object from the Symbol Table See Also -------- register, unregister, attach, is_registered """ import warnings from arkouda.util import unregister warnings.warn( "ak.SegArray.unregister_segarray_by_name() is deprecated. " "Please use ak.unregister() instead.", DeprecationWarning, ) return unregister(user_defined_name)
[docs] @classmethod def attach(cls, user_defined_name): """ Using the defined name, attach to a SegArray that has been registered to the Symbol Table Parameters ---------- user_defined_name : str user defined name which the SegArray object was registered under Returns ------- SegArray The resulting SegArray Raises ------ RuntimeError Raised if the server could not attach to the SegArray object See Also -------- register, unregister, is_registered """ import warnings from arkouda.util import attach warnings.warn( "ak.SegArray.attach() is deprecated. Please use ak.attach() instead.", DeprecationWarning, ) return attach(user_defined_name)
[docs] def is_registered(self) -> bool: """ Checks if the name of the SegArray object is registered in the Symbol Table Returns ------- bool True if SegArray is registered, false if not See Also -------- register, unregister, attach """ from arkouda.util import is_registered if self.registered_name is None: # if it is registered as a component of DataFrame return is_registered(self.segments.name, as_component=True) and is_registered( self.values.name, as_component=True ) else: return is_registered(self.registered_name)
[docs] def transfer(self, hostname: str, port: int_scalars): """ Sends a Segmented Array to a different Arkouda server Parameters ---------- hostname : str The hostname where the Arkouda server intended to receive the Segmented Array is running. port : int_scalars The port to send the array over. This needs to be an open port (i.e., not one that the Arkouda server is running on). This will open up `numLocales` ports, each of which in succession, so will use ports of the range {port..(port+numLocales)} (e.g., running an Arkouda server of 4 nodes, port 1234 is passed as `port`, Arkouda will use ports 1234, 1235, 1236, and 1237 to send the array data). This port much match the port passed to the call to `ak.receive_array()`. Returns ------- A message indicating a complete transfer Raises ------ ValueError Raised if the op is not within the pdarray.BinOps set TypeError Raised if other is not a pdarray or the pdarray.dtype is not a supported dtype """ return generic_msg( cmd="sendArray", args={ "segments": self.segments, "values": self.values, "hostname": hostname, "port": port, "dtype": self.dtype, "objType": "segarray", }, )