diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 00678d7..51c100e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -117,7 +117,7 @@ stages: - echo "INSTALL_EXTRAS=$INSTALL_EXTRAS" - echo "UV_RESOLUTION=$UV_RESOLUTION" - echo "MOD_VERSION=$MOD_VERSION" - - python -m uv pip install "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist + - python -m pip install --prefer-binary "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist - echo "Install finished." - echo "Creating test sandbox directory" - export WORKSPACE_DNAME="sandbox" @@ -187,7 +187,7 @@ stages: - echo "INSTALL_EXTRAS=$INSTALL_EXTRAS" - echo "UV_RESOLUTION=$UV_RESOLUTION" - echo "MOD_VERSION=$MOD_VERSION" - - python -m uv pip install "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist + - python -m pip install --prefer-binary "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist - echo "Install finished." - echo "Creating test sandbox directory" - export WORKSPACE_DNAME="sandbox" @@ -257,7 +257,7 @@ stages: - echo "INSTALL_EXTRAS=$INSTALL_EXTRAS" - echo "UV_RESOLUTION=$UV_RESOLUTION" - echo "MOD_VERSION=$MOD_VERSION" - - python -m uv pip install "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist + - python -m pip install --prefer-binary "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist - echo "Install finished." - echo "Creating test sandbox directory" - export WORKSPACE_DNAME="sandbox" @@ -327,7 +327,7 @@ stages: - echo "INSTALL_EXTRAS=$INSTALL_EXTRAS" - echo "UV_RESOLUTION=$UV_RESOLUTION" - echo "MOD_VERSION=$MOD_VERSION" - - python -m uv pip install "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist + - python -m pip install --prefer-binary "kwdagger[$INSTALL_EXTRAS]==$MOD_VERSION" -f dist - echo "Install finished." - echo "Creating test sandbox directory" - export WORKSPACE_DNAME="sandbox" @@ -353,10 +353,10 @@ stages: - echo "pytest command finished, moving the coverage file to the repo root" build/sdist: <<: *build_sdist_template - image: python:3.14.0 + image: python:3.14 test/sdist/minimal-loose/cp314-linux-x86_64: <<: *test_minimal-loose_template - image: python:3.14.0 + image: python:3.14 needs: - build/sdist build/cp310-linux-x86_64: @@ -453,30 +453,30 @@ test/full-strict/cp313-linux-x86_64: - build/cp313-linux-x86_64 build/cp314-linux-x86_64: <<: *build_wheel_template - image: python:3.14.0 + image: python:3.14 test/minimal-loose/cp314-linux-x86_64: <<: *test_minimal-loose_template - image: python:3.14.0 + image: python:3.14 needs: - build/cp314-linux-x86_64 test/full-loose/cp314-linux-x86_64: <<: *test_full-loose_template - image: python:3.14.0 + image: python:3.14 needs: - build/cp314-linux-x86_64 test/minimal-strict/cp314-linux-x86_64: <<: *test_minimal-strict_template - image: python:3.14.0 + image: python:3.14 needs: - build/cp314-linux-x86_64 test/full-strict/cp314-linux-x86_64: <<: *test_full-strict_template - image: python:3.14.0 + image: python:3.14 needs: - build/cp314-linux-x86_64 lint: <<: *common_template - image: python:3.14.0 + image: python:3.14 stage: lint before_script: - df -h @@ -487,7 +487,7 @@ lint: allow_failure: true gpgsign/wheels: <<: *common_template - image: python:3.14.0 + image: python:3.14 stage: gpgsign artifacts: paths: @@ -551,7 +551,7 @@ gpgsign/wheels: artifacts: true deploy/wheels: <<: *common_template - image: python:3.14.0 + image: python:3.14 stage: deploy only: refs: @@ -673,4 +673,4 @@ deploy/wheels: "$CI_API_V4_URL/projects/$CI_PROJECT_ID/releases" -# end +# end \ No newline at end of file diff --git a/.readthedocs.yml b/.readthedocs.yml index 52edaf8..31875c1 100644 --- 
a/.readthedocs.yml +++ b/.readthedocs.yml @@ -7,15 +7,30 @@ # Required version: 2 + build: os: "ubuntu-24.04" tools: python: "3.13" + +# Build documentation in the docs/ directory with Sphinx sphinx: configuration: docs/source/conf.py + +# Build documentation with MkDocs +#mkdocs: +# configuration: mkdocs.yml + +# Optionally build your docs in additional formats such as PDF and ePub formats: all + python: install: - requirements: requirements/docs.txt - method: pip path: . + #extra_requirements: + # - docs + +#conda: +# environment: environment.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bd7822..7332413 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,15 @@ We [keep a changelog](https://keepachangelog.com/en/1.0.0/). We aim to adhere to [semantic versioning](https://semver.org/spec/v2.0.0.html). +## Version 0.2.2 - Unreleased + +### Added + +* Support deriving ProcessNode IO/parameter groups from a scriptconfig schema via the new ``params`` class variable. + ## Version 0.2.1 - Unreleased + ### Changed * YAML paths in grid values no longer auto-expand unless explicitly behind an `__include__` key. See docs for details. diff --git a/docs/source/conf.py b/docs/source/conf.py index ad07e86..c899359 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -138,7 +138,7 @@ def visit_Assign(self, node): return visitor.version project = 'kwdagger' -copyright = '2025, Jon Crall' +copyright = '2026, Jon Crall' author = 'Jon Crall' modname = 'kwdagger' diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/README.rst b/docs/source/manual/tutorials/scriptconfig_pipeline/README.rst new file mode 100644 index 0000000..ec5c8de --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/README.rst @@ -0,0 +1,119 @@ +Scriptconfig Pipeline Tutorial +============================== + +This tutorial mirrors the two-stage pipeline example, but it uses +``scriptconfig`` schemas to declare input/output paths and parameter groups. +The ``ProcessNode.params`` class variable automatically derives +``in_paths``, ``out_paths``, ``algo_params``, and ``perf_params`` from the +schema so your pipeline stays in sync with the CLI definitions. + +Files in this tutorial +---------------------- + +* ``data/`` - two small JSONL datasets of movie and food reviews. +* ``example_user_module/cli`` - command line entry points for the prediction and + evaluation nodes (scriptconfig schemas live here). +* ``example_user_module/pipelines.py`` - pipeline wiring that uses + ``ProcessNode.params`` to derive node IO/params. +* ``run_pipeline.sh`` - copy/paste helper to schedule and aggregate. + +How scriptconfig drives ProcessNode definitions +----------------------------------------------- + +Each CLI class declares the node schema with tags: + +* ``in_path`` / ``in``: input paths +* ``out_path`` / ``out``: output templates (non-empty defaults are used) +* ``algo_param`` / ``algo``: algorithm parameters that affect outputs +* ``perf_param`` / ``perf``: execution-only parameters + +The ``primary`` tag on an ``out_path`` marks which output signals completion. +``ProcessNode`` uses these tags to populate the appropriate groups unless you +explicitly override them on the node class. + +Here is the schema for the prediction node: + +.. 
code:: python + + class KeywordSentimentPredictCLI(scfg.DataConfig): + src_fpath = scfg.Value(None, tags=['in_path']) + dst_fpath = scfg.Value('keyword_predictions.json', tags=['out_path', 'primary']) + dst_dpath = scfg.Value('.', tags=['out_path']) + + keyword = scfg.Value('great', tags=['algo_param']) + case_sensitive = scfg.Value(False, tags=['algo_param']) + workers = scfg.Value(0, tags=['perf_param']) + +The pipeline nodes simply point ``params`` at these schemas: + +.. code:: python + + class KeywordSentimentPredict(kwdagger.ProcessNode): + name = 'keyword_sentiment_predict' + executable = f'python {EXAMPLE_DPATH}/cli/keyword_sentiment_predict.py' + params = KeywordSentimentPredictCLI + + class SentimentEvaluate(kwdagger.ProcessNode): + name = 'sentiment_evaluate' + executable = f'python {EXAMPLE_DPATH}/cli/sentiment_evaluate.py' + params = SentimentEvaluateCLI + +Connecting the pipeline +----------------------- + +The wiring is the same as the base tutorial: prediction outputs feed evaluation +inputs, and the labeled dataset feeds both nodes. + +.. code:: python + + nodes = { + 'keyword_sentiment_predict': KeywordSentimentPredict(), + 'sentiment_evaluate': SentimentEvaluate(), + } + nodes['keyword_sentiment_predict'].outputs['dst_fpath'].connect( + nodes['sentiment_evaluate'].inputs['pred_fpath'] + ) + nodes['keyword_sentiment_predict'].inputs['src_fpath'].connect( + nodes['sentiment_evaluate'].inputs['true_fpath'] + ) + +Running the tutorial +-------------------- + +.. code:: bash + + # From this folder (modify to where your copy is) + cd ~/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline/ + + # Set the PYTHONPATH so kwdagger can see the custom module in this directory + export PYTHONPATH=. + + # Define where you want the results to be written to + EVAL_DPATH=$PWD/results + + kwdagger schedule \ + --params=" + pipeline: 'example_user_module.pipelines.my_sentiment_pipeline()' + matrix: + keyword_sentiment_predict.src_fpath: + - data/toy_reviews_movies.jsonl + - data/toy_reviews_food.jsonl + keyword_sentiment_predict.keyword: + - great + - boring + - love + sentiment_evaluate.workers: 0 + " \ + --root_dpath="${EVAL_DPATH}" \ + --tmux_workers=2 --backend=serial --skip_existing=1 \ + --run=1 + +Once jobs complete, aggregate with: + +.. 
code:: bash + + kwdagger aggregate \ + --params=" + pipeline: 'example_user_module.pipelines.my_sentiment_pipeline()' + root_dpath: ${EVAL_DPATH} + " diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_food.jsonl b/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_food.jsonl new file mode 100644 index 0000000..b3ee746 --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_food.jsonl @@ -0,0 +1,12 @@ +{"text": "The pizza was great and the service was friendly.", "label": "positive"} +{"text": "Soup was bland and arrived cold.", "label": "negative"} +{"text": "Great flavors but the wait was terrible.", "label": "negative"} +{"text": "I love the dessert menu!", "label": "positive"} +{"text": "Portions were small and the seating was cramped.", "label": "negative"} +{"text": "Great coffee and great ambiance.", "label": "positive"} +{"text": "Boring menu without vegetarian options.", "label": "negative"} +{"text": "I love the spicy noodles.", "label": "positive"} +{"text": "Service was boring but the food was great.", "label": "positive"} +{"text": "The salad was soggy and tasteless.", "label": "negative"} +{"text": "Great staff who love their customers.", "label": "positive"} +{"text": "Boring decor but I love the fresh bread.", "label": "positive"} diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_movies.jsonl b/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_movies.jsonl new file mode 100644 index 0000000..f9027a6 --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_movies.jsonl @@ -0,0 +1,17 @@ +{"text": "Great soundtrack and great pacing.", "label": "positive"} +{"text": "Boring plot with terrible acting.", "label": "negative"} +{"text": "Great visuals but boring story.", "label": "negative"} +{"text": "I love this cast and the great humor.", "label": "positive"} +{"text": "The movie was boring and far too long.", "label": "negative"} +{"text": "A great ending and a great start.", "label": "positive"} +{"text": "Lovely cinematography and I love the score.", "label": "positive"} +{"text": "The jokes were boring and fell flat.", "label": "negative"} +{"text": "Great characters kept me engaged.", "label": "positive"} +{"text": "Love the worldbuilding even if the pacing was slow.", "label": "positive"} +{"text": "Action scenes were boring and predictable.", "label": "negative"} +{"text": "Great sequel with heart.", "label": "positive"} +{"text": "I love how the mystery unfolded.", "label": "positive"} +{"text": "Boring dialogue ruined the tension.", "label": "negative"} +{"text": "Great acting but boring editing.", "label": "negative"} +{"text": "The film was great fun for the whole family.", "label": "positive"} +{"text": "I love every minute of this movie.", "label": "positive"} diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/__init__.py b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/keyword_sentiment_predict.py b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/keyword_sentiment_predict.py new file mode 100644 index 0000000..97ed1d9 --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/keyword_sentiment_predict.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 
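+# Tutorial CLI for the prediction stage. The scriptconfig tags declared below
+# (in_path, out_path/primary, algo_param, perf_param) are the same tags that
+# ``ProcessNode.params`` reads to derive the node's in_paths, out_paths,
+# algo_params, and perf_params, as described in this tutorial's README.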
+import json +import scriptconfig as scfg +import ubelt as ub +import kwutil +import rich +from rich.markup import escape + + +def _load_reviews(fpath): + records = [] + for line in ub.Path(fpath).read_text().splitlines(): + line = line.strip() + if not line: + continue + records.append(json.loads(line)) + return records + + +class KeywordSentimentPredictCLI(scfg.DataConfig): + """Minimal "model" that tags texts containing a keyword.""" + + src_fpath = scfg.Value( + None, + help='path to labeled jsonl review file', + tags=['in_path'], + ) + dst_fpath = scfg.Value( + 'keyword_predictions.json', + help='path to prediction file', + tags=['out_path', 'primary'], + ) + dst_dpath = scfg.Value( + '.', + help='path to output directory', + tags=['out_path'], + ) + + keyword = scfg.Value( + 'great', + help='word that marks a review as positive', + tags=['algo_param'], + ) + case_sensitive = scfg.Value( + False, + help='toggle case sensitivity', + tags=['algo_param'], + ) + workers = scfg.Value( + 0, + help='number of parallel workers (unused)', + tags=['perf_param'], + ) + + @classmethod + def main(cls, cmdline=1, **kwargs): + config = cls.cli(cmdline=cmdline, data=kwargs, strict=True) + rich.print('config = ' + escape(ub.urepr(config, nl=1))) + + data = { + 'info': [], + 'result': None, + } + + proc_context = kwutil.ProcessContext( + name='keyword_sentiment_predict', + type='process', + config=kwutil.Json.ensure_serializable(dict(config)), + track_emissions=False, + ) + proc_context.start() + + reviews = _load_reviews(config.src_fpath) + keyword = config.keyword if config.case_sensitive else config.keyword.lower() + + predictions = [] + for idx, record in enumerate(reviews): + haystack = record['text'] if config.case_sensitive else record['text'].lower() + predicted_label = 'positive' if keyword in haystack else 'negative' + predictions.append({ + 'id': idx, + 'text': record['text'], + 'predicted_label': predicted_label, + }) + + data['result'] = { + 'keyword': config.keyword, + 'case_sensitive': bool(config.case_sensitive), + 'predictions': predictions, + } + + obj = proc_context.stop() + data['info'].append(obj) + + dst_fpath = ub.Path(config.dst_fpath) + dst_fpath.parent.ensuredir() + dst_fpath.write_text(json.dumps(data, indent=2)) + print(f'Wrote to: dst_fpath={dst_fpath}') + + +__cli__ = KeywordSentimentPredictCLI + +if __name__ == '__main__': + __cli__.main() + + r""" + CommandLine: + python ~/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/keyword_sentiment_predict.py \ + --src_fpath ~/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_movies.jsonl \ + --dst_fpath ./keyword_predictions.json + """ diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/sentiment_evaluate.py b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/sentiment_evaluate.py new file mode 100644 index 0000000..099c1e7 --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/sentiment_evaluate.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +import json +import scriptconfig as scfg +import ubelt as ub +import kwutil +import rich +from rich.markup import escape + + +def _load_reviews(fpath): + records = [] + for line in ub.Path(fpath).read_text().splitlines(): + line = line.strip() + if not line: + continue + records.append(json.loads(line)) + return records + + +def _safe_div(num, den): + return num / den if den else 0.0 + + +class 
SentimentEvaluateCLI(scfg.DataConfig): + """Evaluate predictions produced by :mod:`keyword_sentiment_predict`.""" + + pred_fpath = scfg.Value( + None, + help='path to predictions JSON', + tags=['in_path'], + ) + true_fpath = scfg.Value( + None, + help='path to labeled jsonl file', + tags=['in_path'], + ) + out_fpath = scfg.Value( + 'sentiment_metrics.json', + help='path to evaluation file', + tags=['out_path', 'primary'], + ) + workers = scfg.Value( + 0, + help='number of parallel workers (unused)', + tags=['perf_param'], + ) + + @classmethod + def main(cls, cmdline=1, **kwargs): + config = cls.cli(cmdline=cmdline, data=kwargs, strict=True) + rich.print('config = ' + escape(ub.urepr(config, nl=1))) + + data = { + 'info': [], + 'result': None, + } + + proc_context = kwutil.ProcessContext( + name='sentiment_evaluate', + type='process', + config=kwutil.Json.ensure_serializable(dict(config)), + track_emissions=False, + ) + proc_context.start() + + reviews = _load_reviews(config.true_fpath) + pred_data = json.loads(ub.Path(config.pred_fpath).read_text()) + predictions = pred_data['result']['predictions'] + + if len(predictions) != len(reviews): + raise AssertionError('Predictions and truths must have the same length') + + num_correct = 0 + confusion = { + 'tp': 0, + 'fp': 0, + 'tn': 0, + 'fn': 0, + } + detailed = [] + + for record, pred in zip(reviews, predictions): + true_label = record['label'] + pred_label = pred['predicted_label'] + correct = int(true_label == pred_label) + num_correct += correct + detailed.append({ + 'text': record['text'], + 'true_label': true_label, + 'predicted_label': pred_label, + }) + + if true_label == 'positive' and pred_label == 'positive': + confusion['tp'] += 1 + elif true_label == 'negative' and pred_label == 'positive': + confusion['fp'] += 1 + elif true_label == 'negative' and pred_label == 'negative': + confusion['tn'] += 1 + elif true_label == 'positive' and pred_label == 'negative': + confusion['fn'] += 1 + + accuracy = _safe_div(num_correct, len(reviews)) + precision = _safe_div(confusion['tp'], (confusion['tp'] + confusion['fp'])) + recall = _safe_div(confusion['tp'], (confusion['tp'] + confusion['fn'])) + + metrics = { + 'accuracy': accuracy, + 'precision_positive': precision, + 'recall_positive': recall, + 'num_examples': len(reviews), + 'keyword_used': pred_data['result']['keyword'], + } + + data['result'] = { + 'metrics': metrics, + 'confusion': confusion, + 'detailed': detailed, + } + + obj = proc_context.stop() + data['info'].append(obj) + + out_fpath = ub.Path(config.out_fpath) + out_fpath.parent.ensuredir() + out_fpath.write_text(json.dumps(data, indent=2)) + print(f'wrote to: out_fpath={out_fpath}') + + +__cli__ = SentimentEvaluateCLI + +if __name__ == '__main__': + __cli__.main() + r""" + CommandLine: + python ~/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/cli/sentiment_evaluate.py \ + --true_fpath ~/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline/data/toy_reviews_movies.jsonl \ + --pred_fpath ./keyword_predictions.json \ + --out_fpath out.json + + python -m example_user_module.cli.keyword_sentiment_predict + """ diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/pipelines.py b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/pipelines.py new file mode 100644 index 0000000..c5ff0eb --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/example_user_module/pipelines.py @@ -0,0 +1,148 @@ +""" +Two-stage demo pipeline 
using scriptconfig schemas for node definitions. + +The first stage performs a tiny keyword-based "model" and the second stage +evaluates it. Each node derives its IO/param groups from a scriptconfig schema. +""" + +import kwdagger +import ubelt as ub + +from example_user_module.cli.keyword_sentiment_predict import ( + KeywordSentimentPredictCLI, +) +from example_user_module.cli.sentiment_evaluate import SentimentEvaluateCLI + +# Normally we want to invoke installed Python modules so we can abstract away +# hard coded paths, but for this example we will avoid that for simplicity. +try: + EXAMPLE_DPATH = ub.Path(__file__).parent +except NameError: + # for developer convenience + EXAMPLE_DPATH = ub.Path( + '~/code/kwdagger/docs/source/manual/tutorials/twostage_pipeline/example_user_module' + ).expanduser() + + +class KeywordSentimentPredict(kwdagger.ProcessNode): + """Run the lightweight keyword-based classifier.""" + + name = 'keyword_sentiment_predict' + executable = f'python {EXAMPLE_DPATH}/cli/keyword_sentiment_predict.py' + params = KeywordSentimentPredictCLI + + def load_result(self, node_dpath): + import json + from kwdagger.aggregate_loader import new_process_context_parser + from kwdagger.utils import util_dotdict + + output_fpath = node_dpath / self.out_paths[self.primary_out_key] + result = json.loads(output_fpath.read_text()) + proc_item = result['info'][-1] + nest_resolved = new_process_context_parser(proc_item) + # TODO: it would be useful if the aggregator could be given some stats + # about non-evaluation runs, but currently this does not work because + # it does not conform to the output needed by load_results. + # nest_resolved['result'] = { + # 'keyword': result['result']['keyword'], + # 'case_sensitive': result['result']['case_sensitive'], + # 'num_predictions': len(result['result']['predictions']), + # } + flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved) + flat_resolved = flat_resolved.insert_prefix(self.name, index=1) + return flat_resolved + + +class SentimentEvaluate(kwdagger.ProcessNode): + """Score predictions against labels and expose metrics for aggregation.""" + + name = 'sentiment_evaluate' + executable = f'python {EXAMPLE_DPATH}/cli/sentiment_evaluate.py' + params = SentimentEvaluateCLI + + def load_result(self, node_dpath): + """ + Return metrics and configuration in a flattened dictionary. + + The returned dictionary should have a key structure that at the very + least has keys that look like: + "metrics.{node_name}.{metric_name}" + + Other keys like: + + "context.{node_name}.{key_name}" + "resolved_params.{node_name}.{key_name}" + "resources.{node_name}.{key_name}" + "machine.{node_name}.{key_name}" + + Can be filled in by using the ``new_process_context_parser`` helper and + kwutil.ProcessContext conventions shown in the CLI examples. 
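+
+        For this node, for example, the flattened result is expected to
+        include ``metrics.sentiment_evaluate.accuracy`` along with context
+        keys such as ``resolved_params.sentiment_evaluate.{key_name}``.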
+ """ + import json + from kwdagger.aggregate_loader import new_process_context_parser + from kwdagger.utils import util_dotdict + + output_fpath = node_dpath / self.out_paths[self.primary_out_key] + result = json.loads(output_fpath.read_text()) + proc_item = result['info'][-1] + nest_resolved = new_process_context_parser(proc_item) + nest_resolved['metrics'] = result['result']['metrics'] + flat_resolved = util_dotdict.DotDict.from_nested(nest_resolved) + flat_resolved = flat_resolved.insert_prefix(self.name, index=1) + return flat_resolved + + def default_metrics(self): + metric_infos = [ + { + 'metric': 'accuracy', + 'objective': 'maximize', + 'primary': True, + 'display': True, + }, + { + 'metric': 'precision_positive', + 'objective': 'maximize', + 'primary': False, + 'display': True, + }, + { + 'metric': 'recall_positive', + 'objective': 'maximize', + 'primary': False, + 'display': True, + }, + ] + return metric_infos + + @property + def default_vantage_points(self): + vantage_points = [ + { + 'metric1': 'metrics.sentiment_evaluate.accuracy', + 'metric2': 'metrics.sentiment_evaluate.precision_positive', + }, + ] + return vantage_points + + +def my_sentiment_pipeline(): + """Create the two-stage keyword review pipeline.""" + + nodes = { + 'keyword_sentiment_predict': KeywordSentimentPredict(), + 'sentiment_evaluate': SentimentEvaluate(), + } + + # Connect your nodes together: predictions feed into evaluation. + nodes['keyword_sentiment_predict'].outputs['dst_fpath'].connect( + nodes['sentiment_evaluate'].inputs['pred_fpath'] + ) + + # Reuse the same labeled data for both prediction input and ground truth. + nodes['keyword_sentiment_predict'].inputs['src_fpath'].connect( + nodes['sentiment_evaluate'].inputs['true_fpath'] + ) + + dag = kwdagger.Pipeline(nodes) + dag.build_nx_graphs() + return dag diff --git a/docs/source/manual/tutorials/scriptconfig_pipeline/run_pipeline.sh b/docs/source/manual/tutorials/scriptconfig_pipeline/run_pipeline.sh new file mode 100644 index 0000000..53d2c84 --- /dev/null +++ b/docs/source/manual/tutorials/scriptconfig_pipeline/run_pipeline.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +# Copy/paste friendly: set SCRIPT_DIR to this folder (edit if you run elsewhere). +if [[ -n "${BASH_SOURCE[0]}" ]]; then + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +else + SCRIPT_DIR="$HOME/code/kwdagger/docs/source/manual/tutorials/scriptconfig_pipeline" +fi +cd "$SCRIPT_DIR" + +# Set PYTHONPATH to ensure Python can see the example directory. +export PYTHONPATH=. 
+ +EVAL_DPATH=${EVAL_DPATH:-$PWD/results} +echo "EVAL_DPATH = $EVAL_DPATH" +kwdagger schedule \ + --params=" + pipeline: 'example_user_module.pipelines.my_sentiment_pipeline()' + matrix: + keyword_sentiment_predict.src_fpath: + - data/toy_reviews_movies.jsonl + - data/toy_reviews_food.jsonl + keyword_sentiment_predict.keyword: + - great + - boring + - love + sentiment_evaluate.workers: 0 + " \ + --root_dpath="${EVAL_DPATH}" \ + --tmux_workers=2 \ + --backend=serial --skip_existing=1 \ + --run=1 + +kwdagger aggregate \ + --pipeline='example_user_module.pipelines.my_sentiment_pipeline()' \ + --target " + - $EVAL_DPATH + " \ + --output_dpath="$EVAL_DPATH/full_aggregate" \ + --resource_report=0 \ + --io_workers=0 \ + --eval_nodes=" + - sentiment_evaluate + " \ + --stdout_report=" + top_k: 10 + print_models: True + concise: 1 + " \ + --plot_params=" + enabled: 0 + " \ + --cache_resolved_results=False diff --git a/kwdagger/__init__.py b/kwdagger/__init__.py index ace2c46..f8ac78d 100644 --- a/kwdagger/__init__.py +++ b/kwdagger/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.2.1' +__version__ = '0.2.2' __autogen__ = """ mkinit ~/code/kwdagger/kwdagger/__init__.py -w diff --git a/kwdagger/pipeline.py b/kwdagger/pipeline.py index edd42da..dbb4543 100644 --- a/kwdagger/pipeline.py +++ b/kwdagger/pipeline.py @@ -21,6 +21,7 @@ import networkx as nx import os import ubelt as ub +import warnings from functools import cached_property from typing import Union, Dict, Set, List, Any, Optional from kwdagger.utils import util_dotdict @@ -1042,6 +1043,10 @@ def main(cls, cmdline=1, **kwargs): primary_out_key : str = None + # Optional scriptconfig schema for deriving path/param groups. This is the + # preferred mechanism; _from_scriptconfig remains for legacy compatibility. + params = None + def __init__(self, *, # TODO: allow positional arguments after we find a good order name=None, @@ -1106,6 +1111,43 @@ def __init__(self, self._configured_cache = {} + if self.params is not None: + derived = self._derive_groups_from_params_spec(self.params) + ( + derived_in_paths, + derived_out_paths, + derived_algo_params, + derived_perf_params, + derived_primary_out_key, + ) = derived + if self.in_paths is None: + self.in_paths = set() + if self.out_paths is None: + self.out_paths = {} + if self.algo_params is None: + self.algo_params = {} + if self.perf_params is None: + self.perf_params = {} + if isinstance(self.in_paths, dict): + for key in derived_in_paths: + self.in_paths.setdefault(key, None) + else: + self.in_paths = set(self.in_paths) | set(derived_in_paths) + for key, value in derived_out_paths.items(): + self.out_paths.setdefault(key, value) + if isinstance(self.algo_params, dict): + for key, value in derived_algo_params.items(): + self.algo_params.setdefault(key, value) + else: + self.algo_params = set(self.algo_params or set()) | set(derived_algo_params) + if isinstance(self.perf_params, dict): + for key, value in derived_perf_params.items(): + self.perf_params.setdefault(key, value) + else: + self.perf_params = set(self.perf_params or set()) | set(derived_perf_params) + if self.primary_out_key is None: + self.primary_out_key = derived_primary_out_key + if self.primary_out_key is None: if len(self.out_paths) == 1: self.primary_out_key = ub.peek(self.out_paths) @@ -1154,6 +1196,7 @@ def _from_scriptconfig(cls, config_cls, **kwargs): EXPERIMENTAL Wrap a scriptconfig object to define a baseline process node. + This is a legacy helper; prefer defining ``params`` on the node class. 
Ignore: >>> import scriptconfig as scfg @@ -1207,7 +1250,62 @@ def _from_scriptconfig(cls, config_cls, **kwargs): >>> dag.configure(config) >>> dag.submit_jobs(queue) >>> queue.print_commands(with_status=0, exclude_tags='boilerplate') - """ + + Example: + >>> import warnings + >>> import scriptconfig as scfg + >>> class DemoCfg(scfg.DataConfig): + >>> src = scfg.Value('ignored.txt', tags=['in_path']) + >>> dst = scfg.Value('schema.txt', tags=['out_path', 'primary']) + >>> foo = 1 + >>> # + >>> class DemoNode(ProcessNode): + >>> name = 'demo' + >>> params = DemoCfg + >>> out_paths = {'dst': 'explicit.txt'} + >>> # + >>> with warnings.catch_warnings(record=True) as warns: + >>> warnings.simplefilter('always') + >>> node = DemoNode() + >>> found = any('src' in str(w.message) for w in warns) + >>> found + True + >>> node.algo_params['foo'] == 1 + True + >>> node.out_paths['dst'] == 'explicit.txt' + True + >>> derived = ProcessNode._derive_groups_from_params_spec(DemoCfg) + >>> legacy = ProcessNode._from_scriptconfig(DemoCfg, name='demo') + >>> legacy.in_paths == derived[0] + True + >>> legacy.out_paths == derived[1] + True + """ + derived = cls._derive_groups_from_params_spec(config_cls) + path_kwargs = { + 'in_paths': derived[0], + 'out_paths': derived[1], + 'algo_params': derived[2], + 'perf_params': derived[3], + } + if derived[4] is not None: + path_kwargs['primary_out_key'] = derived[4] + + name = kwargs.get('name', None) + if name is None: + name = getattr(config_cls, '__command__', name) + if name is None: + name = config_cls.__name__ + node_kwargs = {} + node_kwargs['name'] = name + node_kwargs['executable'] = '' + node_kwargs.update(path_kwargs) + node_kwargs.update(kwargs) + self = cls(**node_kwargs) + return self + + @staticmethod + def _derive_groups_from_params_spec(params_spec): tag_to_group = { 'in_path': 'in_paths', 'in': 'in_paths', @@ -1217,7 +1315,6 @@ def _from_scriptconfig(cls, config_cls, **kwargs): 'algo': 'algo_params', 'perf_param': 'perf_params', 'perf': 'perf_params', - } path_kwargs = { 'in_paths': set(), @@ -1225,33 +1322,73 @@ def _from_scriptconfig(cls, config_cls, **kwargs): 'perf_params': {}, 'algo_params': {}, } - for key, value in config_cls.__default__.items(): - if hasattr(value, 'tags'): - tags = set(value.tags) - have_tags = tag_to_group.keys() & tags - assert len(have_tags) <= 1 - have_groups = {tag_to_group[t] for t in have_tags} - if 'primary' in tags and 'out_paths' in have_groups: - path_kwargs['primary_out_key'] = key - - for group_key in have_groups: - if group_key == 'in_paths': - path_kwargs[group_key].add(key) - else: - path_kwargs[group_key][key] = value.value + primary_out_key = None + + if params_spec is None: + return ( + path_kwargs['in_paths'], + path_kwargs['out_paths'], + path_kwargs['algo_params'], + path_kwargs['perf_params'], + primary_out_key, + ) + + instance_values = None + if isinstance(params_spec, dict): + items = params_spec.items() + elif hasattr(params_spec, '__default__'): + config_cls = params_spec if isinstance(params_spec, type) else params_spec.__class__ + defaults = config_cls.__default__ + if not isinstance(params_spec, type): + instance_values = {} + if hasattr(params_spec, 'items'): + try: + instance_values = dict(params_spec.items()) + except Exception: + instance_values = {} + if not instance_values: + instance_values = { + key: value for key, value in getattr(params_spec, '__dict__', {}).items() + if not key.startswith('_') + } + items = defaults.items() + else: + raise TypeError(f'Unsupported params_spec type: 
{type(params_spec)}') - name = kwargs.get('name', None) - if name is None: - name = getattr(config_cls, '__command__', name) - if name is None: - name = config_cls.__name__ - node_kwargs = {} - node_kwargs['name'] = name - node_kwargs['executable'] = '' - node_kwargs.update(path_kwargs) - node_kwargs.update(kwargs) - self = cls(**node_kwargs) - return self + for key, value in items: + if instance_values is not None and key in instance_values: + default_value = instance_values[key] + else: + default_value = value.value if hasattr(value, 'value') else value + tags = set(getattr(value, 'tags', []) or []) + have_tags = tag_to_group.keys() & tags + if len(have_tags) > 1: + raise ValueError( + f'Parameter "{key}" has conflicting tags: {sorted(have_tags)}') + have_groups = {tag_to_group[t] for t in have_tags} + if 'primary' in tags and 'out_paths' in have_groups: + primary_out_key = key + if not have_groups: + have_groups = {'algo_params'} + for group_key in have_groups: + if group_key == 'in_paths': + if default_value is not None: + warnings.warn( + f'Ignoring default for in_path "{key}" defined in params.') + path_kwargs[group_key].add(key) + elif group_key == 'out_paths': + if isinstance(default_value, str) and default_value: + path_kwargs[group_key][key] = default_value + else: + path_kwargs[group_key][key] = default_value + + return ( + path_kwargs['in_paths'], + path_kwargs['out_paths'], + path_kwargs['algo_params'], + path_kwargs['perf_params'], + primary_out_key, + ) def configure(self, config=None, cache=True, enabled=True): """ diff --git a/publish.sh b/publish.sh old mode 100644 new mode 100755 diff --git a/requirements/runtime.txt b/requirements/runtime.txt index e493c5e..780b448 100644 --- a/requirements/runtime.txt +++ b/requirements/runtime.txt @@ -17,7 +17,7 @@ kwarray>=0.6.19 kwutil>=0.3.8 networkx>=3.6 ; python_version < '4.0' and python_version >= '3.14' # Python 3.14+ -networkx>=3.2 ; python_version < '3.13' +networkx>=3.2 ; python_version < '3.14' # xdev availpkg rich diff --git a/tests/test_processnode_params.py b/tests/test_processnode_params.py new file mode 100644 index 0000000..4af04e5 --- /dev/null +++ b/tests/test_processnode_params.py @@ -0,0 +1,47 @@ +import pytest +import scriptconfig as scfg + +from kwdagger.pipeline import ProcessNode + + +class DemoCfg(scfg.DataConfig): + src = scfg.Value('ignored.txt', tags=['in_path']) + dst = scfg.Value('schema.txt', tags=['out_path', 'primary']) + extra = scfg.Value('', tags=['out_path']) + foo = 1 + workers = scfg.Value(2, tags=['perf_param']) + + +class DemoNode(ProcessNode): + name = 'demo' + params = DemoCfg + out_paths = {'dst': 'explicit.txt'} + + +def test_params_schema_derivation(): + with pytest.warns(UserWarning, match='in_path "src"'): + node = DemoNode() + + assert node.in_paths['src'] is None + assert node.out_paths['dst'] == 'explicit.txt' + assert node.algo_params['foo'] == 1 + assert node.perf_params['workers'] == 2 + assert node.primary_out_key == 'dst' + + derived = ProcessNode._derive_groups_from_params_spec(DemoCfg) + with pytest.warns(UserWarning, match='in_path "src"'): + legacy = ProcessNode._from_scriptconfig(DemoCfg, name='demo') + + assert legacy.in_paths == derived[0] + assert legacy.out_paths == derived[1] + assert legacy.algo_params == derived[2] + assert legacy.perf_params == derived[3] + assert legacy.primary_out_key == derived[4] + + +def test_params_conflicting_tags(): + class BadCfg(scfg.DataConfig): + foo = scfg.Value(1, tags=['in', 'out']) + + with pytest.raises(ValueError, 
match='conflicting tags'): + ProcessNode._derive_groups_from_params_spec(BadCfg)
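For reference, the helper exercised above also accepts a config instance or a plain dictionary in place of a schema class. The following is a minimal sketch of that behavior; the ``DemoCfg`` name is illustrative and mirrors the schema shape used in ``tests/test_processnode_params.py``.

.. code:: python

    import scriptconfig as scfg
    from kwdagger.pipeline import ProcessNode


    class DemoCfg(scfg.DataConfig):
        # Mirrors the tagged schema shape used in the tests above.
        src = scfg.Value(None, tags=['in_path'])
        dst = scfg.Value('schema.txt', tags=['out_path', 'primary'])
        workers = scfg.Value(2, tags=['perf_param'])


    # Passing the class derives groups from the declared defaults.
    in_paths, out_paths, algo, perf, primary = (
        ProcessNode._derive_groups_from_params_spec(DemoCfg))
    assert primary == 'dst' and out_paths['dst'] == 'schema.txt'

    # Passing an instance uses its current values rather than the class defaults.
    cfg = DemoCfg(dst='override.json')
    _, out_paths, _, _, _ = ProcessNode._derive_groups_from_params_spec(cfg)
    assert out_paths['dst'] == 'override.json'

    # A plain dict also works; untagged keys fall into algo_params.
    _, _, algo, _, _ = ProcessNode._derive_groups_from_params_spec({'threshold': 0.5})
    assert algo == {'threshold': 0.5}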