Skip to content

mecfs_bio.build_system.task.discard_deps_task_wrapper

Save disk space materializing dependencies of the wrapped task in a temporary directory.

Classes:

  • DiscardDepsWrapper

    Save disk space materializing dependencies of the wrapped task in a temporary directory.

DiscardDepsWrapper

Bases: Task

Save disk space materializing dependencies of the wrapped task in a temporary directory.

Methods:

Attributes:

deps property

deps: list[Task]

meta property

meta: Meta

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/discard_deps_task_wrapper.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    out_path = scratch_dir / "out"
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        info_store = temp_path / "info_store.yaml"

        asset_root = temp_path / "asset_store"
        runner = SimpleRunner(
            tracer=ImoHasher.with_xxhash_128(),
            info_store=info_store,
            asset_root=asset_root,
        )
        result = runner.run([self._inner])
        result_asset = result[self._inner.asset_id]
        if isinstance(result_asset, FileAsset):
            is_file = True
        else:
            is_file = False
            assert isinstance(result_asset, DirectoryAsset)
        result_path = result_asset.path
        result_path.rename(out_path)
    if is_file:
        return FileAsset(out_path)
    return DirectoryAsset(out_path)