Real-time subscriptions
Written By Stanislas
Last updated 16 days ago
Subscribe to real-time events like message streaming, tool calls, and user activity using WebSocket connections. Learn how to set up subscriptions and handle live updates in your chat application.
Overview
Subscriptions allow you to listen to real-time events from the API using WebSocket connections. Instead of polling for updates, your client receives events as they happen. This enables features like character-by-character message streaming, live tool call indicators, typing notifications, and more.
The Swiftask API provides six main subscription types: message streaming, new complete messages, intermediate messages, agent tool calls, user typing, and bot analyzing datasources. You can subscribe to one or multiple events simultaneously.
Prerequisites
Before setting up subscriptions, ensure you have:
Authenticated with the API: Follow the Authentication & Setup guide to get your accessToken and workspaceId
Configured WebSocket in your Apollo Client: Your client must have a WebSocketLink configured with proper authentication
A session ID: From Managing chat sessions
Understanding of async patterns: Subscriptions use observables and callbacks
Getting started
Here's the minimal setup to subscribe to streaming messages:
Step 1: Configure WebSocket connection
Set up Apollo Client with WebSocket support for subscriptions.
import { ApolloClient, InMemoryCache, createHttpLink } from '@apollo/client';
import { setContext } from '@apollo/client/link/context';
import { WebSocketLink } from '@apollo/client/link/ws';
import { split } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
const httpLink = createHttpLink({
uri: 'https://graphql.swiftask.ai/graphql',
});
const authLink = setContext((_, { headers }) => ({
headers: {
...headers,
authorization: `Bearer ${accessToken}`,
'x-workspace-id': workspaceId,
'x-client': 'widget',
},
}));
const wsLink = new WebSocketLink({
uri: 'wss://graphql.swiftask.ai/graphql',
options: {
reconnect: true,
connectionParams: {
authorization: `Bearer ${accessToken}`,
workspaceId: workspaceId,
},
},
});
// Route queries/mutations to HTTP, subscriptions to WebSocket
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return definition.kind === 'OperationDefinition' && definition.operation === 'subscription';
},
wsLink,
authLink.concat(httpLink)
);
const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
});
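The configuration above reads accessToken once, when the link is created. With reconnect: true, a reconnect that happens after the token has been refreshed would resend the old value. A minimal sketch of one way to avoid that, assuming your app can return the current token on demand: the getAccessToken callback and the makeConnectionParams helper are hypothetical names, not part of the Swiftask API. It relies on subscriptions-transport-ws (the client that WebSocketLink wraps) accepting a function for connectionParams.

```javascript
// Hypothetical helper: returns a function suitable for `connectionParams`.
// `getAccessToken` is an assumed callback that returns the current token.
const makeConnectionParams = (getAccessToken, workspaceId) => () => ({
  authorization: `Bearer ${getAccessToken()}`,
  workspaceId,
});

// Example: the token changes between two connection attempts.
let currentToken = 'token-1';
const connectionParams = makeConnectionParams(() => currentToken, 'ws-123');
const first = connectionParams();
currentToken = 'token-2';
const second = connectionParams();
console.log(first.authorization); // "Bearer token-1"
console.log(second.authorization); // "Bearer token-2"
```

To use it, pass the returned function as connectionParams in the WebSocketLink options in place of the object literal.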
Step 2: Subscribe to message streaming
Listen to real-time message chunks as they stream in.
import { gql } from '@apollo/client';
const MESSAGE_STREAM = gql`
subscription OnMessageStream($sessionId: Float!) {
onMessageStream(sessionId: $sessionId) {
messageChunk
botResponseMessageId
isStoppable
}
}
`;
const subscribeToMessageStream = (client, sessionId, onChunk) => {
let fullMessage = '';
return client
.subscribe({
query: MESSAGE_STREAM,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
const chunk = data.onMessageStream.messageChunk;
fullMessage += chunk;
onChunk(fullMessage, data.onMessageStream);
},
error: (error) => {
console.error('Stream error:', error);
},
complete: () => {
console.log('Stream complete');
},
});
};
// Usage
const subscription = subscribeToMessageStream(client, 67890, (fullMessage, data) => {
console.log('Streaming:', fullMessage);
if (data.isStoppable) {
console.log('User can stop generation');
}
});
// Cleanup when done
// subscription.unsubscribe();
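Note that the helper above keeps a single fullMessage for the lifetime of the subscription, so a second bot reply in the same session would be appended to the first. A minimal sketch of one way around this, assuming botResponseMessageId identifies the reply each chunk belongs to (the field name suggests this, but this guide does not state it). createChunkAccumulator is a hypothetical helper.

```javascript
// Hypothetical helper: accumulates chunks separately for each bot reply.
// Assumes `botResponseMessageId` identifies the reply a chunk belongs to.
const createChunkAccumulator = () => {
  const textByMessageId = new Map();
  return (payload) => {
    const key = payload.botResponseMessageId;
    const text = (textByMessageId.get(key) ?? '') + (payload.messageChunk ?? '');
    textByMessageId.set(key, text);
    return text;
  };
};

const accumulate = createChunkAccumulator();
accumulate({ botResponseMessageId: 1, messageChunk: 'Hel' });
const firstReply = accumulate({ botResponseMessageId: 1, messageChunk: 'lo' });
const secondReply = accumulate({ botResponseMessageId: 2, messageChunk: 'Hi' });
console.log(firstReply); // "Hello"
console.log(secondReply); // "Hi"
```

Inside the next handler, the returned text would take the place of the shared fullMessage variable.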
Step 3: Subscribe to new complete messages
Listen for new complete messages in the session (useful for multi-user chats).
const NEW_MESSAGE = gql`
subscription NewMessage($sessionId: Float!) {
newMessage(sessionId: $sessionId) {
id
message
createdAt
sentBy {
firstName
}
isBotReply
}
}
`;
const subscribeToNewMessages = (client, sessionId, onNewMessage) => {
return client
.subscribe({
query: NEW_MESSAGE,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
onNewMessage(data.newMessage);
},
error: (error) => {
console.error('Subscription error:', error);
},
});
};
// Usage
const messageSub = subscribeToNewMessages(client, 67890, (message) => {
console.log(`${message.sentBy.firstName}: ${message.message}`);
addMessageToUI(message);
});
Available subscriptions
Message streaming
Subscribe to real-time character-by-character streaming of the bot's response.
GraphQL Subscription:
subscription OnMessageStream($sessionId: Float!) {
onMessageStream(sessionId: $sessionId) {
messageChunk
userId
id
botId
botResponseMessageId
isStoppable
}
}
Response fields:
Use cases:
Display streaming text in real-time
Show a stop button while generating
Create a typewriter effect
Update UI character by character
Example with UI updates:
const subscribeToMessageStream = (client, sessionId, onChunk) => {
let fullMessage = '';
let messageElement = createMessageElement();
return client
.subscribe({
query: MESSAGE_STREAM,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
fullMessage += data.onMessageStream.messageChunk;
messageElement.textContent = fullMessage;
if (data.onMessageStream.isStoppable) {
showStopButton(() => {
// Handle stop click
});
} else {
hideStopButton();
}
onChunk(fullMessage, data.onMessageStream);
},
error: (error) => {
console.error('Stream error:', error);
messageElement.classList.add('error');
},
complete: () => {
messageElement.classList.add('complete');
},
});
};
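The example above writes to the DOM on every chunk. When chunks arrive faster than the screen refreshes, it can help to coalesce them into one render per frame. A minimal sketch, assuming only the latest accumulated text needs to be shown: createBatchedRenderer and its schedule parameter are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper: coalesces rapid updates into one render per scheduled tick.
// `schedule` defaults to requestAnimationFrame in browsers and setTimeout elsewhere.
const createBatchedRenderer = (
  render,
  schedule = typeof requestAnimationFrame === 'function'
    ? (callback) => requestAnimationFrame(callback)
    : (callback) => setTimeout(callback, 16)
) => {
  let latestText = null;
  let scheduled = false;
  return (text) => {
    latestText = text; // always keep the newest text
    if (scheduled) return; // a render is already queued
    scheduled = true;
    schedule(() => {
      scheduled = false;
      render(latestText);
    });
  };
};

// Example with a manual scheduler so the flush is explicit.
const pendingCallbacks = [];
const rendered = [];
const update = createBatchedRenderer(
  (text) => rendered.push(text),
  (callback) => pendingCallbacks.push(callback)
);
update('H');
update('He');
update('Hel');
pendingCallbacks.forEach((callback) => callback());
console.log(rendered); // ['Hel']
```

In the streaming handler, update(fullMessage) would replace the direct messageElement.textContent assignment.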
New complete messages
Subscribe to new complete messages (not streaming chunks).
GraphQL Subscription:
subscription NewMessage($sessionId: Float!) {
newMessage(sessionId: $sessionId) {
id
message
createdAt
sessionId
sentBy {
id
firstName
lastName
}
isBotReply
}
}
Use cases:
Display new messages in chat history
Show notifications for new messages
Update message counts
Trigger UI animations
Example:
const NEW_MESSAGE = gql`
subscription NewMessage($sessionId: Float!) {
newMessage(sessionId: $sessionId) {
id
message
createdAt
sentBy {
firstName
}
isBotReply
}
}
`;
const subscribeToNewMessages = (client, sessionId, onNewMessage) => {
return client
.subscribe({
query: NEW_MESSAGE,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
const message = data.newMessage;
const sender = message.isBotReply ? 'Bot' : message.sentBy.firstName;
onNewMessage(message);
// Play notification sound for user messages
if (!message.isBotReply) {
playNotificationSound();
}
},
});
};
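Depending on how your app loads history and handles reconnects, the same message may reach the UI more than once. A minimal sketch of a de-duplicating store, assuming id is unique per message and createdAt is a value that Date can parse. createMessageStore is a hypothetical helper.

```javascript
// Hypothetical helper: keeps one copy of each message, keyed by `id`.
const createMessageStore = () => {
  const messagesById = new Map();
  return {
    // Returns true when the message was new, false when it replaced a duplicate.
    add(message) {
      const isNew = !messagesById.has(message.id);
      messagesById.set(message.id, message);
      return isNew;
    },
    // Messages sorted oldest first by `createdAt`.
    list() {
      return [...messagesById.values()].sort(
        (a, b) => new Date(a.createdAt) - new Date(b.createdAt)
      );
    },
  };
};

const store = createMessageStore();
store.add({ id: 2, message: 'Second', createdAt: '2024-01-01T10:01:00Z' });
store.add({ id: 1, message: 'First', createdAt: '2024-01-01T10:00:00Z' });
const wasNew = store.add({ id: 2, message: 'Second', createdAt: '2024-01-01T10:01:00Z' });
console.log(wasNew); // false
console.log(store.list().map((m) => m.message)); // ['First', 'Second']
```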
Intermediate messages
Subscribe to intermediate processing messages (the bot's thinking steps).
GraphQL Subscription:
subscription NewIntermediateMessage($sessionId: Float!) {
newIntermediateMessage(sessionId: $sessionId) {
id
message
createdAt
isBotReply
}
}
Use cases:
Show the bot's thinking process
Display analysis steps
Debug agent behavior
Show progress indicators
Example:
const INTERMEDIATE_MESSAGE = gql`
subscription NewIntermediateMessage($sessionId: Float!) {
newIntermediateMessage(sessionId: $sessionId) {
id
message
createdAt
}
}
`;
const subscribeToIntermediateMessages = (client, sessionId, onIntermediate) => {
return client
.subscribe({
query: INTERMEDIATE_MESSAGE,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
console.log('Thinking step:', data.newIntermediateMessage.message);
onIntermediate(data.newIntermediateMessage);
showThinkingIndicator(data.newIntermediateMessage.message);
},
});
};
Agent tool calls
Subscribe to events when the agent uses tools (search, calculations, API calls, etc.).
GraphQL Subscription:
subscription OnAgentToolCall($sessionId: Float!) {
onAgentToolCall(sessionId: $sessionId) {
tool
toolImage
botId
id
status
}
}
Response fields:
Common tool names:
web_search: Web searching
calculator: Mathematical calculations
knowledge_base: Knowledge base lookup
api_call: External API calls
database_query: Database queries
Use cases:
Show which tools the agent is using
Display tool execution progress
Show error indicators for failed tools
Create a tool activity log
Example:
const TOOL_CALL = gql`
subscription OnAgentToolCall($sessionId: Float!) {
onAgentToolCall(sessionId: $sessionId) {
tool
status
toolImage
}
}
`;
const subscribeToToolCalls = (client, sessionId, onToolCall) => {
const activeTools = {};
return client
.subscribe({
query: TOOL_CALL,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
const { tool, status, toolImage } = data.onAgentToolCall;
if (status === 'START') {
activeTools[tool] = true;
showToolIndicator(tool, toolImage);
console.log(`Using ${tool}...`);
} else if (status === 'END') {
delete activeTools[tool];
hideToolIndicator(tool);
console.log(`${tool} completed`);
} else if (status === 'ERROR') {
delete activeTools[tool];
showToolError(tool);
console.log(`${tool} failed`);
}
onToolCall(data.onAgentToolCall);
},
});
};
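For the "tool activity log" use case, the START, END, and ERROR statuses handled above can be folded into a small log that also records how long each call took. A minimal sketch, assuming (as the example above does) that a tool name is enough to pair a START with its END or ERROR. createToolActivityLog and the injectable now clock are hypothetical.

```javascript
// Hypothetical helper: records tool calls and their durations.
// Pairs events by tool name, so two overlapping calls to the same tool would collide.
const createToolActivityLog = (now = () => Date.now()) => {
  const active = new Map(); // tool name -> start timestamp
  const history = [];
  return {
    record({ tool, status }) {
      if (status === 'START') {
        active.set(tool, now());
      } else if (status === 'END' || status === 'ERROR') {
        const startedAt = active.get(tool);
        active.delete(tool);
        history.push({
          tool,
          outcome: status === 'END' ? 'completed' : 'failed',
          // null when no matching START was seen
          durationMs: startedAt === undefined ? null : now() - startedAt,
        });
      }
    },
    activeTools: () => [...active.keys()],
    history: () => [...history],
  };
};

// Example with a fake clock so the duration is predictable.
let clock = 1000;
const toolLog = createToolActivityLog(() => clock);
toolLog.record({ tool: 'web_search', status: 'START' });
clock = 1250;
toolLog.record({ tool: 'web_search', status: 'END' });
toolLog.record({ tool: 'calculator', status: 'START' });
console.log(toolLog.history()); // [{ tool: 'web_search', outcome: 'completed', durationMs: 250 }]
console.log(toolLog.activeTools()); // ['calculator']
```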
User typing
Subscribe to typing indicators when users are typing.
GraphQL Subscription:
subscription UserTyping($sessionId: Float!) {
userTyping(sessionId: $sessionId) {
id
firstName
lastName
}
}
Use cases:
Show "User is typing..." indicator
Improve real-time chat experience
Prevent message overlap issues
Example:
const USER_TYPING = gql`
subscription UserTyping($sessionId: Float!) {
userTyping(sessionId: $sessionId) {
id
firstName
}
}
`;
const subscribeToTyping = (client, sessionId, onTyping) => {
return client
.subscribe({
query: USER_TYPING,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
const user = data.userTyping;
showTypingIndicator(`${user.firstName} is typing...`);
onTyping(user);
},
});
};
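The subscription reports that a user is typing, but nothing in the payload shown here signals that they have stopped. A minimal sketch of one way to expire indicators, assuming each event means "typing right now" and that id identifies the user. createTypingTracker and the 3-second default are illustrative choices, not API behavior.

```javascript
// Hypothetical helper: tracks who is typing and expires entries after `ttlMs`.
const createTypingTracker = (ttlMs = 3000, now = () => Date.now()) => {
  const lastSeenByUserId = new Map();
  return {
    // Call from the subscription's `next` handler with `data.userTyping`.
    markTyping(user) {
      lastSeenByUserId.set(user.id, { firstName: user.firstName, at: now() });
    },
    // Names of users whose last typing event is within the TTL window.
    activeNames() {
      const cutoff = now() - ttlMs;
      const names = [];
      for (const [userId, entry] of lastSeenByUserId) {
        if (entry.at < cutoff) {
          lastSeenByUserId.delete(userId); // stale entry, drop it
        } else {
          names.push(entry.firstName);
        }
      }
      return names;
    },
  };
};

// Example with a fake clock.
let fakeNow = 0;
const tracker = createTypingTracker(3000, () => fakeNow);
tracker.markTyping({ id: 1, firstName: 'Ada' });
fakeNow = 2000;
tracker.markTyping({ id: 2, firstName: 'Linus' });
const atTwoSeconds = tracker.activeNames();
fakeNow = 4000;
const atFourSeconds = tracker.activeNames();
console.log(atTwoSeconds); // ['Ada', 'Linus']
console.log(atFourSeconds); // ['Linus']
```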
Bot analyzing datasources
Subscribe to events when the bot is analyzing its data sources.
GraphQL Subscription:
subscription BotAnalyzingDatasources($sessionId: Float!) {
botAnalyzingDatasources(sessionId: $sessionId) {
id
firstName
}
}
Use cases:
Show "Analyzing knowledge base..." indicator
Indicate bot is searching for information
Display progress during data analysis
Example:
const BOT_ANALYZING = gql`
subscription BotAnalyzingDatasources($sessionId: Float!) {
botAnalyzingDatasources(sessionId: $sessionId) {
id
firstName
}
}
`;
const subscribeToAnalyzing = (client, sessionId, onAnalyzing) => {
return client
.subscribe({
query: BOT_ANALYZING,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
showAnalyzingIndicator('Searching knowledge base...');
onAnalyzing(data.botAnalyzingDatasources);
},
});
};
Complete subscription manager
Here's a reusable class that manages multiple subscriptions:
class SubscriptionManager {
constructor(client, sessionId) {
this.client = client;
this.sessionId = sessionId;
this.subscriptions = [];
}
subscribeToAll(callbacks) {
// Message streaming
if (callbacks.onMessageChunk) {
this.subscribeToStream(callbacks.onMessageChunk);
}
// New complete messages
if (callbacks.onNewMessage) {
this.subscribeToMessages(callbacks.onNewMessage);
}
// Intermediate thinking steps
if (callbacks.onIntermediate) {
this.subscribeToIntermediateMessages(callbacks.onIntermediate);
}
// Tool calls
if (callbacks.onToolCall) {
this.subscribeToToolCalls(callbacks.onToolCall);
}
// User typing
if (callbacks.onUserTyping) {
this.subscribeToTyping(callbacks.onUserTyping);
}
// Bot analyzing
if (callbacks.onBotAnalyzing) {
this.subscribeToAnalyzing(callbacks.onBotAnalyzing);
}
}
subscribeToStream(onChunk) {
const MESSAGE_STREAM = gql`
subscription OnMessageStream($sessionId: Float!) {
onMessageStream(sessionId: $sessionId) {
messageChunk
botResponseMessageId
isStoppable
}
}
`;
let fullMessage = '';
const subscription = this.client
.subscribe({
query: MESSAGE_STREAM,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
fullMessage += data.onMessageStream.messageChunk;
onChunk(fullMessage, data.onMessageStream);
},
});
this.subscriptions.push(subscription);
return subscription;
}
subscribeToMessages(onMessage) {
const NEW_MESSAGE = gql`
subscription NewMessage($sessionId: Float!) {
newMessage(sessionId: $sessionId) {
id
message
createdAt
isBotReply
sentBy {
firstName
}
}
}
`;
const subscription = this.client
.subscribe({
query: NEW_MESSAGE,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
onMessage(data.newMessage);
},
});
this.subscriptions.push(subscription);
return subscription;
}
subscribeToIntermediateMessages(onIntermediate) {
const INTERMEDIATE = gql`
subscription NewIntermediateMessage($sessionId: Float!) {
newIntermediateMessage(sessionId: $sessionId) {
id
message
createdAt
}
}
`;
const subscription = this.client
.subscribe({
query: INTERMEDIATE,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
onIntermediate(data.newIntermediateMessage);
},
});
this.subscriptions.push(subscription);
return subscription;
}
subscribeToToolCalls(onToolCall) {
const TOOL_CALL = gql`
subscription OnAgentToolCall($sessionId: Float!) {
onAgentToolCall(sessionId: $sessionId) {
tool
status
toolImage
}
}
`;
const subscription = this.client
.subscribe({
query: TOOL_CALL,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
onToolCall(data.onAgentToolCall);
},
});
this.subscriptions.push(subscription);
return subscription;
}
subscribeToTyping(onTyping) {
const USER_TYPING = gql`
subscription UserTyping($sessionId: Float!) {
userTyping(sessionId: $sessionId) {
firstName
}
}
`;
const subscription = this.client
.subscribe({
query: USER_TYPING,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
onTyping(data.userTyping);
},
});
this.subscriptions.push(subscription);
return subscription;
}
subscribeToAnalyzing(onAnalyzing) {
const BOT_ANALYZING = gql`
subscription BotAnalyzingDatasources($sessionId: Float!) {
botAnalyzingDatasources(sessionId: $sessionId) {
id
firstName
}
}
`;
const subscription = this.client
.subscribe({
query: BOT_ANALYZING,
variables: { sessionId: this.sessionId },
})
.subscribe({
next: ({ data }) => {
onAnalyzing(data.botAnalyzingDatasources);
},
});
this.subscriptions.push(subscription);
return subscription;
}
unsubscribeAll() {
this.subscriptions.forEach((sub) => sub.unsubscribe());
this.subscriptions = [];
}
unsubscribe(subscription) {
subscription.unsubscribe();
this.subscriptions = this.subscriptions.filter((s) => s !== subscription);
}
}
// Usage example
const subManager = new SubscriptionManager(client, 67890);
subManager.subscribeToAll({
onMessageChunk: (fullMessage, data) => {
updateStreamingMessage(fullMessage);
},
onNewMessage: (message) => {
addMessageToHistory(message);
},
onToolCall: (toolCall) => {
showToolActivity(toolCall);
},
onUserTyping: (user) => {
showTypingIndicator(user);
},
onBotAnalyzing: () => {
showAnalyzingIndicator();
},
});
// Cleanup when component unmounts
// subManager.unsubscribeAll();
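The six subscribeTo* methods above differ only in the GraphQL document and the field they read from data, and none of them passes an error handler. A minimal sketch of how the shared part could be factored out: subscribeToField is a hypothetical helper, and the stub client exists only so the example runs without Apollo installed.

```javascript
// Hypothetical helper: one code path for any session-scoped subscription.
// `client` only needs an Apollo-style `subscribe()` that returns an observable.
const subscribeToField = (client, { query, field, sessionId, onData, onError }) =>
  client.subscribe({ query, variables: { sessionId } }).subscribe({
    next: ({ data }) => {
      // Skip payloads that do not carry the expected field.
      if (data && data[field] != null) onData(data[field]);
    },
    error: onError ?? ((error) => console.error(`${field} subscription error:`, error)),
  });

// Stub client for illustration; a real app would pass its ApolloClient instance.
const stubObservers = [];
const stubClient = {
  subscribe: ({ variables }) => ({
    subscribe: (observer) => {
      stubObservers.push({ observer, variables });
      return { unsubscribe: () => {} };
    },
  }),
};

const received = [];
subscribeToField(stubClient, {
  query: 'USER_TYPING document goes here', // placeholder for a gql document
  field: 'userTyping',
  sessionId: 67890,
  onData: (user) => received.push(user.firstName),
});
stubObservers[0].observer.next({ data: { userTyping: { firstName: 'Ada' } } });
stubObservers[0].observer.next({ data: null }); // ignored
console.log(received); // ['Ada']
```

Each manager method could then push the result of subscribeToField into this.subscriptions, keeping the unsubscribe logic unchanged.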
Practical use cases
Full-featured chat UI
Implement a complete chat interface with all subscription types:
class ChatUI {
constructor(client, sessionId) {
this.subManager = new SubscriptionManager(client, sessionId);
this.messages = [];
}
initialize() {
this.subManager.subscribeToAll({
onMessageChunk: (fullMessage) => {
this.updateStreamingMessage(fullMessage);
},
onNewMessage: (message) => {
this.messages.push(message);
this.renderMessages();
if (!message.isBotReply) {
this.playNotificationSound();
}
},
onToolCall: (toolCall) => {
if (toolCall.status === 'START') {
this.showToolIndicator(toolCall.tool);
} else if (toolCall.status === 'END') {
this.hideToolIndicator(toolCall.tool);
} else if (toolCall.status === 'ERROR') {
this.showToolError(toolCall.tool);
}
},
onUserTyping: (user) => {
this.showTypingIndicator(`${user.firstName} is typing...`);
},
onBotAnalyzing: () => {
this.showLoadingIndicator('Analyzing knowledge base...');
},
});
}
updateStreamingMessage(fullMessage) {
const botMessageElement = document.getElementById('bot-message');
botMessageElement.textContent = fullMessage;
}
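renderMessages() {
const messagesContainer = document.getElementById('messages');
// Build nodes with textContent so message text is never parsed as HTML
const nodes = this.messages.map((msg) => {
const wrapper = document.createElement('div');
wrapper.className = `message ${msg.isBotReply ? 'bot' : 'user'}`;
const body = document.createElement('p');
body.textContent = msg.message;
const timestamp = document.createElement('span');
timestamp.className = 'timestamp';
timestamp.textContent = new Date(msg.createdAt).toLocaleTimeString();
wrapper.append(body, timestamp);
return wrapper;
});
messagesContainer.replaceChildren(...nodes);
}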
showToolIndicator(tool) {
const indicator = document.createElement('div');
indicator.id = `tool-${tool}`;
indicator.className = 'tool-indicator';
indicator.textContent = `Using ${tool}...`;
document.getElementById('status').appendChild(indicator);
}
hideToolIndicator(tool) {
const indicator = document.getElementById(`tool-${tool}`);
if (indicator) indicator.remove();
}
showToolError(tool) {
const indicator = document.getElementById(`tool-${tool}`);
if (indicator) {
indicator.classList.add('error');
indicator.textContent = `${tool} failed`;
}
}
showTypingIndicator(text) {
document.getElementById('typing').textContent = text;
setTimeout(() => {
document.getElementById('typing').textContent = '';
}, 3000);
}
showLoadingIndicator(text) {
document.getElementById('status').textContent = text;
}
playNotificationSound() {
const audio = new Audio('/notification.mp3');
audio.play().catch(() => {
// Audio playback failed, silently continue
});
}
cleanup() {
this.subManager.unsubscribeAll();
}
}
// Usage
const chatUI = new ChatUI(client, 67890);
chatUI.initialize();
// Cleanup on page unload
window.addEventListener('beforeunload', () => {
chatUI.cleanup();
});
Selective subscriptions
Subscribe only to the events you need:
// Only subscribe to streaming and tool calls
const subManager = new SubscriptionManager(client, sessionId);
subManager.subscribeToStream((fullMessage) => {
updateUI(fullMessage);
});
subManager.subscribeToToolCalls((toolCall) => {
if (toolCall.status === 'START') {
showToolBadge(toolCall.tool);
}
});
Error handling with subscriptions
Handle WebSocket errors and reconnection:
const subscribeWithErrorHandling = (client, sessionId, onChunk) => {
const MESSAGE_STREAM = gql`
subscription OnMessageStream($sessionId: Float!) {
onMessageStream(sessionId: $sessionId) {
messageChunk
}
}
`;
return client
.subscribe({
query: MESSAGE_STREAM,
variables: { sessionId },
})
.subscribe({
next: ({ data }) => {
onChunk(data.onMessageStream.messageChunk);
},
error: (error) => {
console.error('Subscription error:', error);
if (error.message?.includes('authorization')) {
// Re-authenticate
console.log('Re-authenticating...');
// Handle auth error
} else if (error.message?.includes('network')) {
// Network error - retry after delay
console.log('Network error, retrying in 5s...');
setTimeout(() => {
subscribeWithErrorHandling(client, sessionId, onChunk);
}, 5000);
}
},
complete: () => {
console.log('Subscription completed');
},
});
};
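The retry above waits a fixed 5 seconds and does not hand the new subscription back to the caller, so the original handle cannot cancel a retried stream. A minimal sketch of exponential backoff behind a single cancellable handle: subscribeWithBackoff, backoffDelay, and the start callback are hypothetical names, and the delay values are illustrative rather than recommended by the API.

```javascript
// Delay doubles per attempt and is capped: 1s, 2s, 4s, ... up to maxMs.
const backoffDelay = (attempt, baseMs = 1000, maxMs = 30000) =>
  Math.min(maxMs, baseMs * 2 ** attempt);

// `start(handlers)` must begin a subscription and return an object with unsubscribe().
const subscribeWithBackoff = (
  start,
  { onData, maxAttempts = 5, schedule = (callback, ms) => setTimeout(callback, ms) }
) => {
  let attempt = 0;
  let current = null;
  let cancelled = false;

  const connect = () => {
    if (cancelled) return; // a retry scheduled before cancellation does nothing
    current = start({
      next: (payload) => {
        attempt = 0; // a successful event resets the backoff
        onData(payload);
      },
      error: () => {
        if (cancelled || attempt >= maxAttempts) return;
        const delay = backoffDelay(attempt);
        attempt += 1;
        schedule(connect, delay);
      },
    });
  };
  connect();

  return {
    unsubscribe() {
      cancelled = true;
      if (current) current.unsubscribe();
    },
  };
};

// Example with a stubbed subscription and a manual scheduler.
const delays = [];
const scheduledRetries = [];
const attempts = [];
const handle = subscribeWithBackoff(
  (handlers) => {
    attempts.push(handlers);
    return { unsubscribe: () => {} };
  },
  {
    onData: () => {},
    schedule: (callback, ms) => {
      delays.push(ms);
      scheduledRetries.push(callback);
    },
  }
);
attempts[0].error(new Error('network')); // first failure
scheduledRetries.shift()(); // run the scheduled retry
attempts[1].error(new Error('network')); // second failure
console.log(delays); // [1000, 2000]
handle.unsubscribe();
```

With Apollo, start would wrap client.subscribe({ query, variables }).subscribe(handlers).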
Best practices
Always unsubscribe when done: Prevent memory leaks by unsubscribing when components unmount.
Combine subscriptions efficiently: Use a manager class to handle multiple subscriptions together.
Handle errors gracefully: Implement error callbacks and reconnection logic.
Validate data before updating UI: Always check that subscription data is valid before rendering.
Implement timeouts for long-running streams: Stop streaming if it takes too long.
Show appropriate UI indicators: Use tool indicators, typing notifications, and loading states.
Buffer chunks for smooth updates: Don't update UI on every single character; batch updates for better performance.
Use selective subscriptions: Only subscribe to events you actually need.
Handle WebSocket reconnection: Set reconnect: true in the WebSocketLink options and implement retry logic.
Monitor subscription health: Log subscription events for debugging.
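One reading of the timeout advice above is an idle watchdog: if no chunk arrives for a while, treat the stream as stalled and stop it. A minimal sketch under that interpretation: createIdleWatchdog, the 30-second default, and the injectable timers object are hypothetical and only illustrate the pattern.

```javascript
// Hypothetical helper: calls `onIdle` when `touch()` has not been called for `idleMs`.
const createIdleWatchdog = (
  onIdle,
  idleMs = 30000,
  timers = {
    set: (callback, ms) => setTimeout(callback, ms),
    clear: (id) => clearTimeout(id),
  }
) => {
  let timerId = null;
  const stop = () => {
    if (timerId !== null) {
      timers.clear(timerId);
      timerId = null;
    }
  };
  const touch = () => {
    stop(); // restart the countdown on every event
    timerId = timers.set(() => {
      timerId = null;
      onIdle();
    }, idleMs);
  };
  return { touch, stop };
};

// Example with fake timers so nothing actually waits.
let nextTimerId = 1;
const pendingTimers = new Map();
const fakeTimers = {
  set: (callback) => {
    const id = nextTimerId++;
    pendingTimers.set(id, callback);
    return id;
  },
  clear: (id) => pendingTimers.delete(id),
};
let idleCount = 0;
const watchdog = createIdleWatchdog(() => { idleCount += 1; }, 30000, fakeTimers);
watchdog.touch();
watchdog.touch(); // a new chunk arrived: the previous timer is replaced
console.log(pendingTimers.size); // 1
[...pendingTimers.values()][0](); // simulate the timer firing
console.log(idleCount); // 1
```

In a stream handler, touch() would be called in next, stop() in complete and error, and onIdle would unsubscribe and update the UI.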