Skip to content

Commit

Permalink
fix import errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Acribbs committed Jan 17, 2025
1 parent b952fc1 commit 0791bbd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
49 changes: 49 additions & 0 deletions cgatcore/pipeline/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,52 @@ def __exit__(self, exc_type, exc_value, traceback):
"""Exit the runtime context related to this object."""
# Cleanup logic, if any, can be added here
pass


class Executor(BaseExecutor):
"""Main executor class that handles job execution and resource management."""

def __init__(self, **kwargs):
"""Initialize with configuration options."""
super().__init__(**kwargs)
self.task_name = "executor_task"
self.default_total_time = 5

# Initialize job options
self.job_options = kwargs.get('job_options', '')
self.queue = kwargs.get('queue')
self.cluster_queue_manager = kwargs.get('cluster_queue_manager', 'slurm')

def run(self, statement_list, **kwargs):
"""Execute a list of statements.
Args:
statement_list (list): List of commands to execute
**kwargs: Additional execution options
Returns:
tuple: (exit_code, stdout, stderr)
"""
if isinstance(statement_list, str):
statement_list = [statement_list]

results = []
for statement in statement_list:
# Choose appropriate executor based on configuration
if self.cluster_queue_manager == 'slurm':
from cgatcore.pipeline.executors import SlurmExecutor
executor = SlurmExecutor(**self.config)
elif self.cluster_queue_manager == 'sge':
from cgatcore.pipeline.executors import SGEExecutor
executor = SGEExecutor(**self.config)
elif self.cluster_queue_manager == 'torque':
from cgatcore.pipeline.executors import TorqueExecutor
executor = TorqueExecutor(**self.config)
else:
from cgatcore.pipeline.executors import LocalExecutor
executor = LocalExecutor(**self.config)

result = executor.run(statement)
results.append(result)

return results[0] if len(results) == 1 else results
3 changes: 2 additions & 1 deletion cgatcore/pipeline/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@
from cgatcore.pipeline.utils import get_caller, get_caller_locals, is_test
from cgatcore.pipeline.execution import (
execute, start_session, get_drmaa_session, close_session)
from cgatcore.pipeline.executors import (
Executor, LocalExecutor, SlurmExecutor, SGEExecutor, TorqueExecutor)

# redirect os.stat and other OS utilities to cached versions to speed
# up ruffus. Be careful not to use os.stat in task functions.
# Does ruffus rely on os.stat for completed tasks?
SAVED_OS_STAT = os.stat
SAVED_OS_PATH_ABSPATH = os.path.abspath
SAVED_OS_PATH_REALPATH = os.path.realpath
Expand Down

0 comments on commit 0791bbd

Please sign in to comment.