Skip to content

Commit

Permalink
[#435] unregister can target a single replica.
Browse files Browse the repository at this point in the history
The data object unregister() method can now remove an individual replica when
REPL_NUM_KW or RESC_NAME_KW is given to identify that replica. As always with
unregister(), the physical path (i.e. the file on disk) is not removed.
  • Loading branch information
d-w-moore authored and alanking committed Mar 28, 2023
1 parent a2f5633 commit 3ba273e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
14 changes: 12 additions & 2 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,13 +346,19 @@ def open(self, path, mode, create = True, finalize_on_close = True, **options):

def trim(self, path, **options):

try:
oprType = options[kw.OPR_TYPE_KW]
except KeyError:
oprType = 0

message_body = FileOpenRequest(
objPath=path,
createMode=0,
openFlags=0,
offset=0,
dataSize=-1,
numThreads=self.sess.numThreads,
oprType=oprType,
KeyValPair_PI=StringStringMap(options),
)
message = iRODSMessage('RODS_API_REQ', msg=message_body,
Expand Down Expand Up @@ -392,9 +398,13 @@ def unlink(self, path, force=False, **options):

def unregister(self, path, **options):
    """Unregister the data object at `path`, or a single replica of it.

    The operation type is forced to 26 (UNREG_OPR) so that the physical
    file is not deleted from disk.

    Parameters
    ----------
    path - Logical path of the data object.
    options - Keyword options forwarded to the underlying call.  If either
              kw.RESC_NAME_KW or kw.REPL_NUM_KW is present, only that
              replica is targeted and the request goes through trim();
              otherwise the request goes through unlink().
    """
    # https://github.com/irods/irods/blob/4.2.1/lib/api/include/dataObjInpOut.h#L190
    options[kw.OPR_TYPE_KW] = 26  # UNREG_OPR: prevents deletion from disk.

    # Exactly one of trim/unlink must run: a replica-targeted request that
    # also went through unlink would affect the whole data object.
    if {kw.RESC_NAME_KW, kw.REPL_NUM_KW} & set(options):
        self.trim(path, **options)
    else:
        self.unlink(path, **options)


def exists(self, path):
Expand Down
69 changes: 66 additions & 3 deletions irods/test/data_obj_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#! /usr/bin/env python
from __future__ import absolute_import
import os
import stat
import sys
import socket
import json
Expand Down Expand Up @@ -32,15 +33,31 @@
import irods.parallel
from irods.manager.data_object_manager import Server_Checksum_Warning


def make_ufs_resc_in_tmpdir(session, base_name, allow_local=False, client_vault_mode=(True,)):
    """Create a 'unixfilesystem' resource whose vault lives under a temp directory.

    Parameters
    ----------
    session - Session object used to create the resource (session.resources.create).
    base_name - The name for the resource, as well as the root directory of the Vault.
                Use something random and unlikely to collide.
    allow_local - Whether to allow the resource's vault to be located under a
                  non-shared ie. "/tmp" style directory.
    client_vault_mode - A tuple of (client_mkdir[, mode_OR_mask]):
        client_mkdir - whether to call mkdir on the vault-path from the client side, and ...
        mode_OR_mask - if so, what mode bits to be OR'ed into the permission of the
                       vault path after creation.
                       (A typical value might be : 0o777 | stat.S_ISGID, to guarantee
                       iRODS has permissions on the vault)

    Returns
    -------
    The full physical path of the vault directory.

    Raises
    ------
    RuntimeError - if no shared temp directory is available and allow_local is false.
    """
    tmpdir = helpers.irods_shared_tmp_dir()
    if not tmpdir and allow_local:
        tmpdir = os.getenv('TMPDIR') or '/tmp'
    if not tmpdir:
        raise RuntimeError("Must have filesystem path shareable with server.")

    full_phys_dir = os.path.join(tmpdir, base_name)

    # Only touch the vault directory from the client when asked to; with
    # client_mkdir false the path is merely handed to the resource creation call.
    if client_vault_mode[0]:
        if not os.path.exists(full_phys_dir):
            os.mkdir(full_phys_dir)
        guarantee_mode_bits = tuple(client_vault_mode[1:])
        if guarantee_mode_bits != ():
            # OR the requested bits into whatever mode the directory already has.
            mode = os.stat(full_phys_dir).st_mode
            os.chmod(full_phys_dir, mode | guarantee_mode_bits[0])

    session.resources.create(base_name, 'unixfilesystem', session.host, full_phys_dir)
    return full_phys_dir

Expand Down Expand Up @@ -1628,6 +1645,52 @@ def test_register_with_xml_special_chars(self):
self.sess.resources.get(resc_name).remove()


def test_unregister_can_target_replica__issue_435(self):
    """unregister() with RESC_NAME_KW or REPL_NUM_KW drops exactly one replica.

    For each of the two keywords, a data object is created on one resource,
    replicated to a second, and the second replica is unregistered.  The
    replica count must fall from 2 to 1 while both physical files stay on disk.
    """
    shared_tmp = helpers.irods_shared_tmp_dir()
    server_is_local = self.sess.host in ('localhost', socket.gethostname())
    if not (shared_tmp or server_is_local):
        self.skipTest('Requires access to server-side file(s)')

    stamp = datetime.now()
    uniq1 = unique_name(my_function_name(), 1, stamp)
    uniq2 = unique_name(my_function_name(), 2, stamp)
    vault1 = vault2 = ''
    created_objects = []
    try:
        vault_mode = (server_is_local, 0o777 | stat.S_ISGID)
        vault1 = make_ufs_resc_in_tmpdir(self.sess, uniq1,
                                         allow_local=server_is_local,
                                         client_vault_mode=vault_mode)
        vault2 = make_ufs_resc_in_tmpdir(self.sess, uniq2,
                                         allow_local=server_is_local,
                                         client_vault_mode=vault_mode)

        def replica_number_from_resource_name(data_path, resc):
            replicas = self.sess.data_objects.get(data_path).replicas
            numbers = [r.number for r in replicas if r.resource_name == resc]
            return numbers[0]

        # Use two different ways to specify unregister target:
        for keyword in (kw.RESC_NAME_KW, kw.REPL_NUM_KW):
            # The format call below picks up `self` and `dt` by name via locals().
            dt = datetime.now()
            data_path = '{self.coll_path}/435_test_{dt:%s.%f}'.format(**locals())
            data = self.sess.data_objects.create(data_path, resource=uniq1)
            created_objects.append(data)

            # Ensure that two replicas exist.
            data.replicate(**{kw.DEST_RESC_NAME_KW: uniq2})
            data = self.sess.data_objects.get(data_path)
            self.assertEqual(2, len(data.replicas))

            replica_paths = [r.path for r in data.replicas]

            # Identify the replica on the second resource either by replica
            # number or by resource name, depending on the keyword under test.
            if keyword == kw.REPL_NUM_KW:
                target = replica_number_from_resource_name(data_path, uniq2)
            else:
                target = uniq2

            # Assert that unregistering the specific replica decreases the number of replicas by 1.
            data.unregister(**{keyword: target, kw.COPIES_KW: 1})
            remaining = self.sess.data_objects.get(data_path).replicas
            self.assertEqual(1, len(remaining))

            # Assert replica files still both on disk (os.stat raises if one is missing).
            for replica_path in replica_paths:
                os.stat(replica_path)
    finally:
        # Clean up.
        for obj in created_objects:
            obj.unlink(force=True)
        if vault1:
            self.sess.resources.get(uniq1).remove()
        if vault2:
            self.sess.resources.get(uniq2).remove()

if __name__ == '__main__':
# let the tests find the parent irods lib
sys.path.insert(0, os.path.abspath('../..'))
Expand Down

0 comments on commit 3ba273e

Please sign in to comment.