Skip to content

mecfs_bio.build_system.task.convert_dataframe_to_markdown_task

Prepare results for presentation by converting a dataframe to markdown format.

Classes:

ConvertDataFrameToMarkdownTask

Bases: Task

Task to convert a dataframe to markdown format. Useful for writing up results

Methods:

Attributes:

deps property

deps: list[Task]

meta property

meta: Meta

pipe class-attribute instance-attribute

pipe: DataProcessingPipe = IdentityPipe()

create_from_result_table_task classmethod

create_from_result_table_task(
    source_task: Task,
    asset_id: str,
    pipe: DataProcessingPipe = IdentityPipe(),
)
Source code in mecfs_bio/build_system/task/convert_dataframe_to_markdown_task.py
@classmethod
def create_from_result_table_task(
    cls, source_task: Task, asset_id: str, pipe: DataProcessingPipe = IdentityPipe()
):
    source_meta = source_task.meta
    assert isinstance(source_meta, ResultTableMeta)
    meta = MarkdownFileMeta(
        id=AssetId(asset_id),
        trait=source_meta.trait,
        project=source_meta.project,
        sub_dir=source_meta.sub_dir,
    )
    return cls(meta=meta, df_task=source_task, pipe=pipe)

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/convert_dataframe_to_markdown_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    source_asset = fetch(self._source_id)
    nw_df = self.pipe.process(
        scan_dataframe_asset(asset=source_asset, meta=self._source_meta)
    )

    df = nw_df.collect().to_pandas()
    df = _array_to_list(df)
    out_path = scratch_dir / "output.md"
    markdown_str = df.to_markdown(index=False)
    with open(out_path, "w") as f:
        f.write(markdown_str)
    return FileAsset(out_path)