Skip to content

mecfs_bio.asset_generator.lcv_asset_generator

Asset generator that applies the Latent Causal Variable (LCV) method of O'Connor and Price to lists of upstream and downstream traits. Each upstream trait is tested against each downstream trait.

Classes:

  • LCVRun

    The tasks required to run LCV for a particular upstream-trait/downstream-trait pair

  • LCVSourceTraitInfo

    Describes a trait being passed as input to the LCV asset generator

  • LCVTaskGroup

    A group of tasks generated by the LCV asset generator

Functions:

  • lcv_generate

    Apply LCV for every upstream-trait/downstream-trait pair

LCVRun

The tasks required to run LCV for a particular upstream-trait/downstream-trait pair

Attributes:

harmonization_task instance-attribute

harmonization_task: Task

lcv_task instance-attribute

lcv_task: Task

LCVSourceTraitInfo

Describes a trait being passed as input to the LCV asset generator

Attributes:

df_task instance-attribute

df_task: Task

name instance-attribute

name: str

pipe class-attribute instance-attribute

pipe: DataProcessingPipe = IdentityPipe()

LCVTaskGroup

A group of tasks generated by the LCV asset generator

Methods:

Attributes:

agg_task instance-attribute

agg_task: Task

downstream_trait_tables instance-attribute

downstream_trait_tables: Mapping[str, Task]

lcv_run_mapping instance-attribute

lcv_run_mapping: Mapping[tuple[str, str], LCVRun]

all_tasks

all_tasks() -> list[Task]
Source code in mecfs_bio/asset_generator/lcv_asset_generator.py
def all_tasks(self) -> list[Task]:
    """
    Return every task in the group.

    Order: the aggregation task, then each downstream-trait table task, then
    every harmonization task, then every LCV task (the last two in
    ``lcv_run_mapping`` iteration order).
    """
    runs = list(self.lcv_run_mapping.values())
    return [
        self.agg_task,
        *self.downstream_trait_tables.values(),
        *(run.harmonization_task for run in runs),
        *(run.lcv_task for run in runs),
    ]

terminal_tasks

terminal_tasks() -> list[Task]
Source code in mecfs_bio/asset_generator/lcv_asset_generator.py
def terminal_tasks(self) -> list[Task]:
    """
    Return the aggregation task followed by every downstream-trait table task.
    """
    terminal = [self.agg_task]
    terminal.extend(self.downstream_trait_tables.values())
    return terminal

lcv_generate

lcv_generate(
    base_name: str,
    upstream_traits: Sequence[LCVSourceTraitInfo],
    downstream_traits: Sequence[LCVSourceTraitInfo],
    consolidated_ld_scores_task: Task,
    config: LCVConfig,
) -> LCVTaskGroup

Apply LCV for every upstream-trait/downstream-trait pair

Source code in mecfs_bio/asset_generator/lcv_asset_generator.py
def _duplicate_trait_names(traits: Sequence[LCVSourceTraitInfo]) -> list[str]:
    """Return, sorted, every trait name that occurs more than once in ``traits``."""
    seen: set[str] = set()
    duplicates: set[str] = set()
    for trait in traits:
        if trait.name in seen:
            duplicates.add(trait.name)
        seen.add(trait.name)
    return sorted(duplicates)


def lcv_generate(
    base_name: str,
    upstream_traits: Sequence[LCVSourceTraitInfo],
    downstream_traits: Sequence[LCVSourceTraitInfo],
    consolidated_ld_scores_task: Task,
    config: LCVConfig,
) -> LCVTaskGroup:
    """
    Apply LCV for every upstream-trait/downstream-trait pair.

    For each (upstream, downstream) pair this creates:
      * a harmonization task aligning the upstream GWAS to the downstream GWAS
        (used as the allele reference), with palindromic variants dropped, and
      * an LCV task using the harmonized upstream data as trait 1 and the
        downstream data as trait 2.
    Every input is first passed through the trait's own ``pipe`` and then
    deduplicated on (chrom, pos, effect allele, non-effect allele).

    All LCV outputs are concatenated into a single parquet asset in which each
    row is tagged with its upstream and downstream trait names. For every
    downstream trait, a markdown table is derived from that asset, sorted by
    LCV mean GCP in descending order.

    Args:
        base_name: Prefix for every generated asset id.
        upstream_traits: Traits used as trait 1; names must be unique.
        downstream_traits: Traits used as trait 2 and as the harmonization
            reference; names must be unique.
        consolidated_ld_scores_task: LD-score task passed to every LCV task.
        config: LCV configuration shared by every LCV task.

    Returns:
        An ``LCVTaskGroup`` holding:
          * the per-pair runs keyed by ``(upstream name, downstream name)``,
          * the aggregation task, and
          * the markdown table tasks keyed by downstream name.

    Raises:
        ValueError: If a name is repeated within ``upstream_traits`` or within
            ``downstream_traits``. A repeated name would generate colliding asset
            ids, silently overwrite entries in the returned mappings, and make
            the aggregated rows indistinguishable.
    """
    for role, traits in (
        ("upstream", upstream_traits),
        ("downstream", downstream_traits),
    ):
        duplicates = _duplicate_trait_names(traits)
        if duplicates:
            raise ValueError(f"Duplicate {role} trait names for LCV: {duplicates}")

    # Drop duplicate variants, keeping the first row after ordering by rsid.
    unique_pipe = UniquePipe(
        by=[
            GWASLAB_CHROM_COL,
            GWASLAB_POS_COL,
            GWASLAB_EFFECT_ALLELE_COL,
            GWASLAB_NON_EFFECT_ALLELE_COL,
        ],
        keep="first",
        order_by=[GWASLAB_RSID_COL],
    )

    def with_unique(pipe):
        # Apply the trait's own pipe, then deduplicate variants.
        return CompositePipe([pipe, unique_pipe])

    run_mapping: dict[tuple[str, str], LCVRun] = {}
    lcv_tasks: list[Task] = []
    # One tagging pipe per LCV task, kept index-aligned with ``lcv_tasks``.
    pre_agg_pipes = []
    for us in upstream_traits:
        for ds in downstream_traits:
            harmonization_task = HarmonizeGWASWithReferenceViaAlleles.create(
                asset_id=f"{base_name}_{us.name}_harmonize_with_{ds.name}_for_lcv",
                gwas_data_task=us.df_task,
                reference_task=ds.df_task,
                gwas_pipe=with_unique(us.pipe),
                ref_pipe=with_unique(ds.pipe),
                palindrome_strategy="drop",
            )
            lcv_task = LCVTask.create(
                f"{base_name}_lcv_{us.name}_{ds.name}",
                trait_1_data=harmonization_task,
                trait_2_data=ds.df_task,
                consolidated_ld_scores=consolidated_ld_scores_task,
                config=config,
                # Same preprocessing as the harmonization reference above.
                trait_2_pipe=with_unique(ds.pipe),
            )
            pre_agg_pipes.append(
                CompositePipe(
                    [
                        SetColToConstantPipe(
                            col_name=UPSTREAM_TRAIT_COL, constant=us.name
                        ),
                        SetColToConstantPipe(
                            col_name=DOWNSTREAM_TRAIT_COL, constant=ds.name
                        ),
                    ]
                )
            )
            run_mapping[(us.name, ds.name)] = LCVRun(
                lcv_task=lcv_task, harmonization_task=harmonization_task
            )
            lcv_tasks.append(lcv_task)

    agg_task = ConcatFramesTask.create(
        asset_id=f"{base_name}_lcv_agg",
        frames_tasks=lcv_tasks,
        out_format=ParquetOutFormat(),
        frames_pipes=pre_agg_pipes,
    )

    downstream_trait_markdown_tables = {}
    for ds in downstream_traits:
        downstream_trait_markdown_tables[ds.name] = (
            ConvertDataFrameToMarkdownTask.create_from_result_table_task(
                source_task=agg_task,
                asset_id=f"{base_name}_lcv_table_downstream_{ds.name}",
                pipe=CompositePipe(
                    [
                        # Keep only rows for this downstream trait.
                        FilterRowsByValue(
                            target_column=DOWNSTREAM_TRAIT_COL, valid_values=[ds.name]
                        ),
                        SelectColPipe(
                            [
                                UPSTREAM_TRAIT_COL,
                                LCV_MEAN_GCP_COL,
                                LCV_PVAL_ZERO_COL,
                                LCV_RHO_EST_COL,
                                LCV_RHO_SE_COL,
                            ]
                        ),
                        SortPipe(
                            by=[
                                LCV_MEAN_GCP_COL,
                            ],
                            desc=True,
                        ),
                    ]
                ),
            )
        )
    return LCVTaskGroup(
        lcv_run_mapping=run_mapping,
        agg_task=agg_task,
        downstream_trait_tables=downstream_trait_markdown_tables,
    )