Skip to content

Commit

Permalink
Merge pull request #159 from DUNE/feature_evt_builder_updates
Browse files Browse the repository at this point in the history
Event builder updates prior to FSD reflow
  • Loading branch information
mjkramer authored Feb 7, 2025
2 parents c602473 + 88c927a commit efa1246
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 55 deletions.
184 changes: 134 additions & 50 deletions src/proto_nd_flow/reco/charge/raw_event_builder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from collections import defaultdict
import numpy as np
import logging

from h5flow import H5FLOW_MPI
if H5FLOW_MPI:
from mpi4py import MPI

from proto_nd_flow.util.array import fill_with_last


class RawEventBuilder(object):
'''
Expand All @@ -14,18 +17,23 @@ class RawEventBuilder(object):
'''
version = '0.0.0'

default_rollover_ticks = 1E7

def __init__(self, **params):
'''
Initialize given parameters for the class, each parameter is
optional with a default provided by the implemented class
'''
pass
self.rollover_ticks = params.get('rollover_ticks',
self.default_rollover_ticks)

def get_config(self):
'''
:returns: a `dict` of the instance configuration parameters
'''
return dict()
return dict(
rollover_ticks=self.rollover_ticks,
)

def build_events(self, packets, unix_ts, mc_assn=None):
'''
Expand Down Expand Up @@ -97,6 +105,53 @@ def cross_rank_set_attrs(self, *attrs):
d = dict([(attr, getattr(self, attr)) for attr in attrs])
comm.send(d, dest=rank + 1)

def unroll_timestamps(self, packets: np.ndarray) -> np.ndarray:
    '''
    Calculates "unrolled" timestamps for an array of packets. The
    unrolled timestamps increase monotonically, rather than rolling over
    every ~second. Each SYNC packet introduces an additional cumulative
    offset (of self.rollover_ticks, e.g. 1E7) that gets added to each
    subsequent raw timestamp, giving the unrolled timestamps. We round
    the LArPix timestamp of the SYNC to the nearest self.rollover_ticks,
    which takes care of the case when a SYNC is missed by the PACMAN.
    Each IO group is treated independently here.

    :param packets: structured array with (at least) the fields
        ``io_group``, ``packet_type``, ``trigger_type``, ``timestamp``
        and ``receipt_timestamp``
    :returns: ``int64`` array of unrolled timestamps, one per packet
    '''
    # Per-packet cumulative correction, added to the raw timestamps below.
    offsets = np.zeros((len(packets),), dtype='i8')
    for io_group in np.unique(packets['io_group']):
        mask = packets['io_group'] == io_group
        # SYNC identification: packet_type 6 with trigger_type 83 (== ord('S'))
        sync_mask = (mask &
                     (packets['packet_type'] == 6) &
                     (packets['trigger_type'] == 83))
        sync_ts = np.zeros_like(offsets)
        # Replace 0 with ~1E7 at each SYNC; ~2E7 if PACMAN missed prev SYNC
        # (assuming self.rollover_ticks is 1E7)
        sync_ts[sync_mask] = packets[sync_mask]['timestamp']
        # And round to the nearest 1E7 to prevent clock drift
        sync_ts[sync_mask] = (np.round(sync_ts[sync_mask] / self.rollover_ticks)
                              * self.rollover_ticks)
        # Now get the cumulative sum of all _preceding_ increments
        # (subtracting sync_ts[mask] => "preceding")
        offsets[mask] = np.cumsum(sync_ts[mask]) - sync_ts[mask]
        # Finally: If the receipt_timestamp is less than the timestamp, this
        # means that a SYNC arrived while the packet was traveling across
        # the tile. In that case, subtract the timestamp of the preceding SYNC.
        oops_mask = (mask &
                     (packets['packet_type'] == 0) &
                     (packets['receipt_timestamp'] < packets['timestamp']))
        # fill_with_last propagates each (rounded) SYNC timestamp forward,
        # so last_sync_ts[i] is the increment contributed by the most
        # recent SYNC at or before packet i (0 before the first SYNC).
        last_sync_ts = fill_with_last(sync_ts)
        offsets[oops_mask] -= last_sync_ts[oops_mask]

    ts = packets['timestamp'].astype('i8') + offsets

    # Timestamp packets require special treatment, since their timestamp
    # field is actually a unix timestamp. For these, we just subtract this
    # unix timestamp back out, so that their "ts" is the corresponding entry
    # of "offsets".
    unix_mask = packets['packet_type'] == 4
    ts[unix_mask] -= packets[unix_mask]['timestamp'].astype('i8')

    return ts


class TimeDeltaRawEventBuilder(RawEventBuilder):
'''
Expand Down Expand Up @@ -243,13 +298,11 @@ class SymmetricWindowRawEventBuilder(RawEventBuilder):

default_window = 1820 // 2
default_threshold = 10
default_rollover_ticks = 1E7

def __init__(self, **params):
super(SymmetricWindowRawEventBuilder, self).__init__(**params)
self.window = params.get('window', self.default_window)
self.threshold = params.get('threshold', self.default_threshold)
self.rollover_ticks = params.get('rollover_ticks', self.default_rollover_ticks)

self.event_buffer = np.empty((0,)) # keep track of partial events from previous calls
self.event_buffer_unix_ts = np.empty((0,), dtype='u8')
Expand All @@ -259,7 +312,7 @@ def get_config(self):
return dict(
window=self.window,
threshold=self.threshold,
rollover_ticks=self.rollover_ticks,
**super().get_config(),
)

def build_events(self, packets, unix_ts, mc_assn=None, ts=None, return_ts=False):
Expand All @@ -282,22 +335,7 @@ def build_events(self, packets, unix_ts, mc_assn=None, ts=None, return_ts=False)
packets = np.append(self.event_buffer, packets) if len(self.event_buffer) else packets

if ts is None:
# correct for rollovers
rollover = np.zeros((len(packets),), dtype='i8')
for io_group in np.unique(packets['io_group']):
# find rollovers
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83)
rollover[mask] = self.rollover_ticks
# calculate sum of rollovers
mask = (packets['io_group'] == io_group)
rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask]
# correct for readout delay (only in real data)
if mc_assn is None:
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \
& (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0)
rollover[mask] -= self.rollover_ticks

ts = packets['timestamp'].astype('i8') + rollover
ts = self.unroll_timestamps(packets)

sorted_idcs = np.argsort(ts)
ts = ts[sorted_idcs]
Expand Down Expand Up @@ -408,9 +446,9 @@ class ExtTrigRawEventBuilder(RawEventBuilder):
An external trigger based event builder. Events are sliced such that they always follow an external trigger and the readout window is configurable. The default is set to 182 x 1.1 units (10% grace period). Note the event builder may contain more than one trigger if they are within a readout window time.
'''
default_window = 1820 * 1.1
default_rollover_ticks = 1E7
default_shifted_event_dt = -70 #This is for accounting the fact that the trigger packet can potentially arrive 7 microseconds later than the beam spill
default_trig_io_grp = 1 # -1 -> all io groups
default_extendable = False

default_build_off_beam_events=False
default_off_beam_window = 1820 // 2
Expand All @@ -419,8 +457,8 @@ class ExtTrigRawEventBuilder(RawEventBuilder):
def __init__(self, **params):
super(ExtTrigRawEventBuilder, self).__init__(**params)
self.window = params.get('window', self.default_window)
self.rollover_ticks = params.get('rollover_ticks', self.default_rollover_ticks)
self.trig_io_grp = params.get('trig_io_grp', self.default_trig_io_grp)
self.extendable = params.get('extendable', self.default_extendable)
self.build_off_beam_events = params.get('build_off_beam_events', self.default_build_off_beam_events)
self.off_beam_window = params.get('off_beam_window', self.default_off_beam_window)
self.off_beam_threshold = params.get('off_beam_threshold', self.default_off_beam_threshold)
Expand All @@ -432,29 +470,41 @@ def __init__(self, **params):
self.prepend_count = 0
self.last_beam_trigger_idx = None

if not isinstance(self.trig_io_grp, list):
self.trig_io_grp = [self.trig_io_grp]
self.window = self.to_iog_dict(self.window)
self.shifted_event_dt = self.to_iog_dict(self.shifted_event_dt)
self.extendable = self.to_iog_dict(self.extendable)

def to_iog_dict(self, var):
'''
Convert a scalar or list VAR into a dict, keyed by io_group.
If a scalar, use a defaultdict that always yields VAR.
If a list, assume the i'th element corresponds to the i'th io_group
in self.trig_io_grp.
'''
if isinstance(var, list):
assert len(var) == len(self.trig_io_grp)
return {self.trig_io_grp[i]: v for i, v in enumerate(var)}
assert len(self.trig_io_grp) == 1
if self.trig_io_grp == [-1]:
return defaultdict(lambda: var)
return {self.trig_io_grp[0]: var}

def get_config(self):
    '''
    :returns: a `dict` of the instance configuration parameters.
        The per-io_group settings (``window``, ``shifted_event_dt``,
        ``extendable``) are serialized as lists of
        ``(io_group, value)`` pairs; base-class parameters (e.g.
        ``rollover_ticks``) are merged in via ``super().get_config()``.

    NOTE(review): when trig_io_grp is [-1] these attributes are
    defaultdicts holding no explicit keys, so the serialized lists
    will be empty — confirm that is the intended config record.
    '''
    # The pre-refactor dict (window/trig_io_grp/rollover_ticks) that was
    # fused into this hunk has been removed; only the post-refactor
    # per-io_group serialization remains.
    return dict(
        window=list(self.window.items()),
        shifted_event_dt=list(self.shifted_event_dt.items()),
        extendable=list(self.extendable.items()),
        **super().get_config(),
    )

def build_events(self, packets, unix_ts, mc_assn=None):
if len(packets) == 0:
return ([], []) if mc_assn is None else ([], [], [])

rollover = np.zeros((len(packets),), dtype='i8')
for io_group in np.unique(packets['io_group']):
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83)
rollover[mask] = self.rollover_ticks
mask = (packets['io_group'] == io_group)
rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask]
if mc_assn is None:
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \
& (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0)
rollover[mask] -= self.rollover_ticks

ts = packets['timestamp'].astype('i8') + rollover
ts = self.unroll_timestamps(packets)
sorted_idcs = np.argsort(ts)
ts = ts[sorted_idcs]

Expand All @@ -464,26 +514,61 @@ def build_events(self, packets, unix_ts, mc_assn=None):
mc_assn = mc_assn[sorted_idcs]

trig_mask = packets['packet_type'] == 7
if self.trig_io_grp != -1:
trig_mask &= packets['io_group'] == self.trig_io_grp
beam_trigger_idxs = np.where(trig_mask)[0]
if self.trig_io_grp != [-1]:
iog_masks = [packets['io_group'] == iog for iog in self.trig_io_grp]
trig_mask &= np.logical_or.reduce(iog_masks)
trigger_idcs = np.where(trig_mask)[0]

events = []
event_unix_ts = []
event_mc_assn = [] if mc_assn is not None else None

start_times = []

# Mask to keep track of packets associated to beam events
# Only used if off-beam events are built later with unused packets
# Mask to keep track of packets associated to triggers
used_mask = np.zeros( len(unix_ts) ) < -1
for i, start_idx in enumerate(beam_trigger_idxs):
this_trig_time = ts[start_idx]+self.shifted_event_dt
this_trig_time += self.rollover_ticks if this_trig_time < 0 else 0 # Considered Roll-Over Issue
used_trig_idcs = set()
for i, start_idx in enumerate(trigger_idcs):
if start_idx in used_trig_idcs:
continue
used_trig_idcs.add(start_idx)

this_io_group = packets[start_idx]['io_group']
this_trig_time = ts[start_idx] + self.shifted_event_dt[this_io_group]
last_io_group, last_trig_time = this_io_group, this_trig_time
start_times.append(this_trig_time)
# FIXME & (ts % 1E7 != 0) is a hot fix for PPS signal
hotfix_mask = (ts % 1E7 != 0) | ((ts % 1E7 == 0) & trig_mask)
mask = ((ts - this_trig_time) >= 0) & ((ts - this_trig_time) <= self.window) & hotfix_mask

if self.extendable[this_io_group]:
while True:
# Scan for further triggers in the window
pileup_trig_mask = ((ts - last_trig_time) > 0) \
& ((ts - last_trig_time) <= self.window[last_io_group]) \
& hotfix_mask \
& trig_mask
if not pileup_trig_mask.any():
break
for pileup_trig_idx in np.where(pileup_trig_mask)[0]:
iog = packets[pileup_trig_idx]['io_group']
if (self.trig_io_grp != -1) and (iog not in self.trig_io_grp):
continue
used_trig_idcs.add(pileup_trig_idx)
last_io_group = iog
last_trig_time = ts[pileup_trig_idx]
# If we find a non-extendable ("beam") trigger then we
# stop at the end of that trigger's window
if not self.extendable[last_io_group]:
break
else: # no break
continue # Scan over new window starting from last trig
break # Or, if we broke out of "for", break out of "while"

mask = ((ts - this_trig_time) >= 0) \
& ((ts - last_trig_time) <= self.window[last_io_group]) \
& ~used_mask \
& hotfix_mask

events.append(packets[mask])
event_unix_ts.append(unix_ts[mask])
if mc_assn is not None:
Expand All @@ -498,8 +583,7 @@ def build_events(self, packets, unix_ts, mc_assn=None):

# build off beam events using SymmetricRawEventBuilder
off_beam_config = {'window' : self.off_beam_window,
'threshold' : self.off_beam_threshold,
'rollover_ticks' : self.rollover_ticks
'threshold' : self.off_beam_threshold
}
off_beam_builder = SymmetricWindowRawEventBuilder( **off_beam_config )

Expand Down
20 changes: 15 additions & 5 deletions src/proto_nd_flow/reco/charge/raw_event_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,28 @@ def next(self):
# find unix timestamp groups
ts_mask = packet_buffer['packet_type'] == 4
ts_grps = np.split(packet_buffer, np.argwhere(ts_mask).ravel())
unix_ts_grps = [np.full(len(ts_grp[1:]), ts_grp[0], dtype=packet_buffer.dtype) for ts_grp in ts_grps if len(ts_grp)]
unix_ts_grps = [np.full(len(ts_grp), ts_grp[0], dtype=packet_buffer.dtype)
for ts_grp in ts_grps if len(ts_grp)]
unix_ts = np.concatenate(unix_ts_grps, axis=0) \
if len(unix_ts_grps) else np.empty((0,), dtype=packet_buffer.dtype)
packet_buffer = packet_buffer[~ts_mask]
if self.is_mc:
mc_assn = mc_assn[~ts_mask[1:]]
packet_buffer['timestamp'] = packet_buffer['timestamp'].astype(int) % (2**31) # ignore 32nd bit from pacman triggers
# Insert a null MC association at the beginning, corresponding to
# the timestamp packet we inserted above. We can grab such a "null"
# by taking the MC assn from any timestamp packet. Take the 1st one.
a_null_mc_assn = mc_assn[np.argwhere(ts_mask[1:]).ravel()[0]]
mc_assn = np.insert(mc_assn, [0], a_null_mc_assn)
# ignore 32nd bit from pacman triggers
# (don't do this for timestamp packets, where the timestamp is a unix ts)
packet_buffer[~ts_mask]['timestamp'] = \
packet_buffer[~ts_mask]['timestamp'].astype(int) % (2**31)
self.last_unix_ts = unix_ts[-1] if len(unix_ts) else self.last_unix_ts

if self.sync_noise_cut_enabled and not self.is_mc:
# remove all packets that occur before the cut
sync_noise_mask = (packet_buffer['timestamp'] > self.sync_noise_cut[0]) & (packet_buffer['timestamp'] < self.sync_noise_cut[1])
sync_noise_mask = ((packet_buffer['timestamp'] > self.sync_noise_cut[0]) &
(packet_buffer['timestamp'] < self.sync_noise_cut[1]))
# don't apply cut to timestamp packets
sync_noise_mask |= packet_buffer['packet_type'] == 4
packet_buffer = packet_buffer[sync_noise_mask]
unix_ts = unix_ts[sync_noise_mask]
if self.is_mc:
Expand Down
16 changes: 16 additions & 0 deletions src/proto_nd_flow/util/array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np


def fill_with_last(arr: np.ndarray) -> np.ndarray:
    '''
    Given an array ARR, return a copy in which every zero is replaced by
    the last preceding nonzero value (leading zeros are left as zero).
    E.g., [3, 0, 0, 5, 0, 8, 0, 0, 0] => [3, 3, 3, 5, 5, 8, 8, 8, 8].

    Vectorized forward-fill (replaces the slow np.split/np.concatenate
    implementation flagged by the old TODO): for each position compute the
    index of the last nonzero element at-or-before it, then gather. Runs
    in O(n) with no Python-level loop, and also handles empty input (the
    previous version raised on np.concatenate of an empty list).
    '''
    # Index of each nonzero element at its own position, 0 elsewhere.
    idx = np.where(arr != 0, np.arange(len(arr)), 0)
    # Running maximum turns that into "index of last nonzero seen so far".
    # Positions before the first nonzero keep index 0, and arr[0] is 0
    # there, so leading zeros stay zero — matching the old behavior.
    np.maximum.accumulate(idx, out=idx)
    # Fancy indexing copies, so the caller's array is never mutated.
    return arr[idx]
1 change: 1 addition & 0 deletions yamls/fsd_flow/reco/charge/RawEventGenerator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ params:
event_builder_config:
window: 3300 # 0.1 usec ticks; 10% buffer added to previous 3000
trig_io_grp: 4
extendable: True
rollover_ticks: 10000000 # PPS = 1e7 ticks
build_off_beam_events : True

0 comments on commit efa1246

Please sign in to comment.