Skip to content
This repository has been archived by the owner on Mar 25, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' into feat/api_tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
BenMoon authored Jan 16, 2024
2 parents fe4438d + 1a95e44 commit 08f9dfa
Show file tree
Hide file tree
Showing 16 changed files with 82 additions and 72 deletions.
9 changes: 5 additions & 4 deletions pymepix/SPIDR/spidrcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ class SPIDRController(Logger):
"""

def __init__(self, dst_ip_port, src_ip_port):
def __init__(self, dst_ip_port, pc_ip, udp_ip_port):
Logger.__init__(self, SPIDRController.__name__)

self.info("Connecting to camera on {}:{}".format(*dst_ip_port))
self._src_ip_port = src_ip_port
self._pc_ip = pc_ip
self._udp_ip_port = udp_ip_port
# TCP connection
self._sock = socket.create_connection(dst_ip_port, source_address=(src_ip_port[0], 0))
self._sock = socket.create_connection(dst_ip_port, source_address=(pc_ip, 0))
self._request_lock = threading.Lock()
self._req_buffer = np.ndarray(shape=(512,), dtype=np.uint32)
self._reply_buffer = bytearray(4096)
Expand All @@ -105,7 +106,7 @@ def _initDevices(self):

for x in range(count):
self._devices.append(SpidrDevice(self, x))
self._devices[x].serverPort = self._src_ip_port[1] + x
self._devices[x].serverPort = self._udp_ip_port[1] + x

def prepare(self):
self.disableExternalRefClock()
Expand Down
25 changes: 8 additions & 17 deletions pymepix/TPX4/tpx4chipdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

import weakref

import numpy as np

from pymepix.core.log import Logger

class Tpx4ChipDevice(Logger):
Expand All @@ -16,46 +14,39 @@ def __init__(self, _ctrl, device_num):

self.info("Device {} with device number {} created".format(self._dev_num, self._dev_num))

self._ipAddrDest = None
self._serverPort = None


@property
def linkStatus(self):
# NEEDS IMPLEMENTATION

return True, True, True

@property
def ipAddrSrc(self):
# NEEDS IMPLEMENTATION

return "{}.{}.{}.{}".format(127,0,0,1)

@ipAddrSrc.setter
def ipAddrSrc(self, ipaddr):
# NEEDS IMPLEMENTATION
pass


self._ipAddrDest = ipaddr

@property
def ipAddrDest(self):
# NEEDS IMPLEMENTATION

return "{}.{}.{}.{}".format(127,0,0,1)
return self._ipAddrDest

@ipAddrDest.setter
def ipAddrDest(self, ipaddr):
# NEEDS IMPLEMENTATION
pass
self._ipAddrDest = ipaddr

@property
def serverPort(self):
# NEEDS IMPLEMENTATION
return 50000
return self._serverPort

@serverPort.setter
def serverPort(self, value):
# NEEDS IMPLEMENTATION
pass
self._serverPort = value


@property
Expand Down
9 changes: 6 additions & 3 deletions pymepix/TPX4/tpx4controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

class Timepix4Controller(Logger):

def __init__(self, dst_ip_port, src_ip_port):
def __init__(self, dst_ip_port, pc_ip, udp_ip_port):
Logger.__init__(self, Timepix4Controller.__name__)

self._src_ip_port = src_ip_port
self._spidrAddr = dst_ip_port
self._pc_ip = pc_ip
self._udp_ip_port = udp_ip_port

self._devices = []
self._initDevices()
Expand All @@ -31,7 +33,8 @@ def _initDevices(self):

for x in range(count):
self._devices.append(Tpx4ChipDevice(self, x))
self._devices[x].serverPort = self._src_ip_port[1] + x
self._devices[x].ipAddrDest = self._udp_ip_port[0]
self._devices[x].serverPort = self._udp_ip_port[1] + x



Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum


class ApiDataType(Enum):
class ChannelDataType(Enum):
COMMAND = 'comm'
PIXEL = 'pixel'
TOF = 'tof'
Expand Down
File renamed without changes.
10 changes: 5 additions & 5 deletions pymepix/api/data_channel.py → pymepix/channel/data_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import zmq
import queue

from pymepix.api.data_types import ApiDataType
from pymepix.channel.channel_types import ChannelDataType
from pymepix.processing.datatypes import MessageType


Expand Down Expand Up @@ -51,15 +51,15 @@ def run(self):
self.socket.send_pyobj(new_data)

def send(self, data_type, data):
if data_type == ApiDataType.COMMAND:
if data_type == ChannelDataType.COMMAND:
self.q.put_nowait({'type': data_type.value, 'data': data.value})
else:
self.q.put_nowait({'type': data_type.value, 'data': data})

def send_data_by_message_type(self, message_type, data):
if message_type == MessageType.PixelData:
self.q.put_nowait({'type': ApiDataType.PIXEL.value, 'data': data})
self.q.put_nowait({'type': ChannelDataType.PIXEL.value, 'data': data})
elif message_type == MessageType.EventData:
self.q.put_nowait({'type': ApiDataType.TOF.value, 'data': data})
self.q.put_nowait({'type': ChannelDataType.TOF.value, 'data': data})
elif message_type == MessageType.CentroidData:
self.q.put_nowait({'type': ApiDataType.CENTROID.value, 'data': data})
self.q.put_nowait({'type': ChannelDataType.CENTROID.value, 'data': data})
15 changes: 9 additions & 6 deletions pymepix/config/default.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
name: 'ions'
timepix:
pc_ip : '192.168.1.1'
tpx_ip : '192.168.1.10'
tpx_ip : '192.168.1.11'
tpx_port: 50000
udp_ip: '192.168.1.11'
udp_port: 8192
sophy_config : 'path_to_sophy_config'
remote_processing_host: '131.169.168.130:13049'
camera_generation: 3
trainID:
connected: False
device : '/dev/ttyUSB0'
src_ip_port:
- '192.168.1.1'
- 8192
tango_api:
port: 9994
api_channel:
ip: '127.0.0.1'
port: 5056
port: 5056

16 changes: 10 additions & 6 deletions pymepix/config/ecomo_10Gb.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
name: 'ions'
timepix:
pc_ip : '192.168.100.1'
udp_port: 11113
tpx_ip : '192.168.100.10'
pc_ip : '192.168.1.1'
tpx_ip : '192.168.1.11'
tpx_port: 50000
udp_ip: '192.168.1.11'
udp_port: 8192
sophy_config : '/home/cfelcmi/timepix/10Gb2_50V_IKrum150.spx'
camera_generation: 3
trainID:
connected: False
device : '/dev/ttyUSB0'
tango_api:
port: 9994
tcpchannel:
ip : '127.0.0.1'
port: 5056
api_channel:
ip: '127.0.0.1'
port: 5056

16 changes: 10 additions & 6 deletions pymepix/config/ecomo_el.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
name: 'electrons'
name: 'ions'
timepix:
pc_ip : '192.168.1.1'
pc_port: 5566
tpx_ip : '192.168.1.10'
tpx_ip : '192.168.1.11'
tpx_port: 50000
udp_ip: '192.168.1.11'
udp_port: 8192
sophy_config : '/home/cfelcmi/timepix/ElectronTPX_IKrum150.spx'
camera_generation: 3
trainID:
connected: False
device : '/dev/ttyUSB0'
tango_api:
port: 9993
tcpchannel:
ip : '127.0.0.1'
port: 9994
api_channel:
ip: '127.0.0.1'
port: 5056

12 changes: 8 additions & 4 deletions pymepix/config/ecomo_ion.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
name: 'ions'
timepix:
pc_ip : '192.168.1.1'
udp_port: 11113
tpx_ip : '192.168.1.11'
tpx_port: 50000
udp_ip: '192.168.1.11'
udp_port: 8192
sophy_config : '/home/cfelcmi/timepix/10Gb2_50V_IKrum150.spx'
camera_generation: 3
trainID:
connected: False
device : '/dev/ttyUSB0'
tango_api:
port: 9994
tcpchannel:
ip : '127.0.0.1'
port: 5056
api_channel:
ip: '127.0.0.1'
port: 5056

11 changes: 5 additions & 6 deletions pymepix/processing/baseacquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
class AcquisitionStage(Logger):
"""Defines a single acquisition stage
Usually not created directly. Instead created by :class:`AcquisitionPipeline`
Usually not created directly. Instead, it is created by :class:`AcquisitionPipeline`
Represent a single pipeline stage and handles management of queues and message passing
as well as creation and destruction of processing objects.
Expand All @@ -42,7 +42,6 @@ class AcquisitionStage(Logger):
------------
stage: int
Initial position in the pipeline, lower stages are executed first
"""

def __init__(self, stage, num_processes=1):
Expand Down Expand Up @@ -130,7 +129,7 @@ def setArgs(self, *args, **kwargs):
def processes(self):
return self._pipeline_objects

def build(self, input_queue=None, output_queue=None, file_writer=None):
def build(self, input_queue=None, output_queue=None):
self._input_queue = input_queue
self._output_queue = output_queue

Expand All @@ -142,7 +141,7 @@ def build(self, input_queue=None, output_queue=None, file_writer=None):
self.debug("I am creating the queue")
self._output_queue = Queue()
else:
self.debug("Recieved the queue {}".format(output_queue))
self.debug("Received the queue {}".format(output_queue))
self.debug("Building stage {} ".format(self._stage_number))
self.info("Creating {} processes".format(self._num_processes))
for n in range(self._num_processes):
Expand All @@ -155,8 +154,8 @@ def build(self, input_queue=None, output_queue=None, file_writer=None):
)
p.daemon = True
self._pipeline_objects.append(p)
if self._output_queue is None:
self._output_queue = p.outputQueues()[-1]
# if self._output_queue is None: do we really need it?
# self._output_queue = p.outputQueues()[-1]

@property
def outputQueue(self):
Expand Down
3 changes: 2 additions & 1 deletion pymepix/processing/logic/centroid_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ def perform_centroiding_dbscan(self, chunks):
# return p.map(self.calculate_centroids_dbscan, chunks)

if self.number_of_processes>1:
return Parallel(n_jobs=self.number_of_processes)(delayed(calculate_centroids_dbscan)(c,self.tot_threshold, self._tof_scale, self.epsilon, self.min_samples, self._cent_timewalk_lut) for c in chunks)
# return Parallel(n_jobs=self.number_of_processes)(delayed(calculate_centroids_dbscan)(c,self.tot_threshold, self._tof_scale, self.epsilon, self.min_samples, self._cent_timewalk_lut) for c in chunks)
return Parallel(n_jobs=None)(delayed(calculate_centroids_dbscan)(c,self.tot_threshold, self._tof_scale, self.epsilon, self.min_samples, self._cent_timewalk_lut) for c in chunks)

return map(self.calculate_centroids_dbscan, chunks)

Expand Down
2 changes: 1 addition & 1 deletion pymepix/processing/pipeline_centroid_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


class PipelineCentroidCalculator(BasePipelineObject):
"""Performs centroiding on EventData recieved from Packet processor"""
"""Performs centroiding on EventData received from Packet processor"""

def __init__(
self,
Expand Down
4 changes: 3 additions & 1 deletion pymepix/processing/rawtodisk.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def _run_filewriter_thr(self, sock_addr, context=None):
self.info(f"zmq connect to tcp://127.0.0.1:{cfg.default_cfg['zmq_port']}")

# socket to maxwell
if remote_server := cfg.default_cfg.get('remote_processing_host') is not None:
remote_server = cfg.default_cfg.get('remote_processing_host')

if remote_server is not None:
self.info(f'connecting to processing server {remote_server}')
max_sock = context.socket(zmq.PUSH)
max_sock.connect(f"tcp://{remote_server}")
Expand Down
20 changes: 9 additions & 11 deletions pymepix/pymepix_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from .SPIDR.spidrcontroller import SPIDRController
from .TPX4.tpx4controller import Timepix4Controller
from .timepixdevice import TimepixDevice
from pymepix.api.data_types import Commands, ApiDataType
from pymepix.api.data_channel import Data_Channel
from pymepix.channel.channel_types import Commands, ChannelDataType
from pymepix.channel.data_channel import Data_Channel

from .timepix4device import Timepix4Device

Expand Down Expand Up @@ -76,8 +76,6 @@ class PymepixConnection(Logger):
>>> timepix[0].loadSophyConfig('W0026_K06_50V.spx')
"""

def data_thread(self):
Expand All @@ -94,9 +92,10 @@ def data_thread(self):
self._channel.send_data_by_message_type(data_type, data)

def __init__(self,
cam_address=(cfg.default_cfg["timepix"]["tpx_ip"], 50000),
src_ip_port=(cfg.default_cfg['timepix']['pc_ip'],
int(cfg.default_cfg.get('timepix').get('pc_port', 8192))),
spidr_address=(cfg.default_cfg['timepix']['tpx_ip'],
int(cfg.default_cfg.get('timepix').get('tpx_port', 50000))),
udp_ip_port=(cfg.default_cfg['timepix']['udp_ip'], cfg.default_cfg['timepix']['udp_port']),
pc_ip = cfg.default_cfg['timepix']['pc_ip'],
api_address=(cfg.default_cfg['api_channel']['ip'],
cfg.default_cfg['api_channel']['port']),

Expand All @@ -107,14 +106,13 @@ def __init__(self,

self._channel = Data_Channel()
self._channel.start()
#self._channel_address = tuple(cfg.default_cfg.get('tcp_channel', ['127.0.0.1', 5056]))
self._channel.register(f'tcp://{api_address[0]}:{api_address[1]}')

self.camera_generation = camera_generation

controllerClass = self._timepix_controller_class_factory(camera_generation)

self._controller = controllerClass(cam_address, src_ip_port)
self._controller = controllerClass(spidr_address, pc_ip, udp_ip_port)

TimepixDeviceClass = self._timepix_device_class_factory(camera_generation)
self._timepix_devices: list[TimepixDeviceClass] = []
Expand Down Expand Up @@ -230,11 +228,11 @@ def start_recording(self, path):

self._timepix_devices[0].start_recording(path)

self._channel.send(ApiDataType.COMMAND, Commands.START_RECORD)
self._channel.send(ChannelDataType.COMMAND, Commands.START_RECORD)

def stop_recording(self):
self._timepix_devices[0].stop_recording()
self._channel.send(ApiDataType.COMMAND, Commands.STOP_RECORD)
self._channel.send(ChannelDataType.COMMAND, Commands.STOP_RECORD)

def start(self):
"""Starts acquisition"""
Expand Down

0 comments on commit 08f9dfa

Please sign in to comment.