mecfs_bio.build_system.task.mixer.mixer_task

Core task for fitting the MiXeR Gaussian mixture model to GWAS data

Attributes:

CONTAINER_REF_DIR module-attribute

CONTAINER_REF_DIR = Path('/ref_data')

MIXER_CHROM_COL module-attribute

MIXER_CHROM_COL = 'CHR'

MIXER_EFFECTIVE_SAMPLE_SIZE module-attribute

MIXER_EFFECTIVE_SAMPLE_SIZE = 'N'

MIXER_EFFECT_ALLELE_COL module-attribute

MIXER_EFFECT_ALLELE_COL = 'EffectAllele'

MIXER_FIT_JSON_PATTERN module-attribute

MIXER_FIT_JSON_PATTERN = 'trait1.fit.@.json'

MIXER_NON_EFFECT_ALLELE_COL module-attribute

MIXER_NON_EFFECT_ALLELE_COL = 'OtherAllele'

MIXER_POS_COL module-attribute

MIXER_POS_COL = 'POS'

MIXER_RSID_COL module-attribute

MIXER_RSID_COL = 'RSID'

MIXER_TEST_JSON_PATTERN module-attribute

MIXER_TEST_JSON_PATTERN = 'trait1.test.@.json'

MIXER_Z_SCORE_COL module-attribute

MIXER_Z_SCORE_COL = 'Z'

MixerMode module-attribute

MixerMode = BivariateMode | UnivariateMode

logger module-attribute

logger = get_logger()

BivariateMode

Attributes:

trait_2_source instance-attribute

trait_2_source: MixerDataSource

MixerDataSource

A data source for use with MiXeR. The task should provide a dataframe in gwaslab format, which is converted to MiXeR format by renaming columns and computing Z = BETA/SE.
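The conversion described here can be sketched with the standard library alone. This is only an illustration under stated assumptions: the gwaslab column names (`rsID`, `CHR`, `POS`, `EA`, `NEA`, `BETA`, `SE`, `N`) are assumed from gwaslab's usual conventions, `to_mixer_row` is a hypothetical helper that does not exist in `mixer_task`, and N is simply copied from the row, whereas the real task also receives `sample_info`, which this sketch ignores.

```python
# Hypothetical sketch: convert one gwaslab-style row to MiXeR column names.
# The gwaslab names on the left are assumptions; the MiXeR names on the right
# mirror the MIXER_*_COL constants documented for this module.
GWASLAB_TO_MIXER = {
    "rsID": "RSID",
    "CHR": "CHR",
    "POS": "POS",
    "EA": "EffectAllele",
    "NEA": "OtherAllele",
    "N": "N",
}


def to_mixer_row(row: dict) -> dict:
    """Rename columns and compute Z = BETA / SE for a single variant."""
    se = float(row["SE"])
    if se <= 0:
        raise ValueError(f"Standard error must be positive, got {se}")
    out = {mixer: row[gwaslab] for gwaslab, mixer in GWASLAB_TO_MIXER.items()}
    out["Z"] = float(row["BETA"]) / se
    return out


example = {
    "rsID": "rs123", "CHR": 1, "POS": 1000,
    "EA": "A", "NEA": "G", "BETA": 0.5, "SE": 0.25, "N": 5000,
}
converted = to_mixer_row(example)
```

Rejecting non-positive standard errors up front avoids writing infinite or undefined Z scores into the file handed to MiXeR.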

Attributes:

alias instance-attribute

alias: str

asset_id property

asset_id: AssetId

pipe class-attribute instance-attribute

pipe: DataProcessingPipe = IdentityPipe()

sample_info instance-attribute

sample_info: PhenotypeInfo

task instance-attribute

task: Task

MixerLDGenerationTask

Bases: Task

Implemented by Claude to facilitate testing. Generates .ld files from PLINK .bed/.bim/.fam files using the mixer ld command, one chromosome at a time. Copies all source files plus the generated .ld files to the output directory.

Attributes:

bfile_prefix_pattern class-attribute instance-attribute

bfile_prefix_pattern: str = 'g1000_eur_hm3_chr{chr}'

chromosomes instance-attribute

chromosomes: tuple[int, ...]

deps property

deps: list[Task]

ld_window_kb class-attribute instance-attribute

ld_window_kb: str = '10000'

ldscore_r2min class-attribute instance-attribute

ldscore_r2min: str = '0.01'

meta property

meta: Meta

plink_data_task instance-attribute

plink_data_task: Task

r2min class-attribute instance-attribute

r2min: str = '0.05'

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    src_asset = fetch(self.plink_data_task.asset_id)
    assert isinstance(src_asset, DirectoryAsset)

    # Copy all source files to scratch_dir
    for f in src_asset.path.iterdir():
        if f.is_file():
            shutil.copy2(str(f), str(scratch_dir / f.name))

    # Generate .ld files using mixer ld, mounting source dir in Docker
    src_mounts = {src_asset.path.resolve(): CONTAINER_REF_DIR}
    with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir:
        tmp_path = Path(tmpdir).relative_to(os.getcwd())
        for chri in self.chromosomes:
            bfile_prefix = self.bfile_prefix_pattern.format(chr=chri)
            ld_out = str(tmp_path / f"{bfile_prefix}.ld")
            invoke_mixer(
                [
                    "ld",
                    "--bfile",
                    str(CONTAINER_REF_DIR / bfile_prefix),
                    "--r2min",
                    self.r2min,
                    "--ldscore-r2min",
                    self.ldscore_r2min,
                    "--out",
                    ld_out,
                    "--ld-window-kb",
                    self.ld_window_kb,
                ],
                extra_mounts=src_mounts,
            )
            ld_file = Path(ld_out)
            assert ld_file.is_file(), f"Expected LD file not generated: {ld_file}"
            shutil.copy2(str(ld_file), str(scratch_dir / ld_file.name))

    return DirectoryAsset(scratch_dir)

MixerTask

Bases: Task

Core task to fit the MiXeR Gaussian mixture model to GWAS data

See: Holland, Dominic, et al. "Beyond SNP heritability: Polygenicity and discoverability of phenotypes estimated with a univariate Gaussian mixture model." PLoS Genetics 16.5 (2020): e1008612.

The MiXeR software is distributed as a Docker image. Before running MixerTask, verify that Docker is installed and that your user account has been added to the docker group.

The MiXeR authors have split up the genetic variants in their reference panel into 20 random subsets. The recommended MiXeR workflow is to run MiXeR on your GWAS data using each of these 20 random subsets, then combine the results. Specify which of these random subsets to run using the reps_to_perform attribute.
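As a rough illustration of how repetition indices map to output files: `execute` writes one `trait1.test.{rep}.json` per repetition, which matches `MIXER_TEST_JSON_PATTERN` with `@` standing in for the repetition number. The helper `expand_rep_pattern` below is hypothetical and not part of this module; it is a sketch of that substitution only.

```python
# Hypothetical helper: expand the "@" placeholder in a MiXeR-style pattern
# into one filename per repetition.
from typing import Sequence

MIXER_TEST_JSON_PATTERN = "trait1.test.@.json"  # value copied from this module


def expand_rep_pattern(pattern: str, reps: Sequence[int]) -> list[str]:
    """Replace the single "@" placeholder with each repetition index."""
    if pattern.count("@") != 1:
        raise ValueError(f"Expected exactly one '@' in pattern: {pattern!r}")
    return [pattern.replace("@", str(rep)) for rep in reps]


# Default behaviour: all 20 repetitions, numbered 1 through 20.
all_outputs = expand_rep_pattern(MIXER_TEST_JSON_PATTERN, tuple(range(1, 21)))
# A quicker run restricted to the first two repetitions.
quick_outputs = expand_rep_pattern(MIXER_TEST_JSON_PATTERN, (1, 2))
```

Passing a shorter sequence such as `(1, 2)` as `reps_to_perform` is a natural way to shorten a trial run, at the cost of combining fewer subsets.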

Attributes:

bim_file_pattern class-attribute instance-attribute

bim_file_pattern: str = (
    "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.bim"
)

chr_to_use_arg class-attribute instance-attribute

chr_to_use_arg: str | None = None

deps property

deps: list[Task]

extra_args class-attribute instance-attribute

extra_args: Sequence[str] = tuple()

extract_file_pattern_gen instance-attribute

extract_file_pattern_gen: Callable[[int], str] | None

ld_file_pattern class-attribute instance-attribute

ld_file_pattern: str = (
    "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.run4.ld"
)

meta property

meta: Meta

reference_data_directory_task instance-attribute

reference_data_directory_task: Task

reps_to_perform class-attribute instance-attribute

reps_to_perform: Sequence[int] = tuple(range(1, 21))

threads class-attribute instance-attribute

threads: int = 4

trait_1_source instance-attribute

trait_1_source: (
    MixerDataSource | PreformattedMixerDataSource
)

create classmethod

create(
    asset_id: str,
    trait_1_source: MixerDataSource
    | PreformattedMixerDataSource,
    ref_data_directory_task: Task,
    extra_args: Sequence[str] = tuple(),
    ld_file_pattern: str = "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.run4.ld",
    bim_file_pattern: str = "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.bim",
    extract_file_pattern_gen: Callable[[int], str]
    | None = default_mixer_extract_file_pattern_gen,
    threads: int = 4,
    reps_to_perform: Sequence[int] = tuple(range(1, 21)),
    chr_args: str | None = None,
)
Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
@classmethod
def create(
    cls,
    asset_id: str,
    trait_1_source: MixerDataSource | PreformattedMixerDataSource,
    ref_data_directory_task: Task,
    extra_args: Sequence[str] = tuple(),
    ld_file_pattern: str = "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.run4.ld",
    bim_file_pattern: str = "1000G_EUR_Phase3_plink/1000G.EUR.QC.@.bim",
    extract_file_pattern_gen: Callable[[int], str]
    | None = default_mixer_extract_file_pattern_gen,
    threads: int = 4,
    reps_to_perform: Sequence[int] = tuple(range(1, 21)),
    chr_args: str | None = None,
):
    source_meta = trait_1_source.task.meta
    meta: Meta
    if isinstance(source_meta, FilteredGWASDataMeta):
        meta = ResultDirectoryMeta(
            id=asset_id,
            trait=source_meta.trait,
            project=source_meta.project,
            sub_dir=PurePath("analysis") / "mixer",
        )
    elif isinstance(source_meta, SimpleDirectoryMeta):
        meta = SimpleDirectoryMeta(
            id=AssetId(asset_id),
        )
    else:
        raise ValueError(f"Unknown meta {source_meta}")
    return cls(
        meta=meta,
        trait_1_source=trait_1_source,
        reference_data_directory_task=ref_data_directory_task,
        ld_file_pattern=ld_file_pattern,
        bim_file_pattern=bim_file_pattern,
        extract_file_pattern_gen=extract_file_pattern_gen,
        threads=threads,
        reps_to_perform=reps_to_perform,
        chr_to_use_arg=chr_args,
        extra_args=extra_args,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    reference_dir_asset = fetch(self.reference_data_directory_task.asset_id)
    assert isinstance(reference_dir_asset, DirectoryAsset)
    ref_mounts = {reference_dir_asset.path.resolve(): CONTAINER_REF_DIR}
    with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdir:
        tmp_path = Path(tmpdir).relative_to(os.getcwd())
        trait1_path = prepare_mixer_trait_input_file(
            source=self.trait_1_source,
            fetch=fetch,
            temp_dir=tmp_path,
        )
        assert trait1_path.is_file()

        common_args = [
            "--ld-file",
            str(CONTAINER_REF_DIR / self.ld_file_pattern),
            "--bim-file",
            str(CONTAINER_REF_DIR / self.bim_file_pattern),
            "--threads",
            str(self.threads),
        ]

        for rep in tqdm(self.reps_to_perform):
            extract_args = get_mixer_extract_args(
                extract_file_pattern_gen=self.extract_file_pattern_gen,
                rep=rep,
                reference_dir_path=reference_dir_asset.path,
            )
            chr_args = (
                ["--chr2use", self.chr_to_use_arg]
                if self.chr_to_use_arg is not None
                else []
            )
            fit1_trait1_out_path_prefix = str(tmp_path / f"trait1.fit.{rep}")
            invoke_mixer(
                ["fit1"]
                + common_args
                + extract_args
                + chr_args
                + list(self.extra_args)
                + [
                    "--trait1-file",
                    str(trait1_path),
                    "--out",
                    str(fit1_trait1_out_path_prefix),
                ],
                extra_mounts=ref_mounts,
            )

            test1_out_path_prefix = str(tmp_path / f"trait1.test.{rep}")
            invoke_mixer(
                ["test1"]
                + common_args
                + extract_args
                + chr_args
                + [
                    "--trait1-file",
                    str(trait1_path),
                    "--load-params",
                    fit1_trait1_out_path_prefix + ".json",
                    "--out",
                    test1_out_path_prefix,
                ],
                extra_mounts=ref_mounts,
            )
            Path(test1_out_path_prefix + ".json").rename(
                scratch_dir / f"trait1.test.{rep}.json"
            )
            Path(test1_out_path_prefix + ".log").rename(
                scratch_dir / f"trait1.test.{rep}.log"
            )

            Path(fit1_trait1_out_path_prefix + ".json").rename(
                scratch_dir / f"{_get_fit_filename_prefix(rep)}.json"
            )
            Path(fit1_trait1_out_path_prefix + ".log").rename(
                scratch_dir / f"{_get_fit_filename_prefix(rep)}.log"
            )

        return DirectoryAsset(scratch_dir)

PreformattedMixerDataSource

A source for data that is already in MiXeR sumstats format, with columns RSID, CHR, POS, EffectAllele, OtherAllele, Z, and N. No gwaslab-to-MiXeR column conversion is performed. The task should provide a DirectoryAsset or FileAsset containing the sumstats file; for a DirectoryAsset, filename selects the file within the directory.
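Because no conversion is performed for preformatted sources, it can be worth checking the header before handing a file to the task. The following is a minimal sketch, not part of this module: `missing_mixer_columns` is a hypothetical helper, and the tab delimiter is an assumption about the file layout.

```python
import csv
import io

# Column names copied from the MIXER_*_COL constants documented for this module.
REQUIRED_MIXER_COLUMNS = (
    "RSID", "CHR", "POS", "EffectAllele", "OtherAllele", "Z", "N",
)


def missing_mixer_columns(handle, delimiter: str = "\t") -> list[str]:
    """Return the required columns absent from the header row of an open text file."""
    # An empty file yields an empty header, so every column is reported missing.
    header = next(csv.reader(handle, delimiter=delimiter), [])
    return [col for col in REQUIRED_MIXER_COLUMNS if col not in header]


# In-memory stand-ins for real sumstats files (assumed tab-delimited).
good = io.StringIO(
    "RSID\tCHR\tPOS\tEffectAllele\tOtherAllele\tZ\tN\n"
    "rs1\t1\t100\tA\tG\t1.5\t5000\n"
)
bad = io.StringIO("RSID\tCHR\tPOS\tZ\n")

good_missing = missing_mixer_columns(good)
bad_missing = missing_mixer_columns(bad)
```

With a real file, the same function can be called on the object returned by `open(path, newline="")`.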

Attributes:

alias instance-attribute

alias: str

asset_id property

asset_id: AssetId

filename instance-attribute

filename: str

task instance-attribute

task: Task

UnivariateMode

default_mixer_extract_file_pattern_gen

default_mixer_extract_file_pattern_gen(rep: int) -> str
Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
def default_mixer_extract_file_pattern_gen(rep: int) -> str:
    return (
        f"1000G_EUR_Phase3_plink/1000G.EUR.QC.prune_maf0p05_rand2M_r2p8.rep{rep}.snps"
    )

get_mixer_extract_args

get_mixer_extract_args(
    extract_file_pattern_gen: Callable[[int], str] | None,
    rep: int,
    reference_dir_path: Path,
) -> list[str]
Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
def get_mixer_extract_args(
    extract_file_pattern_gen: Callable[[int], str] | None,
    rep: int,
    reference_dir_path: Path,
) -> list[str]:
    if extract_file_pattern_gen is not None:
        extract_file = reference_dir_path / extract_file_pattern_gen(rep)
        assert extract_file.is_file()
        extract_args = [
            "--extract",
            str(CONTAINER_REF_DIR / extract_file_pattern_gen(rep)),
        ]
        return extract_args
    return []
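
The function above checks that the extract file exists on the host, but passes MiXeR the corresponding path under the container mount point. That host-to-container translation can be sketched as follows. `container_path_for` is a hypothetical helper, the host directory `/data/mixer_ref` is made up for illustration, and `PurePosixPath` is used here in place of the module's `Path('/ref_data')` so the result does not depend on the host operating system.

```python
from pathlib import Path, PurePosixPath

# Same value as the module constant CONTAINER_REF_DIR, as a pure POSIX path.
CONTAINER_REF_DIR = PurePosixPath("/ref_data")


def container_path_for(host_file: Path, host_ref_dir: Path) -> PurePosixPath:
    """Map a file under the host reference directory to its path in the container."""
    # relative_to raises ValueError if host_file is not inside host_ref_dir.
    relative = host_file.relative_to(host_ref_dir)
    return CONTAINER_REF_DIR / relative.as_posix()


host_ref_dir = Path("/data/mixer_ref")  # illustrative host location
host_file = (
    host_ref_dir
    / "1000G_EUR_Phase3_plink"
    / "1000G.EUR.QC.prune_maf0p05_rand2M_r2p8.rep3.snps"
)
mapped = container_path_for(host_file, host_ref_dir)
```

Keeping file patterns relative to the reference directory, as the task attributes do, is what lets a single pattern serve both the host-side existence check and the container-side argument.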

prepare_mixer_trait_input_file

prepare_mixer_trait_input_file(
    source: MixerDataSource | PreformattedMixerDataSource,
    fetch: Fetch,
    temp_dir: Path,
) -> Path

Prepare a trait sumstats file in the temp dir, ready for MiXeR.

Source code in mecfs_bio/build_system/task/mixer/mixer_task.py
def prepare_mixer_trait_input_file(
    source: MixerDataSource | PreformattedMixerDataSource,
    fetch: Fetch,
    temp_dir: Path,
) -> Path:
    """Prepare a trait sumstats file in the temp dir, ready for MiXeR."""
    if isinstance(source, PreformattedMixerDataSource):
        source_asset = fetch(source.task.asset_id)
        if isinstance(source_asset, DirectoryAsset):
            source_file = source_asset.path / source.filename
        elif isinstance(source_asset, FileAsset):
            source_file = source_asset.path
        else:
            raise ValueError(f"Unexpected asset type: {type(source_asset)}")
        assert source_file.is_file(), f"Source file not found: {source_file}"
        dest = temp_dir / source.filename
        shutil.copy(str(source_file), str(dest))
        return dest
    elif isinstance(source, MixerDataSource):
        return _prep_summary_statistics_for_mixer(
            sumstats_dataframe_task=source.task,
            fetch=fetch,
            pipe=source.pipe,
            phenotype=source.sample_info,
            name=source.alias,
            temp_dir=temp_dir,
        )
    else:
        raise ValueError(f"Unexpected source type: {type(source)}")