recursiveloader: use less memory in assert_directory_verifies #2

Open
wants to merge 1 commit into base: master
198 changes: 161 additions & 37 deletions gemato/recursiveloader.py
@@ -341,7 +341,7 @@ def _iter_unordered_manifests_for_path(self, path, recursive=False):
elif recursive and gemato.util.path_starts_with(d, path):
yield (k, d, v)

def _iter_manifests_for_path(self, path, recursive=False):
def _iter_manifests_for_path(self, path, recursive=False, sort_key=lambda kdv: len(kdv[1])):
"""
Iterate over loaded Manifests that can apply to path.
If @recursive is True, returns also Manifests for subdirectories
@@ -354,7 +354,7 @@ def _iter_manifests_for_path(self, path, recursive=False):
return sorted(
self._iter_unordered_manifests_for_path(
path, recursive=recursive),
key=lambda kdv: len(kdv[1]),
key=sort_key,
reverse=True)
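
The sort_key hook added above lets callers swap the default ordering (longest relative path first) for one based on path components. When the result is treated as a stack and popped from the end, Manifests come off top-down, with sibling directories in name order. A minimal, self-contained sketch of that ordering; the paths are made up for illustration and are not part of this diff:

import os

# Hypothetical (key, relative-directory, manifest) tuples; only the
# relative directory matters for the ordering.
manifests = [
    ("Manifest", "", None),
    ("app/Manifest", "app", None),
    ("app/foo/Manifest", "app/foo", None),
    ("app/bar/Manifest", "app/bar", None),
    ("dev/Manifest", "dev", None),
]

# The key the new code passes in: sort by path components, then pop
# from the end of the reversed list.
sort_key = lambda kdv: kdv[1].split(os.sep)
stack = sorted(manifests, key=sort_key, reverse=True)

order = []
while stack:
    key, reldir, m = stack.pop()
    order.append(reldir)

# Pops the top-level Manifest first, then descends depth-first:
# ['', 'app', 'app/bar', 'app/foo', 'dev']
print(order)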

def load_manifests_for_path(self, path, recursive=False, verify=True):
@@ -368,20 +368,38 @@ def load_manifests_for_path(self, path, recursive=False, verify=True):
on mismatch. Otherwise, sub-Manifests will be loaded
regardless of whether they match parent checksums.
"""
for curmpath, relpath, m in self._iter_load_manifests_for_path(
path, recursive=recursive, verify=verify):
self.loaded_manifests[curmpath] = m

def _iter_load_manifests_for_path(self, path, recursive=False, verify=True,
sort_key=lambda kdv: kdv[1].split(os.sep)):
"""
Traverse manifests in depth-first order with directories sorted by
name. Only hold references to a minimum number of ManifestFile
instances, in order to conserve memory.

The caller can traverse manifest and directory iterators in unison,
minimizing the amount of data in memory.
"""
pool = multiprocessing.Pool(processes=self.max_jobs)

# Manifests pop from the stack in depth-first order
manifest_stack = list(self._iter_manifests_for_path(path,
recursive=recursive, sort_key=sort_key))
traversed = set(curmpath for curmpath, relpath, m in manifest_stack)
try:
# TODO: figure out how to avoid confusing uses of 'recursive'
while True:
while manifest_stack:
to_load = []
for curmpath, relpath, m in self._iter_manifests_for_path(
path, recursive):
for e in m.entries:
curmpath, relpath, m = manifest_stack.pop()
yield (curmpath, relpath, m)

for e in m.entries:
if e.tag != 'MANIFEST':
continue
mpath = os.path.join(relpath, e.path)
if curmpath == mpath or mpath in self.loaded_manifests:
if curmpath == mpath or mpath in traversed:
continue
mdir = os.path.dirname(mpath)
if not verify:
@@ -390,12 +408,19 @@ def load_manifests_for_path(self, path, recursive=False, verify=True):
to_load.append((mpath, e))
elif recursive and gemato.util.path_starts_with(mdir, path):
to_load.append((mpath, e))
if not to_load:
break

manifests = pool.imap_unordered(self.manifest_loader, to_load,
chunksize=16)
self.loaded_manifests.update(manifests)

manifests = [(mpath, os.path.dirname(mpath), e)
for mpath, e in manifests]

# Manifests pop from the stack in depth-first order
manifests.sort(key=sort_key, reverse=True)
for mpath, mdir, m in manifests:
traversed.add(mpath)
manifest_stack.append(
(mpath, os.path.dirname(mpath), m))

pool.close()
pool.join()
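
The generator above yields each Manifest exactly once, in depth-first order, and only keeps the not-yet-yielded ones on manifest_stack; load_manifests_for_path simply drains it into loaded_manifests, while other callers can interleave it with their own traversal and drop each Manifest as soon as it has been processed. A rough, simplified sketch of the pattern, with a plain callable standing in for the pool-based loading and verification (all names here are illustrative, not gemato API):

import os

def iter_manifests_depth_first(top_manifests, load_children):
    """Yield (relpath, manifest) pairs depth-first, loading children lazily.

    top_manifests: (relpath, manifest) pairs that are already loaded.
    load_children: callable returning the (relpath, manifest) pairs
    referenced by a manifest; stands in for the multiprocessing pool
    and checksum verification in the real code.
    """
    sort_key = lambda pm: pm[0].split(os.sep)
    stack = sorted(top_manifests, key=sort_key, reverse=True)
    seen = set(relpath for relpath, m in stack)
    while stack:
        relpath, m = stack.pop()
        # The caller sees the Manifest now; nothing below keeps
        # a reference to it afterwards.
        yield relpath, m
        children = [(p, c) for p, c in load_children(m) if p not in seen]
        children.sort(key=sort_key, reverse=True)
        seen.update(p for p, c in children)
        stack.extend(children)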
@@ -511,13 +536,54 @@ def get_file_entry_dict(self, path='', only_types=None,
be verified against MANIFEST entries. Pass False only when
doing updates.
"""
out = {}
for dirpath, dirout in self._iter_file_entry_dict(
path=path, only_types=only_types,
verify_manifests=verify_manifests):
other = out.get(dirpath)
if other is None:
out[dirpath] = dirout
else:
# This happens due to the relpath = '' setting
# for all DIST entries.
for filename, e in dirout.items():
if filename in other:
e = self._merge_entries(other[filename], e)
other[filename] = e
return out

self.load_manifests_for_path(path, recursive=True,
verify=verify_manifests)
@staticmethod
def _merge_entries(e1, e2):
# compare the two entries
ret, diff = gemato.verify.verify_entry_compatibility(
e1, e2)
if not ret:
raise gemato.exceptions.ManifestIncompatibleEntry(
e1, e2, diff)
# we need to construct a single entry with both checksums
if diff:
new_checksums = dict(e2.checksums)
for k, d1, d2 in diff:
if d2 is None:
new_checksums[k] = d1
e1 = type(e1)(e1.path, e1.size, new_checksums)
return e1
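
_merge_entries factors out the checksum-union logic that previously lived inline in get_file_entry_dict: when two compatible entries describe the same file with different checksum sets, the merged entry carries the union. A minimal illustration using plain dicts in place of gemato's entry classes; the diff triples follow the (name, value-in-first, value-in-second) shape the loop above iterates over, and all values are made up:

# Two hypothetical entries for the same file, each knowing a different
# subset of checksums (their sizes are assumed to agree).
e1_checksums = {"SHA512": "aa11", "BLAKE2B": "bb22"}
e2_checksums = {"SHA512": "aa11"}

# Differences between the two checksum sets:
# (checksum name, value in the first entry, value in the second entry).
diff = [("BLAKE2B", "bb22", None)]

# Union of the checksums, mirroring _merge_entries: start from the
# second entry's checksums and fill in anything only the first had.
merged = dict(e2_checksums)
for name, d1, d2 in diff:
    if d2 is None:
        merged[name] = d1

assert merged == {"SHA512": "aa11", "BLAKE2B": "bb22"}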

def _iter_file_entry_dict(self, path='', only_types=None,
verify_manifests=True,
sort_key=lambda p: p.split(os.sep)):
out = {}
for mpath, relpath, m in self._iter_manifests_for_path(path,
recursive=True):
for e in m.entries:
dir_stack = [path]
iter_load = self._iter_load_manifests_for_path(path,
recursive=True, verify=verify_manifests)
mpath, mdir, m = next(iter_load, (None, None, None))

while dir_stack or mdir is not None:
if not dir_stack or (mdir is not None and
sort_key(mdir) <= sort_key(dir_stack[-1])):
subdirs = []
relpath = mdir
for e in m.entries:
if only_types is not None:
if e.tag not in only_types:
continue
@@ -533,23 +599,20 @@ def get_file_entry_dict(self, path='', only_types=None,
if gemato.util.path_starts_with(fullpath, path):
dirpath = os.path.dirname(fullpath)
filename = os.path.basename(e.path)
subdirs.append(dirpath)
dirout = out.setdefault(dirpath, {})
if filename in dirout:
# compare the two entries
ret, diff = gemato.verify.verify_entry_compatibility(
dirout[filename], e)
if not ret:
raise gemato.exceptions.ManifestIncompatibleEntry(
dirout[filename], e, diff)
# we need to construct a single entry with both checksums
if diff:
new_checksums = dict(e.checksums)
for k, d1, d2 in diff:
if d2 is None:
new_checksums[k] = d1
e = type(e)(e.path, e.size, new_checksums)
e = self._merge_entries(dirout[filename], e)
dirout[filename] = e
return out
subdirs.sort(key=sort_key, reverse=True)
dir_stack.extend(subdirs)
mpath, mdir, m = next(iter_load, (None, None, None))
else:
dirpath = dir_stack.pop()
try:
yield dirpath, out.pop(dirpath)
except KeyError:
pass
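
_iter_file_entry_dict streams the result one directory at a time instead of building the whole tree: a directory is only emitted once every Manifest that could still contribute entries to it has been consumed, which is what the sort_key(mdir) <= sort_key(dir_stack[-1]) comparison guards. A much-reduced sketch of that two-branch loop, using plain dicts in place of ManifestFile objects (the data shapes and names are illustrative only):

import os

def iter_dir_entries(manifest_iter, top=""):
    """Yield (directory, {filename: entry}) pairs, one directory at a time.

    manifest_iter yields (manifest_dir, {dirpath: {filename: entry}})
    in the same depth-first order as the loader above.
    """
    sort_key = lambda p: p.split(os.sep)
    out = {}
    dir_stack = [top]
    mdir, contrib = next(manifest_iter, (None, None))

    while dir_stack or mdir is not None:
        if not dir_stack or (mdir is not None
                             and sort_key(mdir) <= sort_key(dir_stack[-1])):
            # Fold this Manifest's entries in and remember the
            # directories it covers for later emission.
            for dirpath, files in contrib.items():
                out.setdefault(dirpath, {}).update(files)
            subdirs = sorted(contrib, key=sort_key, reverse=True)
            dir_stack.extend(subdirs)
            mdir, contrib = next(manifest_iter, (None, None))
        else:
            # No remaining Manifest can add to this directory any more.
            dirpath = dir_stack.pop()
            if dirpath in out:
                yield dirpath, out.pop(dirpath)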

def assert_directory_verifies(self, path='',
fail_handler=gemato.util.throw_exception,
@@ -580,22 +643,83 @@ def assert_directory_verifies(self, path='',
to None (the default), the number of system CPUs will be used.
"""

entry_dict = self.get_file_entry_dict(path)
remaining_entries = {}
entry_iter = self._iter_file_entry_dict(path)
it = os.walk(os.path.join(self.root_directory, path),
onerror=gemato.util.throw_exception,
followlinks=True)
sort_key = lambda p: p.split(os.sep)
dir_stack = []

def _walk_directory(it):
"""
Pre-process os.walk() result for verification. Yield objects
suitable to passing to subprocesses.
"""
for dirpath, dirnames, filenames in it:
relpath = os.path.relpath(dirpath, self.root_directory)
# strip dot to avoid matching problems
if relpath == '.':
relpath = ''
dirdict = entry_dict.pop(relpath, {})
pop_until = None
entry_dir, entry_dict = next(entry_iter, (None, None))
while True:
if pop_until is not None:
dirpath, dirnames, filenames, relpath = dir_stack.pop()
if pop_until is relpath:
pop_until = None
elif (dir_stack and entry_dir is not None and
gemato.util.path_starts_with(dir_stack[-1][-1], entry_dir)):
dirpath, dirnames, filenames, relpath = dir_stack.pop()
else:
try:
dirpath, dirnames, filenames = next(it)
except StopIteration:
while entry_dir is not None:
remaining_entries[entry_dir] = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
break

relpath = os.path.relpath(dirpath, self.root_directory)

# strip dot to avoid matching problems
if relpath == '.':
relpath = ''

dirnames.sort()

if relpath == entry_dir:
dirdict = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
elif entry_dir is not None and gemato.util.path_starts_with(relpath, entry_dir):
dirdict = {}
else:
relpath_key = sort_key(relpath)
if dir_stack and entry_dir is not None:
entry_dir_key = sort_key(entry_dir)
if relpath_key > entry_dir_key and entry_dir_key <= sort_key(dir_stack[-1][-1]):
# Try to insert it into the stack for later processing.
for i, item in enumerate(dir_stack):
if item[-1] and relpath_key > sort_key(item[-1]):
dir_stack.insert(i, (dirpath, dirnames, filenames, relpath))
dirpath = None
break
if dirpath is None:
if pop_until is None:
pop_until = relpath
continue
while entry_dir is not None and relpath_key > sort_key(entry_dir):
remaining_entries[entry_dir] = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))

if relpath == entry_dir:
dirdict = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
elif entry_dir is not None:
relpath_key = sort_key(relpath)
entry_dir_key = sort_key(entry_dir)
if relpath_key < entry_dir_key and len(relpath_key) <= len(entry_dir_key):
dir_stack.append((dirpath, dirnames, filenames, relpath))
continue
else:
dirdict = {}
else:
dirdict = {}

skip_dirs = []
for d in dirnames:
@@ -643,7 +767,7 @@ def _walk_directory(it):
pool.close()

# check for missing directories
for relpath, dirdict in entry_dict.items():
for relpath, dirdict in remaining_entries.items():
for f, e in dirdict.items():
fpath = os.path.join(relpath, f)
syspath = os.path.join(self.root_directory, fpath)
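
In assert_directory_verifies the same idea is applied a second time: os.walk() (with dirnames sorted in place) and the per-directory entry iterator advance in unison, directories that arrive too early on either side are parked on dir_stack or in remaining_entries, and only entries whose directories never show up on disk survive to the final missing-file check. The core of it is a merge-join of two ordered streams; a much-reduced sketch with plain sorted lists standing in for os.walk() and the Manifest entries (paths are invented for the example):

import os

sort_key = lambda p: p.split(os.sep)

# Hypothetical directories present on disk and directories that have
# Manifest entries, both sorted the way the new code sorts them.
on_disk = sorted(["", "app", "app/bar", "dev"], key=sort_key)
in_manifest = sorted(["", "app", "app/foo", "dev"], key=sort_key)

disk_iter = iter(on_disk)
entry_iter = iter(in_manifest)
remaining = []  # Manifest dirs never seen on disk ("missing")

d = next(disk_iter, None)
e = next(entry_iter, None)
while d is not None or e is not None:
    if e is None or (d is not None and sort_key(d) < sort_key(e)):
        # On disk only: verified against an empty entry dict (strays).
        d = next(disk_iter, None)
    elif d is None or sort_key(e) < sort_key(d):
        # In the Manifests only: reported by the missing-file check.
        remaining.append(e)
        e = next(entry_iter, None)
    else:
        # Both sides agree: verify the directory against its entries.
        d = next(disk_iter, None)
        e = next(entry_iter, None)

assert remaining == ["app/foo"]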