Source code for mepylome.dtypes.manifests

"""Module for handling Illumina array manifest files.

This module contains a single class ``Manifest`` for reading and processing
Illumina array manifest files, which contain information about probes and their
characteristics.
"""

import logging
import pickle
from pathlib import Path
from typing import IO, Any

import numpy as np
import pandas as pd
import pyranges1 as pr

from mepylome.dtypes.arrays import ArrayType
from mepylome.dtypes.cache import cache_key, input_args_id
from mepylome.dtypes.chromosome import Chromosome
from mepylome.dtypes.probes import Channel, InfiniumDesignType, ProbeType
from mepylome.utils.files import (
    download_file,
    ensure_directory_exists,
    get_csv_file,
    reset_file,
)
from mepylome.utils.varia import CONFIG, MEPYLOME_CACHE_DIR, MEPYLOME_TMP_DIR

logger = logging.getLogger(__name__)

__all__ = ["Manifest"]


MANIFEST_DIR = MEPYLOME_CACHE_DIR / "manifests" / "manifest_files_v0"
DOWNLOAD_DIR = MEPYLOME_TMP_DIR / "manifests"

ENDING_CONTROL_PROBES = CONFIG["suffixes"]["manifest_control_probes"]
NONE = -1

MANIFEST_URL = {
    ArrayType(type_): url for type_, url in CONFIG["urls"]["manifest"].items()
}
PROCESSED_MANIFEST_URL = (
    CONFIG["urls"]["processed_manifests"] % MANIFEST_DIR.name
)
REMOTE_FILENAME = {
    ArrayType(type_): url for type_, url in CONFIG["files"]["remote"].items()
}
LOCAL_FILENAME = {
    ArrayType(type_): url for type_, url in CONFIG["files"]["local"].items()
}


PROBES_COLUMNS = [
    "IlmnID",
    "Name",
    "AddressA_ID",
    "AddressB_ID",
    "Infinium_Design_Type",
    "Color_Channel",
    "CHR",
    "Chr",
    "MAPINFO",
    "AlleleA_ProbeSeq",
    "AlleleB_ProbeSeq",
]


CONTROL_COLUMNS = (
    "Address_ID",
    "Control_Type",
    "Color",
    "Extended_Type",
)


[docs] class Manifest: """Provides an object interface to an Illumina array manifest file. This class provides functionality for reading and processing Illumina array manifest files. A manifest can be loaded automatically based on the array type or provided as a raw manifest file. On first use, the necessary data is automatically downloaded if needed, transformed, and saved locally, which might take some time. The processed manifest is then saved locally and loaded in its processed form on subsequent uses. During a running session, all loaded manifests are cached in memory. Args: array_type (str or ArrayType): The type of array to process. Use either ArrayType (ArrayType.ILLUMINA_450K, ArrayType.ILLUMINA_EPIC, ArrayType.ILLUMINA_EPIC_V2) or corresponding string ('450k', 'epic', 'epicv2', 'msa48') proc_path (str or Path): The path to the local processed manifest file (default: None). raw_path (str or Path, optional): Path to the raw manifest file. Default is None. download_proc (bool, optional): If True and there is no locally saved processed manifest file, attempts to download the processed manifest file instead of the raw one. Defaults to True. Examples: >>> # To initialize a manifest object for Illumina 450k array: >>> manifest = Manifest("450k") >>> manifest >>> # To initialize a manifest object for Illumina EPIC array >>> manifest = Manifest(ArrayType.ILLUMINA_EPIC) >>> type_1 = manifest.probe_info(ProbeType.ONE) >>> # To load all manifests when first used: >>> Manifest.load() """ _cache: dict[Any, "Manifest"] = {} array_type: ArrayType | None ctrl_path: Path download_proc: bool proc_path: Path raw_path: Path | None _control_data_frame: pd.DataFrame _data_frame: pd.DataFrame _init_array_type: str | ArrayType | None _init_download_proc: bool _init_proc_path: str | Path | None _init_raw_path: str | Path | None _methyl_probes: np.ndarray | None _pickle_path: Path _snp_data_frame: pd.DataFrame def __new__( cls, array_type: str | ArrayType | None = None, raw_path: str | Path | None = None, proc_path: str | Path | None = None, download_proc: bool = True, ) -> "Manifest": key = cache_key(array_type, raw_path, proc_path) if key in cls._cache: return cls._cache[key] instance = super().__new__(cls) # Cache the instance cls._cache[key] = instance return instance def __getnewargs__(self) -> tuple: # Necessary for pickle return ( self._init_array_type, self._init_raw_path, self._init_proc_path, self._init_download_proc, ) def __init__( self, array_type: str | ArrayType | None = None, raw_path: str | Path | None = None, proc_path: str | Path | None = None, download_proc: bool = True, ) -> None: if hasattr(self, "_cached"): return self._cached = True self._init_array_type = array_type self._init_raw_path = raw_path self._init_proc_path = proc_path self._init_download_proc = download_proc def to_path(x: str | Path | None) -> Path | None: return x if x is None else Path(x) self.array_type = ArrayType(array_type) if array_type else None self.raw_path = to_path(raw_path) proc_path = to_path(proc_path) self.download_proc = download_proc # Load cached data from disk self._cache_key = input_args_id( "manifest", self.array_type, self.raw_path, proc_path ) self._pickle_path = MEPYLOME_TMP_DIR / f"{self._cache_key}.pkl" if self._pickle_path.exists(): with self._pickle_path.open("rb") as file_read: saved_instance = pickle.load(file_read) self.__dict__.update(saved_instance.__dict__) return if ( self.array_type == ArrayType.UNKNOWN and proc_path is None and raw_path is None ): self._data_frame = pd.DataFrame() self._control_data_frame = pd.DataFrame() self._snp_data_frame = pd.DataFrame() self._methyl_probes = None return # Set processed manifest path if proc_path is None: if ( self.array_type is not None and self.array_type != ArrayType.UNKNOWN ): self.proc_path = MANIFEST_DIR / LOCAL_FILENAME[self.array_type] elif self.raw_path is not None: if self.raw_path.suffix == ".zip": gz_filename = "proc_" + self.raw_path.stem + ".gz" else: gz_filename = "proc_" + self.raw_path.name + ".gz" self.proc_path = DOWNLOAD_DIR / gz_filename else: msg = "Provide either array_type or proc_path or raw_path" raise ValueError(msg) else: self.proc_path = proc_path self.ctrl_path = Manifest._get_control_path(self.proc_path) self._create_processed_manifest_files() self._data_frame = self._read_probes(self.proc_path) self._control_data_frame = self._read_control_probes(self.ctrl_path) self._snp_data_frame = self._read_snp_probes() self._methyl_probes = None # Save to disk with self._pickle_path.open("wb") as file_write: pickle.dump(self, file_write) @property def cache_key(self) -> str: return self._cache_key def _create_processed_manifest_files(self) -> None: """Create processed manifest files if they do not exist.""" if ( self.proc_path is None or self.ctrl_path is None or not (self.proc_path.exists() and self.ctrl_path.exists()) ): # If array type is given, download the files if self.array_type is not None: if ( not self.download_proc or not self._download_processed_manifest() ): self._download_manifest() csv_filename = REMOTE_FILENAME[self.array_type] self._process_manifest(csv_filename=csv_filename) # Else process from raw_path elif self.raw_path is not None: if self.raw_path.suffix == ".zip": csv_filename = self.raw_path.stem else: csv_filename = self.raw_path.name self._process_manifest(csv_filename=csv_filename) else: msg = "Provide either array_type or proc_path or raw_path" raise ValueError(msg) @property def data_frame(self) -> pd.DataFrame: """Pandas data frame of all manifest probes.""" return self._data_frame @property def control_data_frame(self) -> pd.DataFrame: """Pandas data frame of all manifest control probes.""" return self._control_data_frame @property def snp_data_frame(self) -> pd.DataFrame: """SNP probes from the manifest data frame.""" return self._snp_data_frame @property def methylation_probes(self) -> np.ndarray: """All type I and II probes.""" if self._methyl_probes is None: type_1 = self.probe_info(ProbeType.ONE) type_2 = self.probe_info(ProbeType.TWO) idx = np.sort( np.concatenate( [ type_1.IlmnID.index, type_2.IlmnID.index, ] ) ) self._methyl_probes = self._data_frame.iloc[idx]["IlmnID"].values return self._methyl_probes
[docs] def control_address( self, control_type: str | tuple[str] | list[str] | None = None, ) -> pd.Series: """Returns address IDs of all control probes of the specified type.""" if control_type is None: return self._control_data_frame.Address_ID # Ensure control_type is a list-like object if not isinstance(control_type, list | tuple): control_type = [control_type] # Use isin() with the list-like object return self._control_data_frame[ self._control_data_frame.Control_Type.isin(control_type) ].Address_ID
[docs] @staticmethod def load( array_types: list[str | ArrayType] | str | ArrayType | None = None, ) -> None: """Loads specified manifests into memory. Args: array_types (list or ArrayType, optional): List of array types or a single array type to load. Defaults to all available types. Examples: >>> # Load all manifests: >>> Manifest.load() >>> # Load specific manifests: >>> Manifest.load( >>> [ArrayType.ILLUMINA_450K, ArrayType.ILLUMINA_EPIC] >>> ) >>> Manifest.load("epicv2") """ if array_types is None: array_types = [ ArrayType.ILLUMINA_450K, ArrayType.ILLUMINA_EPIC, ArrayType.ILLUMINA_EPIC_V2, ArrayType.ILLUMINA_MSA48, ] if not isinstance(array_types, list): array_types = [array_types] for array_type in array_types: _ = Manifest(array_type)
def _download_manifest( self, ) -> None: """Downloads the appropriate manifest file. This method downloads the manifest file based on the `array_type` specified during the initialization of the Manifest object. It stores the downloaded file in the path of the `raw_path` attribute. Raises: KeyError: If the array type is not found in MANIFEST_URL. """ assert self.array_type is not None source_url = MANIFEST_URL[self.array_type] source_filename = Path(source_url).name logger.info("Downloading manifest: %s", source_filename) self.raw_path = DOWNLOAD_DIR / source_filename download_file(source_url, self.raw_path) def _download_processed_manifest(self) -> bool: """Download processed manifest file and return true if successful.""" logger.info("Downloading processed %s manifest", self.array_type) try: for path in [self.proc_path, self.ctrl_path]: assert path is not None ensure_directory_exists(path.parent) url = PROCESSED_MANIFEST_URL + path.name download_file(url, path) return True except Exception: return False @staticmethod def _get_control_path(probes_path: Path) -> Path: """Converts probes path to the control probes path.""" split_filename = probes_path.name.split(".") split_filename[0] += ENDING_CONTROL_PROBES return Path(probes_path.parent, ".".join(split_filename)) def _process_manifest(self, csv_filename: str | None = None) -> None: """Process the manifest file and save it locally to disk. This method processes the raw manifest file by extracting the necessary probe and control probe information, and then saves these processed details to local files with pathnames `probes_path` and `ctrl_path`. Args: csv_filename (str, optional): Name of the manifest file inside the archive. If not provided, it defaults to the name of the raw_path file. """ logger.info("Process raw manifest %s", self.raw_path) assert self.proc_path is not None assert self.raw_path is not None if csv_filename is None: csv_filename = self.raw_path.name ensure_directory_exists(self.proc_path.parent) ensure_directory_exists(self.ctrl_path.parent) with get_csv_file(self.raw_path, csv_filename) as manifest_file: # Process probes Manifest._seek_to_start(manifest_file) available_columns = pd.read_csv(manifest_file, nrows=0).columns Manifest._seek_to_start(manifest_file) valid_columns = [ col for col in PROBES_COLUMNS if col in available_columns ] probes_df = pd.read_csv( manifest_file, low_memory=False, usecols=valid_columns, ) n_probes = probes_df[probes_df.IlmnID.str.startswith("[")].index[0] probes_df = probes_df[:n_probes] probes_df = Manifest._process_probes(probes_df) probes_df.to_csv(self.proc_path, index=False) # Process controls Manifest._seek_to_start(manifest_file) controls_df = pd.read_csv( manifest_file, header=None, # Skip metadata and probe section skiprows=(3 + n_probes), usecols=range(len(CONTROL_COLUMNS)), ) controls_df.columns = CONTROL_COLUMNS if ( pd.to_numeric(controls_df["Address_ID"], errors="coerce") .notna() .all() ): controls_df["Address_ID"] = controls_df["Address_ID"].astype( "int32" ) controls_df.to_csv(self.ctrl_path, index=False) @staticmethod def _process_probes(data_frame: pd.DataFrame) -> pd.DataFrame: """Transforms manifest probes to a more efficient internal format.""" rename_map = { "MAPINFO": "Start", "CHR": "Chromosome", "Chr": "Chromosome", } data_frame = data_frame.rename(columns=rename_map) data_frame["Chromosome"] = Chromosome.pd_from_string( data_frame["Chromosome"] ) # IlmnID and Name are different in EPICv2 data_frame["IlmnID"] = data_frame["Name"] data_frame = data_frame.drop(columns=["Name"]) channel_to_int = {"Grn": Channel.GRN, "Red": Channel.RED} design_type_map = { "I": InfiniumDesignType.TYPE_I, "II": InfiniumDesignType.TYPE_II, } with pd.option_context("future.no_silent_downcasting", True): if "Color_Channel" in data_frame.columns: data_frame["Color_Channel"] = ( data_frame["Color_Channel"] .replace(channel_to_int) .infer_objects() ) if "Infinium_Design_Type" in data_frame.columns: data_frame["Infinium_Design_Type"] = ( data_frame["Infinium_Design_Type"] .replace(design_type_map) .infer_objects() ) data_frame["TypeI_N_CpG"] = 0 data_frame["TypeII_N_CpG"] = 0 if "AlleleB_ProbeSeq" in data_frame.columns: data_frame["TypeI_N_CpG"] = np.maximum( 0, data_frame["AlleleB_ProbeSeq"].fillna("").str.count("CG") - 1, ) if "AlleleA_ProbeSeq" in data_frame.columns: # R Stands for CG in AlleleA_ProbeSeq data_frame["TypeII_N_CpG"] = ( data_frame["AlleleA_ProbeSeq"].fillna("").str.count("R") ) data_frame["N_CpG"] = NONE if "Infinium_Design_Type" in data_frame.columns: is_type_I = ( data_frame["Infinium_Design_Type"] == InfiniumDesignType.TYPE_I ) is_type_II = ( data_frame["Infinium_Design_Type"] == InfiniumDesignType.TYPE_II ) data_frame.loc[is_type_I, "N_CpG"] = data_frame.loc[ is_type_I, "TypeI_N_CpG" ] data_frame.loc[is_type_II, "N_CpG"] = data_frame.loc[ is_type_II, "TypeII_N_CpG" ] # Use int32 to improve performance of indexing int_cols = [ "AddressA_ID", "AddressB_ID", "Infinium_Design_Type", "Start", "Color_Channel", "TypeI_N_CpG", "TypeII_N_CpG", ] for col in int_cols: if col in data_frame.columns: data_frame[col] = data_frame[col].fillna(NONE).astype("int32") if "Start" in data_frame.columns: data_frame["End"] = data_frame["Start"] drop_cols = ["AlleleA_ProbeSeq", "AlleleB_ProbeSeq"] existing_drop_cols = [ col for col in drop_cols if col in data_frame.columns ] data_frame = data_frame.drop(columns=existing_drop_cols) def get_probe_type( name: str, infinium_type: InfiniumDesignType ) -> int: """Determines the probe type (I, II, SnpI, SnpII or Control).""" probe_type = ProbeType.from_manifest_values(name, infinium_type) return probe_type.value if ( "IlmnID" in data_frame.columns and "Infinium_Design_Type" in data_frame.columns ): vectorized_get_type = np.vectorize(get_probe_type) data_frame["Probe_Type"] = vectorized_get_type( data_frame["IlmnID"].values, data_frame["Infinium_Design_Type"].values, ) # TODO: drop Infinium_Design_Type if not {"Chromosome", "Start", "End"}.issubset(data_frame.columns): return data_frame probes_ranges = pr.PyRanges(data_frame).sort_ranges() return pd.DataFrame(probes_ranges) @staticmethod def _seek_to_start(manifest_file: IO[bytes]) -> None: """Move the manifest file pointer to the start of the data section. Details: The function searches for the first occurrence of the left-most column named "IlmnID". """ reset_file(manifest_file) current_pos = manifest_file.tell() header_line = manifest_file.readline() while not header_line.startswith(b"IlmnID"): current_pos = manifest_file.tell() if not header_line: msg = ( "The first (left-most) column in your manifest must " "contain 'IlmnID'. This defines the header row." ) raise EOFError(msg) header_line = manifest_file.readline() if current_pos == 0: manifest_file.seek(current_pos) else: manifest_file.seek(current_pos - 1) def _read_probes(self, probes_file: str | Path) -> pd.DataFrame: """Reads and returns probes from local file `probes_file`.""" data_frame = pd.read_csv( probes_file, dtype=self._get_data_types(), ) # TODO: epicv2 has duplicated ID's for example c='cg22367159' data_frame = data_frame.drop_duplicates( subset=["IlmnID"], keep="first" ) return data_frame.reset_index(drop=True) def _read_control_probes( self, control_file: str | Path, ) -> pd.DataFrame: """Reads and returns control probes from local file `control_file`.""" data_frame = pd.read_csv( control_file, dtype=self._get_data_types(), ) # Use int32 instead of int64 to improve performance of indexing if ( pd.to_numeric(data_frame["Address_ID"], errors="coerce") .notna() .all() ): data_frame["Address_ID"] = ( data_frame["Address_ID"].astype("int32").copy() ) return data_frame def _read_snp_probes(self) -> pd.DataFrame: """Extracts SNP probes from the manifest data frame.""" snp_df = self.data_frame.copy() return snp_df[snp_df.IlmnID.str.match("rs", na=False)] def _get_data_types(self) -> dict[str, Any]: """Returns data types for the manifest columns.""" return { "IlmnID": np.dtype(object), "AddressA_ID": np.int32, "AddressB_ID": np.int32, "Infinium_Design_Type": np.int8, "Color_Channel": np.int8, "Chromosome": np.int8, "Start": np.int32, "TypeI_N_CpG": np.int8, "TypeII_N_CpG": np.int8, "N_CpG": np.int8, "End": np.int32, "Probe_Type": np.int8, }
[docs] def probe_info( self, probe_type: ProbeType, channel: Channel | None = None, ) -> pd.DataFrame: """Retrieves information about probes of a specified type and channel. Args: probe_type (ProbeType): The type of probe (I, II, SnpI, SnpII, Control). channel (Channel, optional): The color channel (RED or GRN). Defaults to None. Returns: DataFrame: DataFrame containing information about the specified probes. Raises: ValueError: If probe_type is not a valid ProbeType or if channel is not a valid Channel. """ if not isinstance(probe_type, ProbeType): msg = "probe_type is not a valid ProbeType" raise TypeError(msg) if channel and not isinstance(channel, Channel): msg = "channel not a valid Channel" raise TypeError(msg) data_frame = self.data_frame probe_type_mask = data_frame["Probe_Type"].values == probe_type.value if channel is None: return data_frame[probe_type_mask] channel_mask = data_frame["Color_Channel"].values == channel.value return data_frame[probe_type_mask & channel_mask]
def __repr__(self) -> str: title = f"Manifest({self.array_type}):" lines = [ title + "\n" + "*" * len(title), f"data_frame:\n{self.data_frame}", f"control_data_frame:\n{self.control_data_frame}", f"snp_data_frame:\n{self.snp_data_frame}", ] return "\n\n".join(lines)