Source code for vcf.filters

try:
    from rpy2 import robjects
except:
    robjects = None


[docs]class Base(object): """ Base class for vcf_filter.py filters. Use the class docstring to provide the filter description as it appears in vcf_filter.py """ name = 'f' """ name used to activate filter and in VCF headers """ @classmethod
[docs] def customize_parser(self, parser): """ hook to extend argparse parser with custom arguments """ pass
def __init__(self, args): """ create the filter using argparse ``args`` """ self.threshold = 0 def __call__(self): """ filter a site, return not None if the site should be filtered """ raise NotImplementedError('Filters must implement this method')
[docs] def filter_name(self): """ return the name to put in the VCF header, default is ``name`` + ``threshold`` """ return '%s%s' % (self.name, self.threshold)
[docs]class SiteQuality(Base): """ Filter low quailty sites """ name = 'sq' @classmethod def customize_parser(self, parser): parser.add_argument('--site-quality', type=int, default=30, help='Filter sites below this quality') def __init__(self, args): self.threshold = args.site_quality def __call__(self, record): if record.QUAL < self.threshold: return record.QUAL
[docs]class VariantGenotypeQuality(Base): """ Filters sites with only low quality variants. It is possible to have a high site quality with many low quality calls. This filter demands at least one call be above a threshold quality. """ name = 'mgq' @classmethod def customize_parser(self, parser): parser.add_argument('--genotype-quality', type=int, default=50, help='Filter sites with no genotypes above this quality') def __init__(self, args): self.threshold = args.genotype_quality def __call__(self, record): if not record.is_monomorphic: vgq = max([x['GQ'] for x in record if x.is_variant]) if vgq < self.threshold: return vgq
[docs]class ErrorBiasFilter(Base): """ Filter sites that look like correlated sequencing errors. Some sequencing technologies, notably pyrosequencing, produce mutation hotspots where there is a constant level of noise, producing some reference and some heterozygote calls. This filter computes a Bayes Factor for each site by comparing the binomial likelihood of the observed allelic depths under: * A model with constant error equal to the MAF. * A model where each sample is the ploidy reported by the caller. The test value is the log of the bayes factor. Higher values are more likely to be errors. Note: this filter requires rpy2 """ name = 'eb' @classmethod def customize_parser(self, parser): parser.add_argument('--eblr', type=int, default=-10, help='Filter sites above this error log odds ratio') def __init__(self, args): self.threshold = args.eblr if robjects is None: raise Exception('Please install rpy2') self.ll_test = robjects.r(''' function(ra, aa, gt, diag=F) { ra_sum = sum(ra) aa_sum = sum(aa) ab = aa_sum / (ra_sum + aa_sum) gtp = 0.5 + (0.48*(gt-1)) error_likelihood = log(dbinom(aa, ra+aa, ab)) gt_likelihood = log(dbinom(aa, ra+aa, gtp)) if (diag) { print(ra) print(aa) print(gtp) print(ab) print(error_likelihood) print(gt_likelihood) } error_likelihood = sum(error_likelihood) gt_likelihood = sum(gt_likelihood) c(error_likelihood - gt_likelihood, ab) } ''') def __call__(self, record): if record.is_monomorphic: return None passed, tv, ab = self.bias_test(record.samples) if tv > self.threshold: return tv def bias_test(self, calls): calls = [x for x in calls if x.called] #TODO: single genotype assumption try: # freebayes ra = robjects.IntVector([x['RO'][0] for x in calls]) aa = robjects.IntVector([x['AO'][0] for x in calls]) except KeyError: # GATK ra = robjects.IntVector([x['AD'][0] for x in calls]) aa = robjects.IntVector([x['AD'][1] for x in calls]) gt = robjects.IntVector([x.gt_type for x in calls]) test_val, ab = self.ll_test(ra, aa, gt) return test_val < 0, test_val, ab
[docs]class DepthPerSample(Base): 'Threshold read depth per sample' name = 'dps' @classmethod def customize_parser(self, parser): parser.add_argument('--depth-per-sample', type=int, default=5, help='Minimum required coverage in each sample') def __init__(self, args): self.threshold = args.depth_per_sample def __call__(self, record): # do not test depth for indels if record.is_indel: return mindepth = min([sam['DP'] for sam in record.samples]) if mindepth < self.threshold: return mindepth
[docs]class AvgDepthPerSample(Base): 'Threshold average read depth per sample (read_depth / sample_count)' name = 'avg-dps' @classmethod def customize_parser(self, parser): parser.add_argument('--avg-depth-per-sample', type=int, default=3, help='Minimum required average coverage per sample') def __init__(self, args): self.threshold = args.avg_depth_per_sample def __call__(self, record): avgcov = float(record.INFO['DP']) / len(record.samples) if avgcov < self.threshold: return avgcov
[docs]class SnpOnly(Base): 'Choose only SNP variants' name = 'snp-only' def __call__(self, record): if not record.is_snp: return True def filter_name(self): return self.name

Project Versions