data_loader
fptools.io.data_loader
Functions:
- load_data – Load blocks from tank_path and return a SessionCollection.
- load_manifest – Load a manifest file, accepting most common tabular formats.
load_data(tank_path, manifest_path=None, manifest_index='blockname', max_workers=None, locator='auto', preprocess=None, cache=True, cache_dir='cache')
Load blocks from tank_path and return a SessionCollection.
Loading will happen in parallel, split across max_workers worker processes. For quicker future loading, results may be cached. Caching is controlled by the cache parameter, and the location of cached files is controlled by the cache_dir parameter.
You can specify a manifest (in TSV, CSV, or XLSX format) containing additional metadata to be injected into the loaded data. The manifest must have at minimum one column with the header blockname containing each block's name; you may include any other data columns you wish. One special column name is exclude, which should contain boolean True or False. If a block is marked True in this column, it will not be loaded or returned in the resulting SessionCollection.
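For illustration, a minimal TSV manifest might look like the sketch below (columns are tab-separated in the actual file). The genotype and dose columns are hypothetical examples of arbitrary metadata; blockname and exclude are the columns load_data actually recognizes.

blockname       exclude    genotype    dose
Mouse01-Day1    False      WT          0.5
Mouse02-Day1    True       KO          0.5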
Via the preprocess parameter, you can also specify a preprocess routine to be applied to each block before it is returned. This should be a callable taking a Session as its first and only parameter; it should attach any derived data to the passed Session object and return that Session object as its sole return value. For example preprocessing routines, see the implementations in the tdt.preprocess.pipelines module.
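A minimal sketch of a conforming preprocess callable is shown below; the attribute it sets is purely illustrative and not part of the Session API.

def my_preprocess(session):
    # Hypothetical example: compute something from the raw data and attach it.
    # Only the signature (a single Session argument) and returning the same
    # Session object are required by load_data.
    session.notes = "preprocessed"
    return session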
Parameters:
- tank_path (str) – path that will be recursively searched for blocks
- manifest_path (Optional[str], default: None) – if provided, path to metadata in a tabular format, indexed with blockname. See above for more details.
- manifest_index (str, default: 'blockname') – the name of the column to be set as the manifest DataFrame index.
- max_workers (Optional[int], default: None) – number of workers in the process pool for loading blocks. If None, defaults to the number of CPUs on the machine.
- locator (Union[Literal['auto', 'tdt', 'tdt_dlc', 'ma'], DataLocator], default: 'auto') – locator to use for finding data on tank_path
- preprocess (Optional[Processor], default: None) – preprocess routine to run on the data. See above for more details.
- cache (bool, default: True) – if True, results will be cached for future use, or results will be loaded from the cache.
- cache_dir (str, default: 'cache') – path to the cache

Returns:
- SessionCollection – SessionCollection containing loaded data
Source code in fptools/io/data_loader.py
def load_data(
tank_path: str,
manifest_path: Optional[str] = None,
manifest_index: str = "blockname",
max_workers: Optional[int] = None,
locator: Union[Literal["auto", "tdt", "tdt_dlc", "ma"], DataLocator] = "auto",
preprocess: Optional[Processor] = None,
cache: bool = True,
cache_dir: str = "cache",
) -> SessionCollection:
"""Load blocks from `tank_path` and return a `SessionCollection`.
Loading will happen in parallel, split across `max_workers` worker processes.
For quicker future loading, results may be cached. Caching is controlled by the `cache` parameter, and the location of cached
files is controlled by the `cache_dir` parameter.
You can specify a manifest (in TSV, CSV or XLSX formats) containing additional metadata to be injected into the loaded data.
This manifest should have at minimum one column with header `blockname` containing each block's name. You may include any other arbitrary
data columns you may wish. One special column name is `exclude` which should contain boolean `True` or `False`. If a block
is marked with `True` in this column, then the block will not be loaded or returned in the resulting `SessionCollection`.
You can also specify a preprocess routine to be applied to each block prior to being returned via the `preprocess` parameter. This
should be a callable taking a `Session` as the first and only parameter. Your callable preprocess routine should attach any data to the passed
    `Session` object and return this `Session` object as its sole return value.
For example preprocessing routines, please see the implementations in the `tdt.preprocess.pipelines` module.
Args:
tank_path: path that will be recursively searched for blocks
manifest_path: if provided, path to metadata in a tabular format, indexed with `blockname`. See above for more details
manifest_index: the name of the column to be set as the manifest DataFrame index.
max_workers: number of workers in the process pool for loading blocks. If None, defaults to the number of CPUs on the machine.
locator: locator to use for finding data on `tank_path`
preprocess: preprocess routine to run on the data. See above for more details.
cache: If `True`, results will be cached for future use, or results will be loaded from the cache.
cache_dir: path to the cache
Returns:
`SessionCollection` containing loaded data
"""
has_manifest = False
if manifest_path is not None:
manifest = load_manifest(manifest_path, index=manifest_index)
has_manifest = True
# if caching is enabled, make sure the cache directory exists
if cache:
os.makedirs(cache_dir, exist_ok=True)
# create a collection to hold the loaded sessions
sessions = SessionCollection()
futures: dict[Future[Session], str] = {}
context = multiprocessing.get_context("spawn")
max_tasks_per_child = 1
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context, max_tasks_per_child=max_tasks_per_child) as executor:
# collect common worker args in one place
worker_args = {"preprocess": preprocess, "cache": cache, "cache_dir": cache_dir}
# iterate over all datasets found by the locator
for dset in _get_locator(locator)(tank_path):
# check if we were given a manifest. If so, try to load metadata from the manifest
# also perform some sanity checks along the way, and check some special flags (ex `exclude`)
if has_manifest:
try:
block_meta = manifest.loc[dset.name].to_dict()
dset.metadata.update(block_meta)
except KeyError:
# this block is not listed in the manifest! Err on the side of caution and exclude the block
tqdm.write(f'WARNING: Excluding block "{dset.name}" because it is not listed in the manifest!!')
continue
# possibly exclude the block, if flagged in the manifest
if "exclude" in block_meta and block_meta["exclude"]:
tqdm.write(f'Excluding block "{dset.name}" due to manifest exclude flag')
continue
# submit the task to the pool
            f = executor.submit(_load, dset, **worker_args)
futures[f] = dset.name
# collect completed tasks
for f in tqdm(as_completed(futures), total=len(futures)):
try:
sessions.append(f.result())
        except Exception:
            tqdm.write(
                f'Encountered problem loading data at "{futures[f]}":\n{traceback.format_exc()}\nData will be omitted from the final collection!\n'
            )
return sessions
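As a usage sketch (the tank path, manifest file name, and worker count below are placeholders), a typical call might look like:

from fptools.io.data_loader import load_data

sessions = load_data(
    "/data/my_tank",               # hypothetical tank directory, searched recursively for blocks
    manifest_path="manifest.tsv",  # hypothetical manifest with a blockname column
    max_workers=4,                 # limit the loading process pool to four workers
    cache=True,                    # cache results under ./cache for faster reloads
)
print(len(sessions))               # number of successfully loaded blocks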
load_manifest(path, index=None)
Load a manifest file, accepting most common tabular formats.
*.tsv (tab-separated values), *.csv (comma-separated values), or *.xlsx (Excel) file extensions are supported.
Optionally index the dataframe with one of the loaded columns.
Parameters:
- path (str) – path to the file to load
- index (Optional[str], default: None) – if not None, set this column to be the index of the DataFrame

Returns:
- DataFrame – pandas.DataFrame containing the manifest data.
Source code in fptools/io/data_loader.py
def load_manifest(path: str, index: Optional[str] = None) -> pd.DataFrame:
"""Load a manifest file, accepting most common tabular formats.
*.tsv (tab-separated values), *.csv (comma-separated values), or *.xlsx (Excel) file extensions are supported.
Optionally index the dataframe with one of the loaded columns.
Args:
path: path to the file to load
index: if not None, set this column to be the index of the DataFrame
Returns:
pandas.DataFrame containing the manifest data.
"""
ext = os.path.splitext(path)[1]
df: pd.DataFrame
if ext == ".tsv":
df = pd.read_csv(path, sep="\t")
elif ext == ".csv":
df = pd.read_csv(path)
elif ext == ".xlsx":
df = pd.read_excel(path)
else:
raise ValueError(
f'Did not understand manifest format. Supported file extensions are *.tsv (tab-separated), *.csv (comma-separated), or *.xlsx (Excel), but got "{ext}"'
)
if index is not None:
if index in df.columns:
df.set_index(index, inplace=True)
else:
raise ValueError(f"Cannot set manifest index to column {index}; available columns: {df.columns.values}")
return df
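As a usage sketch (the file name is a placeholder), load_manifest can also be called directly to inspect the metadata that load_data would inject:

from fptools.io.data_loader import load_manifest

manifest = load_manifest("manifest.tsv", index="blockname")  # hypothetical manifest file
print(manifest.columns.tolist())  # arbitrary metadata columns
print(manifest.index)             # one row per block, indexed by blockname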