Playbook

The lead-to-cash event stream.

Every CRM, billing, and support event mirrored to a queryable database. One unified timeline per account. Debug churn. Forecast better. Audit everything.

10 min read

The pain

Why your data is fragmented

A customer churns. You want to understand why. The signals are scattered: Marketo has the campaign attribution. HubSpot has the deal stage history. Stripe has the payment history. Zendesk has the support tickets. Intercom has the in-app messages. Looker has the product usage. Six tools. One missing answer.

The four ways naive data integration breaks:

  • Daily ETL batches. Data lands in the warehouse up to 24 hours after the fact, so the question that needed answering is a day old before you can query it.
  • Per-tool reporting. Each tool has its own dashboard. Joining them requires SQL nobody writes.
  • No event ordering. Events from different sources land at different times. Timeline reconstruction requires guessing.
  • Schema drift. Marketo adds a field. Your ETL doesn't know about it. Three weeks later, the field is silently missing from your warehouse.

The architecture

What we're building

One workflow per source, all writing to a single normalized account_events table. Every event has a consistent shape. Querying the timeline for one account means one SQL query against one table.

  • Per-source webhook handlers. Each source triggers its own workflow.
  • Normalize to one schema. {accountId, eventType, source, sourceEventId, payload, occurredAt, ingestedAt}.
  • Idempotent on (source, sourceEventId). Replays are safe.
  • Schema flexibility. Payload is a JSONB column so new fields don't break the pipeline.
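
// Assumes `workflow`, `db`, and `mapStripeCustomerToAccount` are already in
// scope; where they come from depends on your setup.
workflow({
  id: "ingest-stripe-event",
  trigger: { event: "stripe.event.received" },

  steps: async ({ event, step }) => {
    const stripeEvent = event.data; // the raw Stripe Event from the webhook

    // Map the Stripe customer ID to our internal account UUID.
    const accountId = await step.run("resolve-account", () =>
      mapStripeCustomerToAccount(stripeEvent.data.object.customer)
    );

    if (!accountId) {
      // Unknown customer: park the event in quarantine with its source ID so
      // it can be replayed once the mapping is fixed. A daily cron alerts on
      // the quarantine count.
      await step.run("quarantine", () =>
        db.unmappedEvents.insert({
          source: "stripe",
          sourceEventId: stripeEvent.id,
          payload: stripeEvent,
        }, {
          idempotencyKey: `quarantine-stripe-${stripeEvent.id}`,
        })
      );
      return { quarantined: true };
    }

    await step.run("write-event", () =>
      db.accountEvents.insert({
        accountId,
        eventType: stripeEvent.type,           // e.g. "charge.succeeded"
        source: "stripe",
        sourceEventId: stripeEvent.id,
        payload: stripeEvent,                  // stored whole as JSONB
        occurredAt: new Date(stripeEvent.created * 1000), // Stripe uses Unix seconds
        ingestedAt: new Date(),
      }, {
        idempotencyKey: `stripe-${stripeEvent.id}`,
      })
    );

    return { written: true };
  },
});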
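
Keeping the mapping from a source event to the normalized row in a pure function makes each source easy to unit-test without a database. A minimal sketch, assuming only the Stripe Event fields the workflow already reads (id, type, created, data.object); `AccountEvent`, `StripeEventLike`, and `normalizeStripeEvent` are hypothetical names, and account resolution stays with the caller.

```typescript
// Hypothetical normalized shape, mirroring the account_events columns.
interface AccountEvent {
  accountId: string;
  eventType: string;
  source: string;
  sourceEventId: string;
  payload: unknown;
  occurredAt: Date;
  ingestedAt: Date;
}

// Only the Stripe Event fields this sketch relies on.
interface StripeEventLike {
  id: string;
  type: string;
  created: number; // Unix timestamp in seconds
  data: { object: Record<string, unknown> };
}

// Pure normalizer: no I/O, so it can be tested per source in isolation.
function normalizeStripeEvent(
  evt: StripeEventLike,
  accountId: string,
  now: Date = new Date()
): AccountEvent {
  return {
    accountId,
    eventType: evt.type,
    source: "stripe",
    sourceEventId: evt.id,
    payload: evt, // keep the full event for JSONB storage
    occurredAt: new Date(evt.created * 1000), // seconds to milliseconds
    ingestedAt: now,
  };
}

// Usage with a made-up event.
const sample: StripeEventLike = {
  id: "evt_123",
  type: "invoice.paid",
  created: 1767225600, // 2026-01-01T00:00:00Z
  data: { object: { customer: "cus_abc" } },
};
const row = normalizeStripeEvent(sample, "acct-1");
```

Each additional source gets its own normalizer returning the same `AccountEvent` shape, which is what lets the timeline query stay a single SELECT.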

The schema

One table, all sources

CREATE TABLE account_events (
  id BIGSERIAL PRIMARY KEY,
  account_id UUID NOT NULL,
  event_type TEXT NOT NULL,
  source TEXT NOT NULL,                      -- 'stripe', 'hubspot', 'intercom', etc.
  source_event_id TEXT NOT NULL,             -- the source's unique ID for this event
  payload JSONB NOT NULL,
  occurred_at TIMESTAMPTZ NOT NULL,          -- when it happened in source
  ingested_at TIMESTAMPTZ NOT NULL,          -- when Rotor wrote it
  UNIQUE (source, source_event_id)
);

CREATE INDEX ON account_events (account_id, occurred_at DESC);
CREATE INDEX ON account_events (event_type, occurred_at DESC);
CREATE INDEX ON account_events USING GIN (payload);

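The unique constraint on (source, source_event_id) is the idempotency key in the database. As long as writes use INSERT ... ON CONFLICT (source, source_event_id) DO NOTHING, webhook replays from any source are no-ops at the SQL layer; a plain INSERT would fail with a unique-violation error instead.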
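
To make the "replays are no-ops" behavior concrete, here is a minimal in-memory model of ON CONFLICT DO NOTHING keyed on (source, sourceEventId). The `EventStore` class is hypothetical and exists only for illustration; in production the unique constraint does this work.

```typescript
interface EventRow {
  source: string;
  sourceEventId: string;
  eventType: string;
}

// Hypothetical in-memory stand-in for account_events with its unique constraint.
class EventStore {
  private rows = new Map<string, EventRow>();

  // Returns true if the row was written, false if it was a replay (no-op).
  insertIgnoringDuplicates(row: EventRow): boolean {
    // JSON-encode the pair so "a:b" + "c" can never collide with "a" + "b:c".
    const key = JSON.stringify([row.source, row.sourceEventId]);
    if (this.rows.has(key)) return false;
    this.rows.set(key, row);
    return true;
  }

  get size(): number {
    return this.rows.size;
  }
}

const store = new EventStore();
const first = store.insertIgnoringDuplicates({ source: "stripe", sourceEventId: "evt_1", eventType: "invoice.paid" });
const replay = store.insertIgnoringDuplicates({ source: "stripe", sourceEventId: "evt_1", eventType: "invoice.paid" });
// Same ID from a different source is a different event.
const otherSource = store.insertIgnoringDuplicates({ source: "zendesk", sourceEventId: "evt_1", eventType: "ticket.created" });
```

Scoping the key by source matters because two tools can reuse the same ID string for unrelated events.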

Querying the timeline

The whole point

Once events from all sources land in one table, the churn-debug query becomes a single SQL query.

-- All events for an account in the last 180 days
SELECT
  occurred_at,
  source,
  event_type,
  payload->>'subject' AS subject
FROM account_events
WHERE account_id = $1
  AND occurred_at > NOW() - INTERVAL '180 days'
ORDER BY occurred_at;

-- Result for an account that churned:
-- 2026-01-15  hubspot   contact.created
-- 2026-01-22  marketo   email.opened
-- 2026-02-01  hubspot   deal.stage.changed (closed_won)
-- 2026-02-03  stripe    invoice.paid
-- 2026-02-15  intercom  conversation.created    "billing question"
-- 2026-02-15  zendesk   ticket.created          "charged twice"
-- 2026-02-16  zendesk   ticket.replied
-- 2026-02-17  intercom  conversation.replied
-- 2026-04-20  stripe    invoice.payment_failed
-- 2026-04-25  stripe    subscription.deleted

The story tells itself: a billing dispute on Feb 15 wasn't resolved cleanly, customer churned in April. With this table, you find that pattern in seconds. Without it, you spend a half-day across six tools.

Edge cases

What goes wrong, and how to handle it

Account mapping fails. A Stripe customer with no HubSpot match. Quarantine table catches it. Daily cron alerts on the quarantine count. Human resolves the mapping. Replay the quarantined event.
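
The quarantine-and-replay loop can be sketched in a few lines. This is a minimal in-memory version, assuming quarantined rows keep their source event ID and the external customer ID that failed to map; `QuarantinedEvent`, `WrittenEvent`, and `replayQuarantine` are hypothetical names.

```typescript
interface QuarantinedEvent {
  source: string;
  sourceEventId: string;
  externalCustomerId: string; // e.g. the Stripe customer ID that failed to map
}

interface WrittenEvent {
  accountId: string;
  source: string;
  sourceEventId: string;
}

// Replays every quarantined event whose customer now has a mapping and
// returns the ones that still cannot be mapped (the count the cron alerts on).
function replayQuarantine(
  quarantine: QuarantinedEvent[],
  accountMap: Map<string, string>,
  written: Map<string, WrittenEvent>
): QuarantinedEvent[] {
  const stillUnmapped: QuarantinedEvent[] = [];
  for (const q of quarantine) {
    const accountId = accountMap.get(q.externalCustomerId);
    if (!accountId) {
      stillUnmapped.push(q);
      continue;
    }
    // Keyed on (source, sourceEventId), so replaying twice is harmless.
    const key = JSON.stringify([q.source, q.sourceEventId]);
    if (!written.has(key)) {
      written.set(key, { accountId, source: q.source, sourceEventId: q.sourceEventId });
    }
  }
  return stillUnmapped;
}

const quarantine: QuarantinedEvent[] = [
  { source: "stripe", sourceEventId: "evt_a", externalCustomerId: "cus_1" },
  { source: "stripe", sourceEventId: "evt_b", externalCustomerId: "cus_2" },
];
const accountMap = new Map([["cus_1", "acct-1"]]); // a human fixed cus_1's mapping
const written = new Map<string, WrittenEvent>();
const remaining = replayQuarantine(quarantine, accountMap, written);
```

Because the write is idempotent, the replay job can simply run over the whole quarantine every day without tracking what it already replayed.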

Source schema drift. JSONB payload column means new fields land automatically. Add a daily cron that diffs field shapes against what your dashboards expect.
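
The daily drift check boils down to flattening payloads into field paths and diffing them against an expected set. A minimal sketch, assuming you sample recent payloads per event type and maintain the expected paths by hand; `fieldPaths` and `diffFields` are hypothetical helpers.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Collects leaf paths like "data.object.amount_due"; array elements share "[]".
function fieldPaths(value: Json, prefix = "", out = new Set<string>()): Set<string> {
  if (Array.isArray(value)) {
    for (const item of value) fieldPaths(item, `${prefix}[]`, out);
  } else if (value !== null && typeof value === "object") {
    for (const [key, child] of Object.entries(value)) {
      fieldPaths(child, prefix ? `${prefix}.${key}` : key, out);
    }
  } else if (prefix) {
    out.add(prefix); // a scalar leaf
  }
  return out;
}

// "added" = new fields the source started sending; "missing" = fields dashboards expect but no longer see.
function diffFields(seen: Set<string>, expected: Set<string>) {
  const added = Array.from(seen).filter((p) => !expected.has(p)).sort();
  const missing = Array.from(expected).filter((p) => !seen.has(p)).sort();
  return { added, missing };
}

const payload: Json = { id: "evt_1", data: { object: { amount_due: 500, tax: 40 } } };
const expected = new Set(["id", "data.object.amount_due", "data.object.currency"]);
const drift = diffFields(fieldPaths(payload), expected);
// drift.added   → ["data.object.tax"]
// drift.missing → ["data.object.currency"]
```

Only "missing" should page someone; "added" fields are already stored in the JSONB column and just need a dashboard decision.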

Webhook delivery delayed. Sort by occurred_at, not ingested_at. Late deliveries land in the right place in the timeline.
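
The same rule applies if you assemble a timeline in application code. A minimal sketch, assuming events are already loaded into memory; `TimelineEvent` and `timeline` are hypothetical names that mirror the SQL above: filter by account and window, then order by occurredAt, never ingestedAt.

```typescript
interface TimelineEvent {
  accountId: string;
  source: string;
  eventType: string;
  occurredAt: Date;
  ingestedAt: Date;
}

function timeline(events: TimelineEvent[], accountId: string, since: Date): TimelineEvent[] {
  return events
    .filter((e) => e.accountId === accountId && e.occurredAt > since) // filter returns a copy
    .sort((a, b) => a.occurredAt.getTime() - b.occurredAt.getTime());
}

// The Zendesk ticket arrived a day late, so it was ingested last,
// but it still sorts into its true position.
const events: TimelineEvent[] = [
  { accountId: "acct-1", source: "intercom", eventType: "conversation.created",
    occurredAt: new Date("2026-02-15T09:00:00Z"), ingestedAt: new Date("2026-02-15T09:00:05Z") },
  { accountId: "acct-1", source: "zendesk", eventType: "ticket.replied",
    occurredAt: new Date("2026-02-16T10:00:00Z"), ingestedAt: new Date("2026-02-16T10:00:03Z") },
  { accountId: "acct-1", source: "zendesk", eventType: "ticket.created",
    occurredAt: new Date("2026-02-15T11:00:00Z"), ingestedAt: new Date("2026-02-16T12:00:00Z") },
];
const ordered = timeline(events, "acct-1", new Date("2026-01-01T00:00:00Z"));
// ordered.map((e) => e.eventType)
//   → ["conversation.created", "ticket.created", "ticket.replied"]
```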

Bulk historical backfill. Trigger one workflow run per page of source events. Each writes a batch idempotently. Restart-safe.
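
Here is why page-at-a-time backfill is restart-safe. A minimal sketch, assuming a hypothetical source API that returns pages of events plus an opaque cursor; each loop iteration stands in for one workflow run, and `backfill`, `Page`, and `FetchPage` are illustrative names. The cursor advances only after a page is fully written, so a crash resumes from the last completed page, and rewriting a partial page is harmless.

```typescript
interface SourceEvent { id: string; type: string }
interface Page { events: SourceEvent[]; nextCursor: string | null }
type FetchPage = (cursor: string | null) => Page;

function backfill(
  fetchPage: FetchPage,
  written: Set<string>, // stands in for the unique (source, source_event_id) index
  checkpoint: { cursor: string | null; done: boolean },
  maxPages = Infinity // lets the example simulate a crash partway through
): number {
  let pages = 0;
  while (!checkpoint.done && pages < maxPages) {
    const page = fetchPage(checkpoint.cursor);
    for (const e of page.events) written.add(e.id); // duplicate writes are no-ops
    // Advance the checkpoint only after the whole page is written.
    checkpoint.cursor = page.nextCursor;
    checkpoint.done = page.nextCursor === null;
    pages++;
  }
  return pages;
}

const pagesByCursor: Record<string, Page> = {
  start: { events: [{ id: "evt_1", type: "invoice.paid" }, { id: "evt_2", type: "invoice.paid" }], nextCursor: "p2" },
  p2: { events: [{ id: "evt_3", type: "charge.refunded" }], nextCursor: "p3" },
  p3: { events: [{ id: "evt_4", type: "invoice.paid" }], nextCursor: null },
};
const fetchPage: FetchPage = (cursor) => pagesByCursor[cursor ?? "start"];
const seen = new Set<string>();
const state = { cursor: null as string | null, done: false };
const firstRun = backfill(fetchPage, seen, state, 1); // "crash" after one page
const resumed = backfill(fetchPage, seen, state);    // picks up at state.cursor
```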

The math

What this costs on Rotor

A typical SaaS company receives ~50,000 cross-tool events per month (Stripe, HubSpot, Intercom, Zendesk, Marketo combined). Each event costs 2 step-runs (resolve account, then write or quarantine), so about 100,000 step-runs/month.
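
The estimate above as a one-line formula, so you can plug in your own volume. The 50,000 figure is the playbook's illustrative number, not a measurement, and `monthlyStepRuns` is a hypothetical helper.

```typescript
// Each event runs "resolve-account" plus exactly one of "write-event" or
// "quarantine", so every event costs 2 step-runs whichever branch it takes.
function monthlyStepRuns(eventsPerMonth: number, stepsPerEvent = 2): number {
  return eventsPerMonth * stepsPerEvent;
}

const estimate = monthlyStepRuns(50_000); // 100,000 step-runs
```

One-off backfills add their own runs on top of this steady-state figure.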

That fits Rotor Pro ($99/mo). Compared to:

  • Fivetran or Airbyte: $300-1,500/mo. Schema-managed, but batch-only, so no real-time.
  • Census reverse-ETL: similar pricing, similar latency.
  • Building it yourself on a worker: almost free in compute, but you pay in engineering time for idempotency, ordering, schema drift handling, and quarantine for unknown accounts. A month of work, then ongoing maintenance.

The compounding value isn't the ingest cost. It's the queryable timeline. Every churn debug, every customer success deep dive, every forecast revision starts with this table.

Fork this playbook on Rotor.

$9 to start. 30-day money-back guarantee. Hard caps protect you from runaway bills.
