Source code for mepylome.dtypes.idat

"""Contains a IDAT file parser."""

import gzip
import io
import os
from collections.abc import Generator
from contextlib import contextmanager
from enum import IntEnum, unique
from pathlib import Path
from typing import Any, BinaryIO, cast

import numpy as np

__all__ = ["IdatParser"]

DEFAULT_IDAT_VERSION = 3
DEFAULT_IDAT_FILE_ID = "IDAT"


def read_byte(infile: BinaryIO) -> int:
    return int.from_bytes(infile.read(1), byteorder="little", signed=False)


def read_short(infile: BinaryIO) -> int:
    return int.from_bytes(infile.read(2), byteorder="little", signed=False)


def read_int(infile: BinaryIO) -> int:
    return int.from_bytes(infile.read(4), byteorder="little", signed=True)


def read_long(infile: BinaryIO) -> int:
    return int.from_bytes(infile.read(8), byteorder="little", signed=True)


def read_char(infile: BinaryIO, num_bytes: int) -> str:
    return infile.read(num_bytes).decode("utf-8")


def read_string(infile: BinaryIO) -> str:
    num_bytes = read_byte(infile)
    num_chars = num_bytes % 128
    shift = 0
    while num_bytes // 128 == 1:
        num_bytes = read_byte(infile)
        shift += 7
        offset = (num_bytes % 128) * (2**shift)
        num_chars += offset
    return read_char(infile, num_chars)


def read_array(
    infile: BinaryIO,
    dtype: str | np.dtype,
    n: int,
) -> np.ndarray:
    dtype = np.dtype(dtype)
    total_size = dtype.itemsize * n
    alldata = infile.read(total_size)

    if len(alldata) != total_size:
        msg = "End of file reached before number of results parsed"
        raise EOFError(msg)

    return np.frombuffer(alldata, dtype)


@unique
class IdatSectionCode(IntEnum):
    """Section codes used in Illumina idat files."""

    ILLUMINA_ID = 102
    STD_DEV = 103
    MEAN = 104
    NUM_BEADS = 107  # how many replicate measurements for each probe
    MID_BLOCK = 200
    RUN_INFO = 300
    RED_GREEN = 400
    MOSTLY_NULL = 401  # manifest
    BARCODE = 402
    CHIP_TYPE = 403  # format
    MOSTLY_A = 404  # label
    UNKNOWN_1 = 405  # opa
    UNKNOWN_2 = 406  # sampleid
    UNKNOWN_3 = 407  # descr
    UNKNOWN_4 = 408  # plate
    UNKNOWN_5 = 409  # well
    UNKNOWN_6 = 410
    UNKNOWN_7 = 510  # unknown
    NUM_SNPS_READ = 1000


def _get_file_size(file_like: BinaryIO) -> int:
    """Get the size of a file-like object."""
    # Check if the file-like object has a fileno method
    if isinstance(file_like, io.BufferedReader | gzip.GzipFile):
        return os.fstat(file_like.fileno()).st_size

    if isinstance(file_like, io.BytesIO):
        current_pos = file_like.tell()
        file_like.seek(0, io.SEEK_END)
        size = file_like.tell()
        file_like.seek(current_pos)
        return size

    msg = "Cannot determine file size. Unknown file format"
    raise ValueError(msg)


@contextmanager
def get_file_object(
    file: str | Path | BinaryIO,
) -> Generator[BinaryIO, None, None]:
    """Returns a file-like object for reading an IDAT file.

    Supports plain and gzipped IDAT files.

    Args:
        file: Path to the file (str or Path) or an already open file-like
            object.

    Yields:
        A binary file-like object.
    """
    if isinstance(file, io.BufferedIOBase | io.BytesIO):
        # Already a file-like object
        yield file
    else:
        if not isinstance(file, str | Path):
            raise TypeError(f"Expected file-like object, got {type(file)}")
        path = Path(file)
        f = gzip.open(path, "rb") if path.suffix == ".gz" else path.open("rb")
        try:
            yield cast(BinaryIO, f)
        finally:
            f.close()


[docs] class IdatParser: """Reads and parses an IDAT file. Stores all extracted values from the IDAT file as attributes. Args: file (str or file-like object): Path to the IDAT file or a file-like object. Can also be a gzipped IDAT file. intensity_only (bool, optional): Whether to read only intensity values, which makes parsing faster. Defaults to False. Examples: >>> filepath = "/path/to/idat/file_Grn.idat" >>> idat_data = IdatParser(filepath) >>> ids = idat_data.illumina_ids >>> print(idat_data) """ def __init__( self, file: str | Path | BinaryIO, *, intensity_only: bool = False, array_type_only: bool = False, ) -> None: """Reads and parses the IDAT file.""" self.intensity_only = intensity_only self.array_type_only = array_type_only self._file = file with get_file_object(file) as idat_file: self.file_size = _get_file_size(idat_file) self._parse_header(idat_file) self._parse_body(idat_file) def _parse_header(self, idat_file: BinaryIO) -> None: file_type = read_char(idat_file, len(DEFAULT_IDAT_FILE_ID)) # Assert file is indeed IDAT format if file_type != DEFAULT_IDAT_FILE_ID: msg = ( f"Parser could not open file {self._file} as its not a valid " "IDAT file." ) raise ValueError(msg) idat_version = read_long(idat_file) # Assert correct IDAT file version if idat_version != DEFAULT_IDAT_VERSION: msg = ( f"Parser could not open file {self._file} as its not a " "version 3 IDAT file." ) raise ValueError(msg) self.num_fields = read_int(idat_file) self.offsets = { read_short(idat_file): read_long(idat_file) for _ in range(self.num_fields) } def _parse_body(self, idat_file: BinaryIO) -> None: def seek_to_section(section_code: "IdatSectionCode") -> None: idat_file.seek(self.offsets[section_code.value]) seek_to_section(IdatSectionCode.NUM_SNPS_READ) self.n_snps_read = read_int(idat_file) if self.array_type_only: return seek_to_section(IdatSectionCode.ILLUMINA_ID) self.illumina_ids = read_array(idat_file, "<i4", self.n_snps_read) seek_to_section(IdatSectionCode.MEAN) self.probe_means = read_array(idat_file, "<u2", self.n_snps_read) if self.intensity_only: return seek_to_section(IdatSectionCode.STD_DEV) self.std_dev = read_array(idat_file, "<u2", self.n_snps_read) seek_to_section(IdatSectionCode.NUM_BEADS) self.n_beads = read_array(idat_file, "<u1", self.n_snps_read) seek_to_section(IdatSectionCode.MID_BLOCK) n_mid_block = read_int(idat_file) self.mid_block = read_array(idat_file, "<i4", n_mid_block) seek_to_section(IdatSectionCode.RUN_INFO) runinfo_entry_count = read_int(idat_file) self.run_info: list[Any] = [None] * runinfo_entry_count for i in range(runinfo_entry_count): self.run_info[i] = [ read_string(idat_file), # run_time read_string(idat_file), # block_type read_string(idat_file), # block_pars read_string(idat_file), # block_code read_string(idat_file), # code_version ] seek_to_section(IdatSectionCode.RED_GREEN) self.red_green = read_int(idat_file) seek_to_section(IdatSectionCode.MOSTLY_NULL) self.mostly_null = read_string(idat_file) seek_to_section(IdatSectionCode.BARCODE) self.barcode = read_string(idat_file) seek_to_section(IdatSectionCode.CHIP_TYPE) self.chip_type = read_string(idat_file) seek_to_section(IdatSectionCode.MOSTLY_A) self.mostly_a = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_1) self.unknown_1 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_2) self.unknown_2 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_3) self.unknown_3 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_4) self.unknown_4 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_5) self.unknown_5 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_6) self.unknown_6 = read_string(idat_file) seek_to_section(IdatSectionCode.UNKNOWN_7) self.unknown_7 = read_string(idat_file) def __repr__(self) -> str: with np.printoptions(edgeitems=2): result = ( f"IdatParser(\n" f" file_size: {self.file_size}\n" f" num_fields: {self.num_fields}\n" f" n_snps_read: {self.n_snps_read}\n" f" illumina_ids: {self.illumina_ids!r}\n" f" probe_means: {self.probe_means!r}\n" ) if self.intensity_only: return result + ")" return result + ( f" std_dev: {self.std_dev!r}\n" f" n_beads: {self.n_beads!r}\n" f" mid_block: {self.mid_block!r}\n" f" red_green: {self.red_green}\n" f" mostly_null: {self.mostly_null}\n" f" barcode: {self.barcode}\n" f" chip_type: {self.chip_type}\n" f" mostly_a: {self.mostly_a}\n" f" unknown_1: {self.unknown_1}\n" f" unknown_2: {self.unknown_2}\n" f" unknown_3: {self.unknown_3}\n" f" unknown_4: {self.unknown_4}\n" f" unknown_5: {self.unknown_5}\n" f" unknown_6: {self.unknown_6}\n" f" unknown_7: {self.unknown_7}\n" ")" )