Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 99 additions & 101 deletions Ficus/src/rust/ficus/src/pipelines/activities_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
repeat_sets::{build_repeat_set_tree_from_repeats, build_repeat_sets},
},
},
pipeline_part,
pipelines::{
context::PipelineInfrastructure,
keys::context_keys::{
Expand Down Expand Up @@ -80,12 +81,10 @@ impl FromStr for ActivitiesLogsSourceDto {
}

impl PipelineParts {
pub(super) fn discover_activities() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::DISCOVER_ACTIVITIES, &|context, _, config| {
let activity_level = Self::get_user_data(config, &ACTIVITY_LEVEL_KEY)?;
Self::do_discover_activities(context, *activity_level, config)
})
}
pipeline_part!(discover_activities, |context: &mut PipelineContext, _, config: &UserDataImpl| {
let activity_level = Self::get_user_data(config, &ACTIVITY_LEVEL_KEY)?;
Self::do_discover_activities(context, *activity_level, config)
});

pub(super) fn do_discover_activities(
context: &mut PipelineContext,
Expand Down Expand Up @@ -116,12 +115,13 @@ impl PipelineParts {
Ok(())
}

pub(super) fn discover_activities_instances() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::DISCOVER_ACTIVITIES_INSTANCES, &|context, _, config| {
pipeline_part!(
discover_activities_instances,
|context: &mut PipelineContext, _, config: &UserDataImpl| {
Self::do_discover_activities_instances(context, config)?;
Ok(())
})
}
}
);

pub(super) fn do_discover_activities_instances(
context: &mut PipelineContext,
Expand Down Expand Up @@ -149,12 +149,10 @@ impl PipelineParts {
Ok(())
}

pub(super) fn create_log_from_activities() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::CREATE_LOG_FROM_ACTIVITIES, &|context, _, config| {
Self::do_create_log_from_activities(context, config)?;
Ok(())
})
}
pipeline_part!(create_log_from_activities, &|context, _, config| {
Self::do_create_log_from_activities(context, config)?;
Ok(())
});

pub(super) fn do_create_log_from_activities(
context: &mut PipelineContext,
Expand Down Expand Up @@ -192,8 +190,9 @@ impl PipelineParts {
Ok(())
}

pub(super) fn discover_activities_instances_for_several_levels() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::DISCOVER_ACTIVITIES_FOR_SEVERAL_LEVEL, &|context, infra, config| {
pipeline_part!(
discover_activities_for_several_levels,
|context: &mut PipelineContext, infra: &PipelineInfrastructure, config: &UserDataImpl| {
let event_classes = Self::get_user_data(config, &REGEXES_KEY)?;
let initial_activity_level = *Self::get_user_data(config, &ACTIVITY_LEVEL_KEY)?;
let patterns_kind = Self::get_user_data(config, &PATTERNS_KIND_KEY)?;
Expand Down Expand Up @@ -223,8 +222,8 @@ impl PipelineParts {
}

Ok(())
})
}
}
);

pub(super) fn adjust_with_activities_from_unattached_events(
old_context: &mut PipelineContext,
Expand Down Expand Up @@ -266,8 +265,9 @@ impl PipelineParts {
Ok(())
}

pub(super) fn discover_activities_in_unattached_subtraces() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::DISCOVER_ACTIVITIES_IN_UNATTACHED_SUBTRACES, &|context, _, config| {
pipeline_part!(
discover_activities_in_unattached_subtraces,
|context: &mut PipelineContext, _, config: &UserDataImpl| {
let log = Self::get_user_data(context, &EVENT_LOG_KEY)?;
let mut existing_activities = &Self::create_empty_activities(log);

Expand Down Expand Up @@ -296,8 +296,8 @@ impl PipelineParts {
context.put_concrete(TRACE_ACTIVITIES_KEY.key(), new_activities);

Ok(())
})
}
}
);

pub(super) fn create_add_unattached_events_part(&self, config: UserDataImpl) -> DefaultPipelinePart {
let name = Self::DISCOVER_ACTIVITIES_IN_UNATTACHED_SUBTRACES;
Expand All @@ -315,12 +315,10 @@ impl PipelineParts {
activities
}

pub(super) fn clear_activities_related_stuff() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::CLEAR_ACTIVITIES, &|context, _, _| {
Self::do_clear_activities_related_stuff(context);
Ok(())
})
}
pipeline_part!(clear_activities, |context: &mut PipelineContext, _, _| {
Self::do_clear_activities_related_stuff(context);
Ok(())
});

pub(super) fn do_clear_activities_related_stuff(context: &mut PipelineContext) {
context.remove_concrete(ACTIVITIES_KEY.key());
Expand All @@ -329,16 +327,16 @@ impl PipelineParts {
context.remove_concrete(REPEAT_SETS_KEY.key());
}

pub(super) fn get_number_of_underlying_events() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::GET_UNDERLYING_EVENTS_COUNT, &|context, infra, _| {
let log = Self::get_user_data(context, &EVENT_LOG_KEY)?;
let count = count_underlying_events(log);
infra.log(format!("Number of underlying events: {}", &count).as_str())?;
pipeline_part!(get_underlying_events_count, |context: &mut PipelineContext,
infra: &PipelineInfrastructure,
_| {
let log = Self::get_user_data(context, &EVENT_LOG_KEY)?;
let count = count_underlying_events(log);
infra.log(format!("Number of underlying events: {}", &count).as_str())?;

context.put_concrete(UNDERLYING_EVENTS_COUNT_KEY.key(), count);
Ok(())
})
}
context.put_concrete(UNDERLYING_EVENTS_COUNT_KEY.key(), count);
Ok(())
});

pub(super) fn execute_with_activities_instances(
activities: &Vec<ActivityInTraceInfo>,
Expand All @@ -362,8 +360,9 @@ impl PipelineParts {
Ok(())
}

pub(super) fn discover_activities_until_no_more() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::DISCOVER_ACTIVITIES_UNTIL_NO_MORE, &|context, infra, config| {
pipeline_part!(
discover_activities_until_no_more,
|context: &mut PipelineContext, infra: &PipelineInfrastructure, config: &UserDataImpl| {
let activity_level = *Self::get_user_data(config, &ACTIVITY_LEVEL_KEY)?;
let after_activities_extraction_pipeline = Self::get_user_data(config, &PIPELINE_KEY);
let execute_only_after_last_extraction = *Self::get_user_data(config, &EXECUTE_ONLY_ON_LAST_EXTRACTION_KEY)?;
Expand Down Expand Up @@ -413,11 +412,12 @@ impl PipelineParts {
return Ok(());
}
}
})
}
}
);

pub(super) fn execute_with_each_activity_log() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::EXECUTE_WITH_EACH_ACTIVITY_LOG, &|context, infra, config| {
pipeline_part!(
execute_with_each_activity_log,
|context: &mut PipelineContext, infra: &PipelineInfrastructure, config: &UserDataImpl| {
let pipeline = Self::get_user_data(config, &PIPELINE_KEY)?;
let activities_to_logs = Self::create_activities_to_logs(context, config)?;

Expand All @@ -430,8 +430,8 @@ impl PipelineParts {
}

Ok(())
})
}
}
);

fn create_activities_to_logs(
context: &mut PipelineContext,
Expand All @@ -452,69 +452,66 @@ impl PipelineParts {
}
}

pub(super) fn substitute_underlying_events() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::SUBSTITUTE_UNDERLYING_EVENTS, &|context, _, _| {
let log = Self::get_user_data_mut(context, &EVENT_LOG_KEY)?;
let mut new_log = XesEventLogImpl::empty();
pipeline_part!(substitute_underlying_events, |context: &mut PipelineContext, _, _| {
let log = Self::get_user_data_mut(context, &EVENT_LOG_KEY)?;
let mut new_log = XesEventLogImpl::empty();

for trace in log.traces() {
let mut new_trace = XesTraceImpl::empty();
for event in trace.borrow().events() {
substitute_underlying_events(event, &mut new_trace);
}

new_log.push(Rc::new(RefCell::new(new_trace)));
for trace in log.traces() {
let mut new_trace = XesTraceImpl::empty();
for event in trace.borrow().events() {
substitute_underlying_events(event, &mut new_trace);
}

context.put_concrete(EVENT_LOG_KEY.key(), new_log);
Ok(())
})
}
new_log.push(Rc::new(RefCell::new(new_trace)));
}

pub(super) fn apply_class_extractor() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::APPLY_CLASS_EXTRACTOR, &|context, _, config| {
let log = Self::get_user_data_mut(context, &EVENT_LOG_KEY)?;
context.put_concrete(EVENT_LOG_KEY.key(), new_log);
Ok(())
});

let event_class_regex = Self::get_user_data(config, &EVENT_CLASS_REGEX_KEY)?;
let event_class_regex = Self::try_parse_regex(event_class_regex)?;
pipeline_part!(apply_class_extractor, |context: &mut PipelineContext, _, config: &UserDataImpl| {
let log = Self::get_user_data_mut(context, &EVENT_LOG_KEY)?;

let filter_regex = Self::get_user_data(config, &REGEX_KEY)?;
let filter_regex = Self::try_parse_regex(filter_regex)?;
let event_class_regex = Self::get_user_data(config, &EVENT_CLASS_REGEX_KEY)?;
let event_class_regex = Self::try_parse_regex(event_class_regex)?;

for trace in log.traces() {
for event in trace.borrow().events() {
if !filter_regex.is_match(event.borrow().name()).ok().unwrap() {
continue;
}
let filter_regex = Self::get_user_data(config, &REGEX_KEY)?;
let filter_regex = Self::try_parse_regex(filter_regex)?;

let borrowed_event = event.borrow();
let found_match = event_class_regex.find(borrowed_event.name());
if found_match.is_err() {
continue;
}
for trace in log.traces() {
for event in trace.borrow().events() {
if !filter_regex.is_match(event.borrow().name()).ok().unwrap() {
continue;
}

let (start, end) = if let Ok(Some(found_match)) = found_match {
(found_match.start(), found_match.end())
} else {
(0, borrowed_event.name().len())
};
let borrowed_event = event.borrow();
let found_match = event_class_regex.find(borrowed_event.name());
if found_match.is_err() {
continue;
}

drop(found_match);
drop(borrowed_event);
let (start, end) = if let Ok(Some(found_match)) = found_match {
(found_match.start(), found_match.end())
} else {
(0, borrowed_event.name().len())
};

if start == 0 {
let new_name = event.borrow().name()[start..end].to_owned();
event.borrow_mut().set_name(new_name);
}
drop(found_match);
drop(borrowed_event);

if start == 0 {
let new_name = event.borrow().name()[start..end].to_owned();
event.borrow_mut().set_name(new_name);
}
}
}

Ok(())
})
}
Ok(())
});

pub(super) fn serialize_activities_logs() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::SERIALIZE_ACTIVITIES_LOGS, &|context, _, config| {
pipeline_part!(
serialize_activities_logs,
|context: &mut PipelineContext, _, config: &UserDataImpl| {
let logs_to_activities = Self::create_activities_to_logs(context, config)?;
let path = Path::new(Self::get_user_data(config, &PATH_KEY)?);
let format = Self::get_user_data(config, &LOG_SERIALIZATION_FORMAT_KEY)?;
Expand All @@ -539,11 +536,12 @@ impl PipelineParts {
}

Ok(())
})
}
}
);

pub(super) fn reverse_hierarchy_indices() -> (String, PipelinePartFactory) {
Self::create_pipeline_part(Self::REVERSE_HIERARCHY_INDICES, &|context, _, _| {
pipeline_part!(
reverse_hierarchy_indices,
|context: &mut PipelineContext, _, config: &UserDataImpl| {
let log = Self::get_user_data_mut(context, &EVENT_LOG_KEY)?;

const HIERARCHY_LEVEL: &str = "hierarchy_level_";
Expand Down Expand Up @@ -591,6 +589,6 @@ impl PipelineParts {
}

Ok(())
})
}
}
);
}
Loading
Loading