GraphQL Subscriptions real-time implementation

Implementing GraphQL Subscriptions (Real-time Subscriptions)

GraphQL Subscriptions are a mechanism for a long-lived connection between client and server, over which the server pushes data when events occur. The transport is usually WebSocket (handled by the graphql-ws library or the legacy subscriptions-transport-ws), less often SSE. Subscription is the third GraphQL operation type, alongside Query and Mutation.
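
To make the transport more concrete, here is a minimal sketch of the message types exchanged over the WebSocket in the graphql-transport-ws protocol, and which side may send each one. The type strings follow the protocol; the names `Sender`, `allowedSenders` and `allowedFrom` are illustrative assumptions and are not part of the graphql-ws API.

```typescript
// Who may send each message type in the graphql-transport-ws protocol.
type Sender = 'client' | 'server';

type MessageType =
  | 'connection_init'
  | 'connection_ack'
  | 'ping'
  | 'pong'
  | 'subscribe'
  | 'next'
  | 'error'
  | 'complete';

const allowedSenders: Record<MessageType, Sender[]> = {
  connection_init: ['client'],
  connection_ack: ['server'],
  ping: ['client', 'server'],
  pong: ['client', 'server'],
  subscribe: ['client'],
  next: ['server'],
  error: ['server'],
  complete: ['client', 'server'],
};

// Hypothetical helper: checks the direction of a single message.
function allowedFrom(type: MessageType, sender: Sender): boolean {
  return allowedSenders[type].indexOf(sender) !== -1;
}

// A typical lifecycle for one subscription, in order.
const exchange: Array<[Sender, MessageType]> = [
  ['client', 'connection_init'],
  ['server', 'connection_ack'],
  ['client', 'subscribe'],
  ['server', 'next'],
  ['server', 'next'],
  ['client', 'complete'], // the client unsubscribes
];

const valid = exchange.every(([sender, type]) => allowedFrom(type, sender));
console.log(valid); // true
```

In practice the library handles these messages itself; the sketch only shows what happens on the wire between `connection_init` and `complete`.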

When Needed

On a website with a GraphQL API, Subscriptions cover cases where the UI must update without user action: real-time chat, notifications, order status tracking, live statistics, collaborative editing. If a GraphQL API already exists, adding Subscriptions is usually cheaper than building a separate WebSocket server.

Server Side: Node.js + graphql-ws

The current standard is the graphql-ws package, which implements the graphql-transport-ws protocol:

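import { createServer } from 'http';
import { WebSocketServer } from 'ws';
// Import path for graphql-ws v5; in v6 it is 'graphql-ws/use/ws'
import { useServer } from 'graphql-ws/lib/use/ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { PubSub } from 'graphql-subscriptions';

// MessageService, ChatRoom, Order and verifyToken below are application-specific
// helpers that are assumed to exist in the project.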

const pubsub = new PubSub();

const typeDefs = `
  type Message {
    id: ID!
    roomId: String!
    authorId: String!
    text: String!
    createdAt: String!
  }

  type OrderStatus {
    orderId: ID!
    status: String!
    updatedAt: String!
  }

  type Query {
    messages(roomId: String!): [Message!]!
  }

  type Mutation {
    sendMessage(roomId: String!, text: String!): Message!
  }

  type Subscription {
    messageAdded(roomId: String!): Message!
    orderStatusChanged(orderId: ID!): OrderStatus!
  }
`;

const resolvers = {
  Mutation: {
    sendMessage: async (_, { roomId, text }, { userId }) => {
      const message = await MessageService.create({ roomId, text, authorId: userId });

      // Publish event
      pubsub.publish(`MESSAGE_ADDED:${roomId}`, { messageAdded: message });

      return message;
    },
  },

  Subscription: {
    messageAdded: {
      subscribe: (_, { roomId }, { userId }) => {
        // Check user access to room
        if (!ChatRoom.hasAccess(userId, roomId)) {
          throw new Error('Forbidden');
        }
        return pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);
      },
    },

    orderStatusChanged: {
      subscribe: (_, { orderId }, { userId }) => {
        // User can subscribe only to own orders
        if (!Order.belongsTo(orderId, userId)) {
          throw new Error('Forbidden');
        }
        return pubsub.asyncIterator(`ORDER_STATUS:${orderId}`);
      },
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });

const httpServer = createServer();
const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' });

useServer(
  {
    schema,
    context: async (ctx) => {
      // Authentication via connection params
      const token = ctx.connectionParams?.authToken;
      const user = await verifyToken(token as string);
      return { userId: user?.id };
    },
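    onConnect: async (ctx) => {
      // Reject the connection when the token is missing or invalid
      const token = ctx.connectionParams?.authToken;
      if (typeof token !== 'string') return false;
      const user = await verifyToken(token);
      return Boolean(user);
    },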
    onDisconnect: (ctx, code, reason) => {
      console.log(`Client disconnected: ${code} ${reason}`);
    },
  },
  wsServer
);

httpServer.listen(4000);

Scaling: Redis PubSub instead of In-Memory

The built-in PubSub from graphql-subscriptions is in-memory and works only within a single process. With multiple instances, use graphql-redis-subscriptions:

import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const options = {
  host: process.env.REDIS_HOST,
  port: 6379,
  retryStrategy: (times: number) => Math.min(times * 50, 2000),
};

const pubsub = new RedisPubSub({
  publisher: new Redis(options),
  subscriber: new Redis(options),
});

// Usage is identical: pubsub.publish() and pubsub.asyncIterator() work the same way

Now any instance can publish an event, and subscribers on all instances receive it through Redis Pub/Sub.
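
The retryStrategy option in the Redis settings above returns the delay in milliseconds before the next reconnect attempt. As a small worked example, the sketch below applies the same formula to show that the delay grows linearly and is capped at two seconds. The function names `retryDelay` and `totalWait` are illustrative and are not part of ioredis.

```typescript
// Same formula as the retryStrategy option above: delay in milliseconds
// before reconnect attempt number `times` (1-based).
function retryDelay(times: number): number {
  return Math.min(times * 50, 2000);
}

// Total time spent waiting across the first `attempts` reconnect attempts.
function totalWait(attempts: number): number {
  let total = 0;
  for (let i = 1; i <= attempts; i++) {
    total += retryDelay(i);
  }
  return total;
}

console.log(retryDelay(1));   // 50
console.log(retryDelay(10));  // 500
console.log(retryDelay(40));  // 2000 (cap reached)
console.log(retryDelay(100)); // 2000
console.log(totalWait(10));   // 2750
```

While Redis is unreachable, events published in that window are not replayed to subscribers, since Redis Pub/Sub does not persist messages.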

Client Side: Apollo Client

import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';

const httpLink = new HttpLink({ uri: '/graphql' });

const wsLink = new GraphQLWsLink(
  createClient({
    url: 'wss://example.com/graphql',
    connectionParams: () => ({
      authToken: localStorage.getItem('token'),
    }),
    shouldRetry: () => true,
    retryAttempts: 10,
    on: {
      connected: () => console.log('WS connected'),
      closed: () => console.log('WS closed'),
    },
  })
);

// Queries and mutations go over HTTP, subscriptions over WebSocket
const splitLink = split(
  ({ query }) => {
    const def = getMainDefinition(query);
    return def.kind === 'OperationDefinition' && def.operation === 'subscription';
  },
  wsLink,
  httpLink
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});
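
Subscribing inside a React component with the useSubscription hook:

import { useState } from 'react';
import { useSubscription, gql } from '@apollo/client';

// Shape of the fields selected by the subscription below
interface Message {
  id: string;
  text: string;
  authorId: string;
  createdAt: string;
}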

const MESSAGE_ADDED = gql`
  subscription MessageAdded($roomId: String!) {
    messageAdded(roomId: $roomId) {
      id
      text
      authorId
      createdAt
    }
  }
`;

function ChatRoom({ roomId }: { roomId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);

  useSubscription(MESSAGE_ADDED, {
    variables: { roomId },
    onData: ({ data }) => {
      const newMessage = data.data?.messageAdded;
      if (newMessage) {
        setMessages(prev => [...prev, newMessage]);
      }
    },
    onError: (error) => console.error('Subscription error:', error),
  });

  return (
    <div>
      {messages.map(msg => (
        <div key={msg.id}>{msg.text}</div>
      ))}
    </div>
  );
}

Server-Side Event Filtering

Sometimes events need to be filtered in the resolver itself, so that clients do not receive events they did not ask for:

import { withFilter } from 'graphql-subscriptions';

const resolvers = {
  Subscription: {
    messageAdded: {
      // withFilter wraps asyncIterator and filters events
      subscribe: withFilter(
        (_, { roomId }) => pubsub.asyncIterator(`MESSAGES`),
        (payload, variables) => {
          // Send only if roomId matches
          return payload.messageAdded.roomId === variables.roomId;
        }
      ),
    },
  },
};

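This allows using a single broad MESSAGES channel instead of one channel per room, with filtering done on the server. The publishing side must then publish to MESSAGES rather than to a per-room channel such as MESSAGE_ADDED:${roomId}.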
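
To illustrate the idea behind this kind of filtering, here is a simplified, dependency-free sketch: an async generator that forwards only the events accepted by a predicate. It is not the actual withFilter implementation; `filterEvents`, `fakeChannel` and `collectRoom` are hypothetical names used only for this example.

```typescript
interface MessagePayload {
  messageAdded: { id: string; roomId: string; text: string };
}

// Forwards only the events for which `keep` returns true.
async function* filterEvents<T>(
  source: AsyncIterable<T>,
  keep: (event: T) => boolean | Promise<boolean>
): AsyncGenerator<T> {
  for await (const event of source) {
    if (await keep(event)) {
      yield event;
    }
  }
}

// Stand-in for the broad channel: three events from two rooms.
async function* fakeChannel(): AsyncGenerator<MessagePayload> {
  yield { messageAdded: { id: '1', roomId: 'room1', text: 'Hi' } };
  yield { messageAdded: { id: '2', roomId: 'room2', text: 'Other room' } };
  yield { messageAdded: { id: '3', roomId: 'room1', text: 'Bye' } };
}

// Collects the ids of the events a subscriber to `roomId` would receive.
async function collectRoom(roomId: string): Promise<string[]> {
  const ids: string[] = [];
  const filtered = filterEvents(
    fakeChannel(),
    (payload) => payload.messageAdded.roomId === roomId
  );
  for await (const event of filtered) {
    ids.push(event.messageAdded.id);
  }
  return ids;
}

collectRoom('room1').then((ids) => console.log(ids)); // [ '1', '3' ]
```

The trade-off is that the predicate runs for every event and every active subscriber, so a broad channel costs more CPU on busy systems than one channel per room.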

Connection Management and Memory Leaks

Carelessly written Subscriptions can leak memory: iterators are never closed and listeners accumulate.

// Proper iterator completion
const MESSAGE_ADDED_SUBSCRIPTION = {
  subscribe: async function* (_, { roomId }, context) {
    const iterator = pubsub.asyncIterator(`MESSAGE_ADDED:${roomId}`);

    try {
      for await (const value of iterator) {
        yield value;
      }
    } finally {
      // Called on client unsubscribe
      iterator.return?.();
    }
  },
};

graphql-ws calls return() on the iterator automatically when the connection closes or the client unsubscribes explicitly, but an explicit try/finally guards against non-standard cases.
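
The cleanup behaviour can be checked in isolation. The sketch below uses a plain async generator (no GraphQL libraries) to show that calling return() runs the finally block, but only if the generator has already started. The names `trackedSubscription`, `demoStarted` and `demoNotStarted` are hypothetical.

```typescript
// An endless event source that records when its cleanup code runs.
function trackedSubscription(log: string[]): AsyncGenerator<number, void, unknown> {
  return (async function* () {
    try {
      let n = 0;
      while (true) {
        yield n++;
      }
    } finally {
      // Release listeners, timers, connections, etc.
      log.push('cleanup');
    }
  })();
}

// The client received at least one event, then unsubscribed.
async function demoStarted(): Promise<string[]> {
  const log: string[] = [];
  const iterator = trackedSubscription(log);
  await iterator.next();            // the generator body starts running
  await iterator.return(undefined); // unsubscribe: the finally block runs
  return log;
}

// return() is called before the first next(): the body never ran,
// so there is nothing to clean up and finally is not executed.
async function demoNotStarted(): Promise<string[]> {
  const log: string[] = [];
  const iterator = trackedSubscription(log);
  await iterator.return(undefined);
  return log;
}

demoStarted().then((log) => console.log(log));    // [ 'cleanup' ]
demoNotStarted().then((log) => console.log(log)); // []
```

This is why resources should be acquired inside the generator body, as in the example above, and not before the generator is created: anything allocated outside is not covered by the finally block.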

Integration with Laravel Backend

If the GraphQL API is built on PHP (Lighthouse), publish events to Redis from Laravel and receive them in the Node.js WebSocket server:

// Laravel publishes the event
use Illuminate\Support\Facades\Redis;

Redis::publish('ORDER_STATUS:' . $order->id, json_encode([
    'orderStatusChanged' => [
        'orderId'   => $order->id,
        'status'    => $order->status,
        'updatedAt' => now()->toISOString(),
    ],
]));

// Node.js WS server listens to Redis and forwards events to pubsub
import Redis from 'ioredis';

const subscriber = new Redis({ host: process.env.REDIS_HOST });
subscriber.psubscribe('ORDER_STATUS:*');

subscriber.on('pmessage', (pattern, channel, message) => {
  const orderId = channel.split(':')[1];
  const data = JSON.parse(message);
  pubsub.publish(`ORDER_STATUS:${orderId}`, data);
});
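
The forwarding handler above trusts both the channel name and the message body. A slightly more defensive variant could validate them first, since JSON.parse throws on malformed input and split(':')[1] truncates ids that contain a colon. The sketch below is one possible shape under those assumptions; `parseOrderChannel`, `parseOrderEvent` and the `OrderStatusEvent` type are hypothetical helpers, not part of any library.

```typescript
interface OrderStatusEvent {
  orderStatusChanged: { orderId: string; status: string; updatedAt: string };
}

const PREFIX = 'ORDER_STATUS:';

// Returns everything after the prefix, or null for foreign channels.
function parseOrderChannel(channel: string): string | null {
  if (channel.indexOf(PREFIX) !== 0) return null;
  const orderId = channel.slice(PREFIX.length);
  return orderId.length > 0 ? orderId : null;
}

// Returns a validated payload, or null if the message is malformed.
function parseOrderEvent(raw: string): OrderStatusEvent | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  const inner = (data as { orderStatusChanged?: Record<string, unknown> } | null)
    ?.orderStatusChanged;
  if (!inner) return null;
  const status = inner.status;
  const updatedAt = inner.updatedAt;
  if (typeof status !== 'string' || typeof updatedAt !== 'string') return null;
  // A numeric id arrives as a JSON number; normalize it to a string
  return {
    orderStatusChanged: { orderId: String(inner.orderId), status, updatedAt },
  };
}

console.log(parseOrderChannel('ORDER_STATUS:42'));  // 42
console.log(parseOrderChannel('ORDER_STATUS:a:b')); // a:b
console.log(parseOrderChannel('OTHER:1'));          // null
console.log(parseOrderEvent('{not json'));          // null
```

Inside the pmessage handler, an event would then be forwarded to pubsub only when both functions return a non-null value.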

Testing

import { parse, subscribe } from 'graphql';
// Assumption: schema and pubsub are exported by the server module (the path is hypothetical)
import { schema, pubsub } from './server';

it('should deliver messages to subscribers', async () => {
  const iterator = (await subscribe({
    schema,
    document: parse(`subscription { messageAdded(roomId: "room1") { id text } }`),
    contextValue: { userId: 'user1' }, // user1 must pass the room access check
  })) as AsyncIterableIterator<any>;

  // Request the first event without awaiting it: next() resolves only after a publish
  const pending = iterator.next();

  // Publish event
  await pubsub.publish('MESSAGE_ADDED:room1', {
    messageAdded: { id: '1', text: 'Hello', roomId: 'room1', authorId: 'user2' },
  });

  const firstResult = (await pending).value;
  expect(firstResult.data.messageAdded.text).toBe('Hello');

  // Close the subscription so the test leaves no listeners behind
  await iterator.return?.();
});

Timeline

Basic Subscriptions with one event type on an existing GraphQL server: 2–3 days. A full implementation with Redis PubSub, authentication via connectionParams, multiple subscription types, and tests: 1–1.5 weeks. Adding this to Laravel/Lighthouse via a hybrid PHP + Node.js setup: another 2–3 days.