From 15302ddfcf82426accffa5a89da7be51d5c29b6a Mon Sep 17 00:00:00 2001 From: Alexander Held Date: Thu, 4 Dec 2025 19:56:49 +0100 Subject: [PATCH] processing via #23 --- atlas/analysis.ipynb | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/atlas/analysis.ipynb b/atlas/analysis.ipynb index acc4daa..0335462 100644 --- a/atlas/analysis.ipynb +++ b/atlas/analysis.ipynb @@ -270,17 +270,45 @@ " columns_to_preload = json.load(pathlib.Path(\"columns_to_preload.json\").open())[\"JET_JER_Effective\"] # or []\n", "\n", " with performance_report(filename=\"process_custom.html\"):\n", - " out, report = utils.custom_process(preprocess_output, processor_class=Analysis, schema=run.schema, client=client, preload=columns_to_preload)\n", + " # out, report = utils.custom_process(preprocess_output, processor_class=Analysis, schema=run.schema, client=client, preload=columns_to_preload)\n", + "\n", + " # version from https://github.com/iris-hep/integration-challenge/pull/23\n", + " import sys\n", + " sys.path.insert(0, \"..\")\n", + "\n", + " import util._dask\n", + " cloudpickle.register_pickle_by_value(util)\n", + "\n", + " futures, futurekey2item = util._dask.dask_map(\n", + " Analysis().process,\n", + " preprocess_output,\n", + " client=client,\n", + " NanoEventsFactory_kwargs = {\n", + " \"preload\": lambda b: b.name in columns_to_preload, # keep in sync with the preload report printed below; assumes b is an uproot branch (filter_branch) - TODO confirm\n", + " \"schemaclass\": run.schema,\n", + " }\n", + " )\n", + "\n", + " final_future, failed_items = util._dask.dask_reduce(\n", + " futures,\n", + " futurekey2item=futurekey2item,\n", + " client=client,\n", + " treereduction=4\n", + " )\n", + " result = final_future.result()\n", + " out, report = result[\"out\"], result[\"report\"]\n", + " print(f\"failed chunks: {failed_items}\")\n", + "\n", " print(f\"preloaded columns: {len(columns_to_preload)}, {columns_to_preload[:4]} {\"etc.\" if len(columns_to_preload) > 4 else \"\"}\")\n", " print(f\"preloaded but unused columns: {len([c for c in columns_to_preload if 
c not in report[\"columns\"]])}\")\n", " print(f\"used but not preloaded columns: {len([c for c in report[\"columns\"] if c not in columns_to_preload])}\")\n", "\n", " # shortened version of report, dropping extra columns and per-chunk information\n", - " display(\n", - " dict((k, v) for k, v in report.items() if k not in [\"columns\", \"chunk_info\"]) | \n", - " {\"columns\": report[\"columns\"][0:10] + [\"...\"]} | \n", - " {\"chunk_info\": list(report[\"chunk_info\"].items())[:2] + [\"...\"]}\n", - " )\n", + " # display(\n", + " # dict((k, v) for k, v in report.items() if k not in [\"columns\", \"chunk_info\"]) | \n", + " # {\"columns\": report[\"columns\"][0:10] + [\"...\"]} | \n", + " # {\"chunk_info\": list(report[\"chunk_info\"].items())[:2] + [\"...\"]}\n", + " # )\n", "\n", "else:\n", " # coffea Runner-based processing\n",