@@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF,
60
60
gzip_file = io .BufferedReader (
61
61
_ThreadedGzipReader (filename , block_size = block_size ))
62
62
else :
63
- gzip_file = io . BufferedWriter (
63
+ gzip_file = FlushableBufferedWriter (
64
64
_ThreadedGzipWriter (
65
65
filename ,
66
66
mode .replace ("t" , "b" ),
@@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
101
101
self .worker = threading .Thread (target = self ._decompress )
102
102
self ._closed = False
103
103
self .running = True
104
+ self ._calling_thread = threading .current_thread ()
104
105
self .worker .start ()
105
106
106
107
def _check_closed (self , msg = None ):
@@ -110,15 +111,15 @@ def _check_closed(self, msg=None):
110
111
def _decompress (self ):
111
112
block_size = self .block_size
112
113
block_queue = self .queue
113
- while self .running :
114
+ while self .running and self . _calling_thread . is_alive () :
114
115
try :
115
116
data = self .fileobj .read (block_size )
116
117
except Exception as e :
117
118
self .exception = e
118
119
return
119
120
if not data :
120
121
return
121
- while self .running :
122
+ while self .running and self . _calling_thread . is_alive () :
122
123
try :
123
124
block_queue .put (data , timeout = 0.05 )
124
125
break
@@ -166,6 +167,12 @@ def closed(self) -> bool:
166
167
return self ._closed
167
168
168
169
170
+ class FlushableBufferedWriter (io .BufferedWriter ):
171
+ def flush (self ):
172
+ super ().flush ()
173
+ self .raw .flush ()
174
+
175
+
169
176
class _ThreadedGzipWriter (io .RawIOBase ):
170
177
"""
171
178
Write a gzip file using multiple threads.
@@ -215,6 +222,7 @@ def __init__(self,
215
222
if "b" not in mode :
216
223
mode += "b"
217
224
self .lock = threading .Lock ()
225
+ self ._calling_thread = threading .current_thread ()
218
226
self .exception : Optional [Exception ] = None
219
227
self .level = level
220
228
self .previous_block = b""
@@ -313,30 +321,35 @@ def write(self, b) -> int:
313
321
self .input_queues [worker_index ].put ((data , zdict ))
314
322
return len (data )
315
323
316
- def flush (self ):
324
+ def _end_gzip_stream (self ):
317
325
self ._check_closed ()
318
326
# Wait for all data to be compressed
319
327
for in_q in self .input_queues :
320
328
in_q .join ()
321
329
# Wait for all data to be written
322
330
for out_q in self .output_queues :
323
331
out_q .join ()
332
+ # Write an empty deflate block with a last block marker.
333
+ self .raw .write (zlib_ng .compress (b"" , wbits = - 15 ))
334
+ trailer = struct .pack ("<II" , self ._crc , self ._size & 0xFFFFFFFF )
335
+ self .raw .write (trailer )
336
+ self ._crc = 0
337
+ self ._size = 0
324
338
self .raw .flush ()
325
339
340
+ def flush (self ):
341
+ self ._end_gzip_stream ()
342
+ self ._write_gzip_header ()
343
+
326
344
def close (self ) -> None :
327
345
if self ._closed :
328
346
return
329
- self .flush ()
347
+ self ._end_gzip_stream ()
330
348
self .stop ()
331
349
if self .exception :
332
350
self .raw .close ()
333
351
self ._closed = True
334
352
raise self .exception
335
- # Write an empty deflate block with a lost block marker.
336
- self .raw .write (zlib_ng .compress (b"" , wbits = - 15 ))
337
- trailer = struct .pack ("<II" , self ._crc , self ._size & 0xFFFFFFFF )
338
- self .raw .write (trailer )
339
- self .raw .flush ()
340
353
if self .closefd :
341
354
self .raw .close ()
342
355
self ._closed = True
@@ -353,7 +366,7 @@ def _compress(self, index: int):
353
366
try :
354
367
data , zdict = in_queue .get (timeout = 0.05 )
355
368
except queue .Empty :
356
- if not self .running :
369
+ if not ( self .running and self . _calling_thread . is_alive ()) :
357
370
return
358
371
continue
359
372
try :
@@ -369,41 +382,31 @@ def _compress(self, index: int):
369
382
def _write (self ):
370
383
index = 0
371
384
output_queues = self .output_queues
372
- fp = self .raw
373
- total_crc = 0
374
- size = 0
375
385
while True :
376
386
out_index = index % self .threads
377
387
output_queue = output_queues [out_index ]
378
388
try :
379
389
compressed , crc , data_length = output_queue .get (timeout = 0.05 )
380
390
except queue .Empty :
381
- if not self .running :
382
- self ._crc = total_crc
383
- self ._size = size
391
+ if not (self .running and self ._calling_thread .is_alive ()):
384
392
return
385
393
continue
386
- total_crc = zlib_ng .crc32_combine (total_crc , crc , data_length )
387
- size += data_length
388
- fp .write (compressed )
394
+ self . _crc = zlib_ng .crc32_combine (self . _crc , crc , data_length )
395
+ self . _size += data_length
396
+ self . raw .write (compressed )
389
397
output_queue .task_done ()
390
398
index += 1
391
399
392
400
def _compress_and_write (self ):
393
401
if not self .threads == 1 :
394
402
raise SystemError ("Compress_and_write is for one thread only" )
395
- fp = self .raw
396
- total_crc = 0
397
- size = 0
398
403
in_queue = self .input_queues [0 ]
399
404
compressor = self .compressors [0 ]
400
405
while True :
401
406
try :
402
407
data , zdict = in_queue .get (timeout = 0.05 )
403
408
except queue .Empty :
404
- if not self .running :
405
- self ._crc = total_crc
406
- self ._size = size
409
+ if not (self .running and self ._calling_thread .is_alive ()):
407
410
return
408
411
continue
409
412
try :
@@ -413,9 +416,9 @@ def _compress_and_write(self):
413
416
self ._set_error_and_empty_queue (e , in_queue )
414
417
return
415
418
data_length = len (data )
416
- total_crc = zlib_ng .crc32_combine (total_crc , crc , data_length )
417
- size += data_length
418
- fp .write (compressed )
419
+ self . _crc = zlib_ng .crc32_combine (self . _crc , crc , data_length )
420
+ self . _size += data_length
421
+ self . raw .write (compressed )
419
422
in_queue .task_done ()
420
423
421
424
def _set_error_and_empty_queue (self , error , q ):
0 commit comments