Skip to content

mecfs_bio.build_system.task.pipe_dataframe_task

Task to transform a dataframe asset by applying a sequence of DataProcessingPipes.

Classes:

Functions:

Attributes:

OutFormat module-attribute

OutFormat = ParquetOutFormat | CSVOutFormat

CSVOutFormat

Attributes:

sep instance-attribute

sep: str

ParquetOutFormat

PipeDataFrameTask

Bases: Task

Methods:

Attributes:

backend class-attribute instance-attribute

backend: ValidBackend = 'ibis'

deps property

deps: list[Task]

meta property

meta: Meta

out_format instance-attribute

out_format: OutFormat

pipes instance-attribute

pipes: Sequence[DataProcessingPipe]

source_data_task instance-attribute

source_data_task: Task

create classmethod

create(
    source_task: Task,
    asset_id: str,
    out_format: OutFormat,
    pipes: Sequence[DataProcessingPipe],
    backend: ValidBackend = "ibis",
) -> PipeDataFrameTask
Source code in mecfs_bio/build_system/task/pipe_dataframe_task.py
@classmethod
def create(
    cls,
    source_task: Task,
    asset_id: str,
    out_format: OutFormat,
    pipes: Sequence[DataProcessingPipe],
    backend: ValidBackend = "ibis",
) -> "PipeDataFrameTask":
    source_meta = source_task.meta
    extension, read_spec = get_extension_and_read_spec_from_format(
        out_format=out_format
    )
    meta: Meta
    if isinstance(source_meta, ReferenceFileMeta):
        meta = ReferenceFileMeta(
            group=source_meta.group,
            sub_group=source_meta.sub_group,
            sub_folder=PurePath("processed"),
            id=AssetId(asset_id),
            extension=extension,
            read_spec=read_spec,
        )
    elif isinstance(source_meta, GWASSummaryDataFileMeta):
        meta = FilteredGWASDataMeta(
            id=AssetId(asset_id),
            trait=source_meta.trait,
            project=source_meta.project,
            sub_dir=PurePath("processed"),
            read_spec=read_spec,
        )
    elif isinstance(source_meta, ResultTableMeta):
        meta = ResultTableMeta(
            id=AssetId(asset_id),
            trait=source_meta.trait,
            project=source_meta.project,
            extension=extension,
            read_spec=read_spec,
        )
    elif isinstance(source_meta, FilteredGWASDataMeta):
        meta = FilteredGWASDataMeta(
            id=AssetId(asset_id),
            trait=source_meta.trait,
            project=source_meta.project,
            sub_dir=source_meta.sub_dir,
            read_spec=read_spec,
        )
    else:
        raise ValueError("unknown source meta")

    return cls(
        source_data_task=source_task,
        pipes=list(pipes),
        meta=meta,
        out_format=out_format,
        backend=backend,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/pipe_dataframe_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Apply the configured pipes to the source dataframe and write the result.

    The source asset is fetched by id and scanned as a dataframe. Each pipe in
    ``self.pipes`` is applied in order, and the result is written to
    ``scratch_dir / "out_dataframe"`` in ``self.out_format``.

    Args:
        scratch_dir: Directory in which the output file is written.
        fetch: Callable used to retrieve the source asset by id.
        wf: Not used by this method.

    Returns:
        A ``FileAsset`` pointing at the written file.

    Raises:
        ValueError: If ``self.out_format`` is neither ``CSVOutFormat`` nor
            ``ParquetOutFormat``. Previously this case wrote nothing but still
            returned an asset for a file that did not exist.
    """
    source_asset = fetch(self._source_id)
    df = scan_dataframe_asset(
        asset=source_asset, meta=self._source_meta, parquet_backend=self.backend
    )
    for pipe in self.pipes:
        df = pipe.process(df)

    out_path = scratch_dir / "out_dataframe"
    if isinstance(self.out_format, CSVOutFormat):
        # CSV is written through pandas, so the frame is collected first.
        df.collect().to_pandas().to_csv(
            out_path, index=False, sep=self.out_format.sep
        )
    elif isinstance(self.out_format, ParquetOutFormat):
        df.sink_parquet(out_path)
    else:
        # Fail loudly instead of returning an asset for a file never written.
        raise ValueError(f"Unknown format {self.out_format}")
    return FileAsset(out_path)

get_extension_and_read_spec_from_format

get_extension_and_read_spec_from_format(
    out_format: OutFormat,
) -> tuple[str, DataFrameReadSpec]
Source code in mecfs_bio/build_system/task/pipe_dataframe_task.py
def get_extension_and_read_spec_from_format(
    out_format: OutFormat,
) -> tuple[str, DataFrameReadSpec]:
    if isinstance(out_format, CSVOutFormat):
        read_spec = DataFrameReadSpec(DataFrameTextFormat(separator=out_format.sep))
        if out_format.sep == "\t":
            extension = ".tsv"
        elif out_format.sep == ",":
            extension = ".csv"
        else:
            raise ValueError("Unknown sep")
    elif isinstance(out_format, ParquetOutFormat):
        read_spec = DataFrameReadSpec(DataFrameParquetFormat())
        extension = ".parquet"
    else:
        raise ValueError(f"Unknown format {out_format}")
    return extension, read_spec