mecfs_bio.asset_generator.annovar_37_basic_rsid_assignment

Asset generator for assigning rsIDs to genome build 37 GWAS summary statistics datasets.

Classes:

  • RSIDAssignmentTaskGroup

    Collection of tasks that assign rsIDs by joining summary statistics with an existing reference dataframe of SNPs

Functions:

  • annovar_37_basic_rsid_assignment

    Asset generator that creates a chain of tasks (harmonize, dump to Parquet, join) to assign rsIDs to existing build 37 summary statistics datasets using the ANNOVAR dbSNP reference data

RSIDAssignmentTaskGroup

Collection of tasks that assign rsIDs by joining summary statistics with an existing reference dataframe of SNPs

Attributes:

dump_parquet_task instance-attribute

dump_parquet_task: Task

harmonize_task instance-attribute

harmonize_task: Task

join_task instance-attribute

join_task: Task

annovar_37_basic_rsid_assignment

annovar_37_basic_rsid_assignment(
    sumstats_task: Task,
    base_name: str,
    use_gwaslab_rsids_convention: bool = False,
    drop_palindromic_ambiguous: bool = True,
) -> RSIDAssignmentTaskGroup

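Asset generator that creates a chain of tasks to assign rsIDs to existing build 37 summary statistics datasets using the ANNOVAR dbSNP reference data. The chain harmonizes the summary statistics against hg19 references, dumps the harmonized result to a table, and inner-joins that table with the dbSNP150 reference on chromosome, position, and alleles. Set use_gwaslab_rsids_convention=True to rename the output column from rsid to rsID.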
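The join step can be pictured with a small, library-free sketch. This is not the mecfs_bio implementation, which runs the join through JoinDataFramesTask with the ibis backend. It is a plain-Python stand-in that only mirrors the join keys and the how="inner" behaviour shown in the source. The helper name assign_rsids, the sample rows, and the assumption that the reference table exposes its identifier in a column named rsid (inferred from the rename pipe) are all hypothetical.

```python
# Illustrative stand-in for the inner join on
# (CHR, POS, EA, NEA) == (int_chrom, POS, ALT, REF).
# `assign_rsids` is a hypothetical helper, not part of mecfs_bio.

def assign_rsids(sumstats_rows, dbsnp_rows, use_gwaslab_rsids_convention=False):
    # Index reference rows by (chromosome, position, ALT allele, REF allele).
    # Assumes each key maps to a single rsid in the reference table.
    ref_index = {
        (r["int_chrom"], r["POS"], r["ALT"], r["REF"]): r["rsid"]
        for r in dbsnp_rows
    }
    # Mirror the optional rename of the output column from "rsid" to "rsID".
    out_col = "rsID" if use_gwaslab_rsids_convention else "rsid"
    joined = []
    for row in sumstats_rows:
        # Alleles are cast to strings before matching, as in the source.
        key = (row["CHR"], row["POS"], str(row["EA"]), str(row["NEA"]))
        rsid = ref_index.get(key)
        if rsid is not None:  # inner join: unmatched variants are dropped
            joined.append({**row, out_col: rsid})
    return joined


# Made-up sample data; the rsid value is a placeholder, not a real identifier.
sumstats = [
    {"CHR": 1, "POS": 1000, "EA": "A", "NEA": "G", "BETA": 0.12},
    {"CHR": 2, "POS": 5000, "EA": "C", "NEA": "T", "BETA": -0.03},
]
dbsnp = [
    {"int_chrom": 1, "POS": 1000, "ALT": "A", "REF": "G", "rsid": "rs_example_1"},
]
result = assign_rsids(sumstats, dbsnp)
```

Because the join is an inner join, the second variant above has no match in the reference and does not appear in result. Callers who need to keep unmatched variants would need a different join strategy than the one this generator configures.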

Source code in mecfs_bio/asset_generator/annovar_37_basic_rsid_assignment.py
def annovar_37_basic_rsid_assignment(
    sumstats_task: Task,
    base_name: str,
    use_gwaslab_rsids_convention: bool = False,
    drop_palindromic_ambiguous: bool = True,
) -> RSIDAssignmentTaskGroup:
    """
    Asset generator that creates a chain of tasks to assign rsids to existing build 37 sumstats datasets using the annovar dbSNP reference data
    """
    # Step 1: harmonize the summary statistics against hg19 references with GWASLab.
    harmonized_task = GWASLabTransformSumstatsTask.create_from_source_task(
        sumstats_task,
        asset_id=base_name + "__harmonized",
        spec=GwasLabTransformSpec(
            harmonize_options=HarmonizationOptions(
                ref_infer=GWASLabVCFRef(name="1kg_eur_hg19", ref_alt_freq="AF"),
                ref_seq="ucsc_genome_hg19",
                check_ref_files=True,
                drop_missing_from_ref_seq=True,
                drop_missing_from_ref_infer_or_ambiguous=drop_palindromic_ambiguous,
                cores=4,
            )
        ),
    )
    # Step 2: write the harmonized summary statistics out as a table in the "processed" sub-directory.
    dump_parquet_task = GwasLabSumstatsToTableTask.create_from_source_task(
        source_tsk=harmonized_task,
        asset_id=base_name + "_harmonized_dump_to_parquet",
        sub_dir="processed",
    )
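    # Optionally rename the output "rsid" column to the "rsID" spelling; otherwise pass the data through unchanged.
    out_pipe: DataProcessingPipe
    if use_gwaslab_rsids_convention:
        out_pipe = RenameColPipe(old_name="rsid", new_name="rsID")
    else:
        out_pipe = IdentityPipe()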
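    # Step 3: inner-join with the dbSNP150 reference on chromosome, position, and alleles.
    # EA and NEA are cast to strings first; variants without a match are dropped (how="inner").
    join_with_rsid_task = JoinDataFramesTask.create_from_result_df(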
        asset_id=base_name + "_assign_rsids_via_dbsnp150",
        result_df_task=dump_parquet_task,
        reference_df_task=PARQUET_DBSNP150_37_ANNOVAR_PROC_RENAME_UNIQUE_DIRECT_DOWNLOAD,
        left_on=["CHR", "POS", "EA", "NEA"],
        right_on=["int_chrom", "POS", "ALT", "REF"],
        out_format=ParquetOutFormat(),
        how="inner",
        df_1_pipe=CompositePipe(
            [
                CastPipe(
                    target_column="EA",
                    type=narwhals.dtypes.String(),
                    new_col_name="EA",
                ),
                CastPipe(
                    target_column="NEA",
                    type=narwhals.dtypes.String(),
                    new_col_name="NEA",
                ),
            ]
        ),
        backend="ibis",
        out_pipe=out_pipe,
    )
    group = RSIDAssignmentTaskGroup(
        harmonize_task=harmonized_task,
        dump_parquet_task=dump_parquet_task,
        join_task=join_with_rsid_task,
    )
    return group
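The drop_palindromic_ambiguous flag is forwarded to the harmonization option drop_missing_from_ref_infer_or_ambiguous. As background, a palindromic SNP is one whose two alleles are complementary bases (A/T or C/G), so the strand cannot be determined from the alleles alone. The sketch below only illustrates that definition; it does not reproduce GWASLab's harmonization logic, and the helper name is_palindromic is hypothetical.

```python
# Hypothetical helper illustrating what "palindromic" means for a SNP.
# It does not reproduce how GWASLab resolves or drops such variants.

COMPLEMENT = {"A": "T", "T": "A", "C": "G", "G": "C"}


def is_palindromic(ea: str, nea: str) -> bool:
    """Return True when both alleles are single bases and complements of each other."""
    ea, nea = ea.upper(), nea.upper()
    return len(ea) == 1 and len(nea) == 1 and COMPLEMENT.get(ea) == nea


# A/T and C/G pairs are palindromic; A/G is not, and indels are excluded here.
examples = [("A", "T"), ("C", "G"), ("A", "G"), ("AT", "A")]
flags = [is_palindromic(ea, nea) for ea, nea in examples]
```

With the default drop_palindromic_ambiguous=True, the harmonization step is asked to drop variants that are missing from the inference reference or remain ambiguous, which is why the output can contain fewer variants than the input even before the join.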