Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions nslsii/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def configure_ipython_logging(
return bluesky_ipython_log_file_path


def configure_kafka_publisher(RE, beamline_name, override_config_path=None):
def configure_kafka_publisher(RE, beamline_name, override_config_path=None, **kwargs):
""" Read a Kafka configuration file and subscribe a Kafka publisher to the RunEngine.

A configuration file is required. Environment variable BLUESKY_KAFKA_CONFIG_FILE
Expand Down Expand Up @@ -486,7 +486,8 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None):
RE,
beamline_name=beamline_name,
bootstrap_servers=bootstrap_servers,
producer_config=bluesky_kafka_configuration["runengine_producer_config"]
producer_config=bluesky_kafka_configuration["runengine_producer_config"],
**kwargs
)
else:
kafka_publisher_details = _subscribe_kafka_queue_thread_publisher(
Expand All @@ -496,6 +497,7 @@ def configure_kafka_publisher(RE, beamline_name, override_config_path=None):
producer_config=bluesky_kafka_configuration[
"runengine_producer_config"
],
*kwargs
)

return bluesky_kafka_configuration, kafka_publisher_details
Expand Down Expand Up @@ -680,25 +682,29 @@ def _read_bluesky_kafka_config_file(config_file_path):
)


def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None):
def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_config, _publisher_factory=None, *,
document_source='runengine'):
"""
Subscribe a RunRouter to the specified RE to create Kafka Publishers.
Each Publisher will publish documents from a single run to the
Kafka topic "<beamline_name>.bluesky.runengine.documents".
Kafka topic "<beamline_name>.bluesky.<document_source>.documents".

Parameters
----------
RE: RunEngine
RE : RunEngine
the RunEngine to which the RunRouter will be subscribed
beamline_name: str
beamline_name : str
beamline start_name, for example "csx", to be used in building the
Kafka topic to which messages will be published
bootstrap_servers: str
bootstrap_servers : str
Comma-delimited list of Kafka server addresses as a string such as ``'10.0.137.8:9092'``
producer_config: dict
producer_config : dict
dictionary of Kafka Producer configuration settings
_publisher_factory: callable, optional
_publisher_factory : callable, optional
intended only for testing, default is bluesky_kafka.Publisher, optionally specify a callable
that constructs a Publisher-like object
document_source : str, optional
The document source.

Returns
-------
Expand All @@ -712,7 +718,7 @@ def _subscribe_kafka_publisher(RE, beamline_name, bootstrap_servers, producer_co
from bluesky_kafka.utils import list_topics
from event_model import RunRouter

topic = f"{beamline_name.lower()}.bluesky.runengine.documents"
topic = f"{beamline_name.lower()}.bluesky.{document_source}.documents"

if _publisher_factory is None:
_publisher_factory = Publisher
Expand Down Expand Up @@ -795,7 +801,7 @@ def publish_or_abort_run(name_, doc_):


def _subscribe_kafka_queue_thread_publisher(
RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1
RE, beamline_name, bootstrap_servers, producer_config, publisher_queue_timeout=1, document_source='runengine'
):
"""
Create and start a separate thread to publish bluesky documents as Kafka
Expand All @@ -818,6 +824,8 @@ def _subscribe_kafka_queue_thread_publisher(
such as ``'kafka1:9092,kafka2:9092``
producer_config: dict
dictionary of Kafka Producer configuration settings
document_source : str, optional
The document source.

Returns
-------
Expand All @@ -830,20 +838,17 @@ def _subscribe_kafka_queue_thread_publisher(
un-subscribe the function from the RunEngine, in case someone ever wants to do that.

"""
from bluesky_kafka import BlueskyKafkaException
from bluesky_kafka.tools.queue_thread import build_kafka_publisher_queue_and_thread

nslsii_logger = logging.getLogger("nslsii")
beamline_runengine_topic = None
kafka_publisher_token = None
publisher_thread_stop_event = None
kafka_publisher_re_token = None
publisher_queue_thread_details = None

try:
nslsii_logger.info("connecting to Kafka broker(s): '%s'", bootstrap_servers)
beamline_runengine_topic = (
f"{beamline_name.lower()}.bluesky.runengine.documents"
f"{beamline_name.lower()}.bluesky.{document_source}.documents"
)

publisher_queue_thread_details = build_kafka_publisher_queue_and_thread(
Expand All @@ -853,8 +858,6 @@ def _subscribe_kafka_queue_thread_publisher(
publisher_queue_timeout=publisher_queue_timeout,
)

publisher_thread_stop_event = publisher_queue_thread_details.publisher_thread_stop_event

kafka_publisher_re_token = RE.subscribe(
publisher_queue_thread_details.put_on_publisher_queue
)
Expand Down