forked from vimeo/graphite-influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite_influxdb.py
116 lines (100 loc) · 4.28 KB
/
graphite_influxdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import re
import logging
try:
from graphite_api.intervals import Interval, IntervalSet
from graphite_api.node import LeafNode, BranchNode
except ImportError:
from graphite.intervals import Interval, IntervalSet
from graphite.node import LeafNode, BranchNode
from django.core.cache import cache
from influxdb import InfluxDBClient
def config_to_client(config=None):
    """Build an ``InfluxDBClient`` from a config mapping or Django settings.

    When *config* is given, connection parameters are read from its
    ``'influxdb'`` sub-mapping (keys ``host``, ``port``, ``user``, ``pass``
    and ``db``).  When *config* is ``None``, they are read from the
    ``INFLUXDB_*`` attributes of ``django.conf.settings`` instead.  Any
    parameter that is missing falls back to its default:
    ``localhost``, ``8086``, ``graphite``, ``graphite``, ``graphite``.

    The five values are passed positionally to ``InfluxDBClient`` in the
    order host, port, user, password, database.
    """
    # Defaults, in the positional order InfluxDBClient is called with.
    defaults = ('localhost', 8086, 'graphite', 'graphite', 'graphite')

    if config is None:
        # Imported lazily so Django is only required on this code path.
        from django.conf import settings
        names = ('INFLUXDB_HOST', 'INFLUXDB_PORT', 'INFLUXDB_USER',
                 'INFLUXDB_PASS', 'INFLUXDB_DB')
        params = [getattr(settings, name, fallback)
                  for name, fallback in zip(names, defaults)]
    else:
        section = config.get('influxdb', {})
        keys = ('host', 'port', 'user', 'pass', 'db')
        params = [section.get(key, fallback)
                  for key, fallback in zip(keys, defaults)]

    return InfluxDBClient(*params)
class InfluxdbReader(object):
    """Reader for a single series, backed by an InfluxDB client.

    Attributes (declared via ``__slots__``):
        client: object exposing ``query(str)``; used for every lookup.
        path:   series name, interpolated verbatim into the query text.
        logger: logger used for debug output.
    """
    __slots__ = ('client', 'path', 'logger')

    def __init__(self, client, path, logger):
        self.client = client
        self.path = path
        self.logger = logger

    def fetch(self, start_time, end_time):
        """Query the series between *start_time* and *end_time*.

        Both bounds are formatted with ``%d`` and suffixed with ``s`` in the
        query, and the comparison is strict on both sides.

        Returns:
            ``(time_info, datapoints)`` where ``time_info`` is
            ``(start, end, step)``:

            * ``start`` / ``end`` are the first column of the first and last
              returned rows,
            * ``step`` is the difference between the first two rows' first
              column, or ``10`` when fewer than two rows are returned,
            * ``datapoints`` is the third column of every row.

            If the response cannot be interpreted (no series, no rows, or
            rows of an unexpected shape) the result is ``((0, 0, 10), [])``.
        """
        data = self.client.query(
            "select time, value from %s where time > %ds and time < %ds order asc"
            % (self.path, start_time, end_time))

        datapoints = []
        start = 0
        end = 0
        step = 10
        try:
            points = data[0]['points']
            # NOTE(review): index 2 presumably skips a sequence-number column
            # that the server inserts between time and value -- confirm.
            values = [p[2] for p in points]
            first = points[0][0]
            last = points[-1][0]
            # A single row has no neighbour to derive a step from; keep the
            # default instead of failing and discarding that row.
            interval = points[1][0] - first if len(points) > 1 else step
        except (IndexError, KeyError, TypeError) as exc:
            # Best effort: an empty or malformed response yields no data.
            self.logger.debug("influx NO USABLE DATA for %s: %r",
                              self.path, exc)
        else:
            # Commit all values together so time_info and datapoints can
            # never disagree with each other.
            start, end, step, datapoints = first, last, interval, values

        time_info = start, end, step
        self.logger.debug("influx REQUESTED RANGE for %s: %d to %d",
                          self.path, start_time, end_time)
        self.logger.debug("influx RETURNED RANGE for %s: %d to %d",
                          self.path, start, end)
        return time_info, datapoints

    def get_intervals(self):
        """Return an ``IntervalSet`` with one interval covering the series.

        Two single-row queries are issued: one without an explicit ordering
        and one with ``order asc``.  The first column of each returned row
        becomes the interval's end and start respectively.  A bound that
        cannot be read stays at ``0``.
        """
        # NOTE(review): the unordered query is presumably newest-first on the
        # server, which is why it is treated as the "last" point -- confirm.
        last_data = self.client.query("select * from %s limit 1" % self.path)
        first_data = self.client.query(
            "select * from %s limit 1 order asc" % self.path)
        last = 0
        first = 0
        try:
            last = last_data[0]['points'][0][0]
            first = first_data[0]['points'][0][0]
        except (IndexError, KeyError, TypeError) as exc:
            # Best effort: fall back to whatever bounds were read so far.
            self.logger.debug("influx NO INTERVAL DATA for %s: %r",
                              self.path, exc)
        return IntervalSet([Interval(first, last)])
class InfluxdbFinder(object):
    """Finder that lists InfluxDB series and yields them as graphite nodes.

    Attributes (declared via ``__slots__``):
        client:         client built by ``config_to_client``.
        logger:         the ``"graphite-influxdb"`` logger, set to DEBUG.
        series:         initialised to ``None``; not used by ``find_nodes``.
        seriesCacheKey: cache key under which the series list is stored.
    """
    __slots__ = ('client', 'logger', 'series', 'seriesCacheKey')

    def __init__(self, config=None):
        """Create the client from *config* and set up logging.

        *config* is forwarded unchanged to ``config_to_client``.
        """
        self.client = config_to_client(config)
        self.series = None
        self.seriesCacheKey = 'INFLUXDBSERIES'
        # Uses the standard logging module rather than a framework logger.
        logging.basicConfig()
        self.logger = logging.getLogger("graphite-influxdb")
        self.logger.setLevel(logging.DEBUG)

    def find_nodes(self, query):
        """Yield leaf and branch nodes for series matching ``query.pattern``.

        The pattern is converted to a regular expression: ``.`` becomes a
        literal dot and ``*`` becomes "one or more non-dot characters".  The
        series list comes from the cache when present; otherwise it is
        fetched with ``list series`` and cached with a timeout of 300.

        For every matching series a ``LeafNode`` is yielded, followed by a
        ``BranchNode`` for each dotted prefix not yet yielded (for leaf
        ``a.b.c`` that is ``a.b`` and then ``a``).
        """
        # "list series" offers no server-side pattern matching, so filtering
        # is done here with a regex derived from the query pattern.
        self.logger.info(query)
        # Raw strings: '\.' in a normal literal is an invalid escape sequence.
        regex = query.pattern.replace('.', r'\.').replace('*', r'[^\.]+')
        self.logger.info("find_nodes query: %s -> %s", query.pattern, regex)
        regex = re.compile(regex)

        series = cache.get(self.seriesCacheKey)
        if series is None:
            self.logger.info('CACHE MISS: list series')
            series = self.client.query('list series')
            cache.set(self.seriesCacheKey, series, 300)
        else:
            self.logger.info('CACHE HIT: list series')

        # NOTE(review): regex.match() anchors only at the start, so a series
        # whose name merely begins with a match is selected too -- confirm
        # that this is intended.
        series = [s['name'] for s in series
                  if regex.match(s['name']) is not None]

        seen_branches = set()
        for s in series:
            self.logger.info("leaf %s", s)
            yield LeafNode(s, InfluxdbReader(self.client, s, self.logger))
            # Walk up the dotted path.  Stopping at the first branch already
            # seen is enough, because its ancestors were yielded with it.
            branch = s.rpartition('.')[0]
            while branch != '' and branch not in seen_branches:
                self.logger.info("branch %s", branch)
                yield BranchNode(branch)
                seen_branches.add(branch)
                branch = branch.rpartition('.')[0]