Durable Workflows Quickstart

Rotor durable workflows are multi-step TypeScript functions that execute reliably across retries, sleeps, and external event waits — without occupying a long-lived worker process.

How it works

Each workflow step is durably checkpointed to Postgres. If a retry occurs, the function body re-executes from the top, but already-completed steps return their cached results immediately — no double-execution of side effects.
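
The replay behaviour described above can be modelled in a few lines. This is an illustrative in-memory sketch, not Rotor's implementation: a Map stands in for the Postgres checkpoint store, and makeStep, flakyWorkflow, and runWithRetries are hypothetical names invented for the example.

```typescript
// Illustrative model of step memoization; a Map stands in for the Postgres checkpoints.
type StepCache = Map<string, unknown>;

function makeStep(cache: StepCache) {
  return {
    // Run fn only if no result is checkpointed under this id.
    async run<T>(id: string, fn: () => Promise<T>): Promise<T> {
      if (cache.has(id)) return cache.get(id) as T;
      const result = await fn();
      cache.set(id, result); // checkpoint only after the step succeeds
      return result;
    },
  };
}

// Counts how often each side effect actually executes.
const calls = { charge: 0, notify: 0 };
let failNotifyOnce = true;

async function flakyWorkflow(cache: StepCache): Promise<string> {
  const step = makeStep(cache);
  const receipt = await step.run('charge', async () => {
    calls.charge += 1;
    return 'receipt-1';
  });
  await step.run('notify', async () => {
    calls.notify += 1;
    if (failNotifyOnce) {
      failNotifyOnce = false;
      throw new Error('transient failure'); // forces one retry
    }
  });
  return receipt;
}

// Retry loop: the whole function body re-executes from the top on each attempt,
// but the cache survives across attempts.
async function runWithRetries(retries: number): Promise<string> {
  const cache: StepCache = new Map();
  for (let attempt = 0; ; attempt++) {
    try {
      return await flakyWorkflow(cache);
    } catch (err) {
      if (attempt >= retries) throw err;
    }
  }
}
```

In this model the first attempt fails inside the second step; on the retry the first step returns its cached result, so its side effect is not repeated.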

Installation

npm install @rotorsh/sdk

Define a function

import { createFunction } from '@rotorsh/sdk';
 
export const welcomeSequence = createFunction(
  {
    id: 'welcome-sequence',
    trigger: { event: 'user.created' },
    retries: 3,
  },
  async ({ event, step }) => {
    // step.run() executes exactly once per run — safe to put side effects here
    const profile = await step.run('fetch-profile', async () => {
      const res = await fetch(`/api/users/${event.data.userId}`);
      return res.json();
    });
 
    // Pause for 24 hours without occupying a worker slot
    await step.sleep('initial-wait', '24h');
 
    // Send a personalized welcome email (sendEmail is your own mailer helper, not an SDK export)
    await step.run('send-welcome', async () => {
      await sendEmail({
        to: profile.email,
        subject: `Welcome, ${profile.name}!`,
        body: `Thanks for signing up...`,
      });
    });
 
    return { sent: true };
  },
);

Register with serveWorkflow()

import { Hono } from 'hono';
import { serveWorkflow } from '@rotorsh/sdk';
import { welcomeSequence } from './workflows/welcome-sequence';
 
const app = new Hono();
 
serveWorkflow(app, {
  functions: [welcomeSequence],
  signingKey: process.env.ROTOR_SIGNING_KEY!, // set in Rotor dashboard
});
 
export default app;

Register the function with Rotor

curl -X POST https://api.rotor.sh/v1/functions \
  -H "Authorization: Bearer $ROTOR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "id": "welcome-sequence",
    "serve_url": "https://your-app.example.com/api/rotor",
    "trigger_type": "event",
    "trigger_event": "user.created"
  }'

Trigger the function

import { Rotor } from '@rotorsh/sdk';
 
const rotor = new Rotor({ apiKey: process.env.ROTOR_API_KEY! });
 
// Publish an event — Rotor fans out to all matching functions
await rotor.send('user.created', { userId: '123', email: 'user@example.com' });

Step primitives

Primitive                        Description
step.run(id, fn)                 Executes fn exactly once per run; cached on retry
step.sleep(id, duration)         Suspends for a duration, e.g. '30m', '2d', '1h'
step.sleepUntil(id, timestamp)   Suspends until a specific timestamp
step.waitForEvent(id, opts)      Suspends until a matching event arrives via rotor.send()
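
The duration strings above ('30m', '2d', '1h') follow an amount-plus-unit pattern. The full grammar is not specified here, so the sketch below assumes a whole number followed by one of s, m, h, or d. parseDuration is a hypothetical helper for validating such strings in your own code, not an SDK export.

```typescript
// Hypothetical helper, not part of @rotorsh/sdk.
// Assumed grammar: <whole number><unit>, where unit is s, m, h, or d.
const UNIT_MS = { s: 1000, m: 60000, h: 3600000, d: 86400000 } as const;
type Unit = keyof typeof UNIT_MS;

function parseDuration(input: string): number {
  const match = /^(\d+)([smhd])$/.exec(input.trim());
  if (!match) {
    throw new Error(`Unsupported duration: "${input}"`);
  }
  const amount = Number(match[1]);
  const unit = match[2] as Unit;
  return amount * UNIT_MS[unit]; // duration in milliseconds
}
```

Rejecting unknown formats early keeps a typo such as '24hr' from reaching step.sleep at run time.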

Phase 10 primitives

Three new step primitives extend the base set with sub-workflow composition, event fan-out, and human-in-the-loop capabilities:

step.invoke — Spawn a child workflow and wait for its result. timeout is required (TypeScript build fails without it). Returns { result, runId }. Throws WorkflowFailedError, WorkflowTimeoutError, or WorkflowCancelledError on child failure. Memoized on parent retry — child is not re-spawned.

step.sendEvent — Fire-and-forget event emission from inside a step. Parent step completes immediately. Optional ts parameter defers emission via BullMQ delay queue. Memoized on retry — no double-emit.

step.waitForSignal — Named pause/resume. Suspends the run until POST /v1/signals/:signal_id/complete is called. The signal ID is workspace-unique — use approval-${runId} to avoid collisions. Throws SignalTimeoutError if timeout elapses before the POST arrives.
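
A paused run resumes when something calls the completion endpoint. The sketch below only builds that request. It assumes the same host (https://api.rotor.sh) and Bearer authentication as the function registration call in this guide, and it guesses that the signal payload is sent as a JSON body; none of these are confirmed for the signals endpoint. buildSignalCompleteRequest is a hypothetical helper name.

```typescript
// Hypothetical helper: builds the HTTP request that resumes a run paused on step.waitForSignal.
// Assumptions: base URL and Bearer auth are borrowed from the registration call in this guide;
// the JSON body shape is a guess, not a documented contract.
interface SignalRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

function buildSignalCompleteRequest(
  signalId: string,
  payload: unknown,
  apiKey: string,
  baseUrl = 'https://api.rotor.sh',
): SignalRequest {
  return {
    // Encode the ID so characters such as '/' or spaces cannot alter the path.
    url: `${baseUrl}/v1/signals/${encodeURIComponent(signalId)}/complete`,
    init: {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(payload),
    },
  };
}

// Usage (performs a real network call, so it is left commented out):
// const { url, init } = buildSignalCompleteRequest(`approval-${runId}`, { approved: true }, apiKey);
// await fetch(url, init);
```

Keeping request construction separate from the network call makes the approval path easy to unit test.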

Full-primitive workflow example

This example uses six of the seven step primitives in one workflow (all except step.sleepUntil) and mirrors Case E of the backward-compatibility test suite:

import { createFunction } from '@rotorsh/sdk';
import {
  WorkflowFailedError,
  WorkflowTimeoutError,
  SignalTimeoutError,
} from '@rotorsh/sdk/errors';
 
export const fullPrimitivesWorkflow = createFunction(
  {
    id: 'full-primitives-workflow',
    trigger: { event: 'campaign.requested' },
    retries: 2,
  },
  async ({ event, step, runId }) => {
    // 1. step.run — fetch contact profile (exactly once, cached on retry)
    const profile = await step.run('fetch-profile', async () => {
      const res = await fetch(`/api/contacts/${event.data.contactId}`);
      return res.json() as Promise<{ email: string; name: string }>;
    });
 
    // 2. step.sleep — wait 1 hour before enriching (no worker slot occupied)
    await step.sleep('initial-delay', '1h');
 
    // 3. step.invoke — delegate to enrichment specialist workflow
    let enriched: { score: number; company: string };
    try {
      const { result } = await step.invoke<typeof enriched>('enrich-contact', {
        workflow: { id: 'contact-enricher' },
        data: { contactId: event.data.contactId },
        timeout: '5m',   // REQUIRED — TypeScript build fails without this
      });
      enriched = result;
    } catch (err) {
      if (err instanceof WorkflowFailedError || err instanceof WorkflowTimeoutError) {
        enriched = { score: 0, company: 'Unknown' };
      } else {
        throw err;
      }
    }
 
    // 4. step.sendEvent — fan-out to analytics pipeline (fire and forget)
    await step.sendEvent('notify-analytics', {
      name: 'analytics/contact.scored',
      data: { contactId: event.data.contactId, score: enriched.score },
    });
 
    // 5. step.waitForSignal — require human approval before sending
    let approved = false;
    try {
      const approval = await step.waitForSignal<{ approved: boolean }>(
        'approval-gate',
        {
          signal: `campaign-approval-${runId}`,
          timeout: '48h',
        }
      );
      approved = approval.approved;
    } catch (err) {
      if (err instanceof SignalTimeoutError) {
        // Auto-reject after 48h
        return { status: 'auto-rejected', contactId: event.data.contactId };
      }
      throw err;
    }
 
    if (!approved) {
      return { status: 'rejected', contactId: event.data.contactId };
    }
 
    // 6. step.waitForEvent — wait for the contact to open the email (max 7 days)
    const opened = await step.waitForEvent<{ openedAt: string }>('wait-for-open', {
      event: 'email.opened',
      match: `data.contactId == "${event.data.contactId}"`,
      timeout: '7d',
    });
 
    return {
      status: 'complete',
      contactId: event.data.contactId,
      score: enriched.score,
      emailOpened: opened !== null,
    };
  }
);

Fan-out with Promise.all

const [resultA, resultB] = await Promise.all([
  step.run('enrich-email', () => enrichEmail(contact.email)),
  step.run('enrich-phone', () => enrichPhone(contact.phone)),
]);

Both steps run in parallel. The workflow resumes when both complete.
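
To see why Promise.all gives parallelism here, the sketch below replaces step.run with a local stand-in that records when each step starts and ends. fakeStep, delay, and enrichInParallel are hypothetical names for the example, and the timers simulate I/O.

```typescript
// Local stand-in for step.run that records when each step starts and ends.
const timeline: string[] = [];

const fakeStep = {
  async run<T>(id: string, fn: () => Promise<T>): Promise<T> {
    timeline.push(`start:${id}`);
    const result = await fn();
    timeline.push(`end:${id}`);
    return result;
  },
};

// Simulated I/O: resolves with the given value after a short delay.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

async function enrichInParallel(): Promise<[string, string]> {
  // Both calls are issued before either is awaited, so the steps overlap.
  // Distinct ids, as in the snippet above, keep the two results separate.
  return Promise.all([
    fakeStep.run('enrich-email', () => delay(20, 'email-ok')),
    fakeStep.run('enrich-phone', () => delay(10, 'phone-ok')),
  ]);
}
```

Both start entries are recorded before any end entry, and the results come back in the order the steps were listed, regardless of which one finishes first.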

Cancellation

// Cancel a run via event
await rotor.send('rotor/cancel.run', { runId: 'run-abc-123' });

In-flight steps complete; subsequent steps are skipped. Run status moves to cancelled.
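
The semantics above can be pictured with a toy runner that checks for cancellation only between steps. Everything in this sketch (RunState, Step, runSteps) is hypothetical and says nothing about how Rotor implements cancellation internally.

```typescript
type RunStatus = 'running' | 'completed' | 'cancelled';

interface RunState {
  status: RunStatus;
  cancelRequested: boolean;
  executed: string[];
}

interface Step {
  id: string;
  fn: (run: RunState) => void;
}

function runSteps(steps: Step[]): RunState {
  const run: RunState = { status: 'running', cancelRequested: false, executed: [] };
  for (const step of steps) {
    // Cancellation is only observed between steps, never in the middle of one.
    if (run.cancelRequested) {
      run.status = 'cancelled';
      return run;
    }
    step.fn(run);
    run.executed.push(step.id);
  }
  run.status = 'completed';
  return run;
}

const outcome = runSteps([
  { id: 'fetch-profile', fn: () => {} },
  // The cancel request arrives while this step is in flight; the step still finishes.
  { id: 'send-welcome', fn: (run) => { run.cancelRequested = true; } },
  { id: 'send-follow-up', fn: () => {} },
]);
```

In this model the step that was running when the cancel request arrived is recorded as executed, the following step never starts, and the run ends with status cancelled.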

View step graph

Open /dashboard/runs/:id to see a real-time step DAG with status colors and output previews.

Starter templates

Template                   Description
multi-step-outbound        Outbound email sequence with waitForEvent reply detection
event-driven-enrichment    Parallel contact enrichment on campaign.started
scheduled-with-approval    Weekly cron report requiring Slack/terminal approval

npx rotor template add multi-step-outbound