Implementing GraphQL Subscriptions (Real-time Subscriptions)
GraphQL Subscriptions — mechanism of long-lived connection between client and server, through which server sends data when events occur. Transport — usually WebSocket (protocol graphql-ws or legacy subscriptions-transport-ws), rarely SSE. Subscriptions — third GraphQL operation alongside Query and Mutation.
When Needed
For website with GraphQL API Subscriptions solve tasks where need update UI without user action: real-time chat, notifications, order status tracking, live stats, collaborative editing. If already have GraphQL, adding Subscriptions cheaper than building separate WebSocket server.
Server Side: Node.js + graphql-ws
Modern standard — package graphql-ws which implements graphql-transport-ws protocol:
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { PubSub } from 'graphql-subscriptions';
const pubsub = new PubSub();
const typeDefs = `
type Message {
id: ID!
roomId: String!
authorId: String!
text: String!
createdAt: String!
}
type OrderStatus {
orderId: ID!
status: String!
updatedAt: String!
}
type Query {
messages(roomId: String!): [Message!]!
}
type Mutation {
sendMessage(roomId: String!, text: String!): Message!
}
type Subscription {
messageAdded(roomId: String!): Message!
orderStatusChanged(orderId: ID!): OrderStatus!
}
`;
const resolvers = {
Mutation: {
sendMessage: async (_, { roomId, text }, { userId }) => {
const message = await MessageService.create({ roomId, text, authorId: userId });
// Publish event
pubsub.publish(`MESSAGE_ADDED:${roomId}`, { messageAdded: message });
return message;
},
},
Subscription: {
messageAdded: {
subscribe: (_, { roomId }, { userId }) => {
// Check user access to room
if (!ChatRoom.hasAccess(userId, roomId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
},
},
orderStatusChanged: {
subscribe: (_, { orderId }, { userId }) => {
// User can subscribe only to own orders
if (!Order.belongsTo(orderId, userId)) {
throw new Error('Forbidden');
}
return pubsub.asyncIterator(`ORDER_STATUS:${orderId}`);
},
},
},
};
const schema = makeExecutableSchema({ typeDefs, resolvers });
const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' });
useServer(
{
schema,
context: async (ctx) => {
// Authentication via connection params
const token = ctx.connectionParams?.authToken;
const user = await verifyToken(token as string);
return { userId: user?.id };
},
onConnect: async (ctx) => {
const token = ctx.connectionParams?.authToken;
if (!token) return false; // reject connection
return true;
},
onDisconnect: (ctx, code, reason) => {
console.log(`Client disconnected: ${code} ${reason}`);
},
},
wsServer
);
httpServer.listen(4000);
Scaling: Redis PubSub instead of In-Memory
Built-in PubSub from graphql-subscriptions is in-memory, works only within single process. With multiple instances need graphql-redis-subscriptions:
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';
const options = {
host: process.env.REDIS_HOST,
port: 6379,
retryStrategy: (times: number) => Math.min(times * 50, 2000),
};
const pubsub = new RedisPubSub({
publisher: new Redis(options),
subscriber: new Redis(options),
});
// Usage identical — pubsub.publish() and pubsub.asyncIterator() same
Now any instance can publish event, and all subscribers on all instances receive it through Redis Pub/Sub.
Client Side: Apollo Client
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';
const httpLink = new HttpLink({ uri: '/graphql' });
const wsLink = new GraphQLWsLink(
createClient({
url: 'wss://example.com/graphql',
connectionParams: () => ({
authToken: localStorage.getItem('token'),
}),
shouldRetry: () => true,
retryAttempts: 10,
on: {
connected: () => console.log('WS connected'),
closed: () => console.log('WS closed'),
},
})
);
// Query/Mutation go via HTTP, Subscription — via WS
const splitLink = split(
({ query }) => {
const def = getMainDefinition(query);
return def.kind === 'OperationDefinition' && def.operation === 'subscription';
},
wsLink,
httpLink
);
export const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
});
import { useSubscription, gql } from '@apollo/client';
const MESSAGE_ADDED = gql`
subscription MessageAdded($roomId: String!) {
messageAdded(roomId: $roomId) {
id
text
authorId
createdAt
}
}
`;
function ChatRoom({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
useSubscription(MESSAGE_ADDED, {
variables: { roomId },
onData: ({ data }) => {
const newMessage = data.data?.messageAdded;
if (newMessage) {
setMessages(prev => [...prev, newMessage]);
}
},
onError: (error) => console.error('Subscription error:', error),
});
return (
<div>
{messages.map(msg => (
<div key={msg.id}>{msg.text}</div>
))}
</div>
);
}
Server-Side Event Filtering
Sometimes need filter events right in resolver to avoid sending extra:
import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
messageAdded: {
// withFilter wraps asyncIterator and filters events
subscribe: withFilter(
(_, { roomId }) => pubsub.asyncIterator(`MESSAGES`),
(payload, variables) => {
// Send only if roomId matches
return payload.messageAdded.roomId === variables.roomId;
}
),
},
},
};
This allows using single broad channel MESSAGES instead of channel per room, filtering on server.
Connection Management and Memory Leaks
Careless Subscriptions can lead to leaks: iterators not closed, listeners accumulate.
// Proper iterator completion
const MESSAGE_ADDED_SUBSCRIPTION = {
subscribe: async function* (_, { roomId }, context) {
const iterator = pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
try {
for await (const value of iterator) {
yield value;
}
} finally {
// Called on client unsubscribe
iterator.return?.();
}
},
};
graphql-ws automatically calls return() on iterator when connection closes or explicit unsubscribe, but explicit try/finally protects from non-standard cases.
Integration with Laravel Backend
If GraphQL API built on PHP (Lighthouse), publish events through Redis from Laravel, receive in Node.js WebSocket server:
// Laravel publishes event
Redis::publish('ORDER_STATUS:' . $order->id, json_encode([
'orderStatusChanged' => [
'orderId' => $order->id,
'status' => $order->status,
'updatedAt' => now()->toISOString(),
],
]));
// Node.js WS server listens Redis and forwards to pubsub
const subscriber = new Redis({ host: process.env.REDIS_HOST });
subscriber.psubscribe('ORDER_STATUS:*');
subscriber.on('pmessage', (pattern, channel, message) => {
const orderId = channel.split(':')[1];
const data = JSON.parse(message);
pubsub.publish(`ORDER_STATUS:${orderId}`, data);
});
Testing
import { createTestClient } from 'apollo-server-testing';
import { execute, subscribe } from 'graphql';
it('should deliver messages to subscribers', async () => {
const results: any[] = [];
const iterator = await subscribe({
schema,
document: parse(`subscription { messageAdded(roomId: "room1") { id text } }`),
contextValue: { userId: 'user1' },
});
// Collect first event
const firstResult = (await (iterator as AsyncIterator<any>).next()).value;
// Publish event
pubsub.publish('MESSAGE_ADDED:room1', {
messageAdded: { id: '1', text: 'Hello', roomId: 'room1', authorId: 'user2' }
});
expect(firstResult.data.messageAdded.text).toBe('Hello');
});
Timeline
Basic Subscriptions with one event type on existing GraphQL server — 2–3 days. Full implementation with Redis PubSub, authentication via connectionParams, multiple subscription types and tests — 1–1.5 weeks. Adding to Laravel/Lighthouse via hybrid PHP + Node.js schema — another 2–3 days.







