Source code for mepylome.analysis.methyl

"""Methylation analysis tools including a Dash-based browser application.

This module provides a comprehensive set of tools for conducting methylation
analysis. The core functionality is encapsulated in the ``MethylAnalysis``
class, which manages the methylation analysis process and executes an
interactive web application for the exploration of methylation data.
"""

import base64
import logging
import os
import re
import sys
import threading
import time
import webbrowser
from collections import Counter
from collections.abc import Iterable, Sequence, Sized
from pathlib import Path
from typing import Any

import dash_bootstrap_components as dbc
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from dash import (
    Dash,
    Input,
    Output,
    State,
    callback_context,
    dcc,
    html,
    no_update,
)
from sklearn.model_selection import (
    StratifiedKFold,
)

from mepylome import LOG_FILE
from mepylome.analysis.methyl_aux import (
    INVALID_PATH,
    IdatHandler,
    ProgressBar,
    get_betas,
    guess_annotation_file,
    make_single_mlh1_report_page,
    read_dataframe,
    reordered_cpgs_by_variance,
    reordered_cpgs_by_variance_online,
)
from mepylome.analysis.methyl_clf import (
    ClassifierResult,
    fit_and_evaluate_clf,
)
from mepylome.analysis.methyl_plots import (
    EMPTY_FIGURE,
    get_cnv_plot,
    umap_plot_from_data,
    write_cnv_to_disk,
)
from mepylome.dtypes import (
    Annotation,
    ArrayType,
    Manifest,
    PrepType,
    _get_cgsegment,
    _overlap_indices,
    get_cn_summary,
    input_args_id,
    is_valid_idat_basepath,
    read_cnv_data_from_disk,
)
from mepylome.utils import (
    CONFIG,
    MEPYLOME_TMP_DIR,
    ensure_directory_exists,
    get_free_port,
    make_log_file,
)

logger = logging.getLogger(__name__)

DEFAULT_OUTPUT_DIR = Path(MEPYLOME_TMP_DIR, "analysis")
DEFAULT_N_CPGS = 25000
ON = "on"
OFF = "off"
MLH1_CPGS = CONFIG["genes"]["mlh1_promoter_cpgs"]
ZIP_ENDING = CONFIG["suffixes"]["cnv_zip"]
UMAP_METRICS = [
    "manhattan",
    "euclidean",
    "cosine",
    "chebyshev",
    "canberra",
    "braycurtis",
    "correlation",
    "hamming",
    "jaccard",
    "dice",
    "kulsinski",
    "ll_dirichlet",
    "hellinger",
    "rogerstanimoto",
    "sokalmichener",
    "sokalsneath",
    "yule",
]
VERBOSITY_LEVELS = {
    0: logging.CRITICAL + 1,
    1: logging.INFO,
    2: logging.DEBUG,
}


class DualOutput:
    """Enables to simultaneously write output to the terminal and file.

    Should only be used with `while` as it leads to print problems with
    jupyter notebooks.
    """

    def __init__(self, filename: str | Path) -> None:
        self.terminal = sys.stdout
        self.log = open(filename, "a")

    def write(self, message: str) -> None:
        self.terminal.write(message)
        self.log.write(message)
        self.flush()

    def flush(self) -> None:
        self.terminal.flush()
        self.log.flush()

    def close(self) -> None:
        self.log.close()

    def __enter__(self) -> "DualOutput":
        self.original_stdout = sys.stdout
        sys.stdout = self
        return self

    def __exit__(
        self,
        exc_type: type | None,
        exc_val: BaseException | None,
        exc_tb: Any | None,
    ) -> None:
        sys.stdout = self.original_stdout
        self.close()


def get_all_genes() -> list[str]:
    """Returns a list of names for all genes."""
    return Annotation.default_genes()["Name"].tolist()


def get_navbar() -> dbc.Navbar:
    """Returns a navigation bar with a logo and title."""
    logo = html.Img(src="/assets/mepylome.svg", height="30px")
    title = dbc.NavbarBrand("Methylation Analysis", className="ms-2")

    return dbc.Navbar(
        dbc.Container(
            [
                html.A(
                    dbc.Row(
                        [
                            dbc.Col(logo),
                            dbc.Col(title),
                        ],
                        align="center",
                        className="g-0",
                    ),
                    style={"textDecoration": "none"},
                ),
                dbc.NavbarToggler(id="navbar-toggler", n_clicks=0),
                html.Div(id="dummy-output", hidden=True),
            ]
        ),
        color="dark",
        dark=True,
    )


def get_side_navigation(
    *,
    sample_ids: Sequence[str],
    ids_to_highlight: Sequence[str],
    annotation_columns: Sequence[str],
    analysis_dir: Path,
    annotation: Path,
    reference_dir: Path,
    test_dir: Path,
    output_dir: Path,
    cpgs: np.ndarray,
    n_cpgs: int,
    prep: PrepType,
    precalculate: bool,
    cpg_selection: str,
    n_neighbors: int,
    metric: str,
    min_dist: float,
    use_discrete_colors: bool,
    custom_clfs: Sequence[dict[str, Any]],
) -> Any:
    """Generates a side navigation panel for setting up parameters."""
    n_cpgs_max = np.inf if len(cpgs) == 0 else len(cpgs)
    n_cpgs_max_str = "" if n_cpgs_max == np.inf else f" (max. {n_cpgs_max})"
    color_scheme = "discrete" if use_discrete_colors else "continuous"
    clf_options = {
        "vtl-kbest(k=10000)-et": "ExtraTreesClassifier",
        "vtl-kbest(k=10000)-lr(max_iter=10000)": "LogisticRegression",
        "vtl-kbest(k=10000)-rf": "RandomForestClassifier",
        "vtl-kbest(k=10000)-svc": "SVC(kernel='rbf')",
        "vtl-pca_auto-lr(max_iter=10000)": "PCALogisticRegression",
        "vtl-pca_auto-et": "PCAExtraTreesClassifier",
        "vtl-knn(weights='distance')": "KNeighborsClassifier",
        **{str(i): clf["name"] for i, clf in enumerate(custom_clfs)},
    }
    tabs = [
        dbc.Tab(
            label="Setting",
            children=[
                dcc.Store(id="running-state"),
                dcc.Interval(
                    id="clock",
                    interval=500,
                    n_intervals=0,
                    max_intervals=-1,
                ),
                html.Div(
                    dbc.Button(
                        "Console",
                        id="toggle-button-setting",
                        n_clicks=0,
                        size="sm",
                    ),
                    className="d-grid gap-2",
                ),
                html.Div(
                    id="console-out-setting",
                    style={
                        "overflow": "hidden",
                        "fontFamily": "monospace",
                        "fontSize": "9px",
                        "whiteSpace": "pre-wrap",
                        "height": "40vh",
                        "display": "flex",
                        "flexDirection": "column-reverse",
                    },
                ),
                html.Br(),
                dbc.Progress(value=0, id="umap-progress-bar"),
                html.Br(),
                dbc.Row(
                    [
                        dbc.Col(
                            dbc.Button(
                                "Start",
                                id="start-button",
                                color="primary",
                                disabled=False,
                            ),
                            width={"size": 6},
                        ),
                    ],
                ),
                html.Div(id="output-div"),
                html.Br(),
                html.H6(f"Number of CpG sites{n_cpgs_max_str}"),
                dcc.Input(
                    id="num-cpgs",
                    type="number",
                    min=1,
                    max=n_cpgs_max,
                    step=1,
                    value=min(n_cpgs, len(cpgs) or n_cpgs),
                ),
                html.Br(),
                html.Br(),
                html.H6("Analysis directory"),
                dbc.Input(
                    id="analysis-dir",
                    valid=False,
                    value=str(analysis_dir),
                    type="text",
                ),
                html.Div(id="analysis-path-validation"),
                html.Br(),
                html.H6("Annotation file"),
                dbc.Input(
                    id="annotation-file",
                    valid=False,
                    value=str(annotation),
                    type="text",
                ),
                html.Div(id="annotation-file-validation"),
                html.Br(),
                html.H6("Reference directory (CNV neutral cases)"),
                dbc.Input(
                    id="reference-dir",
                    value=str(reference_dir),
                    type="text",
                ),
                html.Div(id="reference-path-validation"),
                html.Br(),
                html.H6("Test directory (new cases for analysis)"),
                dbc.Input(
                    id="test-dir",
                    value=str(test_dir),
                    type="text",
                ),
                html.Div(id="test-path-validation"),
                html.Br(),
                html.H6("Output directory"),
                dbc.Input(
                    id="output-dir",
                    valid=False,
                    value=str(output_dir),
                    type="text",
                ),
                html.Div(id="output-path-validation"),
                html.Br(),
                html.H6("IDAT preprocessing method"),
                dcc.Dropdown(
                    id="preprocessing-method",
                    options={
                        "illumina": "Illumina",
                        "swan": "SWAN",
                        "noob": "NOOB",
                        "raw": "No preprocessing",
                    },
                    value=prep,
                    clearable=False,
                    multi=False,
                ),
                html.Br(),
                html.H6("How should CpG's be selected"),
                dcc.Dropdown(
                    id="cpg-selection",
                    options={
                        "random": "Random",
                        "top": "Top varying (all samples)",
                        "balanced": ("Top varying (balanced selection)"),
                    },
                    value=cpg_selection,
                    clearable=False,
                    multi=False,
                ),
                html.Br(),
                html.Div(
                    [
                        html.H6("Select Balancing Column:"),
                        dcc.Dropdown(
                            id="balancing-column",
                            options=annotation_columns,
                            value=annotation_columns[0],
                            clearable=False,
                            multi=False,
                            placeholder="Choose a column for balancing...",
                        ),
                        html.Br(),
                    ],
                    id="balancing-column-container",
                    style={"display": "none"},
                ),
                html.H6("Calculate CNV"),
                dcc.Dropdown(
                    id="precalculate-cnv",
                    options={
                        ON: "Precalculate all (much longer!)",
                        OFF: "When clicking on dots",
                    },
                    value=ON if precalculate else OFF,
                    clearable=False,
                    multi=False,
                ),
            ],
        ),
        dbc.Tab(
            label="UMAP",
            children=[
                html.Br(),
                html.H5("Determine UMAP settings."),
                html.Br(),
                html.H6(
                    "n_neighbors",
                    style={"font-family": "monospace"},
                ),
                dcc.Input(
                    id="umap-n_neighbors",
                    type="number",
                    min=2,
                    max=100,
                    step=1,
                    value=n_neighbors,
                ),
                html.Br(),
                html.Br(),
                html.H6("metric", style={"font-family": "monospace"}),
                dcc.Dropdown(
                    id="umap-metric",
                    value=metric,
                    options=UMAP_METRICS,
                ),
                html.Br(),
                html.H6("min_dist", style={"font-family": "monospace"}),
                dcc.Input(
                    id="umap-min_dist",
                    type="number",
                    min=0,
                    value=min_dist,
                ),
                html.Br(),
            ],
        ),
        dbc.Tab(
            label="Highlight",
            children=[
                html.Br(),
                html.Br(),
                html.H6("Sample IDs to highlight in UMAP"),
                dcc.Dropdown(
                    id="ids-to-highlight",
                    options=sample_ids,
                    value=ids_to_highlight,
                    multi=True,
                ),
                html.Br(),
                html.H6("Umap coloring"),
                dcc.Dropdown(
                    id="umap-annotation-color",
                    options=annotation_columns,
                    value=annotation_columns[0],
                    multi=True,
                ),
                html.Br(),
                html.H6("Color scheme"),
                dcc.Dropdown(
                    id="umap-color-scheme",
                    options={
                        "discrete": "Discrete Colors",
                        "continuous": "Continuous Colors",
                    },
                    value=color_scheme,
                    multi=False,
                ),
                html.Br(),
                html.H6("Genes to highlight in CNV"),
                dcc.Dropdown(
                    id="selected-genes",
                    options=get_all_genes(),
                    multi=True,
                ),
            ],
        ),
        dbc.Tab(
            label="Upload",
            children=[
                html.Br(),
                html.Span(
                    "Do not upload too many files at once, else it "
                    "might not work!"
                ),
                html.Br(),
                dcc.Upload(
                    html.Div(
                        [
                            html.Span("Drag & Drop or "),
                            html.A("Select IDAT File pairs"),
                            html.Br(),
                        ],
                    ),
                    style={
                        "width": "100%",
                        "height": "60px",
                        "lineHeight": "60px",
                        "borderWidth": "1px",
                        "borderStyle": "dashed",
                        "borderRadius": "5px",
                        "textAlign": "center",
                    },
                    multiple=True,
                    id="upload-idat",
                ),
                html.Br(),
                html.Div(
                    dbc.Button(
                        "Console",
                        id="toggle-button-upload",
                        n_clicks=0,
                        size="sm",
                    ),
                    className="d-grid gap-2",
                ),
                html.Div(
                    id="console-out-upload",
                    style={
                        "overflow": "hidden",
                        "fontFamily": "monospace",
                        "fontSize": "9px",
                        "whiteSpace": "pre-wrap",
                        "height": "40vh",
                        "display": "flex",
                        "flexDirection": "column-reverse",
                    },
                ),
                html.Br(),
                html.Div(id="output-idat-upload"),
            ],
        ),
        dbc.Tab(
            label="Classify",
            children=[
                html.Br(),
                html.H6("Classifiers to use"),
                dcc.Dropdown(
                    id="clf-clf-dropdown",
                    options=clf_options,
                    value=[
                        "vtl-kbest(k=10000)-lr(max_iter=10000)",
                        "vtl-kbest(k=10000)-et",
                    ],
                    multi=True,
                ),
                html.Br(),
                dbc.Row(
                    [
                        dbc.Col(
                            dbc.Button(
                                "Start",
                                id="clf-start-button",
                                color="primary",
                            ),
                            width={"size": 6},
                        ),
                    ],
                ),
                html.Div(id="clf-error-out"),
                html.Br(),
                html.Div(
                    id="clf-out",
                    style={
                        "overflow": "hidden",
                        "fontFamily": "monospace",
                        "fontSize": "9px",
                        "whiteSpace": "pre-wrap",
                        "display": "flex",
                        "flexDirection": "column-reverse",
                    },
                ),
            ],
        ),
    ]
    return dbc.Col([dbc.Tabs(tabs)], width={"size": 2})


def extract_sub_dataframe(
    data_frame: pd.DataFrame,
    columns: np.ndarray,
    fill: float = 0.49,
) -> pd.DataFrame:
    """Extracts a sub-dataframe based on the intersection of provided columns.

    Args:
        data_frame (pd.DataFrame): The original DataFrame.
        columns (list or array): The column names to be extracted.
        fill (float, optional): The value to fill for non-overlapping columns.
            Defaults to 0.49.

    Returns:
        pd.DataFrame: The sub-dataframe with the specified columns.
    """
    result_np = np.full((len(data_frame.index), len(columns)), fill)
    left_idx, right_idx = _overlap_indices(columns, data_frame.columns)
    result_np[:, left_idx] = data_frame.values[:, right_idx]
    return pd.DataFrame(result_np, columns=columns, index=data_frame.index)


def get_balanced_indices(
    feature_labels: Sequence[str],
    seed: int | None = None,
) -> np.ndarray:
    """Returns indices of a balanced selection of feature labels.

    Args:
        feature_labels (list or array-like): Labels of features to balance.
        seed (int, optional): Random seed for reproducibility.

    Returns:
        np.ndarray: Sorted indices of the balanced selection.
    """
    feature_labels_array = np.array(feature_labels)
    class_frequencies = Counter(feature_labels_array)
    min_class_size = min(class_frequencies.values())
    if min_class_size <= 1:
        problematic_classes = [
            cls for cls, count in class_frequencies.items() if count <= 1
        ]
        msg = f"Only 1 sample for the following classes: {problematic_classes}"
        raise ValueError(msg)
    class_to_indices = {
        label: np.where(feature_labels_array == label)[0]
        for label in class_frequencies
    }
    rng = np.random.default_rng(seed)
    balanced_sample_indices = np.hstack(
        [
            rng.choice(class_to_indices[label], min_class_size, replace=False)
            for label in class_frequencies
        ]
    )
    balanced_sample_indices.sort()
    logger.info("Balanced classes with %s samples each", min_class_size)
    return balanced_sample_indices


def get_cpgs_from_file(
    input_path: str | Path | Any,
) -> np.ndarray | None:
    """Reads CpG sites from a file and return a numpy array."""
    if not isinstance(input_path, str | Path):
        return None

    path = Path(input_path).expanduser()

    if isinstance(input_path, str) and not path.exists():
        return None
    try:
        cpgs_df = read_dataframe(path, header=None)
        cpgs = [cpg for cpg in cpgs_df.values.flatten() if not pd.isna(cpg)]
        return np.array(cpgs)

    except Exception as exc:
        raise ValueError(f"Failed to read CpGs from file {path}") from exc


[docs] class MethylAnalysis: """Main class for methylation analysis including a GUI application. Main class for methylation analysis, providing methods for setting up analysis parameters, reading data, and running a Dash-based web application for data visualization. Args: analysis_dir (str or Path): Directory containing IDAT files for analysis. annotation (str or Path): Path to an annotation spreadsheet used to map sample files located in both `analysis_dir` and `test_dir`. One of the columns must contain the ID corresponding to the IDAT files (such as SentrixID or ID from files downloaded from GEO). If not provided, the system will attempt to identify the correct column automatically. If the annotation file is missing, it will search for a spreadsheet within the `analysis_dir` if available. (default: None) reference_dir (str or Path): Directory containing CNV neutral reference IDAT files. Must be provided if you wanna generate CNV plots. (default: None) output_dir (str or Path): Directory where output files will be saved (default: "/tmp/mepylome/analysis"). test_dir (Path or None): Directory for test files, including new cases for analysis or validation. Files uploaded via the GUI will be placed here. If set to `None`, the application will automatically use a temporary directory. (default: None) prep (str): Prepreparation method used for methylation microarrays: 'illumina', 'swan', or 'noob (default: 'illumina'). cpgs (str, np.ndarray, list, set, or Path, optional): Specifies the CpG sites to analyze. Possible values: 1. A list, set, or NumPy array of official Illumina CpG site names. 2. A path to a CSV file containing the CpG sites. 3. A string specifying a predefined array type: - `'450k'` : The CpG sites from the Illumina 450k array. - `'epic'` : The CpG sites from the Illumina EPIC array. - `'epicv2'` : The CpG sites from the Illumina EPIC v2 array. - `'msa48'` : The CpG sites from the Illumina MSA array. 4. A `'+'`-joined string of the options above combining multiple array types, returning the intersection of their CpG sites. For example: - `'450k+epic'` : CpG sites both in the 450k and EPIC arrays. - `'epic+epicv2'`: CpG sites both in the EPIC and EPICv2 arrays. 5. `'auto'` (default): Automatically detects all array types from IDAT files in `analysis_dir` and returns the intersection of CpG sites. This process may take longer as all files need to be read and, if necessary, decompressed. cpg_blacklist (set or list, optional): A list or set of CpG sites to exclude. Default is None. n_cpgs (int): Number of CpG sites to select for UMAP (default: 25000). classifiers (object or list of objects, optional): Classifier model(s) (default: None). Each classifier can be provided as: - A dictionary containing: - 'model' (object): The classifier model object as defined below (required). - 'name' (str, optional): A name for the classifier (default: "Custom_Classifier_<index>"). - 'cv' (int or cross-validation generator, optional): Cross-validation strategy (default: `self.cv_default`). - A classifier model object (e.g., `RandomForestClassifier()`, `vtl-kbest-rf`), in which case the 'name' and 'cv' are automatically generated (see above). A classifier model can be one of: - A scikit-learn classifier object (trained or untrained). - A string in the format `"scaler-selector-classifier"`. See the documentation of `fit_and_evaluate_clf` in `mepylome.analysis.methyl_clf` for all valid values. - A custom class, that inherits from `TrainedClassifier`. cv_default (int or cross-validation generator, optional): Determines the default cross-validation splitting strategy (default: 5). n_jobs_clf (int): Number of parallel processes to run for classifying (default: 1). Choose -1 for using all available cores. n_jobs_cnv (int, optional): Number of parallel processes to use for CNV precalculation. If None, a reasonable number of cores will be automatically chosen based on the system and workload. (default: None) precalculate_cnv (bool): If set to `True`, CNV data will be precalculated before the main analysis. This process takes approximately 2-5 seconds per case initially, but it will improve performance during runtime by reducing computation time. (default: False) load_full_betas (bool): If True, loads beta values for all CpG sites into memory (when needed), enabling fast random access to the full methylation matrix. This can significantly increase memory usage. If False, only the specified `n_cpgs` CpG sites are loaded on demand. For supervised classifier training, the same reduced matrix (`betas_sel`) used for UMAP visualization is used. This greatly reduces memory consumption and is typically sufficient, though it may be slightly slower (default: True). feature_matrix (pandas.DataFrame or numpy.ndarray, optional): A user-provided feature matrix to be used for UMAP dimensionality reduction. If provided, this matrix will be used instead of `betas_sel`. If not provided (default is None), the `betas_sel` containing methylation beta values will be used for UMAP. (default: None) overlap (bool): Flag to analyze only samples that are both in the analysis directory and within the annotation file (default: False). analysis_ids (list, optional): A list of sample IDs. If provided, the analysis will be restricted to these samples only. If `None`, the analysis will include all available samples. (default: None) test_ids (list, optional): A list of sample IDs within `test_dir`. - If provided, only these samples will be used. - If `None`, all available IDAT files in `test_dir` will be used. (default: None) cpg_selection (str): Method to select CpG sites for UMAP ('top', 'random', or 'balanced') (default: 'top'). - 'top': Selects CpG sites with the highest variance. - 'random': Selects CpG sites randomly. - 'balanced': Selects the most varying CpG sites while ensuring a balanced distribution across groups based on `balancing_feature`. This method takes an **equal number of sample files from `self.analysis_dir`** for each group defined by `balancing_feature`. It is especially useful when the dataset is **imbalanced**, where some groups have significantly more samples than others. balancing_feature (str): Column in `self.annotation` used for balancing when `cpg_selection='balanced'`. The balancing feature determines the groups/categories used to create a stratified selection of CpG sites. do_seg (bool): If set, enables segmentation analysis on CNV data and adds horizontal segmentation lines to the CNV plot. This will take an additional 2-5 seconds per sample. (default: False) host (str): Host address for the Dash application (default: 'localhost'). port (int): Port number for the Dash application (default: 8050). debug (bool): Flag to enable debug mode for the Dash application (default: False). umap_parms (dict): Parameters for UMAP algorithm (default: {'metric': 'manhattan', 'min_dist': 0.1, 'n_neighbors': 15, 'verbose': True}). use_gpu (bool): Whether to use GPU acceleration for UMAP via cuML and CuPy (default: False). Set to True to enable GPU-backed UMAP computations, which can significantly speed up large datasets. This requires the `cuml` and `cupy` libraries to be installed, along with appropriate NVIDIA drivers and a working CUDA setup. verbose (int): Sets the (global) logging verbosity level: - 0: Errors and warnings only. - 1: Info, warnings, and errors (default). - 2: Debug, info, warnings, and errors. Note: Many parameters can be modified within the GUI application after initialization, but not all. Attributes: analysis_dir (Path): Path to the directory containing IDAT files for analysis. annotation (str or Path): Path to an annotation spreadsheet used to map sample files located in both `analysis_dir` and `test_dir`. overlap (bool): Flag to analyze only samples that are both in the analysis directory and within the annotation file (default: False). analysis_ids (list): A list of sample IDs. The analysis will be restricted to these samples only. If `None`, the analysis will include all available samples. test_ids (list): A list of sample IDs in 'test_dir' that will be used. n_cpgs (int): Number of CpG sites to select for UMAP (default: 25000). n_jobs_clf (int): Number of parallel processes to run for classifying. If equal to -1 all available cores will be used. n_jobs_cnv (int): Number of parallel processes to use for CNV precalculation. If None, a reasonable number of cores will be automatically chosen based on the system and workload. reference_dir (str or Path): Directory containing CNV neutral reference IDAT files. Must be provided if you wanna generate CNV plots. output_dir (Path): Path to the directory where output files will be saved (default: "/tmp/mepylome/analysis"). test_dir (Path or None): Directory for test files, including new cases for analysis or validation. Files uploaded via the GUI will be placed here. If set to `None`, the application will automatically use a temporary directory. prep (str): Prepreparation method used for methylation microarrays: 'illumina', 'swan', or 'noob (default: 'illumina'). cpg_selection (str): Method to select CpG sites for UMAP ('top', 'random', or 'balanced') (default: 'top'). - 'top': Selects CpG sites with the highest variance. - 'random': Selects CpG sites randomly. - 'balanced': Selects the most varying CpG sites while ensuring a balanced distribution across groups based on `balancing_feature`. This method takes an **equal number of sample files from `self.analysis_dir`** for each group defined by `balancing_feature`. It is especially useful when the dataset is **imbalanced**, where some groups have significantly more samples than others. balancing_feature (str): Column in `annotation` used for balancing when `cpg_selection='balanced'`. The balancing feature determines the groups/categories used to create a stratified selection of CpG sites. host (str): Host address for the Dash application (default: 'localhost'). port (int): Port number for the Dash application (default: 8050). debug (bool): Flag to enable debug mode for the Dash application (default: False). cnv_dir (Path): Directory for CNV (Copy Number Variation) data, initially set to None. umap_dir (Path): Directory for UMAP (Uniform Manifold Approximation and Projection) data, initially set to None. umap_cpgs (numpy.array): CpG sites for UMAP analysis, initially set to None. precalculate_cnv (bool): Flag to precalculate CNV information by invoking 'precompute_cnvs' (default: False). load_full_betas (bool): If True, loads beta values for all CpG sites into memory (when needed), enabling fast random access to the full methylation matrix. This can significantly increase memory usage. If False, only the specified `n_cpgs` CpG sites are loaded on demand. For supervised classifier training, the same reduced matrix (`betas_sel`) used for UMAP visualization is used. This greatly reduces memory consumption and is typically sufficient, though it may be slightly slower (default: True). betas_sel (pandas.DataFrame): DataFrame containing a selected subset of beta values used for dimensionality reduction. Initially set to None. betas_all (pandas.DataFrame): Dataframe containing beta values for all CpG sites, initially set to None. feature_matrix (pandas.DataFrame or numpy.ndarray, optional): A user-provided feature matrix to be used for UMAP dimensionality reduction. If provided, this matrix will be used instead of `betas_sel` for UMAP plots and instead of `betas_all` for classifying (default: None). betas_dir (Path): Path to the betas directory, initially set to None. umap_plot (plotly.Figure): Plot for UMAP, initially set to EMPTY_FIGURE. umap_plot_path (Path): Path to the CSV file containing the UMAP plot data, initially set to None. umap_df (pandas.DataFrame): Dataframe containing UMAP data, initially set to empty data frame. umap_parms (dict): Parameters for UMAP algorithm (default: {'metric': 'manhattan', 'min_dist': 0.1, 'n_neighbors': 15, 'verbose': True}). use_gpu (bool): Whether to use GPU acceleration for UMAP via cuML and CuPy (default: False). Set to True to enable GPU-backed UMAP computations, which can significantly speed up large datasets. This requires the `cuml` and `cupy` libraries to be installed, along with appropriate NVIDIA drivers and a working CUDA setup. raw_umap_plot (plotly.Figure): Raw UMAP plot data, initially set to None. cnv_plot (plotly.Figure): Plot for CNV (Copy Number Variation) visualization, initially set to EMPTY_FIGURE. cnv_id (str): ID for CNV (Copy Number Variation) sample, initially set to None. dropdown_id (list): ID for dropdown selection, initially set to None. ids (list): List of IDs, initially empty. ids_to_highlight (list): IDs to highlight in the plot, initially set to empty list. app (dash.dash.Dash): Dash application object, initially set to None. Raises: ValueError: If `cpg_selection` is not 'top', 'balanced', or 'random'. Examples: >>> # Basic usage >>> from mepylome import MethylAnalysis >>> analysis0 = MethylAnalysis() >>> analysis0.run_app() >>> # Usage if directories are known in advance >>> analysis1 = MethylAnalysis( >>> analysis_dir='/path/to/idat/dir', >>> reference_dir='/path/to/reference/idat/dir', >>> annotation='/path/to/annotation/spread/sheat/with/2/cols', >>> output_dir='/path/to/mepylome/output', >>> ) >>> analysis1.run_app() """ analysis_dir: Path analysis_ids: list[str] | None annotation: Path app: Dash | None balancing_feature: str | None betas_all: pd.DataFrame | None betas_dir: Path betas_sel: pd.DataFrame | None clf_dir: Path cnv_dir: Path cnv_id: str | None cnv_plot: go.Figure cpg_blacklist: set[str] cpg_selection: str cv_default: Any debug: bool do_seg: bool dropdown_id: Sequence[str] | None feature_matrix: pd.DataFrame | np.ndarray | None host: str ids: list[str] ids_to_highlight: list load_full_betas: bool n_cpgs: int n_jobs_clf: int n_jobs_cnv: int | None output_dir: Path overlap: bool port: int precalculate_cnv: bool prep: PrepType raw_umap_plot: go.Figure | None reference_dir: Path test_dir: Path test_ids: list[str] | None umap_cpgs: np.ndarray | None umap_df: pd.DataFrame | None umap_dir: Path umap_parms: dict[str, Any] umap_plot: go.Figure umap_plot_path: Path use_gpu: bool _balanced_sorted_cpgs: np.ndarray | None _balanced_sorted_cpgs_var: np.ndarray | None _classifiers: list[dict[str, Any]] | None _clf_log: Path _cpgs: np.ndarray _idat_handler: IdatHandler | None _internal_cpgs_hash: str | None _old_selected_columns: list[str] | None _prev_vars: dict[str, Any] _prog_bar: ProgressBar _sorted_cpgs: np.ndarray | None _testdir_provided: bool _use_discrete_colors: bool def __init__( self, analysis_dir: str | Path | None = None, *, annotation: str | Path | None = None, reference_dir: str | Path | None = None, output_dir: str | Path | None = None, test_dir: str | Path | None = None, prep: PrepType = "illumina", cpgs: str | Sequence[str] | set[str] | np.ndarray = "auto", cpg_blacklist: Sequence[str] | set[str] | np.ndarray | None = None, n_cpgs: int = DEFAULT_N_CPGS, classifiers: list[dict[str, Any]] | None = None, cv_default: int | Any = 5, n_jobs_clf: int = 1, n_jobs_cnv: int | None = None, precalculate_cnv: bool = False, load_full_betas: bool = True, feature_matrix: pd.DataFrame | np.ndarray | None = None, overlap: bool = False, analysis_ids: Sequence[str] | set[str] | np.ndarray | None = None, test_ids: Sequence[str] | set[str] | np.ndarray | None = None, cpg_selection: str = "top", do_seg: bool = False, host: str = "localhost", port: int = 8050, debug: bool = False, umap_parms: dict[str, Any] | None = None, use_gpu: bool = False, verbose: int = 1, balancing_feature: str | None = None, ) -> None: def _resolve_path(p: str | Path | None, default_path: Path) -> Path: """Convert None or string to Path.""" if p is None: return default_path if isinstance(p, str): return Path(p).expanduser() if isinstance(p, Path): return p.expanduser() raise ValueError( f"_resolve_path: expected str, Path, or None, got " f"{type(p).__name__}: {p}" ) def _to_list(seq: Any | None) -> list[Any] | None: """Convert a sequence to a list if input is not None.""" return None if seq is None else list(seq) self.analysis_dir = _resolve_path(analysis_dir, INVALID_PATH) self.analysis_ids = _to_list(analysis_ids) self.annotation = _resolve_path(annotation, INVALID_PATH) self.app = None self.balancing_feature = balancing_feature self.betas_all = None self.betas_sel = None self.cnv_id = None self.cnv_plot = EMPTY_FIGURE self.cpg_blacklist = set(cpg_blacklist or []) self.cpg_selection = cpg_selection self.cv_default = cv_default self.debug = debug self.dropdown_id = None self.feature_matrix = feature_matrix self.host = host self.ids = [] self.ids_to_highlight = [] self.load_full_betas = load_full_betas self.n_cpgs = n_cpgs self.n_jobs_clf = n_jobs_clf self.n_jobs_cnv = n_jobs_cnv self.output_dir = _resolve_path(output_dir, DEFAULT_OUTPUT_DIR) self.overlap = overlap self.port = port self.precalculate_cnv = precalculate_cnv self.prep = prep self.raw_umap_plot = None self.reference_dir = _resolve_path(reference_dir, INVALID_PATH) self.test_dir = _resolve_path(test_dir, INVALID_PATH) self.test_ids = _to_list(test_ids) self.umap_cpgs = None self.umap_df = None self.umap_parms = MethylAnalysis._get_umap_parms(umap_parms) self.umap_plot = EMPTY_FIGURE self.use_gpu = use_gpu self._balanced_sorted_cpgs = None self._balanced_sorted_cpgs_var = None self._classifiers = classifiers self._clf_log = make_log_file(f"{self.analysis_dir.name}-clf") self._idat_handler = None self._internal_cpgs_hash = None self._old_selected_columns = None self._prog_bar = ProgressBar() self._sorted_cpgs = None self._testdir_provided = self.test_dir != INVALID_PATH self._use_discrete_colors = True ensure_directory_exists(self.output_dir) # Set logging level dynamically main_logger = logging.getLogger("mepylome") main_logger.setLevel(VERBOSITY_LEVELS.get(verbose, logging.INFO)) for handler in main_logger.handlers: handler.setLevel(VERBOSITY_LEVELS.get(verbose, logging.INFO)) if self.cpg_selection not in ["top", "balanced", "random"]: msg = ( "Invalid 'cpg_selection' (expected: 'top', 'balanced', or " "'random')" ) raise ValueError(msg) if self.annotation != INVALID_PATH and not self.annotation.exists(): logger.warning( "Warning: The provided annotation file '%s' does not exist", self.annotation, ) if self.annotation == INVALID_PATH or not self.annotation.exists(): self.annotation = guess_annotation_file(self.analysis_dir) if not self.annotation.exists(): logger.info("No annotation file found") logger.info("Try to import cbseg, linear_segment or ruptures...") self.do_seg = False if _get_cgsegment() is None else do_seg # Set test dir, as it is needed by _get_cpgs self._set_test_dir() self._cpgs = self._get_cpgs(cpgs) self._prev_vars = self._get_vars_or_hashes() self._update_paths() self.read_umap_plot_from_disk() logger.info("Initialization completed") @property def cpgs(self) -> np.ndarray: """Array of CpG sites to analyze, sorted in order. When setting, the input should be the same as the `cpgs` argument in the constructor (`__init__`). Raises: ValueError: If the provided `cpgs` value is not a valid type or format. """ return self._cpgs @cpgs.setter def cpgs( self, cpgs: str | Sequence[str] | set[str] | np.ndarray, ) -> None: """Set the CpG sites for analysis.""" self._cpgs = self._get_cpgs(cpgs) @property def idat_handler(self) -> IdatHandler: """Handles the management of IDAT files and associated metadata. Returns: IdatHandler: An instance of IdatHandler configured with current settings. """ if self._idat_handler is not None: self._old_selected_columns = self._idat_handler.selected_columns self_parameters = { "analysis_dir": self.analysis_dir, "annotation": self.annotation, "overlap": self.overlap, "test_dir": self.test_dir, "analysis_ids": self.analysis_ids, "test_ids": self.test_ids, } # Reinitialize IdatHandler if values have changed if ( self._idat_handler is None or self._idat_handler.init_parameters() != self_parameters ): self._idat_handler = IdatHandler( analysis_dir=self.analysis_dir, annotation=self.annotation, overlap=self.overlap, test_dir=self.test_dir, analysis_ids=self.analysis_ids, test_ids=self.test_ids, ) # Restore old selected columns if they are still valid if self._old_selected_columns and all( x in self._idat_handler.samples_annotated.columns for x in self._old_selected_columns ): self._idat_handler.selected_columns = self._old_selected_columns return self._idat_handler @property def classifiers(self) -> list[dict[str, Any]]: """Retrieves the configuration for classifiers. This property returns a list of dictionaries, where each dictionary includes: - 'name' (str): A human-readable name for the classifier (e.g., 'Random Forest'). - 'model' (object): The classifier model instance. - 'cv' (int or cross-validation generator): Determines the cross-validation splitting strategy. Returns: list of dict: Classifier configurations. """ return self._get_classifiers(self._classifiers) @classifiers.setter def classifiers(self, classifiers: Any | list[Any] | None) -> None: """Sets the configuration for classifiers. This setter accepts either a single classifier model or a list of classifier models. If a model is provided without additional configuration, it will be automatically wrapped in a dictionary with default values for 'name' and 'cv'. Args: classifiers (object or list of objects): A classifier model or a list of classifier models and configurations. This argument is handled the same way as `self.classifiers`. For full details on the format and options, refer to the docstring for `self.classifiers`. Examples: >>> clf = { >>> "model": RandomForestClassifier(), >>> "name": "Custom RF", >>> "cv": 10, >>> } >>> analysis.classifiers = ["kbest-rf", clf] >>> analysis.classifiers [{'model': 'kbest-rf', 'name': 'Custom_Classifier_0', 'cv': StratifiedKFold(n_splits=5, random_state=None, shuffle=True)}, {'model': RandomForestClassifier(), 'name': 'Custom RF', 'cv': StratifiedKFold(n_splits=10, random_state=None, shuffle=True)}] """ self._classifiers = classifiers def _get_classifiers( self, classifiers: Any | list[Any] | None, ) -> list[dict[str, Any]]: if classifiers is None: return [] if not isinstance(classifiers, list): classifiers = [classifiers] result = [] for i, clf in enumerate(classifiers): clf_copy = clf.copy() if isinstance(clf, dict) else {"model": clf} clf_copy["name"] = clf_copy.get("name", f"Custom_Classifier_{i}") cv = clf_copy.get("cv", self.cv_default) if isinstance(cv, int): cv = StratifiedKFold(n_splits=cv, shuffle=True) clf_copy["cv"] = cv if "model" not in clf_copy: msg = "Each classifier in 'classifiers' must have a 'model'." raise ValueError(msg) result.append(clf_copy) return result @staticmethod def _get_umap_parms( umap_parms: dict[str, Any] | None, ) -> dict[str, Any]: """Returns UMAP parameters with defaults if not provided.""" umap_parms = {} if umap_parms is None else umap_parms default = { "metric": "manhattan", "min_dist": 0.1, "n_neighbors": 15, "verbose": True, } return {**default, **umap_parms} def _get_cpgs_hash(self) -> str: """Returns or computes and caches the hash of the CpG array.""" if self._internal_cpgs_hash is None: self._internal_cpgs_hash = input_args_id(self.cpgs) assert self._internal_cpgs_hash is not None return self._internal_cpgs_hash def _get_test_files_hash(self) -> str: """Returns the hash of the test files.""" if not self.test_dir.exists(): return "" return input_args_id( extra_hash=sorted(str(x) for x in self.test_dir.iterdir()) ) def _get_vars_or_hashes(self) -> dict[str, Any]: """Returns current variables and hashes.""" return { "analysis_dir": self.analysis_dir, "prep": self.prep, "n_cpgs": self.n_cpgs, "cpg_selection": self.cpg_selection, "cpgs": self._get_cpgs_hash(), "test_files": self._get_test_files_hash(), "analysis_ids": self.analysis_ids, "test_ids": self.test_ids, } def _get_cpgs( self, input_var: str | Sequence[str] | set[str] | np.ndarray = "auto", ) -> np.ndarray: """Returns CpG sites based on the provided input variable.""" self._internal_cpgs_hash = None def exclude_blacklist( cpgs: np.ndarray | Iterable[str], ) -> np.ndarray: return np.sort(np.array(list(set(cpgs) - self.cpg_blacklist))) logger.info("Determine CpG sites...") if isinstance(input_var, np.ndarray | set | list | pd.Index): return exclude_blacklist(input_var) cpgs_from_file = get_cpgs_from_file(input_var) if cpgs_from_file is not None: return exclude_blacklist(cpgs_from_file) if input_var == "auto": logger.info("Automatically determine array types...") detected = { str(ArrayType.from_idat(path)) for path in self.idat_handler.paths } logger.info( "The following array types were detected: %s", ", ".join(detected), ) input_var_set = detected - {str(ArrayType.UNKNOWN)} elif isinstance(input_var, str): input_var_set = set(input_var.split("+")) logger.info( "The following array types were provided: %s", ", ".join(input_var_set), ) supported_types = {str(x) for x in ArrayType} - { str(ArrayType.UNKNOWN), str(ArrayType.ILLUMINA_27K), } if input_var_set.issubset(supported_types): if not input_var_set: return np.array([]) logger.info("Load manifests and calculate CpG overlap...") cpg_sets = [ set(Manifest(array_type).methylation_probes) for array_type in supported_types if array_type in input_var_set ] cpgs = set.intersection(*cpg_sets) return exclude_blacklist(cpgs) mismatches = ", ".join(input_var_set - supported_types) types_str = ", ".join(supported_types) msg = ( "'cpgs' must be one of the following:\n" "- a list, set, or array of CpG sites\n" f"- a '+' joined string of valid parameters: {types_str}\n" f"Received invalid input: {mismatches}" ) raise ValueError(msg) def _set_test_dir(self) -> None: if self._testdir_provided: ensure_directory_exists(self.test_dir) return test_hash_key = input_args_id( self.analysis_dir, "test", ) self.test_dir = self.output_dir / f"{test_hash_key}" ensure_directory_exists(self.test_dir) def _update_paths(self) -> None: """Update file paths and directories based on current settings. This method recalculates and updates various internal paths and directories whenever changes occur in related attributes like 'analysis_dir', 'prep', 'n_cpgs', 'cpg_selection', and uploaded files in 'test_dir'. It ensures that the correct paths are set for storing beta values, copy number variation data, uploaded files, and UMAP plots. """ logger.info("Update filepaths...") if not self.output_dir.exists(): return if self._prev_vars["analysis_dir"] != self.analysis_dir: self.cpgs = self._get_cpgs() # betas dir betas_hash_key = input_args_id( self.analysis_dir, "betas", self.prep, ) self.betas_dir = self.output_dir / f"{betas_hash_key}" # cnv dir cnv_hash_key = input_args_id( self.analysis_dir, "cnv", self.reference_dir, self.prep, self.do_seg, ) self.cnv_dir = self.output_dir / f"{cnv_hash_key}" ensure_directory_exists(self.cnv_dir) # test dir self._set_test_dir() # umap dir cur_vars = self._get_vars_or_hashes() umap_dir_hash_key = input_args_id( self.analysis_dir, "umap", self.annotation, ) umap_plot_hash_key = input_args_id( self.analysis_dir, "umap", self.prep, self.n_cpgs, self.cpg_selection, self.balancing_feature, extra_hash=[ cur_vars["cpgs"], cur_vars["test_files"], self.annotation, ], ) self.umap_dir = self.output_dir / f"{umap_dir_hash_key}" self.umap_plot_path = self.umap_dir / f"{umap_plot_hash_key}.csv" ensure_directory_exists(self.umap_dir) # clf dir clf_label = re.sub( r"\W+", "", "_".join(self.idat_handler.selected_columns) ) if self.feature_matrix is not None: features = self.feature_matrix elif self.load_full_betas: features = self.load_full_betas else: features = self.n_cpgs clf_hash_key = input_args_id( self.analysis_dir, "clf", clf_label, features, self.prep, extra_hash=[ cur_vars["cpgs"], self.annotation, self.analysis_ids, ], ) self.clf_dir = self.output_dir / f"{clf_hash_key}" # Reset betas_sel if necessary dependencies = [ "analysis_dir", "cpg_selection", "cpgs", "n_cpgs", "prep", "analysis_ids", "test_ids", "test_files", ] if any(self._prev_vars[arg] != cur_vars[arg] for arg in dependencies): self.betas_sel = None # Reset betas_all if necessary dependencies = [ "analysis_dir", "cpgs", "prep", "analysis_ids", "test_ids", "test_files", ] if any(self._prev_vars[arg] != cur_vars[arg] for arg in dependencies): self.betas_all = None # Update variables/hashes self._prev_vars = cur_vars
[docs] def make_umap(self) -> None: """Generates the UMAP plot. This method extracts the beta values required for UMAP computation, computes the UMAP 2D embedding, and creates and displays the UMAP plot based on the computed embedding. """ self._prog_bar.reset(len(self.idat_handler), text="(betas)") self.set_betas() self._prog_bar.reset(1, 1) self.compute_umap() self.make_umap_plot()
[docs] def compute_umap(self) -> None: """Applies the UMAP algorithm on 'betas_sel'. Saves the 2D embedding in 'umap_df' and and on disk. Raises: AttributeError: If a dimension mismatch occurs, or if 'betas_sel' is not set. """ matrix_to_use = ( self.betas_sel if self.feature_matrix is None else self.feature_matrix ) if matrix_to_use is None: msg = "'betas_sel' is not set. First run 'set_betas'" raise AttributeError(msg) n_rows = len(matrix_to_use) if self.betas_sel is not None: chosen_index = self.betas_sel.index source_name = "self.betas_sel" elif isinstance(self.feature_matrix, pd.DataFrame): chosen_index = self.feature_matrix.index source_name = "self.feature_matrix" # Ensure all feature_matrix samples exist in idat_handler.ids missing = set(chosen_index) - set(self.idat_handler.ids) if missing: raise ValueError( "Invalid sample IDs in feature_matrix index: " f"{list(missing)}. All samples must exist in " "self.idat_handler.ids." ) else: chosen_index = self.idat_handler.ids source_name = "self.feature_matrix" # Warn if row counts mismatch and we are not using idat_handler.ids if n_rows != len(self.idat_handler.ids): msg = ( f"Dimension mismatch 0: '{source_name}' has {n_rows} rows but " f"idat_handler.ids has {len(self.idat_handler.ids)}. This may " f"occur due to invalid IDAT files. Check '{self.betas_dir}' " "for erroneous files." ) if n_rows != len(chosen_index): raise ValueError(msg) logger.warning(msg) if self.use_gpu: logger.info("Using GPU backend: cuml.manifold.UMAP") import cupy as cp from cuml.manifold import UMAP logger.info("Transferring data to GPU...") if isinstance(matrix_to_use, pd.DataFrame): matrix_to_use = cp.asarray(matrix_to_use.values) else: matrix_to_use = cp.asarray(matrix_to_use) else: logger.info("Using CPU backend: umap.UMAP") from umap import UMAP assert matrix_to_use is not None logger.info( "Starting UMAP for matrix with shape %s...", matrix_to_use.shape ) with DualOutput(LOG_FILE): umap_2d = UMAP(**self.umap_parms).fit_transform(matrix_to_use) if self.use_gpu: umap_2d = cp.asnumpy(umap_2d) umap_df = pd.DataFrame( umap_2d, columns=["Umap_x", "Umap_y"], index=chosen_index ) # Remove invalid IDAT files annotation = self.idat_handler.samples_annotated.reindex(umap_df.index) self.umap_df = pd.concat( [ umap_df, annotation, ], axis=1, ) self.umap_df.to_csv(self.umap_plot_path, sep="\t", index=True)
[docs] def make_umap_plot(self) -> None: """Generates a UMAP plot from the given 2D embedding. Generates the UMAP plot from the data provided in 'umap_df'. The scatter plot color is based on selected columns in 'idat_handler.selected_columns'. Raises: AttributeError: If a dimension mismatch occurs, or if 'umap_df' is not set. """ logger.info("Make UMAP plot...") if self.umap_df is None: msg = "'umap_df' not set. Run 'make_umap' instead." raise AttributeError(msg) self.ids = list(self.umap_df.index) umap_color_all = self.idat_handler.features() missing = set(self.ids) - set(umap_color_all.index) if missing: raise KeyError( f"Missing {len(missing)} IDs (e.g. {list(missing)[:5]}) in " f"features. Cache: {self.output_dir}" ) umap_color = umap_color_all.loc[self.ids] self.umap_df["Umap_color"] = umap_color.astype(str) self.umap_plot = umap_plot_from_data( self.umap_df, self._use_discrete_colors ) self.umap_plot = self.umap_plot.update_layout( margin={"l": 0, "r": 0, "t": 30, "b": 0}, ) self.raw_umap_plot = self.umap_plot self.dropdown_id = None self._umap_plot_highlight()
[docs] def read_umap_plot_from_disk(self) -> None: """Reads UMAP plot from disk if available from previous analysis.""" if self.umap_plot_path is not None and self.umap_plot_path.exists(): logger.info("Read umap plot from disk...") self.umap_df = pd.read_csv( self.umap_plot_path, sep="\t", index_col=0 ) self.umap_df = self.umap_df.fillna("") try: self.make_umap_plot() except AttributeError: logger.info("Probable dimension mismatch")
[docs] def set_betas(self) -> None: """Sets the beta values DataFrame ('betas_sel') for further analysis. This method reads the IDAT files located in 'analysis_dir', extracts the beta values, and saves them locally in 'output_dir'. Depending on the configuration ('cpg_selection' and 'load_full_betas' flags), it either extracts a subset of CpGs for UMAP computation or loads all CpGs for subsequent processing into memory. Raises: ValueError: If no valid samples are found. """ if not self.idat_handler: msg = "No valid samples found" raise ValueError(msg) self._update_paths() assert self.betas_dir is not None def get_random_cpgs() -> np.ndarray: logger.info("Selecting random CpG's...") return np.sort( np.random.default_rng().choice( self.cpgs, self.n_cpgs, replace=False ) ) def _get_betas(cpgs: np.ndarray) -> pd.DataFrame: logger.info("Retrieving beta values...") assert self.betas_dir is not None return get_betas( idat_handler=self.idat_handler, cpgs=cpgs, prep=self.prep, betas_dir=self.betas_dir, pbar=self._prog_bar, ) # Load all beta values if necessary if self.betas_all is None and self.load_full_betas: self.betas_all = _get_betas(self.cpgs) self._sorted_cpgs, self._sorted_cpgs_var = ( reordered_cpgs_by_variance(self.betas_all) ) # Load all ordered CpG's if load_full_betas if False if self._sorted_cpgs is None and ( self.cpg_selection in ["top", "balanced"] ): assert self.betas_dir is not None self._sorted_cpgs, self._sorted_cpgs_var = ( reordered_cpgs_by_variance_online( idat_handler=self.idat_handler, cpgs=self.cpgs, prep=self.prep, betas_dir=self.betas_dir, pbar=self._prog_bar, ) ) # Handle CpG selection if self.cpg_selection == "random": self.umap_cpgs = get_random_cpgs() self.betas_sel = ( extract_sub_dataframe(self.betas_all, self.umap_cpgs) if self.betas_all is not None else _get_betas(self.umap_cpgs) ) elif self.cpg_selection == "top": assert self._sorted_cpgs is not None self.umap_cpgs = self._sorted_cpgs[: self.n_cpgs] self.betas_sel = ( self.betas_all[self.umap_cpgs] if self.betas_all is not None else _get_betas(self.umap_cpgs) ) elif self.cpg_selection == "balanced": features = self.idat_handler.features( self.balancing_feature or [] ).tolist() balanced_indices = get_balanced_indices(features, seed=42) if self._balanced_sorted_cpgs is None: if self.betas_all is not None: ( self._balanced_sorted_cpgs, self._balanced_sorted_cpgs_var, ) = reordered_cpgs_by_variance( self.betas_all.iloc[balanced_indices] ) else: ids_subset = [ self.idat_handler.ids[i] for i in balanced_indices ] ( self._balanced_sorted_cpgs, self._balanced_sorted_cpgs_var, ) = reordered_cpgs_by_variance_online( idat_handler=self.idat_handler, cpgs=self.cpgs, prep=self.prep, betas_dir=self.betas_dir, pbar=self._prog_bar, ids=ids_subset, ) self.umap_cpgs = self._balanced_sorted_cpgs[: self.n_cpgs] self.betas_sel = ( self.betas_all[self.umap_cpgs] if self.betas_all is not None else _get_betas(self.umap_cpgs) ) else: msg = f"Invalid 'cpg_selection': {self.cpg_selection}" raise ValueError(msg)
def _get_coordinates(self, sample_id: str) -> pd.Series: """Returns UMAP 2D embedding coordinates.""" return self.umap_df[self.umap_df.index == sample_id].iloc[0][ ["Umap_x", "Umap_y"] ] def _umap_plot_highlight(self, cnv_id: str | None = None) -> None: """Highlights the selected samples and the sample selected for CNV.""" if cnv_id is not None: self.cnv_id = cnv_id self.dropdown_id = self.ids_to_highlight self.umap_plot = go.Figure(self.raw_umap_plot) for id_ in self.dropdown_id: x, y = self._get_coordinates(id_) self.umap_plot.add_annotation( x=x, y=y, text=f"{id_}", showarrow=True, arrowhead=2, arrowcolor="blue", font={"color": "blue"}, ) if self.cnv_id is not None: x, y = self._get_coordinates(self.cnv_id) self.umap_plot.add_annotation( x=x, y=y, text=f"{self.cnv_id}", showarrow=True, arrowhead=2, ) def _retrieve_zoom(self, current_plot: dict[str, Any] | None) -> None: """Retrieves and applies the zoom level to the UMAP plot.""" if current_plot is None: return self.umap_plot.layout.xaxis = current_plot["layout"]["xaxis"] self.umap_plot.layout.yaxis = current_plot["layout"]["yaxis"]
[docs] def make_cnv_plot( self, sample_id: str, genes_sel: Sequence[str] | None = None, ) -> None: """Generates a copy number variation (CNV) plot for a specific sample. This method generates a CNV plot for the specified sample and optionally highlights specific genes within the plot. Args: sample_id (str): ID of the sample for which CNV plot is generated. genes_sel (list or None, optional): List of specific genes to highlight in the plot. Raises: FileNotFoundError: If the specified sample ID is not found in the analysis directory or if the reference directory does not exist. """ idat_basepath = self.idat_handler.id_to_path[sample_id] if not is_valid_idat_basepath(idat_basepath): msg = f"Sample {sample_id} not found in {self.analysis_dir}" raise FileNotFoundError(msg) if not self.reference_dir.exists(): msg = f"Reference dir {self.reference_dir} does not exist" raise FileNotFoundError(msg) genes_sel = () if genes_sel is None else tuple(genes_sel) logger.info("Make CNV for %s...", sample_id) self.cnv_plot = get_cnv_plot( sample_path=idat_basepath, reference_dir=self.reference_dir, prep=self.prep, cnv_dir=self.cnv_dir, genes_sel=genes_sel, do_seg=self.do_seg, )
[docs] def precompute_cnvs(self, ids: Sequence[str] | None = None) -> None: """Precalculates CNVs for all samples and saves them to disk. This method performs CNV analysis, and writes the output to the configured CNV directory. If `ids` is not provided, the method will compute CNVs for all samples found in the `analysis_dir`. Args: ids (list, optional): A list of sample IDs to process. If `None`, the function will compute CNVs for all samples in the `analysis_dir`. Default is `None`. Note: Precalculating CNVs improves performance but requires additional memory space in the output directory. """ logger.info("Precalculate CNV's...") self._update_paths() if ids is None: ids = self.idat_handler.ids self._prog_bar.reset(len(ids), text="(CNV)") write_cnv_to_disk( sample_path=[self.idat_handler.id_to_path[x] for x in ids], reference_dir=self.reference_dir, cnv_dir=self.cnv_dir, prep=self.prep, do_seg=self.do_seg, pbar=self._prog_bar, n_cores=self.n_jobs_cnv, ) self._prog_bar.reset(1, 1)
[docs] def get_cnv( self, sample_id: str, extract: Sequence[str] | None = None, ) -> tuple[pd.DataFrame | None, ...]: """Retrieves the CNV information for a specified sample. This method locates the IDAT file corresponding to the provided `sample_id`, processes it to generate CNV data if not already available, and reads the resulting CNV information from disk. Args: sample_id (str): The identifier for the sample whose CNV data is to be retrieved. extract (list): Specifies the data to extract from the CNV analysis. Available options include: - "bins": Raw CNV data at the bin level. - "detail": Detailed CNV information (generally genes). - "segments": Segmented CNV regions. - "metadata": CNV analysis metadata. Returns: tuple: A tuple containing the following elements: - bins (DataFrame): Data representing CNV bins. - detail (DataFrame): Gene CNV information. - segments (DataFrame): Segmented CNV data. If CNV data is not found or cannot be generated, returns None for each extract value. """ if extract is None: extract = ["bins", "detail", "segments"] write_cnv_to_disk( sample_path=[self.idat_handler.id_to_path[sample_id]], reference_dir=self.reference_dir, cnv_dir=self.cnv_dir, prep=self.prep, do_seg=self.do_seg, pbar=self._prog_bar, ) basename = self.idat_handler.id_to_basename[sample_id] if (self.cnv_dir / (basename + ZIP_ENDING)).exists(): return read_cnv_data_from_disk( self.cnv_dir, basename, extract=extract, ) return (None,) * len(extract)
[docs] def cn_summary(self, ids: Sequence[str]) -> tuple[go.Figure, pd.DataFrame]: """Create a copy number summary plot for the given samples. This method generates an overview of CNV gain and loss patterns across chromosomes for a list of sample IDs. It returns both the visual plot and the data used to generate it. Args: ids (list of str): A list of sample IDs to include in the CNV summary. Returns: plot (plotly.graph_objects.Figure): A Plotly figure showing CNV summary results. df_cn_summary (pd.DataFrame): A DataFrame containing the data behind the plot. Raises: ValueError: If `do_seg` is not True. CNV summary plots require segmentation to be enabled. """ if not self.do_seg: msg = "To use CN-summary plots you must set 'do_seg' to 'True'." raise ValueError(msg) self.precompute_cnvs(ids) basenames = [self.idat_handler.id_to_basename[x] for x in ids] plot, df_cn_summary = get_cn_summary(self.cnv_dir, basenames) return plot, df_cn_summary
[docs] def classify( self, *, ids: str | Sequence[str] | None = None, values: pd.DataFrame | np.ndarray | None = None, clf_list: Any, ) -> list[ClassifierResult]: """Classify samples using specified classifiers. This method performs classification on given samples, defined either by `ids` or by `values`, using one or more supervised classifiers. The labels for classification are derived from the `selected_columns`. Classification can either use a provided `feature_matrix` (custom features), or default to CpG methylation data (`betas_all`). All samples in `analysis_dir` resp. those in `analysis_ids` with valid label will be used for learning. Classifiers are applied to the data, and the method returns their predictions and performance reports. Args: ids (str, list, tuple, or None): Sample IDs for prediction/classification. If `values` is provided, `ids` must be `None`. values (pd.DataFrame, np.ndarray, or None): Feature matrix for prediction/classification. If `ids` is provided, `values` must be `None`. clf_list (object or list of objects): A classifier model or a list of classifier models and configurations. This argument is handled the same way as `self.classifiers`. For full details on the format and options, refer to the docstring for `self.classifiers`. Returns: list[ClassifierResult]: A list of ClassifierResult objects, each containing the following attributes: - prediction (pd.DataFrame): A DataFrame containing the predicted labels with their associated probabilities. - model (sklearn.base.BaseEstimator or TrainedClassifier): The trained classifier object used for prediction. - metrics (dict): A dictionary of evaluation metrics for the classifier, such as accuracy, precision, recall, etc. - reports (dict): A dictionary containing textual and HTML reports of the classifier's performance. The keys are: - "txt": A plain-text report (e.g., classification report). - "html": An HTML-formatted report for richer visualization. Outputs: Log file: Contains training time, classifier performance metrics, and evaluation results for each classifier. Raises: ValueError: If not exactly one if `ids` or `values` is set. """ if sum(x is not None for x in (ids, values)) != 1: msg = "Provide exactly one of 'ids' or 'values'." raise ValueError(msg) if ids and isinstance(ids, str): ids = [ids] # Clean the clf log file self._clf_log.write_text("") self._update_paths() ensure_directory_exists(self.clf_dir) clfs = self._get_classifiers(clf_list) def _clf_path(clf: dict[str, Any]) -> Path: filename = input_args_id(clf["model"], clf["cv"]) + ".pkl" return self.clf_dir / filename # Only load data to memory if training is needed if all(_clf_path(clf).exists() for clf in clfs): X, y, _values = self._load_training_data(ids, values_only=True) else: X, y, _values = self._load_training_data(ids) values = _values if _values is not None else values assert values is not None def _log(string: str) -> None: with self._clf_log.open("a") as f: f.write(string) print(string, end="") # Stop if there is no data to fit. if isinstance(X, pd.DataFrame) and X.empty: _log("No data to fit.\n") return [] results = [] for i, clf in enumerate(clfs): if logger.isEnabledFor(logging.INFO): _log(f"Start training classifier {i + 1}/{len(clfs)}...\n") start_time = time.time() clf_result = fit_and_evaluate_clf( X=X, y=y, X_test=values, id_test=ids, save_path=_clf_path(clf), clf=clf["model"], cv=clf["cv"], n_jobs=self.n_jobs_clf, ) elapsed_time = time.time() - start_time if logger.isEnabledFor(logging.INFO): _log(f"Time used for classification: {elapsed_time:.2f} s\n\n") if ( logger.isEnabledFor(logging.INFO) and len(clf_result.reports["txt"]) == 1 ): _log(clf_result.reports["txt"][0] + "\n\n\n") results.append(clf_result) return results
def _load_training_data( self, ids: Sequence[str] | None, values_only: bool = False, ) -> tuple[Any, list[Any] | None, pd.DataFrame | None]: """Load training data for classification.""" cpgs_path = self.clf_dir / "cpgs.npy" if values_only and cpgs_path.exists(): cpgs = np.load(cpgs_path) if ids and len(ids): values = get_betas( idat_handler=self.idat_handler, ids=ids, cpgs=cpgs, prep=self.prep, betas_dir=self.betas_dir, pbar=self._prog_bar, ) else: values = None return None, None, values self.set_betas() y = self.idat_handler.features().tolist() logger.info("Loading feature matrix into memory.") if self.feature_matrix is not None: assert self.betas_sel is not None X = pd.DataFrame(self.feature_matrix, index=self.betas_sel.index) elif self.load_full_betas: X = self.betas_all elif self.betas_sel is not None: logger.info( "Using only %d CpGs for supervised classification " "(load_full_betas=False)", self.betas_sel.shape[1], ) X = self.betas_sel else: msg = "No valid feature matrix could be selected." raise ValueError(msg) assert X is not None values = X.loc[ids] if ids else None def _invalid_class(cls: Any) -> bool: return isinstance(cls, str) and cls.strip("|") == "" # Remove all test samples and samples with unknown classification. test_indices = set(X.index.get_indexer(self.idat_handler.test_ids)) valid_indices = [ i for i, x in enumerate(y) if not _invalid_class(x) and i not in test_indices ] X = X.iloc[valid_indices] y = [y[i] for i in valid_indices] # Save to file np.save(cpgs_path, np.array(X.columns, dtype=str)) return X, y, values
[docs] def mlh1_report_pages(self, ids: Sequence[str]) -> list[str]: """Generate MLH1 promoter methylation report HTML pages. Parameters: ids (list of str): Sample IDs. Returns: list of str: HTML reports, one per sample. """ prev_cpg_blacklist = self.cpg_blacklist prev_cpgs = self.cpgs self.cpg_blacklist = set() self.cpgs = list(MLH1_CPGS) self.set_betas() assert self.betas_all is not None array_types = { id_: str(ArrayType.from_idat(self.idat_handler.id_to_path[id_])) for id_ in ids } mlh1_overlap = { k: list(set(MLH1_CPGS).intersection(Manifest(k).data_frame.IlmnID)) for k in ["450k", "epic", "epicv2", "msa48"] } probes_df = self.betas_all.loc[ids] result = [] for id_ in ids: array_type = array_types[id_] cpg_overlap = mlh1_overlap[array_type] probes = probes_df.loc[id_][cpg_overlap] result.append(make_single_mlh1_report_page(probes)) self.cpg_blacklist = prev_cpg_blacklist self.cpgs = prev_cpgs self._update_paths() return result
[docs] def get_app(self) -> Dash: """Returns a Dash application object for methylation analysis.""" current_dir = Path(__file__).resolve().parent assets_folder = current_dir.parent / "data" / "assets" app = Dash( __name__, update_title="", assets_folder=str(assets_folder), external_stylesheets=[dbc.themes.BOOTSTRAP], ) app._favicon = "favicon.svg" # type: ignore[assignment] app.title = "mepylome" side_navigation = get_side_navigation( sample_ids=self.ids, ids_to_highlight=self.ids_to_highlight, annotation_columns=self.idat_handler.columns, analysis_dir=self.analysis_dir, annotation=self.idat_handler.annotation, reference_dir=self.reference_dir, test_dir=self.test_dir, output_dir=self.output_dir, cpgs=self.cpgs, n_cpgs=self.n_cpgs, prep=self.prep, precalculate=self.precalculate_cnv, cpg_selection=self.cpg_selection, n_neighbors=self.umap_parms["n_neighbors"], metric=self.umap_parms["metric"], min_dist=self.umap_parms["min_dist"], use_discrete_colors=self._use_discrete_colors, custom_clfs=self.classifiers, ) dash_plots = dbc.Col( [ dcc.Graph( id="umap-plot", figure=self.umap_plot, config={ "scrollZoom": True, "doubleClick": "autosize", "modeBarButtonsToRemove": ["lasso2d", "select"], "displaylogo": False, }, style={"height": "70vh"}, ), html.Div(id="umap-error"), dcc.Graph( id="cnv-plot", figure=self.cnv_plot, config={ "scrollZoom": True, "doubleClick": "reset", "modeBarButtonsToRemove": ["lasso2d", "select"], "displaylogo": False, }, ), ], width={"size": 10}, ) app.layout = html.Div( [ get_navbar(), dbc.Container( [ dbc.Row( [side_navigation, dash_plots], style={"margin-top": "20px"}, ), ], fluid=True, ), ], ) @app.callback( [ Output("umap-plot", "figure"), Output("cnv-plot", "figure"), Output("umap-error", "children"), ], [ Input("umap-plot", "clickData"), Input("ids-to-highlight", "value"), Input("umap-annotation-color", "value"), Input("umap-color-scheme", "value"), Input("selected-genes", "value"), ], State("umap-plot", "figure"), ) def update_plots( click_data: dict[str, Any] | None, ids_to_highlight: list[str] | None, umap_color_columns: list[str] | None, umap_color_scheme: str | None, genes_sel: list[str] | None, curr_umap_plot: dict[str, Any] | None, ) -> tuple[Any, Any, Any]: def update_umap_plot() -> tuple[Any, Any, Any]: try: self.make_umap_plot() self._umap_plot_highlight(cnv_id=self.cnv_id) self._retrieve_zoom(curr_umap_plot) return self.umap_plot, no_update, "" except AttributeError: return no_update, no_update, no_update genes_sel = genes_sel or [] trigger = callback_context.triggered[0]["prop_id"].split(".")[0] self.ids_to_highlight = ids_to_highlight or [] if trigger == "ids-to-highlight": self._umap_plot_highlight() self._retrieve_zoom(curr_umap_plot) return self.umap_plot, no_update, "" if ( trigger == "umap-annotation-color" and umap_color_columns is not None ): self.idat_handler.selected_columns = umap_color_columns update_umap_plot() if ( trigger == "umap-color-scheme" and umap_color_scheme is not None ): self._use_discrete_colors = umap_color_scheme == "discrete" update_umap_plot() if trigger == "umap-plot" and isinstance(click_data, dict): points = click_data.get("points") if isinstance(points, list): first_point = points[0] if points else {} sample_id = first_point.get("hovertext") if sample_id is None: return no_update, no_update, "" self._umap_plot_highlight(cnv_id=sample_id) self._retrieve_zoom(curr_umap_plot) try: self.make_cnv_plot(sample_id, genes_sel) return self.umap_plot, self.cnv_plot, "" except FileNotFoundError as exc: logger.info("umap failed: %s", exc) logger.info("sample_id: %s", sample_id) logger.info("MethylAnalysis: %s", self) error_message = ( f"{exc} - There is probably no CNV neutral " "reference set for the array type of the " "selected sample. To solve this, add missing CNV " f"neutral sets to '{self.reference_dir}' and " f"remove the error file in '{self.cnv_dir}'." ) return self.umap_plot, EMPTY_FIGURE, error_message except Exception as exc: logger.info("umap failed: %s", exc) logger.info("sample_id: %s", sample_id) logger.info("MethylAnalysis: %s", self) return self.umap_plot, EMPTY_FIGURE, str(exc) if trigger == "selected-genes" and genes_sel is not None: try: assert self.cnv_id is not None self.make_cnv_plot(self.cnv_id, genes_sel) return no_update, self.cnv_plot, "" except Exception as exc: logger.info("selected-genes failed: %s", exc) logger.info("self.cnv_id: %s", self.cnv_id) logger.info("genes_sel: %s", genes_sel) return no_update, no_update, str(exc) return self.umap_plot, self.cnv_plot, "" @app.callback( [ Output("analysis-dir", "valid"), Output("analysis-path-validation", "children"), Output("annotation-file", "value"), ], [Input("analysis-dir", "value")], prevent_initial_call=False, ) def validate_analysis_path(input_path: str) -> tuple[bool, str, Any]: try: path = Path(input_path).expanduser() # Directory exists but not writable if path.is_dir() and not os.access(path, os.W_OK): return False, f"Protected directory: {path}", no_update # Directory exists and writable if path.is_dir(): self.analysis_dir = path try: annotation_str = str(self.idat_handler.annotation) except Exception: self._idat_handler = None self.annotation = INVALID_PATH annotation_str = str(INVALID_PATH) return True, "", annotation_str return False, f"Not a directory: {path}", no_update except Exception as exc: logger.info( "An error occurred (1) (validate_analysis_path): %s", exc ) return False, "Invalid path format", no_update @app.callback( [ Output("annotation-file", "valid"), Output("annotation-file-validation", "children"), Output("umap-annotation-color", "options"), Output("umap-annotation-color", "value"), ], [Input("annotation-file", "value")], prevent_initial_call=False, ) def validate_annotation_file( input_path: str, ) -> tuple[bool, str, Any, Any]: selection = self.idat_handler.selected_columns try: path = Path(input_path).expanduser() if path.exists() and not os.access(path, os.W_OK): return ( False, f"Protected file: {path}", no_update, selection, ) if path.exists(): self.annotation = path return True, "", self.idat_handler.columns, selection return False, f"Not a file: {path}", no_update, selection except Exception as exc: logger.info( "An error occured (1) (validate_annotation_file): %s", exc ) return False, "Invalid path format", no_update, selection @app.callback( [ Output("reference-dir", "valid"), Output("reference-path-validation", "children"), ], [Input("reference-dir", "value")], prevent_initial_call=False, ) def validate_reference_path(input_path: str) -> tuple[bool, str]: try: path = Path(input_path).expanduser() if path.is_dir() and not os.access(path, os.W_OK): return False, f"Protected directory: {path}" if path.is_dir(): self.reference_dir = path return True, "" return False, f"Not a directory: {path}" except Exception as exc: logger.info( "An error occured (1) (validate_reference_path): %s", exc ) return False, "Invalid path format" @app.callback( [ Output("test-dir", "valid"), Output("test-path-validation", "children"), ], [Input("test-dir", "value")], prevent_initial_call=False, ) def validate_test_path(input_path: str) -> tuple[bool, str]: try: path = Path(input_path).expanduser() if path.is_dir() and not os.access(path, os.W_OK): return False, f"Protected directory: {path}" if path.is_dir(): self.test_dir = path return True, "" return False, f"Not a directory: {path}" except Exception as exc: logger.info( "An error occured (1) (validate_test_path): %s", exc ) return False, "Invalid path format" @app.callback( [ Output("output-dir", "valid"), Output("output-path-validation", "children"), ], [Input("output-dir", "value")], prevent_initial_call=False, ) def validate_output_path(input_path: str) -> tuple[bool, str]: try: path = Path(input_path).expanduser() if path == DEFAULT_OUTPUT_DIR: self.output_dir = path return True, "" if path.is_dir() and not os.access(path, os.W_OK): return False, f"Protected directory: {path}" if path.is_dir(): self.output_dir = path return True, "" return False, f"Not a directory: {path}" except Exception as exc: logger.info( "An error occured (2) (validate_output_path): %s", exc ) return False, "Invalid path format" @app.callback( [ Output("umap-plot", "figure", allow_duplicate=True), Output("ids-to-highlight", "options"), Output("output-div", "children"), Output("running-state", "data"), ], [ Input("start-button", "n_clicks"), ], [ State("num-cpgs", "value"), State("analysis-dir", "value"), State("annotation-file", "value"), State("reference-dir", "value"), State("test-dir", "value"), State("output-dir", "value"), State("preprocessing-method", "value"), State("analysis-dir", "valid"), State("test-dir", "valid"), State("output-dir", "valid"), State("precalculate-cnv", "value"), State("cpg-selection", "value"), State("balancing-column", "value"), ], prevent_initial_call=True, running=[ (Output("start-button", "disabled"), True, False), ], ) def on_umap_start_button_click( n_clicks: int | None, n_cpgs: int | None, analysis_dir: str, annotation: str, reference_dir: str, test_dir: str, output_dir: str, prep: PrepType | None, analysis_dir_valid: bool | None, test_dir_valid: bool | None, output_dir_valid: bool | None, precalculate_cnv: bool | None, cpg_selection: str | None, balancing_feature: str | None, ) -> tuple[Any, Any, Any, dict[str, str]]: if not n_clicks: return no_update, no_update, "", {} error_message = None if n_cpgs is None: error_message = "Invalid no. of CpGs." elif not analysis_dir_valid: error_message = "Invalid Analysis path." elif not test_dir_valid: error_message = "Invalid Test path." elif not output_dir_valid: error_message = "Invalid Output path." elif prep is None: error_message = "Invalid preprocessing method." elif precalculate_cnv is None: error_message = "Invalid precalculation method." elif cpg_selection is None: error_message = "Invalid CpG selection method." elif balancing_feature is None: error_message = "Invalid balancing features." if error_message: return no_update, no_update, error_message, {} assert n_cpgs is not None assert prep is not None assert precalculate_cnv is not None assert cpg_selection is not None assert balancing_feature is not None self.n_cpgs = n_cpgs self.output_dir = Path(output_dir).expanduser() self.reference_dir = Path(reference_dir).expanduser() self.test_dir = Path(test_dir).expanduser() self.prep = prep self.precalculate_cnv = precalculate_cnv == ON self.cpg_selection = cpg_selection self.analysis_dir = Path(analysis_dir).expanduser() self.annotation = Path(annotation).expanduser() self.balancing_feature = ( balancing_feature if cpg_selection == "balanced" else None ) try: ensure_directory_exists(self.output_dir) self.make_umap() except Exception as exc: # BUG: Error 'no module named tqdm.auto' if mepylome is new # installed. This error disapears after running tutorial # Error was produced via cli with -a -A and -o -C 'epic' # --overlap -S 'top'. Maybe this error occurs only on Mac OS? logger.info("An error occured (3): %s", exc) else: return ( self.umap_plot, self.ids, no_update, {"status": "umap_done"}, ) return no_update, no_update, "", {} @app.callback( Output("console-out-setting", "style"), [Input("toggle-button-setting", "n_clicks")], [State("console-out-setting", "style")], ) def toggle_console_out( n_clicks: int, current_style: dict[str, Any], ) -> dict[str, Any]: return { **current_style, "display": "flex" if n_clicks % 2 == 0 else "none", } @app.callback( Output("balancing-column-container", "style"), Input("cpg-selection", "value"), ) def toggle_column_dropdown( selected_method: str | None, ) -> dict[str, str]: """Show column dropdown only if 'balanced' is selected.""" if selected_method == "balanced": return {"display": "block"} return {"display": "none"} @app.callback( [ Input("umap-n_neighbors", "value"), Input("umap-metric", "value"), Input("umap-min_dist", "value"), ], prevent_initial_call=True, ) def update_umap_parms( n_neighbors: int, metric: str, min_dist: float, ) -> None: self.umap_parms["n_neighbors"] = n_neighbors self.umap_parms["metric"] = metric self.umap_parms["min_dist"] = min_dist @app.callback( [ # This ensures button is enabled in Colab Output("start-button", "disabled") ], [Input("running-state", "data")], prevent_initial_call=True, running=[ (Output("start-button", "disabled"), True, False), ], ) def precalculate_cnv_wrapper( state: dict[str, Any] | None, ) -> list[bool]: if ( state and state.get("status") == "umap_done" and self.precalculate_cnv ): self.precompute_cnvs() return [False] @app.callback( [ Output("umap-progress-bar", "value"), Output("umap-progress-bar", "label"), Output("console-out-setting", "children"), Output("console-out-upload", "children"), Output("clf-out", "children"), ], [Input("clock", "n_intervals")], ) def update_progress(n: int) -> tuple[int, str, str, str, list[str]]: progress = self._prog_bar.get_progress() out_str = self._prog_bar.get_text() with LOG_FILE.open("r") as file: log_str = "" lines = file.readlines() n_top = 50 last_lines = lines if len(lines) <= n_top else lines[-n_top:] for line in last_lines: log_str = log_str + line with self._clf_log.open("r") as file: clf_str = file.readlines() return progress, out_str, log_str, log_str, clf_str @app.callback( Output("output-idat-upload", "children"), Input("upload-idat", "contents"), State("upload-idat", "filename"), ) def update_output( list_of_contents: list[str], list_of_names: list[str], ) -> list[html.Div] | None: logger.info("Uploading files...") def parse_contents(contents: str, filename: str) -> html.Div: file_path = self.test_dir / filename content_string = contents.split(",")[1] decoded = base64.b64decode(content_string) with file_path.open("wb") as f: f.write(decoded) logger.info("Upload of %s completed", filename) return html.Div( [ html.H6(filename), ] ) if list_of_contents is not None: children = [] for c, n in zip(list_of_contents, list_of_names, strict=True): children.append(parse_contents(c, n)) # Reload idat handler now that there are new files self._idat_handler = None self._update_paths() # Update cpgs as uploaded files may have different array types self.cpgs = self._get_cpgs() return children return no_update @app.callback( Output("clf-error-out", "children"), [ Input("clf-start-button", "n_clicks"), ], [ State("clf-clf-dropdown", "value"), ], prevent_initial_call=True, running=[ (Output("clf-start-button", "disabled"), True, False), ], ) def on_clf_start_button_click( n_clicks: int | None, clf_list: Sequence[str] | None, ) -> str | Any | None: if not n_clicks: return no_update error_message = None if clf_list is None or len(clf_list) == 0: error_message = "No classifiers selected." elif self.cnv_id is None: error_message = "No sample selected." if error_message: return error_message try: assert clf_list is not None parsed_clf_list = [ self.classifiers[int(x)] if x.isdigit() else x for x in clf_list ] _ = self.classify(ids=self.cnv_id, clf_list=parsed_clf_list) except Exception as exc: logger.info("An error occured (4): %s", exc) return f"{exc}" return "" return app
[docs] def run_app(self, *, open_tab: bool = False) -> None: """Runs the mepylome Dash application. Args: open_tab (bool, optional): Whether to automatically open a new browser tab with the application URL. Defaults to False. """ self.app = self.get_app() free_port = get_free_port(self.port) if open_tab: def open_browser_tab() -> None: webbrowser.open_new_tab(f"http://{self.host}:{free_port}") threading.Timer(1, open_browser_tab).start() self.app.run( debug=self.debug, host=self.host, use_reloader=False, port=free_port, )
def __repr__(self) -> str: title = f"{self.__class__.__name__}()" header = title + "\n" + "*" * len(title) lines = [header] def format_value(value: object) -> tuple[str, str]: length_info = "" if isinstance(value, pd.DataFrame | pd.Series | pd.Index): display_value = str(value) elif isinstance(value, np.ndarray): display_value = str(value) length_info = f"\n\n[{len(value)} items]" elif isinstance(value, go.Figure): data_str = ( str(value.data[0])[:70].replace("\n", " ") if value.data else "No data" ) layout_str = str(value.layout)[:70].replace("\n", " ") data_str += "..." if len(data_str) == 70 else "" layout_str += "..." if len(layout_str) == 70 else "" display_value = ( f"Figure(\n" f" data: {data_str}\n" f" layout: {layout_str}\n" f")" ) else: display_value = str(value)[:80] + ( "..." if len(str(value)) > 80 else "" ) if isinstance(value, Sized) and len(str(value)) > 80: length_info = f"\n\n[{len(value)} items]" return display_value, length_info for attr, value in sorted(self.__dict__.items()): display_value, length_info = format_value(value) lines.append(f"{attr}:\n{display_value}{length_info}") return "\n\n".join(lines)