from __future__ import annotations
import builtins
import json
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from numpy import array as ndarray
from numpy import dtype as npdtype
from typeguard import typechecked
from arkouda import Categorical, Strings
from arkouda.groupbyclass import GroupBy, unique
from arkouda.numpy import cast as akcast
from arkouda.numpy.dtypes import bool_ as akbool
from arkouda.numpy.dtypes import bool_scalars
from arkouda.numpy.dtypes import float64 as akfloat64
from arkouda.numpy.dtypes import int64 as akint64
from arkouda.pdarrayclass import RegistrationError, pdarray
from arkouda.pdarraycreation import arange, array, create_pdarray, ones
from arkouda.pdarraysetops import argsort, in1d
from arkouda.sorting import coargsort
from arkouda.util import convert_if_categorical, generic_concat, get_callback
if TYPE_CHECKING:
from arkouda.series import Series
class Index:
objType = "Index"
"""
Sequence used for indexing and alignment.
The basic object storing axis labels for all DataFrame objects.
Parameters
----------
values : List, pdarray, Strings, Categorical, pandas.Categorical, pandas.Index, or Index
name : str, default=None
Name to be stored in the index.
allow_list : bool, default=False
If False, list values will be converted to a pdarray.
If True, list values will remain as a list, provided the data length does not exceed max_list_size.
max_list_size : int, default=1000
The maximum allowed data length for the values to be stored as a list object.
Raises
------
ValueError
Raised if allow_list=True and the size of values is > max_list_size.
See Also
--------
MultiIndex
Examples
--------
>>> ak.Index([1, 2, 3])
Index(array([1 2 3]), dtype='int64')
>>> ak.Index(list('abc'))
Index(array(['a', 'b', 'c']), dtype='<U0')
>>> ak.Index([1, 2, 3], allow_list=True)
Index([1, 2, 3], dtype='int64')
"""
@typechecked
def __init__(
self,
values: Union[List, pdarray, Strings, Categorical, pd.Index, "Index", pd.Categorical],
name: Optional[str] = None,
allow_list=False,
max_list_size=1000,
):
self.max_list_size = max_list_size
self.registered_name: Optional[str] = None
if isinstance(values, pd.Categorical):
values = Categorical(values)
if isinstance(values, Index):
self.values = values.values
self.size = values.size
self.dtype = values.dtype
self.name = name if name else values.name
elif isinstance(values, pd.Index):
if isinstance(values.values, pd.Categorical):
self.values = Categorical(values.values)
else:
self.values = array(values.values)
self.size = values.size
self.dtype = self.values.dtype
self.name = name if name else values.name
elif isinstance(values, List):
if allow_list is True:
if len(values) <= max_list_size:
self.values = values
self.size = len(values)
if len(values) > 0:
self.dtype = self._dtype_of_list_values(values)
else:
self.dtype = None
else:
raise ValueError(
f"Cannot create Index because list size {len(values)} "
f"exceeds max_list_size {self.max_list_size}."
)
else:
values = array(values)
self.values = values
self.size = self.values.size
self.dtype = self.values.dtype
self.name = name
elif isinstance(values, (pdarray, Strings, Categorical)):
self.values = values
self.size = self.values.size
self.dtype = self.values.dtype
self.name = name
else:
raise TypeError(f"Unable to create Index from type {type(values)}")
def __getitem__(self, key):
from arkouda.series import Series
allow_list = False
if isinstance(self.values, list):
allow_list = True
if isinstance(key, Series):
key = key.values
if isinstance(key, int):
return self.values[key]
if isinstance(key, list):
if len(key) <= self.max_list_size:
return Index([self.values[k] for k in key], allow_list=allow_list)
else:
raise ValueError(
f"Unable to get list of size greater than "
f"Index.max_list_size ({self.max_list_size})."
)
return Index(self.values[key], allow_list=allow_list)
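# Illustrative usage (assumes a connected arkouda server, with arkouda imported as ak):
# an integer key returns a single value, while a slice or pdarray key returns a new Index.
#   idx = ak.Index(ak.array([10, 20, 30]))
#   idx[1]                   # 20
#   idx[ak.array([0, 2])]    # Index(array([10 30]), dtype='int64')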
def __repr__(self):
# Configured to match pandas
return f"Index({repr(self.index)}, dtype='{self.dtype}')"
def __len__(self):
return len(self.index)
def _get_arrays_for_comparison(
self, other
) -> Tuple[Union[pdarray, Strings, Categorical], Union[pdarray, Strings, Categorical]]:
if isinstance(self.values, list):
values = array(self.values)
else:
values = self.values
if isinstance(other, Index):
other_values = other.values
else:
other_values = other
if isinstance(other_values, list):
other_values = array(other_values)
return values, other_values
def __eq__(self, other):
values, other_values = self._get_arrays_for_comparison(other)
return values == other_values
def __ne__(self, other):
values, other_values = self._get_arrays_for_comparison(other)
return values != other_values
def _dtype_of_list_values(self, lst):
from arkouda.numpy.dtypes import dtype
if isinstance(lst, list):
d = dtype(type(lst[0]))
for item in lst:
assert dtype(type(item)) == d, (
f"Values of Index must all be same type. "
f"Types {d} and {dtype(type(item))} do not match."
)
return d
else:
raise TypeError("Index Types must match")
@property
def nlevels(self):
"""
Integer number of levels in this Index.
An Index will always have 1 level.
See Also
--------
MultiIndex.nlevels
"""
return 1
@property
def ndim(self):
"""
Number of dimensions of the underlying data, by definition 1.
See Also
--------
MultiIndex.ndim
"""
return 1
@property
def inferred_type(self) -> str:
"""
Return a string of the type inferred from the values.
"""
if isinstance(self.values, list):
from arkouda.numpy.dtypes import float_scalars, int_scalars
from arkouda.util import _is_dtype_in_union
if _is_dtype_in_union(self.dtype, int_scalars):
return "integer"
elif _is_dtype_in_union(self.dtype, float_scalars):
return "floating"
elif self.dtype == "<U":
return "string"
return self.values.inferred_type
@property
def names(self):
"""
Return Index or MultiIndex names.
"""
return [self.name]
@property
def index(self):
"""
Alias for ``values``, maintained to support older code.
"""
return self.values
@property
def shape(self):
return (self.size,)
@property
def is_unique(self):
"""
Property indicating whether all values in the index are unique.
Returns
-------
bool
True if all values are unique, False otherwise.
"""
if isinstance(self.values, list):
return len(set(self.values)) == self.size
else:
g = GroupBy(self.values)
key, ct = g.size()
return (ct == 1).all()
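# Illustrative usage (assumes a connected arkouda server): is_unique groups the values
# and reports whether every group contains exactly one element.
#   ak.Index(ak.array([1, 2, 3])).is_unique    # True
#   ak.Index(ak.array([1, 2, 2])).is_unique    # False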
@staticmethod
def factory(index):
t = type(index)
if isinstance(index, Index):
return index
elif t != list and t != tuple:
return Index(index)
else:
return MultiIndex(index)
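# Illustrative usage (assumes a connected arkouda server): factory dispatches on the input
# type, returning an Index for a single array-like and a MultiIndex for a list or tuple of
# arrays.
#   Index.factory(ak.array([1, 2, 3]))                         # Index
#   Index.factory([ak.array([1, 2]), ak.array(["a", "b"])])    # MultiIndex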
@classmethod
def from_return_msg(cls, rep_msg):
data = json.loads(rep_msg)
idx = []
for d in data:
i_comps = d.split("+|+")
if i_comps[0].lower() == pdarray.objType.lower():
idx.append(create_pdarray(i_comps[1]))
elif i_comps[0].lower() == Strings.objType.lower():
idx.append(Strings.from_return_msg(i_comps[1]))
elif i_comps[0].lower() == Categorical.objType.lower():
idx.append(Categorical.from_return_msg(i_comps[1]))
return cls.factory(idx) if len(idx) > 1 else cls.factory(idx[0])
def equals(self, other: Index) -> bool_scalars:
"""
Whether Indexes are the same size, and all entries are equal.
Parameters
----------
other : object
object to compare.
Returns
-------
bool
True if the Indexes are the same, otherwise False.
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> i = ak.Index([1, 2, 3])
>>> i_cpy = ak.Index([1, 2, 3])
>>> i.equals(i_cpy)
True
>>> i2 = ak.Index([1, 2, 4])
>>> i.equals(i2)
False
MultiIndex case:
>>> arrays = [ak.array([1, 1, 2, 2]), ak.array(["red", "blue", "red", "blue"])]
>>> m = ak.MultiIndex(arrays, names=["numbers2", "colors2"])
>>> m.equals(m)
True
>>> arrays2 = [ak.array([1, 1, 2, 2]), ak.array(["red", "blue", "red", "green"])]
>>> m2 = ak.MultiIndex(arrays2, names=["numbers2", "colors2"])
>>> m.equals(m2)
False
"""
if self is other:
return True
if not isinstance(other, Index):
raise TypeError("other must be of type Index.")
if type(self) is not type(other):
return False
if len(self) != len(other):
return False
from arkouda.pdarrayclass import all as akall
if isinstance(self, MultiIndex) and isinstance(other, MultiIndex):
if self.nlevels != other.nlevels:
return False
for i in range(self.nlevels):
if not self.levels[i].equals(other.levels[i]):
return False
return True
else:
result = akall(self == other)
if isinstance(result, (bool, np.bool_)):
return result
return False
def memory_usage(self, unit="B"):
"""
Return the memory usage of the Index values.
Parameters
----------
unit : str, default = "B"
Unit to return. One of {'B', 'KB', 'MB', 'GB'}.
Returns
-------
int
Bytes of memory consumed.
See Also
--------
arkouda.pdarrayclass.nbytes
arkouda.index.MultiIndex.memory_usage
arkouda.series.Series.memory_usage
arkouda.dataframe.DataFrame.memory_usage
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> idx = Index(ak.array([1, 2, 3]))
>>> idx.memory_usage()
24
"""
from arkouda.util import convert_bytes
return convert_bytes(self.values.nbytes, unit=unit)
def to_pandas(self):
"""
Return the equivalent Pandas Index.
"""
if isinstance(self.values, list):
val = ndarray(self.values)
elif isinstance(self.values, Categorical):
val = self.values.to_pandas()
return pd.CategoricalIndex(data=val, dtype=val.dtype, name=self.name)
else:
val = self.values.to_ndarray()
return pd.Index(data=val, dtype=val.dtype, name=self.name)
def to_ndarray(self):
if isinstance(self.values, list):
return ndarray(self.values)
else:
val = convert_if_categorical(self.values)
return val.to_ndarray()
def to_list(self):
if isinstance(self.values, list):
return self.values
else:
return self.to_ndarray().tolist()
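# Illustrative usage (assumes a connected arkouda server): the conversion helpers transfer
# the index values from the server to the client as a pandas Index, numpy array, or list.
#   idx = ak.Index(ak.array([1, 2, 3]))
#   idx.to_pandas()     # pandas Index([1, 2, 3], dtype='int64')
#   idx.to_ndarray()    # numpy array([1, 2, 3])
#   idx.to_list()       # [1, 2, 3]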
def set_dtype(self, dtype):
"""Change the data type of the index
Currently only aku.ip_address and ak.array are supported.
"""
new_idx = dtype(self.values)
self.values = new_idx
return self
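# Illustrative sketch (assumes a connected arkouda server; aku.ip_address refers to the
# helper alias used in the docstring above):
#   idx = ak.Index(ak.array([3232235777, 3232235778]))
#   idx.set_dtype(aku.ip_address)    # reinterpret the underlying values as IP addresses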
def register(self, user_defined_name):
"""
Register this Index object and underlying components with the Arkouda server
Parameters
----------
user_defined_name : str
user defined name the Index is to be registered under,
this will be the root name for underlying components
Returns
-------
Index
The same Index which is now registered with the arkouda server and has an updated name.
This is an in-place modification, the original is returned to support
a fluid programming style.
Please note you cannot register two different Indexes with the same name.
Raises
------
TypeError
Raised if user_defined_name is not a str
RegistrationError
If the server was unable to register the Index with the user_defined_name
See also
--------
unregister, attach, is_registered
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
if isinstance(self.values, list):
raise TypeError("Index cannot be registered when values are list type.")
from arkouda.client import generic_msg
if self.registered_name is not None and self.is_registered():
raise RegistrationError(f"This object is already registered as {self.registered_name}")
generic_msg(
cmd="register",
args={
"name": user_defined_name,
"objType": self.objType,
"num_idxs": 1,
"idx_names": [
(
json.dumps(
{
"codes": self.values.codes.name,
"categories": self.values.categories.name,
"NA_codes": self.values._akNAcode.name,
**(
{"permutation": self.values.permutation.name}
if self.values.permutation is not None
else {}
),
**(
{"segments": self.values.segments.name}
if self.values.segments is not None
else {}
),
}
)
if isinstance(self.values, Categorical)
else self.values.name
)
],
"idx_types": [self.values.objType],
},
)
self.registered_name = user_defined_name
return self
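# Illustrative registration lifecycle (assumes a connected arkouda server and that the
# chosen name is not already registered):
#   idx = ak.Index(ak.array([1, 2, 3]))
#   idx.register("my_index")    # idx.registered_name is now "my_index"
#   idx.is_registered()         # True
#   idx.unregister()            # releases the server-side registration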
def unregister(self):
"""
Unregister this Index object from the arkouda server, where it was previously
registered using register() and/or attached to using attach().
Raises
------
RegistrationError
If the object is already unregistered or if there is a server error
when attempting to unregister
See also
--------
register, attach, is_registered
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
from arkouda.util import unregister
if not self.registered_name:
raise RegistrationError("This object is not registered")
unregister(self.registered_name)
self.registered_name = None
def is_registered(self):
"""
Return True iff the object is contained in the registry or is a component of a
registered object.
Returns
-------
numpy.bool
Indicates if the object is contained in the registry
Raises
------
RegistrationError
Raised if there's a server-side error or a mis-match of registered components
See Also
--------
register, attach, unregister
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
from arkouda.util import is_registered
if self.registered_name is None:
if not isinstance(self.values, Categorical):
return is_registered(self.values.name, as_component=True)
else:
result = True
result &= is_registered(self.values.codes.name, as_component=True)
result &= is_registered(self.values.categories.name, as_component=True)
result &= is_registered(self.values._akNAcode.name, as_component=True)
if self.values.permutation is not None and self.values.segments is not None:
result &= is_registered(self.values.permutation.name, as_component=True)
result &= is_registered(self.values.segments.name, as_component=True)
return result
else:
return is_registered(self.registered_name)
def to_dict(self, label):
data = {}
if label is None:
label = "idx"
elif isinstance(label, list):
label = label[0]
data[label] = self.index
return data
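# Illustrative usage (assumes a connected arkouda server): to_dict wraps the underlying
# values in a single-entry dict keyed by the given label, defaulting to "idx" when the
# label is None.
#   ak.Index(ak.array([1, 2, 3])).to_dict("rows")    # {'rows': array([1 2 3])}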
def _check_types(self, other):
if type(self) is not type(other):
raise TypeError("Index Types must match")
def _merge(self, other):
self._check_types(other)
callback = get_callback(self.values)
idx = generic_concat([self.values, other.values], ordered=False)
return Index(callback(unique(idx)))
def _merge_all(self, idx_list):
idx = self.values
callback = get_callback(idx)
for other in idx_list:
self._check_types(other)
idx = generic_concat([idx, other.values], ordered=False)
return Index(callback(unique(idx)))
def _check_aligned(self, other):
self._check_types(other)
length = len(self)
return len(other) == length and (self == other.values).sum() == length
def argsort(self, ascending=True):
if isinstance(self.values, list):
reverse = not ascending
return sorted(range(self.size), key=self.values.__getitem__, reverse=reverse)
if not ascending:
if isinstance(self.values, pdarray) and self.dtype in (akint64, akfloat64):
i = argsort(-self.values)
else:
i = argsort(self.values)[arange(self.size - 1, -1, -1)]
else:
i = argsort(self.values)
return i
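# Illustrative usage (assumes a connected arkouda server): argsort returns the permutation
# that sorts the index, which can be applied back through __getitem__.
#   idx = ak.Index(ak.array([3, 1, 2]))
#   perm = idx.argsort()              # array([1 2 0])
#   idx[perm]                         # Index(array([1 2 3]), dtype='int64')
#   idx.argsort(ascending=False)      # array([0 2 1])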
def map(self, arg: Union[dict, "Series"]) -> "Index":
"""
Map values of Index according to an input mapping.
Parameters
----------
arg : dict or Series
The mapping correspondence.
Returns
-------
arkouda.index.Index
A new index with the values transformed by the mapping correspondence.
Raises
------
TypeError
Raised if arg is not of type dict or arkouda.Series.
Raised if index values not of type pdarray, Categorical, or Strings.
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> idx = ak.Index(ak.array([2, 3, 2, 3, 4]))
>>> display(idx)
Index(array([2 3 2 3 4]), dtype='int64')
>>> idx.map({4: 25.0, 2: 30.0, 1: 7.0, 3: 5.0})
Index(array([30.00000000000000000 5.00000000000000000 30.00000000000000000
5.00000000000000000 25.00000000000000000]), dtype='float64')
>>> s2 = ak.Series(ak.array(["a","b","c","d"]), index = ak.array([4,2,1,3]))
>>> idx.map(s2)
Index(array(['b', 'b', 'd', 'd', 'a']), dtype='<U0')
"""
from arkouda.util import map
return Index(map(self.values, arg))
def concat(self, other):
self._check_types(other)
idx = generic_concat([self.values, other.values], ordered=True)
return Index(idx)
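# Illustrative usage (assumes a connected arkouda server): concat preserves order and does
# not deduplicate; both indexes must hold the same underlying type.
#   a = ak.Index(ak.array([1, 2]))
#   b = ak.Index(ak.array([2, 3]))
#   a.concat(b)    # Index(array([1 2 2 3]), dtype='int64')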
def lookup(self, key):
if not isinstance(key, pdarray):
# try to handle single value
try:
key = array([key])
except Exception:
raise TypeError("Lookup must be on an arkouda array")
return in1d(self.values, key)
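# Illustrative usage (assumes a connected arkouda server): lookup returns a boolean mask,
# aligned with the index values, marking which entries appear in the key.
#   idx = ak.Index(ak.array([1, 2, 3, 4]))
#   idx.lookup(ak.array([2, 4]))    # array([False True False True])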
def to_hdf(
self,
prefix_path: str,
dataset: str = "index",
mode: str = "truncate",
file_type: str = "distribute",
) -> str:
"""
Save the Index to HDF5.
The object can be saved to a collection of files or single file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files (must not already exist)
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', attempt to create new dataset in existing files.
file_type: str ("single" | "distribute")
Default: "distribute"
When set to single, dataset is written to a single file.
When distribute, the dataset is written to one file per locale.
This is only supported by HDF5 files and will have no impact on Parquet files.
Returns
-------
string message indicating result of save operation
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the pdarray
TypeError
Raised if the Index values are a list.
Notes
-----
- The prefix_path must be visible to the arkouda server and the user must
have write permission.
- Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>``
ranges from 0 to ``numLocales`` for `file_type='distribute'`. Otherwise,
the file name will be `prefix_path`.
- If any of the output files already exist and
the mode is 'truncate', they will be overwritten. If the mode is 'append'
and the number of output files is less than the number of locales or a
dataset with the same name already exists, a ``RuntimeError`` will result.
- Any file extension can be used. The file I/O does not rely on the extension to
determine the file format.
"""
from typing import cast as typecast
from arkouda.categorical import Categorical as Categorical_
from arkouda.client import generic_msg
from arkouda.io import _file_type_to_int, _mode_str_to_int
if isinstance(self.values, list):
raise TypeError("Unable to write Index to hdf when values are a list.")
index_data = [
(
self.values.name
if not isinstance(self.values, (Categorical_))
else json.dumps(
{
"codes": self.values.codes.name,
"categories": self.values.categories.name,
"NA_codes": self.values._akNAcode.name,
**(
{"permutation": self.values.permutation.name}
if self.values.permutation is not None
else {}
),
**(
{"segments": self.values.segments.name}
if self.values.segments is not None
else {}
),
}
)
)
]
return typecast(
str,
generic_msg(
cmd="tohdf",
args={
"filename": prefix_path,
"dset": dataset,
"file_format": _file_type_to_int(file_type),
"write_mode": _mode_str_to_int(mode),
"objType": self.objType,
"num_idx": 1,
"idx": index_data,
"idx_objTypes": [self.values.objType], # this will be pdarray, strings, or cat
"idx_dtypes": [str(self.values.dtype)],
},
),
)
def update_hdf(
self,
prefix_path: str,
dataset: str = "index",
repack: bool = True,
):
"""
Overwrite the dataset with the name provided with this Index object. If
the dataset does not exist, it is added.
Parameters
-----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files
repack: bool
Default: True
HDF5 does not release memory on delete. When True, the inaccessible
data (that was overwritten) is removed. When False, the data remains, but is
inaccessible. Setting to false will yield better performance, but will cause
file sizes to expand.
Returns
--------
str - success message
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the index
Notes
------
- If file does not contain File_Format attribute to indicate how it was saved,
the file name is checked for _LOCALE#### to determine if it is distributed.
- If the dataset provided does not exist, it will be added
- Because HDF5 deletes do not release memory, this will create a copy of the
file with the new data
"""
from arkouda.categorical import Categorical as Categorical_
from arkouda.client import generic_msg
from arkouda.io import (
_file_type_to_int,
_get_hdf_filetype,
_mode_str_to_int,
_repack_hdf,
)
# determine the format (single/distribute) that the file was saved in
file_type = _get_hdf_filetype(prefix_path + "*")
index_data = [
(
self.values.name
if not isinstance(self.values, (Categorical_))
else json.dumps(
{
"codes": self.values.codes.name,
"categories": self.values.categories.name,
"NA_codes": self.values._akNAcode.name,
**(
{"permutation": self.values.permutation.name}
if self.values.permutation is not None
else {}
),
**(
{"segments": self.values.segments.name}
if self.values.segments is not None
else {}
),
}
)
)
]
generic_msg(
cmd="tohdf",
args={
"filename": prefix_path,
"dset": dataset,
"file_format": _file_type_to_int(file_type),
"write_mode": _mode_str_to_int("append"),
"objType": self.objType,
"num_idx": 1,
"idx": index_data,
"idx_objTypes": [self.values.objType], # this will be pdarray, strings, or cat
"idx_dtypes": [str(self.values.dtype)],
"overwrite": True,
},
)
if repack:
_repack_hdf(prefix_path)
def to_parquet(
self,
prefix_path: str,
dataset: str = "index",
mode: str = "truncate",
compression: Optional[str] = None,
):
"""
Save the Index to Parquet. The result is a collection of files,
one file per locale of the arkouda server, where each filename starts
with prefix_path. Each locale saves its chunk of the array to its
corresponding file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files (must not already exist)
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', attempt to create new dataset in existing files.
compression : str (Optional)
(None | "snappy" | "gzip" | "brotli" | "zstd" | "lz4")
Sets the compression type used with Parquet files
Returns
-------
string message indicating result of save operation
Raises
------
RuntimeError
Raised if a server-side error is thrown saving the pdarray
TypeError
Raised if the Index values are a list.
Notes
-----
- The prefix_path must be visible to the arkouda server and the user must
have write permission.
- Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>``
ranges from 0 to ``numLocales`` for `file_type='distribute'`.
- 'append' write mode is supported, but is not efficient.
- If any of the output files already exist and
the mode is 'truncate', they will be overwritten. If the mode is 'append'
and the number of output files is less than the number of locales or a
dataset with the same name already exists, a ``RuntimeError`` will result.
- Any file extension can be used. The file I/O does not rely on the extension to
determine the file format.
"""
if isinstance(self.values, list):
raise TypeError("Unable to write Index to parquet when values are a list.")
return self.values.to_parquet(prefix_path, dataset=dataset, mode=mode, compression=compression)
@typechecked
def to_csv(
self,
prefix_path: str,
dataset: str = "index",
col_delim: str = ",",
overwrite: bool = False,
):
"""
Write Index to CSV file(s). File will contain a single column with the pdarray data.
All CSV Files written by Arkouda include a header denoting data types of the columns.
Parameters
-----------
prefix_path: str
The filename prefix to be used for saving files. Files will have _LOCALE#### appended
when they are written to disk.
dataset: str
Column name to save the pdarray under. Defaults to "index".
col_delim: str
Defaults to ",". Value to be used to separate columns within the file.
Please be sure that the value used DOES NOT appear in your dataset.
overwrite: bool
Defaults to False. If True, any existing files matching your provided prefix_path will
be overwritten. If False, an error will be returned if existing files are found.
Returns
--------
str response message
Raises
------
ValueError
Raised if all datasets are not present in all parquet files or if one or
more of the specified files do not exist.
RuntimeError
Raised if one or more of the specified files cannot be opened.
If `allow_errors` is true this may be raised if no values are returned
from the server.
TypeError
Raised if we receive an unknown arkouda_type returned from the server.
Raised if the Index values are a list.
Notes
------
- CSV format is not currently supported by load/load_all operations
- The column delimiter is expected to be the same for column names and data
- Be sure that column delimiters are not found within your data.
- All CSV files must delimit rows using newline (`\n`) at this time.
"""
if isinstance(self.values, list):
raise TypeError("Unable to write Index to csv when values are a list.")
return self.values.to_csv(prefix_path, dataset=dataset, col_delim=col_delim, overwrite=overwrite)
def save(
self,
prefix_path: str,
dataset: str = "index",
mode: str = "truncate",
compression: Optional[str] = None,
file_format: str = "HDF5",
file_type: str = "distribute",
) -> str:
"""
DEPRECATED
Save the index to HDF5 or Parquet. The result is a collection of files,
one file per locale of the arkouda server, where each filename starts
with prefix_path. Each locale saves its chunk of the array to its
corresponding file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files (must not already exist)
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', attempt to create new dataset in existing files.
compression : str (Optional)
(None | "snappy" | "gzip" | "brotli" | "zstd" | "lz4")
Sets the compression type used with Parquet files
file_format : str {'HDF5', 'Parquet'}
By default, saved files will be written to the HDF5 file format. If
'Parquet', the files will be written to the Parquet file format. This
is case insensitive.
file_type: str ("single" | "distribute")
Default: "distribute"
When set to single, dataset is written to a single file.
When distribute, the dataset is written to one file per locale.
This is only supported by HDF5 files and will have no impact on Parquet files.
Returns
-------
string message indicating result of save operation
Raises
------
RuntimeError
Raised if a server-side error is thrown saving the pdarray
ValueError
Raised if there is an error in parsing the prefix path pointing to
file write location or if the mode parameter is neither truncate
nor append
TypeError
Raised if any one of the prefix_path, dataset, or mode parameters
is not a string.
Raised if the Index values are a list.
See Also
--------
save_all, load, read, to_parquet, to_hdf
Notes
-----
The prefix_path must be visible to the arkouda server and the user must
have write permission.
Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>``
ranges from 0 to ``numLocales``. If any of the output files already exist and
the mode is 'truncate', they will be overwritten. If the mode is 'append'
and the number of output files is less than the number of locales or a
dataset with the same name already exists, a ``RuntimeError`` will result.
Previously all files saved in Parquet format were saved with a ``.parquet`` file extension.
This will require you to use load as if you saved the file with the extension. Try this if
an older file is not being found.
Any file extension can be used. The file I/O does not rely on the extension to determine the
file format.
"""
from warnings import warn
warn(
"ak.Index.save has been deprecated. Please use ak.Index.to_parquet or ak.Index.to_hdf",
DeprecationWarning,
)
if isinstance(self.values, list):
raise TypeError("Unable to save Index when values are a list.")
if mode.lower() not in ["append", "truncate"]:
raise ValueError("Allowed modes are 'truncate' and 'append'")
if file_format.lower() == "hdf5":
return self.to_hdf(prefix_path, dataset=dataset, mode=mode, file_type=file_type)
elif file_format.lower() == "parquet":
return self.to_parquet(prefix_path, dataset=dataset, mode=mode, compression=compression)
else:
raise ValueError("Valid file types are HDF5 or Parquet")
class MultiIndex(Index):
objType = "MultiIndex"
levels: list
_name: str | None
_names: list[str] | list[None]
def __init__(
self,
data: Union[list, tuple, pd.MultiIndex, MultiIndex],
name: Optional[str] = None,
names: Optional[list[str]] = None,
):
self.registered_name: Optional[str] = None
if isinstance(data, MultiIndex):
self.levels = data.levels
elif isinstance(data, pd.MultiIndex):
self.levels = [
(
Categorical(data.get_level_values(i).values)
if isinstance(data.get_level_values(i).values, pd.Categorical)
else array(data.get_level_values(i).values)
)
for i in range(data.nlevels)
]
elif isinstance(data, (list, tuple)):
self.levels = list(data)
else:
raise TypeError("MultiIndex should be an iterable, ak.MultiIndex, or pd.MutiIndex")
first = True
for col in self.levels:
# col can be a python int which doesn't have a size attribute
col_size = col.size if not isinstance(col, int) else 0
if first:
# we are implicitly assuming levels contains arkouda types and not python lists
# because we are using obj.size/obj.dtype instead of len(obj)/type(obj)
# this should be made explicit using typechecking
self.size = col_size
first = False
else:
if col_size != self.size:
raise ValueError("All columns in MultiIndex must have same length")
self._name = data.name if not name and isinstance(data, (MultiIndex, pd.MultiIndex)) else name
if names is not None:
self._names = list(names)
elif isinstance(data, (MultiIndex, pd.MultiIndex)) and data.names:
self._names = list(data.names)
else:
self._names = [None for _i in range(len(self.levels))]
def __getitem__(self, key):
from arkouda.series import Series
if isinstance(key, Series):
key = key.values
return MultiIndex([i[key] for i in self.index])
def __repr__(self):
return f"MultiIndex({repr(self.index)})"
def __len__(self):
return len(self.index[0])
def __eq__(self, v):
if not isinstance(v, (list, tuple, MultiIndex)):
raise TypeError("Cannot compare MultiIndex to a scalar")
retval = ones(len(self), dtype=akbool)
if isinstance(v, MultiIndex):
v = v.index
for a, b in zip(self.index, v):
retval &= a == b
return retval
@property
def names(self):
"""
Return Index or MultiIndex names.
"""
return self._names
@property
def name(self):
"""
Return Index or MultiIndex name.
"""
return self._name
@property
def index(self):
return self.levels
@property
def nlevels(self) -> int:
"""
Integer number of levels in this MultiIndex.
See Also
--------
Index.nlevels
"""
return len(self.levels)
@property
def ndim(self):
"""
Number of dimensions of the underlying data, by definition 1.
See Also
--------
Index.ndim
"""
return 1
@property
def inferred_type(self) -> str:
return "mixed"
@property
def dtype(self) -> npdtype:
"""
Return the dtype object of the underlying data.
"""
return npdtype("O")
def get_level_values(self, level: Union[str, int]):
if isinstance(level, str):
if self.names is None:
raise RuntimeError("Cannot get level values because Index.names is None.")
elif level not in self.names:
raise ValueError(
f'Cannot get level values because level "{level}" is not in Index.names.'
)
elif isinstance(self.names, list) and level in self.names:
level = self.names.index(level)
if isinstance(level, int) and abs(level) < self.nlevels:
name = None
if isinstance(self.names, list) and level in self.names:
name = self.names[level]
return Index(self.levels[level], name=name)
else:
raise ValueError(
"Cannot get level values because level must be a string in names or "
"an integer with absolute value less than the number of levels."
)
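# Illustrative usage (assumes a connected arkouda server): levels can be selected by
# integer position; selection by name requires that names were supplied at construction.
#   m = ak.MultiIndex([ak.array([1, 2]), ak.array(["a", "b"])], names=["nums", "chars"])
#   m.get_level_values(0)    # Index(array([1 2]), dtype='int64')
#   m.get_level_values(1)    # Index over the string level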
def equal_levels(self, other: MultiIndex) -> builtins.bool:
"""
Return True if the levels of both MultiIndex objects are the same
"""
if self.nlevels != other.nlevels:
return False
for i in range(self.nlevels):
if not self.levels[i].equals(other.levels[i]):
return False
return True
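# Illustrative usage (assumes a connected arkouda server):
#   m1 = ak.MultiIndex([ak.array([1, 2]), ak.array(["a", "b"])])
#   m2 = ak.MultiIndex([ak.array([1, 2]), ak.array(["a", "c"])])
#   m1.equal_levels(m1)    # True
#   m1.equal_levels(m2)    # False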
def memory_usage(self, unit="B"):
"""
Return the memory usage of the MultiIndex levels.
Parameters
----------
unit : str, default = "B"
Unit to return. One of {'B', 'KB', 'MB', 'GB'}.
Returns
-------
int
Bytes of memory consumed.
See Also
--------
arkouda.pdarrayclass.nbytes
arkouda.index.Index.memory_usage
arkouda.series.Series.memory_usage
arkouda.dataframe.DataFrame.memory_usage
Examples
--------
>>> import arkouda as ak
>>> ak.connect()
>>> m = ak.index.MultiIndex([ak.array([1,2,3]),ak.array([4,5,6])])
>>> m.memory_usage()
48
"""
from arkouda.util import convert_bytes
nbytes = 0
for item in self.levels:
nbytes += item.nbytes
return convert_bytes(nbytes, unit=unit)
def to_pandas(self):
mi = pd.MultiIndex.from_arrays(
[i.to_pandas() if isinstance(i, Categorical) else i.to_ndarray() for i in self.index],
names=self.names,
)
mi.name = self.name
return mi
def set_dtype(self, dtype):
"""Change the data type of the index
Currently only aku.ip_address and ak.array are supported.
"""
new_idx = [dtype(i) for i in self.index]
self.index = new_idx
return self
def to_ndarray(self):
return ndarray([convert_if_categorical(val).to_ndarray() for val in self.levels])
def to_list(self):
return self.to_ndarray().tolist()
def register(self, user_defined_name):
"""
Register this Index object and underlying components with the Arkouda server
Parameters
----------
user_defined_name : str
user defined name the Index is to be registered under,
this will be the root name for underlying components
Returns
-------
MultiIndex
The same Index which is now registered with the arkouda server and has an updated name.
This is an in-place modification, the original is returned to support
a fluid programming style.
Please note you cannot register two different Indexes with the same name.
Raises
------
TypeError
Raised if user_defined_name is not a str
RegistrationError
If the server was unable to register the Index with the user_defined_name
See also
--------
unregister, attach, is_registered
Notes
-----
Objects registered with the server are immune to deletion until
they are unregistered.
"""
from arkouda.client import generic_msg
if self.registered_name is not None and self.is_registered():
raise RegistrationError(f"This object is already registered as {self.registered_name}")
generic_msg(
cmd="register",
args={
"name": user_defined_name,
"objType": self.objType,
"num_idxs": len(self.levels),
"idx_names": [
(
json.dumps(
{
"codes": v.codes.name,
"categories": v.categories.name,
"NA_codes": v._akNAcode.name,
**(
{"permutation": v.permutation.name}
if v.permutation is not None
else {}
),
**({"segments": v.segments.name} if v.segments is not None else {}),
}
)
if isinstance(v, Categorical)
else v.name
)
for v in self.levels
],
"idx_types": [v.objType for v in self.levels],
},
)
self.registered_name = user_defined_name
return self
def unregister(self):
from arkouda.util import unregister
if not self.registered_name:
raise RegistrationError("This object is not registered")
unregister(self.registered_name)
self.registered_name = None
def is_registered(self):
from arkouda.util import is_registered
if self.registered_name is None:
return False
return is_registered(self.registered_name)
def to_dict(self, labels=None):
data = {}
if labels is None:
labels = [f"idx_{i}" for i in range(len(self.index))]
for i, value in enumerate(self.index):
data[labels[i]] = value
return data
def _merge(self, other):
self._check_types(other)
idx = [generic_concat([ix1, ix2], ordered=False) for ix1, ix2 in zip(self.index, other.index)]
return MultiIndex(GroupBy(idx).unique_keys)
def _merge_all(self, array):
idx = self.index
for other in array:
self._check_types(other)
idx = [generic_concat([ix1, ix2], ordered=False) for ix1, ix2 in zip(idx, other.index)]
return MultiIndex(GroupBy(idx).unique_keys)
def argsort(self, ascending=True):
i = coargsort(self.index)
if not ascending:
i = i[arange(self.size - 1, -1, -1)]
return i
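# Illustrative usage (assumes a connected arkouda server): the permutation co-sorts all
# levels, ordering by the first level and breaking ties with later ones.
#   m = ak.MultiIndex([ak.array([2, 1, 2]), ak.array([0, 0, -1])])
#   m.argsort()    # array([1 2 0])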
def concat(self, other):
self._check_types(other)
idx = [generic_concat([ix1, ix2], ordered=True) for ix1, ix2 in zip(self.index, other.index)]
return MultiIndex(idx)
def lookup(self, key):
if not isinstance(key, list) and not isinstance(key, tuple):
raise TypeError("MultiIndex lookup failure")
# if individual vals convert to pdarrays
if not isinstance(key[0], pdarray):
dt = self.levels[0].dtype if isinstance(self.levels[0], pdarray) else akint64
key = [akcast(array([x]), dt) for x in key]
return in1d(self.index, key)
def to_hdf(
self,
prefix_path: str,
dataset: str = "index",
mode: str = "truncate",
file_type: str = "distribute",
) -> str:
"""
Save the Index to HDF5.
The object can be saved to a collection of files or single file.
Parameters
----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files (must not already exist)
mode : str {'truncate' | 'append'}
By default, truncate (overwrite) output files, if they exist.
If 'append', attempt to create new dataset in existing files.
file_type: str ("single" | "distribute")
Default: "distribute"
When set to single, dataset is written to a single file.
When distribute, the dataset is written to one file per locale.
This is only supported by HDF5 files and will have no impact on Parquet files.
Returns
-------
string message indicating result of save operation
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the pdarray.
Notes
-----
- The prefix_path must be visible to the arkouda server and the user must
have write permission.
- Output files have names of the form ``<prefix_path>_LOCALE<i>``, where ``<i>``
ranges from 0 to ``numLocales`` for `file_type='distribute'`. Otherwise,
the file name will be `prefix_path`.
- If any of the output files already exist and
the mode is 'truncate', they will be overwritten. If the mode is 'append'
and the number of output files is less than the number of locales or a
dataset with the same name already exists, a ``RuntimeError`` will result.
- Any file extension can be used. The file I/O does not rely on the extension to
determine the file format.
"""
from typing import cast as typecast
from arkouda.categorical import Categorical as Categorical_
from arkouda.client import generic_msg
from arkouda.io import _file_type_to_int, _mode_str_to_int
index_data = [
(
obj.name
if not isinstance(obj, (Categorical_))
else json.dumps(
{
"codes": obj.codes.name,
"categories": obj.categories.name,
"NA_codes": obj._akNAcode.name,
**({"permutation": obj.permutation.name} if obj.permutation is not None else {}),
**({"segments": obj.segments.name} if obj.segments is not None else {}),
}
)
)
for obj in self.levels
]
return typecast(
str,
generic_msg(
cmd="tohdf",
args={
"filename": prefix_path,
"dset": dataset,
"file_format": _file_type_to_int(file_type),
"write_mode": _mode_str_to_int(mode),
"objType": self.objType,
"num_idx": len(self.levels),
"idx": index_data,
"idx_objTypes": [obj.objType for obj in self.levels],
"idx_dtypes": [str(obj.dtype) for obj in self.levels],
},
),
)
def update_hdf(
self,
prefix_path: str,
dataset: str = "index",
repack: bool = True,
):
"""
Overwrite the dataset with the name provided with this Index object. If
the dataset does not exist, it is added.
Parameters
-----------
prefix_path : str
Directory and filename prefix that all output files share
dataset : str
Name of the dataset to create in files
repack: bool
Default: True
HDF5 does not release memory on delete. When True, the inaccessible
data (that was overwritten) is removed. When False, the data remains, but is
inaccessible. Setting to false will yield better performance, but will cause
file sizes to expand.
Returns
--------
str - success message
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the index
TypeError
Raised if the Index levels are a list.
Notes
------
- If file does not contain File_Format attribute to indicate how it was saved,
the file name is checked for _LOCALE#### to determine if it is distributed.
- If the dataset provided does not exist, it will be added
- Because HDF5 deletes do not release memory, this will create a copy of the
file with the new data
"""
from arkouda.categorical import Categorical as Categorical_
from arkouda.client import generic_msg
from arkouda.io import (
_file_type_to_int,
_get_hdf_filetype,
_mode_str_to_int,
_repack_hdf,
)
if isinstance(self.levels, list):
raise TypeError("Unable update hdf when Index levels are a list.")
# determine the format (single/distribute) that the file was saved in
file_type = _get_hdf_filetype(prefix_path + "*")
index_data = [
(
obj.name
if not isinstance(obj, (Categorical_))
else json.dumps(
{
"codes": obj.codes.name,
"categories": obj.categories.name,
"NA_codes": obj._akNAcode.name,
**({"permutation": obj.permutation.name} if obj.permutation is not None else {}),
**({"segments": obj.segments.name} if obj.segments is not None else {}),
}
)
)
for obj in self.levels
]
generic_msg(
cmd="tohdf",
args={
"filename": prefix_path,
"dset": dataset,
"file_format": _file_type_to_int(file_type),
"write_mode": _mode_str_to_int("append"),
"objType": self.objType,
"num_idx": len(self.levels),
"idx": index_data,
"idx_objTypes": [obj.objType for obj in self.levels],
"idx_dtypes": [str(obj.dtype) for obj in self.levels],
"overwrite": True,
},
)
if repack:
_repack_hdf(prefix_path)