Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefect 214 #11

Merged
merged 4 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/launch_ptycho.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Note that you must have prefect in your environment. Recommend going up one directory and
# running `pip install -r requirements.txt`
#
import asyncio
import os
import prefect


def get_prefect_client(httpx_settings=None):
# This prefect client uses prefect settings to set the the api url and api key. This
# can be set in a number of ways. This script ASSUMES that you have set environment variables
# PREFECT_API_URL = https://servername/api
# PREFECT_API_KEY = the api key given to you
# httpx_settings allows you to affect the http client that the prefect client uses
return prefect.get_client(httpx_settings=httpx_settings)


def get_prefect_client_2(prefect_api_url, prefect_api_key, httpx_settings=None):
# Same prefect client, but if you know the url and api_key
# httpx_settings allows you to affect the http client that the prefect client uses
return prefect.PrefectClient(
prefect_api_url,
api_key=prefect_api_key,
httpx_settings=httpx_settings)


async def prefect_start_flow(prefect_client, deployment_name, file_path):

deployment = await prefect_client.read_deployment_by_name(deployment_name)
flow_run = await prefect_client.create_flow_run_from_deployment(
deployment.id,
name=os.path.basename(file_path),
parameters={"file_path": file_path},
)
return flow_run


client = get_prefect_client()
asyncio.run(prefect_start_flow(client, "transfer_auto_recon/transfer_auto_recon", "/a/file/path"))
8 changes: 4 additions & 4 deletions orchestration/prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import logging

from prefect import get_run_logger, task
from prefect.client import get_client
from prefect.orion.utilities.schemas import DateTimeTZ
from prefect.orion.schemas.states import Scheduled
from prefect import get_client

from prefect.states import Scheduled


logger = logging.getLogger("orchestration.prefect")
Expand All @@ -23,7 +23,7 @@ async def schedule(
assert (
deployment
), f"No deployment found in config for deploymnent_name {deploymnent_name}"
date_time_tz = DateTimeTZ.now() + duration_from_now
date_time_tz = datetime.datetime.now() + duration_from_now
await client.create_flow_run_from_deployment(
deployment.id,
state=Scheduled(scheduled_time=date_time_tz),
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ httpx>=0.22.0
numpy
pillow
python-dotenv
prefect==2.7.9
pydantic<2.0.0
prefect==2.14.3
pyscicat @ git+https://github.com/dylanmcreynolds/pyscicat@fix_auth
pyyaml
authlib
Loading