
Commit 15302dd

processing via #23
1 parent 3ade632 commit 15302dd

File tree

1 file changed

atlas/analysis.ipynb

Lines changed: 34 additions & 6 deletions
@@ -270,17 +270,45 @@
 " columns_to_preload = json.load(pathlib.Path(\"columns_to_preload.json\").open())[\"JET_JER_Effective\"] # or []\n",
 "\n",
 " with performance_report(filename=\"process_custom.html\"):\n",
-" out, report = utils.custom_process(preprocess_output, processor_class=Analysis, schema=run.schema, client=client, preload=columns_to_preload)\n",
+" # out, report = utils.custom_process(preprocess_output, processor_class=Analysis, schema=run.schema, client=client, preload=columns_to_preload)\n",
+"\n",
+" # version from https://github.com/iris-hep/integration-challenge/pull/23\n",
+" import sys\n",
+" sys.path.insert(0, \"..\")\n",
+"\n",
+" import util._dask\n",
+" cloudpickle.register_pickle_by_value(util)\n",
+"\n",
+" futures, futurekey2item = util._dask.dask_map(\n",
+" Analysis().process,\n",
+" preprocess_output,\n",
+" client=client,\n",
+" NanoEventsFactory_kwargs = {\n",
+" \"preload\": lambda b: b in [],\n",
+" \"schemaclass\": run.schema,\n",
+" }\n",
+" )\n",
+"\n",
+" final_future, failed_items = util._dask.dask_reduce(\n",
+" futures,\n",
+" futurekey2item=futurekey2item,\n",
+" client=client,\n",
+" treereduction=4\n",
+" )\n",
+" result = final_future.result()\n",
+" out, report = result[\"out\"], result[\"report\"]\n",
+" print(f\"failed chunks: {failed_items}\")\n",
+"\n",
 " print(f\"preloaded columns: {len(columns_to_preload)}, {columns_to_preload[:4]} {\"etc.\" if len(columns_to_preload) > 4 else \"\"}\")\n",
 " print(f\"preloaded but unused columns: {len([c for c in columns_to_preload if c not in report[\"columns\"]])}\")\n",
 " print(f\"used but not preloaded columns: {len([c for c in report[\"columns\"] if c not in columns_to_preload])}\")\n",
 "\n",
 " # shortened version of report, dropping extra columns and per-chunk information\n",
-" display(\n",
-" dict((k, v) for k, v in report.items() if k not in [\"columns\", \"chunk_info\"]) | \n",
-" {\"columns\": report[\"columns\"][0:10] + [\"...\"]} | \n",
-" {\"chunk_info\": list(report[\"chunk_info\"].items())[:2] + [\"...\"]}\n",
-" )\n",
+" # display(\n",
+" # dict((k, v) for k, v in report.items() if k not in [\"columns\", \"chunk_info\"]) | \n",
+" # {\"columns\": report[\"columns\"][0:10] + [\"...\"]} | \n",
+" # {\"chunk_info\": list(report[\"chunk_info\"].items())[:2] + [\"...\"]}\n",
+" # )\n",
 "\n",
 "else:\n",
 " # coffea Runner-based processing\n",

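A second note, on `cloudpickle.register_pickle_by_value(util)`: by default cloudpickle serializes functions from an importable module by reference (module path plus attribute name), which fails on Dask workers that do not have the local `util` package installed; registering the module switches to by-value serialization so the code itself travels with the task. A self-contained illustration follows; the `scratch_util` module is fabricated for the demo.

import pathlib
import sys

import cloudpickle

# fabricate an importable "local" module, standing in for the util package
pathlib.Path("scratch_util.py").write_text("def double(x):\n    return 2 * x\n")
sys.path.insert(0, ".")
import scratch_util

# registered modules are pickled by value, so the function's code is embedded
# in the payload instead of a bare "import scratch_util" reference
cloudpickle.register_pickle_by_value(scratch_util)
payload = cloudpickle.dumps(scratch_util.double)

# simulate a worker that has no copy of the module
del sys.modules["scratch_util"]
pathlib.Path("scratch_util.py").unlink()

double = cloudpickle.loads(payload)  # reconstructs without importing
print(double(21))  # 42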