-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparsnp.vcf2fasta.py
executable file
·70 lines (62 loc) · 1.92 KB
/
parsnp.vcf2fasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
import sys
from argparse import ArgumentParser
def parseArgs():
parser = ArgumentParser(description='Parses a Parsnp VCF file and '
'prints all SNPs to standard out. This is particularly useful '
'when manipulating the file and harvesttools has an error. '
'Multiallelic SNPs must be removed and FILTER tags are ignored. '
'Sample names and their binary data must begin on the 10th '
'column. For SNP extraction from default Parsnp, '
'`harvesttools -i parsnp.ggr -S SNPs.fa` is recommended.',
add_help=False)
req = parser.add_argument_group('Required')
req.add_argument('-i', '--input', required=True, metavar='FILE',
help='input VCF file from Parsnp')
opt = parser.add_argument_group('Optional')
opt.add_argument('-h', '--help', action='help',
help='show this help message and exit')
return parser.parse_args()
# Main
opts = parseArgs()
# Get sample IDs
with open(opts.input, 'r') as ifh:
for ln in ifh:
if any(ln.startswith('#CHROM') for x in ln):
samples_d = {k:[] for k in ln.rstrip('\n').split('\t')[9:]}
samples_l = ln.rstrip('\n').split('\t')[9:]
break
# Parse the VCF file
ifh = open(opts.input, 'r')
for ln in ifh:
if not ln.startswith('#'):
data = ln.rstrip('\n').split('\t')
ref = data[3]
if ',' in data[4]:
alt = data[4].split(',')
else:
alt = [data[4]]
binary_data = data[9:]
for i, datum in enumerate(binary_data):
if datum == '0':
nuc = ref
elif datum == '1':
nuc = alt[0]
elif datum == '2':
nuc = alt[1]
elif datum == '3':
nuc = alt[2]
else:
sys.stderr.write('ERROR: ALT value >3\n')
sample_id = samples_l[i]
samples_d[sample_id].append(nuc)
ifh.close()
# Write FastA to Std Out
snp_len = len(samples_d[sample_id])
for sample in samples_l:
snps = ''.join(samples_d[sample])
if len(snps) != snp_len:
sys.stderr.write('ERROR: unequal sequence records printed\n')
else:
print('>{}'.format(sample))
print(snps)