Source code for mepylome.dtypes.beads

"""Contains classes and function for processing Illumina methylation arrays.

It includes methods for extracting methylation information, various
preprocessing techniques, normalization, and data handling.
"""

import collections
import os
import pickle
from collections.abc import Iterator, Sequence
from functools import reduce
from pathlib import Path
from typing import (
    Any,
    Literal,
)
from uuid import uuid4

import numpy as np
import pandas as pd
from tqdm import tqdm

from mepylome.dtypes.arrays import ArrayType
from mepylome.dtypes.cache import cache_key, input_args_id, memoize
from mepylome.dtypes.idat import IdatParser
from mepylome.dtypes.manifests import Manifest
from mepylome.dtypes.probes import Channel, ProbeType
from mepylome.utils.varia import MEPYLOME_CACHE_DIR, normexp_get_xs

ENDING_GRN = "_Grn.idat"
ENDING_RED = "_Red.idat"
ENDING_GZ = ".gz"
ENDING_SUFFIXES = ("_Grn.idat", "_Red.idat", "_Grn.idat.gz", "_Red.idat.gz")

NEUTRAL_BETA = 0.5
NEUTRAL_M_VALUE = 0.0

PrepType = Literal["raw", "illumina", "swan", "noob"]


[docs] def is_valid_idat_basepath( basepath: str | Path | Sequence[str | Path], ) -> bool: """Checks if the given basepath(s) point to valid IDAT files.""" if isinstance(basepath, str | Path): basepaths = [str(basepath)] else: basepaths = [str(x) for x in basepath] return all( ( os.path.exists(x + ENDING_GRN) or os.path.exists(x + ENDING_GRN + ENDING_GZ) ) and ( os.path.exists(x + ENDING_RED) or os.path.exists(x + ENDING_RED + ENDING_GZ) ) for x in basepaths )
[docs] def idat_basepaths( files: str | Path | Sequence[str | Path], only_valid: bool = False, ) -> list[Path]: """Returns unique basepaths from IDAT files or directory. This function processes a list of IDAT files or a directory containing IDAT files and returns their basepaths by removing the file endings. The function ensures that there are no duplicate basepaths in the returned list and maintains the order of the files as they appear in the input. Args: files (path or list): A file or directory path or a list of file paths. only_valid (bool): If True, only returns basepaths that point to valid IDAT file pairs. Defaults is 'False'. Returns: list: A list of unique basepaths corresponding to the provided IDAT files. If a directory is provided, all IDAT files are recursively considered. Example: >>> idat_basepaths("/path/to/dir") [PosixPath('/path/to/dir/file1'), PosixPath('/path/to/dir/file2')] >>> idat_basepaths(["/path1/file1_Grn.idat", "/path2/file2_Red.idat"]) [PosixPath('/path1/file1'), PosixPath('/path2/file2')] >>> idat_basepaths("/path/to/idat/file_Grn.idat.gz") [PosixPath('/path/to/idat/file')] """ def get_idat_files(file_or_dir: str | Path) -> Iterator[str]: path = os.path.expanduser(file_or_dir) # If path is dir take all files in it if os.path.isdir(path): for dirpath, dirnames, filenames in os.walk( path, followlinks=True ): dirnames.sort() filenames.sort() for filename in filenames: if filename.endswith(ENDING_SUFFIXES): yield os.path.join(dirpath, filename) else: yield path def strip_suffix(file_path: str) -> str: for suffix in ENDING_SUFFIXES: if file_path.endswith(suffix): return file_path[: -len(suffix)] return file_path if isinstance(files, str | Path): files = [files] _files = [ strip_suffix(idat_file) for file_or_dir in files for idat_file in get_idat_files(file_or_dir) ] # Remove duplicates, keep ordering unique_basepaths_dict = dict.fromkeys(_files) if only_valid: return [ Path(base) for base in unique_basepaths_dict if is_valid_idat_basepath(base) ] return [Path(base) for base in unique_basepaths_dict]
[docs] def idat_paths_from_basenames( basenames: Sequence[str | Path], ) -> tuple[np.ndarray, np.ndarray]: """Returns paths to green and red IDAT files. Args: basenames (list): List of basepaths for IDAT files. Returns: tuple: Paths to green and red IDAT files. Raises: FileNotFoundError: If any IDAT file is not found. """ grn_idat_files = np.array( [Path(str(name) + ENDING_GRN) for name in basenames] ) red_idat_files = np.array( [Path(str(name) + ENDING_RED) for name in basenames] ) def check_and_fix(files: np.ndarray) -> Path | None: not_existing = [i for i, path in enumerate(files) if not path.exists()] files[not_existing] = [ x.parent / (x.name + ENDING_GZ) for x in files[not_existing] ] return next((x for x in files[not_existing] if not x.exists()), None) not_found = check_and_fix(grn_idat_files) not_found = ( check_and_fix(red_idat_files) if not_found is None else not_found ) if not_found is not None: idat_file = str(not_found).replace(ENDING_GZ, "") msg = f"IDAT file not found: {idat_file}." raise FileNotFoundError(msg) return grn_idat_files, red_idat_files
[docs] class RawData: """Represents raw intensity data extracted from IDAT files. This class initializes with a list of basepaths to IDAT files and parses them to extract raw intensity data from the green and red channels. Args: basenames (list): List of basepaths to IDAT files. manifest (Manifest, optional): The manifest associated with the array. If not provided, it will be determined from the probe count. Attributes: array_type (str): Type of Illumina array. probes (list): List of probe names corresponding to the IDAT files. ids (array): Array of probe IDs. _grn (array): Array of raw intensity values from the green channel. _red (array): Array of raw intensity values from the red channel. Example: >>> idat_basepath0 = directory_path / "200925700125_R07C01" >>> idat_basepath1 = directory_path / "200925700133_R02C01_Grn.idat" >>> raw_data = RawData(idat_basepath0) >>> raw_data = RawData([idat_basepath0, idat_basepath1]) """ array_type: ArrayType ids: np.ndarray manifest: Manifest probes: list[str] _grn: np.ndarray _grn_df: pd.DataFrame | None _red: np.ndarray _red_df: pd.DataFrame | None def __init__( self, basenames: str | Path | Sequence[str | Path], *, manifest: Manifest | None = None, ) -> None: _basenames = idat_basepaths(basenames) self.probes = [path.name.replace(ENDING_GZ, "") for path in _basenames] grn_idat_files, red_idat_files = idat_paths_from_basenames(_basenames) grn_idat = [ IdatParser(str(filepath), intensity_only=True) for filepath in grn_idat_files ] red_idat = [ IdatParser(str(filepath), intensity_only=True) for filepath in red_idat_files ] array_types = [ ArrayType.from_probe_count(len(idat.illumina_ids)) for idat in grn_idat + red_idat ] if len(set(array_types)) != 1: msg = "Array types must all be the same." raise ValueError(msg) self.array_type = array_types[0] self.manifest = ( Manifest(self.array_type) if manifest is None else manifest ) all_illumina_ids = [idat.illumina_ids for idat in grn_idat + red_idat] if all( np.array_equal(all_illumina_ids[0], arr) for arr in all_illumina_ids ): self.ids = all_illumina_ids[0] self._grn = np.array([idat.probe_means for idat in grn_idat]) self._red = np.array([idat.probe_means for idat in red_idat]) else: self.ids = reduce( np.intersect1d, [idat.illumina_ids for idat in grn_idat] ) self._grn = np.array( [ idat.probe_means[ np.isin( idat.illumina_ids, self.ids, assume_unique=True ) ] for idat in grn_idat ] ) self._red = np.array( [ idat.probe_means[ np.isin( idat.illumina_ids, self.ids, assume_unique=True ) ] for idat in red_idat ] ) self._grn_df = None self._red_df = None @property def grn(self) -> pd.DataFrame: """DataFrame: Green channel raw intensity indexed by probe IDs.""" if self._grn_df is None: self._grn_df = pd.DataFrame( self._grn.T, index=self.ids, columns=self.probes, dtype="int32" ) return self._grn_df @property def red(self) -> pd.DataFrame: """DataFrame: Red channel raw intensity indexed by probe IDs.""" if self._red_df is None: self._red_df = pd.DataFrame( self._red.T, index=self.ids, columns=self.probes, dtype="int32" ) return self._red_df def __repr__(self) -> str: title = "RawData():" lines = [ title + "\n" + "*" * len(title), f"array_type: {self.array_type}", f"manifest: {self.manifest.array_type}", f"probes:\n{self.probes}", f"ids:\n{self.ids}", f"_grn:\n{self._grn}", f"_red:\n{self._red}", f"grn:\n{self.grn}", f"red:\n{self.red}", ] return "\n\n".join(lines)
@memoize def _overlap_indices( left_arr: Sequence | pd.Index, right_arr: Sequence | pd.Index, ) -> tuple[np.ndarray, np.ndarray]: """Compute the indices of overlapping elements between two arrays. This function finds the common elements (indices) between two input arrays and returns their positions in both arrays. It uses pandas Index objects and memoization for performance improvement. Example: >>> left_arr = ['a', 'b', 'c', 'd'] >>> right_arr = ['b', 'c', 'e'] >>> left_idx, right_idx = _overlap_indices(left_arr, right_arr) >>> print(left_idx) [1 2] >>> print(right_idx) [0 1] """ if not isinstance(left_arr, pd.Index): left_arr = pd.Index(left_arr) if not isinstance(right_arr, pd.Index): right_arr = pd.Index(right_arr) common_indices = left_arr.intersection(right_arr) left_index = left_arr.get_indexer(common_indices) right_index = right_arr.get_indexer(common_indices) return left_index, right_index
[docs] class MethylData: """Represents methylated and unmethylated intensity data from RawData. This class provides methods for preprocessing Illumina methylation data and computing beta values from methylated and unmethylated intensities. Args: data (RawData): RawData object containing raw intensity data. file (str): Path to file or dir or list of paths containing raw intensity data. prep (str): Preprocessing method. Options: "illumina", "swan", "noob". seed (int, optional): Seed value used for random number generation in the SWAN preprocessing method. Default is None. Note: If 'data' is not provided, it will attempt to create a RawData object using the specified 'file'. Raises: ValueError: If neither 'data' nor 'file' is provided. ValueError: If 'data' is provided but is not of type 'RawData'. Examples: >>> methyl_data = MethylData(raw_data) >>> methyl_data = MethylData(file=file_path, prep="swan") """ array_type: ArrayType data: RawData ids: np.ndarray manifest: Manifest methyl: np.ndarray methyl_ilmnid: np.ndarray methyl_index: np.ndarray probes: list[str] seed: int | None unmethyl: np.ndarray intensity: pd.DataFrame | None _grn: np.ndarray _grn_df: pd.DataFrame | None _methylated_df: pd.DataFrame | None _red: np.ndarray _red_df: pd.DataFrame | None _unmethylated_df: pd.DataFrame | None def __init__( self, data: RawData | None = None, file: str | Path | Sequence[str | Path] | None = None, prep: PrepType = "illumina", seed: int | None = None, ) -> None: if (data is None) == (file is None): msg = "Exactly one of 'data' or 'file' must be provided." raise ValueError(msg) if data is None: assert file is not None data = RawData(file) elif not isinstance(data, RawData): msg = "'data' is not of type 'RawData'." raise ValueError(msg) self.seed = seed self._grn = data._grn self._red = data._red self.array_type = data.array_type self.ids = data.ids self.manifest = data.manifest self.probes = data.probes self.data = data self.intensity = None self._grn_df = None self._red_df = None self._methylated_df = None self._unmethylated_df = None if prep == "illumina": self.preprocess_illumina() elif prep == "swan": self.preprocess_swan() elif prep == "noob": self.preprocess_noob() elif prep == "raw": self.preprocess_raw() else: msg = f"invalid 'prep' value {prep}" raise ValueError(msg) @property def grn(self) -> pd.DataFrame: """DataFrame: Normalized green intensity by probe ID.""" if self._grn_df is None: self._grn_df = pd.DataFrame( self._grn.T, index=self.ids, columns=self.probes, dtype="float32", ) return self._grn_df @property def red(self) -> pd.DataFrame: """DataFrame: Normalized red intensity by probe ID.""" if self._red_df is None: self._red_df = pd.DataFrame( self._red.T, index=self.ids, columns=self.probes, dtype="float32", ) return self._red_df @property def methylated(self) -> pd.DataFrame: """DataFrame: Methylated intensity values indexed by IlmnID.""" if self._methylated_df is None: self._methylated_df = pd.DataFrame( self.methyl.T, index=self.methyl_ilmnid, columns=self.probes, dtype="float32", ) self._methylated_df.index.name = "IlmnID" return self._methylated_df @property def unmethylated(self) -> pd.DataFrame: """DataFrame: Unmethylated intensity values indexed by IlmnID.""" if self._unmethylated_df is None: self._unmethylated_df = pd.DataFrame( self.unmethyl.T, index=self.methyl_ilmnid, columns=self.probes, dtype="float32", ) self._unmethylated_df.index.name = "IlmnID" return self._unmethylated_df
[docs] def preprocess_illumina(self) -> None: """Performs preprocessing usings Illuminas method. This function implements preprocessing for Illumina methylation microarrays as used in Genome Studio, the standard software provided by Illumina. Details: This implementation is adapted from 'minfi'. """ self._methylated_df = None self._unmethylated_df = None ci = MethylData._cached_indices(self.manifest, self.ids, "illumina") self._illumina_control_normalization(ci=ci) self._illumina_bg_correction(ci) self._preprocess_raw(ci)
def _illumina_control_normalization( self, ci: dict, reference: int = 0, ) -> None: """Performs normalization using control probes.""" grn_average = np.mean( self._grn[:, ci["ids_cg_cont"]], axis=1, ) red_average = np.mean( self._red[:, ci["ids_at_cont"]], axis=1, ) ref = (grn_average + red_average)[reference] / 2 grn_factor = ref / grn_average red_factor = ref / red_average self._grn = grn_factor[:, np.newaxis] * self._grn self._red = red_factor[:, np.newaxis] * self._red def _illumina_bg_correction(self, ci: dict) -> None: """Performs background normalization using negative control probes.""" if len(ci["ids_ng_cont"]) <= 30: return grn_bg = np.partition(self._grn[:, ci["ids_ng_cont"]], 30)[:, 30] red_bg = np.partition(self._red[:, ci["ids_ng_cont"]], 30)[:, 30] # Subtract and threshold at zero, using in-place operations np.subtract(self._grn, grn_bg[:, np.newaxis], out=self._grn) np.maximum(self._grn, 0, out=self._grn) # Subtract and threshold at zero, using in-place operations np.subtract(self._red, red_bg[:, np.newaxis], out=self._red) np.maximum(self._red, 0, out=self._red) def _preprocess_raw_uncached(self) -> None: """Calculates methylated/unmethylated arrays without preprocessing. Converts the Red/Green channel for an Illumina methylation array into methylation signal, without using any normalization. Note: Uncached, slower version of ``preprocess_raw``. """ type_1 = self.manifest.probe_info(ProbeType.ONE) type_2 = self.manifest.probe_info(ProbeType.TWO) type_1_red = type_1[type_1.Color_Channel.values == Channel.RED.value] type_1_grn = type_1[type_1.Color_Channel.values == Channel.GRN.value] man_idx_np = np.sort( np.concatenate( [ type_1.IlmnID.index, type_2.IlmnID.index, ] ) ) self._preprocess_raw_methylated( man_idx_np, type_1_grn, type_1_red, type_2 ) self._preprocess_raw_unmethylated( man_idx_np, type_1_grn, type_1_red, type_2 ) def _preprocess_raw_methylated( self, man_idx_np: np.ndarray, t1_grn: pd.DataFrame, t1_red: pd.DataFrame, t2: pd.DataFrame, ) -> None: """Calculates methylated data frame without preprocessing.""" red = self.red grn = self.grn result = pd.DataFrame( np.nan, index=man_idx_np, columns=red.columns, dtype="float32", ) result.loc[t1_red.index] = red.loc[t1_red["AddressB_ID"].values].values result.loc[t1_grn.index] = grn.loc[t1_grn["AddressB_ID"].values].values result.loc[t2.index] = grn.loc[t2["AddressA_ID"].values].values result["IlmnID"] = self.manifest.data_frame.IlmnID.values[man_idx_np] self._methylated_df = result.set_index("IlmnID") def _preprocess_raw_unmethylated( self, man_idx_np: np.ndarray, t1_grn: pd.DataFrame, t1_red: pd.DataFrame, t2: pd.DataFrame, ) -> None: """Calculates unmethylated data frame without preprocessing.""" red = self.red grn = self.grn result = pd.DataFrame( np.nan, index=man_idx_np, columns=red.columns, dtype="float32", ) result.loc[t1_red.index] = red.loc[t1_red["AddressA_ID"].values].values result.loc[t1_grn.index] = grn.loc[t1_grn["AddressA_ID"].values].values result.loc[t2.index] = red.loc[t2["AddressA_ID"].values].values result["IlmnID"] = self.manifest.data_frame.IlmnID.values[man_idx_np] self._unmethylated_df = result.set_index("IlmnID") @memoize def _cached_indices( manifest: Manifest, illumina_ids: np.ndarray, prep: PrepType = "illumina", ) -> dict[str, np.ndarray]: """Cache the indices required for data processing. Args: manifest (Manifest): Manifest object. illumina_ids (array): Array of Illumina IDs. prep (str): Preprocessing method. Options: "illumina", "noob", "swan", "raw". Returns: dict: Cached indices including probe indices, Illumina IDs indices, and probe type indices. """ type_1 = manifest.probe_info(ProbeType.ONE) type_2 = manifest.probe_info(ProbeType.TWO) type_1_red = type_1[type_1.Color_Channel.values == Channel.RED.value] type_1_grn = type_1[type_1.Color_Channel.values == Channel.GRN.value] idx = pd.Index( np.sort( np.concatenate( [ type_1.IlmnID.index, type_2.IlmnID.index, ] ) ) ) ilmnid = manifest.data_frame.IlmnID.values[idx.values] ids = pd.Index(illumina_ids) ci = {"idx": idx.values, "ilmnid": ilmnid} ci["ids_1_red_a"] = ids.get_indexer(type_1_red["AddressA_ID"]) ci["ids_1_red_b"] = ids.get_indexer(type_1_red["AddressB_ID"]) ci["ids_1_grn_a"] = ids.get_indexer(type_1_grn["AddressA_ID"]) ci["ids_1_grn_b"] = ids.get_indexer(type_1_grn["AddressB_ID"]) ci["ids_2_____a"] = ids.get_indexer(type_2["AddressA_ID"]) ci["idx_1_red__"] = idx.get_indexer(type_1_red.index) ci["idx_1_grn__"] = idx.get_indexer(type_1_grn.index) ci["idx_2______"] = idx.get_indexer(type_2.index) if prep == "illumina": at_controls = manifest.control_address(["NORM_A", "NORM_T"]) ng_controls = manifest.control_address("NEGATIVE") cg_controls = manifest.control_address(["NORM_C", "NORM_G"]) def valid_ids(indices: np.ndarray) -> np.ndarray: return indices[indices != -1] ci["ids_at_cont"] = valid_ids(ids.get_indexer(at_controls)) ci["ids_cg_cont"] = valid_ids(ids.get_indexer(cg_controls)) ci["ids_ng_cont"] = valid_ids(ids.get_indexer(ng_controls)) if prep == "swan": ng_controls = manifest.control_address("NEGATIVE") def valid_ids(indices: np.ndarray) -> np.ndarray: return indices[indices != -1] ci["ids_ng_cont"] = valid_ids(ids.get_indexer(ng_controls)) if prep == "noob": control_probes = manifest.control_data_frame control_probes = control_probes[ control_probes.Address_ID.isin(ids) ].reset_index(drop=True) ci["idx_cg"] = control_probes[ control_probes.Control_Type.isin(["NORM_C", "NORM_G"]) ].index.values ci["idx_at"] = control_probes[ control_probes.Control_Type.isin(["NORM_A", "NORM_T"]) ].index.values ci["ids_ctr"] = ids.get_indexer(control_probes["Address_ID"]) return ci
[docs] def preprocess_raw(self) -> None: """Calculates methylated/unmethylated arrays without preprocessing. Converts the Red/Green channel for an Illumina methylation array into methylation signal, without using any normalization. """ ci = MethylData._cached_indices(self.manifest, self.ids, "raw") self._preprocess_raw(ci)
def _preprocess_raw_old(self, ci: dict) -> None: """Same as _preprocess_raw just slower and cleaner.""" self.methyl = np.full((len(self.probes), len(ci["idx"])), np.nan) self.methyl[:, ci["idx_1_red__"]] = self._red[:, ci["ids_1_red_b"]] self.methyl[:, ci["idx_1_grn__"]] = self._grn[:, ci["ids_1_grn_b"]] self.methyl[:, ci["idx_2______"]] = self._grn[:, ci["ids_2_____a"]] self.unmethyl = np.full((len(self.probes), len(ci["idx"])), np.nan) self.unmethyl[:, ci["idx_1_red__"]] = self._red[:, ci["ids_1_red_a"]] self.unmethyl[:, ci["idx_1_grn__"]] = self._grn[:, ci["ids_1_grn_a"]] self.unmethyl[:, ci["idx_2______"]] = self._red[:, ci["ids_2_____a"]] self.methyl_index = ci["idx"] self.methyl_ilmnid = ci["ilmnid"] def _preprocess_raw(self, ci: dict) -> None: """Internal preprocess logic.""" methyl_shape = (len(self.probes), len(ci["idx"])) self.methyl = np.full(methyl_shape, np.nan) self.unmethyl = np.full(methyl_shape, np.nan) self.methyl[:, ci["idx_1_red__"]] = np.take( self._red, ci["ids_1_red_b"], axis=1 ) self.methyl[:, ci["idx_1_grn__"]] = np.take( self._grn, ci["ids_1_grn_b"], axis=1 ) self.methyl[:, ci["idx_2______"]] = np.take( self._grn, ci["ids_2_____a"], axis=1 ) self.unmethyl[:, ci["idx_1_red__"]] = np.take( self._red, ci["ids_1_red_a"], axis=1 ) self.unmethyl[:, ci["idx_1_grn__"]] = np.take( self._grn, ci["ids_1_grn_a"], axis=1 ) self.unmethyl[:, ci["idx_2______"]] = np.take( self._red, ci["ids_2_____a"], axis=1 ) self.methyl_index = ci["idx"] self.methyl_ilmnid = ci["ilmnid"] def _swan_bg_intensity(self, ci: dict) -> np.ndarray: """Intensity background normalization used for SWAN preprocessing.""" grn_med = np.median( self._grn[:, ci["ids_ng_cont"]], axis=1, ) red_med = np.median( self._red[:, ci["ids_ng_cont"]], axis=1, ) return np.mean([grn_med, red_med], axis=0) @staticmethod def _swan_indices( manifest: Manifest, methyl_index: np.ndarray, seed: int | None = None, ) -> tuple[dict, dict]: rng = np.random.default_rng(seed) all_ncpgs = ( manifest.data_frame[["Probe_Type", "N_CpG"]] .loc[methyl_index] .reset_index(drop=True) ) subset_sizes = all_ncpgs.groupby( ["Probe_Type", "N_CpG"], dropna=False ).size() subset_size = min( subset_sizes.get((probe_type, n_cpg), 0) for probe_type in [ProbeType.ONE, ProbeType.TWO] for n_cpg in [1, 2, 3] ) all_indices = {} random_subset_indices = {} for probe_type in [ProbeType.ONE, ProbeType.TWO]: all_ncpts_type = all_ncpgs[all_ncpgs.Probe_Type == probe_type] all_indices[probe_type] = all_ncpts_type.index.values all_ncpts_type = all_ncpts_type.reset_index(drop=True) indices = [] for ncpgs in range(1, 4): ids = all_ncpts_type.index[all_ncpts_type.N_CpG == ncpgs] ids_subset = rng.permutation(ids)[:subset_size] indices.append(ids_subset) random_subset_indices[probe_type] = np.sort( np.concatenate(indices) ) return all_indices, random_subset_indices
[docs] def preprocess_swan(self) -> None: """Subset-quantile Within Array Normalization (SWAN). Details: The SWAN method has two parts. First, an average quantile distribution is created using a subset of probes defined to be biologically similar based on the number of CpGs underlying the probe body. This is achieved by randomly selecting N Infinium I and II probes that have 1, 2 and 3 underlying CpGs, where N is the minimum number of probes in the 6 sets of Infinium I and II probes with 1, 2 or 3 probe body CpGs. This results in a pool of 3N Infinium I and 3N Infinium II probes. The subset for each probe type is then sorted by increasing intensity. The value of each of the 3N pairs of observations is subsequently assigned to be the mean intensity of the two probe types for that row or “quantile”. This is the standard quantile procedure. The intensities of the remaining probes are then separately adjusted for each probe type using linear interpolation between the subset probes. Implementation adapted from 'minfi' Note: SWAN uses a random subset of probes for between array normalization. To achieve reproducible results, set the seed. References: J Maksimovic, L Gordon and A Oshlack (2012). SWAN: Subset quantile Within-Array Normalization for Illumina Infinium HumanMethylation450 BeadChips. Genome Biology 13, R44. """ self._methylated_df = None self._unmethylated_df = None ci = MethylData._cached_indices(self.manifest, self.ids, "swan") self._preprocess_raw(ci) bg_intensity = self._swan_bg_intensity(ci) all_indices, random_subset_indices = MethylData._swan_indices( self.manifest, self.methyl_index, self.seed ) self.methyl = MethylData._preprocess_swan_main( self.methyl, bg_intensity, all_indices, random_subset_indices ) self.unmethyl = MethylData._preprocess_swan_main( self.unmethyl, bg_intensity, all_indices, random_subset_indices )
@staticmethod def _preprocess_swan_main( intensity: np.ndarray, bg_intensity: np.ndarray, all_indices: dict, random_subset_indices: dict, ) -> np.ndarray: """Main function for preprocess_swan.""" from scipy.stats import rankdata random_subset_one = all_indices[ProbeType.ONE][ random_subset_indices[ProbeType.ONE] ] random_subset_two = all_indices[ProbeType.TWO][ random_subset_indices[ProbeType.TWO] ] sorted_subset_intensity = ( np.sort(intensity[:, random_subset_one], axis=1) + np.sort(intensity[:, random_subset_two], axis=1) ) / 2 swan = np.full(intensity.shape, np.nan) for i in range(len(intensity)): for probe_type in [ProbeType.ONE, ProbeType.TWO]: curr_intensity = intensity[i, all_indices[probe_type]] x = rankdata(curr_intensity) / len(curr_intensity) xp = np.sort(x[random_subset_indices[probe_type]]) fp = sorted_subset_intensity[i, :] interp = np.interp(x=x, xp=xp, fp=fp) intensity_min = np.min( curr_intensity[random_subset_indices[probe_type]] ) intensity_max = np.max( curr_intensity[random_subset_indices[probe_type]] ) interp[x > np.max(xp)] += ( curr_intensity[x > np.max(xp)] - intensity_max ) interp[x < np.min(xp)] += ( curr_intensity[x < np.min(xp)] - intensity_min ) interp[interp <= 0] = bg_intensity[i] swan[i, all_indices[probe_type]] = interp return swan
[docs] def preprocess_noob( self, offset: float = 15, dye_method: str = "single", ) -> None: """The Noob preprocessing method. Description: Noob (normal-exponential out-of-band) is a background correction method with dye-bias normalization. Args: offset (float): An offset for the normexp background correction. dye_method (str): How should dye bias correction be done: "single" for single sample approach, or "reference" for a reference array. References: TJ Triche, DJ Weisenberger, D Van Den Berg, PW Laird and KD Siegmund _Low-level processing of Illumina Infinium DNA Methylation BeadArrays. Nucleic Acids Res (2013) 41, e90. doi:10.1093/nar/gkt090. """ self._methylated_df = None self._unmethylated_df = None ci = MethylData._cached_indices(self.manifest, self.ids, "noob") self._preprocess_raw(ci) grn_oob = np.concatenate( [self._grn[:, ci["ids_1_red_a"]], self._grn[:, ci["ids_1_red_b"]]], axis=1, ) red_oob = np.concatenate( [self._red[:, ci["ids_1_grn_a"]], self._red[:, ci["ids_1_grn_b"]]], axis=1, ) methyl = self.methyl unmethyl = self.unmethyl methyl[methyl <= 0] = 1 unmethyl[unmethyl <= 0] = 1 grn_m = methyl[:, ci["idx_1_grn__"]] grn_u = unmethyl[:, ci["idx_1_grn__"]] grn_2 = methyl[:, ci["idx_2______"]] xf_grn = np.concatenate([grn_m, grn_u, grn_2], axis=1) xs_grn = normexp_get_xs(xf_grn, controls=grn_oob, offset=offset) cumsum = np.cumsum([0, grn_m.shape[1], grn_u.shape[1], grn_2.shape[1]]) slice_grn_m = slice(cumsum[0], cumsum[1]) slice_grn_u = slice(cumsum[1], cumsum[2]) slice_grn_2 = slice(cumsum[2], cumsum[3]) red_m = methyl[:, ci["idx_1_red__"]] red_u = unmethyl[:, ci["idx_1_red__"]] red_2 = unmethyl[:, ci["idx_2______"]] xf_red = np.concatenate([red_m, red_u, red_2], axis=1) xs_red = normexp_get_xs(xf_red, controls=red_oob, offset=offset) cumsum = np.cumsum([0, red_m.shape[1], red_u.shape[1], red_2.shape[1]]) slice_red_m = slice(cumsum[0], cumsum[1]) slice_red_u = slice(cumsum[1], cumsum[2]) slice_red_2 = slice(cumsum[2], cumsum[3]) methyl[:, ci["idx_1_grn__"]] = xs_grn["xs"][:, slice_grn_m] unmethyl[:, ci["idx_1_grn__"]] = xs_grn["xs"][:, slice_grn_u] methyl[:, ci["idx_1_red__"]] = xs_red["xs"][:, slice_red_m] unmethyl[:, ci["idx_1_red__"]] = xs_red["xs"][:, slice_red_u] methyl[:, ci["idx_2______"]] = xs_grn["xs"][:, slice_grn_2] unmethyl[:, ci["idx_2______"]] = xs_red["xs"][:, slice_red_2] # Dye correction grn_control = self._grn[:, ci["ids_ctr"]] red_control = self._red[:, ci["ids_ctr"]] xcs_grn = normexp_get_xs( grn_control, param=xs_grn["param"], offset=offset ) xcs_red = normexp_get_xs( red_control, param=xs_red["param"], offset=offset ) grn_avg = np.mean(xcs_grn["xs"][:, ci["idx_cg"]], axis=1) red_avg = np.mean(xcs_red["xs"][:, ci["idx_at"]], axis=1) red_grn_ratio = red_avg / grn_avg if dye_method == "single": red_factor = 1 / red_grn_ratio grn_factor = np.array([1, 1]) elif dye_method == "reference": ref_idx = np.argmin(np.abs(red_grn_ratio - 1)) ref = (grn_avg + red_avg)[ref_idx] / 2 if np.isnan(ref): msg = "'ref_idx' refers to an array that is not present" raise ValueError(msg) grn_factor = ref / grn_avg red_factor = ref / red_avg else: msg = "dye_method must be 'single' or 'reference'" raise ValueError(msg) red_factor = red_factor.reshape(-1, 1) methyl[:, ci["idx_1_red__"]] *= red_factor unmethyl[:, ci["idx_1_red__"]] *= red_factor unmethyl[:, ci["idx_2______"]] *= red_factor if dye_method == "reference": grn_factor = grn_factor.reshape(-1, 1) methyl[:, ci["idx_1_grn__"]] *= grn_factor unmethyl[:, ci["idx_1_grn__"]] *= grn_factor methyl[:, ci["idx_2______"]] *= grn_factor self.methyl = methyl self.unmethyl = unmethyl
@property def betas(self) -> pd.DataFrame: """Returns beta values.""" betas = self._get_beta(self.methyl, self.unmethyl) return pd.DataFrame( betas.T, columns=self.probes, index=self.methyl_ilmnid )
[docs] def betas_at( self, cpgs: Sequence | np.ndarray | None = None, fill: float = NEUTRAL_BETA, ) -> pd.DataFrame: """Calculates beta values for specified CpG sites. Args: cpgs (array-like): Array of CpG IDs. fill (float): Value to fill for CpGs not found in the used manifest or equal to NaN. Returns: pandas.DataFrame: DataFrame containing beta values for specified CpGs. Nore: If 'cpgs' is None, all CpGs from the used manifest are considered. """ if cpgs is None: cpgs = self.manifest.methylation_probes betas = self._get_beta(self.methyl, self.unmethyl) converted = np.full((len(self.probes), len(cpgs)), fill) left_idx, right_idx = _overlap_indices(cpgs, self.methyl_ilmnid) converted[:, left_idx] = betas[:, right_idx] converted[np.isnan(converted)] = fill return pd.DataFrame(converted.T, columns=self.probes, index=cpgs)
@staticmethod def _get_beta( methylated: np.ndarray, unmethylated: np.ndarray, offset: float = 100, beta_threshold: float = 0, *, min_zero: bool = True, ) -> np.ndarray: if offset < 0: msg = "'offset' must be non-negative" raise ValueError(msg) if not (0 <= beta_threshold <= 0.5): msg = "'beta_threshold' must be between 0 and 0.5" raise ValueError(msg) if min_zero: methylated = np.maximum(methylated, 0) unmethylated = np.maximum(unmethylated, 0) # Ignore division by zero with np.errstate(divide="ignore", invalid="ignore"): betas = methylated / (methylated + unmethylated + offset) if beta_threshold > 0: betas = np.minimum( np.maximum(betas, beta_threshold), 1 - beta_threshold ) return betas @property def mvalues(self) -> pd.DataFrame: """Returns M-values.""" mvals = self._get_m_value(self.methyl, self.unmethyl) return pd.DataFrame( mvals.T, columns=self.probes, index=self.methyl_ilmnid ) def mvalues_at( self, cpgs: Sequence | np.ndarray | None = None, fill: float = NEUTRAL_M_VALUE, ) -> pd.DataFrame: if cpgs is None: cpgs = self.manifest.methylation_probes mvals = self._get_m_value(self.methyl, self.unmethyl) converted = np.full((len(self.probes), len(cpgs)), fill) left_idx, right_idx = _overlap_indices(cpgs, self.methyl_ilmnid) converted[:, left_idx] = mvals[:, right_idx] converted[np.isnan(converted)] = fill return pd.DataFrame(converted.T, columns=self.probes, index=cpgs) @staticmethod def _get_m_value( methylated: np.ndarray, unmethylated: np.ndarray, offset: float = 1.0, *, min_zero: bool = True, ) -> np.ndarray: """Compute M-values: log2((M + offset) / (U + offset)). Args: methylated: methylated intensities unmethylated: unmethylated intensities offset: small constant to prevent log(0) min_zero: clamp negative intensities to zero Returns: np.ndarray of M-values """ if offset < 0: raise ValueError("'offset' must be non-negative") if min_zero: methylated = np.maximum(methylated, 0) unmethylated = np.maximum(unmethylated, 0) # Ignore division by zero with np.errstate(divide="ignore", invalid="ignore"): mvals = np.log2((methylated + offset) / (unmethylated + offset)) return mvals def __repr__(self) -> str: title = "MethylData():" lines = [ title + "\n" + "*" * len(title), f"array_type: {self.array_type}", f"manifest: {self.manifest.array_type}", f"probes:\n{self.probes}", f"_grn:\n{self._grn}", f"_red:\n{self._red}", f"grn:\n{self.grn}", f"red:\n{self.red}", f"methylated:\n{self.methylated}", f"unmethylated:\n{self.unmethylated}", ] if hasattr(self, "intensity"): lines.append(f"intensity:\n{self.intensity}") return "\n\n".join(lines)
[docs] class ReferenceMethylData: """Stores and manages reference cases for different array types. This class categorizes and processes reference IDAT files to create MethylData objects for different array types. It is intended for CNV neutral reference cases used in CNV calculation. Args: file (list): List of file paths to IDAT files or directory containing IDAT files. prep (str): Preprocessing method. Options: "illumina", "swan", "noob". Attributes: _methyl_data (dict): Internal dictionary to cache MethylData objects for each array type. Raises: ValueError: If no reference files are found for the specified array type. Examples: >>> # 'directory' contains 450k, EPIC and EPICv2 idat files >>> reference = ReferenceMethylData(file=directory, prep="illumina") >>> sample_450k = MethylData(file=idat_file_450k) >>> sample_epic = MethylData(file=idat_file_epic) >>> sample_epicv2 = MethylData(file=idat_file_epicv2) >>> # reference can be used for all types >>> cnv_450k = CNV(sample_450k, reference) >>> cnv_epic = CNV(sample_epic, reference) >>> cnv_epicv2 = CNV(sample_epicv2, reference) """ _cache: dict[Any, "ReferenceMethylData"] = {} file: str | Path | Sequence[str | Path] prep: PrepType save_to_disk: bool _methyl_data: dict[ArrayType, MethylData] _cached: bool def __new__( cls, file: str | Path | Sequence[str | Path], prep: PrepType = "illumina", save_to_disk: bool = False, ) -> "ReferenceMethylData": key = cache_key(file, prep) if key in cls._cache: return cls._cache[key] instance = super().__new__(cls) # Cache the instance cls._cache[key] = instance return instance def __getnewargs__( self, ) -> tuple[str | Path | Sequence[str | Path], PrepType, bool]: # Necessary for pickle return self.file, self.prep, self.save_to_disk def __init__( self, file: str | Path | Sequence[str | Path], prep: PrepType = "illumina", save_to_disk: bool = False, ) -> None: # Don't need to initialize if instance is cached. if hasattr(self, "_cached"): return self._cached = True self.file = file self.prep = prep self.save_to_disk = save_to_disk idat_files = idat_basepaths(self.file) # Load data from disk filepath = ReferenceMethylData.pickle_filename(self.prep, idat_files) if self.save_to_disk and filepath.exists(): with filepath.open("rb") as f: saved_instance = pickle.load(f) self.__dict__.update(saved_instance.__dict__) return reference_files = collections.defaultdict(list) self._methyl_data = {} for idat_file in tqdm( idat_files, desc="Categorizing reference IDAT files" ): array_type = ArrayType.from_idat(idat_file) reference_files[array_type].append(idat_file) for array_type, file_list in tqdm( reference_files.items(), desc="Processing reference IDAT files" ): raw_data = RawData(file_list) self._methyl_data[array_type] = MethylData( raw_data, prep=self.prep ) # Save saving to disk tmp_path = filepath.with_suffix(f".{uuid4()}.tmp") if self.save_to_disk: with tmp_path.open("wb") as f: pickle.dump(self, f) tmp_path.replace(filepath) @staticmethod def pickle_filename( prep: PrepType, idat_files: list[Path], ) -> Path: return MEPYLOME_CACHE_DIR / input_args_id( "Ref", prep, sorted(str(x) for x in idat_files) ) def __getitem__(self, array_type: ArrayType) -> MethylData: if array_type not in self._methyl_data: msg = ( f"No copy number neutral reference files found for " f"array type '{array_type.value}'." ) raise ValueError(msg) return self._methyl_data[array_type]