Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 160 additions & 64 deletions src/synpp/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,81 +931,177 @@ def run_from_cmd(argv):
class Synpp:
    """Object-oriented entry point for running synpp pipelines.

    Wraps the module-level ``run`` function, holding a validated
    configuration, working directory, stage definitions and related
    settings so pipelines can be executed repeatedly without passing
    everything again.
    """

    def __init__(self, config: dict, working_directory: str = None,
                 logger: logging.Logger = logging.getLogger("synpp"),
                 definitions: List[Dict[str, Union[str, Callable, ModuleType]]] = None,
                 flowchart_path: str = None, dryrun: bool = False,
                 externals: Dict[str, str] = None, aliases: dict = None):
        """Create a pipeline runner.

        :param config: pipeline configuration values (must be a dict)
        :param working_directory: cache/working directory, or None
        :param logger: logger used for pipeline output
        :param definitions: default stage definitions for run_pipeline
        :param flowchart_path: default path for the flowchart output
        :param dryrun: default dry-run flag
        :param externals: mapping of external stage names to paths
        :param aliases: mapping of stage aliases
        :raises PipelineError: if config is not a dictionary
        """
        if not isinstance(config, dict):
            raise PipelineError("config must be a dictionary")

        # Deep-copy so later caller-side mutation cannot change the pipeline.
        self.config = copy.deepcopy(config)
        self.working_directory = working_directory
        self.logger = logger
        self.definitions = definitions
        self.flowchart_path = flowchart_path
        self.dryrun = dryrun

        # None stands in for the mutable defaults; copy so the instance
        # never shares state with the caller's dictionaries.
        self.externals = externals.copy() if externals is not None else {}
        self.aliases = aliases.copy() if aliases is not None else {}

    def run_pipeline(self, definitions=None, rerun_required=True, dryrun=None,
                     verbose=False, flowchart_path=None):
        """Run a full pipeline.

        Explicit arguments override the instance defaults; ``dryrun`` and
        ``flowchart_path`` fall back to the values given at construction
        when left as None (keeps the original call signature working).

        :raises PipelineError: if no stage definitions are available
        """
        if definitions is not None:
            effective_definitions = definitions
        elif self.definitions is not None:
            effective_definitions = self.definitions
        else:
            raise PipelineError("A list of stage definitions must be available in object or provided explicitly.")

        return run(
            definitions=effective_definitions,
            config=self.config,
            working_directory=self.working_directory,
            flowchart_path=self.flowchart_path if flowchart_path is None else flowchart_path,
            dryrun=self.dryrun if dryrun is None else dryrun,
            verbose=verbose,
            logger=self.logger,
            rerun_required=rerun_required,
            ensure_working_directory=True,
            externals=self.externals,
            aliases=self.aliases
        )

    def run_single(self, descriptor, config=None, rerun_if_cached=False,
                   dryrun=False, verbose=False):
        """Run a single stage and return its result.

        :param descriptor: stage descriptor (name, callable or module)
        :param config: per-stage configuration (defaults to empty dict)
        :param rerun_if_cached: force rerun even when a cached result exists
        :param dryrun: run without executing stage bodies
        :param verbose: verbose pipeline output
        """
        if config is None:
            config = {}

        results = run(
            definitions=[{"descriptor": descriptor, "config": config}],
            config=self.config,
            working_directory=self.working_directory,
            flowchart_path=self.flowchart_path,
            dryrun=dryrun,  # parameter override for the single stage
            verbose=verbose,
            logger=self.logger,
            rerun_required=rerun_if_cached,
            ensure_working_directory=True,
            externals=self.externals,
            aliases=self.aliases
        )
        return results[0]

    @staticmethod
    def build_from_yml(config_path, working_directory=None, run=None, overrides=None):
        """Build a Synpp instance from a YAML configuration file.

        :param config_path: path to the YAML file
        :param working_directory: overrides the file's working_directory
        :param run: explicit stage list; falls back to the file's 'run' section
        :param overrides: dot-notation config overrides, e.g. {"a.b": 5}
        :raises PipelineError: on missing/invalid file or malformed sections
        """
        if run is None:
            run = []
        if overrides is None:
            overrides = {}

        # Load and validate the YAML configuration.
        if not os.path.isfile(config_path):
            raise PipelineError(f"Configuration file not found: {config_path}")

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                settings = yaml.load(f, Loader=yaml.SafeLoader)
        except yaml.YAMLError as e:
            raise PipelineError(f"Invalid YAML syntax in {config_path}: {e}") from e
        except OSError as e:
            raise PipelineError(f"Error reading configuration file {config_path}: {e}") from e

        if not isinstance(settings, dict):
            raise PipelineError(f"YAML root must be a dictionary in {config_path}")

        # An explicitly passed stage list wins over the file's 'run' section.
        if run:
            run_stages = run
        elif "run" in settings:
            run_stages = settings["run"]
        else:
            raise PipelineError("No 'run' section found in configuration and no run parameter provided")

        if not isinstance(run_stages, list):
            raise PipelineError("'run' section must be a list")

        definitions = []
        for item in run_stages:
            if isinstance(item, str):
                # Simple string descriptor, no per-stage configuration.
                definitions.append({"descriptor": item, "config": {}})
            elif isinstance(item, dict):
                # Dictionary with a single descriptor -> config pair.
                if len(item) != 1:
                    raise PipelineError(f"Run item dictionary must have exactly one key-value pair: {item}")
                descriptor = next(iter(item.keys()))
                definitions.append({"descriptor": descriptor, "config": item[descriptor]})
            else:
                raise PipelineError(f"Run item must be string or dictionary, got {type(item)}: {item}")

        # Apply dot-notation overrides on a deep copy so the loaded
        # settings dictionary is never mutated.
        final_config = copy.deepcopy(settings.get("config", {}))

        for option_path, value in overrides.items():
            if not isinstance(option_path, str) or not option_path:
                raise PipelineError(f"Override key must be non-empty string: {option_path}")

            path_segments = option_path.split(".")
            current_dict = final_config

            # Walk down to the parent of the target key, creating
            # intermediate dictionaries as needed.
            for segment in path_segments[:-1]:
                if segment not in current_dict:
                    current_dict[segment] = {}
                elif not isinstance(current_dict[segment], dict):
                    raise PipelineError(f"Cannot override {option_path}: '{segment}' is not a dictionary")
                current_dict = current_dict[segment]

            if not isinstance(current_dict, dict):
                raise PipelineError(f"Cannot apply override {option_path}: path does not lead to a dictionary")

            final_key = path_segments[-1]
            if final_key in current_dict:
                # Preserve the existing value's type when possible
                # (e.g. keep ints as ints when overridden from strings).
                original_value = current_dict[final_key]
                try:
                    current_dict[final_key] = type(original_value)(value)
                except (ValueError, TypeError):
                    current_dict[final_key] = value
            else:
                current_dict[final_key] = value

        # Remaining settings with explicit-argument precedence and defaults.
        resolved_working_directory = working_directory if working_directory is not None else settings.get("working_directory")
        flowchart_path = settings.get("flowchart_path")
        dryrun = settings.get("dryrun", False)
        externals = settings.get("externals", {})
        aliases = settings.get("aliases", {})

        return Synpp(
            config=final_config,
            working_directory=resolved_working_directory,
            definitions=definitions,
            flowchart_path=flowchart_path,
            dryrun=dryrun,
            externals=externals,
            aliases=aliases
        )


def stage(function=None, *args, **kwargs):
Expand Down
Loading