-
Notifications
You must be signed in to change notification settings - Fork 1
/
serverlesscurator.py
executable file
·69 lines (56 loc) · 2.78 KB
/
serverlesscurator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Run Elasticsearch Curator from AWS Lambda.
#
# Edit serverless-curator.yaml to define which indices should be purged.
from __future__ import print_function
import os
import certifi
import curator
import yaml
import time
from curator.exceptions import NoIndices
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
# This is the entry point where Lambda will start execution.
def handler(event, context):
    """Lambda entry point: delete aged time-series indices on each cluster.

    Reads ``serverless-curator.yaml`` from the current working directory.
    The file is expected to hold a list of cluster entries, each providing
    the keys ``name``, ``region``, ``endpoint`` and ``indices``; every item
    of ``indices`` provides ``prefix`` and ``days``.  For each prefix, the
    indices whose name-embedded date (``%Y.%m.%d``) is older than ``days``
    days are passed to Curator's ``DeleteIndices`` action.

    Args:
        event: Lambda invocation payload. Unused, but required by the
            Lambda handler signature.
        context: Lambda runtime context. Unused, but required by the
            Lambda handler signature.

    Returns:
        dict: ``{'backup': {...}, 'deleted': {...}}``.  Both values map a
        cluster name to a list of index names.  ``backup`` lists are
        created per cluster but nothing in this function populates them.
    """
    with open('serverless-curator.yaml') as config_file:
        # safe_load only constructs plain Python objects.  A bare
        # yaml.load() without an explicit Loader is unsafe on older PyYAML
        # releases and raises TypeError on PyYAML >= 6.
        # An empty file parses to None; treat that as "no clusters".
        config = yaml.safe_load(config_file) or []

    # Per-cluster record of the indices that were deleted.
    deleted_indices = {}
    # Per-cluster record of backed-up indices (kept for the response shape).
    backup_indices = {}

    # Several Elasticsearch clusters may be defined, so the outer loop
    # works through them one at a time.
    for cluster_config in config:
        cluster_name = cluster_config['name']
        deleted_indices[cluster_name] = []
        backup_indices[cluster_name] = []

        # Sign requests for the 'es' service in the cluster's region using
        # the credentials found in the environment.
        awsauth = AWS4Auth(
            os.getenv('AWS_ACCESS_KEY_ID'),
            os.getenv('AWS_SECRET_ACCESS_KEY'),
            cluster_config['region'],
            'es',
            session_token=os.getenv('AWS_SESSION_TOKEN'),
        )

        # Create a connection to the cluster.  RequestsHttpConnection is
        # used so that the AWS4Auth object can be supplied as http_auth.
        es = Elasticsearch(
            cluster_config['endpoint'],
            http_auth=awsauth,
            connection_class=RequestsHttpConnection,
        )

        # Work through each set of time-series indices defined in the
        # config for this cluster.
        for index in cluster_config['indices']:
            prefix = index['prefix']
            print('Checking "%s" indices on %s cluster.' %
                  (prefix, cluster_name))

            # Fetch all the index names.
            index_list = curator.IndexList(es)

            try:
                # Reduce the list to those that match the prefix.
                index_list.filter_by_regex(kind='prefix', value=prefix)

                # Reduce again, by age.
                index_list.filter_by_age(source='name', direction='older',
                                         timestring='%Y.%m.%d', unit='days',
                                         unit_count=index['days'])

                curator.DeleteIndices(index_list).do_action()
            except NoIndices:
                # Nothing left in the list after filtering, so there is
                # nothing to delete.  That's OK.
                pass

            # Record the names of any indices we removed: whatever remains
            # in the filtered working list at this point.
            deleted_indices[cluster_name].extend(index_list.working_list())

    lambda_response = {'backup': backup_indices, 'deleted': deleted_indices}
    print(lambda_response)
    return lambda_response