-
Notifications
You must be signed in to change notification settings - Fork 2
/
demo.py
106 lines (82 loc) · 2.86 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
"""
Demo service that exercises much of the instrumentation in this package.
Supply a honeycomb API key to send the spans there for visual exploration.
Usage:
HONEYCOMB_API_KEY=secret nameko run demo \
--define AMQP_URI=pyamqp://guest:guest@localhost/ \
--define WEB_SERVER_ADDRESS=0.0.0.0:8000
"""
import logging
import os
import random
import time
import nameko
import opentelemetry.instrumentation.requests
import requests
from kombu.messaging import Exchange, Queue
from nameko.events import EventDispatcher, event_handler
from nameko.messaging import Publisher, consume
from nameko.rpc import ServiceRpc, rpc
from nameko.timer import timer
from nameko.web.handlers import http
from opentelemetry import trace
from opentelemetry.ext.honeycomb import HoneycombSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from nameko_opentelemetry import NamekoInstrumentor
logging.basicConfig()
logger = logging.getLogger(__name__)
provider = TracerProvider(resource=Resource.create({"blah.blah": "foobar"}))
trace.set_tracer_provider(provider)
# instrument requests
opentelemetry.instrumentation.requests.RequestsInstrumentor().instrument(
tracer_provider=provider, entrypoint_adapters={}
)
# instrument nameko
NamekoInstrumentor().instrument(tracer_provider=provider)
# export spans to honeycomb
exporter = HoneycombSpanExporter(
service_name="demo-service",
writekey=os.environ.get("HONEYCOMB_API_KEY"),
dataset="test",
)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
exchange = Exchange(name="barfoo")
queue = Queue(name="barfoo", exchange=exchange)
class Service:
name = "demo"
dispatch = EventDispatcher()
publish = Publisher()
demo_rpc = ServiceRpc("demo")
@consume(queue)
def handle_barfoo(self, body):
time.sleep(random.random())
return body
@event_handler("demo", "foobar")
def handle_foobar(self, payload):
time.sleep(random.random())
self.publish("barfoo", routing_key="barfoo")
return "ok"
@http("GET", "/matt")
def matt(self, request):
time.sleep(random.random())
self.dispatch("foobar", "yo")
time.sleep(random.random())
return "ok"
@rpc
def upper(self, string):
time.sleep(random.random())
requests.get(f"http://{nameko.config['WEB_SERVER_ADDRESS']}/matt")
time.sleep(random.random())
return string.upper()
@http("GET", "/hello")
def hello(self, request):
time.sleep(random.random())
requests.get("http://google.com")
time.sleep(random.random())
return 200, self.demo_rpc.upper("matt")
@timer(interval=1)
def tick(self):
requests.get(f"http://{nameko.config['WEB_SERVER_ADDRESS']}/hello")