Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disk usage log message at start of Toil run. #3516

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1372,53 +1372,6 @@ def cacheDirName(workflowID):
return f'cache-{workflowID}'


def getDirSizeRecursively(dirPath: str) -> int:
    """
    Return the cumulative number of bytes occupied on disk by the files in
    the directory ``dirPath`` and its subdirectories.

    If the method is unable to access a file or directory (due to insufficient
    permissions, or due to the file or directory having been removed while this
    function was attempting to traverse it), the error will be handled
    internally, and a (possibly 0) lower bound on the size of the directory
    will be returned.

    The environment variable 'BLOCKSIZE'='512' is set instead of the much cleaner
    --block-size=1 because Apple can't handle it.

    :param str dirPath: A valid path to a directory or file.
    :return: Total size, in bytes, of the file or directory at dirPath.
    """
    # du is often faster than using os.lstat(), sometimes significantly so.

    # The call: 'du -s /some/path' should give the number of 512-byte blocks
    # allocated with the environment variable: BLOCKSIZE='512' set, and we
    # multiply this by 512 to return the filesize in bytes.
    try:
        out = subprocess.check_output(['du', '-s', dirPath],
                                      env=dict(os.environ, BLOCKSIZE='512'))
    except subprocess.CalledProcessError as e:
        # Something was inaccessible or went away. du exits non-zero in that
        # case but may still have printed a partial total on stdout; use that
        # partial total as the promised lower bound instead of discarding it.
        out = e.output or b''
    try:
        return int(out.decode('utf-8').split()[0]) * 512
    except (ValueError, IndexError):
        # No usable du output at all (e.g. the path itself does not exist).
        return 0


def getFileSystemSize(dirPath: str) -> Tuple[int, int]:
    """
    Return the free space, and total size of the file system hosting `dirPath`.

    :param str dirPath: A valid path to a directory.
    :return: free space and total size of file system
    :rtype: tuple
    :raises ValueError: if dirPath does not exist.
    """
    # Raise instead of assert: asserts are stripped under ``python -O`` and
    # a missing path here is an input-validation failure, not an internal
    # invariant violation.
    if not os.path.exists(dirPath):
        raise ValueError(f'Could not determine file system size: {dirPath} does not exist')
    diskStats = os.statvfs(dirPath)
    freeSpace = diskStats.f_frsize * diskStats.f_bavail
    diskSize = diskStats.f_frsize * diskStats.f_blocks
    return freeSpace, diskSize


def safeUnpickleFromStream(stream):
    """
    Read the entire ``stream`` and unpickle the bytes it contained.

    :param stream: a binary file-like object positioned at a pickled payload.
    :return: the unpickled object.
    """
    # NOTE(review): despite the name, pickle.loads is not safe on untrusted
    # data — presumably the stream comes from a trusted job store; verify.
    return pickle.loads(stream.read())
9 changes: 5 additions & 4 deletions src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import time
import uuid
from contextlib import contextmanager
from typing import Any, Callable, Generator, Optional
from typing import Callable, Generator, Optional, Tuple

from toil.common import cacheDirName, getDirSizeRecursively, getFileSystemSize
from toil.common import cacheDirName
from toil.fileStores import FileID, make_public_dir
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.humanize import bytes2human
from toil.lib.io import atomic_copy, atomic_copyobj, robust_rmtree
from toil.lib.resources import get_dir_size_recursively, get_file_system_size
from toil.lib.retry import ErrorCondition, retry
from toil.lib.threading import get_process_name, process_name_exists
from toil.job import Job, JobDescription
Expand Down Expand Up @@ -232,7 +233,7 @@ def __init__(self, jobStore: AbstractJobStore, jobDesc: JobDescription, localTem
self._ensureTables(self.con)

# Initialize the space accounting properties
freeSpace, _ = getFileSystemSize(self.localCacheDir)
freeSpace, _ = get_file_system_size(self.localCacheDir)
self._write([('INSERT OR IGNORE INTO properties VALUES (?, ?)', ('maxSpace', freeSpace))])

# Space used by caching and by jobs is accounted with queries
Expand Down Expand Up @@ -999,7 +1000,7 @@ def open(self, job: Job) -> Generator[None, None, None]:
# See how much disk space is used at the end of the job.
# Not a real peak disk usage, but close enough to be useful for warning the user.
# TODO: Push this logic into the abstract file store
disk: int = getDirSizeRecursively(self.localTempDir)
disk: int = get_dir_size_recursively(self.localTempDir)
percent: float = 0.0
if self.jobDiskBytes and self.jobDiskBytes > 0:
percent = float(disk) / self.jobDiskBytes * 100
Expand Down
8 changes: 3 additions & 5 deletions src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@
import fcntl
import logging
import os
import stat
import sys
import uuid
from collections import defaultdict
from contextlib import contextmanager
from typing import Callable, Dict, Optional, Generator

import dill

from toil.common import getDirSizeRecursively, getFileSystemSize
from toil.fileStores import FileID, make_public_dir
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.humanize import bytes2human
from toil.lib.io import robust_rmtree
from toil.lib.resources import get_dir_size_recursively, get_file_system_size
from toil.lib.threading import get_process_name, process_name_exists
from toil.job import Job, JobDescription

Expand All @@ -51,15 +49,15 @@ def open(self, job: Job) -> Generator[None, None, None]:
self.localTempDir = make_public_dir(os.path.join(self.localTempDir, str(uuid.uuid4())))
self._removeDeadJobs(self.workDir)
self.jobStateFile = self._createJobStateFile()
freeSpace, diskSize = getFileSystemSize(self.localTempDir)
freeSpace, diskSize = get_file_system_size(self.localTempDir)
if freeSpace <= 0.1 * diskSize:
logger.warning(f'Starting job {self.jobName} with less than 10%% of disk space remaining.')
try:
os.chdir(self.localTempDir)
with super().open(job):
yield
finally:
disk = getDirSizeRecursively(self.localTempDir)
disk: int = get_dir_size_recursively(self.localTempDir)
percent = float(disk) / jobReqs * 100 if jobReqs > 0 else 0.0
disk_usage = (f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(disk)}B [{disk}B] used, "
f"{bytes2human(jobReqs)}B [{jobReqs}B] requested).")
Expand Down
15 changes: 14 additions & 1 deletion src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import errno
import logging
import os
import stat
import pickle
import random
import re
Expand All @@ -32,7 +31,9 @@
NoSuchFileException,
NoSuchJobException,
NoSuchJobStoreException)
from toil.lib.humanize import bytes2human
from toil.lib.io import AtomicFileCreate, atomic_copy, atomic_copyobj, robust_rmtree
from toil.lib.resources import get_file_system_size

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,11 +106,23 @@ def initialize(self, config):
os.makedirs(self.sharedFilesDir, exist_ok=True)
self.linkImports = config.linkImports
self.moveExports = config.moveExports
available_disk_space, total_disk_size = get_file_system_size(self.jobStoreDir)
logger.info(
f'Starting Local JobStore with: '
f'{bytes2human(available_disk_space)}b Free Space Left / {bytes2human(total_disk_size)}b Total Disk '
f'( {self.jobStoreDir} )'
)
super(FileJobStore, self).initialize(config)

def resume(self):
if not os.path.isdir(self.jobStoreDir):
raise NoSuchJobStoreException(self.jobStoreDir)
available_disk_space, total_disk_size = get_file_system_size(self.jobStoreDir)
logger.info(
f'Resuming Local JobStore with: '
f'{bytes2human(available_disk_space)}b Free Space Left / {bytes2human(total_disk_size)}b Total Disk '
f'( {self.jobStoreDir} )'
)
super(FileJobStore, self).resume()

def destroy(self):
Expand Down
48 changes: 47 additions & 1 deletion src/toil/lib/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import fnmatch
import os
import resource
import subprocess

from typing import List
from typing import List, Tuple


def get_total_cpu_time_and_memory_usage():
Expand All @@ -37,6 +38,51 @@ def get_total_cpu_time():
return me.ru_utime + me.ru_stime + childs.ru_utime + childs.ru_stime


def get_dir_size_recursively(dir_path: str) -> int:
    """
    Return the cumulative number of bytes occupied on disk by the files in
    the directory ``dir_path`` and its subdirectories.

    If the method is unable to access a file or directory (due to insufficient
    permissions, or due to the file or directory having been removed while this
    function was attempting to traverse it), the error will be handled
    internally, and a (possibly 0) lower bound on the size of the directory
    will be returned.

    The environment variable 'BLOCKSIZE'='512' is set instead of the much cleaner
    --block-size=1 because Apple can't handle it.

    :param str dir_path: A valid path to a directory or file.
    :return: Total size, in bytes, of the file or directory at dir_path.
    """
    # du is often faster than using os.lstat(), sometimes significantly so.

    # The call: 'du -s /some/path' should give the number of 512-byte blocks
    # allocated with the environment variable: BLOCKSIZE='512' set, and we
    # multiply this by 512 to return the filesize in bytes.
    try:
        out = subprocess.check_output(['du', '-s', dir_path],
                                      env=dict(os.environ, BLOCKSIZE='512'))
    except subprocess.CalledProcessError as e:
        # Something was inaccessible or went away. du exits non-zero in that
        # case but may still have printed a partial total on stdout; use that
        # partial total as the promised lower bound instead of discarding it.
        out = e.output or b''
    try:
        return int(out.decode('utf-8').split()[0]) * 512
    except (ValueError, IndexError):
        # No usable du output at all (e.g. the path itself does not exist).
        return 0


def get_file_system_size(dir_path: str) -> Tuple[int, int]:
    """
    Return the free space, and total size of the file system hosting `dir_path`.

    :param str dir_path: A valid path to a directory.
    :return: free space and total size of file system
    :rtype: tuple
    :raises ValueError: if dir_path does not exist.
    """
    # Raise instead of assert: asserts are stripped under ``python -O`` and
    # a missing path here is an input-validation failure, not an internal
    # invariant violation.
    if not os.path.exists(dir_path):
        raise ValueError(f'Could not determine file system size: {dir_path} does not exist')
    disk_stats = os.statvfs(dir_path)
    available_disk_space = disk_stats.f_frsize * disk_stats.f_bavail
    total_disk_size = disk_stats.f_frsize * disk_stats.f_blocks
    return available_disk_space, total_disk_size


def glob(glob_pattern: str, directoryname: str) -> List[str]:
"""
Walks through a directory and its subdirectories looking for files matching
Expand Down
4 changes: 2 additions & 2 deletions src/toil/test/src/miscTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def testGetSizeOfDirectoryWorks(self):
This test generates a number of random directories and randomly sized
files to test this using getDirSizeRecursively.
'''
from toil.common import getDirSizeRecursively
from toil.lib.resources import get_dir_size_recursively

# a list of the directories used in the test
directories = [self.testDir]
Expand Down Expand Up @@ -84,7 +84,7 @@ def testGetSizeOfDirectoryWorks(self):
os.link(linkSrc, fileName)
files[fileName] = 'Link to %s' % linkSrc

computedDirectorySize = getDirSizeRecursively(self.testDir)
computedDirectorySize = get_dir_size_recursively(self.testDir)
totalExpectedSize = sum([x for x in list(files.values()) if isinstance(x, int)])
self.assertGreaterEqual(computedDirectorySize, totalExpectedSize)

Expand Down