Bases: Task
Task to extract the contents of a (possibly gzipped) tar file to a target directory
Set subdir_name to extract only the contents of one subfolder within the tar file
read_mode: use r to only untar, and not ungzip.
Methods:
Attributes:
create(
asset_id: str,
source_task: Task,
sub_folder: PurePath = PurePath("extracted"),
sub_folder_name_inside_tar: str | None = None,
read_mode: ReadMode = "r:gz",
) -> ExtractTarGzipTask
Source code in mecfs_bio/build_system/task/extract_tar_gzip_task.py
| @classmethod
def create(
cls,
asset_id: str,
source_task: Task,
sub_folder: PurePath = PurePath("extracted"),
sub_folder_name_inside_tar: str | None = None,
read_mode: ReadMode = "r:gz",
) -> "ExtractTarGzipTask":
source_meta = source_task.meta
if isinstance(source_meta, ReferenceFileMeta):
return cls(
meta=ReferenceDataDirectoryMeta(
group=source_meta.group,
sub_group=source_meta.sub_group,
sub_folder=sub_folder,
id=AssetId(asset_id),
),
source_file_task=source_task,
subdir_name=sub_folder_name_inside_tar,
read_mode=read_mode,
)
if isinstance(source_meta, GWASSummaryDataFileMeta):
return cls(
meta=ProcessedGwasDataDirectoryMeta(
id=AssetId(asset_id),
trait=source_meta.trait,
project=source_meta.project,
sub_dir=sub_folder,
),
source_file_task=source_task,
subdir_name=sub_folder_name_inside_tar,
read_mode=read_mode,
)
raise NotImplementedError(f"Handler for meta {source_meta} not implemented")
|
execute(
scratch_dir: Path, fetch: Fetch, wf: WF
) -> DirectoryAsset
Source code in mecfs_bio/build_system/task/extract_tar_gzip_task.py
| def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> DirectoryAsset:
source_asset = fetch(self._source_asset_id)
assert isinstance(source_asset, FileAsset)
src_path = source_asset.path
logger.debug(f"Extracting from tar/gzip file : {self._source_asset_id}...")
with tarfile.open(src_path, self._read_mode) as tar_object:
if self._subdir_name is None:
tar_object.extractall(scratch_dir)
else:
with tempfile.TemporaryDirectory() as tmpdir_name:
tmpdir_path = Path(tmpdir_name)
for member in tar_object.getmembers():
if member.name.startswith(self._subdir_name):
tar_object.extract(member=member, path=tmpdir_path)
(tmpdir_path / self._subdir_name).rename(scratch_dir)
logger.debug("Extraction complete.")
return DirectoryAsset(scratch_dir)
|