Skip to content

Commit

Permalink
Improve error messages when child fact queries raise error.
Browse files Browse the repository at this point in the history
  • Loading branch information
namcsi committed Jun 16, 2023
1 parent fe069ce commit f304215
Showing 1 changed file with 47 additions and 25 deletions.
72 changes: 47 additions & 25 deletions src/renopro/reify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
SymbolPredicateUnifier, parse_fact_string,
UnifierNoMatchError, parse_fact_files)


class ChildQueryError(Exception):
pass


class ChildrenQueryError(Exception):
pass


import renopro.clorm_predicates as preds

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -282,22 +291,30 @@ def _reify_symbol_string(self, symb):
value=symb.string))
return string1

def _reflect_child_pred(self, id_predicate):
def _reflect_child_pred(self, parent_pred, child_id_pred):
"""Utility function that takes a unary ast predicate
containing only an identifier pointing to a child predicate,
returns the child node obtained by reflecting the child
predicate.
queries reified factbase for child predicate, and returns the
child node obtained by reflecting the child predicate.
"""
identifier = id_predicate.id
identifier = child_id_pred.id
nonunary_pred = getattr(preds,
type(id_predicate).__name__.rstrip("1"))
pred = self._reified.query(nonunary_pred)\
.where(nonunary_pred.id == identifier)\
.singleton()
return self._reflect_predicate(pred)

def _reflect_child_preds(self, id_predicate):
type(child_id_pred).__name__.rstrip("1"))
query = self._reified.query(nonunary_pred)\
.where(nonunary_pred.id == identifier)
child_preds = list(query.all())
if len(child_preds) == 0:
msg = f"Error finding child fact of predicate {parent_pred}:\nExpected single child fact for identifier {child_id_pred}, found none."
raise ChildQueryError(msg)
elif len(child_preds) > 1:
msg = f"Error finding child fact of predicate {parent_pred}:\nExpected single child fact for identifier {child_id_pred}, found multiple:\n" + "\n".join(child_preds)
raise ChildQueryError(msg)
else:
child_pred = child_preds[0]
return self._reflect_predicate(child_pred)

def _reflect_child_preds(self, parent_pred, id_predicate):
"""Utility function that takes a unary ast predicate
containing only an identifier pointing to a tuple of child
predicates, and returns a list of the child nodes obtained by
Expand All @@ -307,13 +324,18 @@ def _reflect_child_preds(self, id_predicate):
identifier = id_predicate.id
nonunary_pred = getattr(preds,
type(id_predicate).__name__.rstrip("1"))
tuples = self._reified.query(nonunary_pred)\
query = self._reified.query(nonunary_pred)\
.where(nonunary_pred.id == identifier)\
.order_by(nonunary_pred.position)\
.all()
elements = [tup.element for tup in tuples]
nodes = list(map(self._reflect_child_pred, elements))
return nodes
.order_by(nonunary_pred.position)
tuples = list(query.all())
child_nodes = list()
for tup in tuples:
try:
child_nodes.append(self._reflect_child_pred(tup, tup.element))
except ChildQueryError as e:
msg = f"Error finding tuple of child facts of predicate {parent_pred}:\n"
raise ChildrenQueryError(msg + str(e))
return child_nodes

@singledispatchmethod
def _reflect_predicate(self, pred: preds.AST_Predicate): # nocoverage
Expand All @@ -329,29 +351,29 @@ def _reflect_predicate(self, pred: preds.AST_Predicate): # nocoverage
@_reflect_predicate.register
def _reflect_program(self, program: preds.Program) -> Sequence[AST]:
subprogram = list()
parameter_nodes = self._reflect_child_preds(program.parameters)
parameter_nodes = self._reflect_child_preds(program, program.parameters)
subprogram.append(ast.Program(location=DUMMY_LOC,
name=program.name,
parameters=parameter_nodes))
statement_nodes = self._reflect_child_preds(program.statements)
statement_nodes = self._reflect_child_preds(program, program.statements)
subprogram.extend(statement_nodes)
return subprogram

@_reflect_predicate.register
def _reflect_rule(self, rule: preds.Rule) -> AST:
head_node = self._reflect_child_pred(rule.head)
body_nodes = self._reflect_child_preds(rule.body)
head_node = self._reflect_child_pred(rule, rule.head)
body_nodes = self._reflect_child_preds(rule, rule.body)
return ast.Rule(location=DUMMY_LOC, head=head_node, body=body_nodes)

@_reflect_predicate.register
def _reflect_literal(self, lit: preds.Literal) -> AST:
sign = preds.sign_cl2ast[lit.sig]
atom_node = self._reflect_child_pred(lit.atom)
atom_node = self._reflect_child_pred(lit, lit.atom)
return ast.Literal(location=DUMMY_LOC, sign=sign, atom=atom_node)

@_reflect_predicate.register
def _reflect_function(self, func: preds.Function) -> AST:
arg_nodes = self._reflect_child_preds(func.arguments)
arg_nodes = self._reflect_child_preds(func, func.arguments)
return ast.Function(
location=DUMMY_LOC, name=func.name, arguments=arg_nodes, external=0
)
Expand Down Expand Up @@ -385,8 +407,8 @@ def _reflect_binary_operation(
ast_operator = preds.binary_operator_cl2ast[operation.operator]
return ast.BinaryOperation(
location=DUMMY_LOC, operator_type=ast_operator,
left=self._reflect_child_pred(operation.left),
right=self._reflect_child_pred(operation.right)
left=self._reflect_child_pred(operation, operation.left),
right=self._reflect_child_pred(operation, operation.right)
)

def reflect(self):
Expand Down

0 comments on commit f304215

Please sign in to comment.