from functools import partial
from ipaddress import ip_address as _ip_address
from typing import Optional, Union
import numpy as np # type: ignore
from typeguard import typechecked
from arkouda.dtypes import bitType, intTypes, isSupportedInt
from arkouda.dtypes import uint64 as akuint64
from arkouda.groupbyclass import GroupBy, broadcast
from arkouda.numeric import cast as akcast
from arkouda.numeric import where
from arkouda.pdarrayclass import RegistrationError, pdarray
from arkouda.pdarraycreation import arange, array, create_pdarray, zeros
from arkouda.strings import Strings
def BitVectorizer(width=64, reverse=False):
    """
    Build a callable that wraps an integer array in a BitVector.

    Parameters
    ----------
    width : int
        The number of bit fields in the vector
    reverse : bool
        If True, display bits from least significant (left) to most
        significant (right). By default, the most significant bit
        is the left-most bit.

    Returns
    -------
    bitvectorizer : callable
        A ``functools.partial`` of ``BitVector`` with ``width`` and
        ``reverse`` already bound; calling it on an array returns a
        BitVector instance.
    """
    # Collect the display options once, then bind them as keyword arguments
    display_options = {"width": width, "reverse": reverse}
    return partial(BitVector, **display_options)
class BitVector(pdarray):
    """
    Represent integers as bit vectors, e.g. a set of flags.

    Parameters
    ----------
    values : pdarray
        The integers to represent as bit vectors. Must be a pdarray whose
        dtype is one of ``intTypes``.
    width : int
        The number of bit fields in the vector
    reverse : bool
        If True, display bits from least significant (left) to most
        significant (right). By default, the most significant bit
        is the left-most bit.

    Returns
    -------
    bitvectors : BitVector
        The array of binary vectors

    Raises
    ------
    TypeError
        If ``values`` is not a pdarray with an integer dtype.

    Notes
    -----
    This class is a thin wrapper around pdarray that mostly affects
    how values are displayed to the user. Operators and methods will
    typically treat this class like a uint64 pdarray.
    """

    # Operators whose result is re-wrapped in this class (see _binop and
    # _r_binop); every other operator returns the underlying result as-is.
    conserves = frozenset(("+", "-", "|", "&", "^", ">>", "<<"))
    special_objType = "BitVector"

    # NOTE(review): self.format(...) is used throughout this class for display
    # but is not defined in the class body as shown here -- confirm where it
    # is provided.

    def __init__(self, values, width=64, reverse=False):
        self.registered_name = None
        if not isinstance(values, pdarray) or values.dtype not in intTypes:
            self.name = None  # This is needed to silence warnings of missing name during failed creation
            raise TypeError("Argument must be integer pdarray")
        self.width = width
        self.reverse = reverse
        # If values is already bitType, this will make a copy
        # A copy should be made in order to get a new server-side name,
        # to avoid undefinedSymbol errors if/when values gets deleted
        self.values = akcast(values, bitType)
        super().__init__(
            self.values.name,
            self.values.dtype.name,
            self.values.size,
            self.values.ndim,
            self.values.shape,
            self.values.itemsize,
        )

    def __str__(self):
        from arkouda.client import pdarrayIterThresh

        if self.size <= pdarrayIterThresh:
            vals = [self.format(self.values[i]) for i in range(self.size)]
        else:
            # Too long to show in full: first three, an ellipsis, last three
            vals = [self.format(self.values[i]) for i in range(3)]
            vals.append("...")
            vals.extend([self.format(self.values[i]) for i in range(self.size - 3, self.size)])
        # Print values as a single, aligned column for easier viewing
        # Also show "width" and "reverse" parameters
        spaces = " " * (len(self.__class__.__name__) + 1)
        return "{}([{}],\n{}width={}, reverse={})".format(
            self.__class__.__name__, ",\n{} ".format(spaces).join(vals), spaces, self.width, self.reverse
        )

    def __repr__(self):
        return self.__str__()

    def to_ndarray(self):
        """
        Export data to a numpy array of string-formatted bit vectors.
        """
        return np.array([self.format(x) for x in self.values.to_ndarray()])

    def to_list(self):
        """
        Export data to a list of string-formatted bit vectors.
        """
        return self.to_ndarray().tolist()

    def _cast(self, values):
        """
        Wrap ``values`` in this class, reusing this instance's display settings.
        """
        return self.__class__(values, width=self.width, reverse=self.reverse)

    def __getitem__(self, key):
        if isSupportedInt(key):
            # Return single value as a formatted string
            return self.format(self.values[key])
        # Forward all other indexing ops to integer array
        return self._cast(self.values[key])

    def __setitem__(self, key, value):
        """
        Assign into the underlying values.

        Raises
        ------
        TypeError
            If ``value`` is neither an instance of this class nor an integer
            scalar assigned at an integer index.
        """
        if isinstance(value, self.__class__):
            # Set a slice or selection of values directly
            self.values[key] = value.values
        elif isSupportedInt(key) and isSupportedInt(value):
            # Set a single value
            self.values[key] = value
        else:
            # The return value of __setitem__ is discarded by Python, so
            # returning NotImplemented here would silently drop the
            # assignment. Fail loudly instead.
            raise TypeError(
                f"Cannot assign value of type {type(value).__name__} "
                f"with key of type {type(key).__name__}"
            )

    def _select_operand(self, other):
        """
        Pick the data to hand to the underlying pdarray operator.

        Returns ``NotImplemented`` when ``other`` is not an integer scalar,
        an instance of this class, or an integer pdarray.
        """
        if isinstance(other, self.__class__):
            return other.values
        if isSupportedInt(other) or (isinstance(other, pdarray) and other.dtype in intTypes):
            return other
        return NotImplemented

    def _binop(self, other, op):
        # Based on other type, select which data to pass
        # to _binop of pdarray
        otherdata = self._select_operand(other)
        if otherdata is NotImplemented:
            return NotImplemented
        result = self.values._binop(otherdata, op)
        # Some operators return a BitVector, but others don't;
        # the conserved ones are defined as a class attribute
        return self._cast(result) if op in self.conserves else result

    def _r_binop(self, other, op):
        # Cases where left operand is BitVector are handled above by _binop
        # Only two cases to handle here: scalar int and int64 pdarray
        if isSupportedInt(other):
            result = self.values._r_binop(other, op)
        elif isinstance(other, pdarray) and other.dtype in intTypes:
            result = other._binop(self.values, op)
        else:
            return NotImplemented
        # Some operators return a BitVector, but others don't
        return self._cast(result) if op in self.conserves else result

    def opeq(self, other, op):
        """
        Apply the in-place operator ``op`` to the underlying values.

        Returns
        -------
        BitVector or NotImplemented
            ``self`` after the update, or ``NotImplemented`` when ``other``
            is not a supported operand type.
        """
        # Based on other type, select data to pass to pdarray opeq
        otherdata = self._select_operand(other)
        if otherdata is NotImplemented:
            return NotImplemented
        self.values.opeq(otherdata, op)
        # Return the updated object rather than None so that callers which
        # use the result get something usable.
        # NOTE(review): presumably pdarray's in-place operators return the
        # result of opeq -- confirm against pdarray.
        return self

    def register(self, user_defined_name):
        """
        Register this BitVector object and underlying components with the Arkouda server

        Parameters
        ----------
        user_defined_name : str
            user defined name the BitVector is to be registered under,
            this will be the root name for underlying components

        Returns
        -------
        BitVector
            The same BitVector which is now registered with the arkouda server and has an updated name.
            This is an in-place modification, the original is returned to support
            a fluid programming style.
            Please note you cannot register two different BitVectors with the same name.

        Raises
        ------
        RegistrationError
            If this object is already registered. Any error raised by the
            server request itself is not handled here and propagates.

        See also
        --------
        unregister, attach, is_registered

        Notes
        -----
        Objects registered with the server are immune to deletion until
        they are unregistered.
        """
        from arkouda.client import generic_msg

        if self.registered_name is not None and self.is_registered():
            raise RegistrationError(f"This object is already registered as {self.registered_name}")
        generic_msg(
            cmd="register",
            args={
                "name": user_defined_name,
                "objType": self.special_objType,
                "values": self.values.name,
                "width": self.width,
                "reverse": self.reverse,
            },
        )
        self.registered_name = user_defined_name
        return self

    @classmethod
    def from_return_msg(cls, rep_msg):
        """
        Build an instance from a JSON reply message.

        The message is parsed as JSON and must provide the keys ``values``,
        ``width`` and ``reverse``; ``reverse`` is a string that is
        lower-cased and parsed as a JSON boolean.
        """
        import json

        data = json.loads(rep_msg)
        return cls(
            create_pdarray(data["values"]),
            width=data["width"],
            reverse=json.loads(data["reverse"].lower()),
        )
class Fields(BitVector):
    """
    An integer-backed representation of a set of named binary fields, e.g. flags.

    Parameters
    ----------
    values : pdarray or Strings
        The array of field values. If an integer pdarray, the values are used
        as-is for the binary representation of fields. If Strings, the values
        are converted to binary according to the mapping defined by the names
        and MSB_left arguments.
    names : str or sequence of str
        The names of the fields, in order. A string will be treated as a list
        of single-character field names. Multi-character field names are allowed,
        but must be passed as a list or tuple and user must specify a separator.
    MSB_left : bool
        Controls how field names are mapped to binary values. If True (default),
        the left-most field name corresponds to the most significant bit in the
        binary representation. If False, the left-most field name corresponds to
        the least significant bit.
    pad : str
        Character to display when field is not present. Use empty string if no
        padding is desired.
    separator : str
        Substring that separates fields. Used to parse input values (if ak.Strings)
        and to display output.
    show_int : bool
        If True (default), display the integer value of the binary fields in output.

    Returns
    -------
    fields : Fields
        The array of field values

    Raises
    ------
    ValueError
        If ``names`` is empty, has more than 63 entries, contains duplicates
        or empty strings, or contains the separator; if a multi-character
        name is given without a separator; or if ``pad`` is longer than one
        character.

    Notes
    -----
    This class is a thin wrapper around pdarray that mostly affects
    how values are displayed to the user. Operators and methods will
    typically treat this class like an int64 pdarray.
    """

    # NOTE(review): self.format(...) is relied on by the inherited display
    # methods but is not defined in this class body as shown -- confirm
    # where it is provided.

    def __init__(self, values, names, MSB_left=True, pad="-", separator="", show_int=True):
        # Set first, so it exists even if normalizing names fails: this is
        # needed to silence warnings of missing name during failed creation
        self.name = None
        # Argument validation
        # Normalize names, which can be string or sequence
        self.names = tuple(names)
        # Reject empty names explicitly; otherwise max() below fails with an
        # unhelpful "empty sequence" ValueError
        if len(self.names) == 0:
            raise ValueError("At least one field name must be specified")
        if len(self.names) > 63:
            raise ValueError("Cannot represent more than 63 fields")
        # Ensure no duplicate names
        if len(self.names) != len(set(self.names)):
            raise ValueError("Field names must be unique")
        # Ensure no empty names
        if any(name == "" for name in self.names):
            raise ValueError("Names cannot be empty strings")
        # If separator is non-empty, it cannot be a field name
        if (separator != "") and (separator in self.names):
            raise ValueError("Separator cannot be a field name")
        # Field names can have multiple characters, but if so, there must be a separator
        self.namewidth = max(len(n) for n in self.names)
        if (self.namewidth > 1) and (separator == ""):
            raise ValueError(
                "A non-empty separator must be specified when field names have more than one character"
            )
        if len(pad) > 1:
            raise ValueError("Pad must be single character or empty string")
        self.padchar = pad
        # string to display when a field is not set
        self.pad = self.namewidth * self.padchar
        # separator to display between fields
        self.separator = separator
        width = len(self.names)
        # whether the left-most field name corresponds to the most significant bit
        self.MSB_left = MSB_left
        # shifts[i] is the bit position assigned to names[i]
        if self.MSB_left:
            self.shifts = range(width - 1, -1, -1)
        else:
            self.shifts = range(width)
        # whether to display the integer value of the field bit vector
        self.show_int = show_int
        # values arg can be either int64 or Strings. If Strings, convert to binary
        if isinstance(values, Strings):
            values = self._convert_strings(values)
        # super init will type-check values and make copy
        super().__init__(values, width=width, reverse=not MSB_left)

    def _convert_strings(self, s):
        """
        Convert string field names to binary vectors.
        """
        if self.separator == "":
            # When separator is empty, field names are guaranteed to be single characters,
            # so a substring test per name is enough
            def has_field(name):
                return s.contains(name)

        else:
            # When separator is non-empty, split on it
            sf, segs = s.flatten(self.separator, return_segments=True)
            # Create a grouping to map split fields back to originating string
            orig = broadcast(segs, arange(segs.size), sf.size)
            g = GroupBy(orig)

            def has_field(name):
                # Check if name matches one of the split fields from originating string
                return g.any(sf == name)[1]

        # Initialize to zero, then OR in one bit per field name
        values = zeros(s.size, dtype=bitType)
        for name, shift in zip(self.names, self.shifts):
            bit = has_field(name)
            values = values | akcast(where(bit, 1 << shift, 0), bitType)
        return values

    def _parse_scalar(self, s):
        """
        Convert a string of named fields to a binary value.
        """
        # With no separator, arg validation guarantees single-character field
        # names, so membership is tested on the string itself; otherwise it is
        # tested on the list of separator-delimited pieces.
        present = s if self.separator == "" else s.split(self.separator)
        val = 0
        for name, shift in zip(self.names, self.shifts):
            if name in present:
                val |= 1 << shift
        return val

    def __setitem__(self, key, value):
        # Strings are parsed into their integer bit pattern before assignment
        if isinstance(value, str):
            value = self._parse_scalar(value)
        return super().__setitem__(key, value)

    def _cast(self, values):
        """
        Wrap ``values`` in this class, reusing this instance's names and display settings.
        """
        return self.__class__(
            values,
            self.names,
            MSB_left=self.MSB_left,
            pad=self.padchar,
            separator=self.separator,
            show_int=self.show_int,
        )

    def _binop(self, other, op):
        # A string operand is interpreted as a set of field names
        if isinstance(other, str):
            other = self._parse_scalar(other)
        return super()._binop(other, op)

    def _r_binop(self, other, op):
        # A string operand is interpreted as a set of field names
        if isinstance(other, str):
            other = self._parse_scalar(other)
        return super()._r_binop(other, op)

    def opeq(self, other, op):
        """
        Apply the in-place operator ``op``, accepting a string of field names
        as ``other`` in addition to the operand types the parent class accepts.
        """
        if isinstance(other, str):
            other = self._parse_scalar(other)
        return super().opeq(other, op)
def ip_address(values):
    """
    Convert values to an Arkouda array of IP addresses.

    Parameters
    ----------
    values : list-like, integer pdarray, or IPv4
        The integer IP addresses or IPv4 object.

    Returns
    -------
    IPv4
        The same IP addresses as an Arkouda array

    Raises
    ------
    NotImplementedError
        If ``values`` is a Strings object.
    RuntimeError
        If ``values`` is a non-arkouda object that cannot be converted.

    Notes
    -----
    This helper is intended to help future proof changes made to
    accommodate IPv6 and to prevent errors if a user inadvertently
    casts a IPv4 instead of a int64 pdarray. It can also be used
    for importing Python lists of IP addresses into Arkouda.
    """
    if isinstance(values, pdarray):
        # IPv4 subclasses pdarray: pass an existing IPv4 through untouched
        # and wrap any other pdarray
        return values if isinstance(values, IPv4) else IPv4(values)
    if isinstance(values, Strings):
        raise NotImplementedError("Strings to IP address not yet implemented")
    # Assume values is a python sequence of IP addresses in some format
    try:
        as_integers = [int(_ip_address(item)) for item in values]
        return IPv4(array(as_integers))
    except Exception as e:
        raise RuntimeError("Error converting non-arkouda object to list of IP addresses") from e
class IPv4(pdarray):
    """
    Represent integers as IPv4 addresses.

    Parameters
    ----------
    values : pdarray
        The integer IP addresses. Must be a pdarray whose dtype is one of
        ``intTypes``.

    Returns
    -------
    IPv4
        The same IP addresses

    Raises
    ------
    TypeError
        If ``values`` is not a pdarray with an integer dtype.

    Notes
    -----
    This class is a thin wrapper around pdarray that mostly affects
    how values are displayed to the user. Operators and methods will
    typically treat this class like an int64 pdarray.
    """

    special_objType = "IPv4"

    # NOTE(review): self.format(...) is used throughout this class for display
    # but is not defined in the class body as shown here -- confirm where it
    # is provided.

    def __init__(self, values):
        if not isinstance(values, pdarray) or values.dtype not in intTypes:
            self.name = None  # This is needed to silence warnings of missing name during failed creation
            raise TypeError("Argument must be int64 pdarray")
        # Casting always creates a copy with new server-side name,
        # which will avoid unknown symbol errors
        self.values = akcast(values, bitType)
        super().__init__(
            self.values.name,
            self.values.dtype.name,
            self.values.size,
            self.values.ndim,
            self.values.shape,
            self.values.itemsize,
        )

    def export_uint(self):
        """
        Return the underlying address values cast to uint64.
        """
        return akcast(self.values, akuint64)

    def normalize(self, x):
        """
        Take in an IP address as a string, integer, or IPAddress object,
        and convert it to an integer.

        Raises
        ------
        ValueError
            If ``x`` cannot be parsed as an IP address, or is negative.
        """
        if not isSupportedInt(x):
            x = int(_ip_address(x))
        if x < 0:
            # Report the offending value itself: formatting it through
            # ipaddress.ip_address would raise its own ValueError for a
            # negative integer and mask this message.
            raise ValueError(f"Not an IP address: {x}")
        return x

    def _is_supported_scalar(self, x):
        """
        Return ``(True, normalized_int)`` if ``x`` normalizes to an address,
        otherwise ``(False, None)``.
        """
        try:
            return True, self.normalize(x)
        except ValueError:
            return False, None

    def __str__(self):
        from arkouda.client import pdarrayIterThresh

        if self.size <= pdarrayIterThresh:
            vals = [self.format(self.values[i]) for i in range(self.size)]
        else:
            # Too long to show in full: first three, an ellipsis, last three
            vals = [self.format(self.values[i]) for i in range(3)]
            vals.append("...")
            vals.extend([self.format(self.values[i]) for i in range(self.size - 3, self.size)])
        # Display values as single, aligned column for ease of viewing
        spaces = " " * (len(self.__class__.__name__) + 1)
        return "{}([{}],\n{})".format(
            self.__class__.__name__, ",\n{} ".format(spaces).join(vals), spaces
        )

    def __repr__(self):
        return self.__str__()

    def to_ndarray(self):
        """
        Export array as a numpy array in which every element has been passed
        through ``self.format``, i.e. the same per-element form that integer
        indexing returns.
        """
        return np.array([self.format(x) for x in self.values.to_ndarray()])

    def to_list(self):
        """
        Export array as a list of the elements produced by ``to_ndarray``.
        """
        return self.to_ndarray().tolist()

    def __getitem__(self, key):
        if isSupportedInt(key):
            # Display single item as string
            return self.format(self.values[key])
        # Forward other accesses to pdarray class
        return self.__class__(self.values[key])

    def _select_operand(self, other):
        """
        Pick the data to hand to the underlying pdarray operator.

        Returns ``NotImplemented`` when ``other`` is not an instance of this
        class, an integer pdarray, or a scalar accepted by ``normalize``.
        """
        # Array operands are tested before scalars: the scalar probe hands
        # its argument to ipaddress.ip_address, which stringifies it, and
        # that is wasted work for an array that can never be a scalar.
        if isinstance(other, self.__class__):
            return other.values
        if isinstance(other, pdarray):
            return other if other.dtype in intTypes else NotImplemented
        isscalar, scalarval = self._is_supported_scalar(other)
        return scalarval if isscalar else NotImplemented

    def __setitem__(self, key, value):
        """
        Assign into the underlying values.

        Raises
        ------
        TypeError
            If ``value`` is neither an instance of this class nor a scalar
            accepted by ``normalize``.
        """
        if isinstance(value, self.__class__):
            self.values[key] = value.values
            return
        # If scalar, convert to integer and set
        isscalar, scalarval = (False, None) if isinstance(value, pdarray) else self._is_supported_scalar(value)
        if isscalar:
            self.values[key] = scalarval
        else:
            # The return value of __setitem__ is discarded by Python, so
            # returning NotImplemented here would silently drop the
            # assignment. Fail loudly instead.
            raise TypeError(f"Cannot assign value of type {type(value).__name__} to IPv4")

    def _binop(self, other, op):
        # Based on other type, select data to pass to pdarray _binop
        otherdata = self._select_operand(other)
        if otherdata is NotImplemented:
            return NotImplemented
        return self.values._binop(otherdata, op)

    def _r_binop(self, other, op):
        # _binop handles everything except left types: scalar and pdarray int64
        if isinstance(other, pdarray):
            if other.dtype in intTypes:
                return other._binop(self.values, op)
            return NotImplemented
        isscalar, scalarval = self._is_supported_scalar(other)
        if isscalar:
            return self.values._r_binop(scalarval, op)
        return NotImplemented

    def opeq(self, other, op):
        """
        Apply the in-place operator ``op`` to the underlying values.

        Returns
        -------
        IPv4 or NotImplemented
            ``self`` after the update, or ``NotImplemented`` when ``other``
            is not a supported operand type.
        """
        # Based on other type, select data to pass to pdarray opeq
        otherdata = self._select_operand(other)
        if otherdata is NotImplemented:
            return NotImplemented
        self.values.opeq(otherdata, op)
        # Return the updated object rather than None.
        # NOTE(review): presumably pdarray's in-place operators return the
        # result of opeq -- confirm against pdarray.
        return self

    def register(self, user_defined_name):
        """
        Register this IPv4 object and underlying components with the Arkouda server

        Parameters
        ----------
        user_defined_name : str
            user defined name the IPv4 is to be registered under,
            this will be the root name for underlying components

        Returns
        -------
        IPv4
            The same IPv4 which is now registered with the arkouda server and has an updated name.
            This is an in-place modification, the original is returned to support
            a fluid programming style.
            Please note you cannot register two different IPv4s with the same name.

        Raises
        ------
        RegistrationError
            If this object is already registered. Any error raised by the
            server request itself is not handled here and propagates.

        See also
        --------
        unregister, attach, is_registered

        Notes
        -----
        Objects registered with the server are immune to deletion until
        they are unregistered.
        """
        from arkouda.client import generic_msg

        if self.registered_name is not None and self.is_registered():
            raise RegistrationError(f"This object is already registered as {self.registered_name}")
        generic_msg(
            cmd="register",
            args={
                "name": user_defined_name,
                "objType": self.special_objType,
                "array": self.values,
            },
        )
        self.registered_name = user_defined_name
        return self

    def to_hdf(
        self,
        prefix_path: str,
        dataset: str = "array",
        mode: str = "truncate",
        file_type: str = "distribute",
    ):
        """
        Override of the pdarray to_hdf to store the special object type

        Parameters
        ----------
        prefix_path : str
            Sent to the server as ``filename``.
        dataset : str
            Sent to the server as ``dset``.
        mode : str
            Write mode name, converted with ``_mode_str_to_int``.
        file_type : str
            File format name, converted with ``_file_type_to_int``.

        Returns
        -------
        str
            The reply of the ``tohdf`` server command.
        """
        from typing import cast as typecast

        from arkouda.client import generic_msg
        from arkouda.io import _file_type_to_int, _mode_str_to_int

        return typecast(
            str,
            generic_msg(
                cmd="tohdf",
                args={
                    "values": self,
                    "dset": dataset,
                    "write_mode": _mode_str_to_int(mode),
                    "filename": prefix_path,
                    "dtype": self.dtype,
                    "objType": self.special_objType,
                    "file_format": _file_type_to_int(file_type),
                },
            ),
        )

    def update_hdf(self, prefix_path: str, dataset: str = "array", repack: bool = True):
        """
        Override the pdarray implementation so that the special object type will be used.

        Parameters
        ----------
        prefix_path : str
            Sent to the server as ``filename``; also used (with ``*``
            appended) to look up the existing file format.
        dataset : str
            Sent to the server as ``dset``.
        repack : bool
            If True (default), call ``_repack_hdf(prefix_path)`` after writing.
        """
        from arkouda.client import generic_msg
        from arkouda.io import (
            _file_type_to_int,
            _get_hdf_filetype,
            _mode_str_to_int,
            _repack_hdf,
        )

        # determine the format (single/distribute) that the file was saved in
        file_type = _get_hdf_filetype(prefix_path + "*")
        generic_msg(
            cmd="tohdf",
            args={
                "values": self,
                "dset": dataset,
                "write_mode": _mode_str_to_int("append"),
                "filename": prefix_path,
                "dtype": self.dtype,
                "objType": self.special_objType,
                "file_format": _file_type_to_int(file_type),
                "overwrite": True,
            },
        )
        if repack:
            _repack_hdf(prefix_path)
@typechecked
def is_ipv4(ip: Union[pdarray, IPv4], ip2: Optional[pdarray] = None) -> pdarray:
    """
    Indicate which values are ipv4 when passed data containing IPv4 and IPv6 values.

    Parameters
    ----------
    ip: pdarray (integer dtype) or ak.IPv4
        IPv4 value. High Bits of IPv6 if IPv6 is passed in.
    ip2: pdarray (integer dtype), Optional
        Low Bits of IPv6. This is added for support when dealing with data that contains IPv6 as well.

    Returns
    -------
    pdarray of bools indicating which indexes are IPv4.
        An index is flagged when its ``ip`` value is below ``2**32`` and,
        if ``ip2`` is given, its ``ip2`` value is zero.

    Raises
    ------
    TypeError
        If ``ip`` or ``ip2`` does not have an integer dtype.
    RuntimeError
        If ``ip2`` is given and its size differs from that of ``ip``.

    See Also
    --------
    ak.is_ipv6
    """
    # Work on the plain values when handed an IPv4 wrapper
    addresses = ip.values if isinstance(ip, IPv4) else ip

    if addresses.dtype not in intTypes or (ip2 is not None and ip2.dtype not in intTypes):
        raise TypeError("ip and ip2 must be int64 pdarrays. ip2 is Optional.")

    if ip2 is None:
        return addresses < 2**32

    if addresses.size != ip2.size:
        raise RuntimeError("When supplying a value for ip2, ip and ip2 must be the same size.")

    fits_in_32_bits = addresses < 2**32
    low_bits_clear = ip2 == 0
    return fits_in_32_bits & low_bits_clear
@typechecked
def is_ipv6(ip: Union[pdarray, IPv4], ip2: Optional[pdarray] = None) -> pdarray:
    """
    Indicate which values are ipv6 when passed data containing IPv4 and IPv6 values.

    Parameters
    ----------
    ip: pdarray (integer dtype) or ak.IPv4
        High Bits of IPv6.
    ip2: pdarray (integer dtype), Optional
        Low Bits of IPv6

    Returns
    -------
    pdarray of bools indicating which indexes are IPv6.
        An index is flagged when its ``ip`` value is at least ``2**32`` or,
        if ``ip2`` is given, its ``ip2`` value is non-zero.

    Raises
    ------
    TypeError
        If ``ip`` or ``ip2`` does not have an integer dtype.
    RuntimeError
        If ``ip2`` is given and its size differs from that of ``ip``.

    See Also
    --------
    ak.is_ipv4
    """
    # Work on the plain values when handed an IPv4 wrapper
    addresses = ip.values if isinstance(ip, IPv4) else ip

    if addresses.dtype not in intTypes or (ip2 is not None and ip2.dtype not in intTypes):
        raise TypeError("ip and ip2 must be int64 pdarrays. ip2 is Optional.")

    if ip2 is None:
        return addresses >= 2**32

    if addresses.size != ip2.size:
        raise RuntimeError("When supplying a value for ip2, ip and ip2 must be the same size.")

    exceeds_32_bits = addresses >= 2**32
    low_bits_set = ip2 != 0
    return exceeds_32_bits | low_bits_set