Skip to content

Commit

Permalink
test_put
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Nov 24, 2024
1 parent 2266b1c commit eb6f979
Showing 1 changed file with 54 additions and 60 deletions.
114 changes: 54 additions & 60 deletions tests/transports/test_all_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,7 @@ def test_put_and_get_overwrite(


def test_copy(custom_transport, tmp_path_factory):
"""Test copying."""
# local_dir = os.path.join('/', 'tmp')
# remote_dir = local_dir
"""Test copying from a remote src to remote dst"""
remote_dir = tmp_path_factory.mktemp('remote')

directory = 'tmp_try'
Expand All @@ -636,7 +634,7 @@ def test_copy(custom_transport, tmp_path_factory):
base_dir = workdir / 'origin'
base_dir.mkdir()

# first test put: I create three files in local
# first create three files
file_1 = base_dir / 'a.txt'
file_2 = base_dir / 'b.tmp'
file_3 = base_dir / 'c.txt'
Expand Down Expand Up @@ -689,80 +687,76 @@ def test_copy(custom_transport, tmp_path_factory):
transport.rmtree(str(workdir))


def test_put(custom_transport):
"""Test putting files."""
# exactly the same tests of copy, just with the put function
# and therefore the local path must be absolute
local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
def test_put(custom_transport, tmp_path_factory):
"""Test putting files.
Those are similar tests of copy, just with the put function which copy from mocked local to mocked remote
and therefore the local path must be absolute
"""
local_dir = tmp_path_factory.mktemp('local')
remote_dir = tmp_path_factory.mktemp('remote')
directory = 'tmp_try'

with custom_transport as transport:
transport.chdir(remote_dir)
local_workdir = local_dir / directory
remote_workdir = remote_dir / directory

while os.path.exists(os.path.join(local_dir, directory)):
# I append a random letter/number until it is unique
directory += random.choice(string.ascii_uppercase + string.digits)
transport.mkdir(str(remote_workdir))

transport.mkdir(directory)
transport.chdir(directory)

local_base_dir = os.path.join(local_dir, directory, 'local')
os.mkdir(local_base_dir)
local_base_dir: Path = local_workdir / 'origin'
local_base_dir.mkdir(parents=True)

# first test put: I create three files in local
file_1 = os.path.join(local_base_dir, 'a.txt')
file_2 = os.path.join(local_base_dir, 'b.tmp')
file_3 = os.path.join(local_base_dir, 'c.txt')
file_1 = local_base_dir / 'a.txt'
file_2 = local_base_dir / 'b.tmp'
file_3 = local_base_dir / 'c.txt'
text = 'Viva Verdi\n'
for filename in [file_1, file_2, file_3]:
with open(filename, 'w', encoding='utf8') as fhandle:
fhandle.write(text)

# first test putransport. Copy of two files matching patterns, into a folder
transport.put(os.path.join(local_base_dir, '*.txt'), '.')
assert set(['a.txt', 'c.txt', 'local']) == set(transport.listdir('.'))
transport.remove('a.txt')
transport.remove('c.txt')
# second. Copy of folder into a non existing folder
transport.put(local_base_dir, 'prova')
assert set(['prova', 'local']) == set(transport.listdir('.'))
assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir('prova'))
transport.rmtree('prova')
# third. copy of folder into an existing folder
transport.mkdir('prova')
transport.put(local_base_dir, 'prova')
assert set(['prova', 'local']) == set(transport.listdir('.'))
assert set(['local']) == set(transport.listdir('prova'))
assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir(os.path.join('prova', 'local')))
transport.rmtree('prova')
# third test copy. Can copy one file into a new file
transport.put(os.path.join(local_base_dir, '*.tmp'), 'prova')
assert set(['prova', 'local']) == set(transport.listdir('.'))
transport.remove('prova')
# fourth test copy: can't copy more than one file on the same file,
# first test the put. Copy of two files matching patterns, into a folder
transport.put(str(local_base_dir / '*.txt'), str(remote_workdir))
assert set(['a.txt', 'c.txt']) == set(transport.listdir(str(remote_workdir)))
transport.remove(str(remote_workdir / 'a.txt'))
transport.remove(str(remote_workdir / 'c.txt'))

# second test put. Put of two folders
transport.put(str(local_base_dir), str(remote_workdir / 'prova'))
assert set(['prova']) == set(transport.listdir(str(remote_workdir)))
assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir(str(remote_workdir / 'prova')))
transport.rmtree(str(remote_workdir / 'prova'))

# third test put. Can copy one file into a new file
transport.put(str(local_base_dir / '*.tmp'), str(remote_workdir / 'prova'))
assert transport.isfile(str(remote_workdir / 'prova'))
transport.remove(str(remote_workdir / 'prova'))

# fourth test put: can't copy more than one file on the same file,
# i.e., the destination should be a folder
with pytest.raises(OSError):
transport.put(os.path.join(local_base_dir, '*.txt'), 'prova')
# copy of folder into file
with open(os.path.join(local_dir, directory, 'existing.txt'), 'w', encoding='utf8') as fhandle:
fhandle.write(text)
with pytest.raises(OSError):
transport.put(os.path.join(local_base_dir), 'existing.txt')
transport.remove('existing.txt')
transport.put(str(local_base_dir / '*.txt'), str(remote_workdir / 'prova'))

# fifth test, copying one file into a folder
transport.mkdir('prova')
transport.put(os.path.join(local_base_dir, 'a.txt'), 'prova')
assert set(transport.listdir('prova')) == set(['a.txt'])
transport.rmtree('prova')
transport.mkdir(str(remote_workdir / 'prova'))
transport.put(str(local_base_dir / 'a.txt'), str(remote_workdir / 'prova'))
assert set(transport.listdir(str(remote_workdir / 'prova'))) == set(['a.txt'])
transport.rmtree(str(remote_workdir / 'prova'))

# sixth test, copying one file into a file
transport.put(os.path.join(local_base_dir, 'a.txt'), 'prova')
assert transport.isfile('prova')
transport.remove('prova')
transport.put(str(local_base_dir / 'a.txt'), str(remote_workdir / 'prova'))
assert transport.isfile(str(remote_workdir / 'prova'))
transport.remove(str(remote_workdir / 'prova'))

# put of folder into an existing folder
# NOTE: the command cp has a different behavior on Mac vs Ubuntu
# tests performed locally on a Mac may result in a failure.
transport.mkdir(str(remote_workdir / 'prova'))
transport.put(str(local_base_dir), str(remote_workdir / 'prova'))
assert set(['origin']) == set(transport.listdir(str(remote_workdir / 'prova')))
assert set(['a.txt', 'b.tmp', 'c.txt']) == set(transport.listdir(str(remote_workdir / 'prova' / 'origin')))
transport.rmtree(str(remote_workdir / 'prova'))
# exit
transport.chdir('..')
transport.rmtree(directory)
transport.rmtree(str(remote_workdir))


def test_get(custom_transport):
Expand Down

0 comments on commit eb6f979

Please sign in to comment.