Skip to content

mecfs_bio.build_system.task.fixed_effect_meta_analysis_task

Task to combine GWAS with fixed-effects meta analysis

Classes:

  • CaseControlSampleInfo

    Specifies the number of cases and controls in a case control study

  • FixedEffectsMetaAnalysisTask

    Task to perform a fixed effects meta analysis on non-overlapping GWAS of the same trait

  • GwasSource

    Describes a source from which to draw GWAS data in order to perform a meta analysis

Functions:

Attributes:

SampleInfo module-attribute

SampleInfo = CaseControlSampleInfo

logger module-attribute

logger = get_logger()

CaseControlSampleInfo

Specifies the number of cases and controls in a case control study

Methods:

Attributes:

cases instance-attribute

cases: int

controls instance-attribute

controls: int

effective_sample_size

effective_sample_size() -> int
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def effective_sample_size(self) -> int:
    """Return the effective sample size of this case-control study.

    The value is ``4 / (1 / cases + 1 / controls)``, converted to an
    integer with ``int()`` (i.e. truncated toward zero).

    Raises:
        ZeroDivisionError: If ``cases`` or ``controls`` is zero.
    """
    reciprocal_sum = 1 / self.cases + 1 / self.controls
    return int(4 / reciprocal_sum)

FixedEffectsMetaAnalysisTask

Bases: Task

Task to perform a fixed effects meta analysis on non-overlapping GWAS of the same trait. Assumes all alleles are expressed with respect to the forward strand.

The variants present in the output dataframe will be equal to the intersection of the variants in the studies.

For more information on fixed effects meta analysis, see: Chapter 22 of Balding, David J., Ida Moltke, and John Marioni, eds. Handbook of statistical genomics. John Wiley & Sons, 2019.

Methods:

Attributes:

deps property

deps: list[Task]

meta property

meta: Meta

sources instance-attribute

sources: Sequence[GwasSource]

create classmethod

create(
    asset_id: str,
    meta_analysis_name: str,
    sources: Sequence[GwasSource],
)
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
@classmethod
def create(
    cls, asset_id: str, meta_analysis_name: str, sources: Sequence[GwasSource]
):
    """Build a meta-analysis task from two or more GWAS sources.

    The meta of every source task must be a ``FilteredGWASDataMeta``, and
    every source must carry the same trait as the first one. The new task's
    meta reuses that trait, uses ``meta_analysis_name`` as its project, and
    declares a parquet dataframe stored under the ``processed`` sub-directory.

    Args:
        asset_id: Identifier given to the asset produced by the new task.
        meta_analysis_name: Stored as the ``project`` of the new task's meta.
        sources: The GWAS sources to combine; at least two are required.

    Returns:
        A new instance of ``cls`` wrapping the generated meta and ``sources``.

    Raises:
        AssertionError: If fewer than two sources are given, if any source
            meta is not a ``FilteredGWASDataMeta``, or if the traits differ.
    """
    assert len(sources) > 1
    reference_meta = sources[0].task.meta
    assert isinstance(reference_meta, FilteredGWASDataMeta)
    # Every remaining source has to describe the same trait as the first.
    for other in sources[1:]:
        assert isinstance(other.task.meta, FilteredGWASDataMeta)
        assert other.task.meta.trait == reference_meta.trait
    return cls(
        meta=FilteredGWASDataMeta(
            id=AssetId(asset_id),
            trait=reference_meta.trait,
            project=meta_analysis_name,
            sub_dir=PurePath("processed"),
            read_spec=DataFrameReadSpec(DataFrameParquetFormat()),
        ),
        sources=sources,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Run the meta analysis and write the combined result to parquet.

    Each source is fetched, passed through its own ``pipe``, and checked with
    ``_check_unique_variants`` and ``_check_nonzero_se``. The first source
    seeds the running frame; every later source is joined onto it twice on
    (chromosome, position, effect allele, non-effect allele): once as-is and
    once after ``get_reversed`` has swapped its alleles and negated its beta.
    The two join results are stacked, with a boolean ``flipped_<i>`` column
    recording which orientation matched. Per-source beta / SE columns are
    suffixed ``_<i>`` and finally combined by ``_fixed_effects_beta_se_cols``.

    Args:
        scratch_dir: Directory in which the output parquet file is created.
        fetch: Callable resolving a task's asset id to its asset.
        wf: Not used by this method.

    Returns:
        A ``FileAsset`` pointing at ``<scratch_dir>/<asset_id>.parquet``.
    """
    # Columns that identify a variant; used for both selection and joining.
    variant_key = [
        GWASLAB_CHROM_COL,
        GWASLAB_POS_COL,
        GWASLAB_EFFECT_ALLELE_COL,
        GWASLAB_NON_EFFECT_ALLELE_COL,
    ]

    # --- Seed the running frame with the first study (index 0). ---
    first = self.sources[0]
    first_asset = fetch(first.task.asset_id)
    merged = first.pipe.process(scan_dataframe_asset(first_asset, first.task.meta))
    _check_unique_variants(merged)
    _check_nonzero_se(merged)
    merged = _select_df_1_columns(merged).rename(
        {
            GWASLAB_BETA_COL: GWASLAB_BETA_COL + "_0",
            GWASLAB_SE_COL: GWASLAB_SE_COL + "_0",
        }
    )
    beta_names = [GWASLAB_BETA_COL + "_0"]
    se_names = [GWASLAB_SE_COL + "_0"]

    # --- Fold in each remaining study, numbered from 1. ---
    for idx, src in enumerate(self.sources[1:], start=1):
        beta_name = GWASLAB_BETA_COL + f"_{idx}"
        se_name = GWASLAB_SE_COL + f"_{idx}"
        flip_name = f"flipped_{idx}"

        src_asset = fetch(src.task.asset_id)
        study = src.pipe.process(scan_dataframe_asset(src_asset, src.task.meta))
        _check_unique_variants(study)
        _check_nonzero_se(study)
        study = study.select(
            [*variant_key, GWASLAB_BETA_COL, GWASLAB_SE_COL]
        ).rename(
            {
                GWASLAB_BETA_COL: beta_name,
                GWASLAB_SE_COL: se_name,
            }
        )
        beta_names.append(beta_name)
        se_names.append(se_name)

        # Variants whose alleles already line up with the running frame.
        same_orientation = merged.join(study, on=variant_key).with_columns(
            narwhals.lit(False).alias(flip_name)
        )
        # Variants that only line up once the study's alleles are swapped.
        study_swapped = get_reversed(study, beta_col=beta_name)
        swapped_orientation = merged.join(
            study_swapped, on=variant_key
        ).with_columns(narwhals.lit(True).alias(flip_name))
        merged = narwhals.concat(
            [same_orientation, swapped_orientation], how="vertical"
        )

    # --- Combine per-study estimates into the meta-analysis estimate. ---
    combined_beta, combined_se = _fixed_effects_beta_se_cols(
        beta_cols=beta_names,
        se_cols=se_names,
    )
    merged = merged.with_columns(
        combined_beta.alias(GWASLAB_BETA_COL),
        combined_se.alias(GWASLAB_SE_COL),
    )
    out_path = scratch_dir / (self.asset_id + ".parquet")
    merged = add_effective_sample_size_column(
        merged, [src.sample_info for src in self.sources]
    )
    merged.sink_parquet(
        out_path,
    )

    # Re-scan the written file so the reports reflect what was persisted.
    written = narwhals.scan_parquet(out_path, backend="polars")
    report_flips(
        written,
        num_sources=len(self.sources),
    )
    report_output_size(written)
    return FileAsset(out_path)

GwasSource

Describes a source from which to draw GWAS data in order to perform a meta analysis

Attributes:

pipe class-attribute instance-attribute

pipe: DataProcessingPipe = IdentityPipe()

sample_info instance-attribute

sample_info: SampleInfo

task instance-attribute

task: Task

add_effective_sample_size_column

add_effective_sample_size_column(
    out_df: LazyFrame, sample_info: list[SampleInfo]
) -> narwhals.LazyFrame
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def add_effective_sample_size_column(
    out_df: narwhals.LazyFrame,
    sample_info: list[SampleInfo],
) -> narwhals.LazyFrame:
    """Attach the pooled effective sample size as a constant column.

    The effective sample sizes of all entries in ``sample_info`` are summed,
    and that single total is written to every row under the
    ``GWASLAB_EFFECTIVE_SAMPLE_SIZE`` column.

    Args:
        out_df: Frame to which the column is added.
        sample_info: Sample descriptions, one per contributing study.

    Returns:
        ``out_df`` with the additional constant column.
    """
    pooled_size = 0
    for study in sample_info:
        pooled_size += study.effective_sample_size()
    size_column = narwhals.lit(pooled_size).alias(GWASLAB_EFFECTIVE_SAMPLE_SIZE)
    return out_df.with_columns(size_column)

get_reversed

get_reversed(
    df: LazyFrame, beta_col: str
) -> narwhals.LazyFrame
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def get_reversed(df: narwhals.LazyFrame, beta_col: str) -> narwhals.LazyFrame:
    """Return ``df`` with effect / non-effect alleles swapped and beta negated.

    Args:
        df: Frame containing the effect-allele and non-effect-allele columns
            as well as ``beta_col``.
        beta_col: Name of the effect-size column whose sign is inverted.

    Returns:
        A frame with the same column names in which the two allele columns
        have exchanged contents and ``beta_col`` is multiplied by -1.
    """
    original_effect = narwhals.col(GWASLAB_EFFECT_ALLELE_COL)
    original_non_effect = narwhals.col(GWASLAB_NON_EFFECT_ALLELE_COL)
    sign_flipped_beta = -1 * narwhals.col(beta_col)
    # NOTE(review): the swap relies on every expression in a single
    # with_columns call reading the input frame's columns — confirm.
    return df.with_columns(
        original_effect.alias(GWASLAB_NON_EFFECT_ALLELE_COL),
        original_non_effect.alias(GWASLAB_EFFECT_ALLELE_COL),
        sign_flipped_beta.alias(beta_col),
    )

report_flips

report_flips(df: LazyFrame, num_sources: int)
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def report_flips(df: narwhals.LazyFrame, num_sources: int):
    """Log, at debug level, how many rows were flipped for each later source.

    For every source index ``i`` from 1 to ``num_sources - 1`` the
    ``flipped_<i>`` column is summed and reported as ``num_flipped_<i>``.

    Args:
        df: Frame holding one ``flipped_<i>`` column per non-first source.
        num_sources: Total number of sources, including the first one
            (which has no ``flipped`` column).
    """
    later_source_indices = range(1, num_sources)
    result = df.select(
        *(
            narwhals.col(f"flipped_{i}").sum().alias(f"num_flipped_{i}")
            for i in later_source_indices
        ),
    ).collect()
    logger.debug(f"Flipped alleles:\n{result}")

report_output_size

report_output_size(df: LazyFrame)
Source code in mecfs_bio/build_system/task/fixed_effect_meta_analysis_task.py
def report_output_size(
    df: narwhals.LazyFrame,
):
    """Log, at debug level, the number of rows (variants) in ``df``.

    Args:
        df: The final meta-analysis frame; it is collected to count its rows.
    """
    row_count_frame = df.select(narwhals.len()).collect()
    variant_count = row_count_frame.item()
    logger.debug(f"Final meta-analysis has {variant_count} variants")