import collections
import re
import csv
import gzip
import sys
import itertools
import codecs
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
try:
import pysam
except ImportError:
pysam = None
# Metadata parsers/constants
RESERVED_INFO = {
'AA': 'String', 'AC': 'Integer', 'AF': 'Float', 'AN': 'Integer',
'BQ': 'Float', 'CIGAR': 'String', 'DB': 'Flag', 'DP': 'Integer',
'END': 'Integer', 'H2': 'Flag', 'MQ': 'Float', 'MQ0': 'Integer',
'NS': 'Integer', 'SB': 'String', 'SOMATIC': 'Flag', 'VALIDATED': 'Flag'
}
RESERVED_FORMAT = {
'GT': 'String', 'DP': 'Integer', 'FT': 'String', 'GL': 'Float',
'GQ': 'Float', 'HQ': 'Float'
}
_Info = collections.namedtuple('Info', ['id', 'num', 'type', 'desc'])
_Filter = collections.namedtuple('Filter', ['id', 'desc'])
_Format = collections.namedtuple('Format', ['id', 'num', 'type', 'desc'])
_SampleInfo = collections.namedtuple('SampleInfo', ['samples', 'gt_bases', 'gt_types', 'gt_phases'])
class _vcf_metadata_parser(object):
'''Parse the metadat in the header of a VCF file.'''
def __init__(self):
super(_vcf_metadata_parser, self).__init__()
self.info_pattern = re.compile(r'''\#\#INFO=<
ID=(?P<id>[^,]+),
Number=(?P<number>-?\d+|\.|[AG]),
Type=(?P<type>Integer|Float|Flag|Character|String),
Description="(?P<desc>[^"]*)"
>''', re.VERBOSE)
self.filter_pattern = re.compile(r'''\#\#FILTER=<
ID=(?P<id>[^,]+),
Description="(?P<desc>[^"]*)"
>''', re.VERBOSE)
self.format_pattern = re.compile(r'''\#\#FORMAT=<
ID=(?P<id>.+),
Number=(?P<number>-?\d+|\.|[AG]),
Type=(?P<type>.+),
Description="(?P<desc>.*)"
>''', re.VERBOSE)
self.meta_pattern = re.compile(r'''##(?P<key>.+)=(?P<val>.+)''')
def read_info(self, info_string):
'''Read a meta-information INFO line.'''
match = self.info_pattern.match(info_string)
if not match:
raise SyntaxError(
"One of the INFO lines is malformed: %s" % info_string)
try:
num = int(match.group('number'))
if num < 0:
num = None
except ValueError:
num = None
info = _Info(match.group('id'), num,
match.group('type'), match.group('desc'))
return (match.group('id'), info)
def read_filter(self, filter_string):
'''Read a meta-information FILTER line.'''
match = self.filter_pattern.match(filter_string)
if not match:
raise SyntaxError(
"One of the FILTER lines is malformed: %s" % filter_string)
filt = _Filter(match.group('id'), match.group('desc'))
return (match.group('id'), filt)
def read_format(self, format_string):
'''Read a meta-information FORMAT line.'''
match = self.format_pattern.match(format_string)
if not match:
raise SyntaxError(
"One of the FORMAT lines is malformed: %s" % format_string)
try:
num = int(match.group('number'))
if num < 0:
num = None
except ValueError:
num = None
form = _Format(match.group('id'), num,
match.group('type'), match.group('desc'))
return (match.group('id'), form)
def read_meta(self, meta_string):
match = self.meta_pattern.match(meta_string)
return match.group('key'), match.group('val')
class _Call(object):
[docs]
__slots__ = ['site', 'sample', 'data', 'gt_nums', 'called']
""" A genotype call, a cell entry in a VCF file"""
def __init__(self, site, sample, data):
#: The ``_Record`` for this ``_Call``
self.site = site
#: The sample name
self.sample = sample
#: Dictionary of data from the VCF file
self.data = data
self.gt_nums = self.data['GT']
#: True if the GT is not ./.
self.called = self.gt_nums is not None
def __repr__(self):
return "Call(sample=%s, GT=%s, GQ=%s)" % (self.sample, self.gt_nums, self.data.get('GQ', ''))
def __eq__(self, other):
""" Two _Calls are equal if their _Records are equal
and the samples and ``gt_type``s are the same
"""
return (self.site == other.site
and self.sample == other.sample
and self.gt_type == other.gt_type)
@property
def gt_bases(self):
[docs] '''The actual genotype alleles.
E.g. if VCF genotype is 0/1, return A/G
'''
# nothing to do if no genotype call
if self.called:
# grab the numeric alleles of the gt string; tokenize by phasing
phase_char = "/" if not self.phased else "|"
(a1, a2) = self.gt_nums.split(phase_char)
# lookup and return the actual DNA alleles
try:
return self.site.alleles[int(a1)] + \
phase_char + \
self.site.alleles[int(a2)]
except:
sys.stderr.write("Allele number not found in list of alleles\n")
else:
return None
@property
def gt_type(self):
[docs] '''The type of genotype.
hom_ref = 0
het = 1
hom_alt = 2 (we don;t track _which+ ALT)
uncalled = None
'''
# extract the numeric alleles of the gt string
if self.called:
# grab the numeric alleles of the gt string; tokenize by phasing
(a1, a2) = self.gt_nums.split("/") \
if not self.phased else self.gt_nums.split("|")
if a1 == a2:
if a1 == "0": return 0
else: return 2
else: return 1
else: return None
@property
def phased(self):
[docs] '''A boolean indicating whether or not
the genotype is phased for this sample
'''
return self.data['GT'] is not None and self.data['GT'].find("|") >= 0
def __getitem__(self, key):
""" Lookup value, backwards compatibility """
return self.data[key]
@property
def is_variant(self):
[docs] """ Return True if not a reference call """
if not self.called:
return None
return self.gt_type != 0
@property
def is_het(self):
[docs] """ Return True for heterozygous calls """
if not self.called:
return None
return self.gt_type == 1
class _Record(object):
[docs] """ A set of calls at a site. Equivalent to a row in a VCF file.
The standard VCF fields CHROM, POS, ID, REF, ALT, QUAL, FILTER,
INFO and FORMAT are available as properties.
The list of genotype calls is in the ``samples`` property.
"""
def __init__(self, CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, FORMAT,
sample_indexes, samples=None):
self.CHROM = CHROM
self.POS = POS
self.ID = ID
self.REF = REF
self.ALT = ALT
self.QUAL = QUAL
self.FILTER = FILTER
self.INFO = INFO
self.FORMAT = FORMAT
#: 0-based start coordinate
self.start = self.POS - 1
#: 1-based end coordinate
self.end = self.start + len(self.REF)
#: list of alleles. [0] = REF, [1:] = ALTS
self.alleles = [self.REF]
self.alleles.extend(self.ALT)
#: list of ``_Calls`` for each sample ordered as in source VCF
self.samples = samples
self._sample_indexes = sample_indexes
def __eq__(self, other):
""" _Records are equal if they describe the same variant (same position, alleles) """
return (self.CHROM == other.CHROM and
self.POS == other.POS and
self.REF == other.REF and
self.ALT == other.ALT)
def __iter__(self):
return iter(self.samples)
def __str__(self):
return "Record(CHROM=%(CHROM)s, POS=%(POS)s, REF=%(REF)s, ALT=%(ALT)s)" % self.__dict__
def __cmp__(self, other):
return cmp( (self.CHROM, self.POS), (other.CHROM, other.POS))
def add_format(self, fmt):
self.FORMAT = self.FORMAT + ':' + fmt
def add_filter(self, flt):
if self.FILTER is None \
or self.FILTER == 'PASS'\
or self.FILTER == '.':
self.FILTER = ''
else:
self.FILTER = self.FILTER + ';'
self.FILTER = self.FILTER + flt
def add_info(self, info, value=True):
self.INFO[info] = value
def genotype(self, name):
[docs] """ Lookup a ``_Call`` for the sample given in ``name`` """
return self.samples[self._sample_indexes[name]]
@property
def num_called(self):
[docs] """ The number of called samples"""
return sum(s.called for s in self.samples)
@property
def call_rate(self):
[docs] """ The fraction of genotypes that were actually called. """
return float(self.num_called) / float(len(self.samples))
@property
def num_hom_ref(self):
[docs] """ The number of homozygous for ref allele genotypes"""
return len([s for s in self.samples if s.gt_type == 0])
@property
def num_hom_alt(self):
[docs] """ The number of homozygous for alt allele genotypes"""
return len([s for s in self.samples if s.gt_type == 2])
@property
def num_het(self):
[docs] """ The number of heterozygous genotypes"""
return len([s for s in self.samples if s.gt_type == 1])
@property
def num_unknown(self):
[docs] """ The number of unknown genotypes"""
return len([s for s in self.samples if s.gt_type is None])
@property
def aaf(self):
[docs] """ The allele frequency of the alternate allele.
NOTE 1: Punt if more than one alternate allele.
NOTE 2: Denominator calc'ed from _called_ genotypes.
"""
# skip if more than one alternate allele. assumes bi-allelic
if len(self.ALT) > 1:
return None
hom_ref = self.num_hom_ref
het = self.num_het
hom_alt = self.num_hom_alt
num_chroms = float(2.0*self.num_called)
return float(het + 2*hom_alt)/float(num_chroms)
@property
def nucl_diversity(self):
[docs] """
pi_hat (estimation of nucleotide diversity) for the site.
This metric can be summed across multiple sites to compute regional
nucleotide diversity estimates. For example, pi_hat for all variants
in a given gene.
Derived from:
\"Population Genetics: A Concise Guide, 2nd ed., p.45\"
John Gillespie.
"""
# skip if more than one alternate allele. assumes bi-allelic
if len(self.ALT) > 1:
return None
p = self.aaf
q = 1.0-p
num_chroms = float(2.0*self.num_called)
return float(num_chroms/(num_chroms-1.0)) * (2.0 * p * q)
def get_hom_refs(self):
[docs] """ The list of hom ref genotypes"""
return [s for s in self.samples if s.gt_type == 0]
def get_hom_alts(self):
[docs] """ The list of hom alt genotypes"""
return [s for s in self.samples if s.gt_type == 2]
def get_hets(self):
[docs] """ The list of het genotypes"""
return [s for s in self.samples if s.gt_type == 1]
def get_unknowns(self):
[docs] """ The list of unknown genotypes"""
return [s for s in self.samples if s.gt_type is None]
@property
def is_snp(self):
[docs] """ Return whether or not the variant is a SNP """
if len(self.REF) > 1: return False
for alt in self.ALT:
if alt not in ['A', 'C', 'G', 'T']:
return False
return True
@property
def is_indel(self):
[docs] """ Return whether or not the variant is an INDEL """
is_sv = self.is_sv
if len(self.REF) > 1 and not is_sv: return True
for alt in self.ALT:
if alt is None:
return True
elif len(alt) != len(self.REF):
# the diff. b/w INDELs and SVs can be murky.
if not is_sv:
# 1 2827693 . CCCCTCGCA C . PASS AC=10;
return True
else:
# 1 2827693 . CCCCTCGCA C . PASS SVTYPE=DEL;
return False
return False
@property
def is_sv(self):
[docs] """ Return whether or not the variant is a structural variant """
if self.INFO.get('SVTYPE') is None:
return False
return True
@property
def is_transition(self):
[docs] """ Return whether or not the SNP is a transition """
# if multiple alts, it is unclear if we have a transition
if len(self.ALT) > 1: return False
if self.is_snp:
# just one alt allele
alt_allele = self.ALT[0]
if ((self.REF == "A" and alt_allele == "G") or
(self.REF == "G" and alt_allele == "A") or
(self.REF == "C" and alt_allele == "T") or
(self.REF == "T" and alt_allele == "C")):
return True
else: return False
else: return False
@property
def is_deletion(self):
[docs] """ Return whether or not the INDEL is a deletion """
# if multiple alts, it is unclear if we have a transition
if len(self.ALT) > 1: return False
if self.is_indel:
# just one alt allele
alt_allele = self.ALT[0]
if alt_allele is None:
return True
if len(self.REF) > len(alt_allele):
return True
else: return False
else: return False
@property
def var_type(self):
[docs] """
Return the type of variant [snp, indel, unknown]
TO DO: support SVs
"""
if self.is_snp:
return "snp"
elif self.is_indel:
return "indel"
elif self.is_sv:
return "sv"
else:
return "unknown"
@property
def var_subtype(self):
[docs] """
Return the subtype of variant.
- For SNPs and INDELs, yeild one of: [ts, tv, ins, del]
- For SVs yield either "complex" or the SV type defined
in the ALT fields (removing the brackets).
E.g.:
<DEL> -> DEL
<INS:ME:L1> -> INS:ME:L1
<DUP> -> DUP
The logic is meant to follow the rules outlined in the following
paragraph at:
http://www.1000genomes.org/wiki/Analysis/Variant%20Call%20Format/vcf-variant-call-format-version-41
"For precisely known variants, the REF and ALT fields should contain
the full sequences for the alleles, following the usual VCF conventions.
For imprecise variants, the REF field may contain a single base and the
ALT fields should contain symbolic alleles (e.g. <ID>), described in more
detail below. Imprecise variants should also be marked by the presence
of an IMPRECISE flag in the INFO field."
"""
if self.is_snp:
if self.is_transition:
return "ts"
elif len(self.ALT) == 1:
return "tv"
else: # multiple ALT alleles. unclear
return "unknown"
elif self.is_indel:
if self.is_deletion:
return "del"
elif len(self.ALT) == 1:
return "ins"
else: # multiple ALT alleles. unclear
return "unknown"
elif self.is_sv:
if self.INFO['SVTYPE'] == "BND":
return "complex"
elif self.is_sv_precise:
return self.INFO['SVTYPE']
else:
# first remove both "<" and ">" from ALT
return self.ALT[0].strip('<>')
else:
return "unknown"
@property
def sv_end(self):
[docs] """ Return the end position for the SV """
if self.is_sv:
return self.INFO['END']
return None
@property
def is_sv_precise(self):
[docs] """ Return whether the SV cordinates are mapped
to 1 b.p. resolution.
"""
if self.INFO.get('IMPRECISE') is None and not self.is_sv:
return False
elif self.INFO.get('IMPRECISE') is not None and self.is_sv:
return False
elif self.INFO.get('IMPRECISE') is None and self.is_sv:
return True
@property
def is_monomorphic(self):
[docs] """ Return True for reference calls """
return len(self.ALT) == 1 and self.ALT[0] is None
class Reader(object):
""" Reader for a VCF v 4.0 file, an iterator returning ``_Record objects`` """
def __init__(self, fsock=None, filename=None, compressed=False, prepend_chr=False):
""" Create a new Reader for a VCF file.
You must specify either fsock (stream) or filename. Gzipped streams
or files are attempted to be recogized by the file extension, or gzipped
can be forced with ``compressed=True``
"""
super(VCFReader, self).__init__()
if not (fsock or filename):
raise Exception('You must provide at least fsock or filename')
if fsock:
self.reader = fsock
if filename is None and hasattr(fsock, 'name'):
filename = fsock.name
compressed = compressed or filename.endswith('.gz')
elif filename:
compressed = compressed or filename.endswith('.gz')
self.reader = open(filename, 'rb' if compressed else 'rt')
self.filename = filename
if compressed:
self.reader = gzip.GzipFile(fileobj=self.reader)
if sys.version > '3':
self.reader = codecs.getreader('ascii')(self.reader)
#: metadata fields from header
self.metadata = None
#: INFO fields from header
self.infos = None
#: FILTER fields from header
self.filters = None
#: FORMAT fields from header
self.formats = None
self.samples = None
self._sample_indexes = None
self._header_lines = []
self._tabix = None
self._prepend_chr = prepend_chr
self._parse_metainfo()
self._format_cache = {}
def __iter__(self):
return self
def _parse_metainfo(self):
'''Parse the information stored in the metainfo of the VCF.
The end user shouldn't have to use this. She can access the metainfo
directly with ``self.metadata``.'''
for attr in ('metadata', 'infos', 'filters', 'formats'):
setattr(self, attr, OrderedDict())
parser = _vcf_metadata_parser()
line = self.reader.next()
while line.startswith('##'):
self._header_lines.append(line)
line = line.strip()
if line.startswith('##INFO'):
key, val = parser.read_info(line)
self.infos[key] = val
elif line.startswith('##FILTER'):
key, val = parser.read_filter(line)
self.filters[key] = val
elif line.startswith('##FORMAT'):
key, val = parser.read_format(line)
self.formats[key] = val
else:
key, val = parser.read_meta(line.strip())
self.metadata[key] = val
line = self.reader.next()
fields = line.rstrip().split('\t')
self.samples = fields[9:]
self._sample_indexes = dict([(x,i) for (i,x) in enumerate(self.samples)])
def _map(self, func, iterable, bad='.'):
'''``map``, but make bad values None.'''
return [func(x) if x != bad else None
for x in iterable]
def _parse_info(self, info_str):
'''Parse the INFO field of a VCF entry into a dictionary of Python
types.
'''
if info_str == '.':
return {}
entries = info_str.split(';')
retdict = OrderedDict()
for entry in entries:
entry = entry.split('=')
ID = entry[0]
try:
entry_type = self.infos[ID].type
except KeyError:
try:
entry_type = RESERVED_INFO[ID]
except KeyError:
if entry[1:]:
entry_type = 'String'
else:
entry_type = 'Flag'
if entry_type == 'Integer':
vals = entry[1].split(',')
val = self._map(int, vals)
elif entry_type == 'Float':
vals = entry[1].split(',')
val = self._map(float, vals)
elif entry_type == 'Flag':
val = True
elif entry_type == 'String':
val = entry[1]
try:
if self.infos[ID].num == 1 and entry_type != 'String':
val = val[0]
except KeyError:
pass
retdict[ID] = val
return retdict
def _parse_sample_format(self, samp_fmt):
""" Parse the format of the calls in this _Record """
samp_fmt = samp_fmt.split(':')
samp_fmt_types = []
samp_fmt_nums = []
for fmt in samp_fmt:
try:
entry_type = self.formats[fmt].type
entry_num = self.formats[fmt].num
except KeyError:
entry_num = None
try:
entry_type = RESERVED_FORMAT[fmt]
except KeyError:
entry_type = 'String'
samp_fmt_types.append(entry_type)
samp_fmt_nums.append(entry_num)
return samp_fmt, samp_fmt_types, samp_fmt_nums
def _parse_samples(self, samples, samp_fmt, site):
'''Parse a sample entry according to the format specified in the FORMAT
column.'''
# check whether we already know how to parse this format
if samp_fmt in self._format_cache:
samp_fmt, samp_fmt_types, samp_fmt_nums = \
self._format_cache[samp_fmt]
else:
sf, samp_fmt_types, samp_fmt_nums = self._parse_sample_format(samp_fmt)
self._format_cache[samp_fmt] = (sf, samp_fmt_types, samp_fmt_nums)
samp_fmt = sf
samp_data = []
for name, sample in itertools.izip(self.samples, samples):
# parse the data for this sample
sampdict = dict([(x, None) for x in samp_fmt])
for fmt, entry_type, entry_num, vals in itertools.izip(
samp_fmt, samp_fmt_types, samp_fmt_nums, sample.split(':')):
# short circuit the most common
if vals == '.' or vals == './.':
sampdict[fmt] = None
continue
# we don't need to split single entries
if entry_num == 1 or ',' not in vals:
if entry_type == 'Integer':
sampdict[fmt] = int(vals)
elif entry_type == 'Float':
sampdict[fmt] = float(vals)
else:
sampdict[fmt] = vals
if entry_num != 1:
sampdict[fmt] = (sampdict[fmt])
continue
vals = vals.split(',')
if entry_type == 'Integer':
sampdict[fmt] = self._map(int, vals)
elif entry_type == 'Float' or entry_type == 'Numeric':
sampdict[fmt] = self._map(float, vals)
else:
sampdict[fmt] = vals
# create a call object
call = _Call(site, name, sampdict)
samp_data.append(call)
return samp_data
def next(self):
'''Return the next record in the file.'''
line = self.reader.next()
row = line.split()
chrom = row[0]
if self._prepend_chr:
chrom = 'chr' + chrom
pos = int(row[1])
if row[2] != '.':
ID = row[2]
else:
ID = None
ref = row[3]
alt = self._map(str, row[4].split(','))
try:
qual = int(row[5])
except ValueError:
try:
qual = float(row[5])
except ValueError:
qual = None
filt = row[6].split(';') if ';' in row[6] else row[6]
if filt == 'PASS':
filt = None
info = self._parse_info(row[7])
try:
fmt = row[8]
except IndexError:
fmt = None
record = _Record(chrom, pos, ID, ref, alt, qual, filt,
info, fmt, self._sample_indexes)
if fmt is not None:
samples = self._parse_samples(row[9:], fmt, record)
record.samples = samples
return record
def fetch(self, chrom, start, end=None):
""" fetch records from a Tabix indexed VCF, requires pysam
if start and end are specified, return iterator over positions
if end not specified, return individual ``_Call`` at start or None
"""
if not pysam:
raise Exception('pysam not available, try "pip install pysam"?')
if not self.filename:
raise Exception('Please provide a filename (or a "normal" fsock)')
if not self._tabix:
self._tabix = pysam.Tabixfile(self.filename)
if self._prepend_chr and chrom[:3] == 'chr':
chrom = chrom[3:]
# not sure why tabix needs position -1
start = start - 1
if end is None:
self.reader = self._tabix.fetch(chrom, start, start+1)
try:
return self.next()
except StopIteration:
return None
self.reader = self._tabix.fetch(chrom, start, end)
return self
class Writer(object):
""" VCF Writer """
fixed_fields = "#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT".split()
def __init__(self, stream, template):
self.writer = csv.writer(stream, delimiter="\t")
self.template = template
for line in template.metadata.iteritems():
stream.write('##%s=%s\n' % line)
for line in template.infos.itervalues():
stream.write('##INFO=<ID=%s,Number=%s,Type=%s,Description="%s">\n' % tuple(self._map(str, line)))
for line in template.formats.itervalues():
stream.write('##FORMAT=<ID=%s,Number=%s,Type=%s,Description="%s">\n' % tuple(self._map(str, line)))
for line in template.filters.itervalues():
stream.write('##FILTER=<ID=%s,Description="%s">\n' % tuple(self._map(str, line)))
self._write_header()
def _write_header(self):
# TODO: write INFO, etc
self.writer.writerow(self.fixed_fields + self.template.samples)
def write_record(self, record):
""" write a record to the file """
ffs = self._map(str, [record.CHROM, record.POS, record.ID, record.REF]) \
+ [self._format_alt(record.ALT), record.QUAL or '.', record.FILTER or '.',
self._format_info(record.INFO), record.FORMAT]
samples = [self._format_sample(record.FORMAT, sample)
for sample in record.samples]
self.writer.writerow(ffs + samples)
def _format_alt(self, alt):
return ','.join([x or '.' for x in alt])
def _format_info(self, info):
if not info:
return '.'
return ';'.join(["%s=%s" % (x, self._stringify(y)) for x, y in info.iteritems()])
def _format_sample(self, fmt, sample):
if sample.data["GT"] is None:
return "./."
return ':'.join(self._stringify(sample.data[f]) for f in fmt.split(':'))
def _stringify(self, x, none='.'):
if type(x) == type([]):
return ','.join(self._map(str, x, none))
return str(x) if x is not None else none
def _map(self, func, iterable, none='.'):
'''``map``, but make None values none.'''
return [func(x) if x is not None else none
for x in iterable]
def __update_readme():
import sys, vcf
file('README.rst', 'w').write(vcf.__doc__)
# backwards compatibility
VCFReader = Reader
VCFWriter = Writer