RTK Query gives you the ability to receive streaming updates for persistent queries. This enables a query to establish an ongoing connection to the server (typically using WebSockets), and apply updates to the cached data as additional information is received from the server.
Streaming updates can be used to enable the API to receive real-time updates to the back-end data, such as new entries being created, or important properties being updated.
To enable streaming updates for a query, pass the asynchronous
onCacheEntryAdded function to the query, including the logic for how to update the query when streamed data is received. See
onCacheEntryAdded API reference for more details.
Primarily updates to query data should be done via
polling intermittently on an interval, using
cache invalidation to invalidate data based on tags associated with queries & mutations, or with
refetchOnMountOrArgChange to fetch fresh data when a component using the data mounts.
However, streaming updates is particularly useful for scenarios involving:
- Small, frequent changes to large objects. Rather than repeatedly polling for a large object repeatedly, the object can be fetched with an initial query, and streaming updates can update individual properties as updates are received.
- External event-driven updates. Where data may be changed by the server or otherwise external users and where real-time updates are expected to be shown to an active user, polling alone would result in periods of stale data in between queries, causing state to easily get out of sync. Streaming updates can update all active clients as the updates occur rather than waiting for the next interval to elapse.
Example use cases that benefit from streaming updates are:
- GraphQL subscriptions
- Real-time chat applications
- Real-time multiplayer games
- Collaborative document editing with multiple concurrent users
onCacheEntryAdded lifecycle callback lets you write arbitrary async logic that will be executed after a new cache entry is added to the RTK Query cache (ie, after a component has created a new subscription to a given endpoint+params combination).
onCacheEntryAdded will be called with two arguments: the
arg that was passed to the subscription, and an options object containing "lifecycle promises" and utility functions. You can use these to write sequenced logic that waits for data to be added, initiates server connections, applies partial updates, and cleans up the connection when the query subscription is removed.
Typically, you will
await cacheDataLoaded to determine when the first data has been fetched, then use the
updateCacheData utility to apply streaming updates as messages are received.
updateCacheData is an Immer-powered callback that receives a
draft of the current cache value. You may "mutate" the draft value to update it as needed based on the received values. RTK Query will then dispatch an action that applies a diffed patch based on those changes.
Finally, you can
await cacheEntryRemoved to know when to clean up any server connections.
getMessages query is triggered (e.g. via a component mounting with the
useGetMessagesQuery() hook), a
cache entry will be added based on the serialized arguments for the endpoint. The associated query will be fired off based on the
query property to fetch the initial data for the cache. Meanwhile, the asynchronous
onCacheEntryAdded callback will begin, and create a new WebSocket connection. Once the response for the initial query is received, the cache will be populated with the response data, and the
cacheDataLoaded promise will resolve. After awaiting the
cacheDataLoaded promise, the
message event listener will be added to the WebSocket connection, which updates the cache data when an associated message is received.
When there are no more active subscriptions to the data (e.g. when the subscribed components remain unmounted for a sufficient amount of time), the
cacheEntryRemoved promise will resolve, allowing the remaining code to run and close the websocket connection. RTK Query will also remove the associated data from the cache.
If a query for the corresponding cache entry runs later, it will overwrite the whole cache entry, and the streaming update listeners will continue to work on the updated data.
This example demonstrates how the previous example can be altered to allow for transforming the response shape when adding data to the cache.
For example, the data is transformed from this shape:
A key point to keep in mind is that updates to the cached data within the
onCacheEntryAdded callback must respect the transformed data shape which will be present for the cached data. The example shows how
createEntityAdapter can be used for the initial
transformResponse, and again when streamed updates are received to upsert received items into the cached data, while maintaining the normalized state structure.