sandboxed_execute(
task: Task,
meta_to_path: MetaToPath,
wf: WF,
fetch: Fetch,
) -> Asset
Execute a task in a temporary directory, then move its result to the final location
determined by the task's metadata.
Source code in mecfs_bio/build_system/rebuilder/sandboxed_execute.py
def sandboxed_execute(
    task: Task,
    meta_to_path: MetaToPath,
    wf: WF,
    fetch: Fetch,
) -> Asset:
    """
    Run ``task`` inside a throwaway directory and relocate what it produces.

    The destination is ``meta_to_path(task.meta)`` and is resolved before the
    task runs. The task is given a ``temp`` subdirectory of a fresh temporary
    directory as its ``scratch_dir``. Its result is then passed to
    ``_move_asset`` together with the destination, and whatever
    ``_move_asset`` returns is returned to the caller.
    """
    target_path = meta_to_path(task.meta)
    with tempfile.TemporaryDirectory() as sandbox_root:
        scratch = Path(sandbox_root).joinpath("temp")
        scratch.mkdir(parents=True, exist_ok=True)
        produced = task.execute(scratch_dir=scratch, fetch=fetch, wf=wf)
        # Relocate while the sandbox still exists: leaving the ``with`` block
        # deletes the temporary directory and everything left inside it.
        destination_parent = target_path.parent
        destination_parent.mkdir(exist_ok=True, parents=True)
        moved = _move_asset(produced, target_path)
    logger.debug(f"Saved asset {task.asset_id} to {target_path}.")
    return moved
|