-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparse_chromap.py
111 lines (85 loc) · 3.64 KB
/
parse_chromap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import pandas as pd
import argparse
import numpy as np
import scipy.sparse as sp
import anndata as ad
from tqdm import tqdm
import gzip
import os
import sys
def get_options():
parser = argparse.ArgumentParser(prog='parse_chromap.py')
parser.add_argument('-i', '--sample', help='Sample name (sample_BC_barcode.bed)', required=True)
parser.add_argument('-b', '--bin_file', help='BED file with genomic bins')
parser.add_argument('-p', '--path', help='Path of BED files', default='.')
parser.add_argument('-S', '--bin_size', help='Bin size', default=5000, type=int)
parser.add_argument('-N', '--no_nfr', help='Discard NFR from tnH signal', action='store_true')
parser.add_argument('-l', '--frag_len', help='Max size for NFR', default=120, type=int)
options = parser.parse_args()
return options
def parse_chromap():
options = get_options()
sample = options.sample
bin_file = options.bin_file
bins = pd.read_table(bin_file, header=None)
bin_size = options.bin_size
chroms = set(bins[0])
offsets = dict.fromkeys(chroms)
path = options.path
for c in chroms:
offsets[c] = np.where(bins[0]==c)[0][0]
barcodes = {'tn5':['CGTACTAG','TCCTGAGC','TCATGAGC','CCTGAGAT'],
'tnH':['TAAGGCGA','GCTACGCT','AGGCTCCG','CTGCGCAT']}
whitelist = [x.split()[0] for x in open(f"{path}/{sample}_whitelist.tsv")]
bidx = dict([(whitelist[x], x) for x in range(len(whitelist))])
M = dict([(x, sp.lil_matrix((len(whitelist), len(bins)))) for x in barcodes.keys()])
for tn in barcodes:
for barcode in barcodes[tn]:
filename = f'{path}/{sample}_BC_{barcode}.bed'
if os.path.exists(filename):
is_gzip = False
elif os.path.exists(f'{filename}.gz'):
is_gzip = True
filename = f'{filename}.gz'
else:
sys.stderr.write(f"File type not supported for sample {sample}, {barcode}\n")
continue
if is_gzip:
fh = gzip.open(filename)
else:
fh = open(filename)
print(tn, barcode)
for line in tqdm(fh):
if is_gzip:
line = line.decode('ascii')
chrom, start, end, name, dup_count = line.split()
if not chrom in offsets:
continue
if tn == 'tnH' and options.no_nfr:
if int(end) - int(start) > options.frag_len:
continue
ostart = int(start) // bin_size
oend = int(end) // bin_size
dup_count = int(dup_count)
if ostart != oend:
c_idx = ostart + offsets[chrom]
r_idx = bidx[name]
M[tn][r_idx, c_idx] = M[tn][r_idx, c_idx] + 1
c_idx = oend + offsets[chrom]
r_idx = bidx[name]
M[tn][r_idx, c_idx] = M[tn][r_idx, c_idx] + 1
else:
c_idx = ostart + offsets[chrom]
r_idx = bidx[name]
M[tn][r_idx, c_idx] = M[tn][r_idx, c_idx] + 1
for tn in M:
M[tn] = sp.csr_matrix(M[tn])
adata = ad.AnnData(M['tn5'])
for tn in M.keys():
adata.layers[tn] = M[tn]
adata.obs_names = whitelist
adata.var_names = [f"{x[0]}:{x[1]}-{x[2]}" for x in bins.values]
adata.uns['NFR'] = {'skip_NFR':options.no_nfr, 'threshold':options.frag_len}
adata.write(f"{sample}.h5ad")
if __name__ == '__main__':
parse_chromap()