Merge pull request #6 from LCOGT/using-fits2img
FITS to JPGs, operation output caching, serving presigned URLs to the frontend
LTDakin authored Apr 9, 2024
2 parents c7f8da5 + 4b6640c commit 883f3ea
Showing 6 changed files with 648 additions and 72 deletions.
70 changes: 70 additions & 0 deletions datalab/datalab_session/data_operations/data_operation.py
@@ -1,9 +1,16 @@
from abc import ABC, abstractmethod
import hashlib
import json
import os
import tempfile
from django.core.cache import cache

from fits2image.conversions import fits_to_jpg
from astropy.io import fits
import numpy as np

from datalab.datalab_session.tasks import execute_data_operation
from datalab.datalab_session.util import add_file_to_bucket, get_archive_from_basename

CACHE_DURATION = 60 * 60 * 24 * 30 # cache for 30 days

@@ -88,3 +95,66 @@ def set_output(self, output_data: dict):

def get_output(self) -> dict:
return cache.get(f'operation_{self.cache_key}_output')

# percent lets you allocate the fraction of the overall operation that this step takes up
# cur_percent is the operation's current completion percentage
def create_and_store_fits(self, hdu_list: fits.HDUList, percent=None, cur_percent=None) -> list:
if not isinstance(hdu_list, list):
hdu_list = [hdu_list]

output = []
total_files = len(hdu_list)

# Create temp file paths for storing the products
fits_path = tempfile.NamedTemporaryFile(suffix=f'{self.cache_key}.fits').name
large_jpg_path = tempfile.NamedTemporaryFile(suffix=f'{self.cache_key}-large.jpg').name
thumbnail_jpg_path = tempfile.NamedTemporaryFile(suffix=f'{self.cache_key}-small.jpg').name

for index, hdu in enumerate(hdu_list, start=1):
height, width = hdu[1].shape

hdu.writeto(fits_path, overwrite=True)
fits_to_jpg(fits_path, large_jpg_path, width=width, height=height)
fits_to_jpg(fits_path, thumbnail_jpg_path)

# Save Fits and Thumbnails in S3 Buckets
fits_url = add_file_to_bucket(f'{self.cache_key}/{self.cache_key}-{index}.fits', fits_path)
large_jpg_url = add_file_to_bucket(f'{self.cache_key}/{self.cache_key}-{index}-large.jpg', large_jpg_path)
thumbnail_jpg_url = add_file_to_bucket(f'{self.cache_key}/{self.cache_key}-{index}-small.jpg', thumbnail_jpg_path)

output.append({'large_url': large_jpg_url, 'thumbnail_url': thumbnail_jpg_url})

if percent is not None and cur_percent is not None:
self.set_percent_completion(cur_percent + index/total_files * percent)

return output

def get_fits_npdata(self, input_files: list[dict], percent=None, cur_percent=None) -> list[np.memmap]:
total_files = len(input_files)
memmap_paths = []

# get each input's fits url, download the file, extract the image data, and store it in a list
with tempfile.TemporaryDirectory() as temp_dir:
for index, file_info in enumerate(input_files, start=1):
basename = file_info.get('basename', 'No basename found')
archive_record = get_archive_from_basename(basename)

try:
fits_url = archive_record[0].get('url', 'No URL found')
except IndexError:
continue

with fits.open(fits_url) as hdu_list:
data = hdu_list['SCI'].data
memmap_path = os.path.join(temp_dir, f'memmap_{index}.dat')
memmap_array = np.memmap(memmap_path, dtype=np.float32, mode='w+', shape=data.shape)
memmap_array[:] = data[:]
memmap_paths.append((memmap_path, data.shape))

if percent is not None and cur_percent is not None:
self.set_percent_completion(cur_percent + index/total_files * percent)

return [
np.memmap(path, dtype=np.float32, mode='r', shape=shape)
for path, shape in memmap_paths
]
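
A minimal sketch (not part of this commit) of how a concrete operation might combine these two helpers; the class name, the averaging step, and the omission of the abstract name/description methods are assumptions modeled on the Median operation below.

import numpy as np

from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
from datalab.datalab_session.util import create_fits, stack_arrays

class Mean(BaseDataOperation):
    """Hypothetical pixel-wise mean, mirroring the structure of the Median operation."""

    def operate(self, input_files, cache_key):
        # load each input's image data as memory-mapped arrays (first 40% of the progress bar)
        image_data_list = self.get_fits_npdata(input_files, percent=40.0, cur_percent=0.0)

        # crop to a common shape, stack along a new axis, and average pixel-wise
        stacked_data = stack_arrays(image_data_list)
        mean = np.mean(stacked_data, axis=2)

        # wrap the result in an HDUList, upload the FITS and JPGs, and cache the presigned URLs
        hdu_list = create_fits(cache_key, mean)
        output = self.create_and_store_fits(hdu_list, percent=60.0, cur_percent=40.0)

        self.set_output({'output_files': output})
        self.set_percent_completion(1)
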
68 changes: 12 additions & 56 deletions datalab/datalab_session/data_operations/median.py
@@ -1,13 +1,9 @@
from io import BytesIO
import logging
import os
import tempfile

import numpy as np
from astropy.io import fits

from datalab.datalab_session.data_operations.data_operation import BaseDataOperation
from datalab.datalab_session.util import store_fits_output, get_archive_from_basename
from datalab.datalab_session.util import create_fits, stack_arrays

log = logging.getLogger()
log.setLevel(logging.INFO)
@@ -42,63 +38,23 @@ def wizard_description():
}
}

def operate(self):
input_files = self.input_data.get('input_files', [])
file_count = len(input_files)
def operate(self, input_files, cache_key):

if file_count == 0:
return { 'output_files': [] }
log.info(f'Executing median operation on {len(input_files)} files')

log.info(f'Executing median operation on {file_count} files')
image_data_list = self.get_fits_npdata(input_files, percent=40.0, cur_percent=0.0)

with tempfile.TemporaryDirectory() as temp_dir:
memmap_paths = []
stacked_data = stack_arrays(image_data_list)

for index, file_info in enumerate(input_files):
basename = file_info.get('basename', 'No basename found')
archive_record = get_archive_from_basename(basename)
# using the numpy library's median method
median = np.median(stacked_data, axis=2)

try:
fits_url = archive_record[0].get('url', 'No URL found')
except IndexError:
continue
hdu_list = create_fits(cache_key, median)

with fits.open(fits_url, use_fsspec=True) as hdu_list:
data = hdu_list['SCI'].data
memmap_path = os.path.join(temp_dir, f'memmap_{index}.dat')
memmap_array = np.memmap(memmap_path, dtype=data.dtype, mode='w+', shape=data.shape)
memmap_array[:] = data[:]
memmap_paths.append(memmap_path)
output = self.create_and_store_fits(hdu_list, percent=60.0, cur_percent=40.0)

self.set_percent_completion(index / file_count)
output = {'output_files': output}

image_data_list = [
np.memmap(path, dtype=np.float32, mode='r', shape=memmap_array.shape)
for path in memmap_paths
]

# Crop fits image data to be the same shape then stack
min_shape = min(arr.shape for arr in image_data_list)
cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in image_data_list]
stacked_data = np.stack(cropped_data_list, axis=2)

# Calculate a Median along the z axis
median = np.median(stacked_data, axis=2)

cache_key = self.generate_cache_key()
header = fits.Header([('KEY', cache_key)])
primary_hdu = fits.PrimaryHDU(header=header)
image_hdu = fits.ImageHDU(median)
hdu_list = fits.HDUList([primary_hdu, image_hdu])

fits_buffer = BytesIO()
hdu_list.writeto(fits_buffer)
fits_buffer.seek(0)

# Write the HDU List to the output FITS file in the bucket
response = store_fits_output(cache_key, fits_buffer)

# TODO: No output yet, need to build a thumbnail service
output = {'output_files': []}
self.set_percent_completion(file_count / file_count)
log.info(f'Median operation output: {output}')
self.set_percent_completion(1)
self.set_output(output)
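
Outside the task queue (for example in a Django shell or a unit test), the refactored operation could be exercised roughly like this; the class name Median, the placeholder basenames, and running it synchronously are assumptions.

from datalab.datalab_session.data_operations.median import Median

# input format mirrors what execute_data_operation passes through from input_data
input_data = {'input_files': [{'basename': 'example-frame-0001'}, {'basename': 'example-frame-0002'}]}

operation = Median(input_data)
cache_key = operation.generate_cache_key()
operation.operate(input_data.get('input_files', []), cache_key)

# the cached output now holds the presigned large/thumbnail JPG URLs
print(operation.get_output())
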
11 changes: 10 additions & 1 deletion datalab/datalab_session/tasks.py
@@ -1,6 +1,12 @@
import logging

import dramatiq

from datalab.datalab_session.data_operations.utils import available_operations
from datalab.datalab_session.util import get_presigned_url, key_exists

log = logging.getLogger()
log.setLevel(logging.INFO)

# TODO: Perhaps define a pipeline that can take the output of one data operation, upload it to an S3 bucket, indicate success, etc.

@@ -10,4 +16,7 @@ def execute_data_operation(data_operation_name: str, input_data: dict):
if operation_class is None:
raise NotImplementedError("Operation not implemented!")
else:
operation_class(input_data).operate()
operation = operation_class(input_data)
cache_key = operation.generate_cache_key()

operation.operate(input_data.get('input_files', []), cache_key)
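
In normal use the operation is not called synchronously; callers enqueue it through dramatiq. A sketch, assuming execute_data_operation is decorated with @dramatiq.actor above this hunk and that 'Median' is a key in available_operations:

from datalab.datalab_session.tasks import execute_data_operation

input_data = {'input_files': [{'basename': 'example-frame-0001'}]}

# .send() enqueues the actor for a dramatiq worker instead of running it in-process
execute_data_operation.send('Median', input_data)
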
97 changes: 87 additions & 10 deletions datalab/datalab_session/util.py
@@ -2,32 +2,87 @@
import logging

import boto3
from astropy.io import fits
import numpy as np

from django.conf import settings

log = logging.getLogger()
log.setLevel(logging.INFO)

def store_fits_output(item_key: str, fits_buffer: object) -> object:
def add_file_to_bucket(item_key: str, path: object) -> str:
"""
Stores a FITS file in the operation bucket in S3
Keyword Arguments:
item_key -- name under which to store the fits file
fits_buffer -- the fits file to add to the bucket
Args:
item_key -- name under which to store the file in the bucket
path -- local path of the file to upload
Returns:
A presigned url for the object just added to the bucket
"""
log.info(f'Adding {item_key} to {settings.DATALAB_OPERATION_BUCKET}')

s3 = boto3.resource('s3')
response = s3.Bucket(settings.DATALAB_OPERATION_BUCKET).put_object(Key = item_key, Body = fits_buffer.getvalue())
return response
s3 = boto3.client('s3')
response = s3.upload_file(
path,
settings.DATALAB_OPERATION_BUCKET,
item_key
)

return get_presigned_url(item_key)

def get_presigned_url(key: str) -> str:
"""
Gets a presigned url from the operation bucket using the key
Args:
key -- object key to look up in the bucket
Returns:
A presigned url for the object or None
"""
s3 = boto3.client('s3')

try:
url = s3.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': settings.DATALAB_OPERATION_BUCKET,
'Key': key
},
ExpiresIn = 60 * 60 * 24 * 30 # URL will be valid for 30 days
)
except Exception:
log.error(f'Could not generate a presigned URL for {key}')
return None

return url

def key_exists(key: str) -> bool:
"""
Checks whether any object key in the operation S3 bucket starts with the given prefix.
Args:
key (str): The prefix to look for in object keys.
Returns:
bool: True if at least one object key starts with the given prefix, False otherwise.
"""
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=settings.DATALAB_OPERATION_BUCKET, Prefix=key, MaxKeys=1)
return 'Contents' in response
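
Together with get_presigned_url, this is what enables the output caching and URL serving mentioned in the commit message: before re-running an operation, a caller can check the bucket and hand back the already-uploaded products. A sketch under those assumptions, with the key layout taken from create_and_store_fits and the helper itself hypothetical:

from typing import Optional

from datalab.datalab_session.util import get_presigned_url, key_exists

def cached_output_urls(cache_key: str) -> Optional[dict]:
    """Hypothetical helper: return presigned URLs for a finished operation, or None on a cache miss."""
    if not key_exists(cache_key):
        return None

    # keys follow the '{cache_key}/{cache_key}-{index}-large.jpg' pattern used by create_and_store_fits;
    # a single-output operation is assumed here for simplicity
    return {
        'large_url': get_presigned_url(f'{cache_key}/{cache_key}-1-large.jpg'),
        'thumbnail_url': get_presigned_url(f'{cache_key}/{cache_key}-1-small.jpg'),
    }
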

def get_archive_from_basename(basename: str) -> dict:
"""
Queries and returns an archive file from the Archive
Looks for the key as a prefix in the operations s3 bucket
Keyword Arguments:
basename -- name to query
Args:
basename -- name to query
Returns:
dict of archive fits urls
"""
query_params = {'basename_exact': basename }

@@ -41,3 +96,25 @@ def get_archive_from_basename(basename: str) -> dict:
raise FileNotFoundError

return results

def create_fits(key: str, image_arr: np.ndarray) -> fits.HDUList:
"""
Creates an HDUList with the given key stored in the primary header and the image array as an ImageHDU
"""
header = fits.Header([('KEY', key)])
primary_hdu = fits.PrimaryHDU(header=header)
image_hdu = fits.ImageHDU(image_arr)

hdu_list = fits.HDUList([primary_hdu, image_hdu])

return hdu_list

def stack_arrays(array_list: list):
"""
Takes a list of numpy arrays, crops them to an equal shape, and stacks them to be a 3d numpy array
"""
# element-wise minimum so arrays that differ in either dimension can be cropped and stacked
min_shape = tuple(min(dim) for dim in zip(*(arr.shape for arr in array_list)))
cropped_data_list = [arr[:min_shape[0], :min_shape[1]] for arr in array_list]

stacked = np.stack(cropped_data_list, axis=2)

return stacked
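
A quick illustration of the cropping behaviour: arrays of different sizes are trimmed to the smallest common shape before stacking, which is what lets np.median operate along the new axis in the Median operation. The shapes below are arbitrary.

import numpy as np

from datalab.datalab_session.util import stack_arrays

a = np.ones((3, 4))
b = np.ones((4, 5)) * 2
c = np.ones((5, 6)) * 3

stacked = stack_arrays([a, b, c])
print(stacked.shape)   # (3, 4, 3): cropped to the smallest shape, stacked on axis 2

median = np.median(stacked, axis=2)
print(median.shape, median[0, 0])   # (3, 4) 2.0
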
