Skip to content

Commit

Permalink
modified Scan class
Browse files Browse the repository at this point in the history
  • Loading branch information
bingli621 committed Oct 14, 2024
1 parent 9c206d7 commit 4e8570c
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 121 deletions.
Binary file modified .DS_Store
Binary file not shown.
126 changes: 76 additions & 50 deletions src/tavi/data/scan.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
from dataclasses import dataclass
from typing import Literal, Optional
from typing import Literal, Optional, Union

import matplotlib.pyplot as plt
import numpy as np

from tavi.data.nxentry import NexusEntry
from tavi.data.plotter import Plot1D
from tavi.sample.xtal import Xtal
from tavi.utilities import spice_to_mantid


@dataclass
Expand Down Expand Up @@ -59,7 +60,9 @@ class Scan(object):
Manage a single measued scan
Attributes:
data: dictionary contains lists of scan data
name (str): scan name
_nexus_dict (NexusEntry)
data (dict): dictionary contains lists of scan data
Methods:
load_scan
Expand All @@ -71,17 +74,8 @@ class Scan(object):
def __init__(self, name: str, nexus_dict: NexusEntry) -> None:

self.name: str = name
self.nexus_dict: NexusEntry = nexus_dict
self._nexus_dict: NexusEntry = nexus_dict
self.data: dict = self.get_data()
# always using spice convention
self.SPICE_CONVENTION = True

@classmethod
def from_tavi(cls, tavi_data: dict, scan_num: int, exp_id: Optional[str] = None):
if exp_id is None:
exp_id = next(iter(tavi_data))
scan_name = f"scan{scan_num:04}"
return cls(scan_name, tavi_data[exp_id].get(scan_name))

@classmethod
def from_spice(
Expand Down Expand Up @@ -111,77 +105,85 @@ def from_nexus(cls, path_to_nexus: str, scan_num: Optional[int] = None):
def scan_info(self):
scan_info = ScanInfo(
scan_num=int(self.name[-4:]),
scan_title=self.nexus_dict.get("title"),
def_y=self.nexus_dict.get("data", ATTRS=True)["signal"],
def_x=self.nexus_dict.get("data", ATTRS=True)["axes"],
start_time=self.nexus_dict.get("start_time"),
end_time=self.nexus_dict.get("end_time"),
scan_title=self._nexus_dict.get("title"),
def_y=self._nexus_dict.get("data", ATTRS=True)["signal"],
def_x=self._nexus_dict.get("data", ATTRS=True)["axes"],
start_time=self._nexus_dict.get("start_time"),
end_time=self._nexus_dict.get("end_time"),
preset_type="normal",
preset_channel=self.nexus_dict.get("monitor/mode"),
preset_value=self.nexus_dict.get("monitor/preset"),
preset_channel=self._nexus_dict.get("monitor/mode"),
preset_value=self._nexus_dict.get("monitor/preset"),
)
return scan_info

@property
def sample_ub_info(self):
sample_type = self.nexus_dict.get("sample/type")
ub_matrix = self.nexus_dict.get("sample/orientation_matrix")
lattice_constants = self.nexus_dict.get("sample/unit_cell")
sample_type = self._nexus_dict.get("sample/type")
ub_matrix = self._nexus_dict.get("sample/orientation_matrix")
lattice_constants = self._nexus_dict.get("sample/unit_cell")
if sample_type == "crystal" and (ub_matrix is not None):
xtal = Xtal(lattice_constants)
(u, v) = xtal.spice_ub_matrix_to_uv(ub_matrix.reshape(3, 3))
(u, v) = xtal.ub_matrix_to_uv(spice_to_mantid(ub_matrix.reshape(3, 3)))
else:
u = None
v = None

sample_ub_info = SampleUBInfo(
sample_name=self.nexus_dict.get("sample/name"),
sample_name=self._nexus_dict.get("sample/name"),
lattice_constants=lattice_constants,
ub_matrix=ub_matrix,
type=sample_type,
u=u,
v=v,
# ub_mode: int = 0 # mode for UB determination in SPICE
# angle_mode: int = 0 # mode for goni angle calculation in SPICE
plane_normal=self.nexus_dict.get("sample/plane_normal"),
plane_normal=self._nexus_dict.get("sample/plane_normal"),
# in_plane_ref: Optional[np.ndarray] = None
ubconf=None, # path to UB configration file)
ubconf=None, # path to UB configration file
)

return sample_ub_info

@property
def instrument_info(self):
instru_info = InstrumentInfo(
monochromator=self.nexus_dict.get("monochromator/type"),
analyzer=self.nexus_dict.get("analyser/type"),
monochromator=self._nexus_dict.get("monochromator/type"),
analyzer=self._nexus_dict.get("analyser/type"),
sense=(
self.nexus_dict.get("monochromator/sense")
+ self.nexus_dict.get("sample/sense")
+ self.nexus_dict.get("analyser/sense")
self._nexus_dict.get("monochromator/sense")
+ self._nexus_dict.get("sample/sense")
+ self._nexus_dict.get("analyser/sense")
),
collimation=self.nexus_dict.get("divergence_x"),
collimation=self._nexus_dict.get("divergence_x"),
)
return instru_info

def get_data(self):
def get_data(self) -> dict:
"""Get scan data as a dictionary"""
data_dict = {}
names = self.nexus_dict.get_dataset_names()
names = self._nexus_dict.get_dataset_names()
for name in names:
data_dict.update({name: self.nexus_dict.get(name)})
data_dict.update({name: self._nexus_dict.get(name)})
return data_dict

def _rebin_tol(
self,
x_raw: np.ndarray,
y_raw: np.ndarray,
y_str: str,
rebin_step: float,
rebin_params: tuple,
norm_channel: Literal["time", "monitor", "mcu", None],
norm_val: float,
):
"""Rebin with tolerance"""
x_grid = np.arange(np.min(x_raw) + rebin_step / 2, np.max(x_raw) + rebin_step / 2, rebin_step)

rebin_min, rebin_max, rebin_step = rebin_params
if rebin_min is None:
rebin_min = np.min(x_raw)
if rebin_max is None:
rebin_max = np.max(x_raw)

x_grid = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
x = np.zeros_like(x_grid)
y = np.zeros_like(x_grid)
counts = np.zeros_like(x_grid)
Expand Down Expand Up @@ -225,16 +227,19 @@ def _rebin_grid(
x_raw: np.ndarray,
y_raw: np.ndarray,
y_str: str,
rebin_step: float,
rebin_params: tuple,
norm_channel: Literal["time", "monitor", "mcu", None],
norm_val: float,
):
"""Rebin with a regular grid"""
x = np.arange(
np.min(x_raw) + rebin_step / 2,
np.max(x_raw) + rebin_step / 2,
rebin_step,
)

rebin_min, rebin_max, rebin_step = rebin_params
if rebin_min is None:
rebin_min = np.min(x_raw)
if rebin_max is None:
rebin_max = np.max(x_raw)

x = np.arange(rebin_min + rebin_step / 2, rebin_max + rebin_step / 2, rebin_step)
y = np.zeros_like(x)
cts = np.zeros_like(x)
yerr = None
Expand Down Expand Up @@ -271,10 +276,19 @@ def generate_curve(
norm_channel: Literal["time", "monitor", "mcu", None] = None,
norm_val: float = 1.0,
rebin_type: Literal["tol", "grid", None] = None,
rebin_step: float = 0.0,
rebin_params: Union[float, tuple] = 0.0,
) -> Plot1D:
"""Generate a curve from a single scan to plot,
with the options to normalize the y-axis and rebin x-axis.
"""Generate a curve from a single scan to plot, with the options
to normalize the y-axis and rebin x-axis.
Args:
x_str (str): x-axis variable
y_str (str): y-axis variable
norm_channel (str | None): choose from "time", "monitor", "mcu"
norm_val (float): value to normalized to
rebin_type (str | None): "tol" or "grid"
rebin_params (float | tuple(flot, float, float)): take as step size if a numer is given,
take as (min, max, step) if a tuple of size 3 is given
"""

if x_str is None:
Expand Down Expand Up @@ -306,14 +320,26 @@ def generate_curve(

return plot1d

if not rebin_step > 0:
raise ValueError("Rebin step needs to be greater than zero.")
# validate rebin params
if isinstance(rebin_params, tuple):
if len(rebin_params) != 3:
raise ValueError("Rebin parameters should have the form (min, max, step)")
rebin_min, rebin_max, rebin_step = rebin_params
if (rebin_min >= rebin_max) or (rebin_step < 0):
raise ValueError(f"Nonsensical rebin parameters {rebin_params}")

elif isinstance(rebin_params, float | int):
if rebin_params < 0:
raise ValueError("Rebin step needs to be greater than zero.")
rebin_params = (None, float(rebin_params), None)
else:
raise ValueError(f"Unrecogonized rebin parameters {rebin_params}")

match rebin_type:
case "tol": # x weighted by normalization channel
x, y, yerr = self._rebin_tol(x_raw, y_raw, y_str, rebin_step, norm_channel, norm_val)
x, y, yerr = self._rebin_tol(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val)
case "grid":
x, y, yerr = self._rebin_grid(x_raw, y_raw, y_str, rebin_step, norm_channel, norm_val)
x, y, yerr = self._rebin_grid(x_raw, y_raw, y_str, rebin_params, norm_channel, norm_val)
case _:
raise ValueError('Unrecogonized rebin type. Needs to be "tol" or "grid".')

Expand Down
45 changes: 33 additions & 12 deletions src/tavi/data/tavi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import h5py

from tavi.data.nxentry import NexusEntry
from tavi.data.scan_group import ScanGroup
from tavi.data.scan import Scan


class TAVI(object):
Expand Down Expand Up @@ -36,12 +36,13 @@ def __init__(self, tavi_file_path: Optional[str] = None) -> None:

@staticmethod
def load_data(scan_dict: dict):
"""Load data under coresponding exp_id"""
data_dict: dict = {}
for scan_name, nxentry in scan_dict.items():
exp_id = nxentry["attrs"]["dataset_name"]
if not data_dict.get(exp_id): # first entry
data_dict.update({exp_id: {scan_name: nxentry}})
else:
else: # exp_id existing
data_dict[exp_id].update({scan_name: nxentry})
return data_dict

Expand Down Expand Up @@ -133,14 +134,34 @@ def save(self, file_path: Optional[str] = None):
except OSError:
print(f"Cannot create tavi file at {self.file_path}")

def generate_scan_group(
def get_scan(
self,
signals=None,
backgrounds=None,
signal_axes=(None, None, None),
background_axes=(None, None, None),
):
"""Generate a scan group."""
sg = ScanGroup(signals, backgrounds, signal_axes, background_axes)

return sg
scan_num: int,
exp_id: Optional[str] = None,
) -> Scan:
"""Get the scan at location /data/exp_id/scanXXXX, return a Scan instance
Arguments:
scan_num (int): scan number
exp_id (str | None): in the format of IPTSXXXXX_INSTRU_expXXXX, needed when
more than one experiment is loaded as data
Return:
Scan: an instance of Scan class
"""
if exp_id is None:
exp_id = next(iter(self.data))
dataset = self.data[exp_id]
scan_name = f"scan{scan_num:04}"
return Scan(scan_name, dataset[scan_name])

# def generate_scan_group(
# self,
# signals=None,
# backgrounds=None,
# signal_axes=(None, None, None),
# background_axes=(None, None, None),
# ):
# """Generate a scan group."""
# sg = ScanGroup(signals, backgrounds, signal_axes, background_axes)

# return sg
24 changes: 7 additions & 17 deletions src/tavi/instrument/tas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from tavi.instrument.tas_base import TASBase
from tavi.sample.xtal import Xtal
from tavi.utilities import MotorAngles, Peak, UBConf, en2q, get_angle_from_triangle
from tavi.utilities import MotorAngles, Peak, UBConf, en2q, get_angle_from_triangle, mantid_to_spice, spice_to_mantid


class TAS(TASBase):
Expand All @@ -22,16 +22,6 @@ def __init__(self, SPICE_CONVENTION: bool = True):
super().__init__()
self.SPICE_CONVENTION = SPICE_CONVENTION # use coordination system defined in SPICE

@staticmethod
def spice_to_mantid(vec):
"""suffle the order"""
return np.array([vec[0], vec[2], -vec[1]])

@staticmethod
def mantid_to_spice(vec):
"""suffle the order"""
return np.array([vec[0], -vec[2], vec[1]])

@staticmethod
def q_lab(
two_theta_deg: float,
Expand Down Expand Up @@ -162,9 +152,9 @@ def _find_u_from_two_peaks(
ub_mat = np.matmul(u_mat, b_mat)

if self.SPICE_CONVENTION:
plane_normal = TAS.mantid_to_spice(plane_normal)
in_plane_ref = TAS.mantid_to_spice(in_plane_ref)
ub_mat = TAS.mantid_to_spice(ub_mat)
plane_normal = mantid_to_spice(plane_normal)
in_plane_ref = mantid_to_spice(in_plane_ref)
ub_mat = mantid_to_spice(ub_mat)

return UBConf(
peaks,
Expand Down Expand Up @@ -217,9 +207,9 @@ def _t_mat_minimal_tilt(self, hkl: np.ndarray):
ub_mat = self.sample.ub_mat

if self.SPICE_CONVENTION: # suffle the order following SPICE convention
plane_normal = TAS.spice_to_mantid(plane_normal)
in_plane_ref = TAS.spice_to_mantid(in_plane_ref)
ub_mat = TAS.spice_to_mantid(ub_mat)
plane_normal = spice_to_mantid(plane_normal)
in_plane_ref = spice_to_mantid(in_plane_ref)
ub_mat = spice_to_mantid(ub_mat)

q = ub_mat @ hkl
t1 = q / np.linalg.norm(q)
Expand Down
13 changes: 0 additions & 13 deletions src/tavi/sample/xtal.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,6 @@ def ub_matrix_to_uv(ub_matrix) -> tuple[np.ndarray, np.ndarray]:
v = inv_ub_matrix @ np.array([1, 0, 0])
return (u, v)

@staticmethod
def spice_ub_matrix_to_uv(spice_ub_matrix) -> tuple[np.ndarray, np.ndarray]:
"""Calculate u and v vector from UB matrix
Note:
u vector, in reciprocal lattice unit, along beam
v vector, in reciprocal lattice unit,in the horizaontal scattering plane"""
ub_matrix = np.array([spice_ub_matrix[0], spice_ub_matrix[2], -spice_ub_matrix[1]])
return Xtal.ub_matrix_to_uv(ub_matrix)

@staticmethod
def ub_matrix_to_lattice_params(ub_matrix):
"""Calculate lattice parameters from UB matrix"""
Expand Down Expand Up @@ -126,10 +117,6 @@ def uv_to_ub_matrix(self, u, v) -> np.ndarray:

return ub_matrix

# TODO
def uv_to_spice_ub_matrix(self, u, v) -> np.ndarray:
pass

def set_orientation(self, ubconf: UBConf) -> None:
"Set crystal orientation from UB conf"

Expand Down
10 changes: 10 additions & 0 deletions src/tavi/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,13 @@ def rotation_matrix_2d(phi):
]
)
return mat


def spice_to_mantid(vec):
"""suffle the order from spice convention to mantid convention"""
return np.array([vec[0], vec[2], -vec[1]])


def mantid_to_spice(vec):
"""suffle the order from mantid convention to spice convention"""
return np.array([vec[0], -vec[2], vec[1]])
Binary file modified test_data/.DS_Store
Binary file not shown.
Binary file modified test_data/scan_to_nexus_test.h5
Binary file not shown.
Binary file modified test_data/spice_to_nxdict_test_all.h5
Binary file not shown.
Binary file modified test_data/spice_to_nxdict_test_empty.h5
Binary file not shown.
Binary file modified test_data/spice_to_nxdict_test_scan0034.h5
Binary file not shown.
Binary file modified test_data/tavi_test_exp424.h5
Binary file not shown.
Loading

0 comments on commit 4e8570c

Please sign in to comment.