Skip to content

mecfs_bio.build_system.task.lcv.lcv_task

Task to apply the Latent Causal Variable (LCV) technique of O'Connor and Price to attempt to estimate the causal direction between two genetically correlated traits. The key output value is the GCP: Genetic Causality Proportion.

Citation: O’Connor, Luke J., and Alkes L. Price. "Distinguishing genetic correlation from causation across 52 diseases and complex traits." Nature genetics 50.12 (2018): 1728-1734.

Classes:

  • LCVConfig –
  • LCVTask –

    Task to apply the Latent Causal Variable technique of O'Connor and Price to attempt to estimate the causal direction between two genetically correlated traits.

Functions:

Attributes:

DOWNSTREAM_TRAIT_COL module-attribute

DOWNSTREAM_TRAIT_COL = 'downstream_trait'

UPSTREAM_TRAIT_COL module-attribute

UPSTREAM_TRAIT_COL = 'upstream_trait'

Z_SCORE_1 module-attribute

Z_SCORE_1 = '_z_score_1_'

Z_SCORE_2 module-attribute

Z_SCORE_2 = '_z_score_2_'

Z_SCORE_COL module-attribute

Z_SCORE_COL = '_z_score_'

logger module-attribute

logger = get_logger()

LCVConfig

Attributes:

build class-attribute instance-attribute

build: GenomeBuild = '19'

chisq_exclude_factor_threshold class-attribute instance-attribute

chisq_exclude_factor_threshold: float = 50

exclude_mhc_region class-attribute instance-attribute

exclude_mhc_region: MHCRegion | None = 'extended'

LCVTask

Bases: Task

Task to apply the Latent Causal Variable (LCV) technique of O'Connor and Price to attempt to estimate the causal direction between two genetically correlated traits. The key output value is the GCP: Genetic Causality Proportion.

Assumes that the trait 1 and trait 2 datasets have already been harmonized via HarmonizeGWASWithReferenceViaRSIDTask.

Methods:

Attributes:

config instance-attribute

config: LCVConfig

consolidated_ld_scores instance-attribute

consolidated_ld_scores: Task

deps property

deps: list[Task]

meta instance-attribute

meta: Meta

trait_1_data instance-attribute

trait_1_data: Task

trait_1_pipe class-attribute instance-attribute

trait_1_pipe: DataProcessingPipe = IdentityPipe()

trait_2_data instance-attribute

trait_2_data: Task

trait_2_pipe class-attribute instance-attribute

trait_2_pipe: DataProcessingPipe = IdentityPipe()

create classmethod

create(
    asset_id: str,
    trait_1_data: Task,
    trait_2_data: Task,
    consolidated_ld_scores: Task,
    config: LCVConfig,
    trait_1_pipe: DataProcessingPipe = IdentityPipe(),
    trait_2_pipe: DataProcessingPipe = IdentityPipe(),
)
Source code in mecfs_bio/build_system/task/lcv/lcv_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    trait_1_data: Task,
    trait_2_data: Task,
    consolidated_ld_scores: Task,
    config: LCVConfig,
    trait_1_pipe: DataProcessingPipe = IdentityPipe(),
    trait_2_pipe: DataProcessingPipe = IdentityPipe(),
):
    """Alternate constructor that also builds the output metadata.

    The result is registered as a multi-trait parquet result table under
    the "causal_analysis" project, in the "lcv" sub-directory.

    Args:
        asset_id: Identifier given to the asset this task produces.
        trait_1_data: Task whose output holds the trait 1 data.
        trait_2_data: Task whose output holds the trait 2 data.
        consolidated_ld_scores: Task whose output holds the LD scores.
        config: Settings forwarded unchanged to the new task.
        trait_1_pipe: Pipe stored on the task for trait 1 (identity by
            default).
        trait_2_pipe: Pipe stored on the task for trait 2 (identity by
            default).

    Returns:
        A new instance of ``cls`` wired to the given inputs.
    """
    # Pieces of the metadata are named individually, in the same order
    # the metadata constructor receives them.
    output_id = AssetId(asset_id)
    output_sub_dir = PurePath("lcv")
    parquet_read_spec = DataFrameReadSpec(DataFrameParquetFormat())
    result_meta = ResultTableMeta(
        id=output_id,
        trait=MULTI_TRAIT,
        project="causal_analysis",
        sub_dir=output_sub_dir,
        read_spec=parquet_read_spec,
        extension=".parquet",
    )
    task = cls(
        trait_1_data=trait_1_data,
        trait_2_data=trait_2_data,
        consolidated_ld_scores=consolidated_ld_scores,
        config=config,
        meta=result_meta,
        trait_1_pipe=trait_1_pipe,
        trait_2_pipe=trait_2_pipe,
    )
    return task

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/lcv/lcv_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Run LCV on the two traits and write the result table to parquet.

    Steps: fetch the three input assets; turn each trait into a z-score
    frame (after applying its pipe), filter it with ``exclude_mhc`` and
    cast its allele columns to strings; align both traits with the LD
    scores; run LCV; write the result to ``result.parquet`` inside
    ``scratch_dir``.

    Args:
        scratch_dir: Directory in which the output file is written.
        fetch: Callable used to obtain the asset for an asset id.
        wf: Not used by this method.

    Returns:
        A ``FileAsset`` pointing at the written parquet file.
    """
    # All three inputs are fetched before any processing starts.
    asset_1 = fetch(self.trait_1_data.asset_id)
    asset_2 = fetch(self.trait_2_data.asset_id)
    asset_ld = fetch(self.consolidated_ld_scores.asset_id)

    def _prepare_trait(asset, source_task, pipe):
        # Shared per-trait preparation: scan -> pipe -> z-scores ->
        # MHC filter -> string-typed allele columns.
        scanned = scan_dataframe_asset(asset=asset, meta=source_task.meta)
        z_frame = make_z_score_frame(pipe.process(scanned))
        z_frame = exclude_mhc(
            z_frame,
            build=self.config.build,
            region=self.config.exclude_mhc_region,
        )
        return convert_ea_nea_to_str(z_frame)

    df_trait_1 = _prepare_trait(asset_1, self.trait_1_data, self.trait_1_pipe)
    df_trait_2 = _prepare_trait(asset_2, self.trait_2_data, self.trait_2_pipe)

    df_ld_scores = scan_dataframe_asset(
        asset=asset_ld, meta=self.consolidated_ld_scores.meta
    )

    logger.debug("Aligning data for LCV")
    aligned = align_traits_and_ld(
        df_trait_1=df_trait_1,
        df_trait_2=df_trait_2,
        ld_scores=df_ld_scores,
    )
    logger.debug(f"Done. Aligned data covers {len(aligned)} genetic variants")

    logger.debug("Running LCV")
    # LCV consumes plain arrays taken row-for-row from the aligned frame.
    ld_array = aligned[LD_SCORE_LD_SCORE_COL].to_numpy()
    z1_array = aligned[Z_SCORE_1].to_numpy()
    z2_array = aligned[Z_SCORE_2].to_numpy()
    lcv_result = run_lcv(
        ld_scores=ld_array,
        z1=z1_array,
        z2=z2_array,
        chisq_exclude_factor_threshold=self.config.chisq_exclude_factor_threshold,
    )

    result_df = lcv_result.to_df()
    logger.debug(f"LCV results : \n {result_df}")

    out_path = scratch_dir / "result.parquet"
    result_df.write_parquet(out_path)
    return FileAsset(out_path)

align_traits_and_ld

align_traits_and_ld(
    df_trait_1: LazyFrame,
    df_trait_2: LazyFrame,
    ld_scores: LazyFrame,
) -> nw.DataFrame
Source code in mecfs_bio/build_system/task/lcv/lcv_task.py
def align_traits_and_ld(
    df_trait_1: narwhals.LazyFrame,
    df_trait_2: narwhals.LazyFrame,
    ld_scores: narwhals.LazyFrame,
) -> nw.DataFrame:
    """Inner-join both traits' z-scores with the LD scores, one row per variant.

    The two trait frames are matched on chromosome, position, rsID and on
    both allele columns, so a variant is kept only when the effect and
    non-effect alleles are equal in the two traits. The result is then
    matched to ``ld_scores`` on chromosome, position and rsID, collected,
    and sorted by chromosome, position and rsID.

    Args:
        df_trait_1: Trait 1 frame containing ``Z_SCORE_COL`` and the join
            key columns.
        df_trait_2: Trait 2 frame with the same columns as ``df_trait_1``.
        ld_scores: Frame with the LD score key columns.

    Returns:
        The collected, sorted frame. The z-score columns are named
        ``Z_SCORE_1`` and ``Z_SCORE_2``.

    Raises:
        AssertionError: If the aligned frame has 100 rows or fewer.
    """
    variant_keys = [GWASLAB_CHROM_COL, GWASLAB_POS_COL, GWASLAB_RSID_COL]
    allele_keys = [GWASLAB_EFFECT_ALLELE_COL, GWASLAB_NON_EFFECT_ALLELE_COL]
    ld_keys = [LD_SCORE_CHROM_COL, LD_SCORE_POS_COL, LD_SCORE_RSID_COL]

    # Give each trait's z-score a distinct name so both survive the join.
    df_trait_1 = df_trait_1.rename({Z_SCORE_COL: Z_SCORE_1})
    df_trait_2 = df_trait_2.rename({Z_SCORE_COL: Z_SCORE_2})

    # Only this one join chain is materialised. Earlier revisions also
    # collected several intermediate joins whose results were never read,
    # which repeated the most expensive step for no benefit.
    joined = (
        df_trait_1.join(
            df_trait_2,
            on=variant_keys + allele_keys,
            how="inner",
        )
        .join(
            ld_scores,
            left_on=variant_keys,
            right_on=ld_keys,
        )
        .collect()
        .sort(
            by=variant_keys,
        )
    )

    # Explicit check rather than an ``assert`` statement so that it is not
    # stripped under ``python -O``; the exception type is kept the same.
    if len(joined) <= 100:
        raise AssertionError(
            "Too few variants remain after aligning both traits with the LD "
            f"scores: {len(joined)} (more than 100 are required)"
        )
    return joined

convert_ea_nea_to_str

convert_ea_nea_to_str(df: LazyFrame) -> nw.LazyFrame
Source code in mecfs_bio/build_system/task/lcv/lcv_task.py
def convert_ea_nea_to_str(df: nw.LazyFrame) -> nw.LazyFrame:
    """Return ``df`` with both allele columns cast to the string dtype.

    The effect-allele and non-effect-allele columns keep their names; all
    other columns are left as they are.
    """
    # NOTE(review): presumably needed so the allele columns of different
    # datasets share a dtype when they are joined on -- confirm.
    allele_columns = (GWASLAB_EFFECT_ALLELE_COL, GWASLAB_NON_EFFECT_ALLELE_COL)
    as_strings = [
        narwhals.col(column).cast(narwhals.dtypes.String()).alias(column)
        for column in allele_columns
    ]
    return df.with_columns(*as_strings)

make_z_score_frame

make_z_score_frame(df: LazyFrame) -> nw.LazyFrame
Source code in mecfs_bio/build_system/task/lcv/lcv_task.py
def make_z_score_frame(
    df: narwhals.LazyFrame,
) -> nw.LazyFrame:
    """Reduce a frame to its variant identifiers plus a z-score column.

    The z-score is the beta column divided by the standard-error column and
    is stored under ``Z_SCORE_COL``. The returned frame holds only the
    chromosome, position, rsID, effect allele, non-effect allele and
    z-score columns, in that order.

    ``_check_required_columns`` is called on ``df`` before anything else.
    """
    _check_required_columns(df)

    z_score = nw.col(GWASLAB_BETA_COL) / nw.col(GWASLAB_SE_COL)
    kept_columns = [
        GWASLAB_CHROM_COL,
        GWASLAB_POS_COL,
        GWASLAB_RSID_COL,
        GWASLAB_EFFECT_ALLELE_COL,
        GWASLAB_NON_EFFECT_ALLELE_COL,
        Z_SCORE_COL,
    ]
    with_z_score = df.with_columns(z_score.alias(Z_SCORE_COL))
    return with_z_score.select(*kept_columns)