-
Notifications
You must be signed in to change notification settings - Fork 18
/
movecassdata.py
executable file
·142 lines (117 loc) · 5.23 KB
/
movecassdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
import sys
from util.cass import get_full_cass_dataset, insert_dataset, initialize_worker
from util.common import StreamKey, TimeRange, ZULU_TIMESTAMP_FORMAT, zulu_timestamp_to_ntp_time
from ooi_data.postgres.model import *
from preload_database.database import create_engine_from_url, create_scoped_session
# Module-level database setup, executed as soon as this script is imported or
# run. No explicit URL is passed to create_engine_from_url (None) --
# presumably it falls back to a default/configured database URL; verify
# against preload_database.database.
engine = create_engine_from_url(None)
session = create_scoped_session(engine)
# Give model classes a `query` attribute bound to this session; get_stream_key
# relies on it via Stream.query (presumably Stream derives from MetadataBase,
# both coming from the star import of ooi_data.postgres.model -- confirm).
MetadataBase.query = session.query_property()
def convert_zulu_to_ntp(zulu_timestamp):
    """Translate a Zulu timestamp string into an NTP time.

    A ValueError raised by the parser is not propagated: its text is
    reported through usage() and None is returned instead.
    """
    ntp_time = None
    try:
        ntp_time = zulu_timestamp_to_ntp_time(zulu_timestamp)
    except ValueError as err:
        usage(str(err))
    return ntp_time
def get_float(value):
    """Return ``value`` parsed as a finite float, or None if it is not one.

    Non-numeric strings (and None) yield None rather than raising. ``nan``
    and ``inf`` are rejected as well: float() accepts them, but they are
    never meaningful NTP timestamps, and a NaN would silently defeat the
    begin/end ordering check in get_time_range (every comparison with NaN
    is False).
    """
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    # math.isnan/isinf rather than math.isfinite, which is Python 3 only.
    if math.isnan(number) or math.isinf(number):
        return None
    return number
def get_ntp_time(time_stamp):
    """Convert a command-line timestamp to NTP time.

    A value that parses as a number is taken to already be an NTP time;
    anything else is handed to convert_zulu_to_ntp as a Zulu timestamp.
    Returns None when neither interpretation succeeds (on a Zulu parse
    ValueError, convert_zulu_to_ntp reports it via usage()).
    """
    float_time_stamp = get_float(time_stamp)
    # Compare against None explicitly: an NTP time of 0 is falsy but valid,
    # and must not be re-parsed as a Zulu string (which would fail).
    if float_time_stamp is not None:
        return float_time_stamp
    return convert_zulu_to_ntp(time_stamp)
def get_stream_key(old_or_new, subsite, node, sensor, method, stream_id):
    """Build a StreamKey for a reference designator, method and stream.

    ``stream_id`` is looked up as a numeric Stream id when it consists only
    of digits, and as a Stream name otherwise. If no matching Stream exists,
    the problem is reported through usage() and None is returned.

    ``old_or_new`` is a label ("old"/"new" from main) used only to prefix
    error messages.
    """
    lookup_by_id = stream_id.isdigit()
    stream = (Stream.query.get(int(stream_id)) if lookup_by_id
              else Stream.query.filter(Stream.name == stream_id).first())
    if stream is None:
        usage(old_or_new + " stream id (" + stream_id + ") is invalid: must be existing stream id or name")
        return None

    key = StreamKey(subsite, node, sensor, method, stream)
    # NOTE(review): a constructor call normally never yields None, so this
    # guard looks unreachable unless StreamKey is a factory -- kept as-is.
    if key is None:
        usage("Invalid values were used for the " + old_or_new + " stream key")
    return key
def get_time_range(time_stamps):
    """Parse a ``"<begin>|<end>"`` argument into a TimeRange of NTP times.

    Each endpoint may be an NTP number or a Zulu timestamp (see
    get_ntp_time); whitespace around either endpoint is ignored. Returns
    None, after reporting via usage(), when the argument does not contain
    exactly two endpoints or when begin is after end. Also returns None when
    an endpoint cannot be converted (the Zulu conversion path reports that).
    On success the resolved NTP begin/end values are printed.
    """
    time_stamp_values = [value.strip() for value in time_stamps.split("|")]
    if len(time_stamp_values) != 2:
        usage("Timestamp argument incorrect")
        return None
    begin_text, end_text = time_stamp_values

    # Obtain NTP times from the time stamp values
    begin_time_stamp = get_ntp_time(begin_text)
    end_time_stamp = get_ntp_time(end_text)
    # Confirm valid NTP times were obtained
    if begin_time_stamp is None or end_time_stamp is None:
        return None
    if begin_time_stamp > end_time_stamp:
        usage("Begin timestamp (" + begin_text + ") must not exceed end timestamp (" +
              end_text + ")")
        return None

    # Echo the resolved NTP range. A single pre-formatted argument keeps this
    # line valid, with the same "<begin> <end>" output, on both Python 2 and
    # Python 3 (a `print a, b` statement is a SyntaxError on Python 3).
    print("%s %s" % (begin_time_stamp, end_time_stamp))
    return TimeRange(begin_time_stamp, end_time_stamp)
def split_refdes(old_or_new, refdes):
    """Split a reference designator into ``(subsite, node, sensor)``.

    The first two dash-separated fields are the subsite and node; the
    remainder is the sensor, which must itself be ``<a>-<b>``: exactly one
    dash with non-empty text on both sides. Subsite and node must be
    non-empty. On malformed input the problem is reported via usage() and
    ``(None, None, None)`` is returned.
    """
    refdes_values = refdes.split("-", 2)
    # Reject a wrong field count and also empty subsite/node fields
    # (e.g. "-NODE-01-CTD" or "SITE--01-CTD").
    if len(refdes_values) != 3 or not refdes_values[0] or not refdes_values[1]:
        usage(old_or_new + " refdes (" + refdes + ") is incorrectly specified")
        return (None, None, None)
    subsite, node, sensor = refdes_values

    # The sensor needs a dash with text on both sides and no second dash;
    # a leading dash (e.g. "-CTD") is rejected as well as a trailing one.
    left, dash, right = sensor.partition("-")
    if not dash or not left or not right or "-" in right:
        usage(old_or_new + " sensor (" + sensor + ") is incorrectly specified")
        return (None, None, None)
    return (subsite, node, sensor)
def split_sk_vals(old_or_new, skentry):
    """Split a ``<refdes>:<method>:<stream>`` argument into its three parts.

    Returns the ``(refdes, method, stream_id)`` tuple. When the argument
    does not have exactly three colon-separated fields, the problem is
    reported via usage() and ``(None, None, None)`` is returned.
    """
    pieces = skentry.split(":")
    if len(pieces) == 3:
        refdes, method, stream_id = pieces
        return refdes, method, stream_id
    usage(old_or_new + " stream-key argument incorrect")
    return None, None, None
def usage(mesg):
    """Print an error message followed by the command-line usage summary."""
    help_lines = (
        "BAD INPUT: " + str(mesg),
        "USAGE: movecassdata.py '<oldsk>' '<newsk>' '<timestamp_range>'",
        "<oldsk>,<newsk> as <subsite>-<node>-<sensor>:<method>:<stream>",
        "<stream> as <stream_nbr> or <stream_name>",
        "<timestamp_range> as <beg_ntp>|<end_ntp> or <beg_zulu>|<end_zulu>",
    )
    for help_line in help_lines:
        print(help_line)
def main(args):
    """Copy Cassandra data from one stream key to another over a time range.

    ``args`` are the command-line arguments without the program name:
    ``[<oldsk>, <newsk>, <timestamp_range>]`` (formats as shown by usage()).
    Every dataset returned by get_full_cass_dataset for the old stream key
    (with keep_exclusions=True) is inserted under the new stream key.

    Returns 0 on success (including when there is nothing to copy) and 1
    when the arguments are invalid, so it can serve as a process exit status.
    """
    if len(args) != 3:
        usage("Expecting 3 arguments, got " + str(len(args)))
        return 1

    # Extract the arguments
    old_sk_vals, new_sk_vals, time_stamps = args

    # Extract the refdes, method and stream id from the stream_key values.
    # Both are parsed before checking, so every bad argument gets reported.
    (old_refdes, old_method, old_stream_id) = split_sk_vals("old", old_sk_vals)
    (new_refdes, new_method, new_stream_id) = split_sk_vals("new", new_sk_vals)
    if old_refdes is None or new_refdes is None:
        return 1

    # Extract the subsite, node and sensor from the refdes values
    (old_subsite, old_node, old_sensor) = split_refdes("old", old_refdes)
    (new_subsite, new_node, new_sensor) = split_refdes("new", new_refdes)
    if old_subsite is None or new_subsite is None:
        return 1

    # Create StreamKeys
    old_stream_key = get_stream_key("old", old_subsite, old_node, old_sensor, old_method, old_stream_id)
    new_stream_key = get_stream_key("new", new_subsite, new_node, new_sensor, new_method, new_stream_id)
    if old_stream_key is None or new_stream_key is None:
        return 1

    # Use the time stamp ranges to create a TimeRange
    time_range = get_time_range(time_stamps)
    if time_range is None:
        return 1

    initialize_worker()
    dep_datasets = get_full_cass_dataset(old_stream_key, time_range, keep_exclusions=True)
    if not dep_datasets:
        print("No data found for the old stream key in the requested time range")
        return 0

    # Only the datasets are needed, not their keys. values() exists on dicts
    # in both Python 2 and 3, unlike iteritems(). Assumes the result is
    # dict-like, as the previous iteritems() use implied -- confirm.
    for dataset in dep_datasets.values():
        insert_dataset(new_stream_key, dataset)
    return 0
if __name__ == "__main__":
    # Forward all but the program name argument, and use main()'s return
    # value as the process exit status (sys.exit(None) exits with status 0).
    sys.exit(main(sys.argv[1:]))