vector_logging.py

import asyncio
import contextvars
import json
import logging
import os
import sys


def FuckAiohttp(record: logging.LogRecord) -> bool:
    """Filter out aiohttp/asyncio's noisy "task was destroyed" log spam."""
    str_msg = str(getattr(record, "msg", ""))
    if "was destroyed but it is pending" in str_msg:
        return False
    if str_msg.startswith("task:") and str_msg.endswith(">"):
        return False
    return True


prompt_context: contextvars.ContextVar[int] = contextvars.ContextVar("prompt_context")
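
# hypothetical usage sketch (not exercised in this file): a caller can bind a
# per-request prompt so every log record emitted in that context carries it:
#     token = prompt_context.set(prompt_id)
#     try:
#         ...  # handle the request; logs here get "prompt" attached
#     finally:
#         prompt_context.reset(token)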

# adapted from https://stackoverflow.com/questions/50144628/python-logging-into-file-as-a-dictionary-or-json
class JsonFormatter(logging.Formatter):
    """
    Formatter that outputs JSON strings after parsing the LogRecord.
    """

    def format(self, record: logging.LogRecord) -> str:
        record.message = record.getMessage()
        # this is all the normal LogRecord attributes except "levelname",
        # "module", "funcName", and "lineno"; we exclude rather than include
        # so that any extras passed to the logging call are recovered
        # https://github.com/python/cpython/blob/main/Lib/logging/__init__.py#L1653
        exclude = [
            "name",
            "msg",
            "args",
            "levelno",
            "pathname",
            "filename",
            "exc_info",
            "exc_text",
            "stack_info",
            "created",
            "msecs",
            "relativeCreated",
            "thread",
            "threadName",
            "processName",
            "process",
        ]
        message_dict = {k: v for k, v in record.__dict__.items() if k not in exclude}
        prompt = prompt_context.get(None)
        if prompt:
            message_dict["prompt"] = prompt
        if record.exc_info:
            # Cache the traceback text to avoid converting it multiple times
            # (it's constant anyway)
            if not record.exc_text:
                record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            message_dict["exc_info"] = record.exc_text
        if record.stack_info:
            message_dict["stack_info"] = self.formatStack(record.stack_info)
        return json.dumps(message_dict, default=str)
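
# sketch of what this produces (assumption: the exact attribute set varies
# slightly by Python version; anything not in `exclude` survives, so `extra`
# kwargs pass through). e.g. logging.info("hi", extra={"attribute": "42"})
# becomes roughly:
#     {"levelname": "INFO", "module": "...", "funcName": "...", "lineno": ...,
#      "message": "hi", "attribute": "42"}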

USE_VECTOR = os.getenv("HONEYCOMB_API_KEY") and os.path.exists("./vector")


class Vector:
    # everything in this class body runs exactly once, at class creation time,
    # and is available on instances via attribute lookup
    logger = logging.getLogger()
    logger.setLevel("DEBUG")
    prompt_context = prompt_context
    if USE_VECTOR:
        print("using vector")
        original_stdout = os.dup(sys.stdout.fileno())
        original_stderr = os.dup(sys.stderr.fileno())
        # explicit pipe we'll use for tee later
        tee_read, tee_write = os.pipe()
        # explicit pipe for vector -- we'll be passing this to tee
        vector_read, vector_write = os.pipe()
        # adapted from https://stackoverflow.com/a/651718
        # Cause tee's stdin to get a copy of our stdout/stderr (as well as that
        # of any child processes we spawn).
        # the pipe will buffer everything we write until tee starts reading;
        # note that logging calls will appear before prints.
        # note that any fatal errors between this and tee starting will be lost forever!
        os.dup2(tee_write, sys.stdout.fileno())
        os.dup2(tee_write, sys.stderr.fileno())
        # set up logging
        # write structured logs only to vector, not to the original stderr
        vector_file = os.fdopen(vector_write, mode="w")
        vector_handler = logging.StreamHandler(vector_file)
        vector_handler.addFilter(FuckAiohttp)
        vector_handler.setLevel("DEBUG")
        vector_handler.setFormatter(JsonFormatter())
        logger.addHandler(vector_handler)
        # we want to write human-formatted logs only to the original stderr, not vector;
        # normally this would be sys.stderr, but we need to open the duplicated fd as a file
        stderr_file = os.fdopen(original_stderr, mode="w")
        console_handler: logging.StreamHandler = logging.StreamHandler(stderr_file)
    else:
        console_handler = logging.StreamHandler()
    fmt = logging.Formatter("{levelname} {module}:{lineno}: {message}", style="{")
    console_handler.setLevel(
        ((os.getenv("LOGLEVEL") or os.getenv("LOG_LEVEL")) or "DEBUG").upper()
    )
    console_handler.setFormatter(fmt)
    console_handler.addFilter(FuckAiohttp)
    logger.addHandler(console_handler)
    # if i hear about epoll selector one more time i'mna end it
    logging.getLogger("asyncio").setLevel("INFO")
    logging.info("starting")

    async def init_vector(self) -> None:
        if not USE_VECTOR:
            return
        self.tee = await asyncio.create_subprocess_shell(
            # write to vector's fd as well as to stdout
            f"tee /dev/fd/{self.vector_write}",
            # if we set stdin to just PIPE, it would be a StreamWriter without a
            # real fd, so we use the explicit pipe we opened earlier
            stdin=self.tee_read,
            stdout=self.original_stdout,
            stderr=self.original_stderr,
            # tee should have access to the vector fd
            pass_fds=[self.vector_write],
        )
        self.vector = await asyncio.create_subprocess_shell(
            # "cat - > /tmp/fake_vector",
            "./vector --quiet -c vector.toml",
            stdin=self.vector_read,
            env={"HONEYCOMB_API_KEY": os.environ["HONEYCOMB_API_KEY"]},
        )
        logging.info("started vector")

    # this seems to make things exit early without logging;
    # anyway, init waits for orphaned processes to exit, so it's fine
    # async def cleanup(self) -> None:
    #     self.vector.terminate()
    #     self.tee.terminate()
    #     await self.vector.communicate()
    #     await self.tee.communicate()
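
# the vector.toml consumed by init_vector() isn't part of this file; a minimal
# config for this setup might look like the sketch below (an assumption: Vector
# ships a `stdin` source and a `honeycomb` sink, but the dataset name here is
# made up):
#
#     [sources.app]
#     type = "stdin"
#
#     [sinks.honeycomb]
#     type = "honeycomb"
#     inputs = ["app"]
#     api_key = "${HONEYCOMB_API_KEY}"
#     dataset = "my-service"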


def sync_start_vector() -> None:
    asyncio.run(Vector().init_vector())


async def main() -> None:
    vector = Vector()
    await vector.init_vector()
    # this goes through tee; vector sees it's not JSON and passes it along unchanged
    print("example print")
    # subprocesses inherit the same redirected fds
    await (await asyncio.create_subprocess_shell("date")).wait()
    # logging is special: pretty-printed to the original stderr, but structured
    # JSON for vector
    logging.info("a log message with extra information", extra={"attribute": "42"})


if __name__ == "__main__":
    asyncio.run(main())
    print("message after asyncio")
    # open("/tmp/test", "a").write("we ran\n")