Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/murfey/client/tui/screens.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,15 +685,23 @@ def on_button_pressed(self, event: Button.Pressed):
self.app.push_screen("launcher")

if machine_data.get("upstream_data_directories"):
upstream_downloads = capture_get(
upstream_downloads: dict[str, dict[str, Path]] = capture_get(
base_url=str(self.app._environment.url.geturl()),
router_name="session_control.correlative_router",
function_name="find_upstream_visits",
token=token,
session_id=self.app._environment.murfey_session,
).json()
# Pass flattened dict for backwards compatibility
self.app.install_screen(
UpstreamDownloads(upstream_downloads), "upstream-downloads"
UpstreamDownloads(
{
visit_name: visit_dir
for _, upstream_visits in upstream_downloads.items()
for visit_name, visit_dir in upstream_visits.items()
}
),
"upstream-downloads",
)
self.app.push_screen("upstream-downloads")

Expand Down Expand Up @@ -759,15 +767,23 @@ def on_button_pressed(self, event: Button.Pressed):
self.app.push_screen("directory-select")

if machine_data.get("upstream_data_directories"):
upstream_downloads = capture_get(
upstream_downloads: dict[str, dict[str, Path]] = capture_get(
base_url=str(self.app._environment.url.geturl()),
router_name="session_control.correlative_router",
function_name="find_upstream_visits",
token=token,
session_id=self.app._environment.murfey_session,
).json()
# Pass a flattened dict for backwards compatibility
self.app.install_screen(
UpstreamDownloads(upstream_downloads), "upstream-downloads"
UpstreamDownloads(
{
visit_name: visit_dir
for _, upstream_visits in upstream_downloads.items()
for visit_name, visit_dir in upstream_visits.items()
}
),
"upstream-downloads",
)
self.app.push_screen("upstream-downloads")

Expand Down Expand Up @@ -817,7 +833,7 @@ def on_button_pressed(self, event: Button.Pressed):
stream_response = capture_get(
base_url=str(self.app._environment.url.geturl()),
router_name="session_control.correlative_router",
function_name="get_tiff",
function_name="get_tiff_file",
token=token,
visit_name=event.button.label,
session_id=self.app._environment.murfey_session,
Expand Down
84 changes: 81 additions & 3 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,13 +469,91 @@ def upload_gain_reference(
return {"success": True}


class UpstreamTiffInfo(BaseModel):
class UpstreamFileDownloadInfo(BaseModel):
    # Request body accepted by the instrument server's `gather_upstream_files`
    # endpoint.

    # Local directory the downloaded files are written into
    download_dir: Path
    # Name of the upstream instrument; forwarded to the backend when asking
    # for the list of files
    upstream_instrument: str
    # Visit directory on the upstream instrument; forwarded to the backend and
    # used as the base when working out each file's relative save path
    upstream_visit_path: Path


@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_download: UpstreamFileDownloadInfo,
):
    """
    Instrument server endpoint that will query the backend for files in the chosen
    visit directory, then download each of them into the requested directory.

    Parameters:
        visit_name: Name of the visit; rejected if it contains any of the
            characters '/', '\\', ':' or ';'.
        session_id: Murfey session ID, used to build the backend URLs and to
            look up the bearer token for this session.
        upstream_file_download: Request body carrying the download directory,
            the upstream instrument name and the upstream visit path.

    Returns:
        {"success": True} once every listed file has been written, or
        {"success": False, "detail": ...} if the visit name is rejected.

    Errors from the HTTP requests, from the filesystem, or from a returned file
    path that does not lie under the upstream visit path are not caught here.
    """
    # Check for forbidden characters
    if any(c in visit_name for c in ("/", "\\", ":", ";")):
        logger.error(
            f"Forbidden characters are present in visit name {sanitise(visit_name)}"
        )
        # Key must be "success" so callers can inspect it the same way as the
        # value returned on the happy path
        return {
            "success": False,
            "detail": "Forbidden characters present in visit name",
        }

    # Sanitise inputs
    download_dir = secure_path(upstream_file_download.download_dir)
    upstream_instrument = sanitise(upstream_file_download.upstream_instrument)
    upstream_visit_path = secure_path(upstream_file_download.upstream_visit_path)

    # Get the list of files to download
    murfey_url = urlparse(_get_murfey_url(), allow_fragments=False)
    sanitised_visit_name = sanitise_nonpath(visit_name)
    url_path = url_path_for(
        "session_control.correlative_router",
        "gather_upstream_files",
        session_id=session_id,
        visit_name=sanitised_visit_name,
    )
    upstream_files: list[str] = requests.get(
        f"{murfey_url.geturl()}{url_path}",
        headers={"Authorization": f"Bearer {tokens[session_id]}"},
        json={
            "upstream_instrument": upstream_instrument,
            "upstream_visit_path": str(upstream_visit_path),
        },
    ).json()

    # Make the download directory and download gathered files
    download_dir.mkdir(exist_ok=True)
    for upstream_file in upstream_files:
        url_path = url_path_for(
            "session_control.correlative_router",
            "get_upstream_file",
            session_id=session_id,
            visit_name=sanitised_visit_name,
            upstream_file_path=upstream_file,
        )
        file_data = requests.get(
            f"{murfey_url.geturl()}{url_path}",
            headers={"Authorization": f"Bearer {tokens[session_id]}"},
            stream=True,
        )
        # Recreate the file's location relative to the upstream visit directory
        # underneath the local download directory
        upstream_file_relative_path = secure_path(
            Path(upstream_file).relative_to(upstream_visit_path)
        )
        save_file = download_dir / upstream_file_relative_path
        save_file.parent.mkdir(parents=True, exist_ok=True)
        with open(save_file, "wb") as f:
            # Stream the response to disk in 32 MiB chunks
            for chunk in file_data.iter_content(chunk_size=32 * 1024**2):
                f.write(chunk)
        logger.info(f"Saved file to {str(save_file)!r}")
    return {"success": True}


class UpstreamTiffDownloadInfo(BaseModel):
    # Request body accepted by the `gather_upstream_tiffs` endpoint.

    # Presumably the local directory the TIFF files are saved into; the code
    # that consumes it is not shown here, so confirm against the endpoint body
    download_dir: Path


@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request")
def gather_upstream_tiffs(
visit_name: str, session_id: MurfeySessionID, upstream_tiff_info: UpstreamTiffInfo
visit_name: str,
session_id: MurfeySessionID,
upstream_tiff_info: UpstreamTiffDownloadInfo,
):
sanitised_visit_name = sanitise_nonpath(visit_name)
assert not any(c in visit_name for c in ("/", "\\", ":", ";"))
Expand All @@ -490,7 +568,7 @@ def gather_upstream_tiffs(
)
for tiff_path in upstream_tiff_paths:
tiff_data = requests.get(
f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}",
f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff_file', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}",
stream=True,
headers={"Authorization": f"Bearer {tokens[session_id]}"},
)
Expand Down
60 changes: 59 additions & 1 deletion src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from murfey.util.api import url_path_for
from murfey.util.config import get_machine_config
from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters
from murfey.util.models import File, MultigridWatcherSetup
from murfey.util.models import File, MultigridWatcherSetup, UpstreamFileRequestInfo

# Create APIRouter class object
router = APIRouter(
Expand Down Expand Up @@ -396,6 +396,64 @@ async def request_upstream_tiff_data_download(
return data


@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
async def request_upstream_file_data_download(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_request: UpstreamFileRequestInfo,
    db=murfey_db,
):
    """
    Relay an upstream file download request to the instrument server.

    The instrument is looked up from the session's database row, and its machine
    config supplies both the base download directory and the instrument server
    URL. If either is unset, an error is logged and a
    {"success": False, "detail": ...} dictionary is returned without contacting
    the instrument server. Otherwise the request is POSTed on and the instrument
    server's decoded JSON response is returned unchanged.
    """
    # Work out which instrument this session belongs to and load its config
    session_row = db.exec(select(Session).where(Session.id == session_id)).one()
    instrument_name = session_row.instrument_name
    all_configs = get_machine_config(instrument_name=instrument_name)
    machine_config = all_configs[instrument_name]

    # Bail out early if the config lacks what is needed to forward the request
    if not machine_config.upstream_data_download_directory:
        log.error("No download directory was configured for this instrument")
        return {"success": False, "detail": "No download directory configured"}
    if not machine_config.instrument_server_url:
        log.error("Couldn't find instrument server URL to post request to")
        return {"success": False, "detail": "No instrument server URL"}

    # Files are placed in a per-visit subdirectory of the configured directory
    safe_visit_name = secure_filename(visit_name)
    payload = {
        "download_dir": str(
            machine_config.upstream_data_download_directory / safe_visit_name
        ),
        "upstream_instrument": upstream_file_request.upstream_instrument,
        "upstream_visit_path": str(upstream_file_request.upstream_visit_path),
    }

    async with aiohttp.ClientSession() as clientsession:
        url_path = url_path_for(
            "api.router",
            "gather_upstream_files",
            visit_name=safe_visit_name,
            session_id=session_id,
        )
        target_url = f"{machine_config.instrument_server_url}{url_path}"
        auth_headers = {
            "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
        }
        async with clientsession.post(
            target_url, headers=auth_headers, json=payload
        ) as resp:
            data = await resp.json()
    return data


class RsyncerSource(BaseModel):
source: str

Expand Down
95 changes: 45 additions & 50 deletions src/murfey/server/api/session_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from pydantic import BaseModel
from sqlalchemy import func
from sqlmodel import select
from werkzeug.utils import secure_filename

import murfey.server.prometheus as prom
from murfey.server import _transport_object
Expand All @@ -18,18 +17,22 @@
validate_instrument_token,
)
from murfey.server.api.shared import (
find_upstream_visits as _find_upstream_visits,
gather_upstream_files as _gather_upstream_files,
gather_upstream_tiffs as _gather_upstream_tiffs,
get_foil_hole as _get_foil_hole,
get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square,
get_grid_squares as _get_grid_squares,
get_grid_squares_from_dcg as _get_grid_squares_from_dcg,
get_machine_config_for_instrument,
get_upstream_tiff_dirs,
get_tiff_file as _get_tiff_file,
get_upstream_file as _get_upstream_file,
remove_session_by_id,
)
from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits
from murfey.server.murfey_db import murfey_db
from murfey.util import sanitise
from murfey.util.config import MachineConfig, get_machine_config
from murfey.util.config import MachineConfig
from murfey.util.db import (
AutoProcProgram,
ClientEnvironment,
Expand All @@ -49,6 +52,7 @@
GridSquareParameters,
RsyncerInfo,
SearchMapParameters,
UpstreamFileRequestInfo,
Visit,
)
from murfey.workflows.spa.atlas import atlas_jpg_from_mrc
Expand Down Expand Up @@ -418,62 +422,53 @@ def register_batch_position(

@correlative_router.get("/sessions/{session_id}/upstream_visits")
async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db):
murfey_session = db.exec(select(Session).where(Session.id == session_id)).one()
visit_name = murfey_session.visit
instrument_name = murfey_session.instrument_name
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
upstream_visits = {}
# Iterates through provided upstream directories
for p in machine_config.upstream_data_directories:
# Looks for visit name in file path
for v in Path(p).glob(f"{visit_name.split('-')[0]}-*"):
upstream_visits[v.name] = v / machine_config.processed_directory_name
return upstream_visits
return _find_upstream_visits(session_id=session_id, db=db)


@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths"
)
async def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_request: UpstreamFileRequestInfo,
    db=murfey_db,
):
    """
    Return the result of the shared `_gather_upstream_files` helper for the
    requested upstream instrument and visit path.

    `visit_name` is part of the route but is not passed on to the helper.
    """
    # NOTE(review): the instrument server consumes this response as a list of
    # file path strings; confirm against the shared helper's return type
    upstream_files = _gather_upstream_files(
        session_id=session_id,
        upstream_instrument=upstream_file_request.upstream_instrument,
        upstream_visit_path=upstream_file_request.upstream_visit_path,
        db=db,
    )
    return upstream_files


@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}"
)
async def get_upstream_file(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_path: str,
    db=murfey_db,
):
    """
    Serve a single upstream file.

    The path taken from the URL is resolved by the shared `_get_upstream_file`
    helper. When the helper returns None, None is returned; otherwise the
    resolved file is wrapped in a FileResponse. `visit_name`, `session_id` and
    `db` are accepted by the route but not used in this function body.
    """
    # NOTE(review): upstream_file_path comes straight from the request URL;
    # presumably the shared helper validates it -- confirm
    upstream_file = _get_upstream_file(upstream_file_path)
    if upstream_file is None:
        return upstream_file
    return FileResponse(path=upstream_file)


@correlative_router.get(
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths"
)
async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db):
"""
Looks for TIFF files associated with the current session in the permitted storage
servers, and returns their relative file paths as a list.
"""
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
upstream_tiff_paths = []
tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name)
if not tiff_dirs:
return None
for tiff_dir in tiff_dirs:
for f in tiff_dir.glob("**/*.tiff"):
upstream_tiff_paths.append(str(f.relative_to(tiff_dir)))
for f in tiff_dir.glob("**/*.tif"):
upstream_tiff_paths.append(str(f.relative_to(tiff_dir)))
return upstream_tiff_paths
return _gather_upstream_tiffs(visit_name=visit_name, session_id=session_id, db=db)


@correlative_router.get(
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}"
)
async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db):
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=murfey_db):
tiff_file = _get_tiff_file(
visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db
)
tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name)
if not tiff_dirs:
return None

tiff_path = "/".join(secure_filename(p) for p in tiff_path.split("/"))
for tiff_dir in tiff_dirs:
test_path = tiff_dir / tiff_path
if test_path.is_file():
break
else:
logger.warning(f"TIFF {tiff_path} not found")
return None

return FileResponse(path=test_path)
return FileResponse(path=tiff_file) if tiff_file is not None else tiff_file
Loading