Howl #44 (Draft)

9 changes: 9 additions & 0 deletions README.md
@@ -59,6 +59,15 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36

`saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -t 1762209990 1762209992` - This will forward messages between the two given timestamps.

## `howl` - Produce fake run-like messages

`saluki howl` emits `ev44` event messages, `pl72` run starts, and `6s4t` run stops to Kafka, in a format that looks somewhat like a real run.

```
saluki howl mybroker:9092 SOME_PREFIX
```
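
The simulated data rate can be tuned with the optional flags defined in `main.py`; the values below are purely illustrative:

```
saluki howl mybroker:9092 SOME_PREFIX --events-per-frame 1000 --frames-per-second 10 --frames-per-run 600
```

`--tof-peak`, `--tof-sigma`, `--det-min` and `--det-max` control the shape of the generated events; see `saluki howl --help` for the defaults.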

# Developer setup
`pip install -e .[dev]`

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -9,7 +9,8 @@ dependencies = [
"ess-streaming-data-types",
"confluent-kafka>=2.12.1", # for produce_batch in play()
"python-dateutil",
"tzdata"
"tzdata",
"numpy",
]
readme = {file = "README.md", content-type = "text/markdown"}
license-files = ["LICENSE"]
133 changes: 133 additions & 0 deletions src/saluki/howl.py
@@ -0,0 +1,133 @@
import json
import logging
import time
import uuid

import numpy as np
from confluent_kafka import Producer
from streaming_data_types import serialise_6s4t, serialise_ev44, serialise_pl72
from streaming_data_types.run_start_pl72 import DetectorSpectrumMap

logger = logging.getLogger("saluki")

RNG = np.random.default_rng()


def generate_fake_events(
    msg_id: int,
    events_per_frame: int,
    tof_peak: float,
    tof_sigma: float,
    det_min: int,
    det_max: int,
) -> bytes:
    # Uniformly distributed detector IDs and Gaussian times of flight, clipped at zero
    detector_ids = RNG.integers(low=det_min, high=det_max, size=events_per_frame)
    tofs = np.maximum(0.0, RNG.normal(loc=tof_peak, scale=tof_sigma, size=events_per_frame))

    return serialise_ev44(
        source_name="saluki",
        reference_time=[time.time_ns()],  # frame reference time in nanoseconds
        message_id=msg_id,
        reference_time_index=[0],
        time_of_flight=tofs,
        pixel_id=detector_ids,
    )


def generate_run_start(det_max: int) -> bytes:
    # Trivial 1:1 detector-to-spectrum mapping covering all simulated detector IDs
    det_spec_map = DetectorSpectrumMap(
        detector_ids=np.arange(0, det_max, dtype=np.int32),
        spectrum_numbers=np.arange(0, det_max, dtype=np.int32),
        n_spectra=det_max,
    )
    return serialise_pl72(
        start_time=int(time.time() * 1000),  # milliseconds since epoch
        stop_time=None,
        run_name=f"saluki-howl-{uuid.uuid4()}",
        instrument_name="saluki-howl",
        nexus_structure=json.dumps({}),
        job_id=str(uuid.uuid4()),
        filename=str(uuid.uuid4()),
        detector_spectrum_map=det_spec_map,
    )


def generate_run_stop() -> bytes:
    return serialise_6s4t(
        stop_time=int(time.time() * 1000),
        job_id=str(uuid.uuid4()),
    )


def howl(
    broker: str,
    topic_prefix: str,
    events_per_frame: int,
    frames_per_second: int,
    frames_per_run: int,
    tof_peak: float,
    tof_sigma: float,
    det_min: int,
    det_max: int,
) -> None:
    """
    Send messages vaguely resembling a run to Kafka.
    """

    producer = Producer(
        {
            "bootstrap.servers": broker,
            "queue.buffering.max.messages": 100000,
            "queue.buffering.max.ms": 20,
        }
    )

    target_frame_time = 1 / frames_per_second  # seconds per simulated frame

    frames = 0

    # Serialise one frame up front to estimate the data rate being simulated
    ev44_size = len(
        generate_fake_events(0, events_per_frame, tof_peak, tof_sigma, det_min, det_max)
    )
    rate_bytes_per_sec = ev44_size * frames_per_second
    rate_mbit_per_sec = (rate_bytes_per_sec / 1024**2) * 8
    logger.info(f"Attempting to simulate data rate: {rate_mbit_per_sec:.3f} Mbit/s")
    logger.info(f"Each ev44 is {ev44_size} bytes")

    # Announce a run start before the first frame
    producer.produce(
        topic=f"{topic_prefix}_runInfo",
        key=None,
        value=generate_run_start(det_max),
    )

    while True:
        start_time = time.time()
        target_end_time = start_time + target_frame_time

        producer.produce(
            topic=f"{topic_prefix}_events",
            key=None,
            value=generate_fake_events(
                frames, events_per_frame, tof_peak, tof_sigma, det_min, det_max
            ),
        )
        producer.poll(0)  # serve delivery callbacks so the queue does not fill up
        frames += 1

        if frames_per_run != 0 and frames % frames_per_run == 0:
            # Stop the current run and immediately start a new one
            logger.info(f"Starting new run after {frames_per_run} simulated frames")
            producer.produce(
                topic=f"{topic_prefix}_runInfo",
                key=None,
                value=generate_run_stop(),
            )
            producer.produce(
                topic=f"{topic_prefix}_runInfo",
                key=None,
                value=generate_run_start(det_max),
            )

        # Sleep for whatever is left of the frame; warn if the frame overran
        sleep_time = max(target_end_time - time.time(), 0)
        if sleep_time == 0:
            logger.warning("saluki-howl cannot keep up with target event/frame rate")
        time.sleep(sleep_time)
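
For reviewers who want to check what `howl` puts on the broker, here is a minimal sketch (not part of this PR) that reads one event message back and deserialises it; the broker address and topic prefix are assumptions taken from the README example:

```
from confluent_kafka import Consumer
from streaming_data_types import deserialise_ev44

# Consume from the events topic that howl produces to ({prefix}_events)
consumer = Consumer(
    {
        "bootstrap.servers": "mybroker:9092",  # assumed broker from the README example
        "group.id": "howl-check",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["SOME_PREFIX_events"])

msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    events = deserialise_ev44(msg.value())
    # Each ev44 message carries one frame of fake events
    print(events.source_name, len(events.time_of_flight))
consumer.close()
```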
50 changes: 48 additions & 2 deletions src/saluki/main.py
@@ -3,6 +3,7 @@
import sys

from saluki.consume import consume
from saluki.howl import howl
from saluki.listen import listen
from saluki.play import play
from saluki.sniff import sniff
@@ -15,6 +16,7 @@
_CONSUME = "consume"
_PLAY = "play"
_SNIFF = "sniff"
_HOWL = "howl"


def main() -> None:
@@ -52,7 +54,9 @@ def main() -> None:
        _SNIFF, help="sniff - broker metadata", parents=[common_options]
    )
    sniff_parser.add_argument(
        "broker", type=str, help="broker, optionally suffixed with a topic name to filter to"
        "broker",
        type=str,
        help="broker, optionally suffixed with a topic name to filter to",
    )

    consumer_parser = argparse.ArgumentParser(add_help=False)
@@ -65,7 +69,9 @@
    )

    consumer_mode_parser = sub_parsers.add_parser(
        _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options]
        _CONSUME,
        help="consumer mode",
        parents=[topic_parser, consumer_parser, common_options],
    )
    consumer_mode_parser.add_argument(
        "-m",
@@ -120,6 +126,34 @@
        nargs=2,
    )

    howl_parser = sub_parsers.add_parser(
        _HOWL,
        help="howl mode - produce fake run-like messages",
        parents=[common_options],
    )
    howl_parser.add_argument("broker", type=str, help="Kafka broker URL")
    howl_parser.add_argument("topic_prefix", type=str, help="Topic prefix e.g. INSTNAME")
    howl_parser.add_argument(
        "--events-per-frame", type=int, help="Events per frame to simulate", default=100
    )
    howl_parser.add_argument(
        "--frames-per-second", type=int, help="Frames per second to simulate", default=1
    )
    howl_parser.add_argument(
        "--frames-per-run",
        type=int,
        help="Frames to take before beginning a new run (0 to run forever)",
        default=0,
    )
    howl_parser.add_argument(
        "--tof-peak", type=float, help="Time-of-flight peak (ns)", default=10_000_000
    )
    howl_parser.add_argument(
        "--tof-sigma", type=float, help="Time-of-flight sigma (ns)", default=2_000_000
    )
    howl_parser.add_argument("--det-min", type=int, help="Minimum detector ID", default=0)
    howl_parser.add_argument("--det-max", type=int, help="Maximum detector ID", default=1000)

    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)
@@ -169,6 +203,18 @@ def main() -> None:
        except RuntimeError:
            logger.debug(f"Sniffing whole broker {args.broker}")
            sniff(args.broker)
    elif args.command == _HOWL:
        howl(
            args.broker,
            args.topic_prefix,
            events_per_frame=args.events_per_frame,
            frames_per_second=args.frames_per_second,
            frames_per_run=args.frames_per_run,
            tof_peak=args.tof_peak,
            tof_sigma=args.tof_sigma,
            det_min=args.det_min,
            det_max=args.det_max,
        )


if __name__ == "__main__":
3 changes: 2 additions & 1 deletion src/saluki/play.py
@@ -70,7 +70,8 @@ def play(
    logger.debug(f"finished consuming {num_messages} messages")
    consumer.close()
    producer.produce_batch(
        dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs]
        dest_topic,
        [{"key": message.key(), "value": message.value()} for message in msgs],
    )
    logger.debug(f"flushing producer. len(p): {len(producer)}")
    producer.flush(timeout=10)
4 changes: 3 additions & 1 deletion src/saluki/utils.py
@@ -34,7 +34,9 @@ def fallback_deserialiser(payload: bytes) -> str:


def deserialise_and_print_messages(
    msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None
    msgs: List[Message],
    partition: int | None,
    schemas_to_filter_to: list[str] | None = None,
) -> None:
    for msg in msgs:
        try:
Empty file added tests/test_howl.py
Empty file.
7 changes: 5 additions & 2 deletions tests/test_utils.py
@@ -78,7 +78,9 @@ def test_deserialising_message_which_raises_does_not_stop_loop(mock_message):
        assert logger.info.call_count == 1


def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message):
def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(
    mock_message,
):
    with patch("saluki.utils.logger") as logger:
        ok_message = Mock(spec=Message)
        ok_message.value.return_value = serialise_fc00(config_change=1, streams=[])  # type: ignore
@@ -179,7 +181,8 @@ def test_uri_with_no_topic():


@pytest.mark.parametrize(
    "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"]
    "timestamp",
    ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"],
)
def test_parses_datetime_properly_with_string(timestamp):
    assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000