Bases: Task
Task to concatenate the dataframes stored in files under a directory whose paths match a glob pattern (searched recursively).
Example use case: when GWAS summary statistics are split over multiple files.
Methods:
Attributes:
path_glob
instance-attribute
read_spec_for_frames
instance-attribute
read_spec_for_frames: DataFrameReadSpec
source_dir_task
instance-attribute
create
classmethod
create(
asset_id: str,
source_dir_task: Task,
path_glob: str,
read_spec_for_frames: DataFrameReadSpec,
)
Source code in mecfs_bio/build_system/task/concat_frames_in_dir_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    source_dir_task: Task,
    path_glob: str,
    read_spec_for_frames: DataFrameReadSpec,
):
    """Build a task that concatenates matching frames from ``source_dir_task``'s directory.

    The output asset's meta is derived from the source task's meta:

    * ``ReferenceDataDirectoryMeta`` -> ``ReferenceFileMeta`` in the same
      group / sub_group / sub_folder, with a ``.parquet`` extension.
    * ``ProcessedGwasDataDirectoryMeta`` -> ``FilteredGWASDataMeta`` for the
      same trait and project, placed under the ``processed`` sub-directory.

    In both cases the output is read back as parquet.

    Args:
        asset_id: Identifier for the combined output asset.
        source_dir_task: Task whose output is the directory to scan.
        path_glob: Glob pattern selecting the files to concatenate.
        read_spec_for_frames: How each matched file is read.

    Raises:
        NotImplementedError: If the source task's meta is of any other type.
    """
    upstream_meta: Meta = source_dir_task.meta
    handled_types = (ReferenceDataDirectoryMeta, ProcessedGwasDataDirectoryMeta)
    if not isinstance(upstream_meta, handled_types):
        raise NotImplementedError(
            f"Handlers for meta {upstream_meta} are not implemented"
        )

    output_meta: Meta
    if isinstance(upstream_meta, ReferenceDataDirectoryMeta):
        output_meta = ReferenceFileMeta(
            group=upstream_meta.group,
            sub_group=upstream_meta.sub_group,
            sub_folder=upstream_meta.sub_folder,
            id=AssetId(asset_id),
            extension=".parquet",
            read_spec=DataFrameReadSpec(DataFrameParquetFormat()),
        )
    else:
        # Only ProcessedGwasDataDirectoryMeta can reach here after the guard.
        output_meta = FilteredGWASDataMeta(
            id=AssetId(asset_id),
            trait=upstream_meta.trait,
            project=upstream_meta.project,
            sub_dir=PurePath("processed"),
            read_spec=DataFrameReadSpec(DataFrameParquetFormat()),
        )

    return cls(
        meta=output_meta,
        source_dir_task=source_dir_task,
        path_glob=path_glob,
        read_spec_for_frames=read_spec_for_frames,
    )
|
execute
execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/concat_frames_in_dir_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Concatenate all matching frames under the source directory into one parquet file.

    Files under the fetched directory asset are matched recursively against
    ``self.path_glob``. They are read in sorted path order with
    ``self.read_spec_for_frames`` and stacked row-wise. The result is written to
    ``scratch_dir / "combined_dataframe.parquet"``.

    Args:
        scratch_dir: Directory in which the combined parquet file is written.
        fetch: Callable used to retrieve the source directory asset.
        wf: Unused by this task.

    Returns:
        A ``FileAsset`` pointing at the combined parquet file.

    Raises:
        ValueError: If no file under the source directory matches ``path_glob``.
    """
    dir_asset = fetch(self.src_id)
    assert isinstance(dir_asset, DirectoryAsset)
    # Sorting makes the row order of the concatenated output deterministic.
    file_paths = sorted(dir_asset.path.rglob(self.path_glob))
    if not file_paths:
        # Explicit raise rather than assert: asserts are stripped under -O.
        raise ValueError(
            f"No files matching glob {self.path_glob!r} found under {dir_asset.path}"
        )
    frames = []
    for pth in file_paths:
        frame = (
            scan_dataframe(path=pth, spec=self.read_spec_for_frames)
            .collect()
            .to_pandas()
        )
        logger.debug("loaded frame from %s. shape: %s", pth, frame.shape)
        frames.append(frame)
    # ignore_index=True: each frame carries its own index (typically 0..n-1).
    # Keeping those labels yields a duplicated, non-RangeIndex index, which
    # to_parquet would serialize as an extra "__index_level_0__" column.
    result: pd.DataFrame = pd.concat(frames, axis=0, ignore_index=True)
    out_path = scratch_dir / "combined_dataframe.parquet"
    result.to_parquet(out_path)
    return FileAsset(out_path)
|