Warning

Audit-event data is available now via GET /v1/audit?format=csv. Automated push to S3/Snowflake is an Enterprise-tier add-on shipping in Phase 4 — until then, customers can run the documented pg_dump + COPY INTO recipe themselves on a CSV export.

Why Snowflake

The audit_event table is append-only and partitioned by month — a perfect fit for Snowflake's billing model. Rotor generates structured JSON in the metadata column (JSONB in Postgres, VARIANT in Snowflake), which Snowflake queries natively with metadata:key dot-notation.

Key advantages of routing audit data through Snowflake:

  • Cost-efficient long-term storage. Monthly parquet files are ~10× smaller than row-based exports. A 10M-row workspace accumulates ~200 MB parquet/month vs 2 GB raw CSV.
  • Clustered tables. Snowflake's automatic clustering on occurred_at keeps query costs predictable for time-range scans regardless of table size.
  • VARIANT for metadata. No schema migration is needed as Rotor adds fields to the metadata payload — Snowflake's semi-structured storage handles schema evolution transparently.

Pipeline Overview

Postgres (audit_event, partitioned by month)
   │
   │  pg_dump --format=custom → parquet conversion (or direct CSV → S3 → Snowflake)
   ▼
S3 bucket (customer-managed or Rotor-managed)
   s3://bucket/rotor-audit/workspace=<ws_id>/year=YYYY/month=MM/
   │
   │  Snowflake COPY INTO (scheduled daily task, X-SMALL warehouse)
   ▼
Snowflake table: rotor_audit_events

Data residency note: Enterprise customers may require that Rotor writes parquet files to a customer-owned S3 bucket, and that no data transits Rotor-managed S3. See the Customer-Managed Bucket Variant section below.

Schema

The audit_event Postgres table maps to Snowflake as follows:

Postgres columnPostgres typeSnowflake typeNotes
idUUIDVARCHAR(36)Hyphen-separated UUID string
workspace_idTEXTVARCHAR(64)Foreign key; use for row-level security
event_typeTEXTVARCHAR(128)e.g. webhook.created, job.completed
actor_idTEXTVARCHAR(64)API key ID or system actor
resource_idTEXTVARCHAR(256)Affected resource ID (nullable)
metadataJSONBVARIANTEvent-specific payload; query with :
occurred_atTIMESTAMPTZTIMESTAMP_TZ(9)Cluster key; used for COPY INTO partition
ip_addressTEXTVARCHAR(45)IPv4 or IPv6 (nullable)

S3 Stage Setup

Create a Snowflake external stage pointing to your S3 bucket. Replace <ACCOUNT_ID>, <BUCKET>, and <ROLE_ARN> with your values:

-- Create storage integration (run once as ACCOUNTADMIN)
CREATE OR REPLACE STORAGE INTEGRATION rotor_audit_s3
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<ROLE_ARN>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET>/rotor-audit/');
 
-- Retrieve the Snowflake AWS account ID and external ID for IAM trust policy
DESC INTEGRATION rotor_audit_s3;
 
-- Create the stage
CREATE OR REPLACE STAGE rotor_audit
  STORAGE_INTEGRATION = rotor_audit_s3
  URL = 's3://<BUCKET>/rotor-audit/'
  FILE_FORMAT = (TYPE = 'PARQUET');

IAM Role (trust policy)

Attach this trust policy to the IAM role you pass as STORAGE_AWS_ROLE_ARN. Replace <SNOWFLAKE_ACCOUNT_ID> and <SNOWFLAKE_EXTERNAL_ID> with the values from DESC INTEGRATION:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<SNOWFLAKE_ACCOUNT_ID>:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<SNOWFLAKE_EXTERNAL_ID>"
        }
      }
    }
  ]
}

IAM permissions policy

The role needs minimal S3 access on the audit prefix:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::<BUCKET>",
        "arn:aws:s3:::<BUCKET>/rotor-audit/*"
      ]
    }
  ]
}

COPY INTO Command

Run this after each daily parquet drop to load the previous day's partition:

COPY INTO rotor_audit_events
FROM @rotor_audit/workspace=<YOUR_WS_ID>/year=2026/month=04/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE';

For a full historical backfill, omit the date-prefix to copy all partitions:

COPY INTO rotor_audit_events
FROM @rotor_audit/workspace=<YOUR_WS_ID>/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE';

Scheduling (Daily Snowflake Task)

Schedule the COPY INTO to run automatically each night. This keeps warehouse size at X-SMALL since parquet files for a typical 10M-row workspace are ~20 MB/day:

CREATE OR REPLACE TASK load_rotor_audit_daily
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = 'USING CRON 0 3 * * * UTC'   -- 03:00 UTC daily
AS
  COPY INTO rotor_audit_events
  FROM @rotor_audit/workspace=<YOUR_WS_ID>/
  FILE_FORMAT = (TYPE = 'PARQUET')
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  ON_ERROR = 'CONTINUE';
 
ALTER TASK load_rotor_audit_daily RESUME;

Cost estimate

Workspace sizeParquet/dayWarehouse timeApprox cost/month
1M events/month~2 MB~10s X-SMALLunder $1
10M events/month~20 MB~30s X-SMALL~$2
100M events/month~200 MB~3 min X-SMALL~$15

Prices based on Snowflake on-demand at $3.00/credit; X-SMALL = 1 credit/hour.

Customer-Managed S3 Bucket Variant

Enterprise customers with data-residency requirements can configure Rotor to write parquet files directly to a customer-owned S3 bucket. In this mode:

  1. Rotor assumes a customer-provided IAM role via sts:AssumeRole (cross-account).
  2. The customer S3 bucket receives parquet files at the standard Hive-style prefix: s3://customer-bucket/rotor-audit/workspace=<ws_id>/year=YYYY/month=MM/day=DD/.
  3. Rotor never reads the data back — write-only access via s3:PutObject.
  4. The customer runs their own Snowflake COPY INTO on their own schedule.

To enable, contact [email protected] with your AWS account ID and the ARN of the role you want Rotor to assume.

Verification

After each COPY INTO, verify the row count matches the Postgres source:

-- Snowflake: count for a specific day
SELECT COUNT(*) FROM rotor_audit_events
WHERE occurred_at::DATE = '2026-04-13'
  AND workspace_id = '<YOUR_WS_ID>';
-- Postgres: equivalent count (run against your audit replica)
SELECT COUNT(*) FROM audit_event
WHERE occurred_at::DATE = '2026-04-13'
  AND workspace_id = '<YOUR_WS_ID>';

Row counts should match. A discrepancy of ≤0.01% is acceptable (COPY INTO ON_ERROR=CONTINUE skips malformed rows; these are logged in COPY_HISTORY).

-- Check for skipped rows
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'ROTOR_AUDIT_EVENTS',
  START_TIME => DATEADD('hour', -25, CURRENT_TIMESTAMP())
))
WHERE STATUS = 'PARTIALLY_LOADED';

Reference: Snowflake audit log to S3 pipeline pattern — see Snowflake docs: Loading from S3.