Robust Message Handlers

Drew

Drew Hoskins

Staff Product Manager

Imagine these scenarios:

  • Your shipment-tracking Workflow needs to know when the item leaves the warehouse and is loaded into a truck. You could signal your Workflow when the truck driver scans the barcode, and the Workflow will update the status and send out notifications.
  • Users can add items to your e-commerce shopping cart Workflow. Update it to add the item, calculate the subtotal, and receive back the subtotal and updated item list to render.

Signals and Updates are the Temporal messages that power these scenarios, enabling outside callers to interact with Workflows. But handling these messages can be tricky. We’ve heard consistent customer feedback that due to three distinct categories of concurrency problems, it’s not always easy to code up blocking message handlers in Workflows.

So, we’ve launched a flurry of improvements to the developer experience for Signal and Update handlers. They will allow you to create powerful, interactive Workflows faster and with fewer lurking bugs.

Read on to learn about them. But first, you’ll need a bit of background.

Message handling styles

When you’re handling a Signal or Update message in your Workflow, there are two coding styles you can adopt: Processing the requests in a handler, or using the handler only to queue up the work. Which should you choose?

To answer, I’ll give a small Temporal history lesson as a way to introduce the styles and compare them on their merits, and then, I’ll announce improvements that will help you choose the convenience of handlers more often.

The battle between handlers and queues

The Go SDK–the OG of Temporal SDKs–exclusively uses Signal Channels which the main Workflow method “receives” Signals from. That is, there are no provided Signal handlers. Often, people receive a Signal from a channel and put it into a queue for later processing at a time of their choosing.

Here’s a Python example in this queueing style. It’s inspired by the EntityBedrockWorkflow sample in Python, which demos a chatbot where Signals come in when a user prompts the AI.

In the handler, we do some light validation and put the work into a queue:

@workflow.signal
async def user_prompt(self, prompt: str) -> None:
    if self.chat_ended:
      workflow.logger.warn(f"Message dropped due to chat closed: {prompt}")
      return

      self.prompt_queue.append(prompt)

This is not too complicated, but now imagine adding a second type of Signal that needs to be handled in the order it was received. You’d end up with an “event loop” that iterates through your queue and switches on the Signal type. It quickly gets complicated and becomes hard to make type safe.

This is a shame, given that Temporal, by its nature, already provides you an event loop.

So, when we launched the Java SDK, we created Signal handlers and hoped people would shift to using them. And we continued this pattern in all subsequent languages.

Here’s the same Signal in handler style:

@workflow.signal
async def user_prompt(self, prompt: str) -> None:
  if self.chat_ended:
     workflow.logger.warn(f"Message dropped due to chat closed: {prompt}")
     return

  self.conversation_history.append(("user", prompt))
  #Get a response
  response = await self.send_prompt_to_bedrock(prompt)
  self.conversation_history[-1].add_response(response)

As you can see, this style is more convenient–it looks like an API in any web application.

However, there’s a catch. During the call to send_prompt_to_bedrock, this handler might interleave with others as Signals come in, causing race conditions. Can you spot it? We’ll fix it later.

Users reported three main types of issues with Signal handlers:

  • As above, it’s hard to reason about interleaved handlers, making it easy to write concurrency bugs.
  • With no initializer method available, in certain circumstances, handlers could run before the Workflow initialized, causing crashes.
  • It was hard to ensure that handlers complete before the Workflow exits.

Confronted with these issues, developers often resorted to queues. After all, if you simply loop through your queue and process one message at a time, your work is serialized and there are no race conditions.

As we turned our attention to Workflow Updates, we saw that the handler style was even more suitable, enough so that we switched to handlers for Updates in Go. This is because returning values and exceptions directly to clients is much easier from a handler.

Since the three problems apply to Updates as well, this gave us even more motivation to make all handlers safer and more useful, as you’ll see below.

There are still great use cases for queues, which give you absolute control over the ordering and timing of your work. Queues are especially useful when different messages are not processed independently of one another. For example, in a streaming-to-batch scenario where you want to aggregate various Signals before sending their payloads in a batch to an activity, place the work into a queue and periodically send out the work.

What we built

This brings us to today, where we are announcing three improvements to Signal and Update handlers. We’ve also completely revamped our docs and added samples.

Synchronization Primitives

To help you serialize interleaved handlers without using a queue, you need concurrency primitives like locks. Primitives are now available in all Temporal SDKs. These extensions guard critical sections, are replay-safe, and in most cases can be interrupted by Workflow cancellation.

In our chat example, we might fix the race condition with an asyncio.Lock like so:

@workflow.signal
async def user_prompt(self, prompt: str) -> None:
  # ...
  async with self.conversation_history_lock:
      self.conversation_history.append(("user", prompt))
      #call an activity.
      response = await self.send_prompt_to_bedrock(prompt)
      self.conversation_history[-1].add_response(response)

Here in Python, we were able to simply recommend the built-in locking libraries with no wrapper. But in all other SDKs, we provide our own or recommend a third-party library, while embracing the style of its language’s native primitives. See our documentation for links to the appropriate locking primitives in each SDK.

Workflow Initialization Methods

Initialize your Workflow's state before handling messages. This prevents reading uninitialized instance variables. In Typescript or Go, the game is the same: wait until after you’ve initialized before registering message handlers. But if you use one of our object-oriented SDKs: Java, Python, .NET, or PHP, you haven’t had an easy way to do this until now.

Initializing at the top of your Workflow’s main method can cause faults: @workflow.defn

class MyBadWorkflow:
def __init__(self) -> None:
     self.helloee: Optional[str] = None

@workflow.run
async def run(self, helloee: str) -> None:
     self.helloee = helloee
     #...

@workflow.update
async def hello(self, greeting: str = “hello”) -> str:
     #Bug! helloee could be None!
     return f"{greeting}, {self.helloee}!"

This will break reliably with Signal-with-Start patterns, but could also occur unexpectedly in other scenarios where a Signal arrived before a Worker ran the first Task in the Workflow.

To fix this, in these SDKs, you may now annotate your Workflow’s constructor and receive the Workflow’s arguments in it. The constructor is guaranteed to run before your handler here:

@workflow.defn
class MyGoodWorkflow:
     @workflow.init
     def __init__(self, helloee: str) -> None:
         self.helloee = helloee

@workflow.run
async def run(self, helloee: str) -> None:
  #...

@workflow.update
async def hello(self, greeting: str = “hello”) -> str:
   #Works!
   return f"{greeting}, {self.helloee}!"

We recommend that you use this pattern for any interactive Workflow, though note that you cannot make blocking calls in the constructor. Workflow Init is available today in .NET, Python, and Java, and soon in PHP. Consult our documentation to get started.

Dangling Handlers

Blocking handlers can accidentally be left partially finished when the Workflow completes or continues as new. We call these dangling handlers, and they can cause data inconsistency and leave clients with unmet expectations.

Before, for the main Workflow routine to tell when all the handlers are finished, you’d need to do your own careful bookkeeping, which is cumbersome and error-prone.

We’ve now added a method, All Handlers Finished, that lets you await any remaining handlers. If you don’t, we will warn you, reminding you to wait for this condition before you exit or continue as new. If you really want to abandon an unfinished handler, you may suppress this diagnostic using a Handler Unfinished Policy. See our documentation.

Summary

This suite of concurrency-management primitives makes handling Updates and Signals breezier. We hope that this gives handler styles their day in the sun, while we still acknowledge that queuing is a powerful mechanism for advanced use cases.

Further reading

We’ve revamped our docs on message passing. I’d like to call particular attention to our new overview of Message Handlers which provide a conceptual overview that will help you code with confidence.

We’ve also built samples showing best practices for handlers in various languages. See the Safe Message Handlers samples in Python, Typescript, .NET, and Go. Java is coming soon.

Feedback?

We’d love your feedback on anything mentioned here, as well as Workflow Update, which is in Public Preview as of this writing. As usual, you can connect to our community via Slack, GitHub, or our forum.