Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions README_ARCH.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# PSI Architecture Overview

This module implements a minimal seven-layer PSI cognitive cycle with explicit
ports for each layer, an in-memory prioritized event bus and exposure over
HTTP/WebSocket.
This module implements a seven-layer PSI cognitive cycle with a canonical
knowledge base (facts, operators, rules and episodes), a prioritized event bus
with backpressure and a lightweight HTTP/WebSocket façade.

## Cycle

Expand All @@ -16,18 +16,12 @@ HTTP/WebSocket.

## Endpoints

Application API on port `7000`:
Minimal PSI API on port `7000`:

- `POST /api/v1/events` – submit a JSON event.
- `GET /api/v1/metrics` – scrape Prometheus metrics.
- `WS /ws/events` – stream processed events.

Debug dashboard on port `8080`:

- `GET /stream.mjpeg` – live camera stream.
- `GET /events.ndjson` – newline-delimited PSI events with layer information.
- `GET /metrics` – Prometheus metrics for debugging.
- Static dashboard at `/` renders camera, status KPIs and the PSI cycle table.
- `GET /psi/state` – health/state information.
- `GET /psi/decisions` – last deliberated actions.
- `GET /psi/metrics` – cycle latency, queue size and drop count.
- `WS /psi/events` – stream processed events in real time.

## Running

Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'org.json:json:20240303'
implementation 'io.javalin:javalin:5.6.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1'
implementation 'io.micrometer:micrometer-core:1.12.5'
implementation 'io.micrometer:micrometer-registry-prometheus:1.12.5'
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
Expand Down
57 changes: 57 additions & 0 deletions src/main/java/dev/bot/zeno/psi/app/PsiMain.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package dev.bot.zeno.psi.app;

import dev.bot.zeno.psi.core.EventBus;
import dev.bot.zeno.psi.core.PSIEngine;
import dev.bot.zeno.psi.core.SafetyGuard;
import dev.bot.zeno.psi.layers.actionexpression.ActionExpressionLayer;
import dev.bot.zeno.psi.layers.deliberation.DeliberationLayer;
import dev.bot.zeno.psi.layers.interpretation.InterpretationLayer;
import dev.bot.zeno.psi.layers.perception.PerceptionLayer;
import dev.bot.zeno.psi.layers.reflection.ReflectionLayer;
import dev.bot.zeno.psi.layers.selfstate.SelfStateLayer;
import dev.bot.zeno.psi.layers.valueappraisal.ValueAppraisalLayer;
import dev.bot.zeno.psi.models.Event;
import dev.bot.zeno.psi.persistence.JsonlKnowledgeBase;
import dev.bot.zeno.psi.persistence.KnowledgeBase;

import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Small bootstrap class demonstrating the PSI engine in isolation.
*/
public final class PsiMain {

public static void main(String[] args) throws Exception {
EventBus bus = new EventBus(100);
KnowledgeBase kb = new JsonlKnowledgeBase(Path.of("kb"));
SafetyGuard guard = (e, ctx) -> true;
PSIEngine engine = new PSIEngine(bus, List.of(
new PerceptionLayer(),
new InterpretationLayer(),
new SelfStateLayer(),
new ValueAppraisalLayer(),
new DeliberationLayer(),
new ActionExpressionLayer(bus),
new ReflectionLayer()
), kb, guard);
engine.start();

// Simulate an event from the vision system
Event vision = new Event(
"vision.detection",
Map.of("class", "person"),
UUID.randomUUID().toString(),
1,
Instant.now(),
Map.of());
bus.publish(vision);

// Let the cycle run once
Thread.sleep(500);
engine.stop();
}
}
48 changes: 48 additions & 0 deletions src/main/java/dev/bot/zeno/psi/app/PsiServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dev.bot.zeno.psi.app;

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.bot.zeno.psi.core.EventBus;
import dev.bot.zeno.psi.core.PSIEngine;
import dev.bot.zeno.psi.models.Event;
import io.javalin.Javalin;
import io.javalin.websocket.WsContext;

import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.Set;

/**
* Minimal HTTP/WebSocket server exposing PSI state and metrics.
*/
public class PsiServer {

private final PSIEngine engine;
private final EventBus bus;
private final ObjectMapper mapper = new ObjectMapper();
private final Set<WsContext> sessions = new CopyOnWriteArraySet<>();

public PsiServer(PSIEngine engine) {
this.engine = engine;
this.bus = engine.bus();
}

public void start(int port) {
Javalin app = Javalin.create();
app.get("/psi/state", ctx -> ctx.json(Map.of("status", "ok")));
app.get("/psi/decisions", ctx -> ctx.json(Map.of()));
app.get("/psi/metrics", ctx -> ctx.json(engine.metrics()));
app.ws("/psi/events", ws -> {
ws.onConnect(sessions::add);
ws.onClose(sessions::remove);
});
bus.addListener(e -> broadcast(e));
app.start(port);
}

private void broadcast(Event e) {
String json;
try { json = mapper.writeValueAsString(e); }
catch (Exception ex) { return; }
sessions.forEach(s -> s.send(json));
}
}
23 changes: 23 additions & 0 deletions src/main/java/dev/bot/zeno/psi/core/Context.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dev.bot.zeno.psi.core;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Shared mutable state passed between PSI layers during a single cognitive
* cycle. Layers should store only lightweight information here; long-term
* knowledge must be persisted via the {@code KnowledgeBase} abstraction.
*/
public class Context {

private final Map<String, Object> data = new ConcurrentHashMap<>();

public void put(String key, Object value) { data.put(key, value); }

@SuppressWarnings("unchecked")
public <T> T get(String key) { return (T) data.get(key); }

public Map<String, Object> data() { return data; }

public void clear() { data.clear(); }
}
100 changes: 100 additions & 0 deletions src/main/java/dev/bot/zeno/psi/core/CycleCoordinator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package dev.bot.zeno.psi.core;

import dev.bot.zeno.psi.layers.PSILayer;
import dev.bot.zeno.psi.models.Event;
import dev.bot.zeno.psi.persistence.KnowledgeBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/**
* Executes the seven PSI layers sequentially for every event consumed from the
* {@link EventBus}. Provides latency metrics and applies a {@link SafetyGuard}
* before actions are expressed.
*/
public class CycleCoordinator implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(CycleCoordinator.class);

public record Metrics(double p95LatencyMs, int queueSize, long droppedEvents) { }

private final EventBus bus;
private final List<PSILayer> layers;
private final Context context;
private final KnowledgeBase knowledgeBase;
private final SafetyGuard guard;
private final int actionIndex;

private volatile boolean running = true;
private final Deque<Long> latencies = new ArrayDeque<>();
private Metrics metrics = new Metrics(0, 0, 0);

public CycleCoordinator(EventBus bus, List<PSILayer> layers, Context context,
KnowledgeBase kb, SafetyGuard guard) {
this.bus = bus;
this.layers = List.copyOf(layers);
this.context = context;
this.knowledgeBase = kb;
this.guard = guard;
this.actionIndex = 5; // ActionExpression is layer index 5
}

@Override
public void run() {
LOGGER.info("CycleCoordinator started");
while (running && !Thread.currentThread().isInterrupted()) {
try {
Event input = bus.take();
context.put("event", input);
Instant start = Instant.now();
for (int i = 0; i < layers.size(); i++) {
if (i == actionIndex) {
Event e = context.get("event");
if (!guard.allow(e, context)) {
LOGGER.warn("SafetyGuard blocked action {}", e.symbol());
continue; // skip action layer
}
}
PSILayer layer = layers.get(i);
layer.receiveInput(context);
layer.process(context);
layer.produceOutput(context);
}
Event output = context.get("event");
if (output != null) {
bus.publish(output);
knowledgeBase.saveFact(output);
}
context.clear();
Instant end = Instant.now();
updateMetrics(Duration.between(start, end));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (IOException e) {
LOGGER.error("Failed to persist event", e);
} catch (Exception e) {
LOGGER.error("Unexpected error during cognitive cycle", e);
}
}
LOGGER.info("CycleCoordinator stopped");
}

private void updateMetrics(Duration latency) {
latencies.addLast(latency.toMillis());
if (latencies.size() > 100) latencies.removeFirst();
double p95 = latencies.stream().sorted()
.skip((long) (latencies.size() * 0.95) - 1)
.findFirst().orElse(0L);
metrics = new Metrics(p95, bus.size(), bus.dropped());
}

public Metrics metrics() { return metrics; }

public void stop() { running = false; }
}
55 changes: 55 additions & 0 deletions src/main/java/dev/bot/zeno/psi/core/EventBus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package dev.bot.zeno.psi.core;

import dev.bot.zeno.psi.models.Event;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
* Priority aware event bus with basic backpressure. When the bus is full lower
* priority events are dropped and a drop counter is incremented.
*/
public class EventBus {

private final PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();
private final int capacity;
private final AtomicInteger size = new AtomicInteger();
private final AtomicLong dropped = new AtomicLong();
private final List<Consumer<Event>> listeners = new CopyOnWriteArrayList<>();

public EventBus(int capacity) { this.capacity = capacity; }

public void publish(Event e) {
int s = size.get();
if (s >= capacity) {
Event lowest = queue.peek();
if (lowest != null && lowest.compareTo(e) < 0) {
queue.poll();
queue.offer(e);
} else {
dropped.incrementAndGet();
return;
}
} else {
queue.offer(e);
size.incrementAndGet();
}
listeners.forEach(l -> l.accept(e));
}

public Event take() throws InterruptedException {
Event e = queue.take();
size.decrementAndGet();
return e;
}

public void addListener(Consumer<Event> listener) { listeners.add(listener); }

public int size() { return size.get(); }

public long dropped() { return dropped.get(); }
}
38 changes: 38 additions & 0 deletions src/main/java/dev/bot/zeno/psi/core/PSIEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dev.bot.zeno.psi.core;

import dev.bot.zeno.psi.layers.PSILayer;
import dev.bot.zeno.psi.persistence.KnowledgeBase;

import java.util.List;

/**
* High level façade responsible for wiring together the event bus,
* knowledge base and layer coordinator. Applications should instantiate
* this engine and call {@link #start()} to begin processing events.
*/
public class PSIEngine {

private final EventBus bus;
private final CycleCoordinator coordinator;
private final Thread cycleThread;

public PSIEngine(EventBus bus, List<PSILayer> layers, KnowledgeBase kb, SafetyGuard guard) {
Context ctx = new Context();
this.bus = bus;
this.coordinator = new CycleCoordinator(bus, layers, ctx, kb, guard);
this.cycleThread = new Thread(coordinator, "psi-cycle");
}

public EventBus bus() { return bus; }

public CycleCoordinator.Metrics metrics() { return coordinator.metrics(); }

public void start() { cycleThread.start(); }

public void stop() {
coordinator.stop();
try {
cycleThread.join(1000);
} catch (InterruptedException ignored) { }
}
}
11 changes: 11 additions & 0 deletions src/main/java/dev/bot/zeno/psi/core/SafetyGuard.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dev.bot.zeno.psi.core;

import dev.bot.zeno.psi.models.Event;

/**
* Hook allowing verification of outbound actions before they reach actuators.
*/
@FunctionalInterface
public interface SafetyGuard {
boolean allow(Event event, Context ctx);
}
Loading