Database Adapters
Build a custom adapter to sync agent-native app state to any database.
Overview
Agent-native apps use files as their database. But when multiple users collaborate across different agent instances, you need a sync layer to keep everyone in sync. That's what adapters do — they bridge the gap between the local file system and a remote database.
Three adapters ship with @agent-native/core (Firestore, Supabase, Neon), but you can build your own for any backend — DynamoDB, MongoDB, Turso, a REST API, whatever you need.
An adapter is a single class that implements five methods. The sync engine handles everything else: file watching, conflict resolution, deduplication, and pattern matching.
The interface
Every adapter implements FileSyncAdapter:
import type { FileSyncAdapter, FileRecord, FileChange, Unsubscribe } from "@agent-native/core/adapters/sync";
interface FileSyncAdapter {
query(appId: string, ownerId: string): Promise<{ id: string; data: FileRecord }[]>;
get(id: string): Promise<{ id: string; data: FileRecord } | null>;
set(id: string, record: Partial<FileRecord>): Promise<void>;
delete(id: string): Promise<void>;
subscribe(
appId: string,
ownerId: string,
onChange: (changes: FileChange[]) => void,
onError: (error: any) => void,
): Unsubscribe;
}The types it works with:
interface FileRecord {
path: string; // File path relative to project root
content: string; // File contents
app: string; // Application identifier
ownerId: string; // Owner/user ID
lastUpdated: number; // Unix timestamp (ms)
createdAt?: number; // Optional creation timestamp
}
interface FileChange {
type: "added" | "modified" | "removed";
id: string; // Document ID
data: FileRecord;
}
type Unsubscribe = () => void;Building an adapter
Each method has a specific job. Here's what to implement:
query(appId, ownerId)
Return all file records for a given app and owner. This is called at startup to load the initial state.
async query(appId: string, ownerId: string) {
const rows = await db.select("files", { app: appId, owner_id: ownerId });
return rows.map(row => ({ id: row.id, data: toFileRecord(row) }));
}get(id)
Fetch a single record by its document ID. Return null if not found.
async get(id: string) {
const row = await db.findOne("files", { id });
if (!row) return null;
return { id: row.id, data: toFileRecord(row) };
}set(id, record)
Upsert a file record. The record argument is Partial<FileRecord> — on updates, only changed fields are passed. Your implementation should merge with existing data, not overwrite.
async set(id: string, record: Partial<FileRecord>) {
await db.upsert("files", {
id,
path: record.path,
content: record.content,
app: record.app,
owner_id: record.ownerId,
last_updated: record.lastUpdated,
created_at: record.createdAt,
});
}delete(id)
Delete a file record by ID.
async delete(id: string) {
await db.remove("files", { id });
}subscribe(appId, ownerId, onChange, onError)
Listen for remote changes and call onChange with an array of FileChange objects. Return an unsubscribe function that stops listening when called.
You have two options here:
If your database supports change streams (Firestore onSnapshot, Supabase Realtime, MongoDB Change Streams), use them. Lower latency, no wasted queries.
If your database doesn't support real-time, poll on an interval. The Neon adapter does this at 2-second intervals. Keep an in-memory snapshot and diff against it.
Here's the polling approach (works with any database):
subscribe(appId, ownerId, onChange, onError): Unsubscribe {
const snapshot = new Map<string, { content: string; lastUpdated: number }>();
let stopped = false;
const poll = async () => {
if (stopped) return;
try {
const records = await this.query(appId, ownerId);
const currentIds = new Set<string>();
const changes: FileChange[] = [];
for (const { id, data } of records) {
currentIds.add(id);
const prev = snapshot.get(id);
if (!prev) {
changes.push({ type: "added", id, data });
} else if (prev.content !== data.content || prev.lastUpdated !== data.lastUpdated) {
changes.push({ type: "modified", id, data });
}
snapshot.set(id, { content: data.content, lastUpdated: data.lastUpdated });
}
for (const [id] of snapshot) {
if (!currentIds.has(id)) {
changes.push({
type: "removed", id,
data: { path: "", content: "", app: appId, ownerId, lastUpdated: 0 },
});
snapshot.delete(id);
}
}
if (changes.length > 0) onChange(changes);
} catch (err) {
onError(err);
}
if (!stopped) setTimeout(poll, 2000);
};
poll();
return () => { stopped = true; };
}Full example
Here's a complete adapter for a generic SQL database (e.g., Turso, PlanetScale, or any driver that supports parameterized queries):
import type {
FileSyncAdapter, FileRecord, FileChange, Unsubscribe
} from "@agent-native/core/adapters/sync";
function rowToRecord(row: any): FileRecord {
return {
path: row.path,
content: row.content,
app: row.app,
ownerId: row.owner_id,
lastUpdated: Number(row.last_updated),
createdAt: row.created_at != null ? Number(row.created_at) : undefined,
};
}
export class MyDatabaseAdapter implements FileSyncAdapter {
constructor(private db: any) {}
async query(appId: string, ownerId: string) {
const { rows } = await this.db.execute(
"SELECT * FROM files WHERE app = ? AND owner_id = ?",
[appId, ownerId]
);
return rows.map((row: any) => ({ id: row.id, data: rowToRecord(row) }));
}
async get(id: string) {
const { rows } = await this.db.execute(
"SELECT * FROM files WHERE id = ? LIMIT 1", [id]
);
if (rows.length === 0) return null;
return { id: rows[0].id, data: rowToRecord(rows[0]) };
}
async set(id: string, record: Partial<FileRecord>) {
await this.db.execute(
`INSERT INTO files (id, path, content, app, owner_id, last_updated, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
path = COALESCE(excluded.path, files.path),
content = COALESCE(excluded.content, files.content),
app = COALESCE(excluded.app, files.app),
owner_id = COALESCE(excluded.owner_id, files.owner_id),
last_updated = COALESCE(excluded.last_updated, files.last_updated),
created_at = COALESCE(excluded.created_at, files.created_at)`,
[id, record.path ?? "", record.content ?? "", record.app ?? "",
record.ownerId ?? "", record.lastUpdated ?? 0, record.createdAt ?? null]
);
}
async delete(id: string) {
await this.db.execute("DELETE FROM files WHERE id = ?", [id]);
}
subscribe(appId: string, ownerId: string, onChange: (c: FileChange[]) => void, onError: (e: any) => void): Unsubscribe {
const snapshot = new Map<string, { content: string; lastUpdated: number }>();
let stopped = false;
const poll = async () => {
if (stopped) return;
try {
const records = await this.query(appId, ownerId);
const currentIds = new Set<string>();
const changes: FileChange[] = [];
for (const { id, data } of records) {
currentIds.add(id);
const prev = snapshot.get(id);
if (!prev) {
changes.push({ type: "added", id, data });
} else if (prev.content !== data.content || prev.lastUpdated !== data.lastUpdated) {
changes.push({ type: "modified", id, data });
}
snapshot.set(id, { content: data.content, lastUpdated: data.lastUpdated });
}
for (const [id] of snapshot) {
if (!currentIds.has(id)) {
changes.push({
type: "removed", id,
data: { path: "", content: "", app: appId, ownerId, lastUpdated: 0 },
});
snapshot.delete(id);
}
}
if (changes.length > 0) onChange(changes);
} catch (err) { onError(err); }
if (!stopped) setTimeout(poll, 2000);
};
poll();
return () => { stopped = true; };
}
}Wiring it up
Pass your adapter to FileSync in your server setup:
import { FileSync } from "@agent-native/core/adapters/sync";
import { MyDatabaseAdapter } from "./my-adapter";
const adapter = new MyDatabaseAdapter(dbClient);
const sync = new FileSync({
appId: "my-app",
ownerId: "shared",
contentRoot: "./data",
adapter,
});
// Start syncing
await sync.initFileSync();The sync engine handles the rest — watching files, pushing changes, pulling remote updates, and resolving conflicts via three-way merge.
Table schema
All SQL-based adapters use the same table schema. Create this in your database:
CREATE TABLE files (
id TEXT PRIMARY KEY,
path TEXT,
content TEXT,
app TEXT,
owner_id TEXT,
last_updated BIGINT,
created_at BIGINT
);
CREATE INDEX idx_files_app_owner ON files(app, owner_id);Document IDs
The sync engine generates document IDs automatically in the format {appId}__{path/with/__separators}. For example, app "my-app" with file data/projects/draft.json becomes "my-app__data__projects__draft.json". Your adapter just stores and retrieves these IDs — it doesn't need to generate them.
Testing
Test your adapter against the five methods. Here's a minimal test pattern:
import { describe, it, expect } from "vitest";
import { MyDatabaseAdapter } from "./my-adapter";
describe("MyDatabaseAdapter", () => {
const adapter = new MyDatabaseAdapter(testDb);
it("set and get", async () => {
await adapter.set("test-1", {
path: "data/test.json",
content: '{"hello":"world"}',
app: "test-app",
ownerId: "user-1",
lastUpdated: Date.now(),
});
const result = await adapter.get("test-1");
expect(result).not.toBeNull();
expect(result!.data.path).toBe("data/test.json");
expect(result!.data.content).toBe('{"hello":"world"}');
});
it("query filters by app and owner", async () => {
const results = await adapter.query("test-app", "user-1");
expect(results.length).toBeGreaterThan(0);
expect(results.every(r => r.data.app === "test-app")).toBe(true);
});
it("delete removes the record", async () => {
await adapter.delete("test-1");
const result = await adapter.get("test-1");
expect(result).toBeNull();
});
it("subscribe detects changes", async () => {
const changes = await new Promise<any[]>((resolve) => {
const unsub = adapter.subscribe("test-app", "user-1", (c) => {
unsub();
resolve(c);
}, console.error);
// Trigger a change
adapter.set("test-2", {
path: "data/new.json", content: "{}", app: "test-app",
ownerId: "user-1", lastUpdated: Date.now(),
});
});
expect(changes.some(c => c.type === "added")).toBe(true);
});
});Publishing
You can publish your adapter as a standalone npm package. Export the adapter class and any config types:
// package.json
{
"name": "agent-native-adapter-turso",
"peerDependencies": {
"@agent-native/core": ">=0.2"
}
}Users install your package and pass it to FileSync — that's it. The adapter interface is stable and versioned with @agent-native/core.