Amazon Bedrock with Temporal: Rock Solid

Brendan Myers & Steve Androulakis

Solutions Architects

tl;dr In this blog we walk through the steps required to build a chatbot powered by Temporal and Amazon Bedrock.

Developing AI-powered applications has never been easier, and with multiple high-quality large language models (LLMs) publicly available, developers have many options. Amazon Bedrock makes it straightforward for developers to access models from leading providers through a single, standardized API, and removes the friction and pain of hosting reliable, scalable models.

Even with all the benefits that come with Amazon Bedrock, there are still a number of broader challenges that must be addressed when building production-grade Generative AI (Gen AI) applications.

  • LLMs are stateless; if the history of interactions with a model is required, which is the case for conversational AI, then the history must be stored securely.
  • LLM conversations may be very long running, and may have large intervals between interactions. Very long running sessions can be tricky to manage:
    • It can be difficult to determine which sessions should remain in memory and which should be persisted to disk, resulting in extra cost when too much memory is left unused, or poor performance when there is too little memory.
    • There are two paths for handling user inputs: one for active sessions and another for inactive sessions. Active sessions are simpler because the session is present in memory, whereas inactive sessions are more complicated: the session needs to be restored, additional user requests may arrive during session restoration, and errors during session restoration need to be handled.
  • LLMs are deterministic, but generally we want some randomness in the output, controlled by sampling parameters such as top-p, top-k, and temperature; for example, so that users can ask the LLM to rephrase a result if needed. For debugging and investigation, the individual parameters passed to an LLM need to be logged for each and every invocation, and the overall history/sequence of invocations must be able to be rebuilt if needed.

These are difficult challenges but solvable. Caching/persistence can be added to store the history of conversations, queues can be added to help with rate-limiting calls made to Bedrock, and tracing, observability, and logging tools can be strung together to provide visibility into Gen AI applications. The problem with this approach is all these solutions involve additional developer effort, become complicated quickly, and are yet another resource to monitor and troubleshoot when things inevitably go wrong.

How Temporal Solves these Problems

Temporal enables developers to write workflows that are guaranteed to execute to completion. Temporal manages queues, retries, state management, and more; developers spend less time on scaffolding and build reliable applications faster. By modeling each conversation as a Temporal workflow, the same patterns that Temporal developers use every day can be used to solve the challenges that come with building Gen AI applications powered by Amazon Bedrock.

  • The history of each conversation is stored by Temporal within the workflow, and can be used on subsequent calls to Amazon Bedrock to provide history and context for the LLM. As each conversation is a separate workflow, the history of each conversation is stored separately, significantly reducing the risk of accidentally exposing one user’s chat history to another user.
  • Temporal workflows can run forever, and a workflow that is waiting for an external signal (such as user input) doesn’t consume compute resources, meaning a workflow can wait indefinitely.
  • Temporal’s event history captures the inputs and outputs (including failures) for every step in a workflow, where each step can be almost anything; a write to a database, sending a notification, a request to Amazon Bedrock, etc. This allows for rapid debugging as Temporal provides a single place to view the execution history for a workflow.
  • Temporal provides a straightforward approach to developing processes that execute in parallel, bypassing traditional event-driven constructs like message buses and dead letter queues.

Building an AI chatbot with Temporal and Amazon Bedrock

To better understand how Temporal and Amazon Bedrock can be used to quickly build resilient AI applications, we’ll walk through building an example chatbot application using the Temporal Python SDK. We’ll start with a simple workflow to introduce some of the Temporal concepts, and then add additional features to build a more sophisticated conversational AI application. We’ll show the relevant snippets of code as we progress, and if you’d like to run the samples yourself, the full code is available on GitHub.

Prerequisites

  • An AWS account with access to one or more LLMs on Amazon Bedrock.
  • AWS secret access key configured on your local machine
  • Temporal CLI
  • Python >= 3.8
  • Temporal Python SDK

If you are following along with the code in the GitHub repository, you will also need Poetry to install the dependencies and run the samples.

A basic Bedrock workflow

Our first step is to build a workflow that interacts with Amazon Bedrock. In this simple example we’ll accept a prompt from a user and use it to generate a response from Bedrock.

Using the Temporal Python SDK, a workflow is written in standard Python, with decorators marking the workflow class and its entry point. For our basic Bedrock workflow, the workflow definition is straightforward: we define a workflow class (BasicBedrockWorkflow) with a single method (run) that returns the result of the activity that calls Bedrock (prompt_bedrock).


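from datetime import timedelta

from temporalio import workflow

# BedrockActivities is the activity class defined in the next snippet


@workflow.defn
class BasicBedrockWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        workflow.logger.info("Prompt: %s", prompt)

        # Run the Bedrock call as an activity so Temporal can retry it on failure
        response = await workflow.execute_activity_method(
            BedrockActivities.prompt_bedrock,
            prompt,
            schedule_to_close_timeout=timedelta(seconds=20),
        )

        workflow.logger.info("Response: %s", response)

        return response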

An activity is a function that might fail and may need to be retried, such as a call to an external service. Temporal automatically handles retries when an activity fails; how many times to retry, and how often, is defined by the activity's retry policy. By default, when an activity fails, Temporal will continue to retry the activity forever, with an increasing delay between retries (exponential backoff), until the activity is successful or one of its timeouts expires; in our workflow, schedule_to_close_timeout caps the total time across all attempts at 20 seconds. Like the workflow definition, we use a decorator when defining activities.
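
To make the exponential backoff concrete, here is a minimal pure-Python sketch that computes the delay before each retry. It does not call Temporal; the function name backoff_schedule is hypothetical, and the default values (1 second initial interval, coefficient of 2.0, 100 second maximum interval) are what we understand Temporal's documented defaults to be, so treat them as assumptions and check the documentation for your version. In the Python SDK, the equivalent settings live on temporalio.common.RetryPolicy, which can be passed to an activity call via the retry_policy argument.

```python
from datetime import timedelta
from typing import List


def backoff_schedule(
    retries: int,
    initial_interval: timedelta = timedelta(seconds=1),  # assumed default
    backoff_coefficient: float = 2.0,  # assumed default
    maximum_interval: timedelta = timedelta(seconds=100),  # assumed default
) -> List[timedelta]:
    """Return the delay before each of the first `retries` retries."""
    delays = []
    for n in range(retries):
        # The delay grows geometrically, then is capped at the maximum interval
        delay = initial_interval * (backoff_coefficient**n)
        delays.append(min(delay, maximum_interval))
    return delays


schedule = backoff_schedule(8)
print([int(d.total_seconds()) for d in schedule])
# prints [1, 2, 4, 8, 16, 32, 64, 100]
```

With a 20 second schedule_to_close_timeout, only the first few of these retries would fit before the activity times out.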

The prompt_bedrock activity invokes an LLM on Bedrock and formats a response to return to the user.


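import json

import boto3
from botocore.config import Config
from temporalio import activity

# Assumption: the region is illustrative; use one where your account has model access
config = Config(region_name="us-west-2")


class BedrockActivities:
    def __init__(self) -> None:
        self.bedrock = boto3.client(service_name="bedrock-runtime", config=config)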

    @activity.defn
    def prompt_bedrock(self, prompt: str) -> str:
        # Model params
        modelId = "meta.llama2-70b-chat-v1"
        accept = "application/json"
        contentType = "application/json"
        max_gen_len = 512
        temperature = 0.1
        top_p = 0.2

        body = json.dumps(
            {
                "prompt": prompt,
                "max_gen_len": max_gen_len,
                "temperature": temperature,
                "top_p": top_p,
            }
        )

        response = self.bedrock.invoke_model(
            body=body, modelId=modelId, accept=accept, contentType=contentType
        )

        response_body = json.loads(response.get("body").read())

        return response_body.get("generation")

Finally, we create a worker that is responsible for executing our workflow and activity.


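import asyncio
import concurrent.futures
import logging

from temporalio.client import Client
from temporalio.worker import Worker

# BedrockActivities and BasicBedrockWorkflow are the classes defined above


async def main():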
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")
    activities = BedrockActivities()

    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="bedrock-task-queue",
            workflows=[BasicBedrockWorkflow],
            activities=[activities.prompt_bedrock],
            activity_executor=activity_executor,
        )
        await worker.run()

if __name__ == "__main__":
    print("Starting worker")
    print("Then run 'python send_message.py \"<prompt>\"'")

    logging.basicConfig(level=logging.INFO)

    asyncio.run(main())

To run our workflow, we first start an instance of the Temporal Server using the Temporal CLI with the command temporal server start-dev. The CLI server instance is a great way to quickly test workflows locally, but for higher level environments and production deployments we recommend Temporal Cloud.

Next, we need to start our Temporal worker; this is what will process workflows and activities as they are requested.

$ poetry run python run_worker.py

In another terminal we can now start the workflow, with a prompt asking Bedrock which animals are marsupials, and view its output:


$ temporal workflow execute --type BasicBedrockWorkflow --task-queue bedrock-task-queue --input '"Which animals are marsupials?"'
Running execution:
  WorkflowId  a79dff10-d6c9-4c97-8b2b-25429914b78c
  RunId       a0e7cb0a-dd3a-4b4d-bf82-cd690bd2cc5f
  Type        BasicBedrockWorkflow
  Namespace   default
  TaskQueue   bedrock-task-queue
  Args        ["Which animals are
              marsupials?"]

Progress:

Time elapsed: 4s
  ID          Time                     Type
   1  2024-05-22T10:01:37Z  WorkflowExecutionStarted
   2  2024-05-22T10:01:37Z  WorkflowTaskScheduled
   3  2024-05-22T10:01:37Z  WorkflowTaskStarted
   4  2024-05-22T10:01:37Z  WorkflowTaskCompleted
   5  2024-05-22T10:01:37Z  ActivityTaskScheduled
   6  2024-05-22T10:01:37Z  ActivityTaskStarted
   7  2024-05-22T10:01:41Z  ActivityTaskCompleted
   8  2024-05-22T10:01:41Z  WorkflowTaskScheduled
   9  2024-05-22T10:01:41Z  WorkflowTaskStarted
  10  2024-05-22T10:01:41Z  WorkflowTaskCompleted
  11  2024-05-22T10:01:41Z  WorkflowExecutionCompleted

Result:
  Run Time: 5 seconds
  Status: COMPLETED
  Output: ["\n\nMarsupials are mammals that have a pouch in which they carry their young. Some examples of marsupials include:\n\n* Kangaroos and wallabies\n* Possums\n* Koalas\n* Wombats\n* Marsupial moles\n* Bilbies\n* Bandicoots\n* Dunnarts\n* Quokkas\n* Sugar gliders\n* Tasmanian devils\n\nThese are just a few examples of the many different species of marsupials that exist. Marsupials are found primarily in Australia and the surrounding islands, as well as in South America and parts of North America."]

Even with this very simple workflow we can already see some of the benefits Temporal provides.

  • Transient failures don't cost you the result from Bedrock. For example, simulate a network fault by disconnecting your local environment from the internet, starting the workflow, and then reconnecting. Temporal keeps retrying the activity until it succeeds or its schedule_to_close_timeout (20 seconds in this example) expires, so raise the timeout if you want to test a longer outage. To simulate a server or cluster fault, terminate your worker, start the workflow, and then restart the worker. In both of these cases your workflow will execute to completion.
  • Using the Temporal UI at http://localhost:8233 we can easily inspect the history of the workflow: which activity was executed, when it was executed, what inputs were passed to it, and what the result was. As workflows grow beyond a simple case such as this one, the ability to easily view the full history of a workflow execution is invaluable.

Adding signals & queries

Now we can expand on our workflow and begin modeling something that resembles a conversation. Users will now be able to provide follow-up prompts to Bedrock, and Bedrock will receive the prompt, along with the conversation history, which will allow Bedrock to provide a response that makes sense in the context of the conversation. If users don’t provide a prompt within a timeout window then the conversation will end, and a summary of the conversation is generated.

In our basic workflow we provided a single input at the start of execution and we received a single response at the end of execution. We’ll now store prompts as the user provides them, and we’ll also keep track of the conversation history.


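from collections import deque
from typing import Deque, List, Tuple

from temporalio import workflow


@workflow.defn
class SignalQueryBedrockWorkflow:
    def __init__(self) -> None:
        # List of (speaker, text) pairs storing the conversation so far
        self.conversation_history: List[Tuple[str, str]] = []
        # Prompts received via signal that are waiting to be processed
        self.prompt_queue: Deque[str] = deque()
        self.conversation_summary = ""
        self.chat_timeout: bool = False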

We now need a way to receive user prompts and push them to the queue, and we want to expose the current conversation history to the user should they request it. There are two Temporal primitives that support this:

  • Signals allow us to inject data into a running workflow. They can also be used to start a new workflow execution that immediately receives the signal. We’ll use this feature in our conversation workflow as it reduces how we interact with our workflow to a single path; either our signal adds data to an existing conversation, or it starts a new conversation for us. Signals do not return any data to the original sender of the signal.
  • Queries are used to expose data from a running workflow. Queries cannot mutate the state / internal data of a workflow.

We use decorators to define our signal and query handlers, which in this case are straightforward:


@workflow.signal
async def user_prompt(self, prompt: str) -> None:
    # Chat timed out but the workflow is waiting for a chat summary to be generated
    if self.chat_timeout:
        workflow.logger.warning(f"Message dropped due to chat closed: {prompt}")
        return

    self.prompt_queue.append(prompt)

@workflow.query
def get_conversation_history(self) -> List[Tuple[str, str]]:
    return self.conversation_history

It’s important to note that signals and queries can be called any time during the lifetime of a workflow. If you have a case where a signal should not be processed immediately then this must be handled with additional logic, e.g. by adding the signal input to a queue. In our case, we will enqueue user inputs when they are received, and dequeue one each iteration of our main loop.
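
To see the enqueue-then-dequeue pattern in isolation, here is a minimal sketch that uses plain asyncio as a stand-in for Temporal. Nothing in it is Temporal API: the Conversation class is hypothetical, an asyncio.Event plays the role of the workflow's wait condition, and a short sleep stands in for the call to Bedrock. It only illustrates that prompts arriving while an earlier prompt is still being processed are queued rather than lost.

```python
import asyncio
from collections import deque
from typing import Deque, List, Tuple


class Conversation:
    """Plain-asyncio stand-in for the workflow's queueing logic (not Temporal)."""

    def __init__(self) -> None:
        self.prompt_queue: Deque[str] = deque()
        self.history: List[Tuple[str, str]] = []
        self._prompt_arrived = asyncio.Event()

    def user_prompt(self, prompt: str) -> None:
        # Plays the role of the signal handler: enqueue only, never process here
        self.prompt_queue.append(prompt)
        self._prompt_arrived.set()

    async def run(self, inactivity_timeout: float) -> List[Tuple[str, str]]:
        while True:
            try:
                # Wait for a prompt, or give up after the inactivity timeout
                await asyncio.wait_for(
                    self._prompt_arrived.wait(), timeout=inactivity_timeout
                )
            except asyncio.TimeoutError:
                break
            self._prompt_arrived.clear()

            # Drain everything that is queued, one prompt per iteration
            while self.prompt_queue:
                prompt = self.prompt_queue.popleft()
                self.history.append(("user", prompt))
                await asyncio.sleep(0.01)  # stand-in for the LLM call
                self.history.append(("response", f"echo: {prompt}"))
        return self.history


async def main() -> List[Tuple[str, str]]:
    chat = Conversation()
    runner = asyncio.create_task(chat.run(inactivity_timeout=0.2))
    # Both prompts arrive before the first one has been processed
    chat.user_prompt("What animals are marsupials?")
    chat.user_prompt("Do they lay eggs?")
    return await runner


history = asyncio.run(main())
print(history)
```

Both prompts are answered in the order they arrived, and the loop exits once no new prompt shows up within the timeout.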

Next, we can modify our workflow to allow subsequent prompts from the user. In each iteration of our main loop, our workflow needs to:

  • Update the conversation history with the next user input
  • Create a prompt using the conversation history, and process it with our Bedrock activity
  • Add the Bedrock response to the conversation history
  • Check if there are user inputs on the queue, and if the queue is empty and no user inputs are received within a timeout window, finish the workflow

@workflow.run
async def run(self, inactivity_timeout_minutes: int) -> str:
    while True:
        workflow.logger.info(
            "Waiting for prompts... or closing chat after "
            + f"{inactivity_timeout_minutes} minute(s)"
        )

        # Wait for a chat message (signal) or timeout
        try:
            await workflow.wait_condition(
                lambda: bool(self.prompt_queue),
                timeout=timedelta(minutes=inactivity_timeout_minutes),
            )
        # If timeout was reached
        except asyncio.TimeoutError:
            self.chat_timeout = True
            workflow.logger.info("Chat closed due to inactivity")
            # End the workflow
            break

        while self.prompt_queue:
            # Fetch next user prompt and add to conversation history
            prompt = self.prompt_queue.popleft()
            self.conversation_history.append(("user", prompt))

            workflow.logger.info(f"Prompt: {prompt}")

            # Send the prompt to Amazon Bedrock
            response = await workflow.execute_activity_method(
                BedrockActivities.prompt_bedrock,
                self.prompt_with_history(prompt),
                schedule_to_close_timeout=timedelta(seconds=20),
            )

            workflow.logger.info(f"{response}")

            # Append the response to the conversation history
            self.conversation_history.append(("response", response))

    # Generate a summary before ending the workflow
    self.conversation_summary = await workflow.start_activity_method(
        BedrockActivities.prompt_bedrock,
        self.prompt_summary_from_history(),
        schedule_to_close_timeout=timedelta(seconds=20),
    )

    workflow.logger.info(f"Conversation summary:\n{self.conversation_summary}")

    return f"{self.conversation_history}"
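
The workflow above calls two helpers, prompt_with_history and prompt_summary_from_history, whose bodies are not shown in this post. The sketch below is one hypothetical way they could look, written as standalone functions that take the history as an argument; in the workflow they would be methods reading self.conversation_history. The function format_history and the prompt wording are our own assumptions, not the wording used in the sample repository.

```python
from typing import List, Tuple

History = List[Tuple[str, str]]


def format_history(history: History) -> str:
    """Render (speaker, text) pairs as one line per message."""
    return "\n".join(f"{speaker}: {text}" for speaker, text in history)


def prompt_with_history(history: History, prompt: str) -> str:
    """Wrap the latest user prompt with the conversation so far."""
    return (
        "Here is the conversation so far:\n"
        f"{format_history(history)}\n\n"
        "Using that context, answer the latest prompt in plain sentences.\n"
        f"Prompt: {prompt}"
    )


def prompt_summary_from_history(history: History) -> str:
    """Ask the model for a short summary of the whole conversation."""
    return (
        "Here is a conversation between a user and a chatbot:\n"
        f"{format_history(history)}\n\n"
        "Summarize the conversation in two sentences."
    )


example_history = [
    ("user", "What animals are marsupials?"),
    ("response", "Kangaroos and koalas, among others."),
    ("user", "Do they lay eggs?"),
]
print(prompt_with_history(example_history, "Do they lay eggs?"))
```

Because the model is stateless, the full history is sent on every call, so prompts grow as the conversation gets longer.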

As mentioned earlier, we can use signal with start to simplify how we start workflow execution and signal running workflows. Instead of having two paths, one to signal a running workflow, and one to start a new workflow then signal it, we can just send a signal and Temporal will start a new workflow execution if required.


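import asyncio
import sys

from temporalio.client import Client

# SignalQueryBedrockWorkflow is the workflow class defined above


async def main(prompt: str) -> None:
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    workflow_id = "bedrock-workflow-with-signals"
    inactivity_timeout_minutes = 1

    # Sends a signal to the workflow (and starts it if needed)
    await client.start_workflow(
        SignalQueryBedrockWorkflow.run,
        inactivity_timeout_minutes,
        id=workflow_id,
        task_queue="bedrock-task-queue",
        start_signal="user_prompt",
        start_signal_args=[prompt],
    )


if __name__ == "__main__":
    # The prompt is passed as the first command-line argument
    asyncio.run(main(sys.argv[1]))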

Start the worker with:

$ poetry run python run_worker.py

In another terminal we can send inputs to our workflow;

$ poetry run python send_message.py 'What animals are marsupials?'
$ poetry run python send_message.py 'Do they lay eggs?'

Because we are sending inputs as signals, there is no Bedrock response returned from sending the signal. In a production deployment we could trigger a webhook or callback to notify the client when a response is available, but for this example, we’ll simply query the conversation history using poetry run python get_history.py or view the output from the worker (the output here has been formatted for readability);

Starting worker
Then run 'python send_message.py "<prompt>"'

INFO:temporalio.workflow:Waiting for prompts... or closing chat after 1 minute(s)

INFO:temporalio.workflow:Prompt: What animals are marsupials?
INFO:temporalio.workflow: Response: Marsupials are mammals that have a pouch on their belly where they carry their young. They are found primarily in Australia, New Guinea, and nearby islands, although there are also some species found in the Americas and other parts of the world. Some examples of marsupials include kangaroos, wallabies, koalas, opossums, and wombats. Marsupials are known for their unique reproductive system, which involves a short gestation period and the birth of underdeveloped young that then complete their development inside the mother's pouch.

INFO:temporalio.workflow:Waiting for prompts... or closing chat after 1 minute(s)

INFO:temporalio.workflow:Prompt: Do they lay eggs?
INFO:temporalio.workflow: Response: No, marsupials do not lay eggs. They are mammals, which means they give birth to live young instead of laying eggs like birds or reptiles. After the young are born, they crawl up to the mother's pouch and attach themselves to a nipple, where they continue to develop and grow until they are able to survive on their own.

INFO:temporalio.workflow:Waiting for prompts... or closing chat after 1 minute(s)

INFO:temporalio.workflow:Chat closed due to inactivity
INFO:temporalio.workflow:Conversation summary:
INFO:temporalio.workflow:  Sure! Here is a two sentence summary of the conversation between the user and the chatbot: The chatbot provided information about marsupials, explaining that they are mammals with a pouch that carry their young, and gave examples of marsupials such as kangaroos and koalas. The chatbot also clarified that marsupials do not lay eggs, but instead give birth to live young that develop inside the mother's pouch.

This is a great opportunity to revisit the workflow history and see that, even as the history grows, it is still easy to view the where/what/when of a workflow and its activities. The ‘compact’ view makes this especially easy to see.

We can also see how Temporal makes it simple to keep track of sessions, even as users change devices. After starting the worker and sending several inputs, try opening another terminal and querying the history or sending signals. Because the same workflow id is used, we can easily attach to a running conversation or view its history. In practice we wouldn’t hard-code workflow ids; we’d either derive unique ids from some application/user id, or we’d let Temporal generate a unique id.
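
As a small illustration of deriving workflow ids from application data, the sketch below builds a stable id from a user id and a conversation id. The function names and the "bedrock-chat" prefix are hypothetical; any scheme works as long as the same conversation always maps to the same id.

```python
import uuid


def conversation_workflow_id(user_id: str, conversation_id: str) -> str:
    """Build a stable workflow id so any device can re-attach to the same chat."""
    return f"bedrock-chat-{user_id}-{conversation_id}"


def new_conversation_id() -> str:
    """Generate a random id for a brand-new conversation."""
    return uuid.uuid4().hex


print(conversation_workflow_id("user-42", "trip-planning"))
# prints bedrock-chat-user-42-trip-planning
```

The resulting string would replace the hard-coded workflow_id when sending a prompt or querying the history.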

Further Improvements

Our workflow does a decent job of modeling a conversation by allowing users to have an ongoing back and forth exchange with Bedrock, but it currently has two significant limitations:

  1. Temporal Workflows are unbounded temporally but bounded spatially; they can run forever so long as they don’t exceed the maximum number of events in the event history (50,000 events) and the total size of the event history doesn’t grow too large (above 50MB). It’s possible that, in our previous workflow, a user chats long enough with our workflow that the workflow history limit is exceeded.
  2. Our workflow will finish the conversation after some fixed timeout. We could remove this timeout, but then the workflow would never finish. Ideally, we want conversations to last as long as a user wants - potentially forever. A user should be able to end the conversation when they’re ready.
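
To get a feel for the first limitation, here is a rough back-of-the-envelope estimator. It is a sketch under loud assumptions: the figure of 12 events per conversational turn is our own guess (a signal, two workflow tasks, and one activity, plus timers), message sizes are hypothetical averages, and per-event metadata overhead is ignored. The limits are the ones quoted above. The key point it illustrates is that, because each prompt replays the conversation so far and activity inputs are recorded in the event history, the history size grows roughly quadratically with the number of turns.

```python
def turns_until_limit(
    avg_message_bytes: int,
    events_per_turn: int = 12,  # assumption, not a measured value
    max_events: int = 50_000,
    max_history_bytes: int = 50 * 1024 * 1024,
) -> int:
    """Estimate how many turns fit before either history limit is reached."""
    turns = 0
    events_used = 0
    bytes_used = 0
    conversation_bytes = 0  # size of the conversation replayed in each prompt
    while True:
        # Recorded per turn: the signal payload, the activity input (the
        # conversation so far plus the new prompt), and the activity result
        activity_input = conversation_bytes + avg_message_bytes
        turn_bytes = avg_message_bytes + activity_input + avg_message_bytes
        if (
            events_used + events_per_turn > max_events
            or bytes_used + turn_bytes > max_history_bytes
        ):
            return turns
        turns += 1
        events_used += events_per_turn
        bytes_used += turn_bytes
        # The conversation grows by one prompt and one response per turn
        conversation_bytes += 2 * avg_message_bytes


for size in (50, 500, 5_000):
    print(f"{size} bytes per message: about {turns_until_limit(size)} turns")
```

Under these assumptions, short messages hit the event-count limit first, while longer messages hit the size limit much sooner.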

Deciding when to finish the workflow is simple, and there are many ways to do it; in our case we just add a signal handler that can be used to end the conversation. An alternative would be to end the conversation based on the user’s prompt, i.e. finish the conversation when the user asks for it to finish.

Handling the case where we exceed the workflow history limits is an interesting problem, but is beyond the scope of this post. If you’re interested in how Temporal solves this challenge, Very Long-Running Workflows covers this in detail. The code for this blog also includes a sample that demonstrates very long running workflows with Bedrock.

Wrapping up

To recap, we saw how Temporal makes it easy to build Gen AI applications with Amazon Bedrock.

  • We discussed some of the challenges developers face when building Gen AI applications and how Temporal can be used to solve them.
  • We built a simple Temporal wrapper for Amazon Bedrock, and extended it to support signals and queries to allow for richer interactions with LLMs.
  • We saw how the workflow can be extended with the Entity Pattern, as in the accompanying sample, to model entire conversations as entities that may last forever.
  • We saw how Temporal provides visibility into the history of each workflow, along with durability guarantees that ensure workflow history and state are always maintained.

What next?

In this blog we focused on conversational AI, which is just one of the thousands of great uses for Gen AI that Temporal and Bedrock make easy. Other common use-cases you may want to explore are:

  • AI Agents, including agents that require long-running external integrations,
  • Generating insights from reports or large datasets,
  • Personalization, including where a customer’s preferences change over time,
  • Image generation (text2image)

If you’re ready to learn more and start building your own bulletproof Gen AI applications with Temporal and Amazon Bedrock, make sure to check out these great resources;