Source code for mepylome.analysis.methyl_aux

"""Auxiliary methods for the methylation analysis."""

import logging
import pickle
import re
import threading
import warnings
from collections.abc import Iterable, Sequence, Sized
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, TypeVar, overload

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import psutil
from tqdm import tqdm

from mepylome.dtypes.arrays import ArrayType
from mepylome.dtypes.beads import (
    NEUTRAL_BETA,
    MethylData,
    PrepType,
    _overlap_indices,
    idat_basepaths,
)
from mepylome.dtypes.manifests import Manifest
from mepylome.utils import (
    CONFIG,
)
from mepylome.utils.files import ensure_directory_exists
from mepylome.utils.varia import MEPYLOME_CACHE_DIR

logger = logging.getLogger(__name__)

DTYPE = np.float32
INVALID_PATH = Path("None")
TEST_CASE = "Test_Case"
METHYLATION_CLASS = "Methylation_Class"
MLH1_CPGS = CONFIG["genes"]["mlh1_promoter_cpgs"]

T = TypeVar("T", dict, set, list)


class ProgressBar:
    """A thread-safe progress bar.

    Attributes:
        cur_value (int): The current value of the progress bar.
        max_value (int): The maximum value of the progress bar.
        text (str): Optional text to display alongside the progress.
        lock (threading.Lock): A lock to ensure thread safety.
    """

    def __init__(self, max_value: int = 100, text: str = "") -> None:
        self.cur_value = 0
        self.max_value = int(max_value)
        self.text = str(text)
        self.lock = threading.Lock()

    def reset(
        self,
        max_value: int = 100,
        cur_value: int = 0,
        text: str = "",
    ) -> None:
        with self.lock:
            self.cur_value = cur_value
            self.max_value = int(max_value)
            self.text = str(text)

    def increment(self, n: int = 1) -> None:
        with self.lock:
            self.cur_value = min(self.cur_value + n, self.max_value)

    def get_progress(self) -> int:
        with self.lock:
            if self.max_value == 0:
                return 100
            return self.cur_value * 100 // self.max_value

    def get_text(self) -> str:
        with self.lock:
            if self.cur_value == self.max_value:
                out_str = "100 %"
            else:
                out_str = (
                    f"{self.cur_value}/{self.max_value} {self.text}".rstrip()
                )
            return out_str

    def __str__(self) -> str:
        lines = [
            "ProgressBar(",
            f"    cur_value: {self.cur_value}",
            f"    max_value: {self.max_value}",
            f"    progress: {self.get_progress()}",
            ")",
        ]
        return "\n".join(lines)

    def __repr__(self) -> str:
        return str(self)


def read_dataframe(path: str | Path, **kwargs: Any) -> pd.DataFrame:
    """Reads a DataFrame from the specified file path.

    Supports ods, xlsx, xls, csv (comma-separated), csv (column-separated), and
    tsv formats.

    Args:
        path (str): The file path to read the DataFrame from.
        **kwargs: Additional keyword arguments to pass to the underlying pandas
            read function.

    Returns:
        pd.DataFrame: The loaded DataFrame.

    Raises:
        ValueError: If the file format is not supported.
    """
    path = Path(path)
    if path.suffix in [".xlsx", ".xls"]:
        return pd.read_excel(path, **kwargs)
    if path.suffix == ".ods":
        return pd.read_excel(path, engine="odf", **kwargs)
    if path.suffix == ".csv":
        return pd.read_csv(path, sep=",", **kwargs)
    if path.suffix == ".tsv":
        return pd.read_csv(path, sep="\t", **kwargs)
    msg = "Unsupported file format. Supported: ods, xlsx, xls, csv, tsv."
    raise ValueError(msg)


def guess_annotation_file(directory: Path) -> Path:
    """Return the first spreadsheet found (shallowest path first)."""
    logger.info("Searching for annotation file...")
    supported_extensions = {".csv", ".tsv", ".ods", ".xls", ".xlsx"}

    # depth first, then name
    files = sorted(
        (
            f
            for f in directory.rglob("*")
            if f.suffix.lower() in supported_extensions
        ),
        key=lambda f: (len(f.parts), f.name.lower()),
    )

    if files:
        logger.info("Found annotation file: %s", files[0])
        return files[0]

    logger.info("No annotation file found")
    return INVALID_PATH


def extract_sentrix_id(text: Any) -> str:
    """Extracts the Sentrix ID from a given text if found."""
    matches = re.findall(r"\d+_R\d{2}C\d{2}", str(text))
    return matches[-1] if matches else text


@overload
def convert_to_sentrix_ids(data: None) -> None: ...


@overload
def convert_to_sentrix_ids(data: set) -> set: ...


@overload
def convert_to_sentrix_ids(data: dict) -> dict: ...


@overload
def convert_to_sentrix_ids(data: list) -> list: ...


@overload
def convert_to_sentrix_ids(data: Iterable[str]) -> list[str]: ...


def convert_to_sentrix_ids(
    data: set | dict | list | Iterable | None,
) -> set | dict | list | None:
    """Tries to convert every ID in 'data' to a Sentrix ID."""
    if data is None:
        return None
    if isinstance(data, dict):
        return {extract_sentrix_id(key): value for key, value in data.items()}
    if isinstance(data, set):
        return {extract_sentrix_id(item) for item in data}
    return [extract_sentrix_id(item) for item in data]


[docs] class IdatHandler: """A class for handling IDAT files with annotation. Includes reading annotation from various file formats and provides description lookups for methylation classes. Args: analysis_dir (str or Path): The directory where the IDAT files are located. annotation (str or Path, optional): The path to the annotation file. Defaults to None. test_dir (Path or None, optional): Directory for test files, including new cases or validation IDAT files or other test cases. Defaults to `None`. overlap (bool, optional): If True, restricts the sample paths to only those present in both the IDAT files and the annotation file. Defaults to False. analysis_ids (list, optional): A list of sample IDs within `analysis_dir`. - If provided, only these samples will be used. - If `None`, all available IDAT files in `analysis_dir` will be used. Defaults to None. **Note:** The IDs may be converted to Sentrix format during initialization if the IDs in the annotation and IDs in analysis_dir do not match directly. test_ids (list, optional): A list of sample IDs within `test_dir`. - If provided, only these samples will be used. - If `None`, all available IDAT files in `test_dir` will be used. Defaults to None. **Note:** The IDs may be converted to Sentrix format during initialization if the IDs in the annotation and IDs in analysis_dir do not match directly. Attributes: analysis_dir (Path): The directory path where the IDAT files are located. test_dir (Path or None, optional): Directory for test files, including new cases or validation IDAT files or other test cases. Defaults to `None`. overlap (bool): A flag indicating whether to restrict sample paths to only those present in both the IDAT files and the annotation file. id_to_path (dict): A dictionary where the keys are sample IDs and the values are the file paths of IDAT files (from both `analysis_dir` and `test_dir`). annotation (Path): The path to the annotation file. Defaults to None. If not provided, the first spreadsheet file found in `self.analysis_dir` will be used as the annotation. annotation_df (pandas.DataFrame or None): A DataFrame containing the annotation data, if loaded. samples_annotated (pandas.DataFrame or None): A DataFrame containing the samples as index and the annotation in the columns. selected_columns (list): A list of selected columns from the annotated samples, initialized with the first column. analysis_ids (list): A list of sample IDs from `analysis_dir` that are actually used after filtering and optional conversion to Sentrix IDs. test_ids (list): A list of sample IDs from `test_dir` that are actually used after filtering and optional conversion to Sentrix IDs. Raises: ValueError: - If any sample in `analysis_ids` is not found in `analysis_dir`. - If any sample in in `test_ids` is not found in `test_dir`. """ analysis_dir: Path analysis_id_to_path: dict[str, Path] analysis_ids: list[str] annotation: Path annotation_df: pd.DataFrame basename_to_id: dict[str, str] id_to_basename: dict[str, str] id_to_path: dict[str, Path] overlap: bool samples_annotated: pd.DataFrame selected_columns: list[str] test_dir: Path | None test_id_to_path: dict[str, Path] test_ids: list[str] _init_parameters: dict[str, Any] _tmp_analysis_ids: Iterable[str] | None _tmp_test_ids: Iterable[str] | None def __init__( self, analysis_dir: str | Path, *, annotation: str | Path | None = None, test_dir: str | Path | None = None, test_ids: Iterable[str] | None = None, overlap: bool = False, analysis_ids: Iterable[str] | None = None, ) -> None: # Initialize paths and attributes self.analysis_dir = Path(analysis_dir) self.annotation = ( Path(annotation) if annotation and Path(annotation).exists() else guess_annotation_file(self.analysis_dir) ) self.test_dir = Path(test_dir) if test_dir else None self.overlap = overlap self._tmp_analysis_ids = analysis_ids self._tmp_test_ids = test_ids # Track initialization parameters self._init_parameters = { "analysis_dir": analysis_dir, "annotation": annotation, "test_dir": test_dir, "overlap": overlap, "test_ids": test_ids, "analysis_ids": analysis_ids, } # Load IDAT paths and annotation data self.analysis_id_to_path = self._get_id_to_path(self.analysis_dir) self.test_id_to_path = self._get_id_to_path(self.test_dir) self.annotation_df = self._read_annotation_file() # Find ID column in annotation, set as index and filter cases id_mismatch, matched_column = self._identify_annotation_index() self._set_annotation_index_and_convert_ids(id_mismatch, matched_column) self._restrict_sample_ids() self._apply_overlap_filter() # Make shure ids are not None self.test_ids = list(self.test_id_to_path.keys()) self.analysis_ids = list(self.analysis_id_to_path.keys()) # Derived attributes self.id_to_path = {**self.analysis_id_to_path, **self.test_id_to_path} self.basename_to_id = {v.name: k for k, v in self.id_to_path.items()} self.id_to_basename = {k: v.name for k, v in self.id_to_path.items()} # Set available annotation for all IDAT files self.samples_annotated = self._get_samples_annotated() self.selected_columns = [self.samples_annotated.columns[0]] # Validation self._warn_on_sample_overlap()
[docs] def init_parameters(self) -> dict[str, Any]: """Returns the initialization attributes.""" return self._init_parameters
def _get_id_to_path( self, directory: str | Path | None, ) -> dict[str, Path]: """Retrieve valid IDAT sample IDs and paths from a directory.""" if not directory or not Path(directory).exists(): return {} return { path.name: path for path in idat_basepaths(directory, only_valid=True) } def _read_annotation_file(self) -> pd.DataFrame: try: return read_dataframe(self.annotation) except (FileNotFoundError, ValueError): logger.info( "Annotation file is missing, invalid or could not be read" ) return pd.DataFrame() def _identify_annotation_index(self) -> tuple[bool, str | None]: """Identify the appropriate annotation column. This method checks the columns of `self.annotation_df` to determine which column contains the annotation indices matching the analysis IDAT files. It first tries the original format, else it tries to extract the Sentrix ID. Returns: tuple: A tuple (is_sentrix, column_name), where: - is_sentrix (bool): True if a matching column is found and matches only when Sentrix IDs are extracted from both. - column_name (str or None): The name of the matching column, or None if no match is found. """ analysis_samples = set(self.analysis_id_to_path.keys()) sentrix_analysis_samples = convert_to_sentrix_ids(analysis_samples) # Compute overlaps regular_overlap = self.annotation_df.isin(analysis_samples) sentrix_df = self.annotation_df.apply(convert_to_sentrix_ids) sentrix_overlap = sentrix_df.isin(sentrix_analysis_samples) # Column-wise match counts regular_hits = regular_overlap.sum(axis=0) sentrix_hits = sentrix_overlap.sum(axis=0) # Find the best column best_regular_col = ( regular_hits.idxmax() if regular_hits.max() > 0 else None ) best_sentrix_col = ( sentrix_hits.idxmax() if sentrix_hits.max() > 0 else None ) best_regular_hits = regular_hits.max() best_sentrix_hits = sentrix_hits.max() if best_sentrix_hits > best_regular_hits: best_col = best_sentrix_col is_sentrix = True n_regular = len(set(self.annotation_df[best_col])) n_sentrix = len(set(sentrix_df[best_col])) if n_regular > n_sentrix: logging.warning( ( "Sentrix ID extraction reduced samples in column '%s' " "of the annotation file from %s to %s. There may be " "duplicates or mismaps!" ), best_col, n_regular, n_sentrix, ) if len(analysis_samples) != len(sentrix_analysis_samples): logging.warning( ( "Sentrix ID extraction reduced samples in directory " "%s from %s to %s. There may be duplicates or mismaps!" ), self.analysis_dir, len(analysis_samples), len(sentrix_analysis_samples), ) else: best_col = best_regular_col is_sentrix = False return is_sentrix, best_col def _set_annotation_index_and_convert_ids( self, id_missmatch: bool, col_name: str | None, ) -> None: """Set annotation index and convert IDs to Sentrix format if needed.""" if col_name is None: logger.info( "No IDAT files found that are both on disk and " "in the annotation file" ) return if not id_missmatch: self.annotation_df = self.annotation_df.set_index(col_name) logger.info("Setting '%s' as annotation index", col_name) return self.analysis_id_to_path = convert_to_sentrix_ids( self.analysis_id_to_path ) self.test_id_to_path = convert_to_sentrix_ids(self.test_id_to_path) self.annotation_df.index = convert_to_sentrix_ids( self.annotation_df[col_name] ) self.annotation_df = self.annotation_df.drop(columns=[col_name]) self._tmp_analysis_ids = convert_to_sentrix_ids(self._tmp_analysis_ids) self._tmp_test_ids = convert_to_sentrix_ids(self._tmp_test_ids) logger.info("Extracted Sentrix IDs from column '%s'", col_name) def _get_samples_annotated(self) -> pd.DataFrame: result_df = pd.DataFrame(index=self.id_to_path.keys()) # Remove duplicate rows unique_annotation_df = self.annotation_df.loc[ ~self.annotation_df.index.duplicated(keep="first") ] result_df = result_df.join(unique_annotation_df).fillna("") if result_df.empty: result_df[METHYLATION_CLASS] = "" if self._tmp_test_ids: result_df[TEST_CASE] = False result_df.loc[self._tmp_test_ids, TEST_CASE] = True return result_df def _restrict_sample_ids(self) -> None: """Restricts samples to the ones in 'analysis_ids' and 'test_ids'.""" # NOTE: This must be after _set_annotation_index_and_convert_ids def restrict_and_validate( ids: Iterable | None, id_to_path: dict[str, Path], id_type: str, ) -> dict[str, Path]: if not ids: return id_to_path ids_set = set(ids) missing_ids = ids_set - set(id_to_path.keys()) if missing_ids: msg = ( f"Some ids in 'self.{id_type}_ids' are not found on disk: " f"{', '.join(missing_ids)}" ) raise ValueError(msg) return { id_: path for id_, path in id_to_path.items() if id_ in ids_set } self.analysis_id_to_path = restrict_and_validate( self._tmp_analysis_ids, self.analysis_id_to_path, "analysis" ) self.test_id_to_path = restrict_and_validate( self._tmp_test_ids, self.test_id_to_path, "test" ) def _apply_overlap_filter(self) -> None: """Filter samples to include only those IDATs present in annotation.""" if not self.overlap: return valid_ids = set(self.annotation_df.index).intersection( self.analysis_id_to_path.keys() ) self.analysis_id_to_path = { x: self.analysis_id_to_path[x] for x in self.analysis_id_to_path if x in valid_ids } def _warn_on_sample_overlap(self) -> None: """Warn about overlapping samples between analysis and test samples.""" assert self.test_ids is not None n_inters = len( set(self.analysis_id_to_path.keys()).intersection(self.test_ids) ) if n_inters: warnings.warn( f"WARNING: 'test_dir' and 'analysis_dir' share {n_inters} " f"sample(s). 'idat_handler' may not work as expected. " "Verify inputs.", stacklevel=2, ) def __len__(self) -> int: return len(self.id_to_path) @property def ids(self) -> list[str]: return list(self.id_to_path.keys()) @property def idat_basenames(self) -> list[str]: return list(self.basename_to_id.keys()) @property def paths(self) -> list[Path]: return list(self.id_to_path.values()) @property def columns(self) -> list[str]: return self.samples_annotated.columns.tolist()
[docs] def features( self, columns: str | Iterable[str] | None = None, separator: str = "|", ) -> pd.Series: """Combines specified columns into a single label per sample. If `columns` is not provided, it defaults to the first column in `samples_annotated` or `selected_columns` if they are available. The function joins the values from the specified columns for each sample, converting them to strings and joining them with the specified separator. Args: columns (list, str, or None): List of column names (or a single column name) to use for creating the label. If None, defaults to the first column in `samples_annotated` or `selected_columns` if not None. separator (str): The separator used to join values from the columns. Default is "|". Returns: pd.Series: A Series of combined labels, indexed by sample IDs. Example: >>> idat_handler.features(columns=["GEO", "CNVs"]) sample_1 SGT_103|Balanced sample_2 SGT_056|Balanced sample_3 SGT_276|Balanced dtype: object """ if columns is None: if self.selected_columns is not None: columns = self.selected_columns else: columns = [self.samples_annotated.columns[0]] columns = [columns] if isinstance(columns, str) else columns return self.samples_annotated[columns].apply( lambda row: separator.join(row.values.astype(str)), axis=1 )
def __str__(self) -> str: title = f"{self.__class__.__name__}()" header = title + "\n" + "*" * len(title) lines = [header] def format_value(value: object) -> tuple[str, str]: length_info = "" if isinstance(value, pd.DataFrame | pd.Series | pd.Index): display_value = str(value) elif isinstance(value, np.ndarray): display_value = str(value) length_info = f"\n\n[{len(value)} items]" else: display_value = str(value)[:80] + ( "..." if len(str(value)) > 80 else "" ) if isinstance(value, Sized) and len(str(value)) > 80: length_info = f"\n\n[{len(value)} items]" return display_value, length_info for attr, value in sorted(self.__dict__.items()): display_value, length_info = format_value(value) lines.append(f"{attr}:\n{display_value}{length_info}") return "\n\n".join(lines) def __repr__(self) -> str: return str(self)
def check_memory(nrows: int, ncols: int, dtype: type | np.dtype) -> None: """Checks if sufficient free memory is available for a given numpy array. Raises: MemoryError: If the required memory exceeds the available memory. Example: >>> check_memory(1000, 500000, np.float32) """ available_memory = psutil.virtual_memory().available dtype_size = np.dtype(dtype).itemsize required_memory = nrows * ncols * dtype_size if required_memory > available_memory: msg = ( f"Not enout free memory available. For the given dimension " f"({nrows} samples, {ncols} CpG's), " f"{required_memory / (1024**3):.1f} GB is required." ) raise MemoryError(msg) def get_array_cpgs() -> dict[ArrayType, np.ndarray]: """Returns all CpG sites for all array types.""" path = MEPYLOME_CACHE_DIR / "all_cpgs.pkl" if not path.exists(): with path.open("wb") as f: array_cpgs = { ArrayType("450k"): Manifest("450k").methylation_probes, ArrayType("epic"): Manifest("epic").methylation_probes, ArrayType("epicv2"): Manifest("epicv2").methylation_probes, ArrayType("msa48"): Manifest("msa48").methylation_probes, } pickle.dump(array_cpgs, f) with path.open("rb") as f: return pickle.load(f) class BetasHandler: """Manages storage and retrieval of beta values. Args: directory (str or Path): Directory path where beta files ara stored. array_cpgs (dict, optional): Dictionary mapping ArrayType names to CpG arrays. If not provided, CpG arrays are fetched using default method. Attributes: basedir (Path): Path to the directory containing beta files. array_cpgs (dict): Dictionary mapping ArrayType names to CpG arrays. dir (dict): Dictionary mapping ArrayType and 'error' to subdirectories of basedir. array_type_from_dir (dict): Reverse mapping of directory paths to ArrayType names. paths (dict): Dictionary of all beta file paths. invalid_paths (dict): Dictionary of invalid beta file paths. """ def __init__( self, directory: str | Path, array_cpgs: dict[ArrayType, np.ndarray] | None = None, ) -> None: self.basedir = Path(directory).expanduser() self.array_cpgs = ( get_array_cpgs() if array_cpgs is None else array_cpgs ) self.dir: dict = {} for key in self.array_cpgs: self.dir[key] = self.basedir / f"{key}" ensure_directory_exists(self.dir[key]) self.dir["error"] = self.basedir / "error" ensure_directory_exists(self.dir["error"]) self.array_type_from_dir = { item: key for key, item in self.dir.items() } self.paths: dict = {} self.update() @property def filenames(self) -> list[str]: """Returns all idat basenames.""" return list(self.paths.keys()) @property def invalid_filenames(self) -> list[str]: """Returns all invalid invalid basenames.""" return list(self.invalid_paths.keys()) def add( self, betas: np.ndarray, filename: str, array_type: ArrayType, ) -> None: """Adds beta values to the file system on disk.""" betas.astype(DTYPE).tofile(self.dir[array_type] / filename) def add_error( self, filename: str, msg: Any, ) -> None: """Adds error message to the file system on disk.""" with (self.dir["error"] / filename).open("w") as f: f.write(str(msg)) def get( self, idat_handler: IdatHandler, cpgs: np.ndarray, ids: Sequence[str] | None = None, fill: float = NEUTRAL_BETA, parallel: bool = True, ) -> pd.DataFrame: """Retrieves beta values for specified IDs and CpGs.""" if ids is None: filenames = [ filename for filename in idat_handler.idat_basenames if filename not in self.invalid_filenames ] else: basenames = [idat_handler.id_to_basename[id_] for id_ in ids] filenames = [ basename for basename in basenames if basename not in self.invalid_filenames ] ids = [idat_handler.basename_to_id[x] for x in filenames] check_memory(len(filenames), len(cpgs), DTYPE) beta_matrix = np.full((len(filenames), len(cpgs)), fill, dtype=DTYPE) left_idx = {} right_idx = {} for key, item in self.array_cpgs.items(): left_idx[key], right_idx[key] = _overlap_indices(cpgs, item) def process_beta_value(i: int, filename: str) -> None: path = self.paths.get(filename) if path is not None: key = self.array_type_from_dir[path.parent] try: betas = np.fromfile(path, dtype=DTYPE) beta_matrix[i, left_idx[key]] = betas[right_idx[key]] except Exception as error: # Delete the bad file path.unlink() raise ValueError( f"Deleted invalid cached beta file {filename} due to " f"error: {error}. Please try again." ) from error if parallel: with ThreadPoolExecutor() as executor: futures = [ executor.submit(process_beta_value, i, filename) for i, filename in enumerate(filenames) ] for future in tqdm( as_completed(futures), total=len(futures), desc="Reading beta values from disk (parallel)", ): future.result() else: for i, filename in tqdm( enumerate(filenames), desc="Reading beta values from disk (serial)", total=len(filenames), ): process_beta_value(i, filename) return pd.DataFrame(beta_matrix, columns=cpgs, index=ids) def columnwise_variance( self, idat_handler: IdatHandler, cpgs: np.ndarray, ids: Sequence[str] | None = None, fill: float = NEUTRAL_BETA, parallel: bool = True, ) -> np.ndarray: """Calculates the column-wise variance over all beta values. This method is memory-efficient and uses Welford’s online algorithm, which allows for accurate variance computation in a single pass over the data. Suitable for large datasets that cannot fit entirely into memory. """ if ids is None: filenames = [ filename for filename in idat_handler.idat_basenames if filename not in self.invalid_filenames ] else: basenames = [idat_handler.id_to_basename[id_] for id_ in ids] filenames = [ basename for basename in basenames if basename not in self.invalid_filenames ] # Initialize running mean and M2 for Welford’s algorithm count = 0 mean = np.zeros(len(cpgs), dtype=DTYPE) M2 = np.zeros(len(cpgs), dtype=DTYPE) left_idx = {} right_idx = {} for key, item in self.array_cpgs.items(): left_idx[key], right_idx[key] = _overlap_indices(cpgs, item) def get_row(filename: str) -> np.ndarray | None: path = self.paths.get(filename) if path is None: return None key = self.array_type_from_dir[path.parent] betas = np.fromfile(path, dtype=DTYPE) row = np.full(len(cpgs), fill, dtype=DTYPE) row[left_idx[key]] = betas[right_idx[key]] return row def update_stats(row: np.ndarray) -> None: nonlocal count, mean, M2 count += 1 delta = row - mean mean += delta / count delta2 = row - mean M2 += delta * delta2 if parallel: lock = threading.Lock() with ThreadPoolExecutor() as executor: futures = [executor.submit(get_row, f) for f in filenames] for future in tqdm( as_completed(futures), total=len(futures), desc="Streaming beta values for variance (parallel)", ): row = future.result() if row is not None: with lock: update_stats(row) else: for filename in tqdm( filenames, desc="Streaming beta values for variance (serial)" ): row = get_row(filename) if row is not None: update_stats(row) if count > 1: return M2 / (count - 1) return np.zeros(len(cpgs), dtype=DTYPE) def update(self) -> None: """Updates the paths if beta vales were added after initialization.""" self.paths = { path.name: path for path in self.basedir.rglob("*") if not path.is_dir() } self.invalid_paths = { path.name: path for path in self.dir["error"].rglob("*") } def extract_beta(data: tuple[Path, PrepType, BetasHandler]) -> None: """Extracts and saves beta values for specified CpGs from an IDAT file.""" idat_file, prep, betas_handler = data try: methyl = MethylData(file=idat_file, prep=prep) betas = methyl.betas_at().values.ravel() array_type = methyl.array_type betas_handler.add(betas, idat_file.name, array_type) except Exception as error: betas_handler.add_error(idat_file.name, error) print(f"The following error occured for {idat_file.name}: {error}") def ensure_betas_exist( idat_handler: IdatHandler, cpgs: np.ndarray, prep: PrepType, betas_dir: Path, pbar: ProgressBar | None = None, ) -> BetasHandler: """Ensures all beta files are extracted and available. Args: idat_handler (IdatHandler): Handler for IDAT file paths and metadata. cpgs (list): List of CpGs to include in the output matrix. prep (str): Prepreparation method for the MethylData. betas_dir (Path): Path the directory to save/load the betas. pbar (ProgressBar): Progress bar for tracking progress. Returns: BetasHandler: The corresponding BetasHandler object. """ # Loading manifests here prevents race conditions Manifest.load() betas_handler = BetasHandler(betas_dir) ids_found = { idat_handler.basename_to_id.get(x) for x in betas_handler.filenames } missing_ids = list(set(idat_handler.ids) - ids_found) if missing_ids: args_list = [ (idat_handler.id_to_path[id_], prep, betas_handler) for id_ in missing_ids ] with ThreadPoolExecutor() as executor: futures = [ executor.submit(extract_beta, args) for args in args_list ] with tqdm( total=len(missing_ids), desc="Reading IDAT files" ) as tqdm_bar: for future in as_completed(futures): future.result() tqdm_bar.update(1) if pbar is not None: pbar.increment() betas_handler.update() return betas_handler def get_betas( idat_handler: IdatHandler, cpgs: np.ndarray, prep: PrepType, betas_dir: Path, ids: Sequence[str] | None = None, pbar: ProgressBar | None = None, ) -> pd.DataFrame: """Extracts and processes beta values from IDAT files. This function processes IDAT files to extract beta values for specified CpG sites (CpGs). The processed beta values are saved in a temporary folder to facilitate quicker subsequent extractions. Args: idat_handler (IdatHandler): Handler for IDAT file paths and metadata. cpgs (list): List of CpGs to include in the output matrix. prep (str): Prepreparation method for the MethylData. betas_dir (Path): Path the directory to save/load the betas. pbar (ProgressBar): Progress bar for tracking progress. ids (list, optional): A list of IDs to retrieve. If not provided (default), all IDs will be retrieved. Returns: pd.DataFrame: DataFrame containing the beta values for the specified CpGs. """ betas_handler = ensure_betas_exist( idat_handler=idat_handler, cpgs=cpgs, prep=prep, betas_dir=betas_dir, pbar=pbar, ) return betas_handler.get(idat_handler=idat_handler, cpgs=cpgs, ids=ids) def get_columnwise_variance( idat_handler: IdatHandler, cpgs: np.ndarray, prep: PrepType, betas_dir: Path, pbar: ProgressBar | None = None, ids: Sequence[str] | None = None, parallel: bool = True, ) -> np.ndarray: """Computes column-wise variance of beta values for specified CpGs. Returns: numpy.ndarray: Variance per CpG. """ betas_handler = ensure_betas_exist( idat_handler=idat_handler, cpgs=cpgs, prep=prep, betas_dir=betas_dir, pbar=pbar, ) return betas_handler.columnwise_variance( idat_handler=idat_handler, cpgs=cpgs, ids=ids, parallel=parallel ) def reordered_cpgs_by_variance( data_frame: pd.DataFrame, ) -> tuple[np.ndarray, np.ndarray]: """Returns CpG and their variances, ordered by descending variance.""" logger.info("Reordering CpG's by variance...") variances = np.var(data_frame.values, axis=0) sorted_indices = np.argsort(-variances, kind="stable") sorted_columns = data_frame.columns[sorted_indices].to_numpy() sorted_variances = variances[sorted_indices] return sorted_columns, sorted_variances def reordered_cpgs_by_variance_online( idat_handler: IdatHandler, cpgs: np.ndarray, prep: PrepType, betas_dir: Path, pbar: ProgressBar | None = None, ids: Sequence[str] | None = None, parallel: bool = True, ) -> tuple[np.ndarray, np.ndarray]: """Returns CpG and their variances, ordered by descending variance.""" logger.info("Reordering CpG's by variance (low-memory)...") variances = get_columnwise_variance( idat_handler=idat_handler, cpgs=cpgs, prep=prep, betas_dir=betas_dir, pbar=pbar, ids=ids, parallel=parallel, ) sorted_indices = np.argsort(-variances, kind="stable") sorted_columns = cpgs[sorted_indices] sorted_variances = variances[sorted_indices] return sorted_columns, sorted_variances def get_mlh1_report_template() -> str: """Returns HTML template of the MLH1 report.""" return """ <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>MLH1 Promoter Methylation Report: {id_}</title> <style> body {{ font-family: Arial, sans-serif; background: #f4f7f9; color: #333; margin: 0 auto; max-width: 1000px; line-height: 1.4; }} h1 {{ text-align: center; margin-top: 10px; font-size: 1.5em; }} h2 {{ font-size: 1.2em; margin-bottom: 5px; }} table {{ width: 100%; max-width: 500px; margin: 10px auto; border-collapse: collapse; }} hr {{ margin: 20px 0; border: none; height: 2px; background-color: #ccc; }} th, td {{ padding: 3px 8px; text-align: left; border-bottom: 1px solid #ddd; font-size: 0.9em; }} td.label {{ font-weight: bold; }} td.value {{ text-align: right; }} @media (max-width: 768px) {{ body {{ max-width: 95%; }} table {{ max-width: 100%; }} }} </style> <script src="https://cdn.plot.ly/plotly-latest.min.js"></script> </head> <body> <h1>Sample Report: {id_}</h1> <h2>Statistics</h2> <table> <tr><td class="label">Median</td><td class="value">{median}</td></tr> <tr><td class="label">Mean</td><td class="value">{mean}</td></tr> <tr><td class="label">Std Dev</td><td class="value">{std}</td></tr> <tr><td class="label">Number of probes</td><td class="value"> {n_probes} / 43 </td></tr> <tr><td class="label">Interpretation</td><td class="value"> {interpretation} </td></tr> </table> <hr> <h2>CpG Probe Values</h2> {plot_html} <hr /> <h2>Methods</h2> <p> Our analysis provides the MLH1 promoter methylation level (value between 0 and 1; mean, median, standard deviation) of CpGs, based on the 43 described probes from the Illumina Infinium EPIC Methylation Arrays (<a href="https://pmc.ncbi.nlm.nih.gov/articles/PMC5348626/" target="_blank" rel="noopener noreferrer">Sánchez-Vega F et al., <em>World Journal of Gastrointestinal Oncology</em>, 2017; 9(3):105–120 </a>).<br><br> As part of an internal study using TCGA data, additional published reference cohorts, and in-house datasets (n &gt; 14,000), the threshold (based on the median) was defined as follows: </p> <ul> <li>&lt; 0.2 is considered <strong>unmethylated</strong>,</li> <li>≥ 0.2 to 0.4 is considered <strong>low methylation</strong> (clinical significance unclear),</li> <li>&gt; 0.4 is considered <strong>methylated</strong>.</li> </ul> </body> </html> """ def interpret_methylation(median: float) -> str: """Interpretation of median according to internal tests.""" if median < 0.2: return "Unmethylated" elif median > 0.4: return "Methylated" else: return "Unclear" def make_single_mlh1_report_page(probes: pd.Series) -> str: """Generate an HTML report for MLH1 promoter methylation of a sample. Args: probes (pd.Series): A pandas Series (row) containing CpG probe beta values. The Series index should be probe names, and its `.name` should be the sample ID. Returns: str: HTML string representing the methylation report with embedded Plotly plot. """ id_ = probes.name mean = probes.mean().round(2) median = probes.median().round(2) std = probes.std().round(2) cpg_names = probes.index cpg_values = probes.values n_probes = len(cpg_names) interpretation = interpret_methylation(median) fig = go.Figure( [go.Bar(x=cpg_names, y=cpg_values, marker_color="steelblue")] ) fig.update_layout( xaxis_title=f"CpG Probes (n = {n_probes})", yaxis=dict(range=[0, 1]), yaxis_title="Beta Value", margin=dict(t=40, b=100), height=500, ) plot_html = pio.to_html(fig, full_html=True, include_plotlyjs=True) return get_mlh1_report_template().format( id_=id_, mean=str(mean), median=str(median), std=str(std), n_probes=n_probes, interpretation=interpretation, plot_html=plot_html, )