|
33 | 33 | import tempfile |
34 | 34 | import time |
35 | 35 | import traceback |
| 36 | +import zlib |
36 | 37 | from multiprocessing.queues import SimpleQueue |
37 | 38 |
|
38 | 39 | import six |
|
48 | 49 |
|
49 | 50 | usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR] |
50 | 51 | [-e (DB | DB.TABLE)]... |
51 | | - [--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] |
| 52 | + [--format (csv | json | ndjson | jsongz)] [--fields FIELD,FIELD...] [--delimiter CHARACTER] |
52 | 53 | [--clients NUM]""" |
53 | 54 | help_description = ( |
54 | 55 | "`rethinkdb export` exports data from a RethinkDB cluster into a directory" |
@@ -118,11 +119,11 @@ def parse_options(argv, prog=None): |
118 | 119 | parser.add_option( |
119 | 120 | "--format", |
120 | 121 | dest="format", |
121 | | - metavar="json|csv|ndjson", |
| 122 | + metavar="json|csv|ndjson|jsongz", |
122 | 123 | default="json", |
123 | 124 | help="format to write (defaults to json. ndjson is newline delimited json.)", |
124 | 125 | type="choice", |
125 | | - choices=["json", "csv", "ndjson"], |
| 126 | + choices=["json", "csv", "ndjson", "jsongz"], |
126 | 127 | ) |
127 | 128 | parser.add_option( |
128 | 129 | "--clients", |
@@ -150,6 +151,17 @@ def parse_options(argv, prog=None): |
150 | 151 | ) |
151 | 152 | parser.add_option_group(csvGroup) |
152 | 153 |
|
| 154 | + jsongzGroup = optparse.OptionGroup(parser, "jsongz options") |
| 155 | + jsongzGroup.add_option( |
| 156 | + "--compression-level", |
| 157 | + dest="compression_level", |
| 158 | + metavar="NUM", |
| 159 | + default=None, |
| 160 | + help="compression level, an integer from 0 to 9 (defaults to -1 default zlib compression)", |
| 161 | + type="int", |
| 162 | + ) |
| 163 | + parser.add_option_group(jsongzGroup) |
| 164 | + |
153 | 165 | options, args = parser.parse_args(argv) |
154 | 166 |
|
155 | 167 | # -- Check validity of arguments |
@@ -185,6 +197,15 @@ def parse_options(argv, prog=None): |
185 | 197 | if options.delimiter: |
186 | 198 | parser.error("--delimiter option is only valid for CSV file formats") |
187 | 199 |
|
| 200 | + if options.format == "jsongz": |
| 201 | + if options.compression_level is None: |
| 202 | + options.compression_level = -1 |
| 203 | + elif options.compression_level < 0 or options.compression_level > 9: |
| 204 | + parser.error("--compression-level must be an integer from 0 and 9") |
| 205 | + else: |
| 206 | + if options.compression_level: |
| 207 | + parser.error("--compression-level option is only valid for jsongz file formats") |
| 208 | + |
188 | 209 | # - |
189 | 210 |
|
190 | 211 | return options |
@@ -226,6 +247,43 @@ def json_writer(filename, fields, task_queue, error_queue, format): |
226 | 247 | pass |
227 | 248 |
|
228 | 249 |
|
def json_gz_writer(filename, fields, task_queue, error_queue, format, compression_level):
    """Worker process: drain rows from `task_queue` and write them to
    `filename` as a gzip-compressed JSON array (the `jsongz` format).

    Parameters:
        filename          -- path of the .jsongz file to create
        fields            -- iterable of field names to keep, or None to keep every field
        task_queue        -- queue of (row,) tuples; a StopIteration instance marks end of stream
        error_queue       -- queue receiving (exc_type, exc_value, traceback_list) on failure
        format            -- unused here; present so every writer target shares one signature
        compression_level -- zlib level 0-9, or -1 for the zlib default

    On any exception the error is reported via `error_queue`, then the task
    queue is drained so producers blocked on `task_queue.put` do not hang.
    """
    try:
        with open(filename, "wb") as out:
            # wbits 31 = MAX_WBITS + 16: zlib emits a gzip header and trailer
            compressor = zlib.compressobj(compression_level, zlib.DEFLATED, 31)

            def compress_write(text):
                # compressobj buffers internally; the final flush() below emits the rest
                out.write(compressor.compress(text.encode("utf-8")))

            first = True
            compress_write("[")
            item = task_queue.get()
            while not isinstance(item, StopIteration):
                row = item[0]
                if fields is not None:
                    # project the row down to only the requested fields
                    for key in list(row.keys()):
                        if key not in fields:
                            del row[key]
                # a separating comma precedes every row except the first
                compress_write("\n" if first else ",\n")
                first = False

                compress_write(json.dumps(row))
                item = task_queue.get()

            compress_write("\n]\n")
            out.write(compressor.flush())
    except BaseException:
        ex_type, ex_value, tb = sys.exc_info()
        error_queue.put((ex_type, ex_value, traceback.extract_tb(tb)))

        # Read until the exit task so the readers do not hang on pushing onto the queue
        while not isinstance(task_queue.get(), StopIteration):
            pass
| 286 | + |
229 | 287 | def csv_writer(filename, fields, delimiter, task_queue, error_queue): |
230 | 288 | try: |
231 | 289 | with open(filename, "w") as out: |
@@ -331,6 +389,19 @@ def export_table( |
331 | 389 | options.format, |
332 | 390 | ), |
333 | 391 | ) |
| 392 | + elif options.format == "jsongz": |
| 393 | + filename = directory + "/%s/%s.jsongz" % (db, table) |
| 394 | + writer = multiprocessing.Process( |
| 395 | + target=json_gz_writer, |
| 396 | + args=( |
| 397 | + filename, |
| 398 | + options.fields, |
| 399 | + task_queue, |
| 400 | + error_queue, |
| 401 | + options.format, |
| 402 | + options.compression_level, |
| 403 | + ), |
| 404 | + ) |
334 | 405 | elif options.format == "csv": |
335 | 406 | filename = directory + "/%s/%s.csv" % (db, table) |
336 | 407 | writer = multiprocessing.Process( |
|
0 commit comments