# Source code for variant_extractor.variants

# Copyright 2022 - Barcelona Supercomputing Center
# Author: Rodrigo Martin
# MIT License
from typing import NamedTuple, Optional, List, Dict, Any, Union
from enum import Enum, auto

import pysam


def _build_filter(rec: pysam.VariantRecord) -> List[Union[str, int]]:
    return [f for f in rec.filter]


def _build_info(rec: pysam.VariantRecord) -> Dict[str, Any]:
    info = dict()
    for key, value in rec.info.items():
        info[key] = value
    return info


def _build_format(rec: pysam.VariantRecord) -> List[str]:
    return [f for f in rec.format]


def _build_samples(rec: pysam.VariantRecord) -> Dict[str, Dict[str, Any]]:
    samples = dict()
    for sample_name in rec.samples:
        sample_dict = dict()
        for key, value in rec.samples[sample_name].items():
            sample_dict[key] = value
        samples[sample_name] = sample_dict
    return samples


class VariantType(Enum):
    """Enumeration with the different types of variations

    Member values are assigned by :func:`enum.auto`, so they are consecutive
    integers following the declaration order below.
    """

    SNV = auto()
    DEL = auto()
    INS = auto()
    DUP = auto()
    INV = auto()
    CNV = auto()
    TRA = auto()
    SGL = auto()
class BreakendSVRecord(NamedTuple):
    """Parsed components of an SV ALT field written in breakend notation

    All field examples below refer to the ALT value :code:`G]17:198982]`.
    """

    prefix: Optional[str]
    """Sequence placed before the first bracket (:code:`G` in the example), if any"""

    bracket: str
    """Bracket character used in the notation (:code:`]` in the example)"""

    contig: str
    """Contig named inside the brackets (:code:`17` in the example)"""

    pos: int
    """Position given inside the brackets (:code:`198982` in the example)"""

    suffix: Optional[str]
    """Sequence placed after the last bracket (:code:`None` in the example), if any"""
class ShorthandSVRecord(NamedTuple):
    """Parsed components of an SV ALT field written in shorthand notation
    """

    type: str
    """SV type: :code:`'DEL'`, :code:`'INS'`, :code:`'DUP'`, :code:`'INV'` or :code:`'CNV'`"""

    extra: Optional[List[str]]
    """Remaining components of the shorthand. For example, :code:`<DUP:TANDEM:AA>` gives
    :code:`['TANDEM', 'AA']`"""
def _str_value(value): if isinstance(value, str): return value elif isinstance(value, float): return f'{value:.2f}' elif hasattr(value, '__iter__'): return ','.join([_str_value(v) for v in value]) elif value is None: return '.' else: return str(value) def _convert_info_key_value(key, value): if value is None: return key elif isinstance(value, bool): return key if value else None else: return key+'='+_str_value(value) def _convert_sample_value(key, value): if key == 'GT': return '/'.join([_str_value(v) for v in value]) else: return _str_value(value)
class VariantRecord():
    """Variant record built on top of a :code:`pysam.VariantRecord`

    This is a regular class, not a NamedTuple. The core fields are plain
    attributes, while :attr:`info`, :attr:`format` and :attr:`samples` are
    read from the underlying pysam record only on first access and cached.
    When they have never been accessed, :meth:`__str__` reuses the matching
    columns of the original record text instead of rebuilding them.
    """
    contig: str
    """Contig name"""
    pos: int
    """Position of the variant in the contig"""
    end: int
    """End position of the variant in the contig (same as `pos` for TRA and SNV)"""
    length: int
    """Length of the variant"""
    id: Optional[str]
    """Record identifier"""
    ref: str
    """Reference sequence"""
    alt: str
    """Alternative sequence"""
    qual: Optional[float]
    """Quality score for the assertion made in ALT"""
    filter: List[Union[str, int]]
    """Filter status. PASS if this position has passed all filters. Otherwise, it contains the filters that failed"""
    variant_type: VariantType
    """Variant type"""
    alt_sv_breakend: Optional[BreakendSVRecord]
    """Breakend SV info, present only for SVs with breakend notation. For example, :code:`G]17:198982]`"""
    alt_sv_shorthand: Optional[ShorthandSVRecord]
    """Shorthand SV info, present only for SVs with shorthand notation. For example, :code:`<DUP:TANDEM>`"""

    def __init__(self, rec: pysam.VariantRecord, contig: str, pos: int, end: int,
                 length: int, id: Optional[str], ref: str, alt: str,
                 variant_type: VariantType,
                 alt_sv_breakend: Optional[BreakendSVRecord] = None,
                 alt_sv_shorthand: Optional[ShorthandSVRecord] = None):
        """Create a record from a pysam record plus already-resolved fields.

        ``qual`` and ``filter`` are read from ``rec`` right away; INFO, FORMAT
        and the samples are left unloaded until they are first accessed.
        """
        self._rec = rec
        self.contig = contig
        self.pos = pos
        self.end = end
        self.length = length
        self.id = id
        self.ref = ref
        self.alt = alt
        self.qual = rec.qual
        self.filter = _build_filter(rec)
        self.variant_type = variant_type
        self.alt_sv_breakend = alt_sv_breakend
        self.alt_sv_shorthand = alt_sv_shorthand
        # Lazy caches: None means "not loaded from self._rec yet"
        self._info = None
        self._format = None
        self._samples = None

    @property
    def info(self):
        """Additional information"""
        if self._info is None:
            self._info = _build_info(self._rec)
        return self._info

    @info.setter
    def info(self, value):
        self._info = value

    @property
    def format(self):
        """Specifies data types and order of the genotype information"""
        if self._format is None:
            self._format = _build_format(self._rec)
        return self._format

    @format.setter
    def format(self, value):
        self._format = value

    @property
    def samples(self):
        """Genotype information for each sample"""
        if self._samples is None:
            self._samples = _build_samples(self._rec)
        return self._samples

    @samples.setter
    def samples(self, value):
        self._samples = value

    def _replace(self, **kwargs):
        """Return a copy of this record with the given attributes replaced.

        The copy starts from the *current* state of this record, so earlier
        changes to ``qual``, ``filter``, ``info``, ``format`` or ``samples``
        are kept instead of being reset to the values of the underlying pysam
        record. Mutable containers are copied so that the two records can be
        modified independently. ``info``, ``format`` and ``samples`` that were
        never loaded stay unloaded in the copy.
        """
        import copy

        # Clone the instance state without going through __init__, which
        # would re-read qual and filter from the pysam record.
        new_record = VariantRecord.__new__(VariantRecord)
        new_record.__dict__.update(self.__dict__)
        new_record.filter = copy.copy(self.filter)
        new_record._info = copy.copy(self._info)
        new_record._format = copy.copy(self._format)
        # samples is a mapping of mappings, so a shallow copy is not enough
        new_record._samples = copy.deepcopy(self._samples)
        for key, value in kwargs.items():
            setattr(new_record, key, value)
        return new_record

    def _info_str(self, rec_str: List[str]) -> str:
        """Return the INFO column, using ``rec_str`` (the split original record) when possible."""
        # If info has not been loaded, return the original info string
        if self._info is None and len(rec_str) > 7:
            return rec_str[7]
        shorthand = bool(self.alt_sv_shorthand)
        info_list = []
        for key, value in self.info.items():
            # END is written from self.end for shorthand SVs; skip any END
            # entry of the mapping so that the key is not emitted twice.
            if shorthand and key == 'END':
                continue
            info_str = _convert_info_key_value(key, value)
            if info_str is None:
                continue
            info_list.append(info_str)
        if shorthand:
            info_list.insert(0, 'END='+str(self.end))
        # An empty column would leave two consecutive tabs in the record;
        # '.' is the missing-value placeholder already used for ID and FILTER.
        return ";".join(info_list) if info_list else '.'

    def _format_str(self, rec_str: List[str]) -> str:
        """Return the FORMAT column, using ``rec_str`` (the split original record) when possible."""
        # If format has not been loaded, return the original format string
        if self._format is None and len(rec_str) > 8:
            return rec_str[8]
        return ":".join(self.format)

    def _samples_str(self, rec_str: List[str]) -> str:
        """Return the tab-joined sample columns, using ``rec_str`` when possible."""
        # If samples and format have not been loaded, return the original samples string
        if self._samples is None and self._format is None and len(rec_str) > 9:
            return '\t'.join(rec_str[9:])
        samples_list = [":".join([_convert_sample_value(k, self.samples[sample_name][k]) for k in self.format])
                        for sample_name in self.samples]
        samples = "\t".join(samples_list)
        return samples

    def __str__(self):
        """Return the record as one tab-separated line without surrounding whitespace."""
        rec_str_split = str(self._rec).split('\t')
        contig = self.contig
        pos = self.pos
        id_ = self.id if self.id else '.'
        ref = self.ref
        alt = self.alt
        qual = _str_value(self.qual)
        filter_ = ";".join(map(str, self.filter)) if self.filter else '.'
        info = self._info_str(rec_str_split)
        format_ = self._format_str(rec_str_split)
        samples = self._samples_str(rec_str_split)
        # strip() drops the trailing tabs left by empty FORMAT/sample columns
        return f'{contig}\t{pos}\t{id_}\t{ref}\t{alt}\t{qual}\t{filter_}\t{info}\t{format_}\t{samples}'.strip()