diff --git a/.github/workflows/install-test-conda-forge.yml b/.github/workflows/install-test-conda-forge.yml index 24ad101..8831217 100644 --- a/.github/workflows/install-test-conda-forge.yml +++ b/.github/workflows/install-test-conda-forge.yml @@ -30,7 +30,7 @@ jobs: - name: Install package run: | conda info - conda install orthority>=0.5.1 + conda install orthority>=0.6.0 conda list - name: Install OpenCV Linux dependencies diff --git a/orthority/camera.py b/orthority/camera.py index 507d9e8..dee13a9 100644 --- a/orthority/camera.py +++ b/orthority/camera.py @@ -209,6 +209,7 @@ def pixel_boundary(self, num_pts: int = None) -> np.ndarray: Boundary pixel (j=column, i=row) coordinates as a 2-by-N array, with (j, i) along the first dimension. """ + # TODO: this does not always return the correct number of pts e.g. num_pts=7 / 11 def rect_boundary(im_size: np.ndarray, num_pts: int) -> np.ndarray: """Return a rectangular pixel coordinate boundary of ``num_pts`` ~evenly spaced points @@ -858,6 +859,8 @@ def pixel_to_world_z(self, ji: np.ndarray, z: float | np.ndarray) -> np.ndarray: """ # TODO: consider only returning (x, y). the z dimension is redundant, and it is used this # way in most (all?) places. + # TODO: i have noticed that the results with e.g. z=0 sometimes have z close to but not + # equal 0. is there a way of re-organising this so that doesn't happen? self._test_init() self._validate_pixel_coords(ji) self._validate_z(z, ji) diff --git a/orthority/fit.py b/orthority/fit.py index 7b7a80f..5cb8943 100644 --- a/orthority/fit.py +++ b/orthority/fit.py @@ -13,22 +13,38 @@ # You should have received a copy of the GNU Affero General Public License along with Orthority. # If not, see . """Camera model fitting and refinement.""" + from __future__ import annotations import logging +import warnings +from collections.abc import Sequence from copy import deepcopy -from typing import Sequence +from math import ceil +from typing import Any +import cv2 import numpy as np +from rasterio.crs import CRS from rasterio.rpc import RPC +from rasterio.warp import transform -from orthority.camera import RpcCamera -from orthority.enums import RpcRefine +from orthority import param_io +from orthority.camera import FrameCamera, RpcCamera +from orthority.enums import CameraType, RpcRefine +from orthority.errors import OrthorityWarning logger = logging.getLogger(__name__) _default_rpc_refine_method = RpcRefine.shift +_frame_dist_params = {k: v[3:] for k, v in param_io._opt_frame_schema.items()} +"""Distortion coefficient names in OpenCV ordering for each frame camera model.""" +_frame_num_params = {k: len(v) + 6 for k, v in _frame_dist_params.items()} +"""Number of distortion coefficient and exterior parameters for each frame camera model (excludes +focal length(s) and principal point). +""" + def refine_rpc( rpc: RPC | dict, gcps: Sequence[dict], method: RpcRefine = _default_rpc_refine_method @@ -85,7 +101,7 @@ def _norm_ji(rpc: dict, ji: np.ndarray) -> np.ndarray: refine_tform[:, -1] = off.mean(axis=1) else: for axis in range(2): - ji_rpc_ = np.vstack((ji_rpc[axis], np.ones((ji_rpc.shape[1])))) + ji_rpc_ = np.vstack((ji_rpc[axis], np.ones(ji_rpc.shape[1]))) (m, c), res, rank, s = np.linalg.lstsq(ji_rpc_.T, ji_gcp[axis], rcond=None) refine_tform[axis, axis] = m refine_tform[axis, 2] = c @@ -112,3 +128,242 @@ def _norm_ji(rpc: dict, ji: np.ndarray) -> np.ndarray: refined_rpc[num_key] += np.array(refined_rpc[den_key]) * refine_tform[axis, 2] refined_rpc[num_key] = refined_rpc[num_key].tolist() return refined_rpc + + +def _gcps_to_cv_coords( + gcp_dict: dict[str, Sequence[dict]], crs: str | CRS | None = None +) -> tuple[list[np.ndarray], list[np.ndarray], np.ndarray, np.ndarray]: + """Convert a GCP dictionary to list of pixel coordinate arrays, a list of world coordinate + arrays and a reference world coordinate position which world coordinate arrays have been + offset relative to. + """ + crs = CRS.from_string(crs) if isinstance(crs, str) else crs + # form lists of pixel and world coordinate arrays + jis = [] + xyzs = [] + for gcps in gcp_dict.values(): + ji = np.array([gcp['ji'] for gcp in gcps]) + xyz = np.array([gcp['xyz'] for gcp in gcps]) + if crs: + xyz = np.array(transform(CRS.from_epsg(4979), crs, *(xyz.T))).T + jis.append(ji.astype('float32')) + xyzs.append(xyz) + + # offset world coordinates and convert to float32 + all_xyz = np.vstack(xyzs) + scale_xyz, off_xyz = all_xyz.std(), all_xyz.mean(axis=0) + xyzs = [((xyz - off_xyz) / scale_xyz).astype('float32') for xyz in xyzs] + return jis, xyzs, scale_xyz, off_xyz + + +def fit_frame( + cam_type: CameraType, + im_size: tuple[int, int], + gcp_dict: dict[str, Sequence[dict]], + crs: str | CRS | None = None, +) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]: + """ + Fit a frame camera to GCPs. + + :param cam_type: + Camera type to fit. + :param im_size: + Image (width, height) in pixels. + :param gcp_dict: + GCP dictionary e.g. as returned by :func:`~orthority.param_io.read_im_gcps` or + :func:`~orthority.param_io.read_oty_gcps`. + :param crs: + CRS of the camera world coordinate system as an EPSG, proj4 or WKT string, + or :class:`~rasterio.crs.CRS` object. If set to ``None`` (the default), GCPs are assumed + to be in the world coordinate CRS, and are not transformed. Otherwise, GCPs are + transformed from geographic WGS84 coordinates to this CRS if it is supplied. + + :return: + Interior parameter and exterior parameter dictionaries. + """ + # TODO: is it better to use cv2.initCameraMatrix2D and cv2.solvePnp(flags=cv2.SOLVEPNP_SQPNP) + # rather than cv2.calibrateCamera when num pts <=4 + + # check there are at least 4 GCPs per image + min_gcps = min(len(gcps) for gcps in gcp_dict.values()) + if min_gcps < 4: + raise ValueError('At least four GCPs are needed per image.') + + # check the total number of GCPs is enough to fit cam_type + ttl_gcps = sum(len(gcps) for gcps in gcp_dict.values()) + req_gcps = max(4, ceil((1 + _frame_num_params[cam_type]) / 2)) + if ttl_gcps < req_gcps: + raise ValueError( + f"A total of at least {req_gcps} GCPs are required to fit the '{cam_type!r}' model." + ) + + # convert GCPs to OpenCV compatible lists of arrays + jis, xyzs, scale_xyz, off_xyz = _gcps_to_cv_coords(gcp_dict, crs=crs) + + # check if GCPs are co-planar (replicates OpenCV's test) + zs = np.vstack([xyz[:, 2] for xyz in xyzs]) + z_mean, z_std = np.mean(zs), np.std(zs) + if z_mean > 1e-5 or z_std > 1e-5: + raise ValueError('GCPs should be co-planar to fit interior parameters.') + + criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_COUNT, 1000, 1e-15) + warn_str = ( + "A total of at least {0} GCPs are required to estimate all '{1!r}' parameters, but there " + "are {2}. The initial intrinsic matrix will not be globally optimised." + ) + + # setup calibration flags & params based on cam_type and number of GCPs + if cam_type is not CameraType.fisheye: + # force square pixels always + flags = cv2.CALIB_FIX_ASPECT_RATIO | cv2.CALIB_USE_INTRINSIC_GUESS + + # fix initial intrinsic matrix if there are not enough GCPs to estimate all params (+3 is + # for 1 focal length and 2 principal points) + req_gcps = ceil((_frame_num_params[cam_type] + 3 + 1) / 2) + if ttl_gcps < req_gcps: + warnings.warn( + warn_str.format(req_gcps, cam_type, ttl_gcps), + category=OrthorityWarning, + stacklevel=2, + ) + flags |= cv2.CALIB_FIX_PRINCIPAL_POINT | cv2.CALIB_FIX_FOCAL_LENGTH + + if cam_type is CameraType.pinhole: + # fix distortion at zero + flags |= ( + cv2.CALIB_ZERO_TANGENT_DIST | cv2.CALIB_FIX_K1 | cv2.CALIB_FIX_K2 | cv2.CALIB_FIX_K3 + ) + elif cam_type is CameraType.opencv: + # enable full OpenCV model + flags |= cv2.CALIB_RATIONAL_MODEL | cv2.CALIB_THIN_PRISM_MODEL | cv2.CALIB_TILTED_MODEL + + # initial interior params + K = cv2.initCameraMatrix2D(xyzs, jis, im_size) + dist_param = np.zeros(len(_frame_dist_params[cam_type]), dtype='float32') + + # calibrate + err, K, dist_param, rs, ts = cv2.calibrateCamera( + xyzs, jis, im_size, K, dist_param, flags=flags, criteria=criteria + ) + else: + # TODO: fisheye needs at least 5 gcps (per image or in ttl?) so the test above needs + # updating + # TODO: with >5 gcps fisheye.calibrate always runs, for low num gcps it seems to fix + # dist_param & K, but I'm not sure if this is based on the num gcps, or some numerical + # property of them + # the oty fisheye camera does not have skew/alpha and CALIB_RECOMPUTE_EXTRINSIC improves + # accuracy + flags = cv2.fisheye.CALIB_FIX_SKEW | cv2.fisheye.CALIB_RECOMPUTE_EXTRINSIC + + # Fix initial intrinsic matrix if there are not enough GCPs to estimate all params (+4 is + # for 2 focal lengths (you can't fix fisheye aspect ratio) and 2 principal points). + # (Note that cv2.fisheye.calibrate() behaves differently to cv2.fisheye.calibrate(): it + # still runs with ttl_gcps < req_gcps, apparently fixing K and distortion coefficients.) + req_gcps = ceil((_frame_num_params[cam_type] + 4 + 1) / 2) + if ttl_gcps < req_gcps: + warnings.warn( + warn_str.format(req_gcps, cam_type, ttl_gcps), + category=OrthorityWarning, + stacklevel=2, + ) + flags |= cv2.fisheye.CALIB_FIX_PRINCIPAL_POINT | cv2.fisheye.CALIB_FIX_FOCAL_LENGTH + + # convert coords to cv2.fisheye format + xyzs = [xyz[None, :] for xyz in xyzs] + jis = [ji[None, :] for ji in jis] + + # calibrate + err, K, dist_param, rs, ts = cv2.fisheye.calibrate( + xyzs, jis, im_size, None, None, flags=flags, criteria=criteria + ) + logger.debug( + f"RMS reprojection error for fit of '{cam_type}' model to {ttl_gcps} GCPs: {err:.4f}" + ) + + # convert opencv to oty format interior & exterior params + cam_id = f'{cam_type!r}_fit_to_{ttl_gcps}_gcps' + c_xy = (K[0, 2], K[1, 2]) - (np.array(im_size) - 1) / 2 + c_xy /= max(im_size) + dist_param = dict(zip(_frame_dist_params[cam_type], dist_param.squeeze().tolist())) + + int_param = dict( + cam_type=cam_type, + im_size=im_size, + focal_len=(float(K[0, 0]), float(K[1, 1])), + sensor_size=(float(im_size[0]), float(im_size[1])), + cx=float(c_xy[0]), + cy=float(c_xy[1]), + **dist_param, + ) + int_param_dict = {cam_id: int_param} + + ext_param_dict = {} + for filename, t, r in zip(gcp_dict.keys(), ts, rs): + xyz, opk = param_io._cv_ext_to_oty_ext(t, r, off_xyz=off_xyz, scale_xyz=scale_xyz) + ext_param_dict[filename] = dict(xyz=xyz, opk=opk, camera=cam_id) + + return int_param_dict, ext_param_dict + + +def fit_frame_exterior( + int_param_dict: dict[str, dict[str, Any]], + gcp_dict: dict[str, Sequence[dict]], + crs: str | CRS | None = None, +): + """ + Fit frame camera exterior parameters to GCPs, given the camera's interior parameters. + + :param int_param_dict: + Interior parameter dictionary. + :param gcp_dict: + GCP dictionary e.g. as returned by :func:`~orthority.param_io.read_im_gcps` or + :func:`~orthority.param_io.read_oty_gcps`. + :param crs: + CRS of the camera world coordinate system as an EPSG, proj4 or WKT string, + or :class:`~rasterio.crs.CRS` object. If set to ``None`` (the default), GCPs are assumed + to be in the world coordinate CRS, and are not transformed. Otherwise, GCPs are + transformed from geographic WGS84 coordinates to this CRS if it is supplied. + + :return: + Exterior parameter dictionary. + """ + if len(int_param_dict) > 1: + warnings.warn( + f"Refining the first of {len(int_param_dict)} cameras defined in the interior " + f"parameter dictionary.", + category=OrthorityWarning, + stacklevel=2, + ) + cam_id = next(iter(int_param_dict.keys())) + int_param = next(iter(int_param_dict.values())) + + # check there are at least 3 GCPs per image + min_gcps = min(len(gcps) for gcps in gcp_dict.values()) + if min_gcps < 3: + raise ValueError('At least three GCPs are needed per image.') + + # get initial intrinsic matrix + K = FrameCamera._get_intrinsic( + int_param['im_size'], + int_param['focal_len'], + int_param.get('sensor_size'), + int_param.get('cx', 0.0), + int_param.get('cy', 0.0), + ) + + # get initial distortion coefficients + dist_names = _frame_dist_params[int_param['cam_type']] + dist_param = [int_param.get(dn, 0.0) for dn in dist_names] + dist_param = np.array(dist_param) if dist_param else None + + # convert GCPs to OpenCV compatible lists of arrays + jis, xyzs, scale_xyz, off_xyz = _gcps_to_cv_coords(gcp_dict, crs=crs) + + # fit exterior parameters (SOLVEPNP_SQPNP is globally optimal so does not need further refining) + ext_param_dict = {} + for filename, xyz, ji in zip(gcp_dict.keys(), xyzs, jis): + _, r, t = cv2.solvePnP(xyz, ji, K, dist_param, flags=cv2.SOLVEPNP_SQPNP) + xyz_, opk = param_io._cv_ext_to_oty_ext(t, r, scale_xyz=scale_xyz, off_xyz=off_xyz) + ext_param_dict[filename] = dict(xyz=xyz_, opk=opk, camera=cam_id) + + return ext_param_dict diff --git a/orthority/param_io.py b/orthority/param_io.py index f7e04ac..e61c23c 100644 --- a/orthority/param_io.py +++ b/orthority/param_io.py @@ -365,7 +365,7 @@ def _read_im_rpc_param( rpc_param = dict(cam_type=CameraType.rpc, im_size=im_size, rpc=rpc.to_dict()) # TODO: can filename be made to conform to actual case of the filename on the file # system? otherwise, in windows the user can pass a different case filename here which - # won't macth with GCPs when refining. + # won't match with GCPs when refining. return {filename: rpc_param} # read RPC params in a thread pool, populating rpc_param_dict in same order as files @@ -488,8 +488,8 @@ def _read_im_gcps( # https://gdal.org/user/raster_data_model.html#gcps. This assumes image GCPs are in # center of pixel coordinate convention. oty_gcps = [] - for gcp, xyz in zip(gcps, xyz.T): - gcp = dict(ji=(gcp.col, gcp.row), xyz=tuple(xyz.tolist()), id=gcp.id, info=gcp.info) + for gcp, xyz_ in zip(gcps, xyz.T): + gcp = dict(ji=(gcp.col, gcp.row), xyz=tuple(xyz_.tolist()), id=gcp.id, info=gcp.info) oty_gcps.append(gcp) return {filename: oty_gcps} @@ -729,9 +729,9 @@ def _opk_to_rotation(opk: tuple[float, float, float]) -> np.ndarray: def _rotation_to_opk(R: np.ndarray) -> tuple[float, float, float]: """Convert the given rotation matrix to the (omega, phi, kappa) angles in radians.""" # see https://s3.amazonaws.com/mics.pix4d.com/KB/documents/Pix4D_Yaw_Pitch_Roll_Omega_to_Phi_Kappa_angles_and_conversion.pdf - omega = np.arctan2(-R[1, 2], R[2, 2]) - phi = np.arcsin(R[0, 2]) - kappa = np.arctan2(-R[0, 1], R[0, 0]) + omega = float(np.arctan2(-R[1, 2], R[2, 2])) + phi = float(np.arcsin(R[0, 2])) + kappa = float(np.arctan2(-R[0, 1], R[0, 0])) return omega, phi, kappa @@ -835,6 +835,30 @@ def _rpy_to_opk( return omega, phi, kappa +def _cv_ext_to_oty_ext( + t: Sequence[float] | np.ndarray, + r: Sequence[float] | np.ndarray, + scale_xyz: Sequence[float] | np.ndarray | None = None, + off_xyz: Sequence[float] | np.ndarray | None = None, +) -> tuple[tuple[float, float, float], tuple[float, float, float]]: + """Convert OpenCV / OpenSfM rotation and translation vectors to Orthority format and + convention camera (x, y, z) position and (omega, phi, kappa) angles. Camera positions are + scaled and offset by ``scale_xyz`` and ``off_xyz`` when supplied. + """ + # adapted from ODM: https://github.com/OpenDroneMap/ODM/blob/master/opendm/shots.py + R = cv2.Rodrigues(np.array(r))[0].T + xyz = (-R.dot(t)).squeeze() + if scale_xyz is not None: + xyz *= scale_xyz + if off_xyz is not None: + xyz += off_xyz + xyz = tuple(xyz.tolist()) + # rotate camera coords from OpenSfM / OpenCV to PATB convention + R_ = R.dot(np.array([[1, 0, 0], [0, -1, 0], [0, 0, -1]])) + opk = _rotation_to_opk(R_) + return xyz, opk + + class FrameReader(ABC): """ Base frame camera parameter reader. @@ -1231,14 +1255,10 @@ def read_ext_param(self) -> dict[str, dict[str, Any]]: ext_param_dict = {} for filename, shot_dict in self._json_dict['shots'].items(): - # convert reconstruction 'translation' and 'rotation' to oty exterior params, - # adapted from ODM: https://github.com/OpenDroneMap/ODM/blob/master/opendm/shots.py - R = cv2.Rodrigues(np.array(shot_dict['rotation']))[0].T - delta_xyz = -R.dot(shot_dict['translation']) - xyz = tuple((ref_xyz + delta_xyz).tolist()) - # rotate camera coords from OpenSfM / OpenCV to PATB convention - R_ = R.dot(np.array([[1, 0, 0], [0, -1, 0], [0, 0, -1]])) - opk = _rotation_to_opk(R_) + # convert reconstruction 'translation' and 'rotation' to oty exterior params + xyz, opk = _cv_ext_to_oty_ext( + shot_dict['translation'], shot_dict['rotation'], off_xyz=ref_xyz + ) cam_id = shot_dict['camera'] cam_id = cam_id[3:] if cam_id.startswith('v2 ') else cam_id ext_param_dict[filename] = dict(xyz=xyz, opk=opk, camera=cam_id) diff --git a/orthority/version.py b/orthority/version.py index f1e435a..03100dc 100644 --- a/orthority/version.py +++ b/orthority/version.py @@ -13,4 +13,4 @@ # You should have received a copy of the GNU Affero General Public License along with Orthority. # If not, see . -__version__ = '0.5.1' +__version__ = '0.6.0' diff --git a/tests/test_fit.py b/tests/test_fit.py index 363dab9..8403771 100644 --- a/tests/test_fit.py +++ b/tests/test_fit.py @@ -15,21 +15,39 @@ from __future__ import annotations +from math import ceil + +import cv2 import numpy as np import pytest from rasterio.rpc import RPC +from rasterio.warp import transform from orthority import fit -from orthority.camera import RpcCamera -from orthority.enums import RpcRefine +from orthority.camera import FrameCamera, RpcCamera, create_camera +from orthority.enums import CameraType, RpcRefine +from orthority.errors import OrthorityWarning +from tests.test_param_io import _validate_ext_param_dict, _validate_int_param_dict + + +@pytest.fixture(scope='session') +def grid_ji(im_size: tuple[int, int]) -> np.ndarray: + """A 2-by-N array of pixel coordinates lying on a grid inside ``im_size``.""" + step = 20 + j, i = ( + np.arange(step, im_size[0] - step + 1, step), + np.arange(step, im_size[1] - step + 1, step), + ) + jgrid, igrid = np.meshgrid(j, i, indexing='xy') + ji = np.array((jgrid.reshape(-1), igrid.reshape(-1))) + return ji + np.random.randn(*ji.shape) @pytest.mark.parametrize('shift, drift', [((5.0, 10.0), None), ((5.0, 10.0), (1.2, 0.8))]) def test_refine_rpc( rpc: dict, im_size: tuple[int, int], shift: tuple[float, float], drift: tuple[float, float] ): - """Test ``refine_rpc()`` correctly refines an RPC model by testing it against refinement GCPs. - """ + """Test ``refine_rpc()`` correctly refines an RPC model by testing it against refinement GCPs.""" # create affine transform to realise shift & drift method = RpcRefine.shift if not drift else RpcRefine.shift_drift drift = (1.0, 1.0) if not drift else drift @@ -85,3 +103,314 @@ def test_refine_num_gcps(rpc: dict, im_size: tuple[int, int], method: RpcRefine, with pytest.raises(ValueError) as ex: fit.refine_rpc(rpc, gcps[:-1], method=method) assert 'At least' in str(ex.value) + + +@pytest.mark.parametrize( + 'cam_type, camera', + [ + # (CameraType.pinhole, 'pinhole_camera'), + # (CameraType.brown, 'brown_camera'), + # (CameraType.opencv, 'opencv_camera'), + (CameraType.fisheye, 'fisheye_camera'), + ], +) +def test_fit_frame_dictionaries( + cam_type: CameraType, camera: str, grid_ji: np.ndarray, request: pytest.FixtureRequest +): + """Test fit_frame() returns valid parameter dictionaries.""" + cam: FrameCamera = request.getfixturevalue(camera) + + # create a mock GCP dictionary with multiple images + xyz = cam.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + # split gcps over images to avoid fisheye conditioning error + split_gcps = ceil(len(gcps) / 2) + gcp_dict = {'file1.ext': gcps[:split_gcps], 'file2.ext': gcps[split_gcps:]} + + # fit parameters + int_param_dict, ext_param_dict = fit.fit_frame(cam_type, cam.im_size, gcp_dict) + + # test parameter dictionary validity + assert len(int_param_dict) == 1 + assert ext_param_dict.keys() == gcp_dict.keys() + _validate_int_param_dict(int_param_dict) + int_param = next(iter(int_param_dict.values())) + assert set(int_param.keys()).issuperset(fit._frame_dist_params[cam_type]) + _validate_ext_param_dict(ext_param_dict) + + +def test_fit_frame_crs(pinhole_camera: FrameCamera, grid_ji: np.ndarray, utm34n_crs: str): + """Test fit_frame() crs parameter.""" + # create mock GCP dictionary with coordinates transformed from the reference camera's world + # CRS (utm34n_crs) to WGS84 geographic + xyz = pinhole_camera.pixel_to_world_z(grid_ji, z=0) + lla = np.array(transform(utm34n_crs, 'EPSG:4979', *xyz)) + gcps = [dict(ji=ji_gcp, xyz=lla_gcp) for ji_gcp, lla_gcp in zip(grid_ji.T, lla.T)] + gcp_dict = {'file.ext': gcps} + + # fit camera params with crs= + int_param_dict, ext_param_dict = fit.fit_frame( + CameraType.pinhole, pinhole_camera.im_size, gcp_dict, crs=utm34n_crs + ) + + # create a camera with the fitted parameters and test its world coordinates match those of + # the reference camera + int_param = next(iter(int_param_dict.values())) + ext_param = next(iter(ext_param_dict.values())) + test_cam = create_camera(**int_param, xyz=ext_param['xyz'], opk=ext_param['opk']) + test_xyz = test_cam.pixel_to_world_z(grid_ji, z=0) + assert test_xyz == pytest.approx(xyz, abs=1) + + +@pytest.mark.parametrize( + 'cam_type, camera', + [ + (CameraType.pinhole, 'pinhole_camera'), + (CameraType.brown, 'brown_camera'), + (CameraType.opencv, 'opencv_camera'), + (CameraType.fisheye, 'fisheye_camera'), + ], +) +def test_fit_frame_min_gcps( + cam_type: CameraType, camera: str, grid_ji: np.ndarray, request: pytest.FixtureRequest +): + """Test fit_frame() with the minimum allowed number of GCPs.""" + gcp_cam: FrameCamera = request.getfixturevalue(camera) + num_params = fit._frame_num_params[cam_type] + min_gcps = ceil((num_params + 1) / 2) + + # create a grid of at least min_gcps GCPs that approx. covers the image (with a buffer inside + # the boundary) + gcps_per_side = ceil(np.sqrt(min_gcps)) + steps = np.array(gcp_cam.im_size) / (gcps_per_side + 1) + j, i = ( + np.arange(steps[0], gcp_cam.im_size[0], steps[0]), + np.arange(steps[1], gcp_cam.im_size[1], steps[1]), + ) + jgrid, igrid = np.meshgrid(j, i, indexing='xy') + ji = np.array((jgrid.reshape(-1), igrid.reshape(-1))) + + # create a mock GCP dictionary with min_gcps GCPs + ji = ji[:, :min_gcps] + xyz = gcp_cam.pixel_to_world_z(ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(ji.T, xyz.T)] + gcp_dict = {'file.ext': gcps} + + # fit camera params + int_param_dict, ext_param_dict = fit.fit_frame(cam_type, gcp_cam.im_size, gcp_dict) + assert len(int_param_dict) == len(ext_param_dict) == 1 + + # create a camera with the fitted parameters and test its accuracy + int_param = next(iter(int_param_dict.values())) + ext_param = next(iter(ext_param_dict.values())) + test_cam = create_camera(**int_param, xyz=ext_param['xyz'], opk=ext_param['opk']) + + test_ji = test_cam.world_to_pixel(xyz) + assert test_ji == pytest.approx(ji, abs=0.5) + test_xyz = test_cam.pixel_to_world_z(ji, z=0) + assert test_xyz == pytest.approx(xyz, abs=5) + + +@pytest.mark.parametrize( + 'cam_type, dist_param', + [ + (CameraType.pinhole, None), + (CameraType.brown, 'brown_dist_param'), + (CameraType.opencv, 'opencv_dist_param'), + (CameraType.fisheye, 'fisheye_dist_param'), + ], +) +def test_fit_frame_multiple_images( + cam_type: CameraType, + dist_param: str, + frame_args: dict, + grid_ji: np.ndarray, + request: pytest.FixtureRequest, +): + """Test fit_frame() with multiple image GCPs.""" + dist_param: dict = request.getfixturevalue(dist_param) if dist_param else {} + + # create a mock GCP dictionary with multiple images at different camera positions / orientations + gcp_dict = {} + xyz_dict = {} + gcp_cam = create_camera(cam_type, **frame_args, **dist_param) + for i, (ext_xyz, ext_opk) in enumerate( + zip( + [(2e4, 3e4, 1e3), (3e4, 3e4, 1e3), (3e4, 3e4, 2e3)], + [(-3.0, 2.0, 10.0), (-15.0, 2.0, 10.0), (-30.0, 20.0, 10.0)], + ) + ): + ext_opk = tuple(np.radians(ext_opk)) + gcp_cam.update(xyz=ext_xyz, opk=ext_opk) + xyz = gcp_cam.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + + key = f'file{i}.ext' + xyz_dict[key] = xyz + gcp_dict[key] = gcps + + # fit camera params + int_param_dict, ext_param_dict = fit.fit_frame(cam_type, gcp_cam.im_size, gcp_dict) + + # configure cameras with the fitted parameters and test accuracy + int_param = next(iter(int_param_dict.values())) + test_cam = create_camera(**int_param) + for filename, ext_param in ext_param_dict.items(): + test_cam.update(xyz=ext_param['xyz'], opk=ext_param['opk']) + + test_ji = test_cam.world_to_pixel(xyz_dict[filename]) + assert test_ji == pytest.approx(grid_ji, abs=0.1) + test_xyz = test_cam.pixel_to_world_z(grid_ji, z=0) + assert test_xyz == pytest.approx(xyz_dict[filename], abs=1) + + +def test_fit_frame_errors(opencv_camera: FrameCamera, grid_ji: np.ndarray): + """Test fit_frame() errors and warnings.""" + cam_type = CameraType.opencv + # create mock GCPs + xyz = opencv_camera.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + + # test an error is raised with < 4 GCPs in an image + gcp_dict = {'file1.ext': gcps[:3], 'file2.ext': gcps[3:]} + with pytest.raises(ValueError, match='At least four'): + _, _ = fit.fit_frame(cam_type, opencv_camera.im_size, gcp_dict) + + # test an error is raised with less than min number of GCPs (in total) required to fit cam_type + min_gcps = ceil((1 + fit._frame_num_params[cam_type]) / 2) + gcp_dict = {'file1.ext': gcps[:4], 'file2.ext': gcps[: min_gcps - 4 - 1]} + with pytest.raises(ValueError, match='A total of at least'): + _, _ = fit.fit_frame(cam_type, opencv_camera.im_size, gcp_dict) + + # test a warning is issued with less than the number of GCPs required to globally optimise + # all cam_type parameters + gcp_dict = {'file1.ext': gcps[:min_gcps]} + with pytest.warns(OrthorityWarning, match='will not be globally optimised'): + try: + _, _ = fit.fit_frame(cam_type, opencv_camera.im_size, gcp_dict) + except cv2.error: + # suppress conditioning error + pass + + # test an error is raised with non-planar GCPs + xyz[2] = np.random.randn(1, xyz.shape[1]) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + gcp_dict = {'file1.ext': gcps} + with pytest.raises(ValueError, match='should be co-planar'): + _, _ = fit.fit_frame(cam_type, opencv_camera.im_size, gcp_dict) + + +def test_fit_frame_exterior_dictionary( + pinhole_int_param_dict: dict, + exterior_args: dict, + grid_ji: np.ndarray, +): + """Test fit_frame_exterior() returns a valid exterior parameter dictionary.""" + int_param = next(iter(pinhole_int_param_dict.values())) + + # create a mock GCP dictionary with multiple images + gcp_dict = {} + gcp_cam = create_camera(**int_param, **exterior_args) + xyz = gcp_cam.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + gcp_dict = {'file1.ext': gcps, 'file2.ext': gcps} + + # fit exterior params + ext_param_dict = fit.fit_frame_exterior(pinhole_int_param_dict, gcp_dict) + + # test dictionary validity + assert ext_param_dict.keys() == gcp_dict.keys() + _validate_ext_param_dict(ext_param_dict) + + +def test_fit_frame_exterior_crs( + pinhole_int_param_dict: dict, exterior_args: dict, grid_ji: np.ndarray, utm34n_crs: str +): + """Test fit_frame() crs parameter.""" + # create mock GCP dictionary with coordinates transformed from the reference camera's world + # CRS (utm34n_crs) to WGS84 geographic + int_param = next(iter(pinhole_int_param_dict.values())) + gcp_cam = create_camera(**int_param, **exterior_args) + xyz = gcp_cam.pixel_to_world_z(grid_ji, z=0) + lla = np.array(transform(utm34n_crs, 'EPSG:4979', *xyz)) + gcps = [dict(ji=ji_gcp, xyz=lla_gcp) for ji_gcp, lla_gcp in zip(grid_ji.T, lla.T)] + gcp_dict = {'file.ext': gcps} + + # fit exterior params with crs= + ext_param_dict = fit.fit_frame_exterior(pinhole_int_param_dict, gcp_dict, crs=utm34n_crs) + + # test the camera position against the reference + ext_param = next(iter(ext_param_dict.values())) + assert ext_param['xyz'] == pytest.approx(exterior_args['xyz'], abs=1e-3) + + +@pytest.mark.parametrize( + 'int_param_dict', + [ + 'pinhole_int_param_dict', + 'brown_int_param_dict', + 'opencv_int_param_dict', + 'fisheye_int_param_dict', + ], +) +def test_fit_frame_exterior_multiple_images( + int_param_dict: str, + exterior_args: dict, + grid_ji: np.ndarray, + request: pytest.FixtureRequest, +): + """Test fit_frame_exterior() with multiple images.""" + int_param_dict: dict = request.getfixturevalue(int_param_dict) + int_param = next(iter(int_param_dict.values())) + + # create a mock GCP dictionary with multiple images at different camera positions / orientations + gcp_dict = {} + ref_ext_param_dict = {} + gcp_cam = create_camera(**int_param, **exterior_args) + for i, (ext_xyz, ext_opk) in enumerate( + zip( + [(2e4, 3e4, 1e3), (3e4, 3e4, 1e3), (3e4, 3e4, 2e3)], + [(-3.0, 2.0, 10.0), (-15.0, 2.0, 10.0), (-30.0, 20.0, 10.0)], + ) + ): + ext_opk = tuple(np.radians(ext_opk)) + gcp_cam.update(xyz=ext_xyz, opk=ext_opk) + xyz = gcp_cam.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + + key = f'file{i}.ext' + ref_ext_param_dict[key] = dict(xyz=ext_xyz, opk=ext_opk) + gcp_dict[key] = gcps + + # fit exterior params + test_ext_param_dict = fit.fit_frame_exterior(int_param_dict, gcp_dict) + + # test parameter accuracy + assert test_ext_param_dict.keys() == ref_ext_param_dict.keys() + for test_ext_param, ref_ext_param in zip( + test_ext_param_dict.values(), ref_ext_param_dict.values() + ): + assert test_ext_param['xyz'] == pytest.approx(ref_ext_param['xyz'], abs=1e-3) + assert test_ext_param['opk'] == pytest.approx(ref_ext_param['opk'], abs=1e-5) + + +def test_fit_frame_exterior_errors( + pinhole_int_param_dict: dict, exterior_args: dict, grid_ji: np.ndarray +): + """Test fit_frame_exterior() errors and warnings.""" + # create mock GCPs + int_param = next(iter(pinhole_int_param_dict.values())) + cam = create_camera(**int_param, **exterior_args) + xyz = cam.pixel_to_world_z(grid_ji, z=0) + gcps = [dict(ji=ji_gcp, xyz=xyz_gcp) for ji_gcp, xyz_gcp in zip(grid_ji.T, xyz.T)] + + # test an error is raised with < 3 GCPs in an image + gcp_dict = {'file1.ext': gcps[:2], 'file2.ext': gcps[2:]} + with pytest.raises(ValueError, match='At least three'): + _ = fit.fit_frame_exterior(pinhole_int_param_dict, gcp_dict) + + # test a warning is issued with >1 camera in the interior parameter dictionary + int_param_dict = {'cam1': int_param, 'cam2': int_param} + gcp_dict = {'file.ext': gcps} + with pytest.warns(OrthorityWarning, match='Refining the first'): + _ = fit.fit_frame_exterior(int_param_dict, gcp_dict)