import glob
import json
import os
from typing import Dict, List, Mapping, Optional, Union, cast
from warnings import warn
import pandas as pd
from typeguard import typechecked
from arkouda.categorical import Categorical
from arkouda.client import generic_msg
from arkouda.client_dtypes import IPv4
from arkouda.dataframe import DataFrame
from arkouda.groupbyclass import GroupBy
from arkouda.index import Index, MultiIndex
from arkouda.numpy.dtypes import float32, float64, int32, int64
from arkouda.pdarrayclass import create_pdarray, pdarray
from arkouda.pdarraycreation import arange, array
from arkouda.segarray import SegArray
from arkouda.strings import Strings
from arkouda.timeclass import Datetime, Timedelta
__all__ = [
"get_filetype",
"ls",
"ls_csv",
"get_null_indices",
"get_datasets",
"get_columns",
"read_hdf",
"read_parquet",
"read_csv",
"read_zarr",
"read",
"read_tagged_data",
"import_data",
"export",
"to_hdf",
"to_parquet",
"to_csv",
"to_zarr",
"save_all",
"load",
"load_all",
"update_hdf",
"snapshot",
"restore",
"receive",
"receive_dataframe",
]
ARKOUDA_HDF5_FILE_METADATA_GROUP = "_arkouda_metadata"
[docs]
def get_filetype(filenames: Union[str, List[str]]) -> str:
"""
Get the type of a file accessible to the server. Supported
file types and possible return strings are 'HDF5' and 'Parquet'.
Parameters
----------
filenames : Union[str, List[str]]
A file or list of files visible to the arkouda server
Returns
-------
str
Type of the file returned as a string, either 'HDF5', 'Parquet' or 'CSV
Raises
------
ValueError
Raised if filename is empty or contains only whitespace
Notes
-----
- When list provided, it is assumed that all files are the same type
- CSV Files without the Arkouda Header are not supported
See Also
--------
read_parquet, read_hdf
"""
if isinstance(filenames, list):
fname = filenames[0]
else:
fname = filenames
if not (fname and fname.strip()):
raise ValueError("filename cannot be an empty string")
return cast(str, generic_msg(cmd="getfiletype", args={"filename": fname}))
[docs]
def ls(filename: str, col_delim: str = ",", read_nested: bool = True) -> List[str]:
"""
This function calls the h5ls utility on a HDF5 file visible to the
arkouda server or calls a function that imitates the result of h5ls
on a Parquet file.
Parameters
----------
filename : str
The name of the file to pass to the server
col_delim : str
The delimiter used to separate columns if the file is a csv
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Only used for Parquet files.
Returns
-------
str
The string output of the datasets from the server
Raises
------
TypeError
Raised if filename is not a str
ValueError
Raised if filename is empty or contains only whitespace
RuntimeError
Raised if error occurs in executing ls on an HDF5 file
Notes
- This will need to be updated because Parquet will not technically support this when we update.
Similar functionality will be added for Parquet in the future
- For CSV files without headers, please use ls_csv
See Also
---------
ls_csv
"""
if not (filename and filename.strip()):
raise ValueError("filename cannot be an empty string")
cmd = "lsany"
return json.loads(
cast(
str,
generic_msg(
cmd=cmd,
args={"filename": filename, "col_delim": col_delim, "read_nested": read_nested},
),
)
)
[docs]
def get_null_indices(
filenames: Union[str, List[str]], datasets: Optional[Union[str, List[str]]] = None
) -> Union[pdarray, Mapping[str, pdarray]]:
"""
Get null indices of a string column in a Parquet file.
Parameters
----------
filenames : list or str
Either a list of filenames or shell expression
datasets : list or str or None
(List of) name(s) of dataset(s) to read. Each dataset must be a string
column. There is no default value for this function, the datasets to be
read must be specified.
Returns
-------
returns a dictionary of Arkouda pdarrays
Dictionary of {datasetName: pdarray}
Raises
------
RuntimeError
Raised if one or more of the specified files cannot be opened.
TypeError
Raised if we receive an unknown arkouda_type returned from the server
See Also
--------
get_datasets, ls
"""
if isinstance(filenames, str):
filenames = [filenames]
if isinstance(datasets, str):
datasets = [datasets]
rep_msg = generic_msg(
cmd="getnullparquet",
args={
"dset_size": len(datasets) if datasets is not None else 0, # if needed for mypy
"filename_size": len(filenames),
"dsets": datasets,
"filenames": filenames,
},
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllMsgJson for json structure
# ignore the type here because we are returning a specific case
return _build_objects(rep) # type: ignore
@typechecked
def _file_type_to_int(file_type: str) -> int:
"""
Convert a string to integer representing the format to save the file in
Parameters
----------
file_type: str (single | distribute)
The string representation of the format for saving the file
Returns
-------
int representing the format
Raises
------
ValueError
If mode is not 'single' or 'distribute'
"""
if file_type.lower() == "single":
return 0
elif file_type.lower() == "distribute":
return 1
else:
raise ValueError(f"File Type expected to be 'single' or 'distributed'. Got {file_type}")
@typechecked
def _mode_str_to_int(mode: str) -> int:
"""
Convert string to integer representing the mode to write
Parameters
----------
mode: str (truncate | append)
The string representation of the write mode to be converted to integer
Returns
-------
int representing the mode
Raises
------
ValueError
If mode is not 'truncate' or 'append'
"""
if mode.lower() == "truncate":
return 0
elif mode.lower() == "append":
return 1
else:
raise ValueError(f"Write Mode expected to be 'truncate' or 'append'. Got {mode}.")
[docs]
def get_datasets(
filenames: Union[str, List[str]],
allow_errors: bool = False,
column_delim: str = ",",
read_nested: bool = True,
) -> List[str]:
"""
Get the names of the datasets in the provide files
Parameters
----------
filenames: str or List[str]
Name of the file/s from which to return datasets
allow_errors: bool
Default: False
Whether or not to allow errors while accessing datasets
column_delim : str
Column delimiter to be used if dataset is CSV. Otherwise, unused.
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Only used for Parquet Files.
Returns
-------
List[str] of names of the datasets
Raises
------
RuntimeError
- If no datasets are returned
Notes
-----
- This function currently supports HDF5 and Parquet formats.
- Future updates to Parquet will deprecate this functionality on that format,
but similar support will be added for Parquet at that time.
- If a list of files is provided, only the datasets in the first file will be returned
See Also
--------
ls
"""
datasets = []
if isinstance(filenames, str):
filenames = [filenames]
for fname in filenames:
try:
datasets = ls(fname, col_delim=column_delim, read_nested=read_nested)
if datasets:
break
except RuntimeError:
if allow_errors:
pass
else:
raise
if not datasets: # empty
raise RuntimeError("Unable to identify datasets.")
return datasets
[docs]
def ls_csv(filename: str, col_delim: str = ",") -> List[str]:
"""
Used for identifying the datasets within a file when a CSV does not
have a header.
Parameters
----------
filename : str
The name of the file to pass to the server
col_delim : str
The delimiter used to separate columns if the file is a csv
Returns
-------
str
The string output of the datasets from the server
See Also
---------
ls
"""
if not (filename and filename.strip()):
raise ValueError("filename cannot be an empty string")
return json.loads(
cast(
str,
generic_msg(
cmd="lscsv",
args={"filename": filename, "col_delim": col_delim},
),
)
)
[docs]
def get_columns(
filenames: Union[str, List[str]], col_delim: str = ",", allow_errors: bool = False
) -> List[str]:
"""
Get a list of column names from CSV file(s).
"""
datasets = []
if isinstance(filenames, str):
filenames = [filenames]
for fname in filenames:
try:
datasets = ls_csv(fname, col_delim)
if datasets:
break
except RuntimeError:
if allow_errors:
pass
else:
raise
if not datasets: # empty
raise RuntimeError("Unable to identify datasets.")
return datasets
def _prep_datasets(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
allow_errors: bool = False,
read_nested: bool = True,
) -> List[str]:
"""
Prepare a list of datasets to be read
Parameters
----------
filenames: str or List[str]
Names of the files for which datasets are being prepped.
Used to call get_datasets()
datasets: Optional str or List[str]
datasets to be accessed
allow_errors: bool
Default: False
Whether or not to allow errors during access operations
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Only used for Parquet Files
Returns
-------
List[str] of dataset names to access
Raises
------
ValueError
- If one or more datasets cannot be found
"""
if datasets is None:
# get datasets. We know they exist because we pulled from the file
datasets = get_datasets(filenames, allow_errors, read_nested=read_nested)
else:
if isinstance(datasets, str):
# TODO - revisit this and enable checks that support things like "strings/values"
# old logic did not check existence for single string dataset.
return [datasets]
# ensure dataset(s) exist
# read_nested always true because when user supplies datasets, it is ignored
nonexistent = set(datasets) - set(get_datasets(filenames, allow_errors, read_nested=True))
if len(nonexistent) > 0:
raise ValueError(f"Dataset(s) not found: {nonexistent}")
return datasets
def _parse_errors(rep_msg, allow_errors: bool = False):
"""
Helper function to parse error messages from a read operation
Parameters
----------
rep_msg
The server response from a read operation
allow_errors: bool
Default: False
Whether or not errors are to be allowed during read operation
"""
file_errors = rep_msg["file_errors"] if "file_errors" in rep_msg else []
if allow_errors and file_errors:
file_error_count = rep_msg["file_error_count"] if "file_error_count" in rep_msg else -1
warn(
f"There were {file_error_count} errors reading files on the server. "
+ f"Sample error messages {file_errors}",
RuntimeWarning,
)
def _parse_obj(
obj: Dict,
) -> Union[
Strings,
pdarray,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
MultiIndex,
]:
"""
Helper function to create an Arkouda object from read response
Parameters
----------
obj : Dict
The response data used to create an Arkouda object
Returns
-------
Strings, pdarray, SegArray, IPv4, Datetime, Timedelta, Categorical, GroupBy, DataFrame, or Index
Raises
------
TypeError
- If return object is an unsupported type
"""
if Strings.objType.upper() == obj["arkouda_type"]:
return Strings.from_return_msg(obj["created"])
elif SegArray.objType.upper() == obj["arkouda_type"]:
return SegArray.from_return_msg(obj["created"])
elif pdarray.objType.upper() == obj["arkouda_type"]:
return create_pdarray(obj["created"])
elif IPv4.special_objType.upper() == obj["arkouda_type"]:
return IPv4(create_pdarray(obj["created"]))
elif Datetime.special_objType.upper() == obj["arkouda_type"]:
return Datetime(create_pdarray(obj["created"]))
elif Timedelta.special_objType.upper() == obj["arkouda_type"]:
return Timedelta(create_pdarray(obj["created"]))
elif Categorical.objType.upper() == obj["arkouda_type"]:
return Categorical.from_return_msg(obj["created"])
elif GroupBy.objType.upper() == obj["arkouda_type"]:
return GroupBy.from_return_msg(obj["created"])
elif DataFrame.objType.upper() == obj["arkouda_type"]:
return DataFrame.from_return_msg(obj["created"])
elif (
obj["arkouda_type"].lower() == Index.objType.lower()
or obj["arkouda_type"].lower() == MultiIndex.objType.lower()
):
return Index.from_return_msg(obj["created"])
else:
raise TypeError(f"Unknown arkouda type:{obj['arkouda_type']}")
def _dict_recombine_segarrays_categoricals(df_dict):
# this assumes segments will always have corresponding values.
# This should happen due to save config
seg_cols = ["_".join(col.split("_")[:-1]) for col in df_dict.keys() if col.endswith("_segments")]
cat_cols = [".".join(col.split(".")[:-1]) for col in df_dict.keys() if col.endswith(".categories")]
df_dict_keys = {
(
"_".join(col.split("_")[:-1])
if col.endswith("_segments") or col.endswith("_values")
else (
".".join(col.split(".")[:-1])
if col.endswith("._akNAcode")
or col.endswith(".categories")
or col.endswith(".codes")
or col.endswith(".permutation")
or col.endswith(".segments")
else col
)
)
for col in df_dict.keys()
}
# update dict to contain segarrays where applicable if any exist
if len(seg_cols) > 0 or len(cat_cols) > 0:
df_dict = {
col: (
SegArray(df_dict[col + "_segments"], df_dict[col + "_values"])
if col in seg_cols
else (
Categorical.from_codes(
df_dict[f"{col}.codes"],
df_dict[f"{col}.categories"],
permutation=(
df_dict[f"{col}.permutation"]
if f"{col}.permutation" in df_dict_keys
else None
),
segments=(
df_dict[f"{col}.segments"] if f"{col}.segments" in df_dict_keys else None
),
_akNAcode=df_dict[f"{col}._akNAcode"],
)
if col in cat_cols
else df_dict[col]
)
)
for col in df_dict_keys
}
return df_dict
def _build_objects(
rep_msg: Dict,
) -> Union[
Mapping[
str,
Union[
Strings,
pdarray,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Helper function to create the Arkouda objects from a read operation
Parameters
----------
rep_msg: Dict
rep_msg to create objects from
Returns
-------
Dictionary mapping the dataset name to the object
Raises
------
RuntimeError
- If no objects were returned
"""
items = json.loads(rep_msg["items"]) if "items" in rep_msg else []
if len(items) >= 1:
return _dict_recombine_segarrays_categoricals(
{item["dataset_name"]: _parse_obj(item) for item in items}
)
else:
raise RuntimeError("No items were returned")
[docs]
def read_hdf(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
iterative: bool = False,
strict_types: bool = True,
allow_errors: bool = False,
calc_string_offsets: bool = False,
tag_data=False,
) -> Union[
Mapping[
str,
Union[
pdarray,
Strings,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Read Arkouda objects from HDF5 file/s
Parameters
----------
filenames : str, List[str]
Filename/s to read objects from
datasets : Optional str, List[str]
datasets to read from the provided files
iterative : bool
Iterative (True) or Single (False) function call(s) to server
strict_types: bool
If True (default), require all dtypes of a given dataset to have the
same precision and sign. If False, allow dtypes of different
precision and sign across different files. For example, if one
file contains a uint32 dataset and another contains an int64
dataset with the same name, the contents of both will be read
into an int64 pdarray.
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
calc_string_offsets: bool
Default False, if True this will tell the server to calculate the
offsets/segments array on the server versus loading them from HDF5 files.
In the future this option may be set to True as the default.
tagData: bool
Default False, if True tag the data with the code associated with the filename
that the data was pulled from.
Returns
-------
Returns a dictionary of Arkouda pdarrays, Arkouda Strings, or Arkouda Segarrays.
Dictionary of {datasetName: pdarray, String, SegArray}
Raises
------
ValueError
Raised if all datasets are not present in all hdf5 files or if one or
more of the specified files do not exist
RuntimeError
Raised if one or more of the specified files cannot be opened.
If `allow_errors` is true this may be raised if no values are returned
from the server.
TypeError
Raised if we receive an unknown arkouda_type returned from the server
Notes
-----
If filenames is a string, it is interpreted as a shell expression
(a single filename is a valid expression, so it will work) and is
expanded with glob to read all matching files.
If iterative == True each dataset name and file names are passed to
the server as independent sequential strings while if iterative == False
all dataset names and file names are passed to the server in a single
string.
If datasets is None, infer the names of datasets from the first file
and read all of them. Use ``get_datasets`` to show the names of datasets
to HDF5 files.
See Also
---------
read_tagged_data
Examples
--------
>>>
# Read with file Extension
>>> x = ak.read_hdf('path/name_prefix.h5') # load HDF5
# Read Glob Expression
>>> x = ak.read_hdf('path/name_prefix*') # Reads HDF5
"""
if isinstance(filenames, str):
filenames = [filenames]
datasets = _prep_datasets(filenames, datasets, allow_errors)
if iterative:
if tag_data:
raise RuntimeError("Cannot tag data with iterative read.")
return {
dset: read_hdf(
filenames,
datasets=dset,
strict_types=strict_types,
allow_errors=allow_errors,
calc_string_offsets=calc_string_offsets,
tag_data=tag_data,
)[dset]
for dset in datasets
}
else:
rep_msg = generic_msg(
cmd="readAllHdf",
args={
"strict_types": strict_types,
"dset_size": len(datasets),
"filename_size": len(filenames),
"allow_errors": allow_errors,
"calc_string_offsets": calc_string_offsets,
"dsets": datasets,
"filenames": filenames,
"tag_data": tag_data,
},
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllMsgJson for json structure
_parse_errors(rep, allow_errors)
return _build_objects(rep)
[docs]
def read_parquet(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
iterative: bool = False,
strict_types: bool = True,
allow_errors: bool = False,
tag_data: bool = False,
read_nested: bool = True,
has_non_float_nulls: bool = False,
fixed_len: int = -1,
) -> Union[
Mapping[
str,
Union[
pdarray,
Strings,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Read Arkouda objects from Parquet file/s
Parameters
----------
filenames : str, List[str]
Filename/s to read objects from
datasets : Optional str, List[str]
datasets to read from the provided files
iterative : bool
Iterative (True) or Single (False) function call(s) to server
strict_types: bool
If True (default), require all dtypes of a given dataset to have the
same precision and sign. If False, allow dtypes of different
precision and sign across different files. For example, if one
file contains a uint32 dataset and another contains an int64
dataset with the same name, the contents of both will be read
into an int64 pdarray.
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
tagData: bool
Default False, if True tag the data with the code associated with the filename
that the data was pulled from.
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
If datasets is not None, this will be ignored.
has_non_float_nulls: bool
Default False. This flag must be set to True to read non-float parquet columns
that contain null values.
fixed_len: int
Default -1. This value can be set for reading Parquet string columns when the
length of each string is known at runtime. This can allow for skipping byte
calculation, which can have an impact on performance.
Returns
-------
Returns a dictionary of Arkouda pdarrays, Arkouda Strings, or Arkouda Segarrays.
Dictionary of {datasetName: pdarray, String, or SegArray}
Raises
------
ValueError
Raised if all datasets are not present in all parquet files or if one or
more of the specified files do not exist
RuntimeError
Raised if one or more of the specified files cannot be opened.
If `allow_errors` is true this may be raised if no values are returned
from the server.
TypeError
Raised if we receive an unknown arkouda_type returned from the server
Notes
-----
If filenames is a string, it is interpreted as a shell expression
(a single filename is a valid expression, so it will work) and is
expanded with glob to read all matching files.
If iterative == True each dataset name and file names are passed to
the server as independent sequential strings while if iterative == False
all dataset names and file names are passed to the server in a single
string.
If datasets is None, infer the names of datasets from the first file
and read all of them. Use ``get_datasets`` to show the names of datasets
to Parquet files.
Parquet always recomputes offsets at this time
This will need to be updated once parquets workflow is updated
See Also
---------
read_tagged_data
Examples
--------
Read without file Extension
>>> x = ak.read_parquet('path/name_prefix.parquet') # load Parquet
Read Glob Expression
>>> x = ak.read_parquet('path/name_prefix*') # Reads Parquet
"""
if isinstance(filenames, str):
filenames = [filenames]
datasets = _prep_datasets(filenames, datasets, read_nested=read_nested)
if iterative:
if tag_data:
raise RuntimeError("Cannot tag data with iterative read.")
return {
dset: read_parquet(
filenames,
datasets=dset,
strict_types=strict_types,
allow_errors=allow_errors,
tag_data=tag_data,
read_nested=read_nested,
has_non_float_nulls=has_non_float_nulls,
fixed_len=fixed_len,
)[dset]
for dset in datasets
}
else:
rep_msg = generic_msg(
cmd="readAllParquet",
args={
"strict_types": strict_types,
"dset_size": len(datasets),
"filename_size": len(filenames),
"allow_errors": allow_errors,
"dsets": datasets,
"filenames": filenames,
"tag_data": tag_data,
"has_non_float_nulls": has_non_float_nulls,
"fixed_len": fixed_len,
},
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllMsgJson for json structure
_parse_errors(rep, allow_errors)
return _build_objects(rep)
[docs]
def read_csv(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
column_delim: str = ",",
allow_errors: bool = False,
) -> Union[
Mapping[
str,
Union[
pdarray,
Strings,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Read CSV file(s) into Arkouda objects. If more than one dataset is found, the objects
will be returned in a dictionary mapping the dataset name to the Arkouda object
containing the data. If the file contains the appropriately formatted header, typed
data will be returned. Otherwise, all data will be returned as a Strings object.
Parameters
-----------
filenames: str or List[str]
The filenames to read data from
datasets: str or List[str] (Optional)
names of the datasets to read. When `None`, all datasets will be read.
column_delim: str
The delimiter for column names and data. Defaults to ",".
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
Returns
--------
Returns a dictionary of Arkouda pdarrays, Arkouda Strings, or Arkouda Segarrays.
Dictionary of {datasetName: pdarray, String, or SegArray}
Raises
------
ValueError
Raised if all datasets are not present in all parquet files or if one or
more of the specified files do not exist
RuntimeError
Raised if one or more of the specified files cannot be opened.
If `allow_errors` is true this may be raised if no values are returned
from the server.
TypeError
Raised if we receive an unknown arkouda_type returned from the server
See Also
---------
to_csv
Notes
------
- CSV format is not currently supported by load/load_all operations
- The column delimiter is expected to be the same for column names and data
- Be sure that column delimiters are not found within your data.
- All CSV files must delimit rows using newline (``\\n``) at this time.
- Unlike other file formats, CSV files store Strings as their UTF-8 format instead of storing
bytes as uint(8).
"""
if isinstance(filenames, str):
filenames = [filenames]
if isinstance(datasets, str):
datasets = [datasets]
elif datasets is None:
datasets = get_columns(filenames, col_delim=column_delim, allow_errors=allow_errors)
rep_msg = generic_msg(
cmd="readcsv",
args={
"filenames": filenames,
"nfiles": len(filenames),
"datasets": datasets,
"num_dsets": len(datasets),
"col_delim": column_delim,
"allow_errors": allow_errors,
},
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllMsgJson for json structure
_parse_errors(rep, allow_errors)
return _build_objects(rep)
[docs]
def import_data(
read_path: str, write_file: Optional[str] = None, return_obj: bool = True, index: bool = False
):
"""
Import data from a file saved by Pandas (HDF5/Parquet) to Arkouda object and/or
a file formatted to be read by Arkouda.
Parameters
__________
read_path: str
path to file where pandas data is stored. This can be glob expression for parquet formats.
write_file: str, optional
path to file to write arkouda formatted data to. Only write file if provided
return_obj: bool, optional
Default True. When True return the Arkouda DataFrame object, otherwise return None
index: bool, optional
Default False. When True, maintain the indexes loaded from the pandas file
Raises
______
RuntimeWarning
- Export attempted on Parquet file. Arkouda formatted Parquet files are readable by pandas.
RuntimeError
- Unsupported file type
Returns
_______
pd.DataFrame
When `return_obj=True`
See Also
________
pandas.DataFrame.to_parquet, pandas.DataFrame.to_hdf,
pandas.DataFrame.read_parquet, pandas.DataFrame.read_hdf,
ak.export
Notes
_____
- Import can only be performed from hdf5 or parquet files written by pandas.
"""
from arkouda.dataframe import DataFrame
# verify file path
is_glob = not os.path.isfile(read_path)
file_list = glob.glob(read_path)
if len(file_list) == 0:
raise FileNotFoundError(f"Invalid read_path, {read_path}. No files found.")
# access the file type - multiple files valid here because parquet supports glob. Check first listed.
file = read_path if not is_glob else glob.glob(read_path)[0]
filetype = get_filetype(file)
# Note - in the future if we support more than pandas here, we should verify attributes.
if filetype == "HDF5":
if is_glob:
raise RuntimeError(
"Pandas HDF5 import supports valid file path only. Only supports the local file system,"
" remote URLs and file-like objects are not supported."
)
df_def = pd.read_hdf(read_path)
elif filetype == "Parquet":
# parquet supports glob input in pandas
df_def = pd.read_parquet(read_path)
else:
raise RuntimeError(
"File type not supported. Import is only supported for HDF5 and Parquet file formats."
)
df = DataFrame(df_def)
if write_file:
(
df.to_hdf(write_file, index=index)
if filetype == "HDF5"
else df.to_parquet(write_file, index=index)
)
if return_obj:
return df
[docs]
def export(
read_path: str,
dataset_name: str = "ak_data",
write_file: Optional[str] = None,
return_obj: bool = True,
index: bool = False,
):
"""
Export data from Arkouda file (Parquet/HDF5) to Pandas object or file formatted to be
readable by Pandas
Parameters
__________
read_path: str
path to file where arkouda data is stored.
dataset_name: str
name to store dataset under
index: bool
Default False. When True, maintain the indexes loaded from the pandas file
write_file: str, optional
path to file to write pandas formatted data to. Only write the file if this is set
return_obj: bool, optional
Default True. When True return the Pandas DataFrame object, otherwise return None
Raises
______
RuntimeError
- Unsupported file type
Returns
_______
pd.DataFrame
When `return_obj=True`
See Also
________
pandas.DataFrame.to_parquet, pandas.DataFrame.to_hdf,
pandas.DataFrame.read_parquet, pandas.DataFrame.read_hdf,
ak.import_data
Notes
_____
- If Arkouda file is exported for pandas, the format will not change. This mean parquet files
will remain parquet and hdf5 will remain hdf5.
- Export can only be performed from hdf5 or parquet files written by Arkouda. The result will be
the same file type, but formatted to be read by Pandas.
"""
from arkouda.dataframe import DataFrame
# get the filetype
prefix, extension = os.path.splitext(read_path)
first_file = f"{prefix}_LOCALE0000{extension}"
filetype = get_filetype(first_file)
if filetype not in ["HDF5", "Parquet"]:
raise RuntimeError(
"File type not supported. Import is only supported for HDF5 and Parquet file formats."
)
akdf = DataFrame.load(read_path, file_format=filetype)
df = akdf.to_pandas(retain_index=index)
if write_file:
if filetype == "HDF5":
# write to fixed format as this should be the most efficient
df.to_hdf(write_file, key=dataset_name, format="fixed", mode="w", index=index)
else:
# we know this is parquet because otherwise we would have errored at the type check
df.to_parquet(write_file, index=index)
if return_obj:
return df
def _bulk_write_prep(
columns: Union[
Mapping[str, Union[pdarray, Strings, SegArray]],
List[Union[pdarray, Strings, SegArray]],
],
names: Optional[List[str]] = None,
convert_categoricals: bool = False,
):
datasetNames = []
if names is not None:
if len(names) != len(columns):
raise ValueError("Number of names does not match number of columns")
else:
datasetNames = names
data = [] # init to avoid undefined errors
if isinstance(columns, dict):
data = list(columns.values())
if names is None:
datasetNames = list(columns.keys())
elif isinstance(columns, list):
data = cast(List[pdarray], columns)
if names is None:
datasetNames = [str(column) for column in range(len(columns))]
if len(data) == 0:
raise RuntimeError("No data was found.")
if convert_categoricals:
for i, val in enumerate(data):
if isinstance(val, Categorical):
data[i] = val.categories[val.codes]
col_objtypes = [c.objType for c in data]
return datasetNames, data, col_objtypes
[docs]
def to_parquet(
columns: Union[
Mapping[str, Union[pdarray, Strings, SegArray]],
List[Union[pdarray, Strings, SegArray]],
],
prefix_path: str,
names: Optional[List[str]] = None,
mode: str = "truncate",
compression: Optional[str] = None,
convert_categoricals: bool = False,
) -> None:
"""
Save multiple named pdarrays to Parquet files.
Parameters
----------
columns : dict or list of pdarrays
Collection of arrays to save
prefix_path : str
Directory and filename prefix for output files
names : list of str
Dataset names for the pdarrays
mode : {'truncate' | 'append'}
By default, truncate (overwrite) the output files if they exist.
If 'append', attempt to create new dataset in existing files.
'append' is deprecated, please use the multi-column write
compression : str (Optional)
Default None
Provide the compression type to use when writing the file.
Supported values: snappy, gzip, brotli, zstd, lz4
convert_categoricals: bool
Defaults to False
Parquet requires all columns to be the same size and Categoricals
don't satisfy that requirement.
if set, write the equivalent Strings in place of any Categorical columns.
Returns
-------
None
Raises
------
ValueError
Raised if (1) the lengths of columns and values differ or (2) the mode
is not 'truncate' or 'append'
RuntimeError
Raised if a server-side error is thrown saving the pdarray
See Also
--------
to_hdf, load, load_all, read
Notes
-----
Creates one file per locale containing that locale's chunk of each pdarray.
If columns is a dictionary, the keys are used as the Parquet column names.
Otherwise, if no names are supplied, 0-up integers are used. By default,
any existing files at path_prefix will be overwritten, unless the user
specifies the 'append' mode, in which case arkouda will attempt to add
<columns> as new datasets to existing files. If the wrong number of files
is present or dataset names already exist, a RuntimeError is raised.
Examples
--------
>>> a = ak.arange(25)
>>> b = ak.arange(25)
>>> # Save with mapping defining dataset names
>>> ak.to_parquet({'a': a, 'b': b}, 'path/name_prefix')
>>> # Save using names instead of mapping
>>> ak.to_parquet([a, b], 'path/name_prefix', names=['a', 'b'])
"""
if mode.lower() not in ["append", "truncate"]:
raise ValueError("Allowed modes are 'truncate' and 'append'")
if mode.lower() == "append":
warn(
"Append has been deprecated when writing Parquet files. "
"Please write all columns to the file at once.",
DeprecationWarning,
)
datasetNames, data, col_objtypes = _bulk_write_prep(columns, names, convert_categoricals)
# append or single column use the old logic
if mode.lower() == "append" or len(data) == 1:
for arr, name in zip(data, cast(List[str], datasetNames)):
arr.to_parquet(prefix_path=prefix_path, dataset=name, mode=mode, compression=compression)
else:
print(
cast(
str,
generic_msg(
cmd="toParquet_multi",
args={
"columns": data,
"col_names": datasetNames,
"col_objtypes": col_objtypes,
"filename": prefix_path,
"num_cols": len(data),
"compression": compression,
},
),
)
)
[docs]
def to_hdf(
columns: Union[
Mapping[str, Union[pdarray, Strings, SegArray]],
List[Union[pdarray, Strings, SegArray]],
],
prefix_path: str,
names: Optional[List[str]] = None,
mode: str = "truncate",
file_type: str = "distribute",
) -> None:
"""
Save multiple named pdarrays to HDF5 files.
Parameters
----------
columns : dict or list of pdarrays
Collection of arrays to save
prefix_path : str
Directory and filename prefix for output files
names : list of str
Dataset names for the pdarrays
mode : {'truncate' | 'append'}
By default, truncate (overwrite) the output files if they exist.
If 'append', attempt to create new dataset in existing files.
file_type : str ("single" | "distribute")
Default: distribute
Single writes the dataset to a single file
Distribute writes the dataset to a file per locale
Returns
-------
None
Raises
------
ValueError
Raised if (1) the lengths of columns and values differ or (2) the mode
is not 'truncate' or 'append'
RuntimeError
Raised if a server-side error is thrown saving the pdarray
See Also
--------
to_parquet, load, load_all, read
Notes
-----
Creates one file per locale containing that locale's chunk of each pdarray.
If columns is a dictionary, the keys are used as the HDF5 dataset names.
Otherwise, if no names are supplied, 0-up integers are used. By default,
any existing files at path_prefix will be overwritten, unless the user
specifies the 'append' mode, in which case arkouda will attempt to add
<columns> as new datasets to existing files. If the wrong number of files
is present or dataset names already exist, a RuntimeError is raised.
Examples
--------
>>> a = ak.arange(25)
>>> b = ak.arange(25)
>>> # Save with mapping defining dataset names
>>> ak.to_hdf({'a': a, 'b': b}, 'path/name_prefix')
>>> # Save using names instead of mapping
>>> ak.to_hdf([a, b], 'path/name_prefix', names=['a', 'b'])
"""
if mode.lower() not in ["append", "truncate"]:
raise ValueError("Allowed modes are 'truncate' and 'append'")
datasetNames, pdarrays, _ = _bulk_write_prep(columns, names)
for arr, name in zip(pdarrays, cast(List[str], datasetNames)):
arr.to_hdf(
prefix_path=prefix_path,
dataset=name,
mode=mode,
file_type=file_type,
)
if mode.lower() == "truncate":
mode = "append"
def _get_hdf_filetype(filename: str) -> str:
if not (filename and filename.strip()):
raise ValueError("filename cannot be an empty string")
cmd = "hdffileformat"
return cast(
str,
generic_msg(
cmd=cmd,
args={"filename": filename},
),
)
def _repack_hdf(prefix_path: str):
"""
Overwrites the existing hdf5 file with a copy that removes any inaccessible datasets
"""
file_type = _get_hdf_filetype(prefix_path + "*")
dset_list = ls(prefix_path + "*")
if len(dset_list) == 1:
# early out because when overwriting only one value, hdf5 automatically releases memory
return
data = read_hdf(prefix_path + "*")
if not isinstance(data, dict):
# handles the case of reading only 1 dataset
data = [data] # type: ignore
to_hdf(data, prefix_path, names=dset_list, file_type=file_type) # type: ignore
[docs]
def update_hdf(
columns: Union[
Mapping[str, Union[pdarray, Strings, SegArray]],
List[Union[pdarray, Strings, SegArray]],
],
prefix_path: str,
names: Optional[List[str]] = None,
repack: bool = True,
):
"""
Overwrite the datasets with name appearing in names or keys in columns if columns
is a dictionary
Parameters
-----------
columns : dict or list of pdarrays
Collection of arrays to save
prefix_path : str
Directory and filename prefix for output files
names : list of str
Dataset names for the pdarrays
repack: bool
Default: True
HDF5 does not release memory on delete. When True, the inaccessible
data (that was overwritten) is removed. When False, the data remains, but is
inaccessible. Setting to false will yield better performance, but will cause
file sizes to expand.
Raises
-------
RuntimeError
Raised if a server-side error is thrown saving the datasets
Notes
-----
- If file does not contain File_Format attribute to indicate how it was saved,
the file name is checked for _LOCALE#### to determine if it is distributed.
- If the datasets provided do not exist, they will be added
- Because HDF5 deletes do not release memory, this will create a copy of the
file with the new data
- This workflow is slightly different from `to_hdf` to prevent reading and
creating a copy of the file for each dataset
"""
datasetNames, pdarrays, _ = _bulk_write_prep(columns, names)
for arr, name in zip(pdarrays, cast(List[str], datasetNames)):
# overwrite the data without repacking. Repack done once at end if set
arr.update_hdf(prefix_path, dataset=name, repack=False)
if repack:
_repack_hdf(prefix_path)
[docs]
def to_csv(
columns: Union[Mapping[str, Union[pdarray, Strings]], List[Union[pdarray, Strings]]],
prefix_path: str,
names: Optional[List[str]] = None,
col_delim: str = ",",
overwrite: bool = False,
):
"""
Write Arkouda object(s) to CSV file(s). All CSV Files written by Arkouda
include a header denoting data types of the columns.
Parameters
-----------
columns: Mapping[str, pdarray] or List[pdarray]
The objects to be written to CSV file. If a mapping is used and `names` is None
the keys of the mapping will be used as the dataset names.
prefix_path: str
The filename prefix to be used for saving files. Files will have _LOCALE#### appended
when they are written to disk.
names: List[str] (Optional)
names of dataset to be written. Order should correspond to the order of data
provided in `columns`.
col_delim: str
Defaults to ",". Value to be used to separate columns within the file.
Please be sure that the value used DOES NOT appear in your dataset.
overwrite: bool
Defaults to False. If True, any existing files matching your provided prefix_path will
be overwritten. If False, an error will be returned if existing files are found.
Returns
--------
None
Raises
------
ValueError
Raised if any datasets are present in all csv files or if one or
more of the specified files do not exist
RuntimeError
Raised if one or more of the specified files cannot be opened.
If `allow_errors` is true this may be raised if no values are returned
from the server.
TypeError
Raised if we receive an unknown arkouda_type returned from the server
See Also
---------
read_csv
Notes
------
- CSV format is not currently supported by load/load_all operations
- The column delimiter is expected to be the same for column names and data
- Be sure that column delimiters are not found within your data.
- All CSV files must delimit rows using newline (``\\n``) at this time.
- Unlike other file formats, CSV files store Strings as their UTF-8 format instead of storing
bytes as uint(8).
"""
datasetNames, pdarrays, _ = _bulk_write_prep(columns, names) # type: ignore
dtypes = [a.dtype.name for a in pdarrays]
generic_msg(
cmd="writecsv",
args={
"datasets": pdarrays,
"col_names": datasetNames,
"filename": prefix_path,
"num_dsets": len(pdarrays),
"col_delim": col_delim,
"dtypes": dtypes,
"row_count": pdarrays[0].size, # all columns should have equal number of entries
"overwrite": overwrite,
},
)
[docs]
def to_zarr(store_path: str, arr: pdarray, chunk_shape):
"""
Writes a pdarray to disk as a Zarr store. Supports multi-dimensional pdarrays of numeric types.
To use this function, ensure you have installed the blosc dependency (`make install-blosc`)
and have included `ZarrMsg.chpl` in the `ServerModules.cfg` file.
Parameters
----------
store_path : str
The path at which Zarr store should be written
arr : pdarray
The pdarray to be written to disk
chunk_shape : tuple
The shape of the chunks to be used in the Zarr store
Raises
------
ValueError
Raised if the number of dimensions in the chunk shape does not match
the number of dimensions in the array or if the array is not a 32 or 64 bit numeric type
"""
ndim = arr.ndim
if ndim != len(chunk_shape):
raise ValueError(
"The number of dimensions in the chunk shape must match the \
number of dimensions in the array"
)
if arr.dtype not in [int64, int32, float64, float32]:
raise ValueError("Only pdarrays of 64 and 32 bit numeric types are supported")
generic_msg(
cmd=f"writeAllZarr{ndim}D",
args={"store_path": store_path, "arr": arr, "chunk_shape": chunk_shape},
)
[docs]
def read_zarr(store_path: str, ndim: int, dtype):
"""
Reads a Zarr store from disk into a pdarray. Supports multi-dimensional pdarrays of numeric types.
To use this function, ensure you have installed the blosc dependency (`make install-blosc`)
and have included `ZarrMsg.chpl` in the `ServerModules.cfg` file.
Parameters
----------
store_path : str
The path to the Zarr store. The path must be to a directory that contains a `.zarray`
file containing the Zarr store metadata.
ndim : int
The number of dimensions in the array
dtype : str
The data type of the array
Returns
-------
pdarray
The pdarray read from the Zarr store.
"""
rep_msg = generic_msg(cmd=f"readAllZarr{ndim}D", args={"store_path": store_path, "dtype": dtype})
return create_pdarray(rep_msg)
[docs]
def save_all(
columns: Union[
Mapping[str, Union[pdarray, Strings, SegArray]],
List[Union[pdarray, Strings, SegArray]],
],
prefix_path: str,
names: Optional[List[str]] = None,
file_format="HDF5",
mode: str = "truncate",
file_type: str = "distribute",
compression: Optional[str] = None,
) -> None:
"""
DEPRECATED
Save multiple named pdarrays to HDF5/Parquet files.
Parameters
----------
columns : dict or list of pdarrays
Collection of arrays to save
prefix_path : str
Directory and filename prefix for output files
names : list of str
Dataset names for the pdarrays
file_format : str
'HDF5' or 'Parquet'. Defaults to hdf5
mode : {'truncate' | 'append'}
By default, truncate (overwrite) the output files if they exist.
If 'append', attempt to create new dataset in existing files.
file_type : str ("single" | "distribute")
Default: distribute
Single writes the dataset to a single file
Distribute writes the dataset to a file per locale
Only used with HDF5
compression: str (None | "snappy" | "gzip" | "brotli" | "zstd" | "lz4")
Optional
Select the compression to use with Parquet files.
Only used with Parquet.
Returns
-------
None
Raises
------
ValueError
Raised if (1) the lengths of columns and values differ or (2) the mode
is not 'truncate' or 'append'
See Also
--------
save, load_all, to_parquet, to_hdf
Notes
-----
Creates one file per locale containing that locale's chunk of each pdarray.
If columns is a dictionary, the keys are used as the HDF5 dataset names.
Otherwise, if no names are supplied, 0-up integers are used. By default,
any existing files at path_prefix will be overwritten, unless the user
specifies the 'append' mode, in which case arkouda will attempt to add
<columns> as new datasets to existing files. If the wrong number of files
is present or dataset names already exist, a RuntimeError is raised.
Examples
--------
>>> a = ak.arange(25)
>>> b = ak.arange(25)
>>> # Save with mapping defining dataset names
>>> ak.save_all({'a': a, 'b': b}, 'path/name_prefix', file_format='Parquet')
>>> # Save using names instead of mapping
>>> ak.save_all([a, b], 'path/name_prefix', names=['a', 'b'], file_format='Parquet')
"""
warn(
"ak.save_all has been deprecated. Please use ak.to_hdf or ak.to_parquet",
DeprecationWarning,
)
if file_format.lower() == "hdf5":
to_hdf(columns, prefix_path, names=names, mode=mode, file_type=file_type)
elif file_format.lower() == "parquet":
to_parquet(columns, prefix_path, names=names, mode=mode, compression=compression)
else:
raise ValueError("Arkouda only supports HDF5 and Parquet files.")
[docs]
@typechecked
def load(
path_prefix: str,
file_format: str = "INFER",
dataset: str = "array",
calc_string_offsets: bool = False,
column_delim: str = ",",
) -> Union[
Mapping[
str,
Union[
pdarray,
Strings,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Load a pdarray previously saved with ``pdarray.save()``.
Parameters
----------
path_prefix : str
Filename prefix used to save the original pdarray
file_format : str
'INFER', 'HDF5' or 'Parquet'. Defaults to 'INFER'. Used to indicate the file type being loaded.
If INFER, this will be detected during processing
dataset : str
Dataset name where the pdarray was saved, defaults to 'array'
calc_string_offsets : bool
If True the server will ignore Segmented Strings 'offsets' array and derive
it from the null-byte terminators. Defaults to False currently
column_delim : str
Column delimiter to be used if dataset is CSV. Otherwise, unused.
Returns
-------
Mapping[str, Union[pdarray, Strings, SegArray, Categorical]]
Dictionary of {datsetName: Union[pdarray, Strings, SegArray, Categorical]}
with the previously saved pdarrays, Strings, SegArrays, or Categoricals
Raises
------
TypeError
Raised if either path_prefix or dataset is not a str
ValueError
Raised if invalid file_format or if the dataset is not present in all hdf5 files or if the
path_prefix does not correspond to files accessible to Arkouda
RuntimeError
Raised if the hdf5 files are present but there is an error in opening
one or more of them
See Also
--------
to_parquet, to_hdf, load_all, read
Notes
-----
If you have a previously saved Parquet file that is raising a FileNotFound error, try loading it
with a .parquet appended to the prefix_path.
Parquet files were previously ALWAYS stored with a ``.parquet`` extension.
ak.load does not support loading a single file.
For loading single HDF5 files without the _LOCALE#### suffix please use ak.read().
CSV files without the Arkouda Header are not supported.
Examples
--------
>>> # Loading from file without extension
>>> obj = ak.load('path/prefix')
Loads the array from numLocales files with the name ``cwd/path/name_prefix_LOCALE####``.
The file type is inferred during processing.
>>> # Loading with an extension (HDF5)
>>> obj = ak.load('path/prefix.test')
Loads the object from numLocales files with the name ``cwd/path/name_prefix_LOCALE####.test`` where
#### is replaced by each locale numbers. Because filetype is inferred during processing,
the extension is not required to be a specific format.
"""
if "*" in path_prefix:
raise ValueError(
"Glob expressions not supported by ak.load(). "
"To read files using a glob expression, please use ak.read()"
)
prefix, extension = os.path.splitext(path_prefix)
globstr = f"{prefix}_LOCALE*{extension}"
try:
file_format = get_filetype(globstr) if file_format.lower() == "infer" else file_format
if file_format.lower() == "hdf5":
return read_hdf(globstr, dataset, calc_string_offsets=calc_string_offsets)
elif file_format.lower() == "parquet":
return read_parquet(globstr, dataset)
else:
return read_csv(globstr, dataset, column_delim=column_delim)
except RuntimeError as re:
if "does not exist" in str(re):
raise ValueError(
f"There are no files corresponding to the path_prefix {path_prefix} in"
" a location accessible to Arkouda"
)
else:
raise RuntimeError(re)
[docs]
@typechecked
def load_all(
path_prefix: str, file_format: str = "INFER", column_delim: str = ",", read_nested=True
) -> Mapping[str, Union[pdarray, Strings, SegArray, Categorical]]:
"""
Load multiple pdarrays, Strings, SegArrays, or Categoricals previously
saved with ``save_all()``.
Parameters
----------
path_prefix : str
Filename prefix used to save the original pdarray
file_format: str
'INFER', 'HDF5', 'Parquet', or 'CSV'. Defaults to 'INFER'. Indicates the format being loaded.
When 'INFER' the processing will detect the format
Defaults to 'INFER'
column_delim : str
Column delimiter to be used if dataset is CSV. Otherwise, unused.
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Parquet files only
Returns
-------
Mapping[str, Union[pdarray, Strings, SegArray, Categorical]]
Dictionary of {datsetName: Union[pdarray, Strings, SegArray, Categorical]}
with the previously saved pdarrays, Strings, SegArrays, or Categoricals
Raises
------
TypeError:
Raised if path_prefix is not a str
ValueError
Raised if file_format/extension is encountered that is not hdf5 or parquet or
if all datasets are not present in all hdf5/parquet files or if the
path_prefix does not correspond to files accessible to Arkouda
RuntimeError
Raised if the hdf5 files are present but there is an error in opening
one or more of them
See Also
--------
to_parquet, to_hdf, load, read
Notes
_____
This function has been updated to determine the file extension based on the file format variable
This function will be deprecated when glob flags are added to read_* methods
CSV files without the Arkouda Header are not supported.
"""
prefix, extension = os.path.splitext(path_prefix)
firstname = f"{prefix}_LOCALE0000{extension}"
try:
result = dict()
for dataset in get_datasets(firstname, column_delim=column_delim, read_nested=read_nested):
result[dataset] = load(prefix, file_format=file_format, dataset=dataset)[dataset]
result = _dict_recombine_segarrays_categoricals(result)
# Check for Categoricals and remove if necessary
removal_names, categoricals = Categorical.parse_hdf_categoricals(result)
if removal_names:
result.update(categoricals)
for n in removal_names:
result.pop(n)
return result
except RuntimeError as re:
# enables backwards compatibility with previous naming convention
if "does not exist" in str(re):
try:
firstname = f"{prefix}_LOCALE0{extension}"
return {dataset: load(prefix, dataset=dataset) for dataset in get_datasets(firstname)}
except RuntimeError as re:
if "does not exist" in str(re):
raise ValueError(
f"There are no files corresponding to the path_prefix {prefix} and "
f"file_format {file_format} in location accessible to Arkouda"
)
else:
raise RuntimeError(re)
else:
raise RuntimeError(
f"Could not open one or more files with path_prefix {prefix} and "
f"file_format {file_format} in location accessible to Arkouda"
)
[docs]
def read(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
iterative: bool = False,
strictTypes: bool = True,
allow_errors: bool = False,
calc_string_offsets=False,
column_delim: str = ",",
read_nested: bool = True,
has_non_float_nulls: bool = False,
fixed_len: int = -1,
) -> Union[
Mapping[
str,
Union[
pdarray,
Strings,
SegArray,
Categorical,
DataFrame,
IPv4,
Datetime,
Timedelta,
Index,
],
],
]:
"""
Read datasets from files.
File Type is determined automatically.
Parameters
----------
filenames : list or str
Either a list of filenames or shell expression
datasets : list or str or None
(List of) name(s) of dataset(s) to read (default: all available)
iterative : bool
Iterative (True) or Single (False) function call(s) to server
strictTypes: bool
If True (default), require all dtypes of a given dataset to have the
same precision and sign. If False, allow dtypes of different
precision and sign across different files. For example, if one
file contains a uint32 dataset and another contains an int64
dataset with the same name, the contents of both will be read
into an int64 pdarray.
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
calc_string_offsets: bool
Default False, if True this will tell the server to calculate the
offsets/segments array on the server versus loading them from HDF5 files.
In the future this option may be set to True as the default.
column_delim : str
Column delimiter to be used if dataset is CSV. Otherwise, unused.
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Ignored if datasets is not None
Parquet Files only.
has_non_float_nulls: bool
Default False. This flag must be set to True to read non-float parquet columns
that contain null values.
fixed_len: int
Default -1. This value can be set for reading Parquet string columns when the
length of each string is known at runtime. This can allow for skipping byte
calculation, which can have an impact on performance.
Returns
-------
Returns a dictionary of Arkouda pdarrays, Arkouda Strings, or Arkouda Segarrays.
Dictionary of {datasetName: pdarray, String, or SegArray}
Raises
------
RuntimeError
If invalid filetype is detected
See Also
--------
get_datasets, ls, read_parquet, read_hdf
Notes
-----
If filenames is a string, it is interpreted as a shell expression
(a single filename is a valid expression, so it will work) and is
expanded with glob to read all matching files.
If iterative == True each dataset name and file names are passed to
the server as independent sequential strings while if iterative == False
all dataset names and file names are passed to the server in a single
string.
If datasets is None, infer the names of datasets from the first file
and read all of them. Use ``get_datasets`` to show the names of datasets
to HDF5/Parquet files.
CSV files without the Arkouda Header are not supported.
Examples
--------
Read with file Extension
>>> x = ak.read('path/name_prefix.h5') # load HDF5 - processing determines file type not extension
Read without file Extension
>>> x = ak.read('path/name_prefix.parquet') # load Parquet
Read Glob Expression
>>> x = ak.read('path/name_prefix*') # Reads HDF5
"""
if isinstance(filenames, str):
filenames = [filenames]
ftype = get_filetype(filenames)
if ftype.lower() == "hdf5":
return read_hdf(
filenames,
datasets=datasets,
iterative=iterative,
strict_types=strictTypes,
allow_errors=allow_errors,
calc_string_offsets=calc_string_offsets,
)
elif ftype.lower() == "parquet":
return read_parquet(
filenames,
datasets=datasets,
iterative=iterative,
strict_types=strictTypes,
allow_errors=allow_errors,
read_nested=read_nested,
has_non_float_nulls=has_non_float_nulls,
fixed_len=fixed_len,
)
elif ftype.lower() == "csv":
return read_csv(
filenames, datasets=datasets, column_delim=column_delim, allow_errors=allow_errors
)
else:
raise RuntimeError(f"Invalid File Type detected, {ftype}")
[docs]
def read_tagged_data(
filenames: Union[str, List[str]],
datasets: Optional[Union[str, List[str]]] = None,
strictTypes: bool = True,
allow_errors: bool = False,
calc_string_offsets=False,
read_nested: bool = True,
has_non_float_nulls: bool = False,
):
"""
Read datasets from files and tag each record to the file it was read from.
File Type is determined automatically.
Parameters
----------
filenames : list or str
Either a list of filenames or shell expression
datasets : list or str or None
(List of) name(s) of dataset(s) to read (default: all available)
strictTypes: bool
If True (default), require all dtypes of a given dataset to have the
same precision and sign. If False, allow dtypes of different
precision and sign across different files. For example, if one
file contains a uint32 dataset and another contains an int64
dataset with the same name, the contents of both will be read
into an int64 pdarray.
allow_errors: bool
Default False, if True will allow files with read errors to be skipped
instead of failing. A warning will be included in the return containing
the total number of files skipped due to failure and up to 10 filenames.
calc_string_offsets: bool
Default False, if True this will tell the server to calculate the
offsets/segments array on the server versus loading them from HDF5 files.
In the future this option may be set to True as the default.
read_nested: bool
Default True, when True, SegArray objects will be read from the file. When False,
SegArray (or other nested Parquet columns) will be ignored.
Ignored if datasets is not `None`
Parquet Files only.
has_non_float_nulls: bool
Default False. This flag must be set to True to read non-float parquet columns
that contain null values.
Notes
------
Not currently supported for Categorical or GroupBy datasets
Examples
---------
Read files and return data with tagging corresponding to the Categorical returned
cat.codes will link the codes in data to the filename. Data will contain the code `Filename_Codes`
>>> data, cat = ak.read_tagged_data('path/name')
>>> data
{'Filname_Codes': array([0 3 6 9 12]), 'col_name': array([0 0 0 1])}
"""
if isinstance(filenames, str):
filenames = [filenames]
# handle glob expansion
j_str = generic_msg(
cmd="globExpansion",
args={"file_count": len(filenames), "filenames": filenames},
)
file_list = array(json.loads(j_str))
file_cat = Categorical.from_codes(
arange(file_list.size), file_list
) # create a categorical from the ak.Strings representation of the file list
ftype = get_filetype(filenames)
if ftype.lower() == "hdf5":
return (
read_hdf(
filenames,
datasets=datasets,
iterative=False,
strict_types=strictTypes,
allow_errors=allow_errors,
calc_string_offsets=calc_string_offsets,
tag_data=True,
),
file_cat,
)
elif ftype.lower() == "parquet":
return (
read_parquet(
filenames,
datasets=datasets,
iterative=False, # hard-coded because iterative not supported
strict_types=strictTypes,
allow_errors=allow_errors,
tag_data=True,
read_nested=read_nested,
has_non_float_nulls=has_non_float_nulls,
),
file_cat,
)
elif ftype.lower() == "csv":
raise RuntimeError("CSV does not support tagging data with file name associated.")
else:
raise RuntimeError(f"Invalid File Type detected, {ftype}")
[docs]
def snapshot(filename):
"""
Create a snapshot of the current Arkouda namespace. All currently accessible variables containing
Arkouda objects will be written to an HDF5 file.
Unlike other save/load functions, this maintains the integrity of dataframes.
Current Variable names are used as the dataset name when saving.
Parameters
----------
filename: str
Name to use when storing file
Returns
--------
None
See Also
---------
ak.restore
"""
import inspect
from types import ModuleType
from arkouda.dataframe import DataFrame
filename = filename + "_SNAPSHOT"
mode = "TRUNCATE"
callers_local_vars = inspect.currentframe().f_back.f_locals.items()
for name, val in [
(n, v) for n, v in callers_local_vars if not n.startswith("__") and not isinstance(v, ModuleType)
]:
if isinstance(val, (pdarray, Categorical, SegArray, Strings, DataFrame, GroupBy)):
if isinstance(val, DataFrame):
val._to_hdf_snapshot(filename, dataset=name, mode=mode)
else:
val.to_hdf(filename, dataset=name, mode=mode)
mode = "APPEND"
[docs]
def restore(filename):
"""
Return data saved using `ak.snapshot`
Parameters
----------
filename: str
Name used to create snapshot to be read
Returns
--------
Dict
Notes
------
Unlike other save/load methods using snapshot restore will save DataFrames alongside other
objects in HDF5. Thus, they are returned within the dictionary as a dataframe.
"""
restore_files = glob.glob(f"{filename}_SNAPSHOT_LOCALE*")
return read_hdf(sorted(restore_files))
[docs]
def receive(hostname: str, port):
"""
Receive a pdarray sent by `pdarray.transfer()`.
Parameters
----------
hostname : str
The hostname of the pdarray that sent the array
port : int_scalars
The port to send the array over. This needs to be an
open port (i.e., not one that the Arkouda server is
running on). This will open up `numLocales` ports,
each of which in succession, so will use ports of the
range {port..(port+numLocales)} (e.g., running an
Arkouda server of 4 nodes, port 1234 is passed as
`port`, Arkouda will use ports 1234, 1235, 1236,
and 1237 to send the array data).
This port much match the port passed to the call to
`pdarray.transfer()`.
Returns
-------
pdarray
The pdarray sent from the sending server to the current
receiving server.
Raises
------
ValueError
Raised if the op is not within the pdarray.BinOps set
TypeError
Raised if other is not a pdarray or the pdarray.dtype is not
a supported dtype
"""
rep_msg = generic_msg(cmd="receiveArray", args={"hostname": hostname, "port": port})
rep = json.loads(rep_msg)
return _build_objects(rep)
[docs]
def receive_dataframe(hostname: str, port):
"""
Receive a pdarray sent by `dataframe.transfer()`.
Parameters
----------
hostname : str
The hostname of the dataframe that sent the array
port : int_scalars
The port to send the dataframe over. This needs to be an
open port (i.e., not one that the Arkouda server is
running on). This will open up `numLocales` ports,
each of which in succession, so will use ports of the
range {port..(port+numLocales)} (e.g., running an
Arkouda server of 4 nodes, port 1234 is passed as
`port`, Arkouda will use ports 1234, 1235, 1236,
and 1237 to send the array data).
This port much match the port passed to the call to
`pdarray.send_array()`.
Returns
-------
pdarray
The dataframe sent from the sending server to the
current receiving server.
Raises
------
ValueError
Raised if the op is not within the pdarray.BinOps set
TypeError
Raised if other is not a pdarray or the pdarray.dtype is not
a supported dtype
"""
rep_msg = generic_msg(cmd="receiveDataframe", args={"hostname": hostname, "port": port})
rep = json.loads(rep_msg)
return DataFrame(_build_objects(rep))