-
Notifications
You must be signed in to change notification settings - Fork 0
/
header.py
123 lines (102 loc) · 3.18 KB
/
header.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
Utilities for reading the header of a Filterbank file.
"""
from struct import unpack
import numpy as np
# Maps each recognised header keyword to a tag naming how its value is
# decoded: b'<l' and b'<d' are struct format codes, while b'str' and
# b'angle' are handled specially by the header reader.
HEADER_KEYWORD_TYPES = {
    # Values unpacked with the b'<l' struct format.
    **dict.fromkeys(
        (
            b'telescope_id',
            b'machine_id',
            b'data_type',
            b'barycentric',
            b'pulsarcentric',
            b'nbits',
            b'nsamples',
            b'nchans',
            b'nifs',
            b'nbeams',
            b'ibeam',
        ),
        b'<l',
    ),
    # Length-prefixed byte strings.
    **dict.fromkeys(
        (
            b'rawdatafile',
            b'source_name',
        ),
        b'str',
    ),
    # Values unpacked with the b'<d' struct format.
    **dict.fromkeys(
        (
            b'az_start',
            b'za_start',
            b'tstart',
            b'tsamp',
            b'fch1',
            b'foff',
            b'refdm',
            b'period',
        ),
        b'<d',
    ),
    # Doubles that are additionally converted by fil_double_to_angle.
    **dict.fromkeys(
        (
            b'src_raj',
            b'src_dej',
        ),
        b'angle',
    ),
}
def read_header(filename):
    """
    Read a Filterbank header and return a dictionary of key-value pairs.

    Args:
        filename: Path of the file to read; it is opened in binary mode.

    Returns:
        dict: One entry per keyword found between HEADER_START and
        HEADER_END, keyed by the keyword exactly as returned by
        read_next_header_keyword.

    Raises:
        RuntimeError: If the first keyword in the file is not HEADER_START.
    """
    header_dict = {}
    with open(filename, 'rb') as file_header:
        keyword, _ = read_next_header_keyword(file_header)
        # Explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently disable the validation.
        if keyword != b'HEADER_START':
            raise RuntimeError("Not a valid Filterbank file.")

        while True:
            keyword, value = read_next_header_keyword(file_header)
            if keyword == b'HEADER_END':
                break
            header_dict[keyword] = value
    return header_dict
def read_next_header_keyword(file_header):
    """
    Read the next key-value pair from an open Filterbank header.

    Each entry is a 4-byte length, the keyword bytes, and then a value whose
    encoding is looked up in HEADER_KEYWORD_TYPES.

    Args:
        file_header: Binary file-like object positioned at the start of an
            entry (only its ``read`` method is used).

    Returns:
        tuple: ``(keyword, value)``. For the HEADER_START / HEADER_END
        markers the value is 0. Otherwise the value is an int, a float, a
        bytes string, or the result of fil_double_to_angle, depending on the
        keyword's type tag.

    Raises:
        KeyError: If the keyword is not present in HEADER_KEYWORD_TYPES.

    Truncated input is not handled specially; the underlying NumPy or
    struct error propagates to the caller.
    """
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    # '<u4' pins the length to little-endian so it agrees with the '<l' and
    # '<d' formats used for the values on every platform.
    n_bytes = int(np.frombuffer(file_header.read(4), dtype='<u4')[0])
    if n_bytes > 255:
        # Kept from the original: an implausibly large length falls back to
        # 16 bytes. NOTE(review): the rationale is not visible in this file.
        n_bytes = 16

    keyword = file_header.read(n_bytes)
    # Marker keywords carry no value.
    if b'HEADER_START' in keyword or b'HEADER_END' in keyword:
        return keyword, 0

    dtype = HEADER_KEYWORD_TYPES[keyword]
    if dtype == b'<l':
        val = unpack(dtype, file_header.read(4))[0]
    elif dtype == b'<d':
        val = unpack(dtype, file_header.read(8))[0]
    elif dtype == b'str':
        # Strings are stored as a 4-byte length followed by the raw bytes.
        str_len = int(np.frombuffer(file_header.read(4), dtype='<i4')[0])
        val = file_header.read(str_len)
    elif dtype == b'angle':
        val = fil_double_to_angle(unpack('<d', file_header.read(8))[0])
    return keyword, val
def fil_double_to_angle(angle):
    """
    Convert a value packed as ddmmss.s (or hhmmss.s) into decimal degrees
    (or hours).

    The sign of the input is carried over to the result; the digits are
    split into whole units, minutes and seconds using the magnitude only.
    """
    is_negative = angle < 0.0
    magnitude = np.abs(angle)

    # Leading digits (everything above the mmss.s part) are the whole units.
    whole = np.floor(magnitude / 10000)
    remainder = magnitude - 10000 * whole

    # Next two digits are minutes; what is left over is seconds.
    minutes = np.floor(remainder / 100)
    seconds = remainder - 100 * minutes

    decimal = whole + (minutes / 60.0 + seconds / 3600.0)
    return decimal * -1 if is_negative else decimal
def len_header(filename):
    """
    Return the length of the header in bytes.

    The file is scanned in fixed-size chunks for the HEADER_END marker; the
    returned length is the offset of the first byte after that marker.

    Args:
        filename: Path of the file to scan; it is opened in binary mode.

    Returns:
        int: Number of bytes from the start of the file up to and including
        the first occurrence of HEADER_END.

    Raises:
        RuntimeError: If the end of the file is reached without finding
            HEADER_END.
    """
    marker = b'HEADER_END'
    chunk_size = 512
    # Bytes kept from the previous window so that a marker straddling a
    # chunk boundary is still found.
    overlap = len(marker) - 1

    with open(filename, 'rb') as file_:
        carry = b''
        window_start = 0  # file offset of the first byte of the window
        while True:
            chunk = file_.read(chunk_size)
            if not chunk:
                # Without this check a file lacking the marker would loop
                # forever on empty reads.
                raise RuntimeError(
                    "HEADER_END not found: not a valid Filterbank file."
                )
            window = carry + chunk
            pos = window.find(marker)
            if pos != -1:
                return window_start + pos + len(marker)
            carry = window[-overlap:]
            window_start += len(window) - len(carry)