\documentclass[pdftex,12pt,letter]{article}
\usepackage[margin=0.75in]{geometry}
\usepackage{verbatim}
\usepackage{graphicx}
\usepackage{xspace}
\usepackage{cite}
\usepackage{url}
\usepackage[pdftex,pdfpagelabels,bookmarks,hyperindex,hyperfigures]{hyperref}
\newcommand{\pd}{protoDUNE\xspace}
%\newcommand{\pdsp}{pD/SP\xspace}
\newcommand{\xrd}{XRootD\xspace}
\newcommand{\expname}{\textit{NP04}\xspace}
\newcommand{\PP}{Prompt Processing\xspace}
\newcommand{\PPS}{\textbf{p3s}\xspace}
\title{A Design of the \pd/\expname Prompt Processing System}
\date{\today}
\author{N. Benekos, M.\,Potekhin and B.\,Viren}
\begin{document}
\maketitle
\begin{abstract}
\noindent Data Quality Monitoring (DQM) and \PP necessary for its implementation
are important work areas in the Single-Phase \pd (CERN experiment \expname).
This note describes design parameters of the \textit{\pd Prompt Processing System} (\PPS).
We start with a brief summary of the data characteristics
and data handling patterns as documented in DocDB\,1086 \cite{docdb1086} and
DocDB\,1212 \cite{docdb1212}. We present the basic requirements as per DocDB\,1811 \cite{docdb1811},
which differ in many respects from those of Workload Management
Systems utilized on the Grid (for example, a job may be removed from the system entirely
if the overall workflow is stalling due to limited CPU, and job priorities may depend on
the time a particular piece of data was produced). A design satisfying these requirements
is proposed, with a focus on low latency, high throughput and automation. It employs a pilot-based mechanism for
computational payload deployment and a dynamically prioritized job queue to ensure timely
delivery of time-critical data.
\end{abstract}
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{Overview}
\subsection{Terminology}
Some of the terms, acronyms and abbreviations used in this document are explained in the glossary in Appendix \ref{sec:glossary}.
\subsection{Motivation}
The LArTPC in \expname is a very large and complex device instrumented with highly sensitive electronics.
Experience with previously built smaller prototypes as well as current experiments utilizing this technology
(e.g. $\mu$BooNE) has demonstrated the importance of Data Quality Monitoring (DQM) at all stages of
the detector integration and data taking. In order to accomplish this, the \underline{p}rotoDUNE
\underline{P}rompt \underline{P}rocessing \underline{S}ystem (\textbf{p3s}) will be put in place.
\subsection{\expname Data Scenarios and Data Handling}
\label{sec:rawdata}
Quantitative information about scenarios for data taking in the Single-Phase \pd is collected and maintained in \cite{docdb1086}.
The raw data rate produced by the LArTPC is assumed to be $\sim$1.4\,GB/s after lossless compression. This is based on
the assumption that a large number of Cosmic Ray triggers will be produced in addition to beam triggers, leading
to a virtually uniform trigger rate in and out of spill. In order to size the system
and account for various contingencies, rates up to 3\,GB/s are under consideration.
Rates and volume of data coming from other detector components will be much smaller
and won't affect the overall scale of the data handling and processing system.
An outline of the design of the raw data handling system is documented in \cite{docdb1212} and other
prior \pd documentation. According to it, the data is transmitted from the online buffer to the
CERN EOS \cite{eos} via a 20\,Gbps full-duplex network connection.
The EOS system serves as the hub for the \pd raw data distribution from which
it gets committed to the tape archive at CERN and also transmitted to FNAL and potentially
other US and international locations. It will also be one of the principal staging areas for \PP.
All or most links in the data transmission chain will be based on F-FTS \cite{fts}, which allows us
to reuse an existing and well-supported technology that has been determined to satisfy
the \pd requirements.
\subsection{Data Streams}
\label{sec:streams}
For reasons that are outside of the scope of this document, the experimental raw data in \expname
will come in three distinct streams at the time of capture:
\begin{itemize}
\item \textbf{LArTPC} The TPC stream which is dominant in terms of rate and volume
\item \textbf{BI} Beam Instrumentation data from Cherenkov, TOF and other systems
\item \textbf{CRT} Cosmic Ray Tagger data
\end{itemize}
\noindent All three streams will follow a similar path in that they will be transmitted from their respective
online buffers to mass storage (which in this case is EOS, see \ref{sec:staging}). An important differentiating
feature of the \textbf{BI} data is that it will be bundled in pieces potentially as large as covering
an SPS cycle, which lasts tens of seconds. It can be assumed that the data in all three streams will be tagged
with time stamps derived from the same source, which is crucial for matching these data at every step
of processing.
\subsection{Goals and Time Scale}
\label{sec:outline}
According to \cite{docdb1811} the data needs to undergo prompt processing on the scale
of \textit{tens of minutes} (or better) from the time it was taken, with the benchmark number of
10\,min often being quoted for reference purposes. This is motivated by the need to have
\textit{actionable} DQM information in time for operators to take action and prevent loss
of usable data and/or valuable beam time.
The goal is to provide a more in-depth data QA and assessment of the detector's health and operating
conditions than is afforded by functionality implemented as a part of the generic monitoring running as
artDAQ processes on the DAQ machines. Most calculations done in \PPS
will result in some sort of a ``visual product'', for example a histogram, plot, event display
or a summary table in order to provide information to the experiment operators in a timely manner.
Since the volume of visual and numeric information pertinent to DQM may be large and hard
to quickly comprehend, there will be automated alerts to the operators when certain parameters
are outside of their nominal range.
\subsection{The Scope of the Prompt Processing}
\label{sec:scope}
\subsubsection{TPC Data Processing}
\label{sec:categories}
This is a compressed summary of information presented in \cite{docdb1811}.
The categories are as follows:
\begin{description}
\item[DAQ (no data decompression)] A summary of DAQ-level data, such as data
rates, metadata and status codes provided by the DAQ.
Parts of this functionality will exist within the artDAQ monitor processes, perhaps with
additional detail provided by the prompt processing system.
\item[ADC (requires data decompression)] A summary of ADC-level data, e.g. mean/RMS
values at channel level and as statistics over various groupings and levels of detail
(ASIC, FEMB, RCE, APA etc.).
\item[FFT] A summary of the ADC-level data in frequency space. It requires running a fast Fourier
transform (FFT) on channel waveforms. This largely provides measures of noise and its
evolution.
\item[SIG] A summary of the data \textit{after signal processing}.
The processing is in both time domain and in frequency domain (so uses the output of FFT).
It includes items such as
\begin{itemize}
\item ``stuck code'' mitigation
\item coherent noise removal
\item noise subtraction and filtering
\item deconvolution of the response function
\item calculation of signal correlations for diagnostic purposes
\end{itemize}
\item[RECO] Results from running some type of reconstruction (perhaps greatly simplified).
It may, for example, provide a coarse count of straight muon track candidates.
\end{description}
\noindent It is possible that \textbf{ADC} and \textbf{FFT} stages will be implemented as a single processing
block since \textbf{FFT} will require uncompressed data in any case. Likewise, some other stages in the processing
chain and visualization may be conflated in case writing out intermediate (transient) data creates an I/O bottleneck,
while still preserving modularity to the largest extent possible. Such design decisions will be made at a later time.
Note that the categories listed above can be thought of as stages of processing (for the most part), where
the output of a stage serves as input to the next one. This can be conceptualized as a DAG (see \ref{sec:dag}).
While defining the scope of \PP one needs to balance the computational costs and other costs of implementing
a particular part of the chain against the estimated net effect on overall DQM. For example, correlation
plots of ``channel vs time'' before and after the \textbf{SIG} stage amount to 2D projections of the raw event
display (before any reconstruction) and, based on prior experience, constitute a powerful tool for both
visual and quantitative monitoring of the detector performance.
The \textbf{RECO} stage may provide information on the detector performance which is more complete
and sensitive to potential anomalies than signal processing alone. However, doing full and high-quality
event reconstruction would require substantial resources to implement, which would reduce resource availability
for other parts of \PP. It also requires a sophisticated orchestration of data flow which would include
merging with \textbf{BI} and \textbf{CRT} streams and at least rudimentary calibrations.
For these reasons, reconstruction will be limited to high-speed clustering and hit detection algorithms.
One useful application of this type of reconstruction would be correlating clusters and hits with
the beam profile.
\subsubsection{Sampling (pre-scaling) the TPC Data in \PPS}
\label{sec:downsampling}
The categories of processing as outlined above may have drastically different resource requirements.
%i.e. the ability to select a fractional sample of the data stream as is progresses through the chain, at every stage.
As a very rough estimate, if 100 cores are allocated to FFT calculation this may be done for approximately 10\% of the events
in streaming mode. Then, depending on the type of deconvolution being employed (1D vs 2D) this step
may require an order of magnitude more CPU to complete. To stay within a reasonable footprint,
it may be necessary to put only a fraction of the output of the FFT stage through the deconvolution stage
(note that the FFT output is valuable in and of itself since it makes it possible to ascertain noise conditions in individual channels
on a continuous basis). Event reconstruction may require even more CPU depending on the desired level of precision and/or
complexity and so it may also be necessary to scale down the number of events at this stage
(while still aiming for quick turnaround). Consideration is therefore given to ``downsampling'' or ``prescaling''
of data as it progresses in the prompt processing chain, whereby a certain predetermined fraction of the data
produced in a prompt processing stage is selected for processing in the next stage.
This can be achieved by two methods or some combination thereof:
\begin{itemize}
\item Selecting a fraction of files produced by the previous stage
\item Reading only a fraction of events from a given input file
\end{itemize}
\noindent Implementing the latter mode of data access is very important since it reduces
the volume of the data being copied and/or read by running processes by avoiding
operations on whole files.
Parameters governing this fractional data selection must be dynamically configurable.
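\noindent As an illustration only (not part of the cited requirements), the following is a minimal sketch, in Python, of how a configurable prescale could be applied when deciding which events from an input file are passed to the next stage; the function name, the seed and the 10\% fraction are hypothetical:
\begin{verbatim}
# Hypothetical sketch: select a configurable fraction of events for the
# next prompt-processing stage. Names and values are illustrative only.
import random

def select_events(event_ids, fraction, seed=None):
    """Return the subset of events passed on to the next stage."""
    rng = random.Random(seed)   # reproducible if a seed is given
    return [e for e in event_ids if rng.random() < fraction]

# Example: pass roughly 10% of events from an FFT output file to SIG.
sig_input = select_events(range(1000), fraction=0.1, seed=42)
\end{verbatim}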
\subsubsection{Visualization}
\label{sec:viz_intro}
The \textit{Visualization} category of processing (\textbf{Vis})
may take data output from any of the above listed stages in
order to efficiently present it to the end-user.
% This stage will need to
%have a sampling fraction based on both the processing requirements
%(e.g. processing time) and based on how fast a human can absorb and understand
%the information as it undergoes updates.
It may include items such as:
\begin{itemize}
\item Histograms of statistical quantities and strip charts showing their history.
\item Various statistics dynamically updated over some fixed time window.
\item 2D displays of underlying values such as spectrograms of the \textbf{FFT}
output (vs channel), or time vs channel number using output from \textbf{ADC} and \textbf{SIG}.
\end{itemize}
\noindent Visual information produced by this type of processing will be presented to the users
via an interactive Web interface and, where necessary, shall be preserved for record keeping,
troubleshooting and other applications. It should be noted that certain types of visualization
will result in massive amounts of output data, so adequate design and planning for that stage
must be in place.
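\noindent As a purely illustrative sketch (the actual plotting toolkit and quantities are yet to be chosen), a ``visual product'' of the kind listed above could be generated as an image file and handed to the Web interface; the channel count and output file name are placeholders:
\begin{verbatim}
# Hypothetical sketch of creating a "visual product": a histogram of
# per-channel RMS values written out as an image for the Web interface.
import numpy as np
import matplotlib
matplotlib.use("Agg")               # render off-screen on a batch node
import matplotlib.pyplot as plt

rms_per_channel = np.random.normal(2.5, 0.3, size=15360)  # placeholder data

plt.hist(rms_per_channel, bins=100)
plt.xlabel("ADC RMS")
plt.ylabel("channels")
plt.title("Per-channel noise summary (illustrative)")
plt.savefig("adc_rms_summary.png")  # exported to the presentation layer
\end{verbatim}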
\subsubsection{Other Applications of the TPC Data Processing Components}
Results of the intermediate stages of Data Processing may need to be preserved, for
example the data in graphic form for audit/run documentation purposes.
Also, there could be a potential for large processing efficiency
gains if the entire data stream can be run through the \textbf{SIG} processing followed by zero suppression (ZS).
A certain portion of the log files produced during prompt processing may contain information useful
for future reference and debugging, so provisions will be made to capture these data.
\subsubsection{Other Data Processing}
\label{sec:otherdata}
Prompt processing will also need to include the data produced by the \textbf{BI} and \textbf{CRT}
systems (see \ref{sec:streams}). At a minimum this will include collection and plotting of the
ADC spectra which will allow the operator to ascertain the beam status and performance
and make sure the trigger is configured correctly (for both the beam and the cosmic ray trigger).
In addition, there can be a stage that reconstructs the beam particle trajectories based on
position-sensitive detectors which are part of \textbf{BI} or at least plots the beam profile
at the points of measurement.
\section{Prompt Processing as a DAG}
\label{sec:dag}
\subsection{TPC data}
Data and processes involved in prompt processing as outlined in \ref{sec:outline} (for LArTPC) can be
modeled together as a DAG. This is illustrated in Fig.\ref{fig:dag1}
in the form of a high-level conceptual diagram.
\begin{figure}[tbh]
\centering
\includegraphics[width=1.0\textwidth]{figures/prompt_dag_2.pdf}
\caption{An example of DAG representing prompt processing in \expname.}
\label{fig:dag1}
\end{figure}
\noindent In this diagram, oval shapes represent data, rectangular blocks correspond to
elements of workload responsible for processing the data and diamonds stand for processes
which create ``visual products'' (see \ref{sec:outline} and \ref{sec:viz_intro}).
The \textbf{ADC} and \textbf{FFT} components are conflated here for the following reason: while being functionally
different, both depend on data being decompressed. Since it makes sense to do decompression only once, it may
be optimal to combine these two steps in a single unit of execution.
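\noindent For illustration, the dependency structure of Fig.\ref{fig:dag1} could be captured in a simple data structure as sketched below; the stage names follow the categories of \ref{sec:categories}, while the exact topology and representation are assumptions rather than design decisions:
\begin{verbatim}
# Illustrative sketch: the prompt-processing DAG expressed as a mapping
# from each stage to the stages that consume its output. The topology is
# a placeholder loosely following Fig. "dag1".
PROMPT_DAG = {
    "ADC_FFT": ["SIG", "VIS"],   # ADC and FFT conflated into one unit
    "SIG":     ["RECO", "VIS"],
    "RECO":    ["VIS"],
    "VIS":     [],               # leaf: visual products for the operators
}

def descendants(stage):
    """Stages that become eligible to run once 'stage' completes."""
    return PROMPT_DAG.get(stage, [])
\end{verbatim}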
\subsection{BI and CRT data}
Processing for the \textbf{BI} and \textbf{CRT} data streams (see \ref{sec:streams}) will be different but will
follow the same general logic. There will be initial simple steps involving unpacking of the data and collecting
histograms, while later stages may include particle tracking, beam profile measurement and beam
composition analysis. It is therefore expected that this processing will likewise be modeled as a DAG.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{\PPS Design Parameters}
\label{sec:parameters}
\subsection{Computing Resources}
\label{resources}
The following computing resources are being considered for use in \PPS:
\begin{itemize}
\item The ``neut'' cluster \cite{neut} at CERN (HTCondor). See Appendix \ref{appendix:neut} for
more detail.
\item The lxbatch facility \cite{lxbatch} at CERN Tier-0 (LSF-based).
\item Potentially, resources at a few US institutions such as FNAL and BNL. This option is facilitated
by utilizing EOS interfaces such as XRootD. Since the latency for these resources will be much larger
than for local ones at CERN, this will only be appropriate for the portions of the workflow which
are less time critical than others (e.g. visualization and histogramming for long-term record keeping).
\end{itemize}
\noindent Optimally, all available resources will be managed transparently in one unified system and
the workload distribution will be dynamically adjustable.
\subsection{Staging the Data}
\label{sec:staging}
CERN EOS appears to be the best suited platform from which to
serve the data for prompt processing for the following reasons:
\begin{itemize}
\item The online buffer is likely to operate at a significant data rate (see above) and additional I/O load on the
buffer must be taken into consideration in order to guarantee stability of
operation for the online buffer and DAQ in general.
\item It can be expected that the data arrives at EOS rather quickly after having been captured in the online buffer
(e.g. $\sim$1\,min), so the latency due to this transfer is acceptable.
\item EOS has a variety of interfaces including \xrd \cite{xrootd}, which simplifies access from various types of
systems and locations (both inside and outside the CERN perimeter). Leveraging this capability of EOS will
allow us to avoid implementing a special data transmission chain for prompt processing
(such as explicit file copy to the Worker Node and back).
\end{itemize}
\noindent Advantages of EOS notwithstanding, one must plan for the contingency of it experiencing
an outage due to a network or any other kind of problem. For that reason a fallback staging area
must exist to allow at least partial \PP to continue in the vicinity of the detector in case of
an outage.
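\noindent As a sketch of the access pattern implied above (reading input directly from EOS over \xrd rather than copying whole files to the Worker Node), a payload could open a file via a \texttt{root://} URL. The host name and path below are hypothetical, and the use of PyROOT is only one possible choice:
\begin{verbatim}
# Illustrative sketch: open an input file on EOS directly via XRootD,
# avoiding an explicit copy to the Worker Node. The URL is hypothetical.
import ROOT   # PyROOT; ROOT is available on the candidate resources

url = "root://eospublic.cern.ch//eos/experiment/np04/raw/run000123.root"
f = ROOT.TFile.Open(url)            # streams data over the network
if not f or f.IsZombie():
    raise RuntimeError("could not open %s" % url)
\end{verbatim}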
\subsection{Location of the computing resources}
The CERN Tier-0 computing facility is located in the West Area. As to the ``neut'' cluster (\ref{resources}),
there are two practical possibilities for its placement:
\begin{itemize}
\item In the CERN West Area (main campus)
\item In or close to the extension of the North Area experimental hall (EHN1) where \expname is located
\end{itemize}
\noindent Each of these locations has advantages and disadvantages. In the first scenario,
there is no additional I/O load on the online buffer (as noted in \ref{sec:staging}) and network proximity
to EOS is a plus. At the same time, there are two potential issues with this option:
\begin{itemize}
\item In case of an outage of either the network link to \expname or EOS itself,
the prompt processing would unavoidably stall even as the experiment may still be taking data and recording
it in its online buffer.
\item Data transfer from EHN1 to EOS takes a certain amount of time which may be relatively small (minutes, as defined
in a typical F-FTS configuration and used in practice)
but not negligible compared to the overall target latency of \PPS.
\end{itemize}
\noindent \textbf{Placement of the computing resources for prompt processing in the detector
vicinity with access to the online buffer solves both problems}. If the data in the online buffer are accessed locally
this will create an additional I/O load on the local storage system which, however, is
not expected to be large, generally remaining at the level of a few percent of the total.
There is also an option for \PPS
to read data from EOS since the bandwidth in the direction opposite to the main flow of data
is essentially unallocated. In case of a network outage p3s can be reconfigured to read
data from the online buffer, subject to caveats as explained above.
The final decision regarding placement of the computing resources will be made at a later time.
It is possible that some of the initial stages of the processing chain will be located near the
experiment, while more advanced reconstruction takes place at CERN Tier-0 facilities or even at FNAL.
The situation is complicated if it is decided that \PPS needs to handle all three data streams
(\ref{sec:streams}) in some stages of processing: LArTPC, \textbf{BI} and \textbf{CRT}.
Ways to ensure these data are available for \PP are being investigated at the time of writing.
\subsection{Prioritization of Processing Steps and Timeout Strategy}
\label{sec:priority}
Quick turnaround is the key for the prompt processing to be useful. For each processing category
to be complete it must include the visualization component in order to make the results available
to the experts. What fraction of the total stream of events ends up being processed is relatively less
important compared to timely completion of all steps (NB. with downsampling of the data, see
\ref{sec:downsampling}).
A use case may help to illustrate this. Let's assume a predetermined fraction of the events needs to
be visualized as described in \ref{sec:scope}.
% (perhaps with simplified algorithm aimed at identifying cosmic muon candidates).
This information needs to be made available to the operators as quickly as possible in order to ascertain the performance
of the detector at a level of confidence much higher than that provided by ADC histograms.
Consider the case where the \textbf{FFT} stage (see \ref{sec:categories})
is running successfully but is consuming a fraction of the computing resource so large that
the \textbf{SIG} stage is starved and takes a longer time to complete, making
it less relevant and less useful. This situation can be prevented by prioritizing each step higher
than the preceding one (again, with downsampling as per \ref{sec:downsampling}). In many
cases having a ``visual product'' to present the data to the operator is crucial (e.g. histograms,
event displays etc) so corresponding processes need to have sufficiently high priority.
It is therefore necessary to provide a mechanism for flexible prioritization of the different
classes of jobs running in the prompt processing system.
There can be various reasons why a job may take longer than is practical, thus consuming valuable resources
and creating a bottleneck -- from bugs in software to a type of event which is genuinely difficult to analyze.
These jobs need to be triaged, diagnosed and documented separately from the main workflow and perhaps
using a different computing resource. For this reason, a maximum allowed execution time (effectively a timeout)
will need to be set for each processing category in the prompt processing chain in order to prevent stalling of the system.
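\noindent To make the prioritization and timeout strategy concrete, the following is a minimal sketch of per-category settings; the category names mirror \ref{sec:categories} but the numerical values are placeholders, not proposed parameters:
\begin{verbatim}
# Illustrative sketch: per-category priorities and timeouts. Later stages
# get higher priority so that downstream results are not starved; a job
# exceeding its timeout is marked for separate triage. Values are
# placeholders only.
STAGE_POLICY = {
    "ADC_FFT": {"priority": 10, "timeout": 300},
    "SIG":     {"priority": 20, "timeout": 600},
    "RECO":    {"priority": 30, "timeout": 900},
    "VIS":     {"priority": 40, "timeout": 300},
}

def exceeded_timeout(category, elapsed_seconds):
    """True if a running job of this category should be timed out."""
    return elapsed_seconds > STAGE_POLICY[category]["timeout"]
\end{verbatim}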
\subsection{Workflow: Automation And State}
Prompt processing represents a use case quite different from managed production or user analysis, and is
in essence processing data in streaming mode. It is rather obvious that in this situation manual job submission won't scale to the
needs of the experiment, and it will be necessary to automate the prompt processing workflow.
Automation can be achieved in a number of different ways. In the simplest case, processes belonging to a particular stage
of processing can be waiting for files produced by the previous stage and matching a particular name pattern
(in a fashion similar to the ``dropbox'' mechanism). This design eschews the use of a database and it is effectively
the file system which keeps the state of the workflow. For example, jobs performing calculations in the \textbf{SIG}
stage could be waiting for files containing the output of the \textbf{FFT} stage. This scheme is conceptually simple but has a number of
disadvantages, such as
\begin{itemize}
\item Matching multiple jobs to multiple files is not trivial and there may be race conditions etc
\item Detection of error status would require additional scripting
\item Likewise, generating a view of workload distribution across the processing stages is possible but requires
scripting
\item Changing characteristics of the workload is complicated
\item Bookkeeping is difficult
\end{itemize}
\noindent Problems such as listed above can be alleviated if the state of the workflow is kept in a database.
An additional and obvious advantage of this approach is that an efficient and responsive Web interface to
the system can be created with minimal effort.
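\noindent As a minimal sketch of this database-backed approach (the schema, column names and job states are illustrative only and do not constitute a design decision), the workflow state could be kept in a single job table:
\begin{verbatim}
# Illustrative sketch: keep the workflow state in a database rather than
# in the file system. Schema and states are placeholders only.
import sqlite3

conn = sqlite3.connect("p3s_state.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS jobs (
        id         INTEGER PRIMARY KEY,
        category   TEXT,     -- e.g. ADC_FFT, SIG, RECO, VIS
        state      TEXT,     -- waiting / assigned / running / finished / failed
        priority   INTEGER,
        input_ref  TEXT,     -- e.g. an xrootd URL of the input data
        output_ref TEXT,
        created    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )""")
conn.commit()
\end{verbatim}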
\subsection{Delivery of DQM results, Monitoring, Interfaces and Integration}
The requirements for monitoring the results of the \PP are listed in \cite{docdb1811}.
We propose a modular Web interface which can be configured to be used by itself, or integrated
with other elements of the \expname monitoring. A useful functionality that may be included in DQM
is to produce an automatic alarm when some of the benchmark parameters of the data are outside
of predetermined limits.
% The issues of the design of the corresponding web service and integration with other monitoring
% components (such as those in DAQ) will be addressed at a later time.
Depending on the design, the \PP system may need to interface with the data handling system (i.e.~F-FTS).
A simple example would be prevention of the data being purged from EOS while it's still needed for processing.
It is possible that an additional DAQ interface will be required to ensure that raw data is passed to the
\PP system.
Monitoring of the status of the \PP facility (such as a cluster located in EHN1) will be done
with tools like ARDA \cite{arda}.
\section{The Proposed Design}
\label{sec:design}
\subsection{Summary of the Assumptions}
The most important of the design parameters itemized in Sec.\ref{sec:parameters} are:
\begin{itemize}
\item An assumption that \textbf{a dedicated data transfer system won't be needed} for the
operation of prompt processing since the data will be read from and written to CERN EOS.
\item The workflow and its elements can be modeled as a DAG (Sec.\ref{sec:dag}).
\item Payloads within the prompt processing workflow will need to be prioritized to ensure
the quickest turnaround time possible given the resources available.
\item Timeouts will be enforced in each processing stage to ensure reliable throughput. Jobs
failing due to the timeout conditions will be logged and triaged separately.
\item A \textbf{database} will be used to manage the workflow and marshal data to jobs, to keep and
account for the states of jobs in various stages of processing (including errors and failures),
and to monitor the progress of execution.
\end{itemize}
\subsection{Implementing the workflow}
In order to implement the DAG as a functioning workflow, the following technique may be applied:
each stage in the processing, upon reaching successful completion, will create entries in the database for its
descendants. These entries will contain references to input data consistent with the downsampling
strategy presented in \ref{sec:downsampling}.
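\noindent Building on the sketches above (and with the same caveat that the table, stage names and fractions are hypothetical), stage completion could create the descendant entries as follows:
\begin{verbatim}
# Illustrative sketch: on successful completion of a stage, create job
# entries for its descendants, applying the configured prescale fraction.
# The DAG, prescale values and table schema are placeholders.
import random

PRESCALE = {"SIG": 0.3, "RECO": 0.1, "VIS": 1.0}    # fraction passed on

def on_stage_complete(conn, dag, stage, output_ref, policy):
    """Insert 'waiting' job entries for the descendants of 'stage'."""
    for child in dag.get(stage, []):
        if random.random() >= PRESCALE.get(child, 1.0):
            continue                                # downsampled away
        conn.execute(
            "INSERT INTO jobs (category, state, priority, input_ref) "
            "VALUES (?, 'waiting', ?, ?)",
            (child, policy[child]["priority"], output_ref))
    conn.commit()
\end{verbatim}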
\subsection{Just-in-Time Submission}
\label{sec:justintime}
Latencies inherent in any batch system make it problematic to use common batch submission directly
since quick turnaround and throughput are required. A well known method to counter this is the use
of pilot jobs, submitted in advance and securing batch slots while optionally validating the software
environment present on the Worker Node, network connectivity and other components. The
payloads can then be distributed to primed and validated WNs immediately as the input data becomes
available, based on the desired prioritization of jobs. Alternatively, instead of running pilots it is possible to
seed the available compute element with a few classes of jobs according to \ref{sec:categories} and
let them wait till the relevant input arrives, or they time out -- this will help guarantee that
CPU cycles are not wasted by idling jobs (same actually applies to pilots). Either way, the key
here is \textit{``preemptive''} submission of pilots or jobs such that the latency of the start of job
execution is minimal. The pilot vs job decision will be made at a later time. Let us note for now that
the pilot method offers considerably more flexibility in adding additional types of jobs (payloads)
to the processing chain and modifying the distribution of resources across these different types.
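\noindent A rough sketch of the pilot idea follows; it is purely illustrative, and the broker URL, message format and payload launch mechanism are all assumptions to be settled later:
\begin{verbatim}
# Illustrative sketch of a pilot: a payload-agnostic wrapper that asks
# the broker for work and executes whatever payload it is handed.
# The URL, JSON fields and launch mechanism are hypothetical.
import json, subprocess, time, urllib.request

BROKER = "https://p3s.example.cern.ch/request_payload"   # placeholder

def run_pilot(max_idle_seconds=600):
    idle_since = time.time()
    while time.time() - idle_since < max_idle_seconds:
        with urllib.request.urlopen(BROKER) as resp:
            payload = json.load(resp)   # e.g. {"job_id": 42, "script": "..."}
        if payload.get("script"):
            # status reporting back to the broker is omitted in this sketch
            subprocess.run(["/bin/sh", payload["script"]], check=False)
            idle_since = time.time()    # reset the idle timer after real work
        else:
            time.sleep(10)              # no payload yet; poll again
\end{verbatim}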
\subsection{Job Queues}
The number of jobs assigned to each category listed in \ref{sec:categories} will be optimized based
on the eventual design of the processing software. For now the working assumption is that it will
be configurable so the exact number does not need to be set at this time.
Each job is characterized by its type (category), state, and references to its input and output data
(e.g. file paths for input and output). Jobs belonging to different categories are chained (see \ref{sec:categories} and \ref{sec:dag}). This is
visualized in the diagram presented in Fig.\ref{fig:queues1}, which depicts three queues each containing
jobs of a specific type.
\begin{figure}[tbh]
\centering
\includegraphics[width=1.1\textwidth]{figures/prompt_queues_1.pdf}
\caption{Categories of prompt processing jobs grouped in queues. In this example, the depths of queues are different for different job types.}
\label{fig:queues1}
\end{figure}
For example, if we consider the DAG depicted in Fig.\ref{fig:dag1}, then ``Type 1''
is \textbf{ADC/FFT}, ``Type 2'' is \textbf{SIG}, and ``Type 3'' is \textbf{Vis}. For illustration purposes,
the queues in this diagram have different depths $(i,j,k)$ -- which may indeed happen in practice.
Jobs consume data produced in the previous stage of prompt processing. For that reason, they
will need to be defined dynamically once a new piece of input becomes available. For example,
the attribute $I_{21}$ will be assigned the value of $O_{11}$ once the corresponding $Job_1$ of Type 1
is complete. Generation of job definition is a crucial part of overall automation of prompt processing.
Keeping fixed numbers of jobs of each type running in the system at all times (with timeouts if necessary)
is indeed one design option but it may not be optimal since execution times of any job will fluctuate and there
will likely be idle jobs in some stages waiting for input.
As an alternative, the queues depicted in Fig.\ref{fig:queues1} can be conflated into a single queue
with priorities assigned to each job along the lines explained in \ref{sec:priority}. This is shown in
Fig.\ref{fig:queues2}.
\begin{figure}[tbh]
\centering
\includegraphics[width=0.85\textwidth]{figures/prompt_queues_2.pdf}
\caption{The unified queue of prompt processing jobs.}
\label{fig:queues2}
\end{figure}
Jobs now have additional attributes, such as the type and priority.
The depth of such a queue will be set according to the available number of Worker Nodes assigned for
prompt processing (e.g. it can be a certain portion of the ``neut'' cluster and a certain number of batch slots on
lxbatch). There are several ways to ensure that higher priority jobs are placed in the queue (for example by
dropping jobs of lower priority still in the ``wait'' condition) and the exact method will be chosen at a later time.
Let us assume that the pilot method is chosen for ``just-in-time'' job submission (see \ref{sec:justintime}). In this
case, the following is a possible scenario of how this unified queue will operate, assuming it is implemented
as a database table:
\begin{itemize}
\item A fixed number of pilot jobs (wrapper scripts) are submitted to the available batch facilities (which can include both
lxbatch and ``neut'' but potentially also other sites such as FNAL).
\item Because of possible timeouts, completions and error conditions encountered by the pilots, their number will naturally
diminish over time, so it must be replenished in order to remain at an appropriate level. This can be achieved by reusing
existing software created for this purpose in other experiments (e.g. the AutoPyFactory from ATLAS).
\item Data arriving from DAQ and its online buffer to EOS triggers the mechanism which makes entries in the queue for the
first stage of processing (e.g. ADC/FFT). Entries for this first stage are arranged in a LIFO manner. In other words, preference
in the queue order is given to the most recent data detected, in order to ensure that the output contains the freshest
data to help the processing remain prompt and current. Note that this is different from conventional production or analysis --
first of all, processing is done on a fraction of data, and second, the data is prioritized by its timestamp.
\item A broker process is listening to HTTP messages from the pilots signaling that they are ready to accept their payload.
\item A table sweep selects the job entries of the highest priority and replies to the pilot's HTTP request with information
sufficient for locating and initiating the correct payload (this can be a URL pointing to a script etc.); a sketch of such a sweep is given after this list.
\item Upon the payload assignment, the pilot notifies the broker about its status and the state of the job (e.g.
that the execution started, or some kind of error was encountered).
\item The job record is then updated based on this information. For example, the state of the job can undergo the following
transitions during the process described above: from ``waiting'' state to ``being assigned'' state to ``running'' state.
\item Upon completion, the pilot sends a notification to the broker, the queue is updated with the state information (the job is ``finished'') and
a new entry is made in the queue for the next stage of processing, with input defined as the data coming from the stage that just completed.
\item This process is iterative by nature, i.e.~the following stages operate in the same manner as links in the processing chain.
\item The ``visual products'' created throughout the processing chain are exported to a Web service to create the presentation
layer for the end user.
\item Parts of the output from all stages which are deemed to be of value for record keeping and for additional
diagnostics are committed to long term storage (possibly utilizing F-FTS and SAM). This functionality can be included
in the general cleanup logic -- most of the data in the chain won't need to be persisted and can be deleted once
execution of the corresponding DAG completes, while a few data elements will be sent to storage. This can be
implemented as a periodic table sweep and application of rules which will be configurable.
\end{itemize}
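\noindent As an illustration of the table sweep mentioned above (reusing the hypothetical \texttt{jobs} table from the earlier sketches, with LIFO ordering by creation time within a priority class):
\begin{verbatim}
# Illustrative sketch of the broker's selection step: pick the waiting
# job with the highest priority, preferring the freshest entry within
# that priority (LIFO). The 'jobs' schema is the earlier placeholder.
def next_job(conn):
    row = conn.execute(
        "SELECT id, category, input_ref FROM jobs "
        "WHERE state = 'waiting' "
        "ORDER BY priority DESC, created DESC "    # freshest data first
        "LIMIT 1").fetchone()
    if row is not None:
        conn.execute("UPDATE jobs SET state = 'assigned' WHERE id = ?",
                     (row[0],))
        conn.commit()
    return row   # content of the HTTP reply sent back to the pilot
\end{verbatim}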
\section{Summary}
A design of the \pd Prompt Processing system (\PPS) for \expname is proposed. Variations are possible
while the version which currently appears optimal is outlined below:
\begin{enumerate}
\item The pilot job mechanism is proposed to minimize latencies inherent in the Grid and general batch system job submission.
A pilot can be thought of as a payload-agnostic wrapper script which includes the functionality necessary to communicate
with the Web service distributing payloads and keeping the state of the workflow.
\item Arrival of raw data from DAQ triggers generation of entries in the job queue implemented as a database table.
\item Different payloads which are parts of the Prompt Processing system will be assigned different priorities in order to ensure
adequate throughput and timely delivery of the results to the end user. The priorities may be dynamically changing based on
the state of the set of DAGs currently being processed. For example, more resources may be allocated to longer DAGs which already
completed a few stages, in order to reach results more quickly. There may be a few ``hot slots'' left idling to be able to quickly
start execution of the highest priority jobs (such as the final reconstruction stage). The exact mechanism will be determined
at a later time.
\item Payloads are matched to pilots according to priorities.
\item The pilots initiate the execution on the WNs where they run.
\item The state of each job is updated in the database based on messages received from the pilot.
\item The database also serves as the basis for monitoring and debugging.
\item Completion of each job results in adding another entry to the queue with priority set according to the processing stage as
explained in \ref{sec:priority} (unless this is a leaf in the DAG describing the workflow).
\item Visual products and other data required by the operators of the experiment are made available via a Web service.
A predetermined fraction of the data produced by various stages of the prompt processing is committed to long-term storage
for record keeping.
\end{enumerate}
\newpage
\appendix
\section{Glossary}
\label{sec:glossary}
\begin{description}
\item [artDAQ] The online software platform based on the \textit{art} framework and utilized in \pd
\item [CRT] Cosmic Ray Tagger
\item [DAG] Directed Acyclic Graph
\item [DQM] Data Quality Monitoring
\item [EHN1] Extension of the North Area Experimental Hall where \expname will be located
\item [EOS] Disk-based mass storage at CERN
\item [BI] Beam Instrumentation
\item [F-FTS] Fermi File Transfer System
\item [``neut''] The \textit{CERN Neutrino Platform} Computing Farm
\item [p3s] \pd Prompt Processing System
\item [RCE] Reconfigurable Cluster Element (element of DAQ)
\item [SAM] Metadata and file catalog system deployed at FNAL
\end{description}
\section{The Neutrino Platform Computing Farm}
\label{appendix:neut}
\subsection{Purpose}
The Neutrino Platform Computing Farm (sometimes referred to as the \textit{``neut cluster''} \cite{neut})
serves as an important and flexible computing resource that can be utilized for a variety of tasks including
simulation, reconstruction and software prototyping. It is proposed to utilize the cluster (or a substantial part of it) as the
core element of the Prompt Processing in \pd.
\subsection{Hardware}
The cluster comprises $\sim$300 DELL PowerEdge servers each containing directly attached HDD storage.
The servers are interconnected via a network switch and
also share a common NAS of 32\,TB capacity which simplifies software provisioning and
sharing of data. There are plans to add more nodes to the cluster as well as to increase
its storage.
The cluster is located within the CERN network perimeter and has network access to central
storage (e.g. EOS and CASTOR) and other services at CERN.
\subsection{Batch System and Monitoring}
The batch system of the \textit{``neut cluster''} is based on HTCondor \cite{htcondor}.
There are plans to utilize the ARDA \cite{arda} Dashboard for monitoring the cluster.
It is built as a Web service utilizing technologies necessary to achieve a high level of user interactivity.
\subsection{Software}
A variety of software is available on the cluster, including infrastructure elements such as DB server and
Web server software, Web framework toolkit (Django), as well as simulation, reconstruction and analysis
software such as ROOT \cite{root} and LArSoft \cite{larsoft}. Also installed is the FNAL middleware developed by FIFE
\cite{fife}.
Initial setup of XRootD \cite{xrootd} has been performed, which will make it possible to quickly implement storage federation
on the cluster if needed.
\clearpage
\begin{thebibliography}{1}
\bibitem{docdb1086}
{DUNE DocDB 1086: \textit{ protoDUNE/SP data scenarios with full stream (spreadsheet)}}\\
\url{http://docs.dunescience.org:8080/cgi-bin/ShowDocument?docid=1086}
\bibitem{docdb1212}
{DUNE DocDB 1212: \textit{Design of the Data Management System for the protoDUNE Experiment}}\\
\url{http://docs.dunescience.org:8080/cgi-bin/ShowDocument?docid=1212}
\bibitem{docdb1811}
{DUNE DocDB 1811: \textit{Prompt Processing System Requirements for the Single-Phase protoDUNE}}\\
\url{http://docs.dunescience.org:8080/cgi-bin/ShowDocument?docid=1811}
\bibitem{eos}
{The CERN Exabyte Scale Storage}\\
\url{http://information-technology.web.cern.ch/services/eos-service}
\bibitem{fts}
{The Fermilab File Transfer System}\\
\url{http://cd-docdb.fnal.gov/cgi-bin/RetrieveFile?docid=5412&filename=datamanagement-changeprocedures.pdf&version=1}
\bibitem{xrootd}
{XRootD}\\
\url{http://www.xrootd.org}
\bibitem{neut}
{Neutrino Computing Cluster at CERN}\\
\url{https://twiki.cern.ch/twiki/bin/view/CENF/NeutrinoClusterCERN}
\bibitem{lxbatch}
{The CERN batch computing service}\\
\url{http://information-technology.web.cern.ch/services/batch}
\bibitem{htcondor}
{HTCondor}\\
\url{https://research.cs.wisc.edu/htcondor/}
\bibitem{arda}
{The CERN Dashboard project}\\
CERN-IT-NOTE-2007-048 and \url{http://dashboard.cern.ch/}
\bibitem{root}
{ROOT}\\
\url{https://root.cern.ch/}
\bibitem{larsoft}
{The Liquid Argon Software (LArSoft) Collaboration}\\
\url{http://larsoft.org/}
\bibitem{fife}
{Fabric for Intensity Frontier Experiments}\\
\url{http://fife.fnal.gov/}
\end{thebibliography}
\end{document}