Logic Walkthrough

Val Huber edited this page Dec 7, 2020 · 29 revisions

Logic Engine Internals - Walkthrough

Assumptions

Rules are associated with Mapped Tables and their Columns. You can think of them as extensions to the (logical) schema, or to your SQLAlchemy model class definitions.

Rules are intended to govern derivations / constraints as transactions are committed ("transaction logic"), as well as other actions such as starting processes / sending mail or messages as a result of a transaction.

Transaction logic does not address:

  • SQLAlchemy mass updates
  • SQLAlchemy updates not using a Mapped Class (i.e., "raw SQL")
  • updates made outside SQLAlchemy

Open questions:

  • Inheritance

Usage

Declaring Rules

Rules are declared with function calls - see rules-bank, which looks like this:

Rule.constraint(validate=Customer, as_condition=lambda row: row.Balance <= row.CreditLimit,
                error_msg="balance ({row.Balance}) exceeds credit ({row.CreditLimit})")
Rule.sum(derive=Customer.Balance, as_sum_of=Order.AmountTotal,
         where=lambda row: row.ShippedDate is None)

Rule.sum(derive=Order.AmountTotal, as_sum_of=OrderDetail.Amount)

Rule.formula(derive=OrderDetail.Amount, as_expression=lambda row: row.UnitPrice * row.Quantity)
Rule.copy(derive=OrderDetail.UnitPrice, from_parent=Product.UnitPrice)
Rule.formula(derive=OrderDetail.ShippedDate, as_expression=lambda row: row.OrderHeader.ShippedDate)

Rule.sum(derive=Product.UnitsShipped, as_sum_of=OrderDetail.Quantity,
         where="row.ShippedDate is not None")
Rule.formula(derive=Product.UnitsInStock, calling=units_shipped)

Note that code completion in your IDE lets you discover and write rules.

Implementation Overview

The logic_bank package provides for the definition and enforcement of rules.

Rules Declared on Mapped Classes

logic_bank.py provides functions for rule definition. Invoking these functions "deposits" rules in the RuleBank, which holds them in memory for the logic engine.

Rule classes are in the rule_type package. Instances of these are what is deposited in the RuleBank, and are executed when transactions are committed.

The rule_bank is a singleton, which maintains the rules by table.
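
As a rough illustration of the "deposit" idea, here is a minimal sketch in plain Python. It is not the logic_bank implementation: RuleBankSketch, ConstraintSketch, deposit and the dictionary-based rows are hypothetical names and shapes chosen for this example. It only shows a singleton that stores rules by table name, with declaration storing the rule rather than running it.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass
class ConstraintSketch:
    """Hypothetical stand-in for a rule_type instance."""
    table: str               # name of the mapped class the rule belongs to
    as_condition: Callable   # predicate evaluated against a row
    error_msg: str


class RuleBankSketch:
    """Illustrative singleton: one shared instance, rules keyed by table name."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._tables = defaultdict(list)
        return cls._instance

    def deposit(self, rule) -> None:
        self._tables[rule.table].append(rule)

    def rules_of_class(self, table: str, rule_class: type) -> list:
        return [r for r in self._tables[table] if isinstance(r, rule_class)]


def constraint(validate: str, as_condition: Callable, error_msg: str) -> ConstraintSketch:
    """Declaring a rule only stores it; nothing is executed here."""
    rule = ConstraintSketch(validate, as_condition, error_msg)
    RuleBankSketch().deposit(rule)
    return rule


constraint(validate="Customer",
           as_condition=lambda row: row["Balance"] <= row["CreditLimit"],
           error_msg="balance exceeds credit")
customer_rules = RuleBankSketch().rules_of_class("Customer", ConstraintSketch)
```

Because every call to RuleBankSketch() returns the same object, the declaring code and the executing code see the same rules without passing the bank around.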

rule_bank_withdraw.py provides what are essentially instance methods of RuleBank, but these had to be broken out to work around circular dependencies. A better solution would be desirable.

Execution

Let's consider placing an order, and checking credit. The focus here is on multi-level roll-ups, to compute the balance and check it against the credit.

User issues commit

Execution begins in nw/tests/test_add_order.py.

The import statement from nw.nw.logic import session runs nw/logic/__init__, which

  1. opens the database (create engine, conn etc)
  2. calls LogicBank.activate(session=session, activator=declare_logic), which sets up the RuleBank, loads the rules into the RuleBank (but does not execute them), and registers the before_flush listeners

The RuleBank is a singleton, which maintains the rules for each mapped class.

SQLAlchemy before_flush Listeners - create / run LogicRows

When test_add_order.py issues commit, SQLAlchemy invokes the before_flush listeners in logic_bank/exec_trans_logic/listeners.py. For each row, the listener:

  1. Creates a LogicRow instance, which wraps the row and old_row
  2. Invokes the LogicRow's logic for inserted, updated or deleted rows. LogicRow is the core of the rules engine.

The old_row is of paramount importance, enabling pruning, adjustment logic, and user logic (for example, how much did the salary change?).
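
To make the role of old_row concrete, here is a small sketch in plain Python. LogicRowSketch, Employee, is_changed and delta are hypothetical names for this illustration, and the snapshot is taken with copy.copy purely as an assumption. How the real engine obtains old values is not shown here.

```python
import copy
from dataclasses import dataclass


@dataclass
class Employee:
    """Hypothetical row class, used only for this illustration."""
    name: str
    salary: float


class LogicRowSketch:
    """Wraps the current row together with a snapshot of its prior state."""

    def __init__(self, row, old_row):
        self.row = row
        self.old_row = old_row

    def is_changed(self, attr: str) -> bool:
        # Basis for pruning: unchanged attributes need no re-derivation.
        return getattr(self.row, attr) != getattr(self.old_row, attr)

    def delta(self, attr: str):
        # Basis for adjustment and user logic: how much did the value move?
        return getattr(self.row, attr) - getattr(self.old_row, attr)


emp = Employee("Ann", 1000.0)
before = copy.copy(emp)      # snapshot taken before the client's change
emp.salary = 1100.0
logic_row = LogicRowSketch(row=emp, old_row=before)
salary_change = logic_row.delta("salary")
```

With both states in hand, the question "how much did the salary change?" becomes a simple subtraction rather than an extra database read.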

Logic Row Operation

update looks like this:

    def update(self, reason: str = None, row: base = None):
        """
        make updates - with logic - in events, for example

        row = sqlalchemy read
        logic_row.update(row=row, reason="my log message")
        """
        if row is not None:
            user_logic_row = self.user_row_update(row=row, ins_upd_dlt="upd")
            user_logic_row.update(reason=reason)
        else:
            self.reason = reason
            self.log("Update - " + reason)
            self.early_row_events()
            self.copy_rules()
            self.formula_rules()
            self.adjust_parent_aggregates()  # parent chaining (sum / count adjustments)
            self.constraints()
            self.cascade_to_children()  # child chaining (cascade changed parent references)
            if self.row_sets is not None:  # eg, for debug as in upd_order_shipped test
                self.row_sets.remove_submitted(logic_row=self)

LogicRow finds the rules for that mapped table from the RuleBank, and executes each rule_type instance.

For example, here are formula_rules:

    def formula_rules(self):
        """ execute un-pruned formulae, in dependency order """
        self.log_engine("formula_rules")
        formula_rules = rule_bank_withdraw.rules_of_class(self, Formula)
        formula_rules.sort(key=lambda formula: formula._exec_order)
        for each_formula in formula_rules:
            if not self.is_formula_pruned(each_formula):
                each_formula.execute(self)

Logic Chaining

Rules operation is "forward chaining". The system analyzes each rule for dependencies; when those are changed, the system recomputes (or adjusts) the referencing value. This can chain forward, even across tables. So, a change to an OrderDetail can adjust the Order, which adjusts the Customer.
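
The chain OrderDetail -> Order -> Customer can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the engine's code: the dataclasses and insert_detail are hypothetical, the rules are hard-coded rather than looked up, and the starting values (balance 960, credit limit 2000, details of 1 x 18 and 2 x 19) are taken from the add_order scenario.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import List


@dataclass
class Customer:
    balance: Decimal
    credit_limit: Decimal


@dataclass
class Order:
    customer: Customer
    amount_total: Decimal = Decimal(0)


@dataclass
class OrderDetail:
    order: Order
    quantity: int
    unit_price: Decimal
    amount: Decimal = Decimal(0)


def insert_detail(detail: OrderDetail, log: List[str]) -> None:
    """Run the child formula, then adjust each ancestor by the resulting delta."""
    old_amount = detail.amount
    detail.amount = detail.unit_price * detail.quantity    # formula rule
    delta = detail.amount - old_amount
    if delta == 0:
        return                                             # nothing changed: no chaining
    detail.order.amount_total += delta                     # sum adjustment on Order
    log.append(f"..Order AmountTotal -> {detail.order.amount_total}")
    customer = detail.order.customer
    customer.balance += delta                              # chained sum adjustment on Customer
    log.append(f"....Customer Balance -> {customer.balance}")
    if customer.balance > customer.credit_limit:           # constraint rule
        raise ValueError("balance exceeds credit")


log: List[str] = []
customer = Customer(balance=Decimal(960), credit_limit=Decimal(2000))
order = Order(customer=customer)
insert_detail(OrderDetail(order, quantity=1, unit_price=Decimal(18)), log)
insert_detail(OrderDetail(order, quantity=2, unit_price=Decimal(19)), log)
```

Each insert touches the parents with a delta only; no child rows are re-read to recompute the totals.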

This can be seen in the following log, resulting from client insertion of an Order and 2 OrderDetails in add_order.py.

  • The indentation indicates the chaining of multi-table logic across tables.
  • Row state is shown for each column, including old values [in brackets].

Logic Phase:		BEFORE COMMIT          						 - 2020-10-05 20:23:26,050 - logic_logger - DEBUG
Logic Phase:		ROW LOGIC (sqlalchemy before_flush)			 - 2020-10-05 20:23:26,050 - logic_logger - DEBUG
..Order[None] {Insert - client} AmountTotal: 0, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None  row@: 0x102e67f40 - 2020-10-05 20:23:26,052 - logic_logger - DEBUG
....Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance: 960.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount:  [8-->] 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount:  [3-->] 4  row@: 0x1030cc490 - 2020-10-05 20:23:26,072 - logic_logger - DEBUG
..OrderDetail[None] {Insert - client} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18  row@: 0x102e67fd0 - 2020-10-05 20:23:26,074 - logic_logger - DEBUG
..OrderDetail[None] {copy_rules for role: ProductOrdered} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18  row@: 0x102e67fd0 - 2020-10-05 20:23:26,079 - logic_logger - DEBUG
..OrderDetail[None] {Formula Amount} Amount: 18.0000000000, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18.0000000000  row@: 0x102e67fd0 - 2020-10-05 20:23:26,082 - logic_logger - DEBUG
....Order[None] {Update - Adjusting OrderHeader} AmountTotal:  [0-->] 18.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None  row@: 0x102e67f40 - 2020-10-05 20:23:26,085 - logic_logger - DEBUG
......Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance:  [960.0000000000-->] 978.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount: 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount: 4  row@: 0x1030cc490 - 2020-10-05 20:23:26,088 - logic_logger - DEBUG
..OrderDetail[None] {Insert - client} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 18  row@: 0x102ef2040 - 2020-10-05 20:23:26,092 - logic_logger - DEBUG
..OrderDetail[None] {copy_rules for role: ProductOrdered} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 18  row@: 0x102ef2040 - 2020-10-05 20:23:26,097 - logic_logger - DEBUG
..OrderDetail[None] {Formula Amount} Amount: 38.0000000000, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 19.0000000000  row@: 0x102ef2040 - 2020-10-05 20:23:26,098 - logic_logger - DEBUG
....Order[None] {Update - Adjusting OrderHeader} AmountTotal:  [18.0000000000-->] 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None  row@: 0x102e67f40 - 2020-10-05 20:23:26,103 - logic_logger - DEBUG
......Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance:  [978.0000000000-->] 1016.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount: 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount: 4  row@: 0x1030cc490 - 2020-10-05 20:23:26,105 - logic_logger - DEBUG
Logic Phase:		COMMIT   									 - 2020-10-05 20:23:26,106 - logic_logger - DEBUG
..Order[None] {Commit Event} AmountTotal: 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None  row@: 0x102e67f40 - 2020-10-05 20:23:26,107 - logic_logger - DEBUG
..Order[None] {Hi, Steven, congratulate Michael on their new order} AmountTotal: 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None  row@: 0x102e67f40 - 2020-10-05 20:23:26,112 - logic_logger - DEBUG
Logic Phase:		FLUSH   (sqlalchemy flush processing       	 - 2020-10-05 20:23:26,113 - logic_logger - DEBUG

Key Elements

Rules Bank

A singleton with (from the code):

Attributes

_tables Dict[mapped_class_name: str, List[TableRules]]
_metadata, _base, _engine from SQLAlchemy

Where TableRules is (again, from the code):

Rules and dependencies for a mapped class

attributes
 rules : List['AbstractRule'] -- Sums, Constraints, Formulas etc for this mapped class
 referring_children : Dict[parent_role_name - str, List[parent_attr_name: str]] - Information driving cascade

Formulas - Dependencies and Pruning

Each formula rule definition has an _exec_order, which determines execution order. It is set in rule_bank_setup.validate(), which also detects cycles.
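
One way to picture dependency ordering with cycle detection is a topological sort. The sketch below uses Python's standard graphlib module (Python 3.9+). It is an assumption about the general technique, not a description of what rule_bank_setup.validate() does internally. The exec_order function and the AmountWithTax attribute are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def exec_order(dependencies: dict) -> dict:
    """Map each derived attribute to its position in dependency order.

    dependencies: derived attribute -> set of attributes its formula reads.
    Raises ValueError if the formulas depend on each other in a cycle.
    """
    try:
        ordered = list(TopologicalSorter(dependencies).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes that form the detected cycle
        raise ValueError(f"circular formula dependencies: {err.args[1]}") from err
    derived = [attr for attr in ordered if attr in dependencies]
    return {attr: position for position, attr in enumerate(derived)}


positions = exec_order({
    "Amount": {"UnitPrice", "Quantity"},
    "AmountWithTax": {"Amount"},      # hypothetical formula reading another formula
})
```

Sorting formulas by the resulting position ensures that a formula runs only after the formulas it reads from.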

Execution of a formula is pruned if the attributes it depends on are unchanged. This is driven by formula._dependencies, which is the list of attributes the rule refers to, including parent.attribute references.

This is set in the formula constructor which calls AbstractRule#parse_dependencies.
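
A pruning check along these lines can be sketched as follows. This is a simplified illustration, not the engine's is_formula_pruned: the function signature, the resolve helper, and the treatment of inserts are assumptions, and rows are modelled with SimpleNamespace.

```python
from types import SimpleNamespace


def resolve(row, dotted_name: str):
    """Follow a possibly dotted reference such as 'OrderHeader.ShippedDate'."""
    value = row
    for part in dotted_name.split("."):
        value = getattr(value, part)
    return value


def is_formula_pruned(dependencies, row, old_row) -> bool:
    """True when none of the attributes the formula reads have changed."""
    if old_row is None:
        return False   # assumption: an inserted row has no prior state, so always run
    return all(resolve(row, name) == resolve(old_row, name) for name in dependencies)


amount_dependencies = ["UnitPrice", "Quantity"]
old = SimpleNamespace(UnitPrice=18, Quantity=1, Discount=0)
discount_only = SimpleNamespace(UnitPrice=18, Quantity=1, Discount=5)
quantity_changed = SimpleNamespace(UnitPrice=18, Quantity=3, Discount=0)

pruned = is_formula_pruned(amount_dependencies, discount_only, old)
not_pruned = is_formula_pruned(amount_dependencies, quantity_changed, old)
```

Changing only Discount leaves the Amount formula pruned, while changing Quantity forces it to run.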

Aggregates - Adjustments (1 per parent)

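The objective here is adjustment processing: the parent's stored aggregate is adjusted by the amount of the child's change, as in the "Adjusting Customer" log entries above. Also, consider that a parent might have multiple sums/counts from a single child. An additional objective is that these result in only 1 execution of parent logic.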

Note the actual physical SQL is buffered, courtesy of SQLAlchemy (recall that logic execution occurs in before_flush).

Aggregate processing (from the code):

Objective: 1 (one) update per role, for N aggregates along that role.

For each child -> parent role,
    For each aggregate along that role
        execute sum (etc) logic which set parent_adjustor (as req'd)
    use parent_adjustor to save altered parent

ParentRoleAdjustor is a class (from the LogicRow code):

Contains current / previous parent_logic_row iff parent needs adjustment,
and method to save_altered_parents.

Instances are passed to <aggregate>.adjust_parent who will set parent row(s) values
iff adjustment is required (e.g., summed value changes, where changes, fk changes, etc)
This ensures only 1 update per set of aggregates along a given role

where save_altered_parent:

Save (chain) parent iff parent_logic_row has been set by sum/count executor.
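
The "one update per role" idea can be sketched as below. This is a hedged illustration only: ParentRoleAdjustorSketch, adjust, save_count and the (attribute, function) pairs describing each aggregate are hypothetical, and saving is reduced to counting. The Customer values match the add_order scenario (OrderCount 8, UnpaidOrderCount 3, Balance 960).

```python
from types import SimpleNamespace


class ParentRoleAdjustorSketch:
    """Collects adjustments for one child -> parent role, then saves once."""

    def __init__(self, parent):
        self._parent = parent
        self.parent_altered = False
        self.save_count = 0

    def adjust(self, attr: str, delta) -> None:
        if delta:   # only mark the parent when a value actually changes
            setattr(self._parent, attr, getattr(self._parent, attr) + delta)
            self.parent_altered = True

    def save_altered_parent(self) -> None:
        if self.parent_altered:   # one update, however many aggregates fired
            self.save_count += 1


def adjust_parent_aggregates(child, old_child, parent, aggregates):
    """aggregates: (parent attribute, function giving the child's contribution)."""
    adjustor = ParentRoleAdjustorSketch(parent)
    for parent_attr, contribution in aggregates:
        old_value = contribution(old_child) if old_child is not None else 0
        adjustor.adjust(parent_attr, contribution(child) - old_value)
    adjustor.save_altered_parent()
    return adjustor


customer_aggregates = [
    ("OrderCount", lambda o: 1),
    ("UnpaidOrderCount", lambda o: 1 if o.ShippedDate is None else 0),
    ("Balance", lambda o: o.AmountTotal if o.ShippedDate is None else 0),
]
customer = SimpleNamespace(OrderCount=8, UnpaidOrderCount=3, Balance=960)
new_order = SimpleNamespace(AmountTotal=0, ShippedDate=None)
adjustor = adjust_parent_aggregates(new_order, None, customer, customer_aggregates)
```

Three aggregates run along the Order -> Customer role, two of them alter the parent, and the parent is still saved only once.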

Cascade Changed Parent References

Child rows can reference parent attributes, as in OrderDetail.ShippedDate, exercised in the upd_order_shipped test. So, if a referenced parent attribute is altered, the engine:

  • cascades the change to the children
  • advises each child which role the cascade came along

So, LogicRow.update() calls cascade_to_children:

Child Formulas can reference (my) Parent Attributes, so...
If the *referenced* Parent Attributes are changed, cascade to child
Setting update_msg to denote parent_role
This will cause each child to recompute all formulas referencing that role
eg,
  OrderDetail.ShippedDate = Order.ShippedDate, so....
  Order cascades changed ShippedDate => OrderDetailList

This of course requires we track the child references to a parent, by role. We obtain that via rule_bank_withdraw.get_referring_children:

def get_referring_children(parent_logic_row: LogicRow) -> dict:
    """
    return RulesBank[class_name].referring_children (create if None)
    referring_children is <parent_role_name>, parent_attribute_list()
    """

This information is updated into the RuleBank (if not set) by the method above. The design intent was to build it when "depositing" rules into the RuleBank, but get_mapper requires a mapped instance as input (unavailable when the RuleBank is being created).
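
The build-on-first-use pattern, and the way the resulting map drives cascade, can be sketched as follows. This is not the rule_bank_withdraw code: the module-level cache, the "role.attribute" string input and roles_to_cascade are assumptions made for this example, and the function signature here differs from the real get_referring_children.

```python
from types import SimpleNamespace

_referring_children_cache = {}   # parent class name -> {role name: [parent attributes]}


def get_referring_children(parent_class_name: str, child_formula_refs) -> dict:
    """Build the role -> referenced-parent-attributes map on first use, then reuse it.

    child_formula_refs: "role.attribute" strings collected from child formulas
    (a hypothetical input format for this sketch).
    """
    if parent_class_name not in _referring_children_cache:
        by_role = {}
        for ref in child_formula_refs:
            role, attr = ref.split(".")
            by_role.setdefault(role, []).append(attr)
        _referring_children_cache[parent_class_name] = by_role
    return _referring_children_cache[parent_class_name]


def roles_to_cascade(parent, old_parent, referring_children: dict) -> list:
    """Return the roles whose referenced parent attributes changed."""
    return [role for role, attrs in referring_children.items()
            if any(getattr(parent, a) != getattr(old_parent, a) for a in attrs)]


referring = get_referring_children("Order", ["OrderHeader.ShippedDate"])
old_order = SimpleNamespace(ShippedDate=None, Freight=1)
shipped_order = SimpleNamespace(ShippedDate="2020-10-05", Freight=1)
freight_only = SimpleNamespace(ShippedDate=None, Freight=2)

cascade_roles = roles_to_cascade(shipped_order, old_order, referring)
no_cascade = roles_to_cascade(freight_only, old_order, referring)
```

Only a change to a referenced attribute (ShippedDate) triggers a cascade. A change to Freight, which no child formula reads, does not.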