Model
Rye — Functions Reference
Use proven Rye functions to create events, evolve assertions, and query graph context safely.
Source file: design/model/functions.md
Rye — Functions Reference
Utility Functions, Deduplication, and Query Patterns
All functions include a SET search_path clause in their definition for schema isolation. Internal functions use SET search_path = rye, pg_catalog. Cross-schema functions (CDC, profiles) use SET search_path = rye, pg_catalog, public. SECURITY DEFINER functions exclude public from the path. See design/model/deployment.md for details.
1. Assertion Immutability Guard
Assertions are append-only. Only the superseded_at and superseded_by columns may be updated (by the supersession function). All other columns are immutable after insert.
CREATE FUNCTION assertions_immutable_guard() RETURNS trigger AS $$
BEGIN
IF NEW.claim IS DISTINCT FROM OLD.claim
OR NEW.assertion_type IS DISTINCT FROM OLD.assertion_type
OR NEW.subject_node_id IS DISTINCT FROM OLD.subject_node_id
OR NEW.subject_edge_id IS DISTINCT FROM OLD.subject_edge_id
OR NEW.asserted_at IS DISTINCT FROM OLD.asserted_at
OR NEW.effective_at IS DISTINCT FROM OLD.effective_at
OR NEW.source_event_id IS DISTINCT FROM OLD.source_event_id
OR NEW.confidence IS DISTINCT FROM OLD.confidence
THEN
RAISE EXCEPTION 'Assertion content is immutable. Only superseded_at and superseded_by may be updated.';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_assertions_immutable
BEFORE UPDATE ON assertions
FOR EACH ROW
EXECUTE FUNCTION assertions_immutable_guard();
2. Assertion Supersession
Supersedes an existing assertion and creates a replacement in a single transaction. The update path is function-scoped (for example using app.write_path and app.supersede_assertion_id session flags set by the function).
CREATE FUNCTION supersede_assertion(
p_old_assertion_id uuid,
p_new_assertion_type text,
p_new_subject_node_id uuid,
p_new_subject_edge_id uuid,
p_new_claim jsonb,
p_new_assertion_key text DEFAULT NULL,
p_new_effective_at timestamptz DEFAULT NULL,
p_new_source_event_id uuid DEFAULT NULL,
p_new_confidence numeric DEFAULT NULL
) RETURNS uuid AS $$
DECLARE
v_old assertions;
v_new_id uuid;
BEGIN
PERFORM set_config('app.write_path', 'supersede_assertion', true);
PERFORM set_config('app.supersede_assertion_id', p_old_assertion_id::text, true);
SELECT * INTO v_old
FROM assertions
WHERE id = p_old_assertion_id
FOR UPDATE;
IF NOT FOUND THEN
RAISE EXCEPTION 'Assertion % not found', p_old_assertion_id;
END IF;
IF v_old.superseded_at IS NOT NULL THEN
RAISE EXCEPTION 'Assertion % is already superseded', p_old_assertion_id;
END IF;
-- Create the new assertion
v_new_id := gen_random_uuid();
-- Supersede old assertion first to avoid active-key uniqueness conflicts.
UPDATE assertions
SET superseded_at = now(),
superseded_by = v_new_id
WHERE id = p_old_assertion_id
AND superseded_at IS NULL;
-- Create replacement assertion with the pre-generated id.
INSERT INTO assertions (
id, assertion_type, assertion_key, subject_node_id, subject_edge_id,
claim, effective_at, source_event_id, confidence
) VALUES (
v_new_id,
coalesce(p_new_assertion_type, v_old.assertion_type),
coalesce(nullif(trim(p_new_assertion_key), ''), v_old.assertion_key),
coalesce(p_new_subject_node_id, v_old.subject_node_id),
coalesce(p_new_subject_edge_id, v_old.subject_edge_id),
p_new_claim, p_new_effective_at, p_new_source_event_id, p_new_confidence
);
RETURN v_new_id;
END;
$$ LANGUAGE plpgsql;
3. Human-Readable Code Generation
Generates codes in the format {PREFIX}-{YYMM}-{SEQ} (e.g., OPP-2403-0042). Uses a counter table with INSERT ... ON CONFLICT DO UPDATE for concurrency safety — no race conditions, no global sequence contention.
CREATE FUNCTION generate_crm_code(p_prefix text) RETURNS text AS $$
DECLARE
v_yymm text;
v_seq int;
BEGIN
v_yymm := to_char(now(), 'YYMM');
INSERT INTO crm_code_counters (prefix, year_month, next_val)
VALUES (p_prefix, v_yymm, 2)
ON CONFLICT (prefix, year_month)
DO UPDATE SET next_val = crm_code_counters.next_val + 1
RETURNING next_val - 1 INTO v_seq;
RETURN p_prefix || '-' || v_yymm || '-' || lpad(v_seq::text, 4, '0');
END;
$$ LANGUAGE plpgsql;
Counters reset per prefix per month. OPP-2403-0042 is the 42nd opportunity created in March 2024.
4. Node Merge (Deduplication)
Merges a duplicate node into a canonical node by redirecting all edges, assertions, event participations, and artifacts. Records a node_merge event before redirecting participations so both nodes are valid participants in the audit trail.
When the duplicate has active assertions that conflict with the canonical node’s assertions (same type and key), the duplicate’s assertions are superseded in favor of the canonical’s. Non-conflicting assertions are copied to the canonical node.
CREATE FUNCTION merge_nodes(
p_duplicate_id uuid,
p_canonical_id uuid,
p_merged_by text DEFAULT 'system'
) RETURNS void AS $$
Steps:
- Validates both nodes exist and duplicate is not archived.
- Records the merge in
node_merges. - Records a
node_mergeevent with both nodes as participants (canonicalandduplicateroles). - Redirects edges, assertions (with conflict resolution), event participations, artifacts, and source mappings.
- Merges properties (canonical wins on conflicts).
- Archives the duplicate.
5. Fuzzy Candidate Detection
For cross-source deduplication where external IDs don’t match, use trigram similarity:
-- Find candidate duplicate people
SELECT
a.id AS node_a, a.label AS label_a,
b.id AS node_b, b.label AS label_b,
similarity(a.label, b.label) AS name_sim
FROM nodes a
JOIN nodes b ON a.id < b.id
WHERE a.node_type = 'person' AND b.node_type = 'person'
AND a.label % b.label
AND similarity(a.label, b.label) > 0.4
AND a.archived_at IS NULL AND b.archived_at IS NULL;
6. Domain-Specific Normalizers
The graph supports domain-specific normalization functions. These are optional utilities — add them as your domain requires.
-- Example: Tax Map Parcel normalizer for land domains
-- Standardizes "045-0002-0031", "45/2/31", "45-2-31" → "45-2-31"
CREATE FUNCTION normalize_tmp(raw text) RETURNS text AS $$
SELECT array_to_string(
ARRAY(
SELECT CASE
WHEN ltrim(part, '0') = '' THEN '0'
ELSE ltrim(part, '0')
END
FROM unnest(regexp_split_to_array(raw, '[-/.\s]+')) AS part
WHERE part != ''
), '-'
);
$$ LANGUAGE sql IMMUTABLE;
7. Event Recording
All event creation should use record_event(). This function pre-generates the event UUID and inserts both the event and its participants atomically, avoiding an RLS interaction where INSERT ... RETURNING id on the events table fails because the event_read_policy requires participants to exist before the event is visible.
CREATE FUNCTION record_event(
p_event_type text,
p_summary text,
p_properties jsonb DEFAULT '{}',
p_participant_ids uuid[] DEFAULT '{}',
p_participant_roles text[] DEFAULT '{}',
p_actor text DEFAULT NULL,
p_occurred_at timestamptz DEFAULT now()
) RETURNS uuid;
Usage:
SELECT record_event(
p_event_type := 'meeting',
p_summary := 'Quarterly review with Acme',
p_properties := '{"location": "zoom"}',
p_participant_ids := ARRAY['<node_uuid_1>', '<node_uuid_2>']::uuid[],
p_participant_roles := ARRAY['organizer', 'attendee'],
p_actor := 'user:alice'
);
Do NOT insert into events and event_participants separately.
8. Domain Table Integration
link_record — Connect a domain table row to the graph
Creates a graph node and maps it back to the source table via node_source_map. Each distinct source_id creates a new node. Calling again with the same (schema, table, source_id) updates the existing node’s properties instead of creating a duplicate.
Lookup order: checks node_source_map first (canonical path), then falls back to external_id/external_source on the nodes table. This handles cases where the source map was created manually without setting external_id.
A unique index on node_source_map(source_schema, source_table, source_id) prevents orphaned or duplicate mappings.
CREATE FUNCTION link_record(
p_source_schema text,
p_source_table text,
p_source_id text,
p_node_type text,
p_label text,
p_properties jsonb DEFAULT '{}',
p_source_id_type text DEFAULT 'int'
) RETURNS uuid;
Usage:
SELECT link_record(
p_source_schema := 'public',
p_source_table := 'customers',
p_source_id := '42',
p_node_type := 'org',
p_label := 'Acme Corp',
p_properties := '{"plan": "growth", "mrr": 299}'
);
link_records_batch — Bulk domain table import
Processes multiple link_record() calls in a single function call. Accepts parallel arrays for source IDs, labels, and optionally properties.
CREATE FUNCTION link_records_batch(
p_source_schema text,
p_source_table text,
p_source_ids text[],
p_node_type text,
p_labels text[],
p_properties jsonb[] DEFAULT NULL,
p_source_id_type text DEFAULT 'int'
) RETURNS uuid[];
Usage:
SELECT link_records_batch(
p_source_schema := 'public',
p_source_table := 'customers',
p_source_ids := ARRAY['1', '2', '3'],
p_node_type := 'org',
p_labels := ARRAY['Acme', 'Globex', 'Initech'],
p_properties := ARRAY[
'{"plan": "growth"}'::jsonb,
'{"plan": "starter"}'::jsonb,
'{"plan": "enterprise"}'::jsonb
]
);
track_table — Attach CDC triggers for change tracking
Attaches a trigger to a domain table that records INSERT/UPDATE/DELETE as domain_change events for any row that has a linked graph node. Uses record_event() internally.
CREATE FUNCTION track_table(
p_schema text,
p_table text,
p_trigger_name text DEFAULT NULL
) RETURNS void;
Usage:
SELECT track_table('public', 'customers');
Changes to linked rows produce domain_change events with full before/after diff in the changed_fields property.
capture_domain_change — CDC trigger function
The trigger function called by track_table(). Not called directly — it runs automatically on INSERT/UPDATE/DELETE. Only fires for rows that have a linked node in node_source_map. Unlinked rows are silently skipped.
Supports tables with any primary key column — tries id first, then falls back to the table’s actual PK column via pg_index catalog lookup.
9. Instance Introspection
Returns a summary of everything in the Rye instance — node types, edge types, assertion types, event types, tracked tables, and totals.
CREATE FUNCTION rye_catalog() RETURNS jsonb;
Usage:
SELECT rye_catalog();
An agent’s first call to orient itself in a new instance.
10. Agent Query Logging
Logs an agent’s read operation for audit purposes. Delegates to record_event() internally.
CREATE FUNCTION log_agent_query(
p_agent_id text,
p_query_text text,
p_result_summary text,
p_nodes_referenced uuid[]
) RETURNS uuid;
11. Artifact Recording
Creates an artifact with optional content-hash deduplication. Parallel to record_event() for events and link_record() for nodes.
CREATE FUNCTION record_artifact(
p_artifact_type text,
p_content jsonb,
p_source_event_id uuid DEFAULT NULL,
p_source_node_id uuid DEFAULT NULL,
p_related_node_ids uuid[] DEFAULT '{}',
p_location jsonb DEFAULT NULL,
p_content_hash text DEFAULT NULL
) RETURNS uuid;
Usage:
-- Store a parsed document extract
SELECT record_artifact(
p_artifact_type := 'document_parse',
p_content := '{"title": "Q4 Report", "sections": [...]}',
p_source_event_id := '<parse_event_uuid>',
p_source_node_id := '<document_node_uuid>',
p_related_node_ids := ARRAY['<mentioned_person_uuid>']::uuid[],
p_content_hash := 'sha256:abc123...'
);
If p_content_hash is provided and a matching artifact of the same type already exists, returns the existing artifact’s ID without inserting a duplicate. The hash is stored in attrs->>'content_hash'.
12. Assertion Disputes
contest_assertion — Record a competing claim without superseding
When new information contradicts an existing assertion but you’re not certain it should replace it, use contest_assertion(). It creates a competing assertion alongside the original — both remain active with different assertion_key values. A dispute_raised event is recorded.
CREATE FUNCTION contest_assertion(
p_existing_assertion_id uuid,
p_new_claim jsonb,
p_source text,
p_confidence numeric DEFAULT NULL,
p_reason text DEFAULT NULL,
p_source_event_id uuid DEFAULT NULL,
p_actor text DEFAULT NULL
) RETURNS uuid;
Usage:
-- A county filing contradicts the recorded ownership fraction
SELECT contest_assertion(
p_existing_assertion_id := '<current_ownership_assertion_uuid>',
p_new_claim := '{"fraction": 0.125, "basis": "county filing 2024-0892"}',
p_source := 'county_filing_2024_0892',
p_confidence := 0.7,
p_reason := 'County filing shows different fraction than original title opinion'
);
The competing assertion uses assertion_key = 'contested:<source>' so it doesn’t violate the active uniqueness constraint. Use the active_disputes view to find all nodes with competing assertions.
resolve_dispute — Pick a winner and supersede losers
CREATE FUNCTION resolve_dispute(
p_winning_assertion_id uuid,
p_reason text DEFAULT NULL,
p_actor text DEFAULT NULL
) RETURNS uuid;
Usage:
-- After review, the county filing is correct
SELECT resolve_dispute(
p_winning_assertion_id := '<contested_assertion_uuid>',
p_reason := 'County filing confirmed by title examiner',
p_actor := 'user:alice'
);
This supersedes all other active assertions for the same (subject, type). If the winner has a contested: key, it’s promoted to a clean default assertion. A dispute_resolved event is recorded.
active_disputes — View competing assertions
SELECT * FROM active_disputes
WHERE subject_node_id = '<node_uuid>';
Returns rows where a subject has multiple active assertions of the same type, with all competing claims aggregated.
13. Materialized View Refresh
Refreshes all profile materialized views that exist in the database. Uses CONCURRENTLY to allow reads during refresh.
CREATE FUNCTION refresh_materialized_views() RETURNS void;
Usage:
SELECT refresh_materialized_views();
Only refreshes views that are installed — checks pg_matviews before each refresh. Safe to call regardless of which profiles are active.
14. Common Query Patterns
Point-in-time reconstruction
What did we believe about a node as of a specific date?
SELECT * FROM assertions
WHERE subject_node_id = '<node_uuid>'
AND asserted_at <= '2024-03-15T00:00:00Z'
AND (superseded_at IS NULL OR superseded_at > '2024-03-15T00:00:00Z')
ORDER BY assertion_type, asserted_at DESC;
Contradiction detection
Find assertions that were superseded, along with what replaced them:
SELECT
old_a.assertion_type,
old_a.claim AS old_claim,
old_a.asserted_at AS old_asserted_at,
new_a.claim AS new_claim,
new_a.asserted_at AS new_asserted_at,
n.label AS subject_label
FROM assertions old_a
JOIN assertions new_a ON new_a.id = old_a.superseded_by
JOIN nodes n ON n.id = old_a.subject_node_id
WHERE old_a.superseded_at IS NOT NULL
ORDER BY old_a.superseded_at DESC;
Graph traversal (N hops)
WITH RECURSIVE graph AS (
SELECT
target_id AS node_id,
1 AS depth,
ARRAY[source_id, target_id] AS path,
edge_type
FROM edges
WHERE source_id = '<start_node_uuid>' AND archived_at IS NULL
UNION ALL
SELECT
e.target_id,
g.depth + 1,
g.path || e.target_id,
e.edge_type
FROM graph g
JOIN edges e ON e.source_id = g.node_id AND e.archived_at IS NULL
WHERE g.depth < 3
AND NOT e.target_id = ANY(g.path)
)
SELECT DISTINCT n.*, g.depth, g.path
FROM graph g
JOIN nodes n ON n.id = g.node_id AND n.archived_at IS NULL
ORDER BY g.depth, n.node_type;
Agent-optimized summary
Returns a compact context for a node, ranked and limited for agent consumption:
SELECT agent_node_summary('<node_uuid>', 15);
Returns a JSONB object with node, top_relationships, current_facts, and recent_activity arrays, each limited to p_max_items entries. Relationships include both outbound and inbound edges with a direction field ('outbound' or 'inbound').