Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions .github/workflows/macos-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ name: CI # TODO rename the file in a standalone PR, its not macos-test, but ci

on:
# Trigger the workflow manually
workflow_dispatch: ~
workflow_dispatch:

push:
branches:
- 'main'
- 'develop'
- 'main'
- 'develop'

pull_request: ~
pull_request:

jobs:
ci:
Expand All @@ -21,12 +21,12 @@ jobs:
runs-on: "${{ fromJSON('{\"linux-x86\": [\"self-hosted\", \"Linux\", \"platform-builder-Rocky-8.6\"], \"macos-ARM64\": [\"self-hosted\", \"macOS\", \"ARM64\"]}')[matrix.arch_type] }}"
timeout-minutes: 20
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4
# NOTE if setup uv used, make sure to configure uv cache correctly, to not fill home. But should be already installed and configured
# - uses: astral-sh/setup-uv@v7
- uses: extractions/setup-just@v3
- run: |
echo "uv version is $(uv --version)"
uv sync --python "${{ matrix.python_version }}"
just fmt
just val
- uses: extractions/setup-just@v3
- run: |
echo "uv version is $(uv --version)"
uv sync --python "${{ matrix.python_version }}"
just fmt
just val
2 changes: 1 addition & 1 deletion .github/workflows/test-pypi.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: cd

on:
workflow_dispatch: ~
workflow_dispatch:

jobs:
deploy:
Expand Down
30 changes: 20 additions & 10 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,38 @@ repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
hooks:
- id: check-yaml # Check YAML files for syntax errors only
args: [--unsafe, --allow-multiple-documents]
# - id: no-commit-to-branch # NOTE prevents commit to main/master, but since we run prek on that branch it makes no sense
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-json
- id: check-yaml
- id: check-toml
- id: check-added-large-files
exclude: |
(?x)(
.*uv.lock|
.*pylock.toml
)
- id: check-merge-conflict # Check for files that contain merge conflict
- id: check-merge-conflict
- id: debug-statements
- id: mixed-line-ending
- id: no-commit-to-branch
args: [--branch, main]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.2
rev: v0.15.4
hooks:
- id: ruff # better black/flake/isort
files: ^src/
- id: ruff-check
exclude: '(dev/.*|.*_)\.py$'
args:
- --select
- I # isorting
- --fix
- --exit-non-zero-on-fix
- id: ruff-format
files: ^backend/
- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
rev: v2.16.0
hooks:
- id: pretty-format-yaml
args: [--autofix, --preserve-quotes]
- id: pretty-format-toml
args: [--autofix]
ci:
autoupdate_schedule: monthly
autoupdate_commit_msg: "chore(deps): pre-commit.ci autoupdate"
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* there are two python modules -- cascade which is a low level execution engine, and earthkit.workflows which is a higher level abstraction on top of it. Each has its own subdirectory in tests
* always use type annotations, it is enforced
* when working with a package with bad typing coverage like sqlalchemy, use ty:ignore comment
* when ty is not powerful enough, use ty:ignore
* when ty is not powerful enough, use ty:ignore
* use typing.cast when the code logic is implicitly erasing the type information
* prioritize using pydantic.BaseModel or dataclasses.dataclass object for capturing contracts and interfaces.
* ideally keep them plain, stateless, frozen, without functions -- we end up serializing those objects often over to other python processes or different languages
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
10 changes: 5 additions & 5 deletions benchmarks/scenario-shm_throughput.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Benchmark shm in isolation (without cascade) by just invoking a lot of reads and writes in a limited memory setting"""
"""Benchmark shm in isolation (without cascade) by just invoking a lot of reads and writes
in a limited memory setting.
"""

import multiprocessing as mp
import random
Expand Down Expand Up @@ -27,9 +29,7 @@ def track(cls, label: str):

@classmethod
def report(cls):
rpt = (
lambda t1, t2: f"from {t1[0]} to {t2[0]} took {(t2[1]-t1[1]) / 1e6:.3f} ms"
)
rpt = lambda t1, t2: f"from {t1[0]} to {t2[0]} took {(t2[1] - t1[1]) / 1e6:.3f} ms"
print(rpt(cls.steps[0], cls.steps[-1]))
for i in range(len(cls.steps) - 1):
print(rpt(cls.steps[i], cls.steps[i + 1]))
Expand Down Expand Up @@ -84,7 +84,7 @@ def scenario1():


def scenario2():
"""Start 1024MB shm, allocate 128 * 8MB (so that all fits), then 128 ** 2 gets to simulate high load"""
"""Start 1024MB shm, allocate 128 * 8MB (so that all fits), then 128 ** 2 gets to simulate high load."""
start_shm(1024)
c.ensure()

Expand Down
4 changes: 1 addition & 3 deletions benchmarks/scheduling/sat_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ def graph2sat(g: SimplifiedGraph, workers: list[str]):
for worker in workers:
coschLoc = model.new_int_var(0, 1, f"{tasks[i]}~{tasks[j]}@{worker}")
loc.append(coschLoc)
model.add_multiplication_equality(
coschLoc, [w2t[(tasks[i], worker)], w2t[(tasks[j], worker)]]
)
model.add_multiplication_equality(coschLoc, [w2t[(tasks[i], worker)], w2t[(tasks[j], worker)]])
# coscheduled variable constraint
model.add(sum(loc) == cosch)
# respect worker parallelism
Expand Down
78 changes: 44 additions & 34 deletions docs/benchmarking/run1/analysis.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np"
"import numpy as np\n",
"import pandas as pd"
]
},
{
Expand All @@ -45,8 +45,8 @@
"outputs": [],
"source": [
"def fixWorker(df):\n",
" rows = df['host'] != 'controller'\n",
" df.loc[rows, 'worker'] = df.loc[rows, 'host'] + ':' + df.loc[rows, 'worker'] "
" rows = df[\"host\"] != \"controller\"\n",
" df.loc[rows, \"worker\"] = df.loc[rows, \"host\"] + \":\" + df.loc[rows, \"worker\"]"
]
},
{
Expand All @@ -56,13 +56,13 @@
"metadata": {},
"outputs": [],
"source": [
"f1 = pd.read_json('lA_F_1.jsonl', lines=True)\n",
"f4 = pd.read_json('lA_F_4.jsonl', lines=True)\n",
"m1_4 = pd.read_json('lA_M_1_4.jsonl', lines=True)\n",
"f1 = pd.read_json(\"lA_F_1.jsonl\", lines=True)\n",
"f4 = pd.read_json(\"lA_F_4.jsonl\", lines=True)\n",
"m1_4 = pd.read_json(\"lA_M_1_4.jsonl\", lines=True)\n",
"fixWorker(m1_4)\n",
"m4_1 = pd.read_json('lA_M_4_1.jsonl', lines=True)\n",
"m4_1 = pd.read_json(\"lA_M_4_1.jsonl\", lines=True)\n",
"fixWorker(m4_1)\n",
"m2_2 = pd.read_json('lA_M_2_2.jsonl', lines=True)\n",
"m2_2 = pd.read_json(\"lA_M_2_2.jsonl\", lines=True)\n",
"fixWorker(m2_2)"
]
},
Expand All @@ -75,47 +75,51 @@
"source": [
"def fixMode(df):\n",
" rows = ~df.dataset.isna()\n",
" proj = df[rows & ~df['mode'].isna()].set_index(['dataset', 'worker'])['mode']\n",
" lookup = proj[~proj.index.duplicated(keep='last')]\n",
" return df.set_index(['dataset', 'worker']).drop(columns='mode').join(lookup).reset_index()\n",
" proj = df[rows & ~df[\"mode\"].isna()].set_index([\"dataset\", \"worker\"])[\"mode\"]\n",
" lookup = proj[~proj.index.duplicated(keep=\"last\")]\n",
" return df.set_index([\"dataset\", \"worker\"]).drop(columns=\"mode\").join(lookup).reset_index()\n",
"\n",
"\n",
"def ensureColumns(df, columns):\n",
" for column in columns:\n",
" if not column in df.columns:\n",
" if column not in df.columns:\n",
" df = df.assign(**{column: np.nan})\n",
" return df\n",
" \n",
" \n",
"\n",
"\n",
"def transmitDurations(df):\n",
" df = fixMode(df)\n",
" datasets = df[~df.dataset.isna()].drop(columns='task')\n",
" durations = datasets.pivot(index=['dataset', 'worker', 'mode'], columns=['action'], values=['at'])\n",
" durations.columns = [name[1][len('transmit'):] for name in durations.columns]\n",
" datasets = df[~df.dataset.isna()].drop(columns=\"task\")\n",
" durations = datasets.pivot(index=[\"dataset\", \"worker\", \"mode\"], columns=[\"action\"], values=[\"at\"])\n",
" durations.columns = [name[1][len(\"transmit\") :] for name in durations.columns]\n",
" durations = durations.reset_index()\n",
" localFix = durations['mode'] == 'local'\n",
" durations.loc[localFix, 'Started'] = durations.loc[localFix, 'Finished']\n",
" durations.loc[localFix, 'Loaded'] = durations.loc[localFix, 'Finished']\n",
" localFix = durations[\"mode\"] == \"local\"\n",
" durations.loc[localFix, \"Started\"] = durations.loc[localFix, \"Finished\"]\n",
" durations.loc[localFix, \"Loaded\"] = durations.loc[localFix, \"Finished\"]\n",
" durations = durations.assign(total=durations.Finished - durations.Planned)\n",
" durations = durations.assign(commDelay=durations.Started-durations.Planned)\n",
" durations = durations.assign(loadDelay=durations.Loaded-durations.Started)\n",
" durations = durations.assign(transmitDelay=durations.Finished-durations.Loaded)\n",
" durations = durations.assign(commDelay=durations.Started - durations.Planned)\n",
" durations = durations.assign(loadDelay=durations.Loaded - durations.Started)\n",
" durations = durations.assign(transmitDelay=durations.Finished - durations.Loaded)\n",
" return durations\n",
"\n",
"\n",
"def taskDurations(df):\n",
" tasks = df[~df.task.isna()]\n",
" durations = tasks.pivot(index=['task', 'worker'], columns=['action'], values=['at'])\n",
" durations.columns = [name[1][len('task'):] for name in durations.columns]\n",
" durations = tasks.pivot(index=[\"task\", \"worker\"], columns=[\"action\"], values=[\"at\"])\n",
" durations.columns = [name[1][len(\"task\") :] for name in durations.columns]\n",
" durations = durations.reset_index()\n",
" durations = durations.assign(total=durations.Finished - durations.Planned)\n",
" durations = durations.assign(commDelay = durations.Enqueued - durations.Planned)\n",
" durations = durations.assign(queueDelay = durations.Started - durations.Enqueued)\n",
" durations = durations.assign(runtimes = durations.Finished - durations.Started)\n",
" durations = durations.assign(onWorker = durations.Finished - durations.Enqueued)\n",
" durations = durations.assign(commDelay=durations.Enqueued - durations.Planned)\n",
" durations = durations.assign(queueDelay=durations.Started - durations.Enqueued)\n",
" durations = durations.assign(runtimes=durations.Finished - durations.Started)\n",
" durations = durations.assign(onWorker=durations.Finished - durations.Enqueued)\n",
" return durations\n",
"\n",
"\n",
"def fmn(n):\n",
" return f\"{n:.3e}\"\n",
"\n",
"\n",
"def analyzeTransmits(df):\n",
" durations = transmitDurations(df)\n",
" print(f\"total transmit duration: {fmn(durations.total.sum())}\")\n",
Expand All @@ -136,7 +140,8 @@
" print(f\"mean transmit delay: {fmn(remotes.transmitDelay.mean())}\")\n",
" print(f\"max transmit delay: {fmn(remotes.transmitDelay.max())}\")\n",
" print(\" *** \")\n",
" \n",
"\n",
"\n",
"def analyzeTasks(df):\n",
" durations = taskDurations(df)\n",
" print(f\"total task duration: {fmn(durations.total.sum())}\")\n",
Expand Down Expand Up @@ -489,9 +494,14 @@
}
],
"source": [
"taskCompareF1F4 = task_f1.set_index(['task'])[['total']].rename(columns={'total': 'total1'}).join(task_f4.set_index(['task'])[['total']].rename(columns={'total': 'total4'}))\n",
"taskCompareF1F4 = taskCompareF1F4.assign(dif = taskCompareF1F4.total4 - taskCompareF1F4.total1)\n",
"taskCompareF1F4 = taskCompareF1F4.assign(rel = taskCompareF1F4.dif / taskCompareF1F4.total4)\n",
"taskCompareF1F4 = (\n",
" task_f1\n",
" .set_index([\"task\"])[[\"total\"]]\n",
" .rename(columns={\"total\": \"total1\"})\n",
" .join(task_f4.set_index([\"task\"])[[\"total\"]].rename(columns={\"total\": \"total4\"}))\n",
")\n",
"taskCompareF1F4 = taskCompareF1F4.assign(dif=taskCompareF1F4.total4 - taskCompareF1F4.total1)\n",
"taskCompareF1F4 = taskCompareF1F4.assign(rel=taskCompareF1F4.dif / taskCompareF1F4.total4)\n",
"taskCompareF1F4.sort_values(by=\"rel\")"
]
},
Expand Down
Loading