Skip to content

mecfs_bio.build_system.task.concat_frames_in_dir_task

Concatenate dataframes in a DirectoryAsset to create a single FileAsset.

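Classes:

- `ConcatFramesInDirTask`: task that concatenates the dataframes stored in a directory asset into a single parquet file asset.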

Attributes:

- `logger`: module logger, used to report each frame as it is loaded.

logger module-attribute

# Module-scoped logger (named after this module) so its output can be filtered
# and configured independently of the root logger.
logger = getLogger(__name__)

ConcatFramesInDirTask

Bases: Task

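Task to concatenate the dataframes stored in those files of a directory asset whose paths match a glob pattern. Files are matched recursively and concatenated in sorted path order.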

Example use case: when GWAS summary statistics are split over multiple files.

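Methods:

- `create`: build the task, deriving the output metadata from the source directory task's metadata.
- `execute`: read the matching files, concatenate them row-wise, and write the result as a parquet file.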

Attributes:

deps property

deps: list[Task]

meta property

meta: Meta

path_glob instance-attribute

path_glob: str

read_spec_for_frames instance-attribute

read_spec_for_frames: DataFrameReadSpec

source_dir_task instance-attribute

source_dir_task: Task

src_id property

src_id: AssetId

create classmethod

create(
    asset_id: str,
    source_dir_task: Task,
    path_glob: str,
    read_spec_for_frames: DataFrameReadSpec,
)
Source code in mecfs_bio/build_system/task/concat_frames_in_dir_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    source_dir_task: Task,
    path_glob: str,
    read_spec_for_frames: DataFrameReadSpec,
):
    """
    Build the task, deriving the output asset's metadata from the source
    directory task's metadata.

    The output metadata always declares a parquet read spec. Its type depends
    on the source metadata:

    - ``ReferenceDataDirectoryMeta`` -> ``ReferenceFileMeta`` in the same
      group / sub_group / sub_folder, with extension ``.parquet``.
    - ``ProcessedGwasDataDirectoryMeta`` -> ``FilteredGWASDataMeta`` for the
      same trait and project, under the ``processed`` sub-directory.

    Args:
        asset_id: identifier for the concatenated output asset.
        source_dir_task: task whose asset is the directory holding the frames.
        path_glob: glob pattern selecting which files to read.
        read_spec_for_frames: spec used to read each matched file.

    Raises:
        NotImplementedError: if the source metadata is of any other type.
    """
    src_meta: Meta = source_dir_task.meta

    def _wrap(out_meta: Meta):
        # Every supported branch builds the task from the same arguments;
        # only the output metadata differs.
        return cls(
            meta=out_meta,
            source_dir_task=source_dir_task,
            path_glob=path_glob,
            read_spec_for_frames=read_spec_for_frames,
        )

    if isinstance(src_meta, ReferenceDataDirectoryMeta):
        return _wrap(
            ReferenceFileMeta(
                group=src_meta.group,
                sub_group=src_meta.sub_group,
                sub_folder=src_meta.sub_folder,
                id=AssetId(asset_id),
                extension=".parquet",
                read_spec=DataFrameReadSpec(DataFrameParquetFormat()),
            )
        )

    if isinstance(src_meta, ProcessedGwasDataDirectoryMeta):
        return _wrap(
            FilteredGWASDataMeta(
                id=AssetId(asset_id),
                trait=src_meta.trait,
                project=src_meta.project,
                sub_dir=PurePath("processed"),
                read_spec=DataFrameReadSpec(DataFrameParquetFormat()),
            )
        )

    raise NotImplementedError(
        f"Handlers for meta {src_meta} are not implemented"
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/concat_frames_in_dir_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """
    Concatenate all frames in the source directory that match ``path_glob``.

    Files are matched recursively (``Path.rglob``) and read in sorted path
    order, so the row order of the output is deterministic. The combined
    frame is written as parquet to ``scratch_dir``.

    Args:
        scratch_dir: directory where the combined parquet file is written.
        fetch: callable used to obtain the source directory asset by id.
        wf: not used by this task.

    Returns:
        A ``FileAsset`` for ``scratch_dir / "combined_dataframe.parquet"``.

    Raises:
        FileNotFoundError: if no file under the source directory matches
            ``path_glob``.
    """
    dir_asset = fetch(self.src_id)
    assert isinstance(dir_asset, DirectoryAsset)
    # rglob also searches subdirectories; sorting makes the row order reproducible.
    file_paths = sorted(dir_asset.path.rglob(self.path_glob))
    if not file_paths:
        # An explicit raise (unlike `assert`) still fires under `python -O`.
        raise FileNotFoundError(
            f"No files matching {self.path_glob!r} found under {dir_asset.path}"
        )
    frames: list[pd.DataFrame] = []
    for pth in file_paths:
        frame = (
            scan_dataframe(path=pth, spec=self.read_spec_for_frames)
            .collect()
            .to_pandas()
        )
        logger.debug("loaded frame from %s. shape: %s", pth, frame.shape)
        frames.append(frame)
    # Each per-file frame has its own index (presumably 0..n-1 from to_pandas()).
    # Keeping those labels would produce a duplicated, non-Range index, which
    # to_parquet stores as an extra column. ignore_index gives a fresh RangeIndex.
    result: pd.DataFrame = pd.concat(frames, axis=0, ignore_index=True)
    out_path = scratch_dir / "combined_dataframe.parquet"
    result.to_parquet(out_path)
    return FileAsset(out_path)