Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions samples/probe_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,63 @@
"""A simple Knot DNS probe client."""

import argparse
import datetime
import json
import libknot
import libknot.probe
import sys

def extract_query_name(probe, fieldname):
return probe.qname_str()

def extract_dns_header(probe, fieldname):
val = getattr(probe, fieldname)
return {
"additionals": val.additionals,
"answers": val.answers,
"authorities": val.authorities,
"flag_aa": val.flag_aa,
"flag_ad": val.flag_ad,
"flag_cd": val.flag_cd,
"flag_qr": val.flag_qr,
"flag_ra": val.flag_ra,
"flag_rd": val.flag_rd,
"flag_tc": val.flag_tc,
"flag_z": val.flag_z,
"id": val.id,
"opcode": val.opcode,
"questions": val.questions,
"rcode": val.rcode,
}

def extract_addr(probe, fieldname):
val = getattr(probe, fieldname)
return probe.addr_str(val)

def extract_safe(probe, fieldname):
convert_func = getattr
try:
convert_func = {
'local_addr': extract_addr,
'remote_addr': extract_addr,
'query_hdr': extract_dns_header,
'reply_hdr': extract_dns_header,
'query_name': extract_query_name,
}[fieldname]
except KeyError:
pass

return convert_func(probe, fieldname)

def convert_json(probe):
probe_as_dict = {}
for field in probe._fields_:
fieldname = field[0]
fieldtype = field[1]
fieldlen = field[2] if len(field) > 2 else None

probe_as_dict[fieldname] = extract_safe(probe, fieldname)
return probe_as_dict

def probe_loop(args):
try:
Expand All @@ -25,7 +78,13 @@ def probe_loop(args):
while (True):
if probe.consume(data, 1000) > 0:
for item in data:
print(item.str(color=not args.no_color, timestamp=not args.no_timestamp))
if args.json:
item_json = convert_json(item)
if not args.no_timestamp:
item_json["timestamp"] = datetime.datetime.now().isoformat()
print(json.dumps(item_json))
else:
print(item.str(color=not args.no_color, timestamp=not args.no_timestamp))
except KeyboardInterrupt:
sys.exit(0)

Expand All @@ -49,10 +108,15 @@ def probe_loop(args):
default=1,
help="the probe channel"
)
parser.add_argument(
"--json",
action='store_true',
help="Print JSON formatted"
)
parser.add_argument(
"--no-color",
action='store_true',
help="don't colorize the output"
help="don't colorize the output (JSON is never colorized)"
)
parser.add_argument(
"--no-timestamp",
Expand Down