Skip to content

mecfs_bio.build_system.task.two_sample_mr_task

Task to apply two sample mendelian randomization to GWAS data, together with associated axillary functions.

Classes:

Functions:

Attributes:

GWASLAB_MR_INPUT_COL_SPEC module-attribute

GWASLAB_MR_INPUT_COL_SPEC = MRInputColSpec(
    rsid_col=GWASLAB_RSID_COL,
    beta_col=GWASLAB_BETA_COL,
    se_col=GWASLAB_SE_COL,
    ea_col=GWASLAB_EFFECT_ALLELE_COL,
    nea_col=GWASLAB_NON_EFFECT_ALLELE_COL,
    eaf_col=GWASLAB_EFFECT_ALLELE_FREQ_COL,
    phenotype_col=None,
    pos_col=GWASLAB_POS_COL,
    chrom_col=GWASLAB_CHROM_COL,
    n_case_col=GWASLAB_N_CASE_COL,
    n_control_col=GWASLAB_N_CONTROL_COL,
    pval_col=GWASLAB_P_COL,
    errors="ignore",
)

IgnoreOrRaise module-attribute

IgnoreOrRaise = Literal['ignore', 'raise']

MAIN_RESULT_DF_PATH module-attribute

MAIN_RESULT_DF_PATH = 'mr_result.csv'

NEEDED_COLS module-attribute

NEEDED_COLS = [
    TSM_RSID_COL,
    TSM_BETA_COL,
    TSM_SE_COL,
    TSM_EFFECT_ALLELE_COL,
]

REPORT_SUBDIR_PATH module-attribute

REPORT_SUBDIR_PATH = PurePath('reports')

RPackageType module-attribute

RPackageType = Union[InstalledSTPackage, InstalledPackage]

STEIGER_RESULT_PATH module-attribute

STEIGER_RESULT_PATH = PurePath('steiger_result.csv')

SUN_ET_AL_MR_INPUT_COL_SPEC_hg37 module-attribute

SUN_ET_AL_MR_INPUT_COL_SPEC_hg37 = MRInputColSpec(
    rsid_col="rsID",
    beta_col="BETA",
    se_col="SE",
    ea_col="A1",
    nea_col="A0",
    eaf_col="A1FREQ",
    phenotype_col="protein_exposure_id",
    chrom_col="CHROM_hg37",
    pos_col="GENPOS_hg37",
    pval_col=GWASLAB_P_COL,
)

TSM_BETA_COL module-attribute

TSM_BETA_COL = 'beta'

TSM_CHR_COL module-attribute

TSM_CHR_COL = 'chr'

TSM_EAF_COL module-attribute

TSM_EAF_COL = 'eaf'

TSM_EFFECT_ALLELE_COL module-attribute

TSM_EFFECT_ALLELE_COL = 'effect_allele'

TSM_N_CASE module-attribute

TSM_N_CASE = 'ncase'

TSM_N_CONTROL module-attribute

TSM_N_CONTROL = 'ncontrol'

TSM_OTHER_ALLELE_COL module-attribute

TSM_OTHER_ALLELE_COL = 'other_allele'

TSM_OUTPUT_B_COL module-attribute

TSM_OUTPUT_B_COL = 'b'

TSM_OUTPUT_EXPOSURE_COL module-attribute

TSM_OUTPUT_EXPOSURE_COL = 'exposure'

TSM_OUTPUT_METHOD_COL module-attribute

TSM_OUTPUT_METHOD_COL = 'method'

TSM_OUTPUT_NSNP_COL module-attribute

TSM_OUTPUT_NSNP_COL = 'nsnp'

TSM_OUTPUT_P_COL module-attribute

TSM_OUTPUT_P_COL = 'pval'

TSM_OUTPUT_SE_COL module-attribute

TSM_OUTPUT_SE_COL = 'se'

TSM_OUTPUT_STEIGER_DIR_COL module-attribute

TSM_OUTPUT_STEIGER_DIR_COL = 'steiger_dir'

TSM_OUTPUT_STEIGER_P_COL module-attribute

TSM_OUTPUT_STEIGER_P_COL = 'steiger_pval'

TSM_PHENOTYPE module-attribute

TSM_PHENOTYPE = 'Phenotype'

TSM_POS_COL module-attribute

TSM_POS_COL = 'position'

TSM_P_VALUE module-attribute

TSM_P_VALUE = 'pval'

TSM_RSID_COL module-attribute

TSM_RSID_COL = 'SNP'

TSM_SAMPLE_SIZE_COL module-attribute

TSM_SAMPLE_SIZE_COL = 'samplesize'

TSM_SE_COL module-attribute

TSM_SE_COL = 'se'

TSM_UNITS_COL module-attribute

TSM_UNITS_COL = 'units'

logger module-attribute

logger = get_logger()

ClumpOptions

MRInputColSpec

Methods:

Attributes:

beta_col instance-attribute

beta_col: str

chrom_col class-attribute instance-attribute

chrom_col: str | None = None

ea_col instance-attribute

ea_col: str

eaf_col class-attribute instance-attribute

eaf_col: str | None = None

errors class-attribute instance-attribute

errors: IgnoreOrRaise = 'raise'

n_case_col class-attribute instance-attribute

n_case_col: str | None = None

n_control_col class-attribute instance-attribute

n_control_col: str | None = None

nea_col class-attribute instance-attribute

nea_col: str | None = None

phenotype_col class-attribute instance-attribute

phenotype_col: str | None = None

pos_col class-attribute instance-attribute

pos_col: str | None = None

pval_col class-attribute instance-attribute

pval_col: str | None = None

rsid_col instance-attribute

rsid_col: str

sample_size_col class-attribute instance-attribute

sample_size_col: str | None = None

se_col instance-attribute

se_col: str

make_renamer

make_renamer() -> dict[str, str]
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def make_renamer(self) -> dict[str, str]:
    base = {
        self.rsid_col: TSM_RSID_COL,
        self.beta_col: TSM_BETA_COL,
        self.se_col: TSM_SE_COL,
        self.ea_col: TSM_EFFECT_ALLELE_COL,
    }
    if self.nea_col is not None:
        base[self.nea_col] = TSM_OTHER_ALLELE_COL
    if self.eaf_col is not None:
        base[self.eaf_col] = TSM_EAF_COL
    if self.chrom_col is not None:
        base[self.chrom_col] = TSM_CHR_COL
    if self.pos_col is not None:
        base[self.pos_col] = TSM_POS_COL
    if self.phenotype_col is not None:
        base[self.phenotype_col] = TSM_PHENOTYPE
    if self.pval_col is not None:
        base[self.pval_col] = TSM_P_VALUE
    if self.n_case_col is not None:
        base[self.n_case_col] = TSM_N_CASE
    if self.n_control_col is not None:
        base[self.n_control_col] = TSM_N_CONTROL
    if self.sample_size_col is not None:
        base[self.sample_size_col] = TSM_SAMPLE_SIZE_COL
    return base

MRReportOptions

SteigerFilteringOptions

Attributes:

drop_failures instance-attribute

drop_failures: bool

p_value_thresh class-attribute instance-attribute

p_value_thresh: float | None = None

TwoSampleMRConfig

Attributes:

clump_exposure_data instance-attribute

clump_exposure_data: ClumpOptions | None

pre_filter_outcome_variants class-attribute instance-attribute

pre_filter_outcome_variants: bool = False

report_options class-attribute instance-attribute

report_options: MRReportOptions | None = None

steiger_filter class-attribute instance-attribute

steiger_filter: SteigerFilteringOptions | None = None

TwoSampleMRResult

Attributes:

result instance-attribute

result: DataFrame

TwoSampleMRTask

Bases: Task

Task to run mendelian randomization using the R package TwoSampleMR. This R package is accessed through Python via rpy2.

Note that some of the calls to the TSMR library below (like clumping) require access to the OpenGWAS database. This in turn requires an access token. You can get a token here: https://api.opengwas.io/ Add to your .Renviron file the following line: OPENGWAS_JWT=

Methods:

Attributes:

config instance-attribute

config: TwoSampleMRConfig

deps property

deps: list[Task]

exposure_col_spec class-attribute instance-attribute

exposure_col_spec: MRInputColSpec | None = None

exposure_data_task instance-attribute

exposure_data_task: Task

exposure_id property

exposure_id: AssetId

exposure_meta property

exposure_meta: Meta

exposure_pipe class-attribute instance-attribute

exposure_pipe: DataProcessingPipe = IdentityPipe()

meta property

meta: Meta

mr_method_list class-attribute instance-attribute

mr_method_list: list[str] | None = None

outcome_col_spec class-attribute instance-attribute

outcome_col_spec: MRInputColSpec | None = None

outcome_data_task instance-attribute

outcome_data_task: Task

outcome_id property

outcome_id: AssetId

outcome_meta property

outcome_meta: Meta

outcome_pipe class-attribute instance-attribute

outcome_pipe: DataProcessingPipe = IdentityPipe()

create classmethod

create(
    asset_id: str,
    outcome_data_task: Task,
    exposure_data_task: Task,
    config: TwoSampleMRConfig,
    exposure_pipe: DataProcessingPipe,
    outcome_pipe: DataProcessingPipe,
    exposure_col_spec: MRInputColSpec,
    outcome_col_spec: MRInputColSpec,
    method_list: list[str] | None = None,
)
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    outcome_data_task: Task,
    exposure_data_task: Task,
    config: TwoSampleMRConfig,
    exposure_pipe: DataProcessingPipe,
    outcome_pipe: DataProcessingPipe,
    exposure_col_spec: MRInputColSpec,
    outcome_col_spec: MRInputColSpec,
    method_list: list[str] | None = None,
):
    outcome_meta = outcome_data_task.meta
    assert isinstance(outcome_meta, FilteredGWASDataMeta)
    meta = ResultDirectoryMeta(
        id=AssetId(asset_id),
        trait=outcome_meta.trait,
        project=outcome_meta.project,
    )
    return cls(
        meta=meta,
        outcome_data_task=outcome_data_task,
        exposure_data_task=exposure_data_task,
        config=config,
        exposure_pipe=exposure_pipe,
        outcome_pipe=outcome_pipe,
        exposure_col_spec=exposure_col_spec,
        outcome_col_spec=outcome_col_spec,
        mr_method_list=method_list,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    tsmr = importr("TwoSampleMR")
    conv = ro.default_converter + pandas2ri.converter

    exposure_asset = fetch(self.exposure_id)
    outcome_asset = fetch(self.outcome_id)

    logger.debug("loading exposure and outcome dataframes...")
    exposure_df = (
        self.exposure_pipe.process(
            scan_dataframe_asset(exposure_asset, meta=self.exposure_meta)
        )
        .collect()
        .to_pandas()
    )
    outcome_df = (
        self.outcome_pipe.process(
            scan_dataframe_asset(outcome_asset, meta=self.outcome_meta)
        )
        .collect()
        .to_pandas()
    )

    logger.debug("Renaming columns...")

    if self.exposure_col_spec is not None:
        exposure_df = exposure_df.rename(
            columns=self.exposure_col_spec.make_renamer(),
            errors=self.exposure_col_spec.errors,
        )

    if self.outcome_col_spec is not None:
        outcome_df = outcome_df.rename(
            columns=self.outcome_col_spec.make_renamer(),
            errors=self.outcome_col_spec.errors,
        )

    assert set(NEEDED_COLS) <= set(exposure_df.columns)
    assert set(NEEDED_COLS) <= set(outcome_df.columns)

    outcome_df = pre_filter_outcome_variants(
        exposure_df=exposure_df, outcome_df=outcome_df, config=self.config
    )

    exposure_rdf, outcome_rdf = convert_outcome_and_exposure_to_r(
        exposure_df=exposure_df, outcome_df=outcome_df
    )

    f_exposure_rdf, f_outcome_rdf = format_data_no_conversion(
        exposure_rdf, outcome_rdf, tsmr=tsmr
    )

    f_exposure_rdf = optionally_clump_exposure_data_no_conversion(
        formatted_exposure=f_exposure_rdf,
        clump_options=self.config.clump_exposure_data,
        tsmr=tsmr,
    )

    harmonized = harmonize_data_no_conversion(
        formatted_exposure=f_exposure_rdf,
        formatted_outcome=f_outcome_rdf,
        tsmr=tsmr,
    )

    harmonized = steiger_filtering_write_output(
        harmonized=harmonized,
        options=self.config.steiger_filter,
        scratch_dir=scratch_dir,
        tsmr=tsmr,
    )

    # with localconverter(conv):
    #     harm_py = ro.conversion.get_conversion().rpy2py(harmonized)
    gen_mr_report(
        harmonized=harmonized,
        options=self.config.report_options,
        target_dir=scratch_dir / REPORT_SUBDIR_PATH,
        tsmr=tsmr,
    )

    result_rdf = run_tsmr_on_harmonized_data_no_conversion(
        harmonized=harmonized, tsmr=tsmr, method_list=self.mr_method_list
    )
    logger.info("Converting R dataframe to pandas dataframe...")
    with localconverter(conv):
        result_df = ro.conversion.get_conversion().rpy2py(result_rdf)

    result_df.to_csv(scratch_dir / MAIN_RESULT_DF_PATH, index=False)
    return DirectoryAsset(scratch_dir)

convert_outcome_and_exposure_to_r

convert_outcome_and_exposure_to_r(
    exposure_df: DataFrame, outcome_df: DataFrame
) -> tuple[RDataFrame, RDataFrame]
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def convert_outcome_and_exposure_to_r(
    exposure_df: pd.DataFrame,
    outcome_df: pd.DataFrame,
) -> tuple[RDataFrame, RDataFrame]:
    conv = ro.default_converter + pandas2ri.converter

    with localconverter(conv):
        logger.debug("converting exposure to r...")
        exposure_rdf = ro.conversion.get_conversion().py2rpy(exposure_df)
        logger.debug("converting outcome to r...")
        outcome_rdf = ro.conversion.get_conversion().py2rpy(outcome_df)
    return exposure_rdf, outcome_rdf

format_data

format_data(
    exposure_df: DataFrame,
    outcome_df: DataFrame,
    tsmr: RPackageType,
) -> tuple[pd.DataFrame, pd.DataFrame]
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def format_data(
    exposure_df: pd.DataFrame,
    outcome_df: pd.DataFrame,
    tsmr: RPackageType,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    conv = ro.default_converter + pandas2ri.converter

    with localconverter(conv):
        logger.debug("formatting exposure...")
        formatted_exposure = tsmr.format_data(
            exposure_df,
            type="exposure",
        )

        logger.debug("formatting outcome...")
        formatted_outcome = tsmr.format_data(outcome_df, type="outcome")
        return formatted_exposure, formatted_outcome

format_data_no_conversion

format_data_no_conversion(
    exposure_rdf: DataFrame,
    outcome_rdf: DataFrame,
    tsmr: RPackageType,
) -> tuple[RDataFrame, RDataFrame]
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def format_data_no_conversion(
    exposure_rdf: RDataFrame,
    outcome_rdf: RDataFrame,
    tsmr: RPackageType,
) -> tuple[RDataFrame, RDataFrame]:
    logger.debug("formatting exposure...")
    formatted_exposure = tsmr.format_data(
        exposure_rdf,
        type="exposure",
    )

    logger.debug("formatting outcome...")
    formatted_outcome = tsmr.format_data(outcome_rdf, type="outcome")
    return formatted_exposure, formatted_outcome

gen_mr_report

gen_mr_report(
    harmonized: DataFrame,
    options: MRReportOptions | None,
    target_dir: Path,
    tsmr: RPackageType,
)
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def gen_mr_report(
    harmonized: RDataFrame,
    options: MRReportOptions | None,
    target_dir: Path,
    tsmr: RPackageType,
):
    if options is None:
        return
    tsmr.mr_report(harmonized, str(target_dir))

harmonize_data

harmonize_data(
    formatted_exposure: DataFrame,
    formatted_outcome: DataFrame,
    tsmr: RPackageType,
) -> pd.DataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def harmonize_data(
    formatted_exposure: pd.DataFrame,
    formatted_outcome: pd.DataFrame,
    tsmr: RPackageType,
) -> pd.DataFrame:
    conv = ro.default_converter + pandas2ri.converter
    with localconverter(conv):
        logger.debug("Harmonizing outcome and exposure...")
        harmonized = tsmr.harmonise_data(formatted_exposure, formatted_outcome)
        return harmonized

harmonize_data_no_conversion

harmonize_data_no_conversion(
    formatted_exposure: DataFrame,
    formatted_outcome: DataFrame,
    tsmr: RPackageType,
) -> RDataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def harmonize_data_no_conversion(
    formatted_exposure: RDataFrame,
    formatted_outcome: RDataFrame,
    tsmr: RPackageType,
) -> RDataFrame:
    logger.debug("Harmonizing outcome and exposure...")
    harmonized = tsmr.harmonise_data(formatted_exposure, formatted_outcome)
    return harmonized

optionally_clump_exposure_data

optionally_clump_exposure_data(
    formatted_exposure: DataFrame,
    clump_options: ClumpOptions | None,
    tsmr: RPackageType,
)
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def optionally_clump_exposure_data(
    formatted_exposure: pd.DataFrame,
    clump_options: ClumpOptions | None,
    tsmr: RPackageType,
):
    if clump_options is None:
        return formatted_exposure
    logger.debug(f"shape of exposure df before clumping:{formatted_exposure.shape}")
    conv = ro.default_converter + pandas2ri.converter
    with localconverter(conv):
        formatted_exposure = tsmr.clump_data(
            formatted_exposure,
        )
    logger.debug(f"shape of exposure df after clumping:{formatted_exposure.shape}")
    return formatted_exposure

optionally_clump_exposure_data_no_conversion

optionally_clump_exposure_data_no_conversion(
    formatted_exposure: DataFrame,
    clump_options: ClumpOptions | None,
    tsmr: RPackageType,
) -> RDataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def optionally_clump_exposure_data_no_conversion(
    formatted_exposure: RDataFrame,
    clump_options: ClumpOptions | None,
    tsmr: RPackageType,
) -> RDataFrame:
    if clump_options is None:
        return formatted_exposure
    logger.debug("clumping")
    formatted_exposure = tsmr.clump_data(
        formatted_exposure,
    )

    logger.debug(" done clumping")

    return formatted_exposure

pre_filter_outcome_variants

pre_filter_outcome_variants(
    exposure_df: DataFrame,
    outcome_df: DataFrame,
    config: TwoSampleMRConfig,
) -> pd.DataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def pre_filter_outcome_variants(
    exposure_df: pd.DataFrame, outcome_df: pd.DataFrame, config: TwoSampleMRConfig
) -> pd.DataFrame:
    if not config.pre_filter_outcome_variants:
        return outcome_df
    logger.debug("filtering outcome variants...")
    return outcome_df.loc[outcome_df[TSM_RSID_COL].isin(exposure_df[TSM_RSID_COL])]

run_tsmr_on_formatted_data

run_tsmr_on_formatted_data(
    formatted_exposure: DataFrame,
    formatted_outcome: DataFrame,
    config: TwoSampleMRConfig,
    tsmr: RPackageType,
) -> TwoSampleMRResult
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def run_tsmr_on_formatted_data(
    formatted_exposure: pd.DataFrame,
    formatted_outcome: pd.DataFrame,
    config: TwoSampleMRConfig,
    tsmr: RPackageType,
) -> TwoSampleMRResult:
    conv = ro.default_converter + pandas2ri.converter

    with localconverter(conv):
        if config.clump_exposure_data:
            logger.debug("Clumping exposure data...")
            formatted_exposure = tsmr.clump_data(formatted_exposure)
        logger.debug("Harmonizing outcome and exposure...")
        harmonized = tsmr.harmonise_data(formatted_exposure, formatted_outcome)
        return run_tsmr_on_harmonized_data(harmonized=harmonized, tsmr=tsmr)

run_tsmr_on_harmonized_data

run_tsmr_on_harmonized_data(
    harmonized: DataFrame, tsmr: RPackageType
) -> TwoSampleMRResult
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def run_tsmr_on_harmonized_data(
    harmonized: pd.DataFrame,
    tsmr: RPackageType,
) -> TwoSampleMRResult:
    conv = ro.default_converter + pandas2ri.converter
    with localconverter(conv):
        logger.debug("performing Mendelian randomization...")
        output = tsmr.mr(harmonized)
        return TwoSampleMRResult(output)

run_tsmr_on_harmonized_data_no_conversion

run_tsmr_on_harmonized_data_no_conversion(
    harmonized: DataFrame,
    tsmr: RPackageType,
    method_list: list[str] | None = None,
) -> RDataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def run_tsmr_on_harmonized_data_no_conversion(
    harmonized: RDataFrame,
    tsmr: RPackageType,
    method_list: list[str] | None = None,
) -> RDataFrame:
    logger.debug("performing Mendelian randomization...")
    if method_list is None:
        method_dict = {}
    else:
        method_dict = {"method_list": method_list}
    output = tsmr.mr(harmonized, **method_dict)
    return output

run_two_sample_mr

run_two_sample_mr(
    exposure_df: DataFrame,
    outcome_df: DataFrame,
    config: TwoSampleMRConfig,
) -> TwoSampleMRResult
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def run_two_sample_mr(
    exposure_df: pd.DataFrame,
    outcome_df: pd.DataFrame,
    config: TwoSampleMRConfig,
) -> TwoSampleMRResult:
    tsmr = importr("TwoSampleMR")
    # base = importr('base')
    formatted_exposure, formatted_outcome = format_data(
        exposure_df, outcome_df, tsmr=tsmr
    )
    return run_tsmr_on_formatted_data(
        formatted_exposure, formatted_outcome, config, tsmr=tsmr
    )

steiger_filtering_write_output

steiger_filtering_write_output(
    harmonized: DataFrame,
    options: SteigerFilteringOptions | None,
    scratch_dir: Path,
    tsmr: RPackageType,
) -> RDataFrame
Source code in mecfs_bio/build_system/task/two_sample_mr_task.py
def steiger_filtering_write_output(
    harmonized: RDataFrame,
    options: SteigerFilteringOptions | None,
    scratch_dir: Path,
    tsmr: RPackageType,
) -> RDataFrame:
    if options is None:
        return harmonized
    logger.debug("Performing steiger filtering")
    filtered = tsmr.steiger_filtering(harmonized)
    conv = ro.default_converter + pandas2ri.converter
    with localconverter(conv):
        py_filtered = ro.conversion.get_conversion().rpy2py(filtered)
        assert not py_filtered[TSM_OUTPUT_STEIGER_P_COL].isnull().any()
        prerows = len(py_filtered)
        logger.debug(f"Pre steiger filtering df has {prerows} rows")
        py_filtered.to_csv(scratch_dir / STEIGER_RESULT_PATH, index=False)
        if options.drop_failures:
            py_filtered = py_filtered.loc[py_filtered[TSM_OUTPUT_STEIGER_DIR_COL]]
            if options.p_value_thresh is not None:
                py_filtered = py_filtered.loc[
                    py_filtered[TSM_OUTPUT_STEIGER_P_COL] <= options.p_value_thresh
                ]
            harmonized = ro.conversion.get_conversion().py2rpy(py_filtered)

        logger.debug(
            f"Post steiger filtering df has {len(py_filtered)} rows, after dropping {prerows - len(py_filtered)}"
        )
        return harmonized