-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwhere_are_write_indices.py
146 lines (128 loc) · 4.76 KB
/
where_are_write_indices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
Attempts to determine which tier each write index is on. E.g. if an index is on hot, it will have an exclude warm setting. Optional index/tier filters are available for checking specific items. You may also provide a file containing a list of indices to filter by.
Results can be sorted by physical primary store size.
"""
from getpass import getpass, getuser
from os import supports_effective_ids
import re
import argparse
import requests
import tabulate
import bitmath
# Keys are the tier named in an index's "exclude" allocation setting; values
# are the tier the index is then taken to be on. It is a flip flop since
# that's how exclude works: excluding warm means hot, and vice versa.
TIER_MAP = {"hot": "warm", "warm": "hot"}
def _load_index_filter(path):
    """Return the set of index names listed one per line in the file at ``path``."""
    # The context manager closes the handle; a set keeps membership tests O(1).
    with open(path) as handle:
        return set(handle.read().splitlines())


def _find_write_indices(api, auth, index_filter=None):
    """Return ``{index: alias}`` for indices that are the write index of their alias.

    The alias is derived from the index name by stripping a trailing
    ``-YYYY.MM.DD-NNNNNN`` suffix. Indices whose name lacks that suffix are
    reported and skipped. When ``index_filter`` is given, only aliases that
    contain it as a substring are considered.
    """
    rollover_pattern = re.compile(r"(.*)\-\d{4}\.\d{2}\.\d{2}\-\d{6}$")
    response = requests.get(f"{api}/_aliases", auth=auth)
    response.raise_for_status()

    write_aliases = {}
    for index, index_config in response.json().items():
        match = rollover_pattern.search(index)
        if match is None:
            print(f"{index} has no alias matching the rollover pattern")
            continue
        expected_alias = match.group(1)
        if index_filter and index_filter not in expected_alias:
            continue
        alias_config = index_config["aliases"].get(expected_alias)
        # An alias entry without an explicit flag is treated as the write index.
        if alias_config is not None and alias_config.get("is_write_index", True):
            write_aliases[index] = expected_alias
    return write_aliases


def _fetch_primary_sizes(api, auth):
    """Return ``{index: primary store size in bytes}`` as reported by ``_cat/indices``."""
    response = requests.get(
        f"{api}/_cat/indices",
        auth=auth,
        params={"h": "index,pri.store.size", "format": "json", "bytes": "b"},
    )
    # Fail loudly on an HTTP error instead of tripping over the error payload.
    response.raise_for_status()
    # Entries reporting an empty size are left out of the mapping.
    return {
        item["index"]: int(item["pri.store.size"])
        for item in response.json()
        if item["pri.store.size"]
    }


def _map_write_indices_to_tier(api, auth, write_indices, tier_filter=None):
    """Return ``{index: tier}`` for the given write indices.

    The tier is looked up in ``TIER_MAP`` from the index's
    ``routing.allocation.exclude.tier`` setting and is ``None`` when the
    excluded value is not a key of ``TIER_MAP``. When ``tier_filter`` is
    given, only indices resolved to that tier are kept.
    """
    response = requests.get(
        f"{api}/_settings",
        auth=auth,
        params={"filter_path": "**.routing.allocation.exclude.tier"},
    )
    response.raise_for_status()

    index_to_tier = {}
    for index, index_config in response.json().items():
        if index not in write_indices:
            continue
        allocation = index_config["settings"]["index"]["routing"]["allocation"]
        tier = TIER_MAP.get(allocation["exclude"]["tier"])
        if tier_filter and tier != tier_filter:
            continue
        index_to_tier[index] = tier
    return index_to_tier


def _format_size(size_in_bytes):
    """Render a byte count with its best-fitting unit, or ``n/a`` when unknown."""
    if size_in_bytes is None:
        return "n/a"
    return bitmath.Byte(size_in_bytes).best_prefix().format("{value:.02f} {unit}")


def main(args):
    """Print a table of write indices and the tier each one is on.

    Args:
        args: Parsed command-line namespace providing ``api`` (base URL of the
            cluster), ``es_user``, ``es_pass`` (prompted for when empty),
            ``tier`` and ``index`` (optional filters), ``filter_from_file``
            (optional path to a file listing index names, one per line; an
            empty file applies no filtering) and ``sort_by_size`` (bool).

    Raises:
        requests.HTTPError: If any of the cluster requests returns an error
            status.
        OSError: If the filter file cannot be opened.
    """
    # Setup
    api = args.api
    auth = args.es_user, args.es_pass or getpass()
    tier_filter = args.tier
    index_filter = args.index
    sort_by_size = args.sort_by_size
    allowed_indices = (
        _load_index_filter(args.filter_from_file) if args.filter_from_file else None
    )

    if tier_filter:
        print(f'Looking for indices on tier "{tier_filter}"')
    if index_filter:
        print(f'Looking for indices with substring "{index_filter}"')

    # Get write index -> alias
    write_aliases = _find_write_indices(api, auth, index_filter)

    index_to_size = {}
    if sort_by_size:
        print("Getting index sizes for sorting later")
        index_to_size = _fetch_primary_sizes(api, auth)

    # Map index to settings
    index_to_tier = _map_write_indices_to_tier(api, auth, write_aliases, tier_filter)

    if allowed_indices:
        print(
            f"Using filtered indices from file, prior to filtering there is {len(index_to_tier)} indices"
        )
        index_to_tier = {
            index: tier
            for index, tier in index_to_tier.items()
            if index in allowed_indices
        }
        print(f"After filtering there are {len(index_to_tier)} indices")

    # Show
    if sort_by_size:
        # Largest first; an index without a reported size sorts as zero bytes.
        ordered = sorted(
            index_to_tier.items(),
            key=lambda item: index_to_size.get(item[0], 0),
            reverse=True,
        )
        rows = [
            (index, _format_size(index_to_size.get(index)), tier)
            for index, tier in ordered
        ]
        headers = ("Index", "Size", "Tier")
    else:
        # Sizes were never fetched, so the table has no Size column here.
        # ``tier or ""`` keeps the sort well-defined for an unresolved tier.
        rows = sorted(
            index_to_tier.items(), key=lambda item: (item[1] or "", item[0])
        )
        headers = ("Index", "Tier")

    print("\n" + tabulate.tabulate(rows, headers=headers))
if __name__ == "__main__":
    # Command-line entry point: declare the options as data, register them in
    # order, then hand the parsed namespace to main().
    cli = argparse.ArgumentParser(description=__doc__)
    option_specs = (
        (("api",), {}),
        (("--es-user",), {"default": getuser()}),
        (("--es-pass",), {"default": None}),
        (
            ("--filter-from-file",),
            {"default": None, "help": "filter indices based on file contents"},
        ),
        (("--sort-by-size",), {"default": False, "action": "store_true"}),
        (
            ("--index",),
            {"default": None, "help": "filter indices with this substring"},
        ),
        (
            ("--tier",),
            {
                "default": None,
                "help": "filter write indices based on tier, expects fixed tiers such as hot, warm, leave blank for no filter",
            },
        ),
    )
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)
    main(cli.parse_args())