Realtime
The Realtime plugin enables real-time data synchronization in your Better Query application using WebSockets. It automatically broadcasts all CRUD operations to connected clients, supports custom channels, and provides presence tracking, allowing you to build live, collaborative applications.
Installation
Install dependencies
The realtime plugin requires the ws WebSocket library:
npm install ws
# or
pnpm add ws
# or
yarn add ws
For TypeScript, also install types:
npm install -D @types/ws
Add the plugin to your query config
Add the realtime plugin to your Better Query server configuration:
import { betterQuery } from "better-query"
import { realtimePlugin } from "better-query/plugins"
export const query = betterQuery({
// ... other config options
plugins: [
realtimePlugin({
enabled: true,
port: 3001, // WebSocket server port
path: "/realtime", // WebSocket endpoint path
heartbeatInterval: 30000, // Send heartbeat every 30 seconds
broadcastPresence: true, // Track user presence
})
]
})
Setup client plugin
Add the realtime client plugin to your client configuration:
import { createQueryClient } from "better-query/client"
import { realtimeClient } from "better-query/client"
const client = createQueryClient({
baseURL: "http://localhost:3000",
queryPlugins: [
realtimeClient({
wsUrl: "ws://localhost:3001/realtime", // WebSocket URL
autoReconnect: true, // Auto-reconnect on disconnect
})
]
})
No additional migration needed
The Realtime plugin works entirely in-memory and doesn't require database schema changes. Just add it to your config and start using it.
How It Works
The Realtime plugin uses WebSockets to establish persistent, bidirectional connections between the server and clients. This enables instant push updates with minimal overhead.
Automatic Broadcasting
When you perform CRUD operations through Better Query, the plugin automatically broadcasts changes to all subscribed clients:
- create - When a new record is created
- update - When a record is updated
- delete - When a record is deleted
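On the client, these three operations are typically folded into local state. The following is a minimal sketch, assuming every record carries a string `id` and that events expose `operation`, `data`, and `id` fields. `HasId`, `ChangeEvent`, and `applyChange` are hypothetical names used for illustration, not exports of the plugin.

```typescript
// Hypothetical local types for illustration; not exported by better-query.
interface HasId {
  id: string;
}

interface ChangeEvent<T extends HasId> {
  operation: "create" | "update" | "delete";
  data?: T;
  id?: string;
}

// Returns a new array with the change applied; the input array is not mutated.
function applyChange<T extends HasId>(items: T[], event: ChangeEvent<T>): T[] {
  const data = event.data;
  if (event.operation === "create") {
    // Skip creates without data and records that are already present.
    if (!data || items.some((item) => item.id === data.id)) return items;
    return [...items, data];
  }
  if (event.operation === "update") {
    if (!data) return items;
    return items.map((item) => (item.id === data.id ? data : item));
  }
  // Remaining case is "delete": fall back to data.id when no top-level id is set.
  const id = event.id ?? data?.id;
  return items.filter((item) => item.id !== id);
}
```

Because the function is pure, it can be passed straight to a state setter, for example `setData((prev) => applyChange(prev, event))`.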
Channel System
The plugin uses a channel-based subscription model:
- Resource channels (resource:post) - Subscribe to all changes for a resource type
- Record channels (post:123) - Subscribe to changes for a specific record
- Custom channels (notifications, chat:room1) - Create your own channels for custom events
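Channel names are plain strings, so small helpers can keep them consistent across an application. The sketch below assumes the naming scheme shown in the list above. `resourceChannel`, `recordChannel`, and `classifyChannel` are hypothetical helpers, not part of the plugin.

```typescript
// Hypothetical helpers; the naming scheme follows the channel examples above.
const resourceChannel = (resource: string): string => `resource:${resource}`;
const recordChannel = (resource: string, id: string): string => `${resource}:${id}`;

type ChannelKind = "resource" | "record" | "custom";

// Classifies a channel name. "post:123" and "chat:room1" have the same shape,
// so the caller supplies the known resource names to tell them apart.
function classifyChannel(channel: string, knownResources: Set<string>): ChannelKind {
  const separator = channel.indexOf(":");
  if (separator === -1) return "custom";
  const prefix = channel.slice(0, separator);
  if (prefix === "resource") return "resource";
  return knownResources.has(prefix) ? "record" : "custom";
}
```

Note the ambiguity this exposes: a custom channel whose prefix matches a resource name (say `post:announcements`) is indistinguishable from a record channel, so it may be worth reserving resource names when choosing custom channel prefixes.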
WebSocket Message Format
Messages follow this structure:
interface RealtimeMessage {
type: "subscribe" | "unsubscribe" | "data_change" | "presence_update" | "broadcast" | "heartbeat" | "error",
channel?: string,
payload?: {
resource?: string,
operation?: "create" | "update" | "delete",
data?: any,
id?: string,
// For presence updates
action?: "join" | "leave",
userId?: string,
onlineUsers?: Array<{ userId?: string, metadata?: any }>
},
timestamp?: number
}
Configuration
Server Plugin Options
Configure the realtime plugin on the server:
realtimePlugin({
// Enable/disable the plugin
enabled: true,
// WebSocket server instance (optional - plugin creates one if not provided)
wss: myWebSocketServer,
// Port for WebSocket server (when creating a new server)
port: 3001,
// Path for WebSocket endpoint
path: "/realtime",
// Resources to enable realtime for (empty array = all resources)
resources: ["post", "comment"], // Only broadcast changes for these resources
// Whether to broadcast user presence (join/leave events)
broadcastPresence: true,
// Custom authentication handler
authenticate: async (request) => {
// Extract and verify token from request
const token = getTokenFromRequest(request);
const user = await validateToken(token);
// Return user data or null if authentication fails
return user ? { userId: user.id, ...user } : null;
},
// Maximum reconnection attempts
maxReconnectAttempts: 5,
// Heartbeat interval in milliseconds (30 seconds)
heartbeatInterval: 30000,
})
Client Plugin Options
Configure the realtime client plugin:
realtimeClient({
// WebSocket URL (if not provided, inferred from baseURL)
wsUrl: "ws://localhost:3001/realtime",
// Auto-reconnect on disconnect
autoReconnect: true,
// Maximum reconnection attempts
maxReconnectAttempts: 5,
// Reconnection delay in milliseconds
reconnectDelay: 1000,
// Heartbeat interval in milliseconds
heartbeatInterval: 30000,
// Authentication token, or a function that returns one
auth: "your-auth-token",
// Alternatively, pass a function instead of a string (use one form, not both):
// auth: async () => await getAuthToken(),
// Enable debug logging
debug: true,
})
Client Usage
Connecting to Realtime Server
First, connect to the WebSocket server:
// Connect to realtime server
await client.realtime.connect();
// Monitor connection state
client.realtime.atoms.connectionState.subscribe((state) => {
console.log('Connection state:', state);
// state: "disconnected" | "connecting" | "connected" | "reconnecting"
});
Subscribe to Resource Changes
Subscribe to all changes for a specific resource type:
// Subscribe to all post changes
const unsubscribe = await client.realtime.subscribeToResource(
"post",
(event) => {
console.log('Operation:', event.operation); // "create", "update", or "delete"
console.log('Resource:', event.resource); // "post"
console.log('Data:', event.data); // The record data
console.log('ID:', event.id); // The record ID
if (event.operation === "create") {
// Handle new post
} else if (event.operation === "update") {
// Handle post update
} else if (event.operation === "delete") {
// Handle post deletion
}
}
);
// Unsubscribe when done
unsubscribe();
Subscribe to Specific Record
Subscribe to changes for a single record:
// Subscribe to a specific post
const unsubscribe = await client.realtime.subscribeToRecord(
"post",
"post-123",
(event) => {
console.log('This post was updated:', event);
}
);
Subscribe to Custom Channels
Create and subscribe to custom channels:
// Subscribe to a custom channel
const unsubscribe = await client.realtime.subscribe(
"notifications",
(message) => {
console.log('Received:', message.payload);
}
);
// Broadcast to the channel
await client.realtime.broadcast("notifications", {
type: "alert",
message: "New feature available!"
});
Presence Tracking
Track which users are online in a channel:
// Subscribe to presence updates
await client.realtime.subscribeToPresence(
"resource:post",
(event) => {
console.log(`User ${event.userId}: ${event.action}`); // action is "join" or "leave"
console.log('Online users:', event.onlineUsers);
}
);
// Get online users reactively
client.realtime.atoms.onlineUsers.subscribe((usersByChannel) => {
const users = usersByChannel.get("resource:post") || [];
console.log('Viewing posts:', users.length, 'users');
});
React Hook Example
Create a reusable React hook for realtime updates:
import { useEffect, useState } from 'react';
import type { DataChangeEvent } from 'better-query/client';
export function useRealtimeResource<T>(
client: any,
resource: string
) {
const [data, setData] = useState<T[]>([]);
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
// Connect to realtime
client.realtime.connect();
// Subscribe to connection state
const unsubState = client.realtime.atoms.connectionState.subscribe(
(state: string) => {
setIsConnected(state === 'connected');
}
);
// Subscribe to resource changes. subscribeToResource resolves to the
// unsubscribe function, so keep the promise and resolve it during cleanup.
const unsubResourcePromise = client.realtime.subscribeToResource(
resource,
(event: DataChangeEvent) => {
if (event.operation === 'create') {
setData((prev) => [...prev, event.data]);
} else if (event.operation === 'update') {
setData((prev) =>
prev.map((item: any) =>
item.id === event.data.id ? event.data : item
)
);
} else if (event.operation === 'delete') {
setData((prev) =>
prev.filter((item: any) => item.id !== event.id)
);
}
}
);
return () => {
unsubState();
// Unsubscribe once the subscription has resolved, then disconnect
unsubResourcePromise
.then((unsub: () => void) => unsub())
.finally(() => client.realtime.disconnect());
};
}, [client, resource]);
return { data, isConnected };
}
Usage:
import { useRealtimeResource } from './useRealtime';
import { client } from './client';
interface Post {
id: string;
title: string;
}
export function PostList() {
const { data: posts, isConnected } = useRealtimeResource<Post>(client, 'post');
return (
<div>
<div>
Status: {isConnected ? '🟢 Connected' : '🔴 Disconnected'}
</div>
<ul>
{posts.map((post) => (
<li key={post.id}>{post.title}</li>
))}
</ul>
</div>
);
}
Vue Composable Example
import { ref, onMounted, onUnmounted } from 'vue';
import type { DataChangeEvent } from 'better-query/client';
export function useRealtimeResource<T>(
client: any,
resource: string
) {
const data = ref<T[]>([]);
const isConnected = ref(false);
const unsubscribers: Array<() => void> = [];
onMounted(async () => {
// Connect to realtime
await client.realtime.connect();
// Subscribe to connection state
const unsubState = client.realtime.atoms.connectionState.subscribe(
(state: string) => {
isConnected.value = state === 'connected';
}
);
unsubscribers.push(unsubState);
// Subscribe to resource changes
const unsubResource = await client.realtime.subscribeToResource(
resource,
(event: DataChangeEvent) => {
if (event.operation === 'create') {
data.value.push(event.data as T);
} else if (event.operation === 'update') {
const index = data.value.findIndex((item: any) => item.id === event.data.id);
if (index !== -1) {
data.value[index] = event.data as T;
}
} else if (event.operation === 'delete') {
data.value = data.value.filter((item: any) => item.id !== event.id);
}
}
);
unsubscribers.push(unsubResource);
});
onUnmounted(() => {
unsubscribers.forEach(unsub => unsub());
client.realtime.disconnect();
});
return { data, isConnected };
}
Server-Side Broadcasting
Using REST Endpoint
Broadcast custom messages to channels from your server code:
// Broadcast to a channel using the REST API
const response = await fetch("http://localhost:3000/api/query/realtime/broadcast", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
channel: "notifications",
payload: {
type: "info",
message: "System maintenance scheduled",
},
}),
});
const result = await response.json();
console.log(result); // { success: true, channel: "notifications" }
Channel Patterns
Use these patterns for organizing channels:
// Resource channel - all changes for a resource type
await client.realtime.subscribe("resource:post", callback);
// Record channel - specific record changes
await client.realtime.subscribe("post:123", callback);
// Custom channel - application-specific events
await client.realtime.subscribe("notifications", callback);
await client.realtime.subscribe("chat:room1", callback);
Monitoring Connections
Get statistics about active WebSocket connections:
const response = await fetch("http://localhost:3000/api/query/realtime/stats");
const stats = await response.json();
console.log(stats);
// {
// connected: 42,
// channels: [
// {
// name: "resource:post",
// clients: 15,
// onlineUsers: [...]
// }
// ]
// }
Get Channel Users
Retrieve online users in a specific channel:
const response = await fetch(
"http://localhost:3000/api/query/realtime/channel/users?channel=resource:post"
);
const result = await response.json();
console.log(result);
// {
// channel: "resource:post",
// users: [...],
// count: 15
// }
Advanced Examples
Live Collaborative Editing
Track document changes and active collaborators in real-time:
import { useEffect, useState } from 'react';
import { client } from './client';
export function CollaborativeEditor({ documentId }: { documentId: string }) {
const [content, setContent] = useState('');
const [activeUsers, setActiveUsers] = useState<string[]>([]);
useEffect(() => {
// Connect to realtime
client.realtime.connect();
// The subscribe methods resolve to unsubscribe functions, so keep the
// promises and resolve them during cleanup.
const unsubDocPromise = client.realtime.subscribeToRecord(
'document',
documentId,
(event) => {
if (event.operation === 'update') {
setContent(event.data.content);
}
}
);
// Subscribe to presence updates
const unsubPresencePromise = client.realtime.subscribeToPresence(
`document:${documentId}`,
(event) => {
if (event.action === 'join') {
setActiveUsers((prev) => [...prev, event.userId]);
} else if (event.action === 'leave') {
setActiveUsers((prev) => prev.filter((id) => id !== event.userId));
}
}
);
return () => {
// Unsubscribe once both subscriptions have resolved, then disconnect
Promise.all([unsubDocPromise, unsubPresencePromise])
.then((unsubs) => unsubs.forEach((unsub) => unsub()))
.finally(() => client.realtime.disconnect());
};
}, [documentId]);
const handleContentChange = async (newContent: string) => {
setContent(newContent);
// Update via your API
await client.documents.update({
body: { id: documentId, content: newContent }
});
};
return (
<div>
<div>Active Users: {activeUsers.length}</div>
<textarea
value={content}
onChange={(e) => handleContentChange(e.target.value)}
/>
</div>
);
}
Real-Time Dashboard
Build a live dashboard that updates automatically:
import { useRealtimeResource } from './useRealtime';
import { client } from './client';
interface Order {
id: string;
total: number;
createdAt: string;
}
interface User {
id: string;
createdAt: string;
}
export function Dashboard() {
const { data: orders } = useRealtimeResource<Order>(client, 'order');
const { data: users } = useRealtimeResource<User>(client, 'user');
const todayOrders = orders.filter(
(order) => new Date(order.createdAt).toDateString() === new Date().toDateString()
);
const newUsers = users.filter(
(user) => Date.now() - new Date(user.createdAt).getTime() < 24 * 60 * 60 * 1000
);
const todayRevenue = todayOrders.reduce((sum, o) => sum + o.total, 0);
return (
<div>
<div>
<h3>Today's Orders</h3>
<p>{todayOrders.length} orders</p>
<p>${todayRevenue.toFixed(2)}</p>
</div>
<div>
<h3>New Users (24h)</h3>
<p>{newUsers.length} users</p>
</div>
</div>
);
}
Live Chat
Build a real-time chat application:
import { useEffect, useState } from 'react';
import { client } from './client';
interface Message {
id: string;
roomId: string;
author: string;
content: string;
createdAt: string;
}
export function Chat({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
useEffect(() => {
// Connect to realtime
client.realtime.connect();
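// Subscribe to room channel. subscribe() resolves to the unsubscribe
// function, so keep the promise and resolve it during cleanup.
const unsubscribePromise = client.realtime.subscribe(
`chat:${roomId}`,
(message) => {
if (message.type === 'broadcast') {
setMessages((prev) => [...prev, message.payload]);
}
}
);
return () => {
unsubscribePromise
.then((unsubscribe) => unsubscribe())
.finally(() => client.realtime.disconnect());
};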
}, [roomId]);
const sendMessage = async (content: string) => {
// Broadcast message to room
await client.realtime.broadcast(`chat:${roomId}`, {
id: crypto.randomUUID(), // Unique id; the message list uses it as the React key
roomId,
content,
author: 'currentUser',
createdAt: new Date().toISOString(),
});
};
return (
<div>
<div>
{messages.map((msg) => (
<div key={msg.id}>
<strong>{msg.author}:</strong> {msg.content}
</div>
))}
</div>
<input
onKeyDown={(e) => {
if (e.key === 'Enter') {
sendMessage(e.currentTarget.value);
e.currentTarget.value = '';
}
}}
/>
</div>
);
}
Live Notifications
Display real-time notifications:
import { useEffect, useState } from 'react';
import { client } from './client';
interface Notification {
id: string;
message: string;
type: 'info' | 'success' | 'warning' | 'error';
createdAt: string;
}
export function NotificationBell({ userId }: { userId: string }) {
const [notifications, setNotifications] = useState<Notification[]>([]);
const [unreadCount, setUnreadCount] = useState(0);
useEffect(() => {
// Connect to realtime
client.realtime.connect();
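// Subscribe to user's notification channel. subscribe() resolves to the
// unsubscribe function, so keep the promise and resolve it during cleanup.
const unsubscribePromise = client.realtime.subscribe(
`notifications:${userId}`,
(message) => {
if (message.type === 'broadcast') {
setNotifications((prev) => [message.payload, ...prev]);
setUnreadCount((prev) => prev + 1);
}
}
);
return () => {
unsubscribePromise
.then((unsubscribe) => unsubscribe())
.finally(() => client.realtime.disconnect());
};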
}, [userId]);
return (
<div>
<button onClick={() => setUnreadCount(0)}>
🔔 {unreadCount > 0 && <span>{unreadCount}</span>}
</button>
<div>
{notifications.map((notif) => (
<div key={notif.id} className={notif.type}>
{notif.message}
</div>
))}
</div>
</div>
);
}
Best Practices
- Use specific subscriptions - Subscribe to specific channels/records instead of broad resource subscriptions when possible
- Clean up connections - Always unsubscribe and disconnect when components unmount
- Handle reconnection - Monitor connection state and handle reconnection scenarios
- Limit subscriptions per client - Don't subscribe to more channels than necessary
- Use presence wisely - Only enable presence tracking where needed to reduce overhead
- Monitor performance - Use the stats endpoint to monitor active connections and channels
- Debounce rapid updates - Consider debouncing UI updates if you expect frequent changes
- Implement authentication - Always authenticate WebSocket connections in production
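For the reconnection point, one common approach is to space attempts out with capped exponential backoff rather than retrying at a fixed interval. The sketch below is an assumption about how an application might compute such delays; the plugin's own reconnection strategy is not described here, and `backoffDelay` and `backoffSchedule` are hypothetical helpers.

```typescript
// Hypothetical helper: delay before reconnect attempt `attempt` (0-based).
// The base delay doubles on each attempt and is capped at maxDelayMs.
function backoffDelay(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  return Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
}

// Builds the full delay schedule for a given number of attempts.
function backoffSchedule(maxAttempts: number, baseDelayMs = 1000, maxDelayMs = 30000): number[] {
  return Array.from({ length: maxAttempts }, (_, attempt) =>
    backoffDelay(attempt, baseDelayMs, maxDelayMs)
  );
}

// With the defaults, five attempts wait 1000, 2000, 4000, 8000, 16000 ms.
console.log(backoffSchedule(5));
```

The defaults mirror the `reconnectDelay: 1000` and `maxReconnectAttempts: 5` values used in the client options; adding random jitter to each delay is a common refinement when many clients reconnect at once.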
Security Considerations
Authentication
Always implement authentication for WebSocket connections:
realtimePlugin({
authenticate: async (request) => {
// Extract token from request URL or headers
const url = new URL(request.url, 'ws://localhost');
const token = url.searchParams.get('token');
if (!token) return null;
// Verify token and return user data
const user = await verifyToken(token);
return user ? { userId: user.id, ...user } : null;
},
})
Channel Access Control
Implement custom logic to control which channels users can access:
// In your application code, verify permissions before subscribing
const canAccessChannel = await checkUserPermission(userId, channelName);
if (canAccessChannel) {
await client.realtime.subscribe(channelName, callback);
}
Data Filtering
Filter sensitive data before broadcasting:
// In your CRUD hooks or custom broadcast logic
const sanitizedData = {
id: user.id,
name: user.name,
// Don't include password, email, etc.
};
Use Secure WebSocket (WSS)
In production, always use WSS (WebSocket over TLS):
realtimeClient({
wsUrl: "wss://yourdomain.com/realtime", // Use WSS, not WS
})
Rate Limiting
Consider implementing rate limiting for broadcast operations to prevent abuse:
// Limit how often users can broadcast to custom channels
const rateLimiter = new Map<string, number>();
async function checkRateLimit(userId: string): Promise<boolean> {
const lastBroadcast = rateLimiter.get(userId) || 0;
const now = Date.now();
if (now - lastBroadcast < 1000) { // 1 second cooldown
return false;
}
rateLimiter.set(userId, now);
return true;
}
API Reference
Server Endpoints
The plugin automatically adds these REST endpoints for managing realtime connections:
GET /realtime/stats
Get statistics about active WebSocket connections.
Response:
{
connected: number,
channels: Array<{
name: string,
clients: number,
onlineUsers: Array<{ userId?: string, metadata?: any }>
}>
}
Example:
const response = await fetch('/api/query/realtime/stats');
const stats = await response.json();
console.log('Active connections:', stats.connected);
GET /realtime/channel/users
Get online users in a specific channel.
Query Parameters:
- channel (string, required) - Channel name
Response:
{
channel: string,
users: Array<{ userId?: string, metadata?: any }>,
count: number
}
Example:
const response = await fetch(
'/api/query/realtime/channel/users?channel=resource:post'
);
const { users, count } = await response.json();
POST /realtime/broadcast
Broadcast a message to a specific channel (server-side only).
Body:
{
channel: string,
payload: any
}
Response:
{
success: boolean,
channel: string
}
Example:
await fetch('/api/query/realtime/broadcast', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
channel: 'notifications',
payload: { message: 'Server maintenance in 10 minutes' }
})
});
Client Methods
The client plugin provides these methods:
connect()
Connect to the WebSocket server.
await client.realtime.connect();
disconnect()
Disconnect from the WebSocket server.
await client.realtime.disconnect();
subscribe(channel, callback)
Subscribe to a custom channel.
const unsubscribe = await client.realtime.subscribe(
"custom-channel",
(message) => {
console.log('Received:', message);
}
);
// Unsubscribe later
unsubscribe();
subscribeToResource(resource, callback)
Subscribe to all changes for a resource type.
const unsubscribe = await client.realtime.subscribeToResource(
"post",
(event) => {
console.log('Operation:', event.operation);
console.log('Data:', event.data);
}
);
subscribeToRecord(resource, id, callback)
Subscribe to changes for a specific record.
const unsubscribe = await client.realtime.subscribeToRecord(
"post",
"123",
(event) => {
console.log('Post 123 changed:', event);
}
);
subscribeToPresence(channel, callback)
Subscribe to presence updates (user join/leave events).
const unsubscribe = await client.realtime.subscribeToPresence(
"resource:post",
(event) => {
console.log(`${event.userId}: ${event.action}`); // action is "join" or "leave"
console.log('Online users:', event.onlineUsers);
}
);
broadcast(channel, payload)
Broadcast a message to a channel.
await client.realtime.broadcast("notifications", {
type: "info",
message: "Hello!"
});
getConnectionState()
Get the current connection state.
const state = await client.realtime.getConnectionState();
// "disconnected" | "connecting" | "connected" | "reconnecting"
getOnlineUsers(channel)
Get online users in a channel.
const users = await client.realtime.getOnlineUsers("resource:post");
console.log('Online users:', users);
Reactive Atoms
The client provides reactive atoms for state management:
connectionState
Subscribe to connection state changes.
client.realtime.atoms.connectionState.subscribe((state) => {
console.log('Connection state:', state);
});
onlineUsers
Subscribe to online users changes (by channel).
client.realtime.atoms.onlineUsers.subscribe((usersByChannel) => {
const users = usersByChannel.get("resource:post") || [];
console.log('Users viewing posts:', users.length);
});
Troubleshooting
Connection Issues
Problem: WebSocket connection fails or keeps disconnecting.
Solutions:
- Check that the WebSocket server is running on the correct port
- Verify the wsUrl in client configuration matches the server
- Check firewall/proxy settings that might block WebSocket connections
- Monitor connection state using the connectionState atom
- Enable debug logging in client options
realtimeClient({
debug: true, // Enable debug logs
})
Not Receiving Updates
Problem: Client doesn't receive realtime updates.
Solutions:
- Verify you're connected: check the connectionState atom
- Ensure you're subscribed to the correct channel name
- Check that the server plugin is enabled
- Verify the resource is included in the resources array (if specified)
- Check browser console for WebSocket errors
// Verify connection
client.realtime.atoms.connectionState.subscribe((state) => {
console.log('Connection:', state);
});
High Memory Usage
Problem: Server memory usage keeps growing.
Likely causes:
- Too many active WebSocket connections
- Clients not disconnecting properly
Solutions:
- Check connection stats regularly using the stats endpoint
- Implement connection limits
// Monitor connections
const stats = await fetch('/api/query/realtime/stats').then(r => r.json());
console.log('Active connections:', stats.connected);
Reconnection Loops
Problem: Client keeps reconnecting repeatedly.
Solutions:
- Check authentication is working correctly
- Verify server is accessible
- Review the maxReconnectAttempts setting
- Check for server-side errors in logs
- Adjust reconnectDelay to avoid overwhelming the server
realtimeClient({
maxReconnectAttempts: 5,
reconnectDelay: 2000, // 2 seconds between attempts
})
CORS Issues
Problem: WebSocket connection blocked by CORS policy.
Solutions:
- Note that WebSocket connections are not governed by CORS; the browser sends an Origin header with the handshake, which the server can check
- Check if the issue is with the initial HTTP upgrade request
- Verify the WebSocket server allows connections from your origin
- Ensure you're using the correct protocol (ws:// vs wss://)
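A protocol mismatch is easier to avoid when the WebSocket URL is derived from the HTTP base URL instead of being written by hand. The following is a minimal sketch of that mapping; `toWebSocketUrl` is a hypothetical helper, and the plugin's own inference from `baseURL` may behave differently.

```typescript
// Hypothetical helper: derive a WebSocket URL from an HTTP(S) base URL.
// https: maps to wss: and anything else maps to ws:, so a page served over
// TLS never attempts an insecure ws:// connection.
function toWebSocketUrl(baseURL: string, path = "/realtime", port?: number): string {
  const source = new URL(baseURL);
  const scheme = source.protocol === "https:" ? "wss:" : "ws:";
  // Keep the original host (including any port) unless a port override is given.
  const host = port !== undefined ? `${source.hostname}:${port}` : source.host;
  return `${scheme}//${host}${path}`;
}

console.log(toWebSocketUrl("http://localhost:3000", "/realtime", 3001));
// ws://localhost:3001/realtime
console.log(toWebSocketUrl("https://yourdomain.com"));
// wss://yourdomain.com/realtime
```

The result can be passed as `wsUrl` in the client options.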
TypeScript Errors
Problem: TypeScript errors with client types.
Solutions:
- Ensure @types/ws is installed
- Import types from better-query/client
- Check that the client is properly typed with the server type
import type { DataChangeEvent } from 'better-query/client';
const client = createQueryClient<typeof query>({
// ... config
});
Performance Tips
- Use specific channels - Subscribe to specific record channels instead of entire resource channels when you only need updates for one item
// Good: specific record
await client.realtime.subscribeToRecord("post", "123", callback);
// Less optimal: entire resource (if you only need one post)
await client.realtime.subscribeToResource("post", callback);
- Limit subscriptions - Only subscribe to channels you actually need
// Don't subscribe to everything
// Only subscribe to what your UI displays
- Clean up properly - Always unsubscribe when components unmount
useEffect(() => {
  // subscribe() resolves to the unsubscribe function
  const unsubPromise = client.realtime.subscribe(channel, callback);
  return () => {
    unsubPromise.then((unsub) => unsub()); // Clean up
  };
}, []);
- Debounce UI updates - If you expect frequent updates, debounce your UI rendering
import { debounce } from 'lodash';
const updateUI = debounce((data) => {
  setData(data);
}, 100);
- Use connection pooling - Reuse client instances instead of creating new ones
// Create once, reuse everywhere
const client = createQueryClient({ ... });
- Monitor connection stats - Regularly check active connections
// Set up periodic monitoring
setInterval(async () => {
  const stats = await fetch('/api/query/realtime/stats').then(r => r.json());
  console.log('Connections:', stats.connected);
}, 60000); // Check every minute
- Filter broadcasts on server - Don't send unnecessary data to clients
realtimePlugin({
  resources: ["post", "comment"], // Only broadcast these resources
})
- Use presence tracking selectively - Only enable presence where needed
realtimePlugin({
  broadcastPresence: true, // Only if you need it
})
- Optimize payload size - Send only necessary data in broadcasts
// Instead of sending entire objects
await client.realtime.broadcast(channel, {
  id: post.id,
  title: post.title,
  // Only what's needed
});
- Scale horizontally - For high-traffic applications, consider:
- Running WebSocket server on separate port/process
- Using Redis for pub/sub across multiple server instances
- Load balancing WebSocket connections
- Implementing sticky sessions
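The pub/sub idea can be sketched without committing to a particular Redis client. In the example below, each server instance delivers messages only to its own local subscribers and relays every broadcast through a shared bus. `MessageBus`, `InMemoryBus`, and `RealtimeInstance` are hypothetical names; the in-memory bus is a stand-in for Redis or a similar broker, and nothing here describes how the plugin itself is implemented.

```typescript
type Handler = (channel: string, payload: unknown) => void;

// Minimal bus contract; a Redis-backed implementation could expose the same shape.
interface MessageBus {
  publish(channel: string, payload: unknown): void;
  subscribe(handler: Handler): void;
}

// In-process stand-in for the shared broker, for illustration only.
class InMemoryBus implements MessageBus {
  private handlers: Handler[] = [];

  publish(channel: string, payload: unknown): void {
    for (const handler of this.handlers) handler(channel, payload);
  }

  subscribe(handler: Handler): void {
    this.handlers.push(handler);
  }
}

// One server instance: tracks its own subscribers and relays through the bus.
class RealtimeInstance {
  private bus: MessageBus;
  private local = new Map<string, Set<(payload: unknown) => void>>();

  constructor(bus: MessageBus) {
    this.bus = bus;
    // Every instance forwards bus messages to its own local subscribers.
    bus.subscribe((channel, payload) => {
      const subscribers = this.local.get(channel);
      if (subscribers) subscribers.forEach((deliver) => deliver(payload));
    });
  }

  addClient(channel: string, deliver: (payload: unknown) => void): void {
    if (!this.local.has(channel)) this.local.set(channel, new Set());
    this.local.get(channel)!.add(deliver);
  }

  // Publishing goes through the bus only, so each client receives a message once.
  broadcast(channel: string, payload: unknown): void {
    this.bus.publish(channel, payload);
  }
}
```

With two instances sharing one bus, a client attached to the second instance receives a message broadcast on the first, which is the behavior a load-balanced deployment needs.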
Related
- Cache Plugin - Reduce database load with caching
- Audit Plugin - Track all changes for compliance
- Hooks - Understand the hook system