Bitovi Blog - UX and UI design, JavaScript and Front-end development

How to Use Node.js Temporal Workflows to Batch Process Operations

Kevin Phillips

Node.js temporal workflows allow you to batch process thousands of operations. In this post, learn how to use child workflows, signals, queries, and more!

posted in Node.js on September 30, 2022 by Kevin Phillips


Temporal is a great tool for batch processing—in this post, we’ll show you all of the solutions that Temporal provides for when you need to handle tens, hundreds, or even thousands of operations.

New to Temporal? Check out our previous post to learn how to create a Workflow using the Temporal TypeScript SDK so you can focus on the business logic of your application and spend less time worrying about all of the things that can fail when developing distributed systems. If you want to jump in and try the code for yourself, take a look at our example repo on GitHub.

Workflow Overview

Each example in this post builds on the following workflow, whose goal is to send an email.

The workflow code looks like this:

import { proxyActivities } from "@temporalio/workflow"

import type * as activities from "./activities"

const { sendEmail } = proxyActivities<typeof activities>({
  startToCloseTimeout: "1 minute",
})

export async function sendEmailWorkflow(
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  return sendEmail(emailAddress, subject, body)
}

Our workflow is defined by the sendEmailWorkflow function, which takes three arguments: the emailAddress the email will be sent to, plus the subject and body of the email.

Note, in a real service you may not want to take in the raw body of the email. Instead, you may want to use an id that would allow you to fetch the email contents from another location (a CMS for example). This way you would not need to pass in large files or HTML content into your workflow function.
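As a rough sketch of that idea, the Activity could accept a small template ID and look up the content itself. The templateStore map, the resolveEmailContent helper, and the sendEmailFromTemplate function below are hypothetical stand-ins for a real CMS lookup; they are not part of the example repo.

```typescript
// Hypothetical content store standing in for a CMS. A real service would
// fetch this over the network from inside the Activity.
interface EmailContent {
  subject: string
  body: string
}

const templateStore = new Map<string, EmailContent>([
  ["welcome", { subject: "Welcome!", body: "<h1>Thanks for signing up</h1>" }],
])

// Resolve a small ID into the full content, so only the ID has to be
// passed through the workflow's arguments.
function resolveEmailContent(templateId: string): EmailContent {
  const content = templateStore.get(templateId)
  if (!content) {
    throw new Error(`Unknown email template: ${templateId}`)
  }
  return content
}

// Activity-shaped function (hypothetical): takes the ID instead of the raw body.
async function sendEmailFromTemplate(
  emailAddress: string,
  templateId: string
): Promise<void> {
  const { subject, body } = resolveEmailContent(templateId)
  console.log({ emailAddress, subject, body })
}
```

With this shape, the workflow only ever handles the short templateId string, and the large HTML content stays out of the workflow's inputs.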

The sendEmail activity is responsible for sending the email. We won’t go into the details of how to create the activity in Node.js, so our basic activity code will just log the arguments:

export async function sendEmail(
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  console.log({ emailAddress, subject, body })
}

Now that we have the basic Workflow and Activity, let’s see what we need to do to update it to handle sending the email to more than one recipient.

Child Workflows

In order to be able to send several emails instead of just one, we can use Temporal’s child workflows. With child workflows, we will have a parent workflow sendEmailBatchWorkflow that will take in an array of email addresses. The parent workflow will start a child workflow for each email address and then wait for all of the child workflows to complete. The child workflow will be our original sendEmailWorkflow function.

import {
  proxyActivities,
  ChildWorkflowHandle,
  startChild,
} from "@temporalio/workflow"

import type * as activities from "./activities"

const { sendEmail } = proxyActivities<typeof activities>({
  startToCloseTimeout: "1 minute",
})

export async function sendEmailBatchWorkflow(
  emailAddresses: string[],
  subject: string,
  body: string
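): Promise<void> {
  // Handles are typed against the child workflow function, sendEmailWorkflow
  const childHandles: ChildWorkflowHandle<typeof sendEmailWorkflow>[] = []

  for (const emailAddress of emailAddresses) {
    // Start one child workflow per recipient
    const handle = await startChild(sendEmailWorkflow, {
      args: [emailAddress, subject, body],
    })
    childHandles.push(handle)
  }

  // Wait for every child workflow to finish before the parent completes
  await Promise.all(childHandles.map((childHandle) => childHandle.result()))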
}

export async function sendEmailWorkflow(
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  return sendEmail(emailAddress, subject, body)
}

Querying Child Workflow Status

Using parent workflows as a solution for firing off many child workflows and then waiting for them all to complete is a good start, but it is very useful to be able to track the overall progress of the batch operation. In order to make tracking possible, we need to use two more Temporal concepts: signals and queries.

Signals

Signals are a way to send a message to a running workflow. We can use signals to allow our child workflows to signal back to the parent workflow when the child completes. To set up a signal, we first use the defineSignal function from the @temporalio/workflow package:

export const childCompleteSignal = defineSignal<[]>("childComplete")

The defineSignal function takes the signal's name ("childComplete"), and its type parameter lists the types of the signal's arguments as a tuple (empty in our example).

Workflow code cannot create a Temporal Client, so in this post we send the signal from a new Activity rather than from sendEmailWorkflow directly. (The SDK also provides getExternalWorkflowHandle in the @temporalio/workflow package for signaling another workflow from workflow code; the examples here stick with the Activity approach.) The new Activity will need the workflowId for the parent workflow that will receive the signal, so we’ll have to add that as an argument to our workflows. The Activity that will be responsible for sending the signal looks like this:

export async function sendCompleteSignal(
  parentWorkflowId: string
): Promise<void> {
  const client = new WorkflowClient()
  const handle = client.getHandle(parentWorkflowId)
  return handle.signal(childCompleteSignal)
}

The parent sendEmailBatchWorkflow can then use the setHandler function from the @temporalio/workflow package to register a handler that runs whenever the signal sent by sendCompleteSignal is received:

  let complete = 0
  setHandler(childCompleteSignal, () => {
    complete += 1
  })

Now we have a way to track how many child workflows have completed within our parent workflow. Next, we’ll see how to use a query to make that information available outside of the workflow.

Queries

Queries are a way to expose internal state of a workflow to the outside—they can be called by a Workflow Client or the Temporal CLI. If you want to make it easier to call your query, you may want to set up an HTTP server that creates a Workflow client and calls the query.

Defining a query looks very similar to defining a signal:

export const statusQuery = defineQuery<string>("status")

The defineQuery function takes a name ("status") as well as the return type (string) and any arguments to the query. Our query has no arguments, but they would be placed after string in the angle brackets (<>) defining the TypeScript type parameter.

The parent workflow can then set up a handler for the query similar to the signal handler above:

  setHandler(statusQuery, () => {
    return `${complete} of ${emailAddresses.length} complete`
  })
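
To see the division of labor between the two handlers outside of Temporal, here is a toy in-memory model. It is plain TypeScript for illustration only; createBatchProgress is a hypothetical helper, not an SDK function. The point it tries to show is that the signal handler changes state while the query handler only reads it.

```typescript
// Toy in-memory model of the parent's progress state. This is plain
// TypeScript for illustration, not Temporal workflow code.
function createBatchProgress(total: number) {
  let complete = 0
  return {
    // Plays the role of the childComplete signal handler: it changes state.
    onChildComplete(): void {
      complete += 1
    },
    // Plays the role of the status query handler: it only reads state.
    status(): string {
      return `${complete} of ${total} complete`
    },
  }
}

const progress = createBatchProgress(3)
progress.onChildComplete()
progress.onChildComplete()
console.log(progress.status()) // prints "2 of 3 complete"
```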

Putting It All Together

After adding the signal and query logic, here is what the workflow code looks like:

import {
  proxyActivities,
  ChildWorkflowHandle,
  startChild,
  defineQuery,
  defineSignal,
  setHandler,
} from "@temporalio/workflow"

import type * as activities from "./activities"

const { sendEmail, sendCompleteSignal } = proxyActivities<
  typeof activities
>({
  startToCloseTimeout: "1 minute",
})

export const statusQuery = defineQuery<string>("status")
export const childCompleteSignal = defineSignal<[]>("childComplete")

export async function sendEmailBatchWorkflow(
  workflowId: string,
  emailAddresses: string[],
  subject: string,
  body: string
): Promise<void[]> {
  const childHandles: ChildWorkflowHandle<typeof sendEmailWorkflow>[] = []

  let complete = 0
  setHandler(childCompleteSignal, () => {
    complete += 1
  })
  
  setHandler(statusQuery, () => {
    return `${complete} of ${emailAddresses.length} complete`
  })

  for (const emailAddress of emailAddresses) {
    const handle = await startChild(sendEmailWorkflow, {
      args: [workflowId, emailAddress, subject, body],
    })
    childHandles.push(handle)
  }

  return Promise.all(childHandles.map((childHandle) => childHandle.result()))
}

export async function sendEmailWorkflow(
  parentWorkflowId: string,
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  await sendEmail(emailAddress, subject, body)
  return sendCompleteSignal(parentWorkflowId)
}

The full Activity code looks like this:

import { WorkflowClient } from "@temporalio/client"
import { childCompleteSignal } from "./workflows"

export async function sendEmail(
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  console.log({ emailAddress, subject, body })
}

export async function sendCompleteSignal(
  parentWorkflowId: string
): Promise<void> {
  const client = new WorkflowClient()
  const handle = client.getHandle(parentWorkflowId)
  return handle.signal(childCompleteSignal)
}

We now have a workflow that can send a batch of emails and respond to requests for its progress!

To try it yourself, take a look at the example on GitHub.

Dedicated Child Workflows

There are times when you don’t want to allow a workflow to be called multiple times with the same arguments — in our email example we may not want to send multiple emails to the same email address in a short amount of time. We only want one sendEmailWorkflow instance to run for each email address.

Temporal provides the signalWithStart API for this specific use case. signalWithStart will send a signal to a running workflow with a specific workflowId. If a workflow with that ID is not running, it will start the workflow and then send the signal.
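
The start-or-signal behavior described above can be pictured with a toy in-memory model. This is only a sketch of the semantics, not how Temporal implements them; the running map and the signalWithStartModel function are hypothetical names made up for this illustration.

```typescript
// Toy in-memory model of signalWithStart semantics. The names here are
// hypothetical and this is not how Temporal is implemented.
interface RunningWorkflow {
  workflowId: string
  receivedSignals: string[]
}

const running = new Map<string, RunningWorkflow>()

function signalWithStartModel(
  workflowId: string,
  signalPayload: string
): { started: boolean } {
  let workflow = running.get(workflowId)
  const started = workflow === undefined
  if (workflow === undefined) {
    // No running workflow with this ID: start one first.
    workflow = { workflowId, receivedSignals: [] }
    running.set(workflowId, workflow)
  }
  // Either way, the signal is delivered to the single running instance.
  workflow.receivedSignals.push(signalPayload)
  return { started }
}

const first = signalWithStartModel("a@example.com-send-email-workflow", "Welcome")
const second = signalWithStartModel("a@example.com-send-email-workflow", "Reminder")
console.log(first.started, second.started, running.size) // prints true false 1
```

Both calls use the same workflowId, so the second call reuses the instance created by the first and only adds another signal to it.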

You’ll first define a signal that will contain the arguments previously passed to sendEmailWorkflow:

export const queueSendEmailWorkflowSignal = defineSignal<[
  parentWorkflowId: string,
  subject: string,
  body: string
]>("queueSendEmailWorkflow")

Since signalWithStart needs to create a Temporal Client, it must be called from an Activity.

The code for calling signalWithStart looks like this:

export async function signalWithStartSendEmailWorkflow(
  parentWorkflowId: string,
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  const client = new WorkflowClient()

  await client.signalWithStart(sendEmailWorkflow, {
    taskQueue: "task-queue",
    workflowId: `${emailAddress}-send-email-workflow`,
    args: [emailAddress],
    signal: queueSendEmailWorkflowSignal,
    signalArgs: [parentWorkflowId, subject, body],
  })
}

You can then replace the startChild code in the sendEmailBatchWorkflow parent workflow with a call to this Activity:

  for (const emailAddress of emailAddresses) {
    await signalWithStartSendEmailWorkflow(workflowId, emailAddress, subject, body)
  }

You also need to update the sendEmailWorkflow function to handle the queueSendEmailWorkflowSignal.

As implied by the name, the queueSendEmailWorkflowSignal will tell the workflow to add a new email to a queue. The workflow then has an infinite loop — on each iteration of the loop, it will use the @temporalio/workflow condition API to wait for a new email to be pushed onto the queue. You could put additional logic here to prevent multiple emails from being sent within some amount of time.
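
One possible shape for that additional logic is a small helper that works out how long to wait before the next email may go out. This is a minimal sketch under stated assumptions: MIN_INTERVAL_MS and msUntilNextSend are hypothetical names, and the one-minute gap is an arbitrary value chosen for illustration.

```typescript
// Hypothetical minimum gap between two emails to the same address.
const MIN_INTERVAL_MS = 60 * 1000

// How long to wait before the next email may be sent. Returns 0 when no
// email has been sent yet or the minimum interval has already passed.
function msUntilNextSend(
  lastSentAtMs: number | undefined,
  nowMs: number,
  minIntervalMs: number = MIN_INTERVAL_MS
): number {
  if (lastSentAtMs === undefined) {
    return 0
  }
  const elapsedMs = nowMs - lastSentAtMs
  return Math.max(0, minIntervalMs - elapsedMs)
}

console.log(msUntilNextSend(undefined, 5000)) // prints 0
console.log(msUntilNextSend(10000, 40000)) // prints 30000
console.log(msUntilNextSend(10000, 90000)) // prints 0
```

Inside the workflow loop, a non-zero result could be passed to the sleep function from @temporalio/workflow before calling sendEmail. That wiring is an assumption about how you might use the helper, not something the example repo does.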

In our example, we will just pull the first email off the queue and call our Activities to send the email and signal that the workflow is complete.

interface PendingEmail {
  parentWorkflowId: string
  subject: string
  body: string
}
export async function sendEmailWorkflow(
  emailAddress: string,
) {
  // Emails queued by the signal handler, processed in arrival order
  const pendingEmails: PendingEmail[] = []

  setHandler(queueSendEmailWorkflowSignal, (parentWorkflowId: string, subject: string, body: string) => {
    pendingEmails.push({ parentWorkflowId, subject, body })
  })

  while (true) {
    // Block until at least one email has been queued
    await condition(() => pendingEmails.length > 0)
    const pendingEmail = pendingEmails.shift()
    // shift() is typed as possibly undefined, so guard before using it
    if (pendingEmail) {
      await sendEmail(emailAddress, pendingEmail.subject, pendingEmail.body)
      await sendCompleteSignal(pendingEmail.parentWorkflowId)
    }
  }
}

The final change that needs to be made is to update the condition for when the sendEmailBatchWorkflow parent workflow ends. Since the child workflows are long-running, the parent cannot wait for all of the child workflows to complete.

Instead, you can move the logic for receiving the childCompleteSignal to its own workflow and wait for that to complete. The new workflow will also handle the status query.

Here is what the new workflow looks like:

export async function sendEmailBatchStatusReceiverWorkflow(
  total: number
): Promise<void> {
  let complete = 0
  setHandler(childCompleteSignal, () => {
    complete += 1
  })
  
  setHandler(statusQuery, () => {
    return `${complete} of ${total} complete`
  })

  await condition(() => complete === total)
}

And here is what the updated parent workflow looks like:

export async function sendEmailBatchWorkflow(
  workflowId: string,
  emailAddresses: string[],
  subject: string,
  body: string
): Promise<void> {
  const statusReceiverHandler = await startChild(sendEmailBatchStatusReceiverWorkflow, {
    workflowId: `${workflowId}-status-receiver`,
    args: [emailAddresses.length],
  })

  for (const emailAddress of emailAddresses) {
    await signalWithStartSendEmailWorkflow(workflowId, emailAddress, subject, body)
  }

  return await statusReceiverHandler.result()
}

You now have a parent workflow that’s correctly coordinating dedicated child workflows for each email recipient as well as working signals and queries for accurately tracking their status.

There is just one more limitation, the size of a workflow's Event History, that we will address in the next section.

Avoiding Event History Limit

Temporal has a hard limit on the number of events in a single workflow’s Event History (51,200 events by default). Events are added to the Event History for each activity, signal, timer, and anything else that changes a workflow’s execution state. Queries are read-only and are not recorded.

As a result of the hard limit, you cannot just let your dedicated child workflows run forever; eventually, they will reach the hard limit and the workflow will be terminated.
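
To get a feel for how many loop iterations fit under the limit, you can do some back-of-the-envelope arithmetic. The sketch below is only an estimate under assumptions: the limit constant is Temporal's documented default, while the events-per-iteration figure and the maxIterationsWithinBudget helper are hypothetical and should be checked against real histories from your own workflows.

```typescript
// Assumed figures for illustration only; verify them for your deployment.
const ASSUMED_HISTORY_EVENT_LIMIT = 51200 // Temporal's documented default limit
const ASSUMED_EVENTS_PER_ITERATION = 16 // hypothetical estimate per email sent

// Number of iterations that fit in a fraction of the event budget.
// safetyFraction leaves headroom for retries and extra signals.
function maxIterationsWithinBudget(
  eventLimit: number,
  eventsPerIteration: number,
  safetyFraction: number = 0.5
): number {
  if (eventsPerIteration <= 0) {
    throw new Error("eventsPerIteration must be positive")
  }
  return Math.floor((eventLimit * safetyFraction) / eventsPerIteration)
}

console.log(
  maxIterationsWithinBudget(ASSUMED_HISTORY_EVENT_LIMIT, ASSUMED_EVENTS_PER_ITERATION)
) // prints 1600
```

Different assumptions give different numbers, and an iteration that handles several queued emails will use more events than one that handles a single email, so treat the result as a starting point rather than a guarantee.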

To avoid hitting this limit, you can use the continueAsNew API to start a new workflow execution with a fresh event history. You just need to update the while (true) { ... } loop in your child workflow to run for a set number of iterations before calling continueAsNew. Within the loop, you must also drain the pendingEmails queue so that no emails are lost when you call continueAsNew.

Here is what the loop looks like after these changes:

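for (let i = 0; i < MAX_CHILD_ITERATIONS; i++) {
  await condition(() => pendingEmails.length > 0)

  // Drain the queue so no queued email is lost before continueAsNew
  while (pendingEmails.length > 0) {
    const pendingEmail = pendingEmails.shift()
    if (pendingEmail) {
      await sendEmail(emailAddress, pendingEmail.subject, pendingEmail.body)
      // Each queued email carries the ID of the parent that requested it
      await sendCompleteSignal(pendingEmail.parentWorkflowId)
    }
  }
}

// sendEmailWorkflow takes only the email address as its argument
await continueAsNew<typeof sendEmailWorkflow>(emailAddress)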

Final Workflow

After making the changes to use signalWithStart and continueAsNew, here is our final workflow code:

import {
  condition,
  proxyActivities,
  startChild,
  defineQuery,
  defineSignal,
  setHandler,
  continueAsNew,
} from "@temporalio/workflow"

import type * as activities from "./activities"

const MAX_CHILD_ITERATIONS = 2000

const { sendEmail, sendCompleteSignal, signalWithStartSendEmailWorkflow } =
  proxyActivities<typeof activities>({
    startToCloseTimeout: "1 minute",
  })

export const statusQuery = defineQuery<string>("status")
export const childCompleteSignal = defineSignal<[]>("childComplete")

export const queueSendEmailWorkflowSignal = defineSignal<
  [parentWorkflowId: string, subject: string, body: string]
>("queueSendEmailWorkflow")

export async function sendEmailBatchWorkflow(
  workflowId: string,
  emailAddresses: string[],
  subject: string,
  body: string
): Promise<void> {
  const statusReceiverHandler = await startChild(
    sendEmailBatchStatusReceiverWorkflow,
    {
      workflowId: `${workflowId}-status-receiver`,
      args: [emailAddresses.length],
    }
  )

  for (const emailAddress of emailAddresses) {
    await signalWithStartSendEmailWorkflow(
      workflowId,
      emailAddress,
      subject,
      body
    )
  }

  return await statusReceiverHandler.result()
}

export async function sendEmailBatchStatusReceiverWorkflow(
  total: number
): Promise<void> {
  let complete = 0
  setHandler(childCompleteSignal, () => {
    complete += 1
  })

  setHandler(statusQuery, () => {
    return `${complete} of ${total} complete`
  })

  await condition(() => complete === total)

  return
}

interface PendingEmail {
  parentWorkflowId: string
  subject: string
  body: string
}
export async function sendEmailWorkflow(
  emailAddress: string
) {
  const pendingEmails: PendingEmail[] = []

  setHandler(queueSendEmailWorkflowSignal, (parentWorkflowId: string, subject: string, body: string) => {
    pendingEmails.push({ parentWorkflowId, subject, body })
  })

  for (let i = 0; i < MAX_CHILD_ITERATIONS; i++) {
    await condition(() => pendingEmails.length > 0)

    while (pendingEmails.length > 0) {
      const pendingEmail = pendingEmails.shift()
      if (pendingEmail) {
        await sendEmail(emailAddress, pendingEmail.subject, pendingEmail.body)
        await sendCompleteSignal(pendingEmail.parentWorkflowId)
      }
    }
  }

  await continueAsNew<typeof sendEmailWorkflow>(emailAddress)
}

And here is the full Activity code:

import { WorkflowClient } from "@temporalio/client"
import {
  sendEmailWorkflow,
  queueSendEmailWorkflowSignal,
  childCompleteSignal,
} from "./workflows"

export async function signalWithStartSendEmailWorkflow(
  parentWorkflowId: string,
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  const client = new WorkflowClient()

  await client.signalWithStart(sendEmailWorkflow, {
    taskQueue: "task-queue",
    workflowId: `${emailAddress}-send-email-workflow`,
    args: [emailAddress],
    signal: queueSendEmailWorkflowSignal,
    signalArgs: [parentWorkflowId, subject, body],
  })
}

export async function sendEmail(
  emailAddress: string,
  subject: string,
  body: string
): Promise<void> {
  console.log({ emailAddress, subject, body })
}

export async function sendCompleteSignal(
  parentWorkflowId: string
): Promise<void> {
  const client = new WorkflowClient()
  const handle = client.getHandle(`${parentWorkflowId}-status-receiver`)
  return handle.signal(childCompleteSignal)
}

To try it out yourself, check out the example on GitHub.

Conclusion

Now you have everything you need to continue processing your bulk operations with Temporal. With the power of child workflows, queries, signals, signalWithStart, and continueAsNew, you can use Temporal to process thousands of operations without breaking a sweat!
