data_loader

fptools.io.data_loader

Functions:

  • load_data

    Load blocks from tank_path and return a SessionCollection.

  • load_manifest

    Load a manifest file, accepting most common tabular formats.

load_data(tank_path, manifest_path=None, manifest_index='blockname', max_workers=None, locator='auto', preprocess=None, cache=True, cache_dir='cache')

Load blocks from tank_path and return a SessionCollection.

Loading will happen in parallel, split across max_workers worker processes.

For quicker future loading, results may be cached. Caching is controlled by the cache parameter, and the location of cached files is controlled by the cache_dir parameter.

You can specify a manifest (in TSV, CSV, or XLSX format) containing additional metadata to be injected into the loaded data. The manifest must have at minimum one column with the header blockname containing each block's name. You may include any other data columns you wish. One special column name is exclude, which should contain boolean True or False. If a block is marked True in this column, it will not be loaded or returned in the resulting SessionCollection. An example manifest is sketched below.
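
For illustration, a minimal manifest might look like the following (columns are shown space-aligned for readability; a real *.tsv separates them with tabs; the block names and the cohort column are hypothetical, and only blockname is required):

blockname        cohort     exclude
Block-230101-A   control    False
Block-230101-B   treatment  True

Here Block-230101-B would be skipped, while Block-230101-A would be loaded with its cohort value attached as metadata.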

Via the preprocess parameter, you can also specify a preprocess routine to be applied to each block before it is returned. This should be a callable taking a Session as its first and only parameter; it should attach any derived data to the passed Session object and return that same Session object as its sole return value. For example preprocessing routines, see the implementations in the tdt.preprocess.pipelines module; a minimal sketch follows.
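
A minimal sketch of such a routine (the attribute attached here is purely hypothetical, not part of the real API; see tdt.preprocess.pipelines for real implementations):

def my_preprocess(session):
    # Derive and attach whatever data you need to the session.
    # This flag is a hypothetical example attribute.
    session.preprocessed = True
    # Return the same Session object that was passed in.
    return session

Pass this callable to load_data as preprocess=my_preprocess.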

Parameters:

  • tank_path (str) –

    path that will be recursively searched for blocks

  • manifest_path (Optional[str], default: None ) –

    if provided, path to metadata in a tabular format, indexed with blockname. See above for more details

  • manifest_index (str, default: 'blockname' ) –

    the name of the column to be set as the manifest DataFrame index.

  • max_workers (Optional[int], default: None ) –

    number of workers in the process pool for loading blocks. If None, defaults to the number of CPUs on the machine.

  • locator (Union[Literal['auto', 'tdt', 'tdt_dlc', 'ma'], DataLocator], default: 'auto' ) –

    locator to use for finding data on tank_path

  • preprocess (Optional[Processor], default: None ) –

    preprocess routine to run on the data. See above for more details.

  • cache (bool, default: True ) –

If True, results will be cached for future use, and previously cached results will be loaded when available.

  • cache_dir (str, default: 'cache' ) –

    path to the cache

Returns:

  • SessionCollection

    SessionCollection containing loaded data
Source code in fptools/io/data_loader.py
def load_data(
    tank_path: str,
    manifest_path: Optional[str] = None,
    manifest_index: str = "blockname",
    max_workers: Optional[int] = None,
    locator: Union[Literal["auto", "tdt", "tdt_dlc", "ma"], DataLocator] = "auto",
    preprocess: Optional[Processor] = None,
    cache: bool = True,
    cache_dir: str = "cache",
) -> SessionCollection:
    """Load blocks from `tank_path` and return a `SessionCollection`.

    Loading will happen in parallel, split across `max_workers` worker processes.

    For quicker future loading, results may be cached. Caching is controlled by the `cache` parameter, and the location of cached
    files is controlled by the `cache_dir` parameter.

    You can specify a manifest (in TSV, CSV, or XLSX format) containing additional metadata to be injected into the loaded data.
    The manifest must have at minimum one column with the header `blockname` containing each block's name. You may include any
    other data columns you wish. One special column name is `exclude`, which should contain boolean `True` or `False`. If a block
    is marked `True` in this column, it will not be loaded or returned in the resulting `SessionCollection`.

    Via the `preprocess` parameter, you can also specify a preprocess routine to be applied to each block before it is returned. This
    should be a callable taking a `Session` as its first and only parameter; it should attach any derived data to the passed
    `Session` object and return that same `Session` object as its sole return value.
    For example preprocessing routines, see the implementations in the `tdt.preprocess.pipelines` module.

    Args:
        tank_path: path that will be recursively searched for blocks
        manifest_path: if provided, path to metadata in a tabular format, indexed with `blockname`. See above for more details
        manifest_index: the name of the column to be set as the manifest DataFrame index.
        max_workers: number of workers in the process pool for loading blocks. If None, defaults to the number of CPUs on the machine.
        locator: locator to use for finding data on `tank_path`
        preprocess: preprocess routine to run on the data. See above for more details.
        cache: If `True`, results will be cached for future use, and previously cached results will be loaded when available.
        cache_dir: path to the cache

    Returns:
        `SessionCollection` containing loaded data
    """
    has_manifest = False
    if manifest_path is not None:
        manifest = load_manifest(manifest_path, index=manifest_index)
        has_manifest = True

    # if caching is enabled, make sure the cache directory exists
    if cache:
        os.makedirs(cache_dir, exist_ok=True)

    # create a collection to hold the loaded sessions
    sessions = SessionCollection()

    futures: dict[Future[Session], str] = {}
    context = multiprocessing.get_context("spawn")
    max_tasks_per_child = 1
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=context, max_tasks_per_child=max_tasks_per_child) as executor:
        # collect common worker args in one place
        worker_args = {"preprocess": preprocess, "cache": cache, "cache_dir": cache_dir}

        # iterate over all datasets found by the locator
        for dset in _get_locator(locator)(tank_path):

            # check if we were given a manifest. If so, try to load metadata from the manifest
            # also perform some sanity checks along the way, and check some special flags (ex `exclude`)
            if has_manifest:
                try:
                    block_meta = manifest.loc[dset.name].to_dict()
                    dset.metadata.update(block_meta)
                except KeyError:
                    # this block is not listed in the manifest! Err on the side of caution and exclude the block
                    tqdm.write(f'WARNING: Excluding block "{dset.name}" because it is not listed in the manifest!!')
                    continue

                # possibly exclude the block, if flagged in the manifest
                if "exclude" in block_meta and block_meta["exclude"]:
                    tqdm.write(f'Excluding block "{dset.name}" due to manifest exclude flag')
                    continue

            # submit the task to the pool
            f = executor.submit(_load, dset, **worker_args)
            futures[f] = dset.name

        # collect completed tasks
        for f in tqdm(as_completed(futures), total=len(futures)):
            try:
                sessions.append(f.result())
            except Exception:
                tqdm.write(
                    f'Encountered problem loading data at "{futures[f]}":\n{traceback.format_exc()}\nData will be omitted from the final collection!\n'
                )

    return sessions
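
A typical invocation might look like the following (the tank and manifest paths are hypothetical):

from fptools.io.data_loader import load_data

sessions = load_data(
    "/data/my_tank",               # searched recursively for blocks
    manifest_path="manifest.tsv",  # optional per-block metadata
    max_workers=4,                 # size of the loader process pool
    cache_dir="cache",             # where cached results are written
)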

load_manifest(path, index=None)

Load a manifest file, accepting most common tabular formats.

*.tsv (tab-separated values), *.csv (comma-separated values), or *.xlsx (Excel) file extensions are supported.

Optionally index the dataframe with one of the loaded columns.

Parameters:

  • path (str) –

    path to the file to load

  • index (Optional[str], default: None ) –

    if not None, set this column to be the index of the DataFrame

Returns:

  • DataFrame

    pandas.DataFrame containing the manifest data.

Source code in fptools/io/data_loader.py
def load_manifest(path: str, index: Optional[str] = None) -> pd.DataFrame:
    """Load a manifest file, accepting most common tabular formats.

    *.tsv (tab-separated values), *.csv (comma-separated values), or *.xlsx (Excel) file extensions are supported.

    Optionally index the dataframe with one of the loaded columns.

    Args:
        path: path to the file to load
        index: if not None, set this column to be the index of the DataFrame

    Returns:
        pandas.DataFrame containing the manifest data.
    """
    ext = os.path.splitext(path)[1]

    df: pd.DataFrame
    if ext == ".tsv":
        df = pd.read_csv(path, sep="\t")
    elif ext == ".csv":
        df = pd.read_csv(path)
    elif ext == ".xlsx":
        df = pd.read_excel(path)
    else:
        raise ValueError(
            f'Did not understand manifest format. Supported file extensions are *.tsv (tab-separated), *.csv (comma-separated), or *.xlsx (Excel), but got "{ext}"'
        )

    if index is not None:
        if index in df.columns:
            df.set_index(index, inplace=True)
        else:
            raise ValueError(f"Cannot set manifest index to column {index}; available columns: {df.columns.values}")

    return df
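
For example, to load a TSV manifest and index it by its blockname column (the file name and block name are hypothetical):

from fptools.io.data_loader import load_manifest

manifest = load_manifest("manifest.tsv", index="blockname")
print(manifest.loc["Block-230101-A"])  # one block's metadata as a Series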