Server-to-Server Messaging
This guide explains how to implement communication between server instances in a distributed Mesh setup.
Overview
In a distributed application, you often need to communicate between different server instances. Mesh uses Redis as its communication backbone, so server-to-server messaging can run over the same Redis deployment.
There are several approaches to server-to-server communication in Mesh:
- Redis Pub/Sub Channels: Direct communication between servers
- Records: Shared state between servers
- Client-to-Server Commands: Indirect communication through clients
Redis Pub/Sub for Server-to-Server Communication
The most direct way to communicate between server instances is to use Redis pub/sub channels.
Publishing Messages
// Server A
import { createClient } from "redis";
// Create a separate Redis client for pub/sub
const redisClient = createClient({
  url: "redis://localhost:6379"
});
await redisClient.connect();
// Publish a message to a server-only channel
await redisClient.publish(
  "mesh:server:notifications",
  JSON.stringify({
    type: "cache-invalidation",
    key: "user:123",
    timestamp: Date.now()
  })
);
Subscribing to Messages
// Server B
import { createClient } from "redis";
// Create a separate Redis client for pub/sub
const redisClient = createClient({
  url: "redis://localhost:6379"
});
await redisClient.connect();
// Subscribe to the server-only channel
await redisClient.subscribe("mesh:server:notifications", (message) => {
  const data = JSON.parse(message);
  if (data.type === "cache-invalidation") {
    invalidateCache(data.key);
  }
});
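As more message types share one channel, the subscriber callback above tends to grow into a chain of `if` statements. One possible way to keep it manageable is a small dispatcher that parses each message defensively and routes it by `type`. This is a minimal sketch: `createDispatcher` is a hypothetical helper, not part of Mesh or the Redis client.

```javascript
// Hypothetical helper (not a Mesh or node-redis API): builds a callback that
// can be passed to subscribe(). `handlers` maps a message type to a function
// that receives the parsed message.
function createDispatcher(handlers, onError = console.error) {
  return function dispatch(rawMessage) {
    let data;
    try {
      data = JSON.parse(rawMessage);
    } catch (err) {
      // Malformed JSON must not crash the subscriber
      onError(new Error(`Invalid JSON on server channel: ${err.message}`));
      return false;
    }
    const type = data !== null && typeof data === "object" ? data.type : undefined;
    // Only call handlers that were registered explicitly
    if (typeof type !== "string" || !Object.hasOwn(handlers, type)) {
      return false;
    }
    handlers[type](data);
    return true;
  };
}

// Usage with the subscriber above (invalidateCache is the same placeholder):
// await redisClient.subscribe(
//   "mesh:server:notifications",
//   createDispatcher({ "cache-invalidation": (msg) => invalidateCache(msg.key) })
// );
```

The return value only reports whether a handler ran, which is convenient for logging unknown message types.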
Best Practices for Redis Pub/Sub
- Use a naming convention: Prefix server-only channels (e.g., mesh:server:*) to distinguish them from client-facing channels
- Create separate Redis clients: Use dedicated Redis clients for pub/sub, because a connection in subscriber mode cannot issue regular Redis commands
- Structure your messages: Use a consistent message format with type, payload, and metadata
- Handle reconnection: Resubscribe to channels if the Redis connection is lost
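To illustrate the "structure your messages" point, here is a minimal sketch of a shared envelope with a type, a payload, and metadata. The field names (`payload`, `meta.sender`, `meta.sentAt`) and both helper functions are assumptions made for this example, not a format defined by Mesh.

```javascript
// Hypothetical envelope: { type, payload, meta: { sender, sentAt } }.
// Serializes a message ready to pass to publish().
function createMessage(type, payload, sender) {
  if (typeof type !== "string" || type.length === 0) {
    throw new TypeError("Message type must be a non-empty string");
  }
  return JSON.stringify({
    type,
    payload,
    meta: { sender, sentAt: Date.now() },
  });
}

// Parses a raw message and returns it only if it matches the envelope shape;
// returns null for malformed input so callers can skip it.
function parseMessage(raw) {
  let message;
  try {
    message = JSON.parse(raw);
  } catch {
    return null;
  }
  const valid =
    message !== null &&
    typeof message === "object" &&
    typeof message.type === "string" &&
    message.meta !== null &&
    typeof message.meta === "object" &&
    typeof message.meta.sentAt === "number";
  return valid ? message : null;
}
```

A publisher would call `createMessage("cache-invalidation", { key: "user:123" }, server.instanceId)` and every subscriber would run `parseMessage` before acting on the result.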
Shared State with Records
Another approach is to use Mesh records to share state between server instances.
Updating Shared State
// Server A
// Update a shared configuration record
await server.publishRecordUpdate("server:config", {
  maintenanceMode: true,
  maintenanceStart: Date.now(),
  maintenanceEnd: Date.now() + 3600000, // 1 hour
});
Subscribing to Shared State
// Server B
// Get the current state
const { value: config } = await server.recordManager.getRecord("server:config");
// Subscribe to changes
server.recordManager.subscribeToRecord("server:config", (newValue) => {
  if (newValue.maintenanceMode) {
    enableMaintenanceMode();
  } else {
    disableMaintenanceMode();
  }
});
Best Practices for Shared State
- Use server-only record IDs: Prefix server-only records (e.g., server:*) and don’t expose them to clients
- Keep records focused: Each record should represent a specific piece of shared state
- Consider update frequency: High-frequency updates may impact performance
- Handle initialization: Initialize records with default values if they don’t exist
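For the initialization point, one possible approach is to merge whatever a record currently holds with a set of defaults before using it. The sketch below assumes that reading a missing record yields `null` or `undefined`; that behavior, the `DEFAULT_CONFIG` values, and the `resolveConfig` helper are all assumptions for illustration.

```javascript
// Hypothetical defaults for the "server:config" record used above
const DEFAULT_CONFIG = Object.freeze({
  maintenanceMode: false,
  maintenanceStart: null,
  maintenanceEnd: null,
});

// Returns a complete config object: stored fields win, and any field that is
// missing falls back to its default. A missing record yields the defaults.
function resolveConfig(storedValue, defaults = DEFAULT_CONFIG) {
  if (storedValue === null || typeof storedValue !== "object") {
    return { ...defaults };
  }
  return { ...defaults, ...storedValue };
}
```

A server could pass the value it reads at startup through `resolveConfig` and, if the record was missing, write the result back so other instances see the same defaults.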
Indirect Communication via Client Commands
You can also use client commands to indirectly communicate between servers.
Broadcasting to All Clients
// Server A
// Broadcast a message to all clients in a room
const connections = await server.getRoomMembers("global");
for (const connectionId of connections) {
  // This will be routed to the appropriate server instance
  server.send(connectionId, {
    type: "server-notification",
    message: "System maintenance in 10 minutes",
  });
}
Client-Initiated Server-to-Server Communication
// Server A
server.exposeCommand("notify-all-servers", async (ctx) => {
  const { message } = ctx.payload;
  // Publish to a server-only channel
  await redisClient.publish(
    "mesh:server:notifications",
    JSON.stringify({
      type: "client-notification",
      message,
      connectionId: ctx.connection.id,
    })
  );
  return { success: true };
});
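// Server B (and all other servers)
// A client in subscriber mode cannot run other commands, so subscribe on a
// dedicated connection and keep redisClient free for publishing
const subscriber = redisClient.duplicate();
await subscriber.connect();
await subscriber.subscribe("mesh:server:notifications", (message) => {
  const data = JSON.parse(message);
  if (data.type === "client-notification") {
    console.log(`Client ${data.connectionId} sent: ${data.message}`);
  }
});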
Advanced Patterns
Distributed Locks
For operations that should only be performed by one server at a time:
import { createClient } from "redis";
const redisClient = createClient({
  url: "redis://localhost:6379"
});
await redisClient.connect();
// Try to acquire a lock
const lockKey = "mesh:lock:daily-cleanup";
const lockValue = server.instanceId;
const lockTTL = 60; // seconds; should exceed the longest expected cleanup time
const acquired = await redisClient.set(
  lockKey,
  lockValue,
  {
    NX: true, // Only set if key doesn't exist
    EX: lockTTL, // Set expiration
  }
);
if (acquired) {
  try {
    // We have the lock, perform the operation
    await performDailyCleanup();
  } finally {
    // Release the lock only if we still hold it. The check and the delete run
    // atomically in a Lua script, so a lock that expired and was acquired by
    // another server in the meantime is never deleted by mistake.
    await redisClient.eval(
      'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
      { keys: [lockKey], arguments: [lockValue] }
    );
  }
} else {
  console.log("Another server is performing the cleanup");
}
Work Distribution
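Distribute work across server instances. Redis pub/sub is fire-and-forget, so a task published while its target server is disconnected is lost; track completions if every chunk must be processed: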
// Server A
server.exposeCommand("process-batch", async (ctx) => {
  const { items } = ctx.payload;
  // Split items into chunks
  const chunks = splitIntoChunks(items, 10);
  // One batch ID shared by every chunk of this batch
  const batchId = generateBatchId();
  // Distribute chunks to all server instances
  const serverInstances = await getActiveServerInstances();
  if (serverInstances.length === 0) {
    return { success: false, message: "No active server instances" };
  }
  for (let i = 0; i < chunks.length; i++) {
    // getActiveServerInstances() returns objects, so address the channel by instanceId
    const targetServer = serverInstances[i % serverInstances.length];
    await redisClient.publish(
      `mesh:server:${targetServer.instanceId}:tasks`,
      JSON.stringify({
        type: "process-chunk",
        chunk: chunks[i],
        batchId,
      })
    );
  }
  return { success: true, message: "Processing started" };
});
// All servers
// Subscribe on a dedicated connection; redisClient stays free for publishing results
const taskSubscriber = redisClient.duplicate();
await taskSubscriber.connect();
await taskSubscriber.subscribe(`mesh:server:${server.instanceId}:tasks`, async (message) => {
  const task = JSON.parse(message);
  if (task.type === "process-chunk") {
    await processChunk(task.chunk, task.batchId);
    // Report completion
    await redisClient.publish(
      "mesh:server:task-results",
      JSON.stringify({
        type: "chunk-completed",
        batchId: task.batchId,
        serverId: server.instanceId,
      })
    );
  }
});
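The example above calls `splitIntoChunks` without defining it. A minimal sketch of that helper follows, together with a round-robin assignment that mirrors the `i % serverInstances.length` logic. Both functions are illustrative assumptions: `assignChunks` is a hypothetical name, and neither is provided by Mesh.

```javascript
// Splits an array into consecutive chunks of at most chunkSize items
function splitIntoChunks(items, chunkSize) {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError("chunkSize must be a positive integer");
  }
  const chunks = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}

// Hypothetical helper: pairs each chunk with a server instance ID in
// round-robin order, so chunks are spread evenly across instances
function assignChunks(chunks, instanceIds) {
  if (instanceIds.length === 0) {
    throw new Error("No server instances available");
  }
  return chunks.map((chunk, index) => ({
    instanceId: instanceIds[index % instanceIds.length],
    chunk,
  }));
}
```

Computing the assignment up front, before anything is published, makes the distribution easy to log and to unit test without a Redis connection.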
Server Discovery
Discover active server instances:
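// When server starts
const serverKey = `mesh:servers:${server.instanceId}`;
const serverInfo = JSON.stringify({
  startTime: Date.now(),
  address: server.address,
  port: server.port,
});
// Register this server
await redisClient.set(serverKey, serverInfo, {
  EX: 60, // 60 seconds TTL
});
// Refresh registration periodically. Re-setting the key (rather than calling
// EXPIRE) also restores the registration if the key has already expired.
setInterval(async () => {
  try {
    await redisClient.set(serverKey, serverInfo, { EX: 60 });
  } catch (err) {
    console.error("Failed to refresh server registration", err);
  }
}, 30000);
// Get all active servers
async function getActiveServerInstances() {
  // KEYS walks the entire keyspace and blocks Redis while it runs;
  // prefer SCAN on large production datasets
  const keys = await redisClient.keys("mesh:servers:*");
  const servers = [];
  for (const key of keys) {
    const data = await redisClient.get(key);
    if (data) {
      servers.push({
        instanceId: key.replace("mesh:servers:", ""),
        ...JSON.parse(data),
      });
    }
  }
  return servers;
}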
Best Practices
- Use appropriate communication patterns
  - Direct pub/sub for immediate notifications
  - Records for shared state
  - Client commands for user-initiated actions
- Handle failures gracefully
  - Implement timeouts for distributed operations
  - Use TTLs for locks and registrations
  - Have fallback mechanisms
- Consider message ordering
  - Redis pub/sub preserves message order per channel
  - But processing may happen out of order
  - Use timestamps or sequence numbers if order matters
- Monitor and log
  - Log server-to-server communication for debugging
  - Monitor message volumes and patterns
  - Set up alerts for communication failures
- Security considerations
  - Don’t expose server-only channels to clients
  - Validate messages before processing
  - Consider encryption for sensitive data
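For the message-ordering point above, one way to use sequence numbers is a small receiver-side buffer that holds back messages that arrive early and releases them in order. This is a sketch under stated assumptions: a single publisher stamps each message with an incrementing `seq` field, and `createOrderedReceiver` is a hypothetical helper, not a Mesh API.

```javascript
// Hypothetical helper: wraps a handler so that messages are delivered
// strictly in ascending `seq` order, starting at firstSeq.
function createOrderedReceiver(onMessage, firstSeq = 1) {
  let nextSeq = firstSeq;
  const pending = new Map(); // seq -> message, for messages that arrived early
  return function receive(message) {
    if (message.seq < nextSeq) {
      return; // Duplicate or already-delivered message: drop it
    }
    pending.set(message.seq, message);
    // Deliver every consecutive message that is now available
    while (pending.has(nextSeq)) {
      const next = pending.get(nextSeq);
      pending.delete(nextSeq);
      nextSeq += 1;
      onMessage(next);
    }
  };
}
```

Because pub/sub can drop messages while a subscriber is disconnected, a gap in the sequence would stall this buffer indefinitely. A production version would need a timeout or resynchronization step to skip past missing sequence numbers.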