-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcache.py
72 lines (59 loc) · 1.83 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#
# Copyright 2019 Sony Mobile Communications Inc.
# SPDX-License-Identifier: MIT
#
"""
Provides actions over cached responses from Gerrit:
- read/write cached json Gerrit response from/to a gzip file
- adding cache lines
- filtering of cache based on given criteria
"""
import gzip
import json
from os import path
class Cache:
"""
See file docstring.
"""
def __init__(self, filename):
self._filename = filename
self._data = {}
def read(self):
"""
Read a gzip file and unpack into a json object.
"""
if not path.exists(self._filename):
self._data = []
else:
fcache = gzip.open(self._filename, "rb")
try:
self._data = json.load(fcache)
finally:
fcache.close()
def write(self):
"""
Dump json response from Gerrit into a byte sequence encoded with utf-8
and archive in a gzip file.
"""
fcache = gzip.open(self._filename, "wb")
try:
fcache.write(
bytearray(
json.dumps(self._data, indent=4, sort_keys=True), encoding="utf-8"
)
)
finally:
fcache.close()
# pylint: disable=missing-docstring
def filter_key(self, key):
return [x[key] for x in self._data]
def filter_delta(self, key, delta):
self._data = [x for x in self._data if x[key] not in delta]
def filter_threshold(self, key, threshold):
self._data = [x for x in self._data if x[key] > threshold]
def filter_predicate(self, predicate):
self._data = [x for x in self._data if predicate(x)]
def get_by_predicate(self, predicate):
return [x for x in self._data if predicate(x)]
def append(self, change):
self._data.append(change)