"""Provides CNV analysis functionality including segmentation and plotting.
This module provides classes and functions for copy number variation (CNV)
analysis.
"""
import heapq
import io
import logging
import pickle
import zipfile
from collections.abc import Callable
from functools import lru_cache
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
import pyranges1 as pr
from mepylome.dtypes.arrays import ArrayType
from mepylome.dtypes.beads import MethylData, ReferenceMethylData
from mepylome.dtypes.cache import cache_key, input_args_id, memoize
from mepylome.dtypes.chromosome import Chromosome
from mepylome.dtypes.manifests import Manifest
from mepylome.dtypes.plots import CNVPlot
from mepylome.utils.files import ensure_directory_exists, get_resource_path
from mepylome.utils.varia import CONFIG, MEPYLOME_TMP_DIR
logger = logging.getLogger(__name__)
ZIP_ENDING = CONFIG["suffixes"]["cnv_zip"]
PACKAGE_DIR = get_resource_path("mepylome")
# Data copied from conumee
GAPS = PACKAGE_DIR / CONFIG["paths"]["gaps"]
# Gene annotation is generated by scripts/make_gene_annotation.py
GENES = PACKAGE_DIR / CONFIG["paths"]["genes"]
def _get_cgsegment() -> Callable[[list[float]], list[list[int]]] | None:
try:
import ruptures
def function(bin_values: list[float]) -> list[list[int]]:
algo = ruptures.KernelCPD("linear", min_size=5).fit(bin_values)
segments = algo.predict(pen=1.5)
return [
[start, end]
for start, end in zip([0] + segments, segments, strict=False)
]
return function
except Exception:
pass
try:
import linear_segment
def function(bin_values: list[float]) -> list[list[int]]:
segments = linear_segment.segment(
bin_values, shuffles=1000, cutoff=0.3
)
return [[s.start, s.end] for s in segments]
return function
except Exception:
pass
try:
import cbseg
def function(bin_values: list[float]) -> list[list[int]]:
segments = cbseg.segment(bin_values, shuffles=1000, p=0.0001)
return [[s.start, s.end] for s in segments]
return function
except Exception:
logger.warning(
"**Warning**: Segmentation won't be calculated due to missing "
"'linear_segment', 'cbseg' or 'ruptures' package. See "
"documentation"
)
return None
[docs]
class Annotation:
"""Genomic annotations for CNV such as as binning and gene locations.
Args:
manifest (Manifest, optional): The manifest containing annotation
details. Can be determined from array_type.
array_type (str, optional): The type of array used for annotation.
Can be determined from manifest.
gap (pyranges.PyRanges): The genomic gaps. If unset default values
will be used.
detail (pyranges.PyRanges, optional): Detailed annotation (usually
genes).
bin_size (int, optional): The base-pair size of annotation bins.
Defaults to 50000.
min_probes_per_bin (int, optional): The minimum number of probes
per bin. Defaults to 15.
Attributes:
manifest (Manifest): The manifest to use.
array_type (str): The array type of the manifest.
probes (list): The Illumina ID's of the manifest after adjusting the
manifest to relevant genomic ranges.
gap (pyranges.PyRanges): The genomic gaps except for the CNV analysis.
detail (pyranges.PyRanges): Detailed annotation information (usually
genes).
bin_size (int): The base-pair size of the bins.
min_probes_per_bin (int): The minimum number of probes per bin.
chromsizes (dict): Dictionary containing chromosome sizes.
"""
_cache: dict[Any, "Annotation"] = {}
manifest: Manifest
array_type: ArrayType
probes: np.ndarray
bins: pr.PyRanges
gap: pr.PyRanges
detail: pr.PyRanges
bin_size: int
min_probes_per_bin: int
chromsizes: dict[str, int]
adjusted_manifest: pr.PyRanges
_cpg_bins: pd.DataFrame
_cpg_detail: pd.DataFrame
_init_manifest: Manifest | None
_init_array_type: str | ArrayType | None
_init_gap: pr.PyRanges | None
_init_detail: pr.PyRanges | None
_init_bin_size: int
_init_min_probes_per_bin: int
_cached: bool
_pickle_path: Path
def __new__(
cls,
manifest: Manifest | None = None,
array_type: str | ArrayType | None = None,
gap: pr.PyRanges | None = None,
detail: pr.PyRanges | None = None,
bin_size: int = 50000,
min_probes_per_bin: int = 15,
) -> "Annotation":
key = cache_key(
manifest,
array_type,
gap,
detail,
bin_size,
min_probes_per_bin,
)
if key in cls._cache:
return cls._cache[key]
instance = super().__new__(cls)
# Cache the instance
cls._cache[key] = instance
return instance
def __getnewargs__(self) -> tuple:
# Necessary for pickle
return (
self._init_manifest,
self._init_array_type,
self._init_gap,
self._init_detail,
self._init_bin_size,
self._init_min_probes_per_bin,
)
def __init__(
self,
manifest: Manifest | None = None,
array_type: str | ArrayType | None = None,
gap: pr.PyRanges | None = None,
detail: pr.PyRanges | None = None,
bin_size: int = 50000,
min_probes_per_bin: int = 15,
) -> None:
# Don't need to initialize if instance is cached.
if hasattr(self, "_cached"):
return
self._cached = True
self._init_manifest = manifest
self._init_array_type = array_type
self._init_gap = gap
self._init_detail = detail
self._init_bin_size = bin_size
self._init_min_probes_per_bin = min_probes_per_bin
pickle_hash = input_args_id(
"annotation",
self._init_manifest,
self._init_array_type,
self._init_gap,
self._init_detail,
self._init_bin_size,
self._init_min_probes_per_bin,
)
self._pickle_path = MEPYLOME_TMP_DIR / f"{pickle_hash}.pkl"
if self._pickle_path.exists():
with self._pickle_path.open("rb") as file:
saved_instance = pickle.load(file)
self.__dict__.update(saved_instance.__dict__)
return
logger.info("Constructing annotation...")
if manifest is None and array_type is None:
raise ValueError("Either array_type or manifest must be provided")
if array_type is None:
assert manifest is not None
self.array_type = ArrayType(manifest.array_type)
else:
self.array_type = ArrayType(array_type)
self.manifest = manifest or Manifest(self.array_type)
self.bin_size = bin_size
self.min_probes_per_bin = min_probes_per_bin
self.gap = gap if gap is not None else self.default_gaps()
self.detail = detail if detail is not None else self.default_genes()
if detail is not None:
# PyRanges ranges start at 0
self.detail["Start"] -= 1
self.detail = self.detail.sort_ranges()
self.chromsizes = pr.example_data.chromsizes
manifest_df = self.manifest.data_frame.copy()
manifest_df = manifest_df[
[x.startswith("cg") for x in manifest_df.IlmnID.values]
]
manifest_df = manifest_df[
Chromosome.is_valid_chromosome(manifest_df.Chromosome)
]
manifest_df.Chromosome = Chromosome.pd_to_string(
manifest_df.Chromosome
)
# PyRanges ranges start at 0
manifest_df["Start"] -= 1
self.adjusted_manifest = pr.PyRanges(manifest_df)
self.probes = self.adjusted_manifest["IlmnID"].values
self.bins = self.make_bins()
self.bins["bins_index"] = np.arange(len(self.bins))
self._cpg_bins = pd.DataFrame(
self.bins.join_overlaps(self.adjusted_manifest)[
["bins_index", "IlmnID"]
].set_index("bins_index")
)
if self.detail is None:
self._cpg_detail = pd.DataFrame(columns=["Name", "IlmnID"])
else:
self._cpg_detail = pd.DataFrame(
self.detail.join_overlaps(self.adjusted_manifest)[
["Name", "IlmnID"]
]
)
# Save to disk
with self._pickle_path.open("wb") as file:
pickle.dump(self, file)
logger.info("Constructing annotation done")
[docs]
@staticmethod
@lru_cache
def default_gaps() -> pr.PyRanges:
"""Default genomic gaps.
Details:
The default value of conumee2.
"""
gap_df = pd.read_csv(GAPS)
# PyRanges ranges start at 0
gap_df["Start"] -= 1
return pr.PyRanges(gap_df)
[docs]
@staticmethod
@lru_cache
def default_genes() -> pr.PyRanges:
"""Default PyRanges object including gene names with coordinates.
Details:
Data downloaded from: https://grch37.ensembl.org/biomart/martview
"""
genes_df = pd.read_csv(GENES, sep="\t")
# PyRanges ranges start at 0
genes_df["Start"] -= 1
return pr.PyRanges(genes_df)
[docs]
def make_bins(self) -> pr.PyRanges:
"""Creates equidistant bins and then removes genomic gaps."""
bins = pr.tile_genome(self.chromsizes, int(self.bin_size))
bins = bins[bins["Chromosome"] != "chrM"]
if self.gap is not None:
bins = bins.subtract_overlaps(self.gap)
return self.merge_bins(bins)
[docs]
def merge_bins(self, bins: pr.PyRanges) -> pr.PyRanges:
"""Merges adjacent bins until all contain a minimum of probes."""
bins = bins.count_overlaps(
self.adjusted_manifest, overlap_col="N_probes"
)
merged = (
bins.groupby("Chromosome")
.apply(
lambda x: self.merge_bins_in_chromosome(
x.reset_index(drop=True), self.min_probes_per_bin
)
)
.reset_index()[["Chromosome", "Start", "End", "N_probes"]]
)
return pr.PyRanges(merged).sort_ranges().reset_index(drop=True)
[docs]
@staticmethod
def merge_bins_in_chromosome(
bin_df: pd.DataFrame, min_probes_per_bin: int
) -> pd.DataFrame:
"""Merges adjacent bins until all contain a minimum of probes.
Args:
bin_df (DataFrame): DataFrame containing bin information for a
single chromosome.
min_probes_per_bin (int): Minimum number of probes per bin required
for merging.
Returns:
DataFrame: Merged bins in the chromosome.
"""
I_START, I_END, I_N_PROBES, I_LEFT, I_RIGHT = range(5)
INVALID = np.iinfo(np.int64).max
# Calculate Left and Right neighbors
bin_df["Left"] = bin_df.index - 1
bin_df["Right"] = bin_df.index + 1
# Need to regularly extract minimum; use min-heap
heap = [
(x, y)
for x, y in zip(bin_df.N_probes, bin_df.index, strict=True)
if x < min_probes_per_bin
]
heapq.heapify(heap)
matrix = bin_df[
["Start", "End", "N_probes", "Left", "Right"]
].values.astype(np.int64)
def update_neighbors(left: int, mid: int, right: int) -> None:
matrix[mid, I_N_PROBES] = INVALID
if left > 0:
matrix[left, I_RIGHT] = matrix[mid, I_RIGHT]
if right < matrix.shape[0]:
matrix[right, I_LEFT] = matrix[mid, I_LEFT]
while heap and heap[0][0] < min_probes_per_bin:
n_probes, i_min = heap[0]
real_n_probes = matrix[i_min, I_N_PROBES]
# Check if n_probes changed due to merging and needs to be updated
if n_probes != real_n_probes:
heapq.heapreplace(heap, (real_n_probes, i_min))
continue
heapq.heappop(heap)
n_probes_left = INVALID
n_probes_right = INVALID
# Left
i_left = matrix[i_min, I_LEFT]
if (
i_left > 0
and matrix[i_left, I_N_PROBES] != INVALID
and matrix[i_min, I_START] == matrix[i_left, I_END]
):
n_probes_left = matrix[i_left, I_N_PROBES]
# Right
i_right = matrix[i_min, I_RIGHT]
if (
i_right < matrix.shape[0]
and matrix[i_right, I_N_PROBES] != INVALID
and matrix[i_min, I_END] == matrix[i_right, I_START]
):
n_probes_right = matrix[i_right, I_N_PROBES]
# Invalid
if n_probes_left == INVALID and n_probes_right == INVALID:
update_neighbors(i_left, i_min, i_right)
if logger.isEnabledFor(logging.DEBUG):
row = (
bin_df.loc[i_min, ["Start", "End"]]
.astype(str)
.tolist()
)
row_str = "-".join(row)
logger.debug(
"Could not merge %s. Removed instead.", row_str
)
continue
if n_probes_right == INVALID or n_probes_left <= n_probes_right:
i_merge = i_left
else:
i_merge = i_right
matrix[i_merge, I_N_PROBES] += matrix[i_min, I_N_PROBES]
matrix[i_merge, I_START] = min(
matrix[i_merge, I_START], matrix[i_min, I_START]
)
matrix[i_merge, I_END] = max(
matrix[i_merge, I_END], matrix[i_min, I_END]
)
update_neighbors(i_left, i_min, i_right)
bin_df[["Start", "End", "N_probes"]] = matrix[:, :3]
bin_df = bin_df[bin_df.N_probes != INVALID]
return bin_df[["Start", "End", "N_probes"]]
[docs]
@staticmethod
def load(
array_types: list[str | ArrayType] | str | ArrayType | None = None,
) -> None:
"""Loads specified annotation into memory.
Args:
array_types (list or ArrayType, optional): List of array types or
a single array type to load. Defaults to all available types.
Examples:
>>> # Load all annotations:
>>> Annotation.load()
>>> # Load specific annotation:
>>> Annotation.load(
>>> [ArrayType.ILLUMINA_450K, ArrayType.ILLUMINA_EPIC]
>>> )
>>> Annotation.load("epicv2")
"""
if array_types is None:
array_types = [
ArrayType.ILLUMINA_450K,
ArrayType.ILLUMINA_EPIC,
ArrayType.ILLUMINA_EPIC_V2,
ArrayType.ILLUMINA_MSA48,
]
if not isinstance(array_types, list):
array_types = [array_types]
for array_type in array_types:
_ = Annotation(array_type=array_type)
def __repr__(self) -> str:
title = "Annotation():"
lines = [
title + "\n" + "*" * len(title),
f"probes:\n{self.probes}",
f"min_probes_per_bin:\n{self.min_probes_per_bin}",
f"bin_size:\n{self.bin_size}",
f"gap:\n{self.gap}",
f"detail:\n{self.detail}",
f"array_type:\n{self.array_type}",
f"adjusted_manifest:\n{self.adjusted_manifest}",
f"chromsizes:\n{self.chromsizes}",
f"bins:\n{self.bins}",
]
return "\n\n".join(lines)
@memoize
def cached_index(left_arr: pd.Index, right_arr: pd.Index) -> np.ndarray:
"""Cached indices to improve speed of pandas loc/iloc operations.
Return the cached indices of elements in left_array based on their presence
in right_array.
"""
return left_arr.get_indexer(right_arr)
def _pd_loc(pd_df: pd.DataFrame, pd_col: pd.Series | pd.Index) -> pd.DataFrame:
"""Cached version of pd_df.loc[pd_col] to speed up computation."""
return pd_df.iloc[cached_index(pd_df.index, pd_col.values)]
[docs]
class CNV:
"""Class for Copy Number Variation (CNV) analysis.
Attributes:
sample (MethylData): MethylData object representing the sample.
reference (MethylData): MethylData object representing the CNV-
neutral references.
annotation (Annotation): Annotation object containing genomic
annotation information.
bins (PyRanges): PyRanges object representing genomic bins.
probes (Index): Index of probe IDs.
coef: Coefficient of linear regression.
_ratio: Difference between observed sample intensity and expected
intensity calculated by linear regression from references.
ratio: The values from _ratio as DataFrame with Illumina ID's as
indices.
noise: Noise level. A quality measure for the sample bead.
detail: Detailed information (usually Genes).
segments: Segments calculated by circular binary segmentation.
Args:
sample (MethylData): MethylData object representing the sample.
reference (MethylData or ReferenceMethylData): MethylData object
representing the reference, or ReferenceMethylData object for
multiple references.
annotation (Annotation, optional): Annotation object containing
genomic annotation information. Defaults to annotation
associated with the sample array type.
Examples:
>>> sample = MethylData(file="path/to/idat/file")
>>> reference = MethylData(file="path/to/idat/reference/dir")
>>> cnv = CNV(sample, reference)
>>> cnv.set_bins()
>>> cnv.set_detail()
>>> cnv.set_segments()
>>> cnv.plot()
Raises:
ValueError: If sample does not contain exactly 1 probe, or if
reference is not of type MethylData or ReferenceMethylData.
Reference:
Daenekas, B., Pérez, E., Boniolo, F., Stefan, S., Benfatto, S., Sill,
M., Sturm, D., Jones, D. T. W., Capper, D., Zapatka, M., & Hovestadt,
V. (2024). Conumee 2.0: enhanced copy-number variation analysis from
DNA methylation arrays for humans and mice. In J. Kelso (Ed.),
Bioinformatics (Vol. 40, Issue 2). Oxford University Press (OUP).
https://doi.org/10.1093/bioinformatics/btae029
"""
annotation: "Annotation"
bins: pr.PyRanges
coef: np.ndarray
detail: pr.PyRanges | None
noise: float
probe: str # TODO: Change this variable name (sample_id?)
probes: pd.Index
ratio: pd.DataFrame
reference: "MethylData"
sample: "MethylData"
segments: pd.DataFrame | None
_ratio: np.ndarray
def __init__(
self,
sample: "MethylData",
reference: MethylData | ReferenceMethylData,
annotation: Annotation | None = None,
) -> None:
if len(sample.probes) != 1:
msg = "sample must contain exactly 1 probe."
raise ValueError(msg)
self.sample = sample
self.probe = self.sample.probes[0]
if isinstance(reference, MethylData):
self.reference = reference
elif isinstance(reference, ReferenceMethylData):
self.reference = reference[self.sample.array_type]
else:
msg = (
"'reference' must be of type 'MethylData' "
"or 'ReferenceMethylData'"
)
raise TypeError(msg)
self.annotation = annotation or Annotation(
array_type=sample.array_type
)
if not (
self.sample.array_type
== self.annotation.array_type
== self.reference.array_type
):
msg = (
f"Array type mismatch: sample ({self.sample.array_type}), "
f"reference ({self.reference.array_type}), "
f"annotation ({self.annotation.array_type}).\n"
"All must be the same."
)
raise ValueError(msg)
self.set_itensity(self.sample)
self.set_itensity(self.reference)
self.bins = self.annotation.bins
self.probes = self.annotation.adjusted_manifest.IlmnID
self.detail = None
self.segments = None
self.fit()
[docs]
@classmethod
def set_all(
cls,
sample: MethylData,
reference: MethylData | ReferenceMethylData,
annotation: Annotation | None = None,
*,
do_seg: bool = True,
) -> "CNV":
"""Create a CNV object and perform CNV analysis.
Args:
sample (MethylData): MethylData object representing the sample.
reference (MethylData or ReferenceMethylData): MethylData object
representing the reference, or ReferenceMethylData object for
multiple references.
annotation (Annotation, optional): Annotation object containing
genomic annotation information. Defaults to annotation
associated with the sample array type.
do_seg (bool, optional): Indicates whether to perform
segmentation, which can be computationally intensive. Defaults
to True.
Returns:
CNV: CNV object with fitted data and optionally segmented.
Examples:
>>> cnv = CNV.set_all(sample, reference, do_seg=do_seg)
>>> # Note: This command is equivalent to:
>>> cnv = CNV(sample, reference)
>>> cnv.set_bins()
>>> cnv.set_detail()
>>> if do_seg:
>>> cnv.set_segments()
"""
cnv = cls(
sample=sample,
reference=reference,
annotation=annotation,
)
cnv.set_bins()
cnv.set_detail()
if do_seg:
cnv.set_segments()
return cnv
[docs]
def set_itensity(self, methyl_data: "MethylData") -> None:
"""Calculates intensity values from methylation data."""
if getattr(methyl_data, "intensity", None) is not None:
return
logger.debug("%s Setting intensity...", self.probe)
intensity = methyl_data.methyl + methyl_data.unmethyl
prefix = (
f"{self.probe}"
if methyl_data == self.sample
else f"{self.reference.probes[0]},..."
)
# Replace NaN values with 1
nan_indices = np.isnan(intensity)
if np.any(nan_indices):
intensity[nan_indices] = 1
logger.debug("%s: Intensities that are NA set to 1", prefix)
# Replace values less than 1 with 1
lt_one_indices = intensity < 1
if np.any(lt_one_indices):
intensity[lt_one_indices] = 1
logger.debug("%s: Intensities < 0 set to 1", prefix)
# Check abnormal low and high intensities
mean_intensity = np.mean(intensity, axis=1)
if np.min(mean_intensity) < 5000:
logger.info("%s: Intensities are abnormally low (< 5000)", prefix)
if np.max(mean_intensity) > 50000:
logger.info(
"%s: Intensities are abnormally high (> 50000)", prefix
)
methyl_data.intensity = pd.DataFrame(
intensity.T,
columns=methyl_data.probes,
index=methyl_data.methyl_ilmnid,
)
[docs]
def fit(self) -> None:
"""Fits linear regression model to calculate CNV at every CpG site.
This method fits a linear regression model to the intensity data of the
sample and reference and calculates the CNV at every CpG site.
"""
logger.info("%s Performing fit...", self.probe)
from sklearn.linear_model import LinearRegression
assert self.sample.intensity is not None
assert self.reference.intensity is not None
smp_intensity = _pd_loc(
self.sample.intensity, self.probes
).values.ravel()
idx = _pd_loc(self.sample.intensity, self.probes).index
ref_intensity = _pd_loc(self.reference.intensity, self.probes).values
correlation = np.array(
[np.corrcoef(smp_intensity, z)[0, 1] for z in ref_intensity.T]
)
if any(correlation >= 0.99):
logger.info(
"%s Sample found in reference set. Excluded from fitting.",
self.probe,
)
ref_intensity = ref_intensity[:, correlation < 0.99]
X = np.log2(ref_intensity)
y = np.log2(smp_intensity)
reg = LinearRegression().fit(X, y)
y_pred = reg.predict(X)
y_pred[y_pred < 0] = 0
self.coef = reg.coef_
self._ratio = y - y_pred
self.ratio = pd.DataFrame({"ratio": self._ratio}, idx)
self.noise = np.sqrt(
np.mean((self._ratio[1:] - self._ratio[:-1]) ** 2)
)
[docs]
def set_bins(self) -> None:
"""Calculates CNV within each bin based on the results of 'fit'.
This method calculates copy number variation (CNV) within each bin
by taking the median of the ratios obtained from the linear regression
model fit in the 'fit' method.
"""
logger.info("%s Setting bins...", self.probe)
cpg_bins = self.annotation._cpg_bins.copy()
cpg_bins["ratio"] = _pd_loc(self.ratio, cpg_bins.IlmnID).ratio.values
result = cpg_bins.groupby("bins_index", dropna=False)["ratio"].agg(
["median", "var"]
)
bins_df = self.bins.copy()
bins_df["Median"] = np.nan
bins_df["Var"] = np.nan
bins_df.loc[result.index, ["Median", "Var"]] = result.values
self.bins = pr.PyRanges(bins_df)
[docs]
def set_detail(self) -> None:
"""Calculates CNV for the detail object based on the results of 'fit'.
This method calculates copy number variation (CNV) for the detail
object (usually genes) by aggregating the ratios obtained from the
linear regression model fit in the 'fit' method for each genomic region
specified in the detail object. The result includes the median ratio,
variance, and count of probes within each region.
"""
logger.info("%s Setting detail...", self.probe)
cpg_detail = self.annotation._cpg_detail.copy()
cpg_detail["ratio"] = _pd_loc(
self.ratio, cpg_detail.IlmnID
).ratio.values
result = cpg_detail.groupby("Name", dropna=False)["ratio"].agg(
["median", "var", "count"]
)
detail_df = pd.DataFrame(self.annotation.detail.set_index("Name"))
detail_df["Median"] = np.nan
detail_df["Var"] = np.nan
detail_df["N_probes"] = 0
idx = cached_index(detail_df.index, result.index.values)
detail_df.iloc[
idx, detail_df.columns.get_indexer(["Median", "Var", "N_probes"])
] = result.values
detail_df["N_probes"] = detail_df["N_probes"].astype(int)
detail_df = detail_df.reset_index()
self.detail = pr.PyRanges(detail_df)
@staticmethod
def _get_segments(df: pd.DataFrame) -> pd.DataFrame:
"""Performs circular binary segmentation to identify CNV segments.
This method applies the circular binary segmentation (CBS) algorithm
to identify copy number variation (CNV) segments based on the median
values of genomic bins. The CBS algorithm is a time-intensive
operation that segments genomic regions based on change-points in
intensity values.
Args:
df (DataFrame): DataFrame containing the median values of
genomic bins.
Returns:
DataFrame: DataFrame containing CNV segments with columns for
chromosome, start position, and end position.
"""
cbsegment = _get_cgsegment()
assert cbsegment is not None
bin_values = df["Median"].values
seg = cbsegment(bin_values)
return pd.DataFrame(
[[df["Start"].iloc[s[0]], df["End"].iloc[s[1] - 1]] for s in seg],
columns=["Start", "End"],
)
[docs]
def set_segments(self) -> None:
"""Sets CNV segments based on circular binary segmentation.
This method applies the circular binary segmentation (CBS) algorithm
to identify copy number variation (CNV) segments in the dataset.
It calculates the CNV segments for each chromosome and stores them
in the 'segments' attribute of the object.
"""
if _get_cgsegment() is None:
return
logger.info("%s Setting segments...", self.probe)
segments = pr.PyRanges(
self.bins.groupby("Chromosome")
.apply(self._get_segments)
.reset_index()
)
overlap = segments.join_overlaps(self.annotation.adjusted_manifest)
overlap["ratio"] = self.ratio.loc[overlap.IlmnID].values
result = (
overlap.groupby(["Chromosome", "Start", "End"], dropna=False)[
"ratio"
]
.agg(["median", "mean", "var", "count"])
.reset_index()
.rename(
columns={
"count": "N_probes",
"median": "Median",
"mean": "Mean",
"var": "Var",
}
)
)
self.segments = pr.PyRanges(result).sort_ranges()
[docs]
def write(
self,
path: str | Path,
data: str | list[str] = "all",
) -> None:
"""Writes CNV data to disk as a zip file.
This method writes the CNV data to disk as a zip file containing
CSV files. It allows specifying which data to include in the zip
file, such as bins, detail, segments, and metadata.
Args:
path (str): The path to save the zip file.
data (str or list of str, optional): Specifies which data to
include in the zip file. Valid options are "all", "bins",
"detail", "segments", and "metadata". Defaults to "all".
Raises:
ValueError: If an invalid data option is specified.
"""
logger.info("%s Write data to disk...", self.probe)
default = {"all", "bins", "detail", "segments", "metadata"}
if not isinstance(data, list):
data = [data]
data_set = set(data)
if "all" in data_set:
data_set = default
invalid = data_set - default
if invalid:
msg = (
f"Invalid file(s) specified: {invalid}. "
f"Valid options are: {default}"
)
raise ValueError(msg)
dfs_to_write = []
if "bins" in data_set and self.bins is not None:
dfs_to_write.append(("bins.csv", pd.DataFrame(self.bins)))
if "detail" in data_set and self.detail is not None:
dfs_to_write.append(("detail.csv", pd.DataFrame(self.detail)))
if "segments" in data_set and self.segments is not None:
dfs_to_write.append(("segments.csv", pd.DataFrame(self.segments)))
if "metadata" in data_set:
metadata_df = pd.DataFrame(
{
"Array_type": [str(self.annotation.array_type)],
"Noise": [self.noise],
},
)
dfs_to_write.append(("metadata.csv", metadata_df))
base_path = Path(path).expanduser()
if base_path.suffix == ".zip":
base_path = base_path.with_suffix("")
file_name = base_path.name
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
for df_name, df in dfs_to_write:
csv_bytes = df.to_csv(index=False).encode("utf-8")
zf.writestr(f"{file_name}_{df_name}", csv_bytes)
buffer.seek(0)
with base_path.with_suffix(".zip").open("wb") as f:
f.write(buffer.read())
[docs]
def plot(self) -> None:
"""Generates and displays a plot of the CNV data."""
logger.info("%s Plotting...", self.probe)
cnv_dir = Path(MEPYLOME_TMP_DIR, "cnv_zips")
ensure_directory_exists(cnv_dir)
cnv_file = self.probe + ZIP_ENDING
cnv_path = Path(cnv_dir, cnv_file)
if "Median" not in self.bins.columns:
self.set_bins()
if self.detail is None:
self.set_detail()
self.write(cnv_path)
cnv_plot = CNVPlot(cnv_dir, cnv_file)
cnv_plot.run_app()
def __repr__(self) -> str:
title = "CNV():"
lines = [
title + "\n" + "*" * len(title),
f"sample:\n{self.sample.probes}",
f"reference:\n{self.reference.probes}",
f"_ratio: {self._ratio}",
f"bins:\n{self.bins}",
f"detail:\n{self.detail}",
f"segments:\n{self.segments}",
f"coef:\n{self.coef}",
f"noise:\n{self.noise}",
f"ratio:\n{self.ratio}",
]
return "\n\n".join(lines)