Overview#
You can now use Workflow Streams to enable rich interactive user interfaces (UIs) for applications built on Temporal's Durable Execution. Temporal users have been rendering incremental UI updates for many years, but the approaches they relied on felt like workarounds. Workflow Streams is a durable streams abstraction, packaged as a library and built from Temporal's basic primitives: Workflows, Signals, Updates, and Queries.
You can use Workflow Streams to build AI agents that continuously update a UI with status, reasoning, tool calls, and generated text. A person monitoring the agent can then interrupt or steer it, or simply gain confidence that it is doing the right thing. Other uses include payment processing and order status updates.
Workflow Streams is in Public Preview for Python and TypeScript. It already comes integrated with Temporal's plugins for OpenAI Agents SDK, Google ADK, and LangGraph.
Background#
Most of us know streaming from music and video: you see the content as it becomes available. Workflow Streams gives Temporal applications the same property for their state and side effects.
Using Workflow Streams#
Workflow Streams is a durable abstraction implemented as a library on top of core Temporal primitives: Workflows, Signals, Updates, and Queries. A WorkflowStream is a namespace of topics scoped to one Workflow. Each topic carries an ordered sequence of typed events. The Workflow itself, its Activities, and external clients can all publish to the same topic, and multiple subscribers can read. Stream content is durably tracked by the Workflow, and the client library makes it easy to publish and subscribe.
To initialize a Workflow with Workflow Streams:
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream

@workflow.defn
class MyWorkflow:
    @workflow.init
    def __init__(self, input):
        # Create the stream in the init hook so its handlers are registered at start.
        self.stream = WorkflowStream()
In the example below, we publish two events to the Workflow's status topic:
...
    @workflow.run
    async def run(self, input):
        status = self.stream.topic("status", type=StatusEvent)
        ...
        status.publish("working on step 1")
        ...
        status.publish("working on step 2")
        ...
...
Publishers can be Activities, Child Workflows, unrelated Workflows, Standalone Activities, or Temporal clients. Here is what publishing from an Activity looks like:
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient

@activity.defn
async def my_activity(task_id) -> None:
    # Build a stream client bound to the Workflow that scheduled this Activity.
    client = WorkflowStreamClient.from_within_activity()
    async with client:
        output_stream = client.topic("outputs", type=OutputEvent)
        for item in fetch_items(task_id):
            item_result = process_item(item)
            # Publish each result as soon as it is ready.
            output_stream.publish(item_result)
You can consume events using a Temporal client, e.g., one that started the Workflow:
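from temporalio.contrib.workflow_streams import WorkflowStreamClient

async def watch_progress(temporal_client, workflow_id) -> None:
    stream = WorkflowStreamClient.create(temporal_client, workflow_id)
    status = stream.topic("status", type=StatusEvent)
    # Iterate over events as the Workflow publishes them.
    async for event in status.subscribe():
        print(event)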
The Workflow Streams implementation#
Architectural overview#
We implemented Workflow Streams using existing Temporal primitives, so it inherits the platform's scalability and reliability. Temporal scales out across many independent Workflows. A single Workflow processes events sequentially, which makes batching the main lever for increasing per-stream throughput.
The core idea is to maintain a list of stream events in the Workflow. Publishing from within the Workflow has minimal overhead because it merely involves appending to a list.
When events originate outside the Workflow, we use a Temporal Signal (a one-way message into a running Workflow) to send them over. Each Signal is a round trip and is recorded in the Workflow's persistent history, while an LLM can emit thousands of small chunks during a single response. Batching amortizes those costs and keeps history growth bounded. The default batch interval is 2 seconds. For AI streaming, the integration plugins use 100 milliseconds, which is responsive enough for a UI.
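The batching idea can be shown without Temporal at all. The following is a minimal sketch, not the library's implementation: `BatchingPublisher` is a hypothetical class, and its `send_batch` callback stands in for sending one Signal to the Workflow.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class BatchingPublisher:
    """Hypothetical helper: buffers events and flushes them as one batch.

    `send_batch` is an assumption that stands in for sending a single Signal.
    """

    def __init__(
        self,
        send_batch: Callable[[List[str]], Awaitable[None]],
        batch_interval: float,
    ) -> None:
        self._send_batch = send_batch
        self._batch_interval = batch_interval
        self._buffer: List[str] = []
        self._task: Optional[asyncio.Task] = None

    async def __aenter__(self) -> "BatchingPublisher":
        self._task = asyncio.create_task(self._flush_loop())
        return self

    async def __aexit__(self, *exc_info) -> None:
        # Stop the timer, then flush whatever is still buffered.
        assert self._task is not None
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        await self._flush()

    def publish(self, event: str) -> None:
        # Publishing only appends locally; there is no network call per event.
        self._buffer.append(event)

    async def _flush_loop(self) -> None:
        while True:
            await asyncio.sleep(self._batch_interval)
            await self._flush()

    async def _flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._send_batch(batch)
```

With this shape, a burst of small chunks costs one send per interval instead of one per chunk, which is the trade the batch interval controls.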
On the subscriber side, we implement a long-polling mechanism using Temporal's Update primitive (a request/response RPC into a Workflow). All events are numbered, and the client always passes the number of the last event it received to the Update handler. If the Workflow has any newer stream events, it returns them immediately. If not, the Update handler waits until more events become available, then returns.
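The long-polling contract can be sketched in plain asyncio. This is an illustration under stated assumptions rather than the library's code: `EventLog` is a hypothetical in-memory stand-in for the list the Workflow keeps, `poll` plays the role of the Update handler, and offsets are assumed to start at 1.

```python
import asyncio
from typing import List, Tuple


class EventLog:
    """Hypothetical in-memory stand-in for the event list a Workflow keeps."""

    def __init__(self) -> None:
        self._events: List[str] = []
        self._changed = asyncio.Condition()

    async def append(self, event: str) -> None:
        async with self._changed:
            self._events.append(event)
            self._changed.notify_all()  # wake any waiting pollers

    async def poll(self, after_offset: int) -> Tuple[List[str], int]:
        """Return events numbered above after_offset, waiting if none exist yet.

        Offsets start at 1 (an assumption), so a new subscriber passes 0.
        """
        async with self._changed:
            await self._changed.wait_for(lambda: len(self._events) > after_offset)
            return self._events[after_offset:], len(self._events)
```

A subscriber loops on `poll`, feeding the returned offset back in on the next call, so it never misses or repeats an event.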
The sequence diagram below illustrates a typical flow where the publisher is within a Temporal Activity and the subscriber is an external client.
When you create a WorkflowStream inside the @workflow.init constructor (Temporal's hook that runs once when the Workflow starts), it dynamically registers the Signal and Update handlers it needs.
Important details#
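You can tune the responsiveness of UI updates by adjusting the batch_interval. The default is 2 seconds. A shorter interval makes the UI more responsive but sends more Signals, each of which is recorded in the Workflow's history, so choose the value that makes sense for your application.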
Workflow Streams provides exactly-once semantics. Internally, publishers tag batches with sequence keys so retries deduplicate; subscribers track offsets so they can resume from where they left off. Dedup state is retained for publisher_ttl (default 15 minutes); a retry that arrives after its entry has been pruned may appear to the consumer as a duplicate. When multiple publishers write the same topic concurrently, the Workflow's sequential Signal processing determines the total order. Activity retries that produce different content (an LLM call, for instance) need cooperation from the consumer; we cover that case in the Integrations section.
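To make the deduplication window concrete, here is a minimal sketch of receiver-side dedup with a TTL. It is an assumption-laden illustration: `DedupLog`, the per-publisher integer sequence, and the explicit `now` argument are all hypothetical and do not describe how the library keys its batches.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class DedupLog:
    """Hypothetical receiver-side log that drops batches it already applied."""

    publisher_ttl: float = 15 * 60  # seconds; mirrors the default named above
    events: List[str] = field(default_factory=list)
    # publisher_id -> (highest sequence applied, time last seen)
    _seen: Dict[str, Tuple[int, float]] = field(default_factory=dict)

    def apply_batch(
        self, publisher_id: str, sequence: int, batch: List[str], now: float
    ) -> bool:
        """Append the batch unless it is a retry; return True if applied."""
        self._prune(now)
        last = self._seen.get(publisher_id)
        if last is not None and sequence <= last[0]:
            return False  # retry of a batch that was already applied
        self.events.extend(batch)
        self._seen[publisher_id] = (sequence, now)
        return True

    def _prune(self, now: float) -> None:
        # Forget publishers that have been silent for longer than the TTL.
        expired = [
            publisher
            for publisher, (_, seen_at) in self._seen.items()
            if now - seen_at > self.publisher_ttl
        ]
        for publisher in expired:
            del self._seen[publisher]
```

A retry inside the TTL is dropped, while the same retry arriving after the entry has been pruned is appended again, which is the late-duplicate case described above.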
Continue-as-New (Temporal's mechanism for restarting a long-running Workflow into a fresh history) also requires care. Because Update handlers must drain before Continue-as-New runs, a long-polling subscriber would otherwise block the transition. Workflow Streams provides a wrapper for continue_as_new that handles the handoff: it detaches in-flight pollers, then snapshots the log and dedup table into the next run as prior_state. Exactly-once semantics carry across the boundary, and subscriber clients follow the chain automatically and resume at the next event.
await self.stream.continue_as_new(
    lambda stream_state: [
        WorkflowInput(
            app_state=self.app_state,
            stream_state=stream_state,
        )
    ]
)
What about…?#
Pub/sub with Redis or Valkey#
Redis and Valkey provide a pub/sub messaging system that has been a popular choice among some Temporal users when implementing streaming. This approach offers very low latency, but the at-most-once delivery guarantee limits reliability. Recovering from dropped messages requires sequencing, gap detection, and idempotent application, in effect reinventing at-least-once with deduplication on top. Applications end up with two paths, a durable slow one and a best-effort fast one, and reconciliation logic in the UI to merge them under failure.
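As a rough illustration of what that reconciliation involves, the sketch below shows the sequencing and gap detection a subscriber needs on a lossy channel. `GapDetector` is a hypothetical class, and it assumes publishers attach a sequence number starting at 1 to every message.

```python
from typing import List, Tuple


class GapDetector:
    """Hypothetical subscriber-side check for a lossy, at-most-once channel."""

    def __init__(self) -> None:
        self.next_expected = 1
        self.applied: List[str] = []

    def receive(self, sequence: int, payload: str) -> Tuple[str, List[int]]:
        """Return (status, missing sequence numbers)."""
        if sequence < self.next_expected:
            return "duplicate", []  # already applied, safe to ignore
        if sequence > self.next_expected:
            # Messages were dropped; the caller must refetch them from a
            # durable source before this one can be applied.
            return "gap", list(range(self.next_expected, sequence))
        self.applied.append(payload)
        self.next_expected += 1
        return "applied", []
```

Every "gap" result sends the application down the slow, durable path, which is the second code path and merge logic the paragraph above refers to.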
With a durable stream the UI is a pure function of the event log, allowing users to focus on application level code rather than reconciliation logic.
Redis Streams is durable and gets closer to the shape of Workflow Streams, but it isn't a substitute. Workflows still cannot publish directly; they hand off through Activities. The streams live in a separate system you operate alongside Temporal. And subscribers still own deduplication against publisher retries.
Workflow Streams provides exactly-once semantics with no additional infrastructure beyond the Workflow itself.
Iterable responses in Temporal#
We considered making Temporal results iterable: Update, Activity, or Workflow would return an async iterator, implemented with streaming delivery. The appealing property would be composition: a Workflow could build its iterator out of the iterators its Activities and child Workflows return.
We didn't go that direction. Some failures need to be visible to the UI but hidden from the Workflow. Most LLM APIs do not support resuming an interrupted call, so a retry produces a different response, and the UI has to retract the partial output and replace it. An event-based contract carries that retract-and-replace signal cleanly. Iterables would either surface the retry to the Workflow, defeating durable execution, or hide it from the UI, where the user has already seen the partial output.
Integrations#
Temporal integrates with agent frameworks such as OpenAI Agents SDK and Google ADK. When using Workflow Streams with these integrations, you configure the plugin to publish events from streaming LLM responses as they occur.
For example, with the OpenAI Agents SDK integration, the plugin wraps the model invocation as an Activity and publishes each native OpenAI ResponseStreamEvent to a configured topic. You set this up by passing a streaming topic to the plugin:
from datetime import timedelta
from temporalio.contrib.openai_agents import (
    OpenAIAgentsPlugin,
    ModelActivityParameters,
)

plugin = OpenAIAgentsPlugin(
    model_params=ModelActivityParameters(
        # Topic that receives each streamed model event.
        streaming_topic="agent_events",
        # 100 ms keeps the UI responsive; the library default is 2 seconds.
        streaming_batch_interval=timedelta(milliseconds=100),
    ),
)
In your Workflow, host a WorkflowStream and call the agent's streaming runner as usual:
from agents import Agent, Runner
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream

@workflow.defn
class MyAgent:
    @workflow.init
    def __init__(self):
        self.stream = WorkflowStream()

    @workflow.run
    async def run(self, prompt: str) -> str:
        agent = Agent(name="Assistant", instructions="...")
        result = Runner.run_streamed(agent, prompt)
        return result.final_output
External UIs subscribe to the agent_events topic to render output as it arrives:
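from temporalio.contrib.workflow_streams import WorkflowStreamClient

# forward_events is an illustrative wrapper name; `async for` needs an async function.
async def forward_events(temporal_client, workflow_id) -> None:
    stream = WorkflowStreamClient.create(temporal_client, workflow_id)
    events = stream.topic("agent_events")
    async for event in events.subscribe():
        # forward to UI
        ...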
The Workflow doesn't see streaming events. It calls the agent and gets a final result, while subscribers see each delta as it arrives.
If the streaming Activity fails partway through, the deltas it already published stay on the stream, and the retried attempt publishes its own sequence. The plugin does not mark the transition; the consumer detects it from the model's own boundary signals (a fresh response.created, for instance) and resets the accumulated state.
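A consumer can handle this by treating the rendered text as a fold over the event log. The sketch below is a simplified illustration: it represents events as plain dicts with `type` and `delta` keys rather than the SDK's typed event objects, and `render_text` is a hypothetical helper.

```python
from typing import Dict, Iterable, List


def render_text(events: Iterable[Dict[str, str]]) -> str:
    """Rebuild the visible text from the event log (events simplified to dicts)."""
    parts: List[str] = []
    for event in events:
        if event["type"] == "response.created":
            # A fresh response started, possibly a retry: discard partial output.
            parts.clear()
        elif event["type"] == "response.output_text.delta":
            parts.append(event["delta"])
    return "".join(parts)


# The first attempt fails after two deltas; the retry starts a new response.
log = [
    {"type": "response.created"},
    {"type": "response.output_text.delta", "delta": "Hel"},
    {"type": "response.output_text.delta", "delta": "lo wor"},
    {"type": "response.created"},
    {"type": "response.output_text.delta", "delta": "Hi "},
    {"type": "response.output_text.delta", "delta": "there"},
]
print(render_text(log))  # prints "Hi there"
```

Because the function depends only on the events it is given, replaying the same log after a reconnect produces the same UI state.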
Complete example#
We have published a complete sample application using Workflow Streams. It demonstrates a business analytics agent that generates Python and SQL code, keeping a user interface updated as it goes.
The sample uses Server-Sent Events (SSE) between the web browser UI and the back-end server, with the SSE stream derived from the durable Workflow Stream. SSE provides a mechanism for resuming connections: the client sends the last event ID received, and the server picks up from there. Because the underlying source is the durable stream, the Web API server needs no durability mechanism of its own; it stays stateless.
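The resume mechanism can be sketched as a function that turns stream offsets into SSE event IDs. This is not the sample application's code: `sse_frames` is a hypothetical helper, and it assumes 1-based offsets and single-line event payloads.

```python
from typing import Iterator, List, Optional


def sse_frames(events: List[str], last_event_id: Optional[str]) -> Iterator[str]:
    """Yield SSE frames for events after the client's Last-Event-ID value.

    Assumes offsets start at 1 and each payload fits on one `data:` line.
    """
    start = int(last_event_id) if last_event_id else 0
    for offset, data in enumerate(events[start:], start=start + 1):
        # The id field lets the browser report where it stopped on reconnect.
        yield f"id: {offset}\ndata: {data}\n\n"
```

On reconnect, the browser sends the last ID it saw in the `Last-Event-ID` header, and the server passes that value straight through, so it holds no per-client state.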
Conclusion#
Workflow Streams fits the many-independent-streams case, where each stream is tied to a single Workflow. It is not a replacement for high-throughput streaming systems like Apache Kafka.
You can build interactive agents that show their work (status, reasoning, tool calls, and generated text) and let observers interrupt or steer them. The same pattern fits order processing, payments, and other long-running work.
Workflow Streams is in Public Preview for Python and TypeScript. Go and Java are coming next.