Skip to content

mecfs_bio.build_system.task.join_dataframes_task

Task to perform a SQL-style join.

Classes:

  • JoinDataFramesTask

    Task to load two dataframes, join them, and then write out the resulting dataframe.

Attributes:

logger module-attribute

logger = get_logger()

JoinDataFramesTask

Bases: Task

Task to load two dataframes, join them, and then write out the resulting dataframe.

By default, the output is written as a comma-separated CSV file.

Methods:

Attributes:

backend class-attribute instance-attribute

backend: ValidBackend = 'polars'

deps property

deps: list[Task]

df_1_pipe class-attribute instance-attribute

df_1_pipe: DataProcessingPipe = IdentityPipe()

df_2_pipe class-attribute instance-attribute

df_2_pipe: DataProcessingPipe = IdentityPipe()

how instance-attribute

how: JoinStrategy

left_on instance-attribute

left_on: Sequence[str]

meta property

meta: Meta

out_format class-attribute instance-attribute

out_format: OutFormat = CSVOutFormat(sep=',')

out_pipe class-attribute instance-attribute

out_pipe: DataProcessingPipe = IdentityPipe()

right_on instance-attribute

right_on: Sequence[str]

create_from_result_df classmethod

create_from_result_df(
    asset_id: str,
    result_df_task: Task,
    reference_df_task,
    how: JoinStrategy,
    left_on: Sequence[str],
    right_on: Sequence[str],
    out_format: OutFormat = CSVOutFormat(sep=","),
    df_1_pipe: DataProcessingPipe = IdentityPipe(),
    df_2_pipe: DataProcessingPipe = IdentityPipe(),
    out_pipe: DataProcessingPipe = IdentityPipe(),
    backend: ValidBackend = "polars",
)

Join a result dataframe to a reference dataframe.

Source code in mecfs_bio/build_system/task/join_dataframes_task.py
@classmethod
def create_from_result_df(
    cls,
    asset_id: str,
    result_df_task: Task,
    reference_df_task: Task,
    how: JoinStrategy,
    left_on: Sequence[str],
    right_on: Sequence[str],
    out_format: OutFormat = CSVOutFormat(sep=","),
    df_1_pipe: DataProcessingPipe = IdentityPipe(),
    df_2_pipe: DataProcessingPipe = IdentityPipe(),
    out_pipe: DataProcessingPipe = IdentityPipe(),
    backend: ValidBackend = "polars",
):
    """
    Join a result dataframe to a reference dataframe.

    The output meta is derived from ``result_df_task.meta``:
    - Its identifying fields are copied: trait/project (plus sub_dir where
      present), or group/sub_group/sub_folder for reference files.
    - The id comes from ``asset_id``.
    - The extension and read spec come from ``out_format``.

    Args:
        asset_id: Id of the joined output asset.
        result_df_task: Task producing the left-hand dataframe (``df_1``);
            its meta determines the type of the output meta.
        reference_df_task: Task producing the right-hand dataframe (``df_2``).
        how: Join strategy used for the join.
        left_on: Join key columns in the result dataframe.
        right_on: Join key columns in the reference dataframe.
        out_format: Output format; also determines the output extension and
            read spec.
        df_1_pipe: Pipe applied to the result dataframe before joining.
        df_2_pipe: Pipe applied to the reference dataframe before joining.
        out_pipe: Pipe applied to the joined dataframe before writing.
        backend: Backend used when scanning the input dataframes.

    Returns:
        A new instance of this task class.

    Raises:
        ValueError: If ``result_df_task.meta`` is not a ResultTableMeta,
            FilteredGWASDataMeta, GWASLabLeadVariantsMeta or ReferenceFileMeta.
    """
    extension, read_spec = get_extension_and_read_spec_from_format(
        out_format=out_format
    )
    source_meta = result_df_task.meta
    out_id = AssetId(asset_id)
    meta: Meta
    if isinstance(source_meta, ResultTableMeta):
        meta = ResultTableMeta(
            id=out_id,
            trait=source_meta.trait,
            project=source_meta.project,
            extension=extension,
            read_spec=read_spec,
        )
    elif isinstance(source_meta, FilteredGWASDataMeta):
        meta = FilteredGWASDataMeta(
            id=out_id,
            trait=source_meta.trait,
            project=source_meta.project,
            extension=extension,
            read_spec=read_spec,
            sub_dir=source_meta.sub_dir,
        )
    elif isinstance(source_meta, GWASLabLeadVariantsMeta):
        # Lead-variant tables produce a generic ResultTableMeta that keeps the
        # source sub_dir.
        # NOTE(review): the ResultTableMeta branch above passes no sub_dir.
        # Confirm ResultTableMeta accepts it and that this mapping is intended.
        meta = ResultTableMeta(
            id=out_id,
            trait=source_meta.trait,
            project=source_meta.project,
            extension=extension,
            read_spec=read_spec,
            sub_dir=source_meta.sub_dir,
        )
    elif isinstance(source_meta, ReferenceFileMeta):
        meta = ReferenceFileMeta(
            id=out_id,
            group=source_meta.group,
            sub_group=source_meta.sub_group,
            sub_folder=source_meta.sub_folder,
            extension=extension,
            read_spec=read_spec,
        )
    else:
        raise ValueError(f"Source meta has unknown type: {source_meta}")
    return cls(
        df_1_task=result_df_task,
        df_2_task=reference_df_task,
        how=how,
        left_on=left_on,
        right_on=right_on,
        meta=meta,
        df_1_pipe=df_1_pipe,
        df_2_pipe=df_2_pipe,
        out_format=out_format,
        backend=backend,
        out_pipe=out_pipe,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/join_dataframes_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """
    Load both input dataframes, join them, and write the result to ``scratch_dir``.

    Processing steps:
    1. Each input asset is fetched and scanned with ``scan_dataframe_asset``.
    2. Each input is passed through its own pipe (``df_1_pipe`` / ``df_2_pipe``).
    3. The inputs are joined using ``how``, ``left_on`` and ``right_on``.
    4. ``out_pipe`` is applied to the joined frame.
    5. The result is written in ``out_format``.

    Args:
        scratch_dir: Directory the output file is written into.
        fetch: Callable used to obtain the input assets by id.
        wf: Not used by this task.

    Returns:
        A FileAsset pointing at the written output file.

    Raises:
        ValueError: If ``out_format`` is neither a CSVOutFormat nor a
            ParquetOutFormat.
    """
    # NOTE(review): the file is named "result.csv" even for parquet output.
    # Presumably only the meta's extension matters downstream — confirm.
    result_path = scratch_dir / "result.csv"

    asset_1 = fetch(self._df_1_id)
    df_1 = scan_dataframe_asset(
        asset_1, meta=self._df_1_meta, parquet_backend=self.backend
    )
    df_1 = self.df_1_pipe.process(df_1)

    # NOTE(review): `_df_12_id` breaks the `_df_1_*` / `_df_2_*` naming used
    # elsewhere. Presumably this is the id of df_2_task — confirm the name.
    asset_2 = fetch(self._df_12_id)
    df_2 = scan_dataframe_asset(
        asset_2, meta=self._df_2_meta, parquet_backend=self.backend
    )
    df_2 = self.df_2_pipe.process(df_2)

    joined = df_1.join(
        df_2, how=self.how, left_on=list(self.left_on), right_on=list(self.right_on)
    )
    joined = self.out_pipe.process(joined)

    if isinstance(self.out_format, CSVOutFormat):
        # Materialize the frame and write it with pandas, honoring the
        # configured separator.
        result = joined.collect().to_pandas()
        result.to_csv(result_path, index=False, sep=self.out_format.sep)
    elif isinstance(self.out_format, ParquetOutFormat):
        joined.sink_parquet(result_path)
    else:
        # Without this branch an unrecognized format would return a FileAsset
        # for a file that was never written.
        raise ValueError(f"Unsupported out_format: {self.out_format!r}")
    return FileAsset(result_path)