-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommands.lua
1378 lines (1252 loc) · 78.6 KB
/
commands.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--[[
This file contains the implementations and argument specifications of the main commands the program offers,
using the modules `benmet.features` and `benmet.util`.
These commands are returned as a table in the following form 'benmet.main' expects:
{
[<command-name>] = {
any_args_name = <name of "any args" (non-options) to display in help text>,
any_args_min = <minimum number of "any args" (non-options), nil defaults to 0>,
any_args_max = <maximum number of "any args" (non-options), nil defaults to unlimited>,
summary = <a short descriptive text to display in the program-wide help text>,
description = <a longer descriptive text to display in the command-specific help text>,
implementation = <the command implementation, as a function value to be called as function(<benmet.features module>, <benmet.util module>, <arguments>, <options>) that may return an integer value to determine the program return code>,
allow_anything_as_args_verbatim = <flag to ignore '--' (only exception being '--help') and pass everything as an "any arg" (nothing as an option); nil defaults to false>,
options = <table of available command options (prefixed with '--') of the form: {
[<option-name>] = {
description = <description to display in help text>,
required = <if the command requires this option to run; nil defaults to false>,
is_flag = <whether the option does not take an argument (via next program argument or '='-suffix) and is instead a boolean that signals whether the option was present; incompatible with forward_as_arg, nil defaults to false>,
shorthand_for = <list of other options (must all be flags) this flag activates>,
forward_as_arg = <boolean that indicates this option is supposed to be ignored and passed through as a normal arg, as a sort of section marker f.e. "--sources a b --targets b c"; incompatible with is_flag, nil defaults to false>,
allow_multiple = <boolean or number (limit) that indicates whether this option can be supplied multiple times (only supported for normal, non-flag, non-forwarded-as-args options)>,
},
}; nil defaults to no options>,
benmet_util_skip_library_imports = <boolean that indicates whether benmet.util should not try importing external dependencies (pure_lua_SHA and lunajson); nil defaults to false>,
},
}
--]]
-- path prefix used to reach the repository-relative directories (e.g. 'pipelines/') below;
-- read from a global, so it is assumed to be assigned externally before this module runs (presumably by the program entry point -- TODO confirm where it is set)
local relative_path_prefix = _G.benmet_relative_path_prefix
-- declaration of program argument structure
-- common options
-- Each table below is a single option spec in the format documented in the file header comment
-- (fields 'description', 'is_flag', 'shorthand_for', 'allow_multiple').
-- The specs are declared once and shared by reference between the command structures that use them.
local option_with_run_id = {description = "override the auto-generated 'RUN-id' parameter"}
local option_param_file = {description = "use the given parameter file as initial inputs to calculate step parameters"} --TODO (maybe): support multiple?
local option_pipeline_default_params = {is_flag = true, description = "add an instance of all-defaulted parameters, as if supplying an empty parameter file"}
local option_pipeline_all_params = {is_flag = true, description = "select all pipelines regardless of parameters"}
local option_pipeline_params_from_stdin = {is_flag = true, description = "read standard input as an additional parameter file"}
local option_pipeline_target = {description = "the target step of the pipeline(s)"} --TODO(potentially?): could be made optional if we had a default target step in steps/index.txt
local option_pipeline_all_targets = {is_flag = true, description = "select all pipelines regardless of targets"}
-- '--all' is a shorthand flag that implies both '--all-targets' and '--all-params'
local option_pipeline_all = {is_flag = true, shorthand_for = {'all-targets', 'all-params'}, description = "select all pipelines"}
-- the following four options control the handling of parameters not consumed by any involved step
local option_pipeline_accept_param = {description = "accept an unused parameter in pipeline parameterizations", allow_multiple = true}
local option_pipeline_ignore_param = {description = "remove a parameter from pipeline parameterizations", allow_multiple = true}
local option_pipeline_accept_unrecognized_params = {description = "accept unrecognized parameters in pipeline parameterizations", is_flag = true}
local option_pipeline_ignore_unrecognized_params = {description = "remove unrecognized parameters from pipeline parameterizations", is_flag = true}
-- common command structure
-- The three pipeline option sets below share most of their entries; they are
-- assembled from shared fragments so the individual option specs stay in sync.
local pipeline_operation_structure_options,
	pipeline_operation_structure_base_options_with_error_state_handling,
	pipeline_operation_structure_options_with_error_state_handling_allowing_startable_selection
do
	-- flags for extending/restricting selection to pipelines in special states
	local option_pipeline_include_errors = {is_flag = true, description = "also select pipelines with error status"}
	local option_pipeline_only_errors = {is_flag = true, description = "only select pipelines with error status"}
	local option_pipeline_include_continuable = {is_flag = true, description = "also select pipelines with status 'continuable'"}
	local option_pipeline_only_continuable = {is_flag = true, description = "only select pipelines with status 'continuable'"}
	local option_pipeline_include_startable = {is_flag = true, description = "also select pipelines with status 'startable'"}
	local option_pipeline_only_startable = {is_flag = true, description = "only select pipelines with status 'startable'"}
	-- shallow-merges any number of option tables into a fresh table
	-- (all key sets used here are disjoint, so merge order is irrelevant)
	local function merged_option_tables(...)
		local merged = {}
		for i = 1, select('#', ...) do
			for option_name, option_spec in pairs((select(i, ...))) do
				merged[option_name] = option_spec
			end
		end
		return merged
	end
	-- target and parameter selection options common to every pipeline operation
	local pipeline_selection_options = {
		['target'] = option_pipeline_target,
		['all-targets'] = option_pipeline_all_targets,
		['default-params'] = option_pipeline_default_params,
		['params-from-stdin'] = option_pipeline_params_from_stdin,
		['all-params'] = option_pipeline_all_params,
		['all'] = option_pipeline_all,
	}
	-- options controlling how unrecognized parameters are handled
	local pipeline_param_filter_options = {
		['ignore-param'] = option_pipeline_ignore_param,
		['accept-param'] = option_pipeline_accept_param,
		['ignore-unrecognized-params'] = option_pipeline_ignore_unrecognized_params,
		['accept-unrecognized-params'] = option_pipeline_accept_unrecognized_params,
	}
	-- flags for selecting pipelines in error/'continuable' states
	local pipeline_error_state_options = {
		['include-errors'] = option_pipeline_include_errors,
		['only-errors'] = option_pipeline_only_errors,
		['include-continuable'] = option_pipeline_include_continuable,
		['only-continuable'] = option_pipeline_only_continuable,
	}
	-- flags for additionally selecting pipelines in 'startable' state
	local pipeline_startable_state_options = {
		['include-startable'] = option_pipeline_include_startable,
		['only-startable'] = option_pipeline_only_startable,
	}
	pipeline_operation_structure_options = merged_option_tables(
		pipeline_selection_options, pipeline_param_filter_options)
	pipeline_operation_structure_base_options_with_error_state_handling = merged_option_tables(
		pipeline_selection_options, pipeline_error_state_options, pipeline_param_filter_options)
	pipeline_operation_structure_options_with_error_state_handling_allowing_startable_selection = merged_option_tables(
		pipeline_selection_options, pipeline_error_state_options, pipeline_startable_state_options, pipeline_param_filter_options)
end
-- shared help-text fragments, reused verbatim in the descriptions of several commands
-- describes the expected syntax of the step dependency declaration file
local dependencies_txt_description = "This file contains lines of the syntax '<dependers>: <dependees>', where both sides are space-separated, possibly empty lists of step names. Both dependers and dependees may appear in multiple lines. All steps must appear as dependers at least once."
-- describes how supplied parameter files are expanded into individual parameter combinations
local pipelines_param_construction_note = "Constructs all parameter combinations within each supplied parameter file (JSON arrays of object entries, also multi-value parameter files of legacy line-based format are supported)."
-- describes the default rejection behaviour implemented by the param coercer defined below
local pipelines_param_rejection_note = "By default, parameter combinations are rejected if they contain parameters not consumed by any steps in the target step's dependency chain. This can be configured via options '--(ignore|accept)-param' and '--(ignore|accept)-unrecognized-params'."
-- common pipeline command routines
-- shared stub iterator function: yields nothing, so a for-in loop over it runs zero iterations
local function empty_iterator__next() end
-- parse common pipeline options and arguments into an array of parameter-iterator constructors and a warning printer, and remove parsed arguments in-place
-- checks for option '--all-params', errors if present together with '--default-params' or parameter file arguments; returns false instead of a constructor list in that case
-- handles option '--default-params' by adding an iterator returning a single {} for default parameters
-- handles option '--params-from-stdin' and all arguments by adding iterators over all parameter combinations from the given multivalue parameter files
-- files that fail to parse yield an empty iterator and are reported by the returned warning printer
local parse_param_iterator_constructors_and_warning_printers_from_pipeline_arguments_options = function(features, util, arguments, options)
	local no_iterators_flag = options['all-params']
	-- under '--all-params' both lists are false: no iterators are constructed at all
	local iterator_constructor_list = not no_iterators_flag and {}
	local warning_printer_list = iterator_constructor_list and {}
	if options['default-params'] then
		assert(not no_iterators_flag, "option '--all-params' incompatible with option '--default-params'")
		-- creating a stub parameter iterator for default params: yields a single empty table, then stops
		local default_param_iterator__next = function(state, prev_index)
			if prev_index then return nil end
			return {}
		end
		local default_param_iterator_constructor = function()
			return default_param_iterator__next, --[[state]]nil, --[[first_index]]nil
		end
		iterator_constructor_list[#iterator_constructor_list+1] = default_param_iterator_constructor
	end
	-- consume all remaining positional arguments as parameter file paths (cleared in-place for the caller)
	local param_files = {}
	for i = 1, #arguments do
		param_files[i] = arguments[i]
		arguments[i] = nil
	end
	local read_stdin_as_file = options['params-from-stdin']
	if #param_files > 0 or read_stdin_as_file then
		assert(not no_iterators_flag, "option '--all-params' incompatible with parameter file arguments")
		local failed_parsing_parameter_files = {}
		-- creating a parameter iterator from a given parameter file;
		-- takes the file name plus the (success, contents-or-error) results of reading it (pcall-style)
		local initial_param_iterator_from_param_file_contents_constructor = function(param_file_name, reading_file_success, file_contents)
			-- first read the file
			local error_message, parsing_mode_hint = "(error message uninitialized)", "(error hint uninitialized)"
			local successful = reading_file_success
			if not successful then
				error_message = file_contents -- on read failure, the second pcall result is the error message
				parsing_mode_hint = "(error reading file) "
				-- fallthrough
			else
				-- next see if the beginning looks like a JSON array
				-- NOTE(review): '%w*' matches alphanumerics, not whitespace; possibly '%s*' (skip leading whitespace) was intended -- TODO confirm against the legacy line-based format before changing
				if string.match(file_contents, "^%w*%[") then
					-- parse it as JSON
					parsing_mode_hint = "(tried parsing as JSON array) "
					local param_array
					successful, param_array = pcall(util.json_decode, file_contents)
					if not successful then
						error_message = param_array
						-- fallthrough
					else
						-- convert all values to strings, as our line-based format would,
						-- and check that all combinatorial arrays are non-empty
						successful, error_message = pcall(util.coerce_json_multivalue_array_in_place, param_array)
						if successful then -- else fallthrough
							-- return the resulting iterator, which iterates over all array entries,
							-- combinatorically creating all multivalues
							return util.all_combinations_of_multivalues_in_list(param_array)
						end
					end
				else
					-- parse it as a multivalue param file in our custom line-based format
					parsing_mode_hint = "(tried parsing as line-based multivalue param file) "
					local multivalue_entries
					successful, multivalue_entries = pcall(util.new_compat_deserialize_multivalue, file_contents)
					if not successful then
						error_message = multivalue_entries
						-- fallthrough
					else
						-- return the resulting iterator
						return util.all_combinations_of_multivalues(multivalue_entries)
					end
				end
			end
			-- in case of error, we fall through to here
			-- add the file to our list of parsing failures
			failed_parsing_parameter_files[#failed_parsing_parameter_files+1] = param_file_name .. ": "..tostring(parsing_mode_hint)..tostring(error_message)
			-- return an empty iterator
			return empty_iterator__next
		end
		-- file reading is deferred into the constructors, so it only happens when the iterator is actually requested
		for i = 1, #param_files do
			local param_file = param_files[i]
			iterator_constructor_list[#iterator_constructor_list+1] = function()
				return initial_param_iterator_from_param_file_contents_constructor(param_file, pcall(util.read_full_file, param_file))
			end
		end
		if read_stdin_as_file then
			iterator_constructor_list[#iterator_constructor_list+1] = function()
				return initial_param_iterator_from_param_file_contents_constructor('(stdin)', pcall(util.read_full_stdin))
			end
		end
		warning_printer_list[#warning_printer_list+1] = function()
			-- output all files that we failed to parse
			if #failed_parsing_parameter_files > 0 then
				print("The following parameter files could not be parsed (were ignored):")
				for i = 1, #failed_parsing_parameter_files do
					print("- "..failed_parsing_parameter_files[i])
				end
				print("Please manually verify the existence and contents of these files.")
			end
		end
	end
	local warning_printer = function()
		-- fix: under '--all-params', warning_printer_list is false; previously invoking
		-- this printer then crashed on '#false' -- make it a safe no-op instead
		-- (consistent with the guard in the unrecognized-param warning printer below)
		if not warning_printer_list then return end
		for i = 1, #warning_printer_list do
			warning_printer_list[i]()
		end
	end
	return iterator_constructor_list, warning_printer
end
-- parse common pipeline options into a parameter coercion function provider and a warning printer
-- checks for options '--accept-param', '--ignore-param', '--accept-unrecognized-params' and '--ignore-unrecognized-params', errors if they overlap
-- the param coercion provider takes a target step name and returns a coercion function (cached per step name)
-- the coercion function returns the result of removing the parameters specified to be ignored from the given initial_params table (leaving the original table unchanged), or nil if unrecognized params remained and fallback behaviour was left unspecified
local parse_unrecognized_param_coercer_provider_and_warning_printer_from_pipeline_options = function(features, util, options)
	-- option validation: determine the default handling of parameters no involved step consumes
	local default_unrecognized_param_behaviour = 'error'
	if options['ignore-unrecognized-params'] then
		assert(not options['accept-unrecognized-params'], "Flags '--ignore-unrecognized-params' and '--accept-unrecognized-params' are exclusive. Select handling of individual parameters with options '--ignore-param' and '--accept-param'.")
		default_unrecognized_param_behaviour = 'ignore'
	elseif options['accept-unrecognized-params'] then
		default_unrecognized_param_behaviour = 'accept'
	end
	local params_to_ignore_list = options['ignore-param']
	local params_to_accept_list = options['accept-param']
	do -- check for overlap between '--ignore-param' and '--accept-param'
		-- (fix: removed an unused 'params_to_ignore_lookup' that was computed but never read)
		local params_to_accept_lookup = util.list_to_lookup_table(params_to_accept_list)
		local conflict_list = {}
		for i = 1, #params_to_ignore_list do
			if params_to_accept_lookup[params_to_ignore_list[i]] then
				conflict_list[#conflict_list+1] = params_to_ignore_list[i]
			end
		end
		if #conflict_list > 0 then
			error("The following parameters were specified to be both ignored and accepted: "..table.concat(conflict_list, ", ").."\n Please remove the corresponding '--ignore-param' or '--accept-param' options.")
		end
	end
	-- normalize the ignore list to false when empty, so it doubles as a flag below
	params_to_ignore_list = #params_to_ignore_list > 0 and params_to_ignore_list
	-- actual logic
	-- only tracked in 'error' mode: [name] = index, [index] = {name, occurrences}
	local unrecognized_parameters_total = default_unrecognized_param_behaviour == 'error' and {}
	local param_coercer_by_target_step_name = {}
	local param_coercer_provider = function(target_step_name)
		local param_coercer = param_coercer_by_target_step_name[target_step_name]
		if not param_coercer then
			-- parameters consumed by steps reachable from the target, plus the explicitly accepted ones
			local accepted_params_lookup = features.step_query_effective_inputs_lookup_union(target_step_name)
			accepted_params_lookup = util.table_copy_shallow(accepted_params_lookup)
			for i = 1, #params_to_accept_list do
				accepted_params_lookup[params_to_accept_list[i]] = true
			end
			param_coercer = function(initial_params)
				local coerced_params = initial_params
				-- remove params to ignore (copy-on-write: leaves initial_params untouched)
				if params_to_ignore_list then
					coerced_params = util.table_copy_shallow(coerced_params)
					for i = 1, #params_to_ignore_list do
						coerced_params[params_to_ignore_list[i]] = nil
					end
				end
				if default_unrecognized_param_behaviour == 'accept' then -- early return if we accept all other params
					return coerced_params
				end
				-- collect unrecognized present params
				local unrecognized_param_list = {}
				for k--[[,v]] in pairs(coerced_params) do
					if not accepted_params_lookup[k] then
						unrecognized_param_list[#unrecognized_param_list+1] = k
					end
				end
				if #unrecognized_param_list == 0 then -- early return if there were no unrecognized params
					return coerced_params
				end
				if default_unrecognized_param_behaviour == 'ignore' then -- remove unrecognized params
					coerced_params = coerced_params == initial_params and util.table_copy_shallow(coerced_params)
						or coerced_params
					for i = 1, #unrecognized_param_list do
						coerced_params[unrecognized_param_list[i]] = nil
					end
					return coerced_params
				elseif default_unrecognized_param_behaviour == 'error' then -- collect unrecognized params for the warning message, reject this combination
					for i = 1, #unrecognized_param_list do
						local param_name = unrecognized_param_list[i]
						local index = unrecognized_parameters_total[param_name]
						if not index then
							index = #unrecognized_parameters_total+1
							unrecognized_parameters_total[param_name] = index
							unrecognized_parameters_total[index] = {param_name, 0}
						end
						local entry = unrecognized_parameters_total[index]
						entry[2] = entry[2]+1
					end
					return nil
				end
				-- fix: the error message previously lacked its closing quote
				error("unreachable: unhandled default_unrecognized_param_behaviour value of '"..tostring(default_unrecognized_param_behaviour).."'")
			end
			param_coercer_by_target_step_name[target_step_name] = param_coercer
		end
		return param_coercer
	end
	local warning_printer = function()
		if not unrecognized_parameters_total then return end
		if #unrecognized_parameters_total > 0 then
			-- order by occurrences descendingly
			-- (sorting leaves the [name] = index lookup entries stale, but they are no longer needed here)
			table.sort(unrecognized_parameters_total, function(a, b) return a[2] > b[2] end)
			print("Some parameter combinations contained parameters not consumed by any of the involved steps:")
			for i = 1, #unrecognized_parameters_total do
				local entry = unrecognized_parameters_total[i]
				local occurrences = entry[2]
				print("- '"..tostring(entry[1]).."' (in "..tostring(occurrences).." parameter combination"..(occurrences == 1 and "" or "s")..")")
			end
			print("The offending parameter combinations were not processed.")
		end
	end
	return param_coercer_provider, warning_printer
end
-- collects and iterates over existing pipelines, filterable by target step and initial parameters
-- implementation helper function for all pipeline commands besides 'pipelines.launch' (which creates new pipelines instead of operating on existing ones)
local pipeline_collective_by_individuals_command = function(features, util, arguments, options, command_infinitive, with_target_step_name_initial_params_pipeline_file_path_f)
local target_step_name
local parameter_iterator_constructors, parameter_iterator_warning_printer
local param_coercer_provider, param_coercion_warning_printer
do -- verify arguments and options
-- we're either in --all-targets mode, or we have a single target_step_name
target_step_name = options.target[1]
if options['all-targets'] then
assert(not target_step_name, "option '--all-targets' incompatible with selecting individual '--target' step")
else
assert(target_step_name, "missing '--target' step specification (or '--all-targets' flag)")
end
-- parse parameter iterators, error in case of inconsistent options, false if '--all-params' was specified
parameter_iterator_constructors, parameter_iterator_warning_printer = parse_param_iterator_constructors_and_warning_printers_from_pipeline_arguments_options(features, util, arguments, options)
assert(not (parameter_iterator_constructors and #parameter_iterator_constructors == 0), "missing parameter files (or option '--all-params' or '--default-params')")
-- check for '--all-params' in combination with '--(accept|ignore)-(param|unrecognized-params)'
if not parameter_iterator_constructors then
assert(not (#options['accept-param'] > 0 or #options['ignore-param'] > 0 or options['accept-unrecognized-params'] or options['ignore-unrecognized-params']), "flag '--all-params' takes parameters from existing pipelines, and therefore ignores options '--accept-param' and '--ignore-param' as well as flags '--accept-unrecognized-params' and '--ignore-unrecognized-params'")
else
param_coercer_provider, param_coercion_warning_printer = parse_unrecognized_param_coercer_provider_and_warning_printer_from_pipeline_options(features, util, options)
end
end
-- actual work
-- first check what pipeline files exist
local entry_index_name_path_in_directory_or_cleanup_iterator = util.entry_index_name_path_in_directory_or_cleanup_iterator
local existing_param_hash_dir_lookup_by_target_step_name = {}
-- dispatch function over each step name the command applies to
-- uses existing_param_hash_dir_lookup_by_target_step_name as a cache, first call is hardcode-assumed to construct it
local foreach_target_step_name_pipeline_dir_path_returns_disjunction -- forward declaration necessary for self-reassignment
foreach_target_step_name_pipeline_dir_path_returns_disjunction =
target_step_name and function(with_target_step_name_hash_dir_path_f, --[[further_args]]...) -- forward with our single target step pipeline directory
local target_step_pipeline_dir_path = relative_path_prefix.."pipelines/"..target_step_name
return with_target_step_name_hash_dir_path_f(target_step_name, target_step_pipeline_dir_path, --[[further args]]...)
end
or function(with_target_step_name_hash_dir_path_f, --[[further_args]]...) -- iterate over the directory
local any_found
local pipelines_path = relative_path_prefix.."pipelines"
for _, step_name, hash_dir_path in entry_index_name_path_in_directory_or_cleanup_iterator(pipelines_path) do
any_found = with_target_step_name_hash_dir_path_f(step_name, hash_dir_path, --[[further args]]...)
or any_found
end
-- on subsequent calls, iterate over our cache instead
foreach_target_step_name_pipeline_dir_path_returns_disjunction = function(with_target_step_name_hash_dir_path_f, --[[further_args]]...)
local any_found
local pipelines_path = relative_path_prefix.."pipelines"
local step_pipeline_dir_path_prefix = pipelines_path.."/"
for step_name--[[, hash_dir_lookup]] in pairs(existing_param_hash_dir_lookup_by_target_step_name) do
local hash_dir_path = step_pipeline_dir_path_prefix..step_name
any_found = with_target_step_name_hash_dir_path_f(step_name, hash_dir_path, --[[further args]]...)
or any_found
end
return any_found
end
return any_found
end
-- collect what param hash directories exist, as a set-like lookup table
local any_pipelines_exist = foreach_target_step_name_pipeline_dir_path_returns_disjunction(
function --[[collect_step_pipeline_dir_lookup]](step_name, step_pipeline_dir_path)
local hash_dir_set = {}
local any_found
for _, hash_dir_name in entry_index_name_path_in_directory_or_cleanup_iterator(step_pipeline_dir_path) do
hash_dir_set[hash_dir_name] = true
any_found = true
end
if any_found then
existing_param_hash_dir_lookup_by_target_step_name[step_name] = hash_dir_set
return true
end
end)
-- early return if no pipelines exist
if not any_pipelines_exist then
print("No pipelines to "..command_infinitive.." currently exist.")
return
end
-- now select which pipeline files fall within the parameter selection
local found_any_pipelines
if not parameter_iterator_constructors then -- '--all-params' flag: do not filter based on parameters
-- Dispatch calling the command over each found pipeline instance's set of parameters.
found_any_pipelines = foreach_target_step_name_pipeline_dir_path_returns_disjunction(function(target_step_name, target_step_pipeline_dir_path)
local any_found
local hash_dir_path_prefix = target_step_pipeline_dir_path.."/"
-- we iterate over the hash named directories we collected previously
local existing_param_hash_dir_lookup = existing_param_hash_dir_lookup_by_target_step_name[target_step_name]
for hash_dir_name--[[, true]] in pairs(existing_param_hash_dir_lookup) do
local hash_dir_path = hash_dir_path_prefix .. hash_dir_name
local pipeline_file_path_prefix = hash_dir_path.."/"
-- now we iterate over the individual pipeline files within each directory
for _, pipeline_file_name, pipeline_file_path in entry_index_name_path_in_directory_or_cleanup_iterator(hash_dir_path) do
-- we read the initial parameters from the pipeline file
local initial_params = util.read_param_file_new_compat_deserialize(pipeline_file_path, "failed reading initial params from parameter file")
with_target_step_name_initial_params_pipeline_file_path_f(target_step_name, initial_params, pipeline_file_path)
any_found = true
end
end
return any_found
end, nil)
else -- iterate over parameters, filter based on them
-- Dispatch function (iteration body) calling the command over each pipeline's file that given initial parameters apply to.
-- Returns whether any pipeline matched.
local call_with_matching_pipelines_returns_any_match = function(target_step_name, target_step_pipeline_dir_path, initial_params)
local param_coercer = param_coercer_provider(target_step_name)
local initial_params = param_coercer(initial_params)
if not initial_params then
return false
end
-- check if the directory the instance's pipeline file would be in exists
local hash_dir_name = features.get_pipeline_hash_dir_name(initial_params)
local hash_dirs = existing_param_hash_dir_lookup_by_target_step_name[target_step_name]
if not hash_dirs[hash_dir_name] then -- early return if it doesn't exist
return
end
local hash_dir_path = target_step_pipeline_dir_path.."/"..hash_dir_name
-- now check if we were given an id
local pipeline_id = initial_params['RUN-id']
if pipeline_id then -- if the id was given, we try loading a specific file
local pipeline_file_path_prefix = hash_dir_path.."/"
local pipeline_file_path = pipeline_file_path_prefix..pipeline_id..".txt"
local exists, file_params = pcall(util.read_param_file_new_compat_deserialize, pipeline_file_path)
-- if the file exists, also compare the parameters to guard against a hash collision
if not (exists and util.tables_shallow_equal(file_params, initial_params)) then
return -- return no match
end
-- if everything is ok, call the given predicate
with_target_step_name_initial_params_pipeline_file_path_f(target_step_name, initial_params, pipeline_file_path)
return true -- return success
end
-- if no id was given, we consider every pipeline file in this directory
local any_found
-- iterate over all pipeline files
for _, pipeline_file_name, pipeline_file_path in entry_index_name_path_in_directory_or_cleanup_iterator(hash_dir_path) do
-- read the parameters
local file_params = util.read_param_file_new_compat_deserialize(pipeline_file_path)
-- insert this id, then compare if all parameters are equal
initial_params['RUN-id'] = file_params['RUN-id']
if util.tables_shallow_equal(file_params, initial_params) then
with_target_step_name_initial_params_pipeline_file_path_f(target_step_name, file_params, pipeline_file_path)
any_found = true
end
end
-- clear the inserted id from initial_params, just in case the table were to be reused
initial_params['RUN-id'] = nil
return any_found
end
-- iterate over parameter iterators
for i = 1, #parameter_iterator_constructors do
local parameter_iterator = parameter_iterator_constructors[i]
-- iterate over initial parameter configurations provided by the iterator,
-- and filter pipelines based on them
for initial_params in parameter_iterator() do
found_any_pipelines = foreach_target_step_name_pipeline_dir_path_returns_disjunction(call_with_matching_pipelines_returns_any_match, initial_params)
or found_any_pipelines
end
end
-- print a warning message for files that could not be parsed
parameter_iterator_warning_printer()
-- print a warning message for encountered unrecognized parameters in combinations, if no fallback flag was provided
param_coercion_warning_printer()
end
if not found_any_pipelines then
print("No pipelines "
..(parameter_iterator_constructors and "that match the given parameters " or "")
..(target_step_name and "towards the given target step " or "")
.."could be found.")
end
end
-- helper function that takes a doubly-nested table and counts the number of keys at the first level that contain a particular key at the second level
local count_number_of_key_1_by_key_2 = function(counts, data)
	-- tally, for every second-level key, in how many first-level entries it occurs;
	-- results are accumulated into the caller-provided `counts` table
	for _, inner_table in pairs(data) do
		for inner_key in pairs(inner_table) do
			local previous_count = counts[inner_key]
			counts[inner_key] = previous_count and previous_count + 1 or 1
		end
	end
end
-- common implementation for commands 'pipelines.cancel' and 'pipelines.discard' (see their respective descriptions for details)
-- parameters:
--   features, util: library tables forwarded from the command dispatcher
--   arguments, options: parsed command line arguments and options
--   operation_infinitive: verb for user-facing messages -- NOTE(review): appears unused in this body; the infinitive forwarded below is derived from discard_last_step_run_dir_and_pipeline_file instead -- confirm intent
--   allow_startable: whether the '--include-startable'/'--only-startable' flags are honored
--   discard_last_step_run_dir_and_pipeline_file: if truthy, additionally delete the pipeline file after cancellation ('discard' semantics)
local pipelines_cancel_command_impl = function(features, util, arguments, options, operation_infinitive, allow_startable, discard_last_step_run_dir_and_pipeline_file)
	-- read the status-selection flags from the parsed options
	local include_errors = options['include-errors']
	local include_continuable = options['include-continuable']
	local include_startable = allow_startable and options['include-startable']
	local only_errors = options['only-errors']
	local only_continuable = options['only-continuable']
	local only_startable = allow_startable and options['only-startable']
	-- reject contradictory flag combinations up front
	assert(not (only_errors and only_continuable), "flags '--only-errors' and '--only-continuable' are mutually exclusive")
	assert(not (only_errors and include_continuable), "flag '--only-errors' is incompatible with flag '--include-continuable'")
	assert(not (only_continuable and include_errors), "flag '--only-continuable' is incompatible with flag '--include-errors'")
	if allow_startable then
		-- the '*-startable' flags only exist for commands that allow them; check their combinations as well
		assert(not (only_errors and only_startable), "flags '--only-errors' and '--only-startable' are mutually exclusive")
		assert(not (only_continuable and only_startable), "flags '--only-continuable' and '--only-startable' are mutually exclusive")
		assert(not (only_errors and include_startable), "flag '--only-errors' is incompatible with flag '--include-startable'")
		assert(not (only_continuable and include_startable), "flag '--only-continuable' is incompatible with flag '--include-startable'")
		assert(not (only_startable and include_errors), "flag '--only-startable' is incompatible with flag '--include-errors'")
		assert(not (only_startable and include_continuable), "flag '--only-startable' is incompatible with flag '--include-continuable'")
	end
	-- derive which pipeline statuses are selected for the operation:
	-- pending pipelines are selected by default, unless an '--only-*' flag narrows the selection
	local select_pending = not (only_errors or only_continuable or only_startable)
	local select_errors = (include_errors or only_errors)
	local select_continuable = (include_continuable or only_continuable)
	local select_startable = (include_startable or only_startable)
	-- canceled pipeline file paths, grouped by cancellation status then target step name, for user-facing program output
	local canceled_pipeline_lists = {}
	-- interning tables: equal error messages / unexpected status strings share one status tuple, and thus one output group
	local cancellation_status_lookup_by_error = {}
	local cancellation_status_lookup_by_unexpected_status = {}
	pipeline_collective_by_individuals_command(features, util, arguments, options, discard_last_step_run_dir_and_pipeline_file and "discard" or "cancel",
		function(target_step_name, initial_params, existing_pipeline_file_path)
			-- attempt the cancellation; on success, err_or_initial_status holds the pipeline's initial status (nil when it had finished),
			-- on failure it holds the error with traceback
			local successful, err_or_initial_status, new_status = xpcall(features.cancel_pipeline_instance, debug.traceback, target_step_name, initial_params, select_pending, select_errors, select_continuable, select_startable, discard_last_step_run_dir_and_pipeline_file)
			if not successful then
				print("Error canceling pipeline: "..err_or_initial_status)
			elseif discard_last_step_run_dir_and_pipeline_file then
				-- delete the corresponding pipeline file
				util.remove_file(existing_pipeline_file_path)
				print("deleted pipeline file '"..existing_pipeline_file_path.."'")
			end
			-- assign the pipeline to a collection according to its status
			local cancellation_status_error
			if not successful then
				-- reuse the existing status tuple for this exact error message, or create (and cache) a new one
				cancellation_status_error = cancellation_status_lookup_by_error[err_or_initial_status]
					or {'cancellation-error', err_or_initial_status}
				cancellation_status_lookup_by_error[err_or_initial_status] = cancellation_status_error
			end
			local cancellation_status = cancellation_status_error
			if not cancellation_status and new_status then
				-- a successful cancellation is expected to leave the pipeline 'startable'; anything else is grouped as unexpected
				cancellation_status = new_status == 'startable' and 'cancellation-success'
				if not cancellation_status then
					cancellation_status = cancellation_status_lookup_by_unexpected_status[new_status]
						or {'cancellation-result-unexpected', new_status}
					cancellation_status_lookup_by_unexpected_status[new_status] = cancellation_status
				end
			end
			-- fall back to the pipeline's initial status ('finished' when it was reported as nil)
			cancellation_status = cancellation_status
				or err_or_initial_status == nil and 'finished'
				or err_or_initial_status
			-- insert the pipeline file path into the list belonging to its status and target step
			local by_target_step_name = canceled_pipeline_lists[cancellation_status]
				or {}
			canceled_pipeline_lists[cancellation_status] = by_target_step_name
			local list = by_target_step_name[target_step_name]
				or {}
			by_target_step_name[target_step_name] = list
			list[#list+1] = existing_pipeline_file_path
		end)
	-- count pipelines the other way around too for more immediately informative output message
	local status_counts_by_target_step_name = {}
	count_number_of_key_1_by_key_2(status_counts_by_target_step_name, canceled_pipeline_lists)
	-- report results back to the user
	-- TODO: rework to print successful messages first and error messages last (to avoid overlooking errors), and
	-- while at it maybe also order build statuses lexicographically?
	-- my original idea was to have separate pipeline_lists for errored/successful, but that is pretty ugly and doesn't scale well,
	-- so probably change table to list + index lookup and upgrade all entries to tables with a "priority" field for table.sort to consider before lexicographical ordering
	-- human-readable header suffixes for the well-known (string) cancellation statuses
	local header_message_suffix_by_cancellation_status = {
		startable = " were already startable, no cancellation necessary",
		continuable = " were already continuable, no cancellation necessary",
		finished = " had already finished execution, nothing left to cancel",
		['cancellation-success'] = " were successfully canceled",
	}
	for cancellation_status, by_target_step_name in pairs(canceled_pipeline_lists) do
		local header_message_suffix
		if type(cancellation_status) == 'table' then
			-- table statuses are {tag, detail} tuples created by the interning tables above
			if cancellation_status[1] == 'cancellation-error' then
				header_message_suffix = " failed being canceled with the following error: "..tostring(cancellation_status[2])
			elseif cancellation_status[1] == 'cancellation-result-unexpected' then
				header_message_suffix = " were canceled, but then unexpectedly reported status '"..tostring(cancellation_status[2]).."'"
			else
				-- defensive branch: no code path above creates a tuple with a different tag
				header_message_suffix = " were canceled, new status is '"..tostring(cancellation_status[2]).."';\nERROR TRIGGERED IN COLLECTION LOGIC: unexpected first element '"..tostring(cancellation_status[1]).."' in cancellation status tuple (unreachable)"
			end
		else
			-- string statuses: look up the prepared message, falling back to a diagnostic for unrecognized ones
			cancellation_status = tostring(cancellation_status)
			header_message_suffix = header_message_suffix_by_cancellation_status[cancellation_status]
				or " reported status '"..cancellation_status.."' at some point; ERROR IN COLLECTION LOGIC: missing proper header message"
		end
		for target_step_name, pipeline_path_list in pairs(by_target_step_name) do
			-- if this step only appears under a single status, prefix "all " for clarity
			local only_status_this_target = status_counts_by_target_step_name[target_step_name] == 1
			print((only_status_this_target and "all " or "")..#pipeline_path_list.." pipelines towards step '"..target_step_name.."'"..header_message_suffix)
			-- sort to be lexicographically ascending, then print
			table.sort(pipeline_path_list)
			for i = 1, #pipeline_path_list do
				print("  "..pipeline_path_list[i])
			end
		end
	end
end
-- definition of all command structures and their implementation code
local program_command_structures = {
['auto-setup'] = {any_args_max = 0,
	benmet_util_skip_library_imports = true,
	summary = "clone benmet's own dependencies",
	options = {
		['reliable-commits'] = {is_flag = true, description = "check out tested commit hashes instead of the upstream HEAD"}
	},
	description = "This command clones the repositories used by benmet into sibling directories, unless the modules can already be found in the current package.path .\n- https://github.com/Egor-Skriptunoff/pure_lua_SHA.git provides an implementation for md5 hashing.\n- https://github.com/grafi-tt/lunajson.git provides JSON decoding and encoding.",
	implementation = function(features, util, arguments, options)
		local use_pinned_commits = options['reliable-commits']
		local parent_dir = _G.benmet_get_main_script_dir_path() .. "/.."
		-- clone a repository next to benmet unless its module can already be require'd;
		-- returns the repository name when a clone happened, nil otherwise
		local ensure_repo_available = function(module_name, repo_url, repo_name, commit_hash)
			local module_found = pcall(require, module_name)
			if module_found then -- module found, early exit
				return
			end
			local clone_command = "git clone "..util.in_quotes(repo_url)
			if commit_hash then
				-- defer the checkout so we can detach at the pinned commit below
				clone_command = clone_command .. " --no-checkout"
			end
			assert(util.execute_command_at(clone_command, parent_dir))
			if commit_hash then
				-- FIXME: On Windows, combining both into a single shell command (with `&& cd ... &&`) always results in 'path not found' (as if the directory didn't exist yet).
				-- However, this works fine on Linux in features.lua:543: rebuild_step_run_dir .
				-- Windows may exhibit the same issue over there as well.
				assert(util.execute_command_at("git checkout "..util.in_quotes(commit_hash).." --detach", parent_dir.."/"..repo_name))
			end
			return repo_name
		end
		-- attempt both dependencies, collecting the names of those actually cloned
		local cloned_list = {}
		cloned_list[#cloned_list+1] = ensure_repo_available('pure_lua_SHA.sha2', "https://github.com/Egor-Skriptunoff/pure_lua_SHA.git", "pure_lua_SHA", use_pinned_commits and "304d4121f080e68ef209d3f5fe093e5a955a4978")
		cloned_list[#cloned_list+1] = ensure_repo_available('lunajson', "https://github.com/grafi-tt/lunajson.git", "lunajson", use_pinned_commits and "1dcf3fadd001a7d75673a4354fcbf16ce72c5cdb")
		if #cloned_list == 0 then
			print("all modules found, nothing to clone")
		else
			print("successfully cloned modules: "..table.concat(cloned_list, ", "))
		end
	end,
},
['add-repo'] = {any_args_min = 1, any_args_max = 1, any_args_name = 'git-url',
	summary = "clone a repository",
	options = {
		['name'] = {description = "a new name for the cloned repository"},
	},
	description = "This command clones the repository from the given git-url into the expected location, which is a directory named 'repos' in the current working directory. This directory will be created if it does not yet exist.",
	implementation = function(features, util, arguments, options)
		-- delegate to the feature implementation with the url and the optional override name
		local repository_url = arguments[1]
		local override_name = options.name[1]
		features.clone_new_repository(repository_url, override_name)
	end,
},
['step.do'] = {any_args_min = 1, any_args_max = 1, any_args_name = 'step-name',
	summary = "directly execute a particular step command",
	options = {
		['command'] = {required = true, description = "the step command to execute"},
		['with-run-id'] = option_with_run_id,
		['param-file'] = option_param_file,
	},
	description = "This command executes the given command of the specified step. The following commands are available:\n inputs: output the input parameters of this step (with their default values, if any)\n status: output the run's current status (startable|pending|continuable|finished)\n start: start a new run (status should be 'startable')\n cancel: cancel pending asynchronous operations (status should be 'pending')\n continue: continue if the last asynchronous operation has completed (status should be 'continuable')",
	implementation = function(features, util, arguments, options)
		local target_step_name = arguments[1]
		local command = options.command[1]
		local run_id_override = options['with-run-id'][1]
		local param_file_path = options['param-file'][1]
		if command == 'inputs' then -- special case: this is the only command that doesn't require a run directory
			assert(not run_id_override, "command 'inputs' disregards all parameters, incompatible with option '--with-run-id'")
			assert(not param_file_path, "command 'inputs' disregards all parameters, incompatible with option '--param-file'")
			local output, return_status, error_details = features.step_query_inputs(target_step_name)
			if not output then
				-- diagnose why the query failed: missing step directory, missing run script, or a script execution failure
				local target_step_path = relative_path_prefix.."steps/"..target_step_name
				if not util.directory_exists(target_step_path) then
					print("build step '"..target_step_name.."' not found (no directory '"..target_step_path.."')")
				else
					local target_step_run_script_path = target_step_path.."/"..features.get_relative_step_script_path(target_step_name)
					if not util.file_exists(target_step_run_script_path) then
						print("build step '"..target_step_name.."' not available (no run script '"..target_step_run_script_path.."')")
					else
						print("failed to run build step command '"..command.."'"..(error_details and ":\n"..error_details or ""))
					end
				end
			else
				print(output)
			end
			return return_status
		end
		-- set up initial input parameters
		local initial_params = param_file_path and util.read_param_file_new_compat_deserialize(param_file_path, "failed to parse param-file '"..param_file_path.."'")
			or {}
		-- an explicit '--with-run-id' takes precedence over a 'RUN-id' from the param file
		initial_params['RUN-id'] = run_id_override or initial_params['RUN-id']
		-- calculate the final target run directory and parameters:
		-- walk the full dependency chain; only the values from the last iteration (the target step itself) are kept
		local last_params, last_step_run_in_params, last_special_params, last_step_run_hash_params, last_step_run_path, last_run_dir_exists, last_hash_collision
		for step_index, step_count, step_name, original_active_params, error_trace, active_params_for_step, step_run_in_params, special_params, step_run_hash_params, step_run_path, run_dir_exists, hash_collision in features.new_iterate_step_dependency_run_paths(target_step_name, initial_params) do
			-- any error while resolving the chain aborts the command
			assert(not error_trace, error_trace)
			if step_index == step_count then
				last_params, last_step_run_in_params, last_special_params, last_step_run_hash_params, last_step_run_path, last_run_dir_exists, last_hash_collision = active_params_for_step, step_run_in_params, special_params, step_run_hash_params, step_run_path, run_dir_exists, hash_collision
			end
		end
		-- try executing the given command
		local program_output, return_status = features.step_invoke_command(target_step_name, command, last_params, last_step_run_in_params, last_special_params, last_step_run_hash_params, last_step_run_path, last_run_dir_exists, last_hash_collision)
		if program_output then
			print(program_output)
		else
			print("failed to run build step command '"..command.."'")
		end
		return return_status
	end,
},
['step.list-dependencies'] = {any_args_min = 1, any_args_max = 1, any_args_name = 'step-name',
	summary = "display the preceding steps the given step depends on",
	description = "Analyzes the acyclic step dependency graph declared in 'steps/index.txt' and outputs all steps the given step depends on.\n"..dependencies_txt_description,
	implementation = function(features, util, arguments, options)
		-- resolve the full inclusive dependency chain of the requested step and print it
		local queried_step = arguments[1]
		local step_chain = features.step_get_necessary_steps_inclusive(queried_step)
		local chain_display = "'"..table.concat(step_chain, "'->'").."'"
		print("step chain to execute step '"..queried_step.."': "..chain_display)
	end,
},
['step.trace-dependencies'] = {any_args_min = 1, any_args_max = 1, any_args_name = 'step-name',
	summary = "display the known run directories a given step run will use",
	options = {
		['with-run-id'] = option_with_run_id,
		['param-file'] = option_param_file,
	},
	description = "Hashes the available input parameters of all steps towards a particular run of a target step and outputs the run directories resulting from this.",
	implementation = function(features, util, arguments, options)
		local target_step_name = arguments[1]
		local run_id_override = options['with-run-id'][1]
		local param_file_path = options['param-file'][1]
		-- set up initial input parameters
		local initial_params = param_file_path and util.read_param_file_new_compat_deserialize(param_file_path, "failed to parse param-file '"..param_file_path.."'")
			or {}
		-- an explicit '--with-run-id' takes precedence over a 'RUN-id' from the param file
		initial_params['RUN-id'] = run_id_override or initial_params['RUN-id']
		-- print the introductory line
		print("step trace to execute step '"..target_step_name.."'"..(param_file_path and " with param_file '"..param_file_path.."'" or "")..(run_id_override and " with RUN-id '"..run_id_override.."'" or "")..":")
		-- now we print one line of the format "<step-name>: <run-path> - <validity/error>" for each step.
		-- first is the generic function for printing any such line
		local max_printed_step_index = 0
		local trace_line = function(step_index, step_name, step_run_path, error_trace)
			local line = ". "..step_name..": "
			if step_run_path then
				-- run path known: report it, annotated as "valid" unless an error was passed
				local validity = error_trace or "valid"
				line = line..step_run_path.." - "..validity
			else
				-- run path unknown: report unavailability, plus the error if one was passed
				line = line.."<unavailable>"
				if error_trace then
					line = line.." - "..error_trace
				end
			end
			print(line)
			-- remember how far we have printed, so delayed prints can compute the next index
			max_printed_step_index = step_index
		end
		-- and here we iterate over every step and its run path (which is calculated by hashing sequentially aggregated parameters) that leads us to the target step
		-- note: in case of error, we get an error_trace, but the loop continues (with information missing from further entries)
		local last_step_name, last_step_run_path
		for step_index, step_count, step_name, original_active_params, error_trace, active_params_for_step, step_run_in_params, special_params, step_run_hash_params, step_run_path, run_dir_exists, hash_collision in features.new_iterate_step_dependency_run_paths(target_step_name, initial_params) do
			-- because the last step's output is applied at the next iteration, we delay tracing in case of no error
			if last_step_name then
				if original_active_params then -- applying last step's output (if any) worked
					trace_line(step_index-1, last_step_name, last_step_run_path, false)
				else -- applying last step's output (if any) failed
					-- NOTE(review): this assert is inside `if last_step_name then`, so it can never fire -- kept as documentation of the invariant
					assert(last_step_name, "unreachable: nothing before the first step should be able to fail, so original active params should always be available")
					trace_line(step_index-1, last_step_name, last_step_run_path, error_trace)
					-- the error has been attributed to the previous step; clear it so it isn't also reported for the current one
					error_trace = false
				end
			end
			if step_run_path and not hash_collision then -- the current step's run path could be computed and is not a hash collision
				if step_index < step_count then -- delay printing to the next iteration in case of errors applying its output
					last_step_name, last_step_run_path = step_name, step_run_path
				else -- nothing left to do, print now
					trace_line(max_printed_step_index+1, step_name, step_run_path, false)
				end
			else -- failed to compute current step's run path or encountered a hash collision
				last_step_name, last_step_run_path = nil, nil
				-- print its step name and the error that caused this
				trace_line(step_index, step_name, step_run_path, error_trace)
			end
		end
	end,
},
['pipelines.launch'] = {any_args_name = 'param-files',
	summary = "launch new pipeline instances",
	options = {
		['target'] = {required = true, description = "the target step of the pipeline(s)"}, --TODO(potentially?): could be made optional if we had a default target step in steps/index.txt
		-- TODO (potentially?): also implement 'all-targets' option flag
		['default-params'] = option_pipeline_default_params,
		['params-from-stdin'] = option_pipeline_params_from_stdin,
		-- no-continue: don't continue an encountered already-continuable step
		-- force-relaunch: delete all previously-existing step runs this pipeline incorporates -- would absolutely need dependency collision detection if implemented!
		['ignore-param'] = option_pipeline_ignore_param,
		['accept-param'] = option_pipeline_accept_param,
		['ignore-unrecognized-params'] = option_pipeline_ignore_unrecognized_params,
		['accept-unrecognized-params'] = option_pipeline_accept_unrecognized_params,
	},
	description = pipelines_param_construction_note.." For each one, starts a pipeline instance towards the specified target step.\nA pipeline instance is started by iterating over each step in the dependency chain towards the target step. If a step is already finished, it is skipped. If an encountered step finishes synchronously (that is, it doesn't suspend by reporting status 'pending'), the next step is started.\nOn the first suspending step that is encountered, the pipeline is suspended: A `pipeline file` that saves the initial parameters used for that particular instance, extended by a 'RUN-id' parameter if none was yet assigned, is created. Further pipeline operations on this pipeline instance use this file to retrace the pipeline's steps.\nIf no suspending step is encountered, the pipeline is completed in full.\n"..pipelines_param_rejection_note,
	implementation = function(features, util, arguments, options)
		local target_step_name = options.target[1]
		-- parse parameter iterators; note that option '--all-params' is not available for this command, so would have been rejected during argument parsing
		local parameter_iterator_constructors, parameter_iterator_warning_printer = parse_param_iterator_constructors_and_warning_printers_from_pipeline_arguments_options(features, util, arguments, options)
		assert(#parameter_iterator_constructors > 0, "no parameter files specified, no pipelines launched (pass --default-params to launch a pipeline with default parameters)")
		-- launched pipeline file paths, grouped by launch status then target step name, for user-facing program output
		local launched_pipeline_lists = {}
		-- interning table: equal launch error messages share one status tuple, and thus one output group
		local launch_status_lookup_by_error = {}
		-- option parsing/checking
		local param_coercer_provider, param_coercion_warning_printer = parse_unrecognized_param_coercer_provider_and_warning_printer_from_pipeline_options(features, util, options)
		-- actual work
		-- launch a single pipeline instance and record its resulting status and pipeline file path
		local launch_pipeline = function(target_step_name, initial_params)
			-- on success, err_or_finished_or_last_step is nil (skipped), true (finished), or the last executed step
			local successful, err_or_finished_or_last_step, last_step_status, was_resumed, pipeline_file_path = xpcall(features.execute_pipeline_steps, debug.traceback, target_step_name, initial_params)
			if not successful then
				print("Error launching pipeline: "..err_or_finished_or_last_step)
			end
			-- assign the pipeline to a collection according to its status
			local launch_status_error
			if not successful then
				-- reuse the existing status tuple for this exact error message, or create (and cache) a new one
				launch_status_error = launch_status_lookup_by_error[err_or_finished_or_last_step]
					or {'launch-error', err_or_finished_or_last_step}
				launch_status_lookup_by_error[err_or_finished_or_last_step] = launch_status_error
			end
			-- collapse the xpcall results into a single status key (table tuple or string)
			local launch_status = launch_status_error
				or err_or_finished_or_last_step == nil and 'launch-skipped-already-exists'
				or err_or_finished_or_last_step == true and 'finished'
				or (not was_resumed) and 'launch-hit-pending'
				or last_step_status == 'continuable' and 'pending' -- we handle suspension the same, no matter if it immediately finished or not
				or last_step_status
			-- insert the pipeline file path into the list belonging to its status and target step
			local by_target_step_name = launched_pipeline_lists[launch_status]
				or {}
			launched_pipeline_lists[launch_status] = by_target_step_name
			local list = by_target_step_name[target_step_name]
				or {}
			by_target_step_name[target_step_name] = list
			list[#list+1] = pipeline_file_path
		end
		local param_coercer = param_coercer_provider(target_step_name)
		local parsed_anything
		-- iterate over parameter iterators
		for i = 1, #parameter_iterator_constructors do
			local parameter_iterator_constructor = parameter_iterator_constructors[i]
			-- iterate over initial parameter configurations provided by the iterator,
			-- and launch pipelines based on them
			for initial_params in parameter_iterator_constructor() do
				parsed_anything = true
				-- coercion may reject a configuration (returns falsy), in which case no pipeline is launched for it
				local coerced_params = param_coercer(initial_params)
				if coerced_params then
					launch_pipeline(target_step_name, coerced_params)
				end
			end
		end
		-- count pipelines the other way around for more immediately informative output message
		local status_counts_by_target_step_name = {}
		count_number_of_key_1_by_key_2(status_counts_by_target_step_name, launched_pipeline_lists)
		-- report results back to the user
		-- TODO: rework to print successful messages first and error messages last (to avoid overlooking errors), and
		-- while at it maybe also order build statuses lexicographically?
		-- my original idea was to have separate pipeline_lists for errored/successful, but that is pretty ugly and doesn't scale well,
		-- so probably change table to list + index lookup and upgrade all entries to tables with a "priority" field for table.sort to consider before lexicographical ordering
		-- human-readable header suffixes for the well-known (string) launch statuses
		local header_message_suffix_by_launch_status = {
			startable = " were launched but seem to have aborted execution (reported status 'startable')",
			pending = " were successfully launched and suspended execution",
			finished = " were successfully launched and finished execution",
			['launch-skipped-already-exists'] = " already existed (with same parameters, including id) and were therefore not launched",
			['launch-hit-pending'] = " were launched but require a step that is still pending",
		}
		for launch_status, by_target_step_name in pairs(launched_pipeline_lists) do
			local header_message_suffix
			if type(launch_status) == 'table' then
				-- table statuses are {'launch-error', message} tuples created by the interning table above
				header_message_suffix = " failed being launched with the following error: "..tostring(launch_status[2])
				if launch_status[1] ~= 'launch-error' then
					-- defensive branch: no code path above creates a tuple with a different tag
					header_message_suffix = header_message_suffix.."\nADDITIONAL ERROR IN COLLECTION LOGIC: unexpected first element '"..tostring(launch_status[1]).."' in launch status tuple (unreachable)"
				end
			else
				-- string statuses: look up the prepared message, falling back to a generic one for unrecognized statuses
				launch_status = tostring(launch_status)
				header_message_suffix = header_message_suffix_by_launch_status[launch_status]
					or " were launched and suspended execution, reporting unrecognized status '"..launch_status.."'"
			end
			for target_step_name, pipeline_path_list in pairs(by_target_step_name) do
				-- if this step only appears under a single status, prefix "all " for clarity
				local only_status_this_target = status_counts_by_target_step_name[target_step_name] == 1
				print((only_status_this_target and "all " or "")..#pipeline_path_list.." pipelines towards step '"..target_step_name.."'"..header_message_suffix)
				-- sort to be lexicographically ascending, then print
				table.sort(pipeline_path_list)
				for i = 1, #pipeline_path_list do
					print("  "..pipeline_path_list[i])
				end
			end
		end
		-- print a warning message for files that could not be parsed
		parameter_iterator_warning_printer()
		-- print a warning message for encountered unrecognized parameters in combinations, if no fallback flag was provided
		param_coercion_warning_printer()
		-- a launch happened iff any status other than 'launch-skipped-already-exists' was recorded
		local launched_anything
		for launch_status, by_target_step_name in pairs(launched_pipeline_lists) do
			local status_implies_launch = launch_status ~= 'launch-skipped-already-exists'
			launched_anything = launched_anything or status_implies_launch -- I think this is a sensible criteria for the final output message, but maybe not for the return status? Or we could provide a flag for reducing expectations.
		end
		if not launched_anything then
			-- distinguish "nothing parsed" (exit code 2) from "parsed but nothing launched" (exit code 1)
			print(
				(parsed_anything and "" or "no parameters could be parsed, ")
				.. "no pipelines were launched"
			)
			return parsed_anything and 1 or 2
		end
	end,
},
['pipelines.resume'] = {any_args_name = 'param-files',
summary = "resume previously-suspended pipeline instances",
options = pipeline_operation_structure_options,
description = pipelines_param_construction_note.." For each one, resumes all conforming previously-suspended pipeline instances towards the specified target step that are currently ready.\nA pipeline instance is resumed by iterating the dependency chain towards the target step up to the step that previously suspended itself for asynchronous completion. If this step run still reports status 'pending', it is not yet ready, and so the pipeline remains suspended.\nIf it now reports the status 'continuable', it is continued, and the dependency chain is subsequently followed and continued, as detailed for `pipelines.launch`. On the first suspending step that is encountered, this is stopped and the pipeline remains suspended. If no such step is encountered, the pipeline instance is completed and its corresponding `pipeline file` is deleted.\n"..pipelines_param_rejection_note,
implementation = function(features, util, arguments, options)
-- selected pipeline file paths, grouped by resumption status then target step name, for user-facing program output