Skip to content

mecfs_bio.build_system.task.magma.magma_snp_location_task

Classes:

MagmaSNPFileTask

Bases: Task

Methods:

Attributes:

deps property

deps: list[Task]

extra_columns_to_output instance-attribute

extra_columns_to_output: list[str]

gwas_parquet_with_rsid_task instance-attribute

gwas_parquet_with_rsid_task: Task

meta property

meta: Meta

pipes instance-attribute

pipes: Sequence[DataProcessingPipe]

source_id property

source_id: AssetId

source_meta property

source_meta: Meta

create_for_magma_snp_p_value_file classmethod

create_for_magma_snp_p_value_file(
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
)
Source code in mecfs_bio/build_system/task/magma/magma_snp_location_task.py
@classmethod
def create_for_magma_snp_p_value_file(
    cls,
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
):
    """
    Create a task that writes an rsid / p-value text file.

    The output is described as a headerless, space-separated text file with
    the columns GWASLAB_RSID_COL and GWASLAB_P_COL and the extension
    ".id.p.txt".

    Args:
        gwas_parquet_with_rsids_task: Upstream task providing the GWAS data;
            its meta is used as the basis for the new task's meta.
        asset_id: Asset id given to the new task's meta.
        pipes: Optional pipes to apply to the data before writing. A
            ComputePPipe is always run after them (presumably to derive the
            p-value column -- confirm against ComputePPipe).

    Returns:
        A new instance of this class configured as described above.
    """
    # Build a fresh list rather than appending to the argument, so that a
    # list shared by the caller (e.g. reused across several factory calls)
    # is never modified behind the caller's back.
    pipes = [*(pipes or []), ComputePPipe()]
    extra_cols = [GWASLAB_P_COL]
    source_meta = gwas_parquet_with_rsids_task.meta
    meta = create_new_meta(
        source_meta,
        asset_id=asset_id,
        format=DataFrameTextFormat(
            separator=" ",
            has_header=False,
            column_names=[GWASLAB_RSID_COL] + extra_cols,
        ),
        extension=".id.p.txt",
    )
    return cls(
        meta=meta,
        gwas_parquet_with_rsid_task=gwas_parquet_with_rsids_task,
        extra_columns_to_output=extra_cols,
        pipes=pipes,
    )

create_for_magma_snp_p_value_file_compute_if_needed classmethod

create_for_magma_snp_p_value_file_compute_if_needed(
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
)
Source code in mecfs_bio/build_system/task/magma/magma_snp_location_task.py
@classmethod
def create_for_magma_snp_p_value_file_compute_if_needed(
    cls,
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
):
    """
    Create a task that writes an rsid / p-value text file, using
    ComputePIfNeededPipe as the final processing step.

    The output is described as a headerless, space-separated text file with
    the columns GWASLAB_RSID_COL and GWASLAB_P_COL and the extension
    ".id.p.txt".

    Args:
        gwas_parquet_with_rsids_task: Upstream task providing the GWAS data;
            its meta is used as the basis for the new task's meta.
        asset_id: Asset id given to the new task's meta.
        pipes: Optional pipes to apply to the data before writing. A
            ComputePIfNeededPipe is always run after them (presumably it
            only computes p values when they are absent -- confirm against
            ComputePIfNeededPipe).

    Returns:
        A new instance of this class configured as described above.
    """
    # Build a fresh list rather than appending to the argument, so that a
    # list shared by the caller (e.g. reused across several factory calls)
    # is never modified behind the caller's back.
    pipes = [*(pipes or []), ComputePIfNeededPipe()]
    extra_cols = [GWASLAB_P_COL]
    source_meta = gwas_parquet_with_rsids_task.meta
    meta = create_new_meta(
        source_meta,
        asset_id=asset_id,
        format=DataFrameTextFormat(
            separator=" ",
            has_header=False,
            column_names=[GWASLAB_RSID_COL] + extra_cols,
        ),
        extension=".id.p.txt",
    )
    return cls(
        meta=meta,
        gwas_parquet_with_rsid_task=gwas_parquet_with_rsids_task,
        extra_columns_to_output=extra_cols,
        pipes=pipes,
    )

create_for_magma_snp_p_value_file_precomputed_p classmethod

create_for_magma_snp_p_value_file_precomputed_p(
    gwas_parquet_with_rsids_task: Task, asset_id: str
)

Like create_for_magma_snp_p_value_file, but assumes the data already contains p values, so no p-value computation pipe is added.

Source code in mecfs_bio/build_system/task/magma/magma_snp_location_task.py
@classmethod
def create_for_magma_snp_p_value_file_precomputed_p(
    cls,
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
):
    """
    Create a task that writes an rsid / p-value text file from data that is
    assumed to already contain p values.

    No processing pipes are attached. The output is described as a
    headerless, space-separated text file with the columns GWASLAB_RSID_COL
    and GWASLAB_P_COL and the extension ".id.p.txt".
    """
    p_value_columns = [GWASLAB_P_COL]
    upstream_meta = gwas_parquet_with_rsids_task.meta
    output_format = DataFrameTextFormat(
        separator=" ",
        has_header=False,
        column_names=[GWASLAB_RSID_COL] + p_value_columns,
    )
    task_meta = create_new_meta(
        upstream_meta,
        asset_id=asset_id,
        format=output_format,
        extension=".id.p.txt",
    )
    return cls(
        meta=task_meta,
        gwas_parquet_with_rsid_task=gwas_parquet_with_rsids_task,
        extra_columns_to_output=p_value_columns,
        pipes=[],
    )

create_for_magma_snp_pos_file classmethod

create_for_magma_snp_pos_file(
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
)
Source code in mecfs_bio/build_system/task/magma/magma_snp_location_task.py
@classmethod
def create_for_magma_snp_pos_file(
    cls,
    gwas_parquet_with_rsids_task: Task,
    asset_id: str,
    pipes: list[DataProcessingPipe] | None = None,
):
    """
    Create a task that writes an rsid / chromosome / position text file.

    The output is described as a headerless, space-separated text file with
    the columns GWASLAB_RSID_COL, GWASLAB_CHROM_COL and GWASLAB_POS_COL and
    the extension ".id.chr.genpos.txt".

    Args:
        gwas_parquet_with_rsids_task: Upstream task providing the GWAS data;
            its meta is used as the basis for the new task's meta.
        asset_id: Asset id given to the new task's meta.
        pipes: Optional pipes to apply to the data before writing; when
            omitted, no pipes are applied.

    Returns:
        A new instance of this class configured as described above.
    """
    location_columns = [GWASLAB_CHROM_COL, GWASLAB_POS_COL]
    upstream_meta = gwas_parquet_with_rsids_task.meta
    output_format = DataFrameTextFormat(
        separator=" ",
        has_header=False,
        column_names=[GWASLAB_RSID_COL] + location_columns,
    )
    task_meta = create_new_meta(
        upstream_meta,
        asset_id=asset_id,
        format=output_format,
        extension=".id.chr.genpos.txt",
    )
    return cls(
        meta=task_meta,
        gwas_parquet_with_rsid_task=gwas_parquet_with_rsids_task,
        extra_columns_to_output=location_columns,
        pipes=[] if pipes is None else pipes,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/magma/magma_snp_location_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """
    Produce the space-separated, headerless SNP text file for this task.

    Steps:
      1. Fetch the source asset and scan it as a dataframe.
      2. Run every configured pipe over the data, in order.
      3. Drop rows with a duplicated rsid.
      4. Sort by chromosome and position, keep the rsid column plus
         ``extra_columns_to_output``, and write the result to
         ``scratch_dir / "output"``.

    ``wf`` is accepted but not used by this method.

    Returns:
        A FileAsset pointing at the written file.
    """
    chrom_then_pos = [GWASLAB_CHROM_COL, GWASLAB_POS_COL]

    frame = scan_dataframe_asset(fetch(self.source_id), meta=self.source_meta)
    for step in self.pipes:
        frame = step.process(frame)

    # One row per rsid.
    deduplicated = frame.unique(subset=GWASLAB_RSID_COL, order_by=chrom_then_pos)

    output_columns = (GWASLAB_RSID_COL, *self.extra_columns_to_output)
    ordered = deduplicated.sort(chrom_then_pos).select(*output_columns)

    destination = scratch_dir / "output"
    # Materialize the query and hand off to polars for the actual CSV write.
    ordered.collect().to_polars().write_csv(
        destination, include_header=False, separator=" "
    )
    return FileAsset(destination)