Real-time subscriptions

Written By Stanislas

Last updated 16 days ago


Subscribe to real-time events like message streaming, tool calls, and user activity using WebSocket connections. Learn how to set up subscriptions and handle live updates in your chat application.

Overview

Subscriptions allow you to listen to real-time events from the API using WebSocket connections. Instead of polling for updates, your client receives events as they happen. This enables features like character-by-character message streaming, live tool call indicators, typing notifications, and more.

The Swiftask API provides six main subscription types: message streaming, new complete messages, intermediate messages, agent tool calls, user typing, and bot analyzing datasources. You can subscribe to one or multiple events simultaneously.


Prerequisites

Before setting up subscriptions, ensure you have:

  1. Authenticated with the API: Follow the Authentication & Setup guide to get your accessToken and workspaceId

  2. Configured WebSocket in your Apollo Client: Your client must have a WebSocketLink configured with proper authentication

  3. A session ID: Obtained from Managing chat sessions

  4. Understanding of async patterns: Subscriptions use observables and callbacks
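
For readers new to the observable pattern mentioned in item 4, here is a minimal sketch of the contract the examples in this guide rely on: an observer with next, error, and complete callbacks, and a handle with unsubscribe. The createFakeSource helper is hypothetical and exists only for illustration; it is not part of Apollo Client or the Swiftask API.

```javascript
// Hypothetical stand-in for an observable source (illustration only).
function createFakeSource() {
  const observers = new Set();
  return {
    // Register an observer and return a handle that detaches it again.
    subscribe(observer) {
      observers.add(observer);
      return { unsubscribe: () => observers.delete(observer) };
    },
    // Push a value to every attached observer.
    emit(value) {
      observers.forEach((o) => o.next?.(value));
    },
    // Signal a failure, then detach everyone.
    fail(error) {
      observers.forEach((o) => o.error?.(error));
      observers.clear();
    },
    // Signal normal completion, then detach everyone.
    finish() {
      observers.forEach((o) => o.complete?.());
      observers.clear();
    },
  };
}

const source = createFakeSource();
const received = [];

const subscription = source.subscribe({
  next: (value) => received.push(value),
  error: (err) => console.error('error:', err),
  complete: () => console.log('done'),
});

source.emit('Hel');
source.emit('lo');
subscription.unsubscribe();
source.emit('ignored'); // not delivered: the observer was detached

console.log(received.join('')); // "Hello"
```

The key point is that nothing is delivered after unsubscribe, which is why every example in this guide keeps the returned handle around for cleanup.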


Getting started

Here's the minimal setup to subscribe to streaming messages:

Step 1: Configure WebSocket connection

Set up Apollo Client with WebSocket support for subscriptions.

import { ApolloClient, InMemoryCache, createHttpLink, split } from '@apollo/client';
import { setContext } from '@apollo/client/link/context';
import { WebSocketLink } from '@apollo/client/link/ws';
import { getMainDefinition } from '@apollo/client/utilities';

// accessToken and workspaceId come from the Authentication & Setup guide

const httpLink = createHttpLink({
  uri: 'https://graphql.swiftask.ai/graphql',
});

const authLink = setContext((_, { headers }) => ({
  headers: {
    ...headers,
    authorization: `Bearer ${accessToken}`,
    'x-workspace-id': workspaceId,
    'x-client': 'widget',
  },
}));

const wsLink = new WebSocketLink({
  uri: 'wss://graphql.swiftask.ai/graphql',
  options: {
    reconnect: true,
    connectionParams: {
      authorization: `Bearer ${accessToken}`,
      workspaceId: workspaceId,
    },
  },
});

// Route queries/mutations to HTTP, subscriptions to WebSocket
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return definition.kind === 'OperationDefinition' && definition.operation === 'subscription';
  },
  wsLink,
  authLink.concat(httpLink)
);

const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});

Step 2: Subscribe to message streaming

Listen to real-time message chunks as they stream in.

import { gql } from '@apollo/client';

const MESSAGE_STREAM = gql`
  subscription OnMessageStream($sessionId: Float!) {
    onMessageStream(sessionId: $sessionId) {
      messageChunk
      botResponseMessageId
      isStoppable
    }
  }
`;

const subscribeToMessageStream = (client, sessionId, onChunk) => {
  let fullMessage = '';

  return client
    .subscribe({
      query: MESSAGE_STREAM,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        const chunk = data.onMessageStream.messageChunk;
        fullMessage += chunk;
        onChunk(fullMessage, data.onMessageStream);
      },
      error: (error) => {
        console.error('Stream error:', error);
      },
      complete: () => {
        console.log('Stream complete');
      },
    });
};

// Usage
const subscription = subscribeToMessageStream(client, 67890, (fullMessage, data) => {
  console.log('Streaming:', fullMessage);
  if (data.isStoppable) {
    console.log('User can stop generation');
  }
});

// Cleanup when done
// subscription.unsubscribe();
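
Note that the helper above keeps a single fullMessage string for the lifetime of the subscription, so a second bot reply in the same session would be appended to the first. One possible way around this is to key the accumulated text by botResponseMessageId. This is a sketch that assumes every chunk of one reply carries the same botResponseMessageId, which this guide does not confirm. createChunkAccumulator is a hypothetical helper, not part of the API.

```javascript
// Hypothetical helper: accumulates streamed text per response message.
// Assumption: all chunks of one reply share the same botResponseMessageId.
function createChunkAccumulator() {
  const textByMessageId = new Map();

  return {
    // Append one stream event and return the text accumulated so far for that reply.
    append(event) {
      const key = event.botResponseMessageId;
      const text = (textByMessageId.get(key) ?? '') + (event.messageChunk ?? '');
      textByMessageId.set(key, text);
      return text;
    },
    // Drop the buffered text once the complete message has been rendered.
    release(messageId) {
      textByMessageId.delete(messageId);
    },
  };
}

const accumulator = createChunkAccumulator();
accumulator.append({ botResponseMessageId: 1, messageChunk: 'Hello' });
const first = accumulator.append({ botResponseMessageId: 1, messageChunk: ' world' });
const second = accumulator.append({ botResponseMessageId: 2, messageChunk: 'Next reply' });

console.log(first);  // "Hello world"
console.log(second); // "Next reply"
```

Inside the next callback you would call accumulator.append(data.onMessageStream) in place of the fullMessage += chunk line.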

Step 3: Subscribe to new complete messages

Listen for new complete messages in the session (useful for multi-user chats).

const NEW_MESSAGE = gql`
  subscription NewMessage($sessionId: Float!) {
    newMessage(sessionId: $sessionId) {
      id
      message
      createdAt
      sentBy {
        firstName
      }
      isBotReply
    }
  }
`;

const subscribeToNewMessages = (client, sessionId, onNewMessage) => {
  return client
    .subscribe({
      query: NEW_MESSAGE,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        onNewMessage(data.newMessage);
      },
      error: (error) => {
        console.error('Subscription error:', error);
      },
    });
};

// Usage
const messageSub = subscribeToNewMessages(client, 67890, (message) => {
  console.log(`${message.sentBy.firstName}: ${message.message}`);
  addMessageToUI(message);
});

Available subscriptions

Message streaming

Subscribe to real-time character-by-character streaming of the bot's response.

GraphQL Subscription:

subscription OnMessageStream($sessionId: Float!) {
  onMessageStream(sessionId: $sessionId) {
    messageChunk
    userId
    id
    botId
    botResponseMessageId
    isStoppable
  }
}

Response fields:

| Field | Type | Description |
|---|---|---|
| messageChunk | string | The new text chunk to append |
| userId | number | User ID generating the message |
| id | number | Stream message ID |
| botId | number | Bot ID generating the response |
| botResponseMessageId | number | ID of the complete response message |
| isStoppable | boolean | Whether generation can be stopped |

Use cases:

  • Display streaming text in real-time

  • Show a stop button while generating

  • Create a typewriter effect

  • Update UI character by character

Example with UI updates:

const subscribeToMessageStream = (client, sessionId, onChunk) => {
  let fullMessage = '';
  let messageElement = createMessageElement();

  return client
    .subscribe({
      query: MESSAGE_STREAM,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        fullMessage += data.onMessageStream.messageChunk;
        messageElement.textContent = fullMessage;

        if (data.onMessageStream.isStoppable) {
          showStopButton(() => {
            // Handle stop click
          });
        } else {
          hideStopButton();
        }

        onChunk(fullMessage, data.onMessageStream);
      },
      error: (error) => {
        console.error('Stream error:', error);
        messageElement.classList.add('error');
      },
      complete: () => {
        messageElement.classList.add('complete');
      },
    });
};

New complete messages

Subscribe to new complete messages (not streaming chunks).

GraphQL Subscription:

subscription NewMessage($sessionId: Float!) {
  newMessage(sessionId: $sessionId) {
    id
    message
    createdAt
    sessionId
    sentBy {
      id
      firstName
      lastName
    }
    isBotReply
  }
}

Use cases:

  • Display new messages in chat history

  • Show notifications for new messages

  • Update message counts

  • Trigger UI animations

Example:

const NEW_MESSAGE = gql`
  subscription NewMessage($sessionId: Float!) {
    newMessage(sessionId: $sessionId) {
      id
      message
      createdAt
      sentBy {
        firstName
      }
      isBotReply
    }
  }
`;

const subscribeToNewMessages = (client, sessionId, onNewMessage) => {
  return client
    .subscribe({
      query: NEW_MESSAGE,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        const message = data.newMessage;

        onNewMessage(message);

        // Play notification sound for user messages
        if (!message.isBotReply) {
          playNotificationSound();
        }
      },
    });
};

Intermediate messages

Subscribe to intermediate processing messages (the bot's thinking steps).

GraphQL Subscription:

subscription NewIntermediateMessage($sessionId: Float!) {
  newIntermediateMessage(sessionId: $sessionId) {
    id
    message
    createdAt
    isBotReply
  }
}

Use cases:

  • Show the bot's thinking process

  • Display analysis steps

  • Debug agent behavior

  • Show progress indicators

Example:

const INTERMEDIATE_MESSAGE = gql`
  subscription NewIntermediateMessage($sessionId: Float!) {
    newIntermediateMessage(sessionId: $sessionId) {
      id
      message
      createdAt
    }
  }
`;

const subscribeToIntermediateMessages = (client, sessionId, onIntermediate) => {
  return client
    .subscribe({
      query: INTERMEDIATE_MESSAGE,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        console.log('Thinking step:', data.newIntermediateMessage.message);
        onIntermediate(data.newIntermediateMessage);
        showThinkingIndicator(data.newIntermediateMessage.message);
      },
    });
};

Agent tool calls

Subscribe to events when the agent uses tools (search, calculations, API calls, etc.).

GraphQL Subscription:

subscription OnAgentToolCall($sessionId: Float!) {
  onAgentToolCall(sessionId: $sessionId) {
    tool
    toolImage
    botId
    id
    status
  }
}

Response fields:

| Field | Type | Description |
|---|---|---|
| tool | string | Name of the tool being used |
| toolImage | string | Icon/image URL for the tool |
| botId | number | Bot ID using the tool |
| id | number | Tool call event ID |
| status | string | "START", "END", or "ERROR" |

Common tool names:

  • web_search: Web searching

  • calculator: Mathematical calculations

  • knowledge_base: Knowledge base lookup

  • api_call: External API calls

  • database_query: Database queries

Use cases:

  • Show which tools the agent is using

  • Display tool execution progress

  • Show error indicators for failed tools

  • Create a tool activity log

Example:

const TOOL_CALL = gql`
  subscription OnAgentToolCall($sessionId: Float!) {
    onAgentToolCall(sessionId: $sessionId) {
      tool
      status
      toolImage
    }
  }
`;

const subscribeToToolCalls = (client, sessionId, onToolCall) => {
  const activeTools = {};

  return client
    .subscribe({
      query: TOOL_CALL,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        const { tool, status, toolImage } = data.onAgentToolCall;

        if (status === 'START') {
          activeTools[tool] = true;
          showToolIndicator(tool, toolImage);
          console.log(`🔧 Using ${tool}...`);
        } else if (status === 'END') {
          delete activeTools[tool];
          hideToolIndicator(tool);
          console.log(`✓ ${tool} completed`);
        } else if (status === 'ERROR') {
          delete activeTools[tool];
          showToolError(tool);
          console.log(`✗ ${tool} failed`);
        }

        onToolCall(data.onAgentToolCall);
      },
    });
};
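
The example above tracks each tool with a single boolean, so if the agent runs the same tool twice in parallel, the first END would hide the indicator while the second call is still running. Whether the API ever emits overlapping calls for one tool is not stated here, so treat the following as a defensive sketch: it counts active calls per tool name and keeps a simple activity log. createToolTracker is a hypothetical helper.

```javascript
// Hypothetical helper: counts in-flight calls per tool and records a log.
function createToolTracker() {
  const activeCounts = new Map(); // tool name -> number of calls in flight
  const log = [];

  return {
    // Process one onAgentToolCall event; returns true while the tool is still active.
    handle({ tool, status }) {
      const current = activeCounts.get(tool) ?? 0;

      if (status === 'START') {
        activeCounts.set(tool, current + 1);
      } else if (status === 'END' || status === 'ERROR') {
        if (current <= 1) {
          activeCounts.delete(tool);
        } else {
          activeCounts.set(tool, current - 1);
        }
      }

      log.push({ tool, status, at: Date.now() });
      return activeCounts.has(tool);
    },
    activeTools: () => [...activeCounts.keys()],
    history: () => [...log],
  };
}

const tracker = createToolTracker();
tracker.handle({ tool: 'web_search', status: 'START' });
tracker.handle({ tool: 'web_search', status: 'START' });
tracker.handle({ tool: 'web_search', status: 'END' });
console.log(tracker.activeTools()); // ['web_search'] (one call still running)
tracker.handle({ tool: 'web_search', status: 'ERROR' });
console.log(tracker.activeTools()); // []
```

In the subscription callback, hide the indicator only when handle returns false.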

User typing

Subscribe to typing indicators when users are typing.

GraphQL Subscription:

subscription UserTyping($sessionId: Float!) {
  userTyping(sessionId: $sessionId) {
    id
    firstName
    lastName
  }
}

Use cases:

  • Show "User is typing..." indicator

  • Improve real-time chat experience

  • Prevent message overlap issues

Example:

const USER_TYPING = gql`
  subscription UserTyping($sessionId: Float!) {
    userTyping(sessionId: $sessionId) {
      id
      firstName
    }
  }
`;

const subscribeToTyping = (client, sessionId, onTyping) => {
  return client
    .subscribe({
      query: USER_TYPING,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        const user = data.userTyping;
        showTypingIndicator(`${user.firstName} is typing...`);
        onTyping(user);
      },
    });
};
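
The subscription above reports that someone is typing, but nothing in this guide indicates an event for when they stop. A common approach, sketched here under that assumption, is to expire the indicator per user after a quiet period and restart the timer on every new event. createTypingIndicator, its timeoutMs default, and the injectable setTimer/clearTimer options are all hypothetical.

```javascript
// Hypothetical helper: shows typing users and expires each after a quiet period.
function createTypingIndicator(render, options = {}) {
  const { timeoutMs = 3000, setTimer = setTimeout, clearTimer = clearTimeout } = options;
  const entries = new Map(); // user id -> { timer, name }

  // Re-render with the names of everyone currently considered typing.
  const refresh = () => render([...entries.values()].map((entry) => entry.name));

  return {
    // Call with the payload of each userTyping event.
    onTyping(user) {
      const existing = entries.get(user.id);
      if (existing) clearTimer(existing.timer); // restart the quiet-period timer

      const timer = setTimer(() => {
        entries.delete(user.id);
        refresh();
      }, timeoutMs);

      entries.set(user.id, { timer, name: user.firstName });
      refresh();
    },
    // Cancel all pending timers, for example when the chat view is closed.
    dispose() {
      entries.forEach((entry) => clearTimer(entry.timer));
      entries.clear();
    },
  };
}

const indicator = createTypingIndicator((names) => {
  console.log(names.length ? `${names.join(', ')} typing...` : '');
});
indicator.onTyping({ id: 1, firstName: 'Ada' });
indicator.onTyping({ id: 2, firstName: 'Lin' });
indicator.dispose();
```

Keying the timers by user id keeps one person's timeout from clearing the indicator for someone else who is still typing.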

Bot analyzing datasources

Subscribe to events when the bot is analyzing its data sources.

GraphQL Subscription:

subscription BotAnalyzingDatasources($sessionId: Float!) {
  botAnalyzingDatasources(sessionId: $sessionId) {
    id
    firstName
  }
}

Use cases:

  • Show "Analyzing knowledge base..." indicator

  • Indicate bot is searching for information

  • Display progress during data analysis

Example:

const BOT_ANALYZING = gql`
  subscription BotAnalyzingDatasources($sessionId: Float!) {
    botAnalyzingDatasources(sessionId: $sessionId) {
      id
      firstName
    }
  }
`;

const subscribeToAnalyzing = (client, sessionId, onAnalyzing) => {
  return client
    .subscribe({
      query: BOT_ANALYZING,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        showAnalyzingIndicator('Searching knowledge base...');
        onAnalyzing(data.botAnalyzingDatasources);
      },
    });
};

Complete subscription manager

Here's a reusable class that manages multiple subscriptions:

class SubscriptionManager {
  constructor(client, sessionId) {
    this.client = client;
    this.sessionId = sessionId;
    this.subscriptions = [];
  }

  subscribeToAll(callbacks) {
    // Message streaming
    if (callbacks.onMessageChunk) {
      this.subscribeToStream(callbacks.onMessageChunk);
    }

    // New complete messages
    if (callbacks.onNewMessage) {
      this.subscribeToMessages(callbacks.onNewMessage);
    }

    // Intermediate thinking steps
    if (callbacks.onIntermediate) {
      this.subscribeToIntermediateMessages(callbacks.onIntermediate);
    }

    // Tool calls
    if (callbacks.onToolCall) {
      this.subscribeToToolCalls(callbacks.onToolCall);
    }

    // User typing
    if (callbacks.onUserTyping) {
      this.subscribeToTyping(callbacks.onUserTyping);
    }

    // Bot analyzing
    if (callbacks.onBotAnalyzing) {
      this.subscribeToAnalyzing(callbacks.onBotAnalyzing);
    }
  }

  subscribeToStream(onChunk) {
    const MESSAGE_STREAM = gql`
      subscription OnMessageStream($sessionId: Float!) {
        onMessageStream(sessionId: $sessionId) {
          messageChunk
          botResponseMessageId
          isStoppable
        }
      }
    `;

    let fullMessage = '';

    const subscription = this.client
      .subscribe({
        query: MESSAGE_STREAM,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          fullMessage += data.onMessageStream.messageChunk;
          onChunk(fullMessage, data.onMessageStream);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  subscribeToMessages(onMessage) {
    const NEW_MESSAGE = gql`
      subscription NewMessage($sessionId: Float!) {
        newMessage(sessionId: $sessionId) {
          id
          message
          createdAt
          isBotReply
          sentBy {
            firstName
          }
        }
      }
    `;

    const subscription = this.client
      .subscribe({
        query: NEW_MESSAGE,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          onMessage(data.newMessage);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  subscribeToIntermediateMessages(onIntermediate) {
    const INTERMEDIATE = gql`
      subscription NewIntermediateMessage($sessionId: Float!) {
        newIntermediateMessage(sessionId: $sessionId) {
          id
          message
          createdAt
        }
      }
    `;

    const subscription = this.client
      .subscribe({
        query: INTERMEDIATE,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          onIntermediate(data.newIntermediateMessage);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  subscribeToToolCalls(onToolCall) {
    const TOOL_CALL = gql`
      subscription OnAgentToolCall($sessionId: Float!) {
        onAgentToolCall(sessionId: $sessionId) {
          tool
          status
          toolImage
        }
      }
    `;

    const subscription = this.client
      .subscribe({
        query: TOOL_CALL,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          onToolCall(data.onAgentToolCall);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  subscribeToTyping(onTyping) {
    const USER_TYPING = gql`
      subscription UserTyping($sessionId: Float!) {
        userTyping(sessionId: $sessionId) {
          firstName
        }
      }
    `;

    const subscription = this.client
      .subscribe({
        query: USER_TYPING,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          onTyping(data.userTyping);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  subscribeToAnalyzing(onAnalyzing) {
    const BOT_ANALYZING = gql`
      subscription BotAnalyzingDatasources($sessionId: Float!) {
        botAnalyzingDatasources(sessionId: $sessionId) {
          id
          firstName
        }
      }
    `;

    const subscription = this.client
      .subscribe({
        query: BOT_ANALYZING,
        variables: { sessionId: this.sessionId },
      })
      .subscribe({
        next: ({ data }) => {
          onAnalyzing(data.botAnalyzingDatasources);
        },
      });

    this.subscriptions.push(subscription);
    return subscription;
  }

  unsubscribeAll() {
    this.subscriptions.forEach((sub) => sub.unsubscribe());
    this.subscriptions = [];
  }

  unsubscribe(subscription) {
    subscription.unsubscribe();
    this.subscriptions = this.subscriptions.filter((s) => s !== subscription);
  }
}

// Usage example
const subManager = new SubscriptionManager(client, 67890);

subManager.subscribeToAll({
  onMessageChunk: (fullMessage, data) => {
    updateStreamingMessage(fullMessage);
  },
  onNewMessage: (message) => {
    addMessageToHistory(message);
  },
  onToolCall: (toolCall) => {
    showToolActivity(toolCall);
  },
  onUserTyping: (user) => {
    showTypingIndicator(user);
  },
  onBotAnalyzing: () => {
    showAnalyzingIndicator();
  },
});

// Cleanup when component unmounts
// subManager.unsubscribeAll();

Practical use cases

Full-featured chat UI

Implement a complete chat interface with all subscription types:

class ChatUI {
  constructor(client, sessionId) {
    this.subManager = new SubscriptionManager(client, sessionId);
    this.messages = [];
  }

  initialize() {
    this.subManager.subscribeToAll({
      onMessageChunk: (fullMessage) => {
        this.updateStreamingMessage(fullMessage);
      },
      onNewMessage: (message) => {
        this.messages.push(message);
        this.renderMessages();
        
        if (!message.isBotReply) {
          this.playNotificationSound();
        }
      },
      onToolCall: (toolCall) => {
        if (toolCall.status === 'START') {
          this.showToolIndicator(toolCall.tool);
        } else if (toolCall.status === 'END') {
          this.hideToolIndicator(toolCall.tool);
        } else if (toolCall.status === 'ERROR') {
          this.showToolError(toolCall.tool);
        }
      },
      onUserTyping: (user) => {
        this.showTypingIndicator(`${user.firstName} is typing...`);
      },
      onBotAnalyzing: () => {
        this.showLoadingIndicator('Analyzing knowledge base...');
      },
    });
  }

  updateStreamingMessage(fullMessage) {
    const botMessageElement = document.getElementById('bot-message');
    botMessageElement.textContent = fullMessage;
  }

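  renderMessages() {
    const messagesContainer = document.getElementById('messages');
    // Build nodes with textContent so message text is never parsed as HTML
    const nodes = this.messages.map((msg) => {
      const wrapper = document.createElement('div');
      wrapper.className = `message ${msg.isBotReply ? 'bot' : 'user'}`;

      const body = document.createElement('p');
      body.textContent = msg.message;

      const timestamp = document.createElement('span');
      timestamp.className = 'timestamp';
      timestamp.textContent = new Date(msg.createdAt).toLocaleTimeString();

      wrapper.append(body, timestamp);
      return wrapper;
    });
    messagesContainer.replaceChildren(...nodes);
  }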

  showToolIndicator(tool) {
    const indicator = document.createElement('div');
    indicator.id = `tool-${tool}`;
    indicator.className = 'tool-indicator';
    indicator.textContent = `Using ${tool}...`;
    document.getElementById('status').appendChild(indicator);
  }

  hideToolIndicator(tool) {
    const indicator = document.getElementById(`tool-${tool}`);
    if (indicator) indicator.remove();
  }

  showToolError(tool) {
    const indicator = document.getElementById(`tool-${tool}`);
    if (indicator) {
      indicator.classList.add('error');
      indicator.textContent = `${tool} failed`;
    }
  }

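  showTypingIndicator(text) {
    document.getElementById('typing').textContent = text;
    // Restart the timer on every event so an earlier timeout cannot clear a newer indicator
    clearTimeout(this.typingTimer);
    this.typingTimer = setTimeout(() => {
      document.getElementById('typing').textContent = '';
    }, 3000);
  }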

  showLoadingIndicator(text) {
    document.getElementById('status').textContent = text;
  }

  playNotificationSound() {
    const audio = new Audio('/notification.mp3');
    audio.play().catch(() => {
      // Audio playback failed, silently continue
    });
  }

  cleanup() {
    this.subManager.unsubscribeAll();
  }
}

// Usage
const chatUI = new ChatUI(client, 67890);
chatUI.initialize();

// Cleanup on page unload
window.addEventListener('beforeunload', () => {
  chatUI.cleanup();
});

Selective subscriptions

Subscribe only to the events you need:

// Only subscribe to streaming and tool calls
const subManager = new SubscriptionManager(client, sessionId);

subManager.subscribeToStream((fullMessage) => {
  updateUI(fullMessage);
});

subManager.subscribeToToolCalls((toolCall) => {
  if (toolCall.status === 'START') {
    showToolBadge(toolCall.tool);
  }
});

Error handling with subscriptions

Handle WebSocket errors and reconnection:

const subscribeWithErrorHandling = (client, sessionId, onChunk) => {
  const MESSAGE_STREAM = gql`
    subscription OnMessageStream($sessionId: Float!) {
      onMessageStream(sessionId: $sessionId) {
        messageChunk
      }
    }
  `;

  return client
    .subscribe({
      query: MESSAGE_STREAM,
      variables: { sessionId },
    })
    .subscribe({
      next: ({ data }) => {
        onChunk(data.onMessageStream.messageChunk);
      },
      error: (error) => {
        console.error('Subscription error:', error);
        
        if (error.message?.includes('authorization')) {
          // Re-authenticate
          console.log('Re-authenticating...');
          // Handle auth error
        } else if (error.message?.includes('network')) {
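          // Network error: retry after a delay. The new subscription handle is not
          // returned to the caller, so keep track of it if you need to unsubscribe later.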
          console.log('Network error, retrying in 5s...');
          setTimeout(() => {
            subscribeWithErrorHandling(client, sessionId, onChunk);
          }, 5000);
        }
      },
      complete: () => {
        console.log('Subscription completed');
      },
    });
};
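
The fixed 5-second retry above can be generalized. The following sketch wraps any subscription starter with exponential backoff and returns a single handle that cancels both the active subscription and any pending retry. subscribeWithRetry, its option names, and its defaults are hypothetical; the starter function receives an onError callback that you pass as the error handler of your subscription. For simplicity, the attempt counter is never reset after a successful reconnect.

```javascript
// Hypothetical helper: restarts a subscription with exponential backoff.
// start(onError) must begin the subscription and return an object with unsubscribe().
function subscribeWithRetry(start, options = {}) {
  const {
    baseDelayMs = 1000,
    maxDelayMs = 30000,
    maxAttempts = 5, // failures tolerated before giving up
    schedule = setTimeout,
    cancel = clearTimeout,
  } = options;

  let attempt = 0;
  let current = null;
  let timer = null;
  let stopped = false;

  const connect = () => {
    current = start((error) => {
      if (stopped) return;
      attempt += 1;
      if (attempt >= maxAttempts) {
        console.error(`Giving up after ${attempt} failed attempts:`, error);
        return;
      }
      // 1s, 2s, 4s, ... capped at maxDelayMs
      const delay = Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs);
      timer = schedule(connect, delay);
    });
  };

  connect();

  return {
    // Stops the active subscription and cancels any scheduled retry.
    unsubscribe() {
      stopped = true;
      if (timer !== null) cancel(timer);
      current?.unsubscribe();
    },
  };
}

// Usage with the client from Step 1 (shown as a comment because it needs a live connection):
// const handle = subscribeWithRetry((onError) =>
//   client
//     .subscribe({ query: MESSAGE_STREAM, variables: { sessionId } })
//     .subscribe({
//       next: ({ data }) => onChunk(data.onMessageStream.messageChunk),
//       error: onError,
//     })
// );
// handle.unsubscribe();
```

Because the caller only ever holds the outer handle, cleanup keeps working no matter how many times the subscription has been restarted.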

Best practices

  1. Always unsubscribe when done: Prevent memory leaks by unsubscribing when components unmount.

  2. Combine subscriptions efficiently: Use a manager class to handle multiple subscriptions together.

  3. Handle errors gracefully: Implement error callbacks and reconnection logic.

  4. Validate data before updating UI: Always check that subscription data is valid before rendering.

  5. Implement timeouts for long-running streams: Stop streaming if it takes too long.

  6. Show appropriate UI indicators: Use tool indicators, typing notifications, and loading states.

  7. Buffer chunks for smooth updates: Don't update UI on every single character; batch updates for better performance.

  8. Use selective subscriptions: Only subscribe to events you actually need.

  9. Handle WebSocket reconnection: Configure reconnect: true and implement retry logic.

  10. Monitor subscription health: Log subscription events for debugging.
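
As an illustration of item 7, the sketch below coalesces rapid chunk updates so the UI renders at most once per interval, always with the latest text. createBufferedRenderer, the 50 ms default, and the injectable schedule option are assumptions made for this example, not part of the API.

```javascript
// Hypothetical helper: renders at most once per interval with the latest text.
function createBufferedRenderer(render, options = {}) {
  const { intervalMs = 50, schedule = setTimeout } = options;
  let latest = null;
  let scheduled = false;

  return {
    // Call on every chunk with the full text accumulated so far.
    push(fullMessage) {
      latest = fullMessage;
      if (scheduled) return; // a render is already queued; it will pick up `latest`
      scheduled = true;
      schedule(() => {
        scheduled = false;
        render(latest);
      }, intervalMs);
    },
    // Render immediately, for example from the `complete` callback.
    flush() {
      if (latest !== null) render(latest);
    },
  };
}

// Usage with the subscription manager (shown as a comment because it needs a DOM and a client):
// const renderer = createBufferedRenderer((text) => {
//   document.getElementById('bot-message').textContent = text;
// });
// subManager.subscribeToStream((fullMessage) => renderer.push(fullMessage));
```

In a browser, passing requestAnimationFrame as schedule should align renders with repaint frames, since the extra delay argument is simply ignored.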