Skip to content

mecfs_bio.build_system.task.lava_task

Compute local genetic correlation using LAVA (Local Analysis of [co]Variant Association).

Implemented by Github copilot

See: Werme et al. "An integrated framework for local genetic correlation analysis" Nature Genetics 54 (2022): 274-282.

LAVA estimates local heritability (h2) for each phenotype at each genomic locus, and local genetic correlation (rg) between pairs of phenotypes at loci where both show significant heritability.

This task wraps the R LAVA package via rpy2.

Classes:

Functions:

Attributes:

BIVAR_RESULTS_FILENAME module-attribute

BIVAR_RESULTS_FILENAME = 'lava_bivariate.csv'

LAVA_A1_COL module-attribute

LAVA_A1_COL = 'A1'

LAVA_A2_COL module-attribute

LAVA_A2_COL = 'A2'

LAVA_N_COL module-attribute

LAVA_N_COL = 'N'

LAVA_SNP_COL module-attribute

LAVA_SNP_COL = 'SNP'

LAVA_Z_COL module-attribute

LAVA_Z_COL = 'Z'

LavaSampleInfo module-attribute

LavaSampleInfo = (
    LavaBinarySampleInfo | LavaContinuousSampleInfo
)

UNIV_RESULTS_FILENAME module-attribute

UNIV_RESULTS_FILENAME = 'lava_univariate.csv'

logger module-attribute

logger = get_logger()

LDReferenceInfo

Reference LD data for LAVA.

filename_prefix is the prefix of the PLINK-format LD reference files (i.e. the path without .bed/.bim/.fam extensions).

Attributes:

filename_prefix instance-attribute

filename_prefix: str

ld_ref_task instance-attribute

ld_ref_task: Task

LavaBinarySampleInfo

Sample information for a binary (case/control) phenotype.

LAVA uses cases and controls to determine if a phenotype is binary, and optionally uses prevalence to convert observed h2 to liability scale.

Methods:

Attributes:

cases instance-attribute

cases: int

controls instance-attribute

controls: int

prevalence class-attribute instance-attribute

prevalence: float | None = None

from_ct_ldsc_sample_info classmethod

from_ct_ldsc_sample_info(data: BinaryPhenotypeSampleInfo)
Source code in mecfs_bio/build_system/task/lava_task.py
@classmethod
def from_ct_ldsc_sample_info(cls, data: BinaryPhenotypeSampleInfo):
    assert data.total_sample_size is not None
    return cls(
        cases=int(data.sample_prevalence * data.total_sample_size),
        controls=int((1 - data.sample_prevalence) * data.total_sample_size),
        prevalence=data.estimated_population_prevalence,
    )

LavaContinuousSampleInfo

Marker indicating a continuous (quantitative) phenotype.

Attributes:

total_sample_size class-attribute instance-attribute

total_sample_size: int | None = None

LavaPhenotypeDataSource

A source Task providing tabular GWAS summary statistics for a phenotype.

Column names are assumed to be in GWASLAB format (see: mecfs_bio/constants/gwaslab_constants.py). Data will be copied and converted to the format expected by LAVA.

Attributes:

alias instance-attribute

alias: str

asset_id property

asset_id: AssetId

pipe class-attribute instance-attribute

pipe: DataProcessingPipe = IdentityPipe()

sample_info class-attribute instance-attribute

sample_info: LavaSampleInfo = LavaContinuousSampleInfo()

task instance-attribute

task: Task

LavaTask

Bases: Task

Given a locus definition file, does the following for each locus: - Estimates heritability for all phenotypes using LAVA - For each pair of phenotypes whose heritabilities are both significant at the locus, calculates local genetic correlation using LAVA

Outputs two CSV files to scratch_dir: - lava_univariate.csv: heritability estimates per phenotype per locus - lava_bivariate.csv: genetic correlation estimates per phenotype pair per locus

Methods:

Attributes:

ct_ldsc_task_for_overlap class-attribute instance-attribute

ct_ldsc_task_for_overlap: Task | None = None

deps property

deps: list[Task]

heritability_task_for_overlap class-attribute instance-attribute

heritability_task_for_overlap: Task | None = None

lava_locus_definitions_task instance-attribute

lava_locus_definitions_task: Task

ld_reference_info instance-attribute

ld_reference_info: LDReferenceInfo

max_loci class-attribute instance-attribute

max_loci: int | None = None

meta property

meta: Meta

sources instance-attribute

sources: Sequence[LavaPhenotypeDataSource]

univ_p_threshold class-attribute instance-attribute

univ_p_threshold: float = 0.05

create classmethod

create(
    asset_id: str,
    sources: Sequence[LavaPhenotypeDataSource],
    ld_reference_info: LDReferenceInfo,
    lava_locus_definitions_task: Task,
    ct_ldsc_task_for_overlap: Task | None = None,
    heritability_task_for_overlap: Task | None = None,
    univ_p_threshold: float = 0.05,
    max_loci: int | None = None,
) -> LavaTask
Source code in mecfs_bio/build_system/task/lava_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    sources: Sequence[LavaPhenotypeDataSource],
    ld_reference_info: LDReferenceInfo,
    lava_locus_definitions_task: Task,
    ct_ldsc_task_for_overlap: Task | None = None,
    heritability_task_for_overlap: Task | None = None,
    univ_p_threshold: float = 0.05,
    max_loci: int | None = None,
) -> "LavaTask":
    meta = ResultDirectoryMeta(
        id=asset_id,
        trait="multi_trait",
        project="lava_local_rg",
        sub_dir=PurePath("analysis"),
    )
    return cls(
        meta=meta,
        sources=sources,
        ld_reference_info=ld_reference_info,
        lava_locus_definitions_task=lava_locus_definitions_task,
        ct_ldsc_task_for_overlap=ct_ldsc_task_for_overlap,
        univ_p_threshold=univ_p_threshold,
        heritability_task_for_overlap=heritability_task_for_overlap,
        max_loci=max_loci,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/lava_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    lava = importr("LAVA")
    conv = ro.default_converter + pandas2ri.converter
    r_is_null = ro.r("is.null")

    # Fetch LD reference
    ld_ref_asset = fetch(self.ld_reference_info.ld_ref_task.asset_id)
    assert isinstance(ld_ref_asset, DirectoryAsset)
    ref_prefix = str(ld_ref_asset.path / self.ld_reference_info.filename_prefix)

    # Fetch locus definitions
    locus_asset = fetch(self.lava_locus_definitions_task.asset_id)
    assert isinstance(locus_asset, FileAsset)

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)

        # Write LAVA-compatible summary statistics and build input info
        info_rows: list[dict] = []
        pheno_names: list[str] = []
        for source in self.sources:
            sumstats_path = _write_lava_sumstats(
                source=source, fetch=fetch, tmp_dir=tmp_path
            )
            info_rows.append(_make_info_row(source, sumstats_path))
            pheno_names.append(source.alias)

        # Write the input info file
        info_file_path = tmp_path / "input.info.txt"
        info_df = pd.DataFrame(info_rows)
        info_df.to_csv(info_file_path, sep="\t", index=False)
        logger.debug(f"LAVA input info file:\n{info_df}")

        # Build sample overlap matrix from CT-LDSC results if available
        sample_overlap_path = _get_sample_overlap_path(
            ct_ldsc_task=self.ct_ldsc_task_for_overlap,
            fetch=fetch,
            pheno_names=pheno_names,
            tmp_dir=tmp_path,
            heritability_task=self.heritability_task_for_overlap,
        )

        # Process input through LAVA
        sample_overlap_r = (
            ro.NULL if sample_overlap_path is None else sample_overlap_path
        )
        lava_input = lava.process_input(
            input_info_file=str(info_file_path),
            sample_overlap_file=sample_overlap_r,
            ref_prefix=ref_prefix,
            phenos=ro.StrVector(pheno_names),
        )

        # Read loci
        loci_r = lava.read_loci(str(locus_asset.path))
        ro.globalenv["lava_loci"] = loci_r
        n_loci = int(ro.r("nrow(lava_loci)")[0])  # type: ignore
        if self.max_loci is not None:
            n_loci = min(n_loci, self.max_loci)
        logger.debug(f"Processing {n_loci} loci")

        # Process each locus
        univ_results: list[pd.DataFrame] = []
        bivar_results: list[pd.DataFrame] = []

        ro.globalenv["lava_input"] = lava_input
        for i in tqdm(range(1, n_loci + 1)):
            locus_row = ro.r(f"lava_loci[{i},]")
            locus = lava.process_locus(locus_row, lava_input)

            if bool(r_is_null(locus)[0]):  # type: ignore
                continue

            loc_info = _extract_locus_info(locus)

            # Run univariate + bivariate with automatic filtering
            result = lava.run_univ_bivar(locus, univ_thresh=self.univ_p_threshold)

            # Extract univariate results
            univ_r = result.rx2("univ")
            with localconverter(conv):
                univ_df: pd.DataFrame = ro.conversion.get_conversion().rpy2py(
                    univ_r
                )
            for key, val in loc_info.items():
                univ_df[key] = val
            univ_results.append(univ_df)

            # Extract bivariate results if present
            bivar_r = result.rx2("bivar")
            if not bool(r_is_null(bivar_r)[0]):  # type: ignore
                with localconverter(conv):
                    bivar_df: pd.DataFrame = ro.conversion.get_conversion().rpy2py(
                        bivar_r
                    )
                for key, val in loc_info.items():
                    bivar_df[key] = val
                bivar_results.append(bivar_df)

    _write_output(scratch_dir, univ_results, bivar_results)

    # Clean up R memory
    gc.collect()
    ro.r("gc()")

    return DirectoryAsset(scratch_dir)

add_sample_size_if_missing

add_sample_size_if_missing(
    df: DataFrame, phenotype_info: LavaSampleInfo
) -> pl.DataFrame
Source code in mecfs_bio/build_system/task/lava_task.py
def add_sample_size_if_missing(
    df: pl.DataFrame, phenotype_info: LavaSampleInfo
) -> pl.DataFrame:
    if GWASLAB_SAMPLE_SIZE_COLUMN in df.columns:
        return df
    if isinstance(phenotype_info, LavaBinarySampleInfo):
        return df.with_columns(
            pl.lit(phenotype_info.controls + phenotype_info.cases).alias(
                GWASLAB_SAMPLE_SIZE_COLUMN
            )
        )
    if isinstance(phenotype_info, LavaContinuousSampleInfo):
        assert phenotype_info.total_sample_size is not None
        return df.with_columns(
            pl.lit(phenotype_info.total_sample_size).alias(GWASLAB_SAMPLE_SIZE_COLUMN)
        )
    raise ValueError(f"Unknown phenotype type {phenotype_info}")