#!/usr/bin/env python from __future__ import division __doc__ = """ SYNOPSIS pileuptools.py COMMAND [options] [-h,--help] DESCRIPTION Commands available: var_compare Summarize SNPs detected from multiple samples unique_vars Filter group of pileup files for variations that occur uniquely in only one file pileup2bed Convert samtools pileup file to bed file pileup2snpeff Parse samtools consensus pileup file and output non variant sites nonvar_sites Parse samtools consensus pileup file and output non variant sites snp_summary Parse samtools consensus pileup and output penetrance summary of snps Tab delimited file with: seq_id, position, ref_base, consensus_base, fraction A, C, G, T EXAMPLES TODO: Show some examples of how to use this script. AUTHOR Lance Parsons LICENSE Copyright (c) 2011, Lance Parsons All rights reserved. This script is licensed by the Simplified BSD License See LICENSE.TXT and """ __author__ = "Lance R. Parsons" __version__ = '0.1' from operator import attrgetter from optparse import OptionParser import os.path import sys import warnings def main(): command_list = set(['var_compare', 'unique_vars', 'pileup2bed', 'pileup2snpeff', 'nonvar_sites', 'snp_summary', 'help']) if len(sys.argv) < 2: exit("No command specified, use one of: %s" % ", ".join(command_list)) cmd = sys.argv[1] if cmd not in command_list: exit("Unrecognized command '%s', use one of %s" % (cmd, ", ".join(command_list))) if cmd == 'var_compare': var_compare(sys.argv[1:]) elif cmd == 'unique_vars': unique_vars(sys.argv[1:]) elif cmd == 'pileup2bed': pileup2bed(sys.argv[1:]) elif cmd == 'nonvar_sites': nonvar_sites(sys.argv[1:]) elif cmd == 'pileup2snpeff': pileup2snpeff(sys.argv[1:]) elif cmd == 'snp_summary': snp_summary(sys.argv[1:]) elif cmd == 'help': help() else: print("Unrecognized command specified, use one of: %s" % ', '.join(command_list)) def help(): print globals()['__doc__'] def var_compare(args): ''' Summarize SNPs detected from multiple samples ''' usage = "Usage: %prog var_compare [options] pileup_consensus_file" parser = OptionParser(usage=usage) (opts, args) = parser.parse_args(args) if len(args) < 2: print ("ERROR: Must specify at least one file") parser.print_help() sys.exit() file_list = args[1:] for file in file_list: try: f = open(file) f.close(); except Exception, e: print "ERROR: Could not read from %s: %s" % (file, e) print usage % (sys.argv[0:]) sys.exit() all_variations = {} all_variations_count = {} file_vars = {} for file in file_list: file_vars[file] = parse_variation_file(file, all_variations, all_variations_count) sorted_vars = sorted(all_variations.values(), key=attrgetter('position')) sorted_vars = sorted(sorted_vars, key=attrgetter('seq_id')) for var in sorted_vars: count = 0 line = "%s\t%s\t%s\t%s\t%i" % (var.seq_id, var.position, var.reference_base, var.consensus_base, all_variations_count[var.key()]) for file in file_list: if var.key() in file_vars[file]: count += 1 val = os.path.basename(file) else: val = "" line = line + "\t%s" % val print line return def unique_vars(args): ''' Filter group of pileup files for variations that occur uniquely in only one file ''' usage = "Usage: %prog unique_vars [options] pileup_variations_file1 pileup_variations_file2 ..." parser = OptionParser(usage=usage) parser.add_option("-o", "--output", dest="output_dir", type="string", default=".", help="Output Directory",) (opts, args) = parser.parse_args(args) if len(args) < 2: exit("Must specify at least one file") file_list = args[1:] for file in file_list: try: f = open(file) f.close(); except Exception, e: print "ERROR: Could not read from %s: %s" % (file, e) print usage % (sys.argv[0:]) sys.exit() # Get all variations and counts all_variations = {} all_variations_count = {} file_vars = {} for file in file_list: file_vars[file] = parse_variation_file(file, all_variations, all_variations_count) # Sort variations by seq_id then position sorted_vars = sorted(all_variations.values(), key=attrgetter('position')) sorted_vars = sorted(sorted_vars, key=attrgetter('seq_id')) # Output list of unique vars for each file for file in file_list: outfile = open(os.path.join(opts.output_dir, os.path.basename(file) + '.unique'), 'w') for var in sorted_vars: line = "%s" % (var.pileup_line()) if var.key() in file_vars[file] and all_variations_count[var.key()] == 1: outfile.write(line + "\n") outfile.close() def pileup2bed(args): ''' Convert samtools pileup file to bed file ''' usage = "Usage: %prog pileup2bed input.pileup [options]" parser = OptionParser(usage=usage) parser.add_option("-n", "--name", dest="track_name", type="string", help="Track Name",) parser.add_option("-d", "--desc", dest="track_description", type="string", help="Track Description",) (opts, args) = parser.parse_args(args) input = sys.stdin if len(args) >= 2: input = args[1] name = os.path.basename(input) else: name = "Pileup" if opts.track_name: name = opts.track_name if opts.track_description: desc = opts.track_description else: desc = name print """track name=%s description="%s" """ % (name, desc) f = open(input) for line in f: var = Variation(line.strip().split("\t")) name = "%s->%s" % (var.reference_base, var.consensus_base) score = 1000 # Hard to interpret so leave it out strand = "." item_rgb = "0,0,0" line = "%s\t%i\t%i\t%s\t%i\t%s\t%i\t%i\t%s" % (var.seq_id, var.position - 1, var.position, name, score, strand, var.position - 1, var.position, item_rgb) print line return def nonvar_sites(args): ''' Parse samtools consensus pileup file and output non variant sites ''' usage = "Usage: %prog nonvar_sites pileup_consensus_file" parser = OptionParser(usage=usage) (opts, args) = parser.parse_args(args) if len(args) < 2: print ("ERROR: Must specify at least one file") parser.print_help() sys.exit() f = open(args[1]) for line in f: var = Variation(line.strip().split("\t")) if (var.reference_base == var.consensus_base): print line.strip() def parse_variation_file(file, all_variations, all_variations_count): f = open(file) vars = {} for line in f: var = Variation(line.strip().split("\t")) vars[var.key()] = 1 if (var.key() in all_variations): all_variations_count[var.key()] += 1 pass else: all_variations_count[var.key()] = 1 all_variations[var.key()] = var return vars def pileup2snpeff(args): ''' Parse samtools consensus pileup file and output snpEff compatible file ''' usage = "Usage: %prog pileup2snpeff pileup_consensus_file" parser = OptionParser(usage=usage) (opts, args) = parser.parse_args(args) if len(args) < 2: print ("ERROR: Must specify pileup file") parser.print_help() sys.exit() f = open(args[1]) for line in f: var = Variation(line.strip().split("\t")) if (var.consensus_base in ['A', 'C', 'G', 'T']): print "%s\t%i\t%i\t%s/%s\t+" % (var.seq_id, var.position, var.position, var.reference_base, var.consensus_base) def snp_summary(args): ''' Parse samtools consensus pileup and output penetrance summary of snps Tab delimited file with: seq_id, position, ref_base, consensus_base, fraction A, C, G, T ''' usage = "Usage: %prog snp_summary pileup_variations_file" parser = OptionParser(usage=usage) (opts, args) = parser.parse_args(args) if len(args) < 2: print ("ERROR: Must specify a pileup varations file") parser.print_help() sys.exit() # hist_data = list() f = open(args[1]) print 'seq_id\tpostion\tref base\tconsensus base\tcoverage\tcount A\tcount C\tcount G\tcount T\tPrimary Allele\tPrimary Allele Frequency\tSecondary Allele\tSecondary Allele Frequency' for line in f: var = Variation(line.strip().split("\t")) #bases = ('A', 'C', 'G', 'T') count = {} count['A'] = var.base_count('A') count['C'] = var.base_count('C') count['G'] = var.base_count('G') count['T'] = var.base_count('T') #print var.coverage-1 #print sum(count.values()) if (var.coverage != sum(count.values())): warnings.warn("Counts do not add up to coverage. Sum of counts: %i Coverage: %i\n" % (sum(count.values()), var.coverage)) #print "Here!" count_sorted = count.keys() count_sorted.sort(key=count.__getitem__, reverse=True) primary_allele = count_sorted[0] primary_allele_freq = count[count_sorted[0]] / var.coverage secondary_allele = count_sorted[1] secondary_allele_freq = count[count_sorted[1]] / var.coverage print "%s\t%i\t%s\t%s\t%i\t%i\t%i\t%i\t%i\t%s\t%f\t%s\t%f" % (var.seq_id, var.position, var.reference_base, var.consensus_base, var.coverage, count['A'], count['C'], count['G'], count['T'], primary_allele, primary_allele_freq, secondary_allele, secondary_allele_freq,) class Variation: ''' store data on genomic variations (snps and small indels) ''' ''' chromosome, 1-based coordinate, reference base, consensus base, consensus quality, SNP quality, maximum mapping quality, the number of reads covering the site, read bases and base qualities. chromosome, 1-based coordinate, a star, the genotype, consensus quality, SNP quality, RMS mapping quality, # covering reads, the first alllele, the second allele, # reads supporting the first allele, # reads supporting the second allele and # reads containing indels different from the top two alleles ''' def __init__(self, pileup_data): ''' Constructor ''' self.seq_id = pileup_data[0] self.position = int(pileup_data[1]) self.reference_base = pileup_data[2] self.consensus_base = pileup_data[3] self.consensus_quality = int(pileup_data[4]) self.snp_quality = int(pileup_data[5]) self.mapping_quality = int(pileup_data[6]) self.coverage = int(pileup_data[7]) if self.reference_base == '*': self.type = 'indel' self.read_bases = None self.read_qualities = None self.first_allele = pileup_data[8] self.second_allele = pileup_data[9] self.first_allele_reads = int(pileup_data[10]) self.second_allele_reads = int(pileup_data[11]) self.other_allele_reads = int(pileup_data[12]) self.unknown1 = int(pileup_data[13]) self.unknown2 = int(pileup_data[14]) else: self.type = 'snv' self.read_bases = pileup_data[8] self.read_qualities = pileup_data[9] self.first_allele = None self.second_allele = None self.first_allele_reads = None self.second_allele_reads = None self.other_allele_reads = None def base_count(self, base): count = self.read_bases.count(base.lower()) + self.read_bases.count(base.upper()) if (base.upper() == self.reference_base) or (base.lower() == self.reference_base): count += self.read_bases.count('.') count += self.read_bases.count(',') return count def __repr__(self): return repr((self.seq_id, self.position, self.reference_base, self.consensus_base, self.consensus_quality, self.snp_quality, self.max_mapping_quality, self.coverage, self.read_bases, self.read_qualities)) def key(self): return "%s%s%s%s" % (self.seq_id, str(self.position), self.reference_base, self.consensus_base) def pileup_line(self): if self.type == 'snv': return "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s" % (self.seq_id, str(self.position), self.reference_base, self.consensus_base, str(self.consensus_quality), str(self.snp_quality), str(self.mapping_quality), str(self.coverage), self.read_bases, self.read_qualities) elif self.type == 'indel': return "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s" % (self.seq_id, str(self.position), self.reference_base, self.consensus_base, str(self.consensus_quality), str(self.snp_quality), str(self.mapping_quality), str(self.coverage), self.first_allele, self.second_allele, self.first_allele_reads, self.second_allele_reads, self.other_allele_reads, self.unknown1, self.unknown2) else: return "" #return '\t'.join([self.seq_id, str(self.position), self.reference_base, self.consensus_base, str(self.consensus_quality), # str(self.snp_quality), str(self.max_mapping_quality), str(self.coverage), self.read_bases, self.read_qualities]) if __name__ == '__main__': main() sys.exit()