Technical Reference

Meiro Pipes — Architecture

Architecture, configuration objects, data processing pipeline, identity resolution algorithm, and operational tooling. Intended for senior engineers and architects evaluating or implementing Pipes.

Audience: Senior engineers · Architects
Version: Technical Reference v2

Architecture Overview

Pipes is the core processing engine behind Meiro's customer data infrastructure. A single backend handles the full event lifecycle — collection, schema validation, identity resolution, routing, warehouse modeling, and real-time observability. Everything is API-driven, with a built-in JavaScript sandbox for custom transforms and delivery logic.

Data Processing Pipeline

Every event that enters Pipes passes through a seven-stage pipeline. Stages 1–3 are synchronous (in the request path). Stages 4–7 are asynchronous (background workers).

| # | Stage | Mechanism | Notes |
|---|---|---|---|
| 1 | Collect | POST /collect/:sourceSlug | Webhook, Web SDK, or Scheduled Import |
| 2 | Transform | Source's transform() function runs | Normalizes raw payload into typed events |
| 3 | Validate | JSON Schema validation against Event Type schema | Rejects malformed payloads with field-level detail |
| 4 | Deduplicate | Content-based deduplication on event type, time, and identifiers | Duplicate events are silently skipped |
| 5 | Resolve | Background workers match identifiers to profiles | 184 ms median resolution latency |
| 6 | Route | Enabled Pipes filter and forward events | Pipe transform() → Event Destination send() |
| 7 | Deliver | Delivery logged per Pipe and per Destination | Queryable via dashboard |

Sources

Sources are the entry points for event data. Each source has a slug, a JavaScript transform function, and bound secrets. The source's transform normalizes raw incoming payloads into arrays of structured events before they enter the processing pipeline.

Webhook Sources

Accepts HTTP POST at /collect/:sourceSlug. Authentication is handled upstream or via IP filtering at the infrastructure layer. The transform function receives the raw request body and headers and must return an array of event objects.

Required event shape from transform()
{ event_type: string, event_time: ISO8601 string, event_payload: object }
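As a sketch, a webhook transform for a hypothetical payment-provider payload might look like this (input field names such as body.order_id and body.customer_email are illustrative, not part of the Pipes API):

```javascript
// Hypothetical webhook transform: normalizes one raw payment payload
// into the required event shape. The input field names are illustrative.
function transform(body, headers) {
  return [{
    event_type: 'purchase',
    event_time: new Date(body.created_at).toISOString(),
    event_payload: {
      order_id: body.order_id,
      total: body.total,
      email: body.customer_email
    }
  }];
}
```

Returning an array lets one raw request fan out into multiple typed events when needed.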

Web SDK (mpt.js)

Served from your Pipes instance at /mpt.js with a 5-minute cache. The SDK manages four cookies: bp_device (2yr), bp_session (30min), bp_session_count (2yr), bp_first_visit (2yr).

Auto-tracked events (no config required): The SDK ships with over 40 built-in event types — including page views, sessions, clicks, form interactions, video engagement, search, file downloads, and more. All tracked automatically out of the box.

Custom event tracking is defined in SDK config served from /sdk-config/:sourceSlug, cached via ETags and localStorage. Config exposes: sdk.emit(), sdk.on.page(), sdk.on.click(), sdk.on.submit(), sdk.dom.text(), sdk.on.dataLayer().
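A sketch of one custom tracking rule follows. The function names come from the config API listed above, but the exact call signatures and the configure() wrapper are assumptions:

```javascript
// Hypothetical SDK-config rule: on every click of a matching element,
// emit a custom event carrying the clicked element's text.
// The wrapper function and argument shapes are assumptions.
function configure(sdk) {
  sdk.on.click('.pricing-cta', (el) => {
    sdk.emit('pricing_cta_clicked', {
      button_text: sdk.dom.text(el)
    });
  });
}
```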

Collect Endpoint Limits

| Limit | Value |
|---|---|
| Max request body | 1 MB |
| Max events per request | 10,000 |
| Max single event payload | 10 MB |
| Backpressure response | HTTP 429 with code: backpressure |
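A client respecting these limits might chunk large batches and back off on HTTP 429. A sketch, assuming a request body of { events: [...] } and a simple exponential retry policy (neither is specified here):

```javascript
// Sketch of a client that respects the collect limits: sends at most
// 10,000 events per request and backs off on HTTP 429 backpressure.
// The body shape ({ events: [...] }) and retry policy are assumptions.
async function sendBatch(baseUrl, sourceSlug, events,
                         { maxPerRequest = 10000, maxRetries = 3 } = {}) {
  for (let i = 0; i < events.length; i += maxPerRequest) {
    const chunk = events.slice(i, i + maxPerRequest);
    for (let attempt = 0; ; attempt++) {
      const res = await fetch(`${baseUrl}/collect/${sourceSlug}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ events: chunk })
      });
      if (res.status === 429 && attempt < maxRetries) {
        // exponential backoff before retrying the same chunk
        await new Promise(r => setTimeout(r, 2 ** attempt * 1000));
        continue;
      }
      if (!res.ok) throw new Error(`Collect failed: ${res.status}`);
      break;
    }
  }
}
```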

Scheduled Imports

Run a JavaScript fetch function on a cron schedule. The function can call external APIs, read from object storage, or query any HTTP endpoint, then return arrays of events. Imports can also be triggered manually on demand.
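A sketch of such a function, assuming a hypothetical orders API (the function name, endpoint, and response shape are illustrative; secret access via env.secrets follows the sandbox convention):

```javascript
// Hypothetical scheduled-import function: pulls recent orders from an
// external API and returns them as events. The endpoint and fields are
// illustrative; env.secrets.ORDERS_API_KEY is an assumed bound secret.
async function fetchEvents() {
  const res = await fetch('https://api.example.com/orders?since=-24h', {
    headers: { Authorization: `Bearer ${env.secrets.ORDERS_API_KEY}` }
  });
  if (!res.ok) throw new Error(`Import fetch failed: ${res.status}`);
  const orders = await res.json();
  return orders.map(o => ({
    event_type: 'purchase',
    event_time: o.created_at,
    event_payload: o
  }));
}
```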

Event Types & Validation

Event Types define the accepted vocabulary for a source — names like purchase, page_view, or subscription_changed. Each has three components:

| Field | Behavior |
|---|---|
| name | Stable, business-meaningful string. Used in routing, attributes, and analysis. Changing names breaks downstream references. |
| jsonSchema | Optional JSON Schema validation. When enabled, every incoming payload is validated before storage. Invalid payloads are rejected with field-level error messages. Configured via the product UI. |
| identifierRules | Array of JSONPath rules mapping payload fields to Identifier Types, e.g. $.email → email identifier type. Extraction happens after validation. |

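Putting the three fields together, a purchase Event Type might look like this (the literal configuration shape is an assumption; in the product these fields are set via the UI):

```javascript
// Hypothetical Event Type definition: name, optional jsonSchema, and
// identifierRules mapping JSONPath expressions to Identifier Types.
const purchaseEventType = {
  name: 'purchase',
  jsonSchema: {
    type: 'object',
    required: ['order_id', 'email'],
    properties: {
      order_id: { type: 'string' },
      email: { type: 'string', format: 'email' },
      total: { type: 'number' }
    }
  },
  identifierRules: [
    { path: '$.email', identifierType: 'email' },
    { path: '$.user_id', identifierType: 'user_id' }
  ]
};
```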
Event deduplication

Events are automatically deduplicated based on event type, time, and extracted identifiers. Duplicate events are silently skipped. No configuration required.

Identity Resolution

Identifier Types are the keys that connect events, warehouse rows, and profiles. Each type has three properties controlling merge behavior:

| Property | Description |
|---|---|
| name | Unique string, e.g. email, user_id, device_id, cookie. Defined before sources and event types are created. |
| maxIdentifiers | null = unlimited. When adding a new identifier would exceed this limit, the system creates an overflow profile instead of merging. Must be >= 1 if set. |
| priority | Controls which profile wins during a contested merge. Higher number = higher priority. null is treated as 0. Leave gaps of 100 for easy insertion. |

Resolution Algorithm

No match: Create a new profile and link all identifiers from the event.

Single profile matched: For each new identifier, check whether adding it would exceed maxIdentifiers. If yes, create an overflow profile; if no, add it to the matched profile.

Multiple profiles matched: Compute the union of all identifiers. If no type exceeds its limit, merge all profiles into one. If any limit would be violated, pick the winner (the profile matched via the highest-priority identifier type), add only the identifiers that fit within limits, and leave the other profiles untouched.
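The branches above can be sketched as follows. Profile shape, action names, and the priority approximation are assumptions for illustration:

```javascript
// Sketch of the three resolution branches. Assumes a profile is
// { identifiers: [{ type, value }] } and `types` maps an Identifier Type
// name to { maxIdentifiers, priority }. Illustrative only; the real engine
// does more bookkeeping (e.g. tracking which identifier matched a profile).
function countByType(ids) {
  const counts = {};
  for (const { type } of ids) counts[type] = (counts[type] || 0) + 1;
  return counts;
}

function uniqueIds(ids) {
  const seen = new Set();
  return ids.filter(({ type, value }) => {
    const key = `${type}:${value}`;
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}

function violatesLimits(ids, types) {
  return Object.entries(countByType(ids)).some(([type, n]) => {
    const max = types[type].maxIdentifiers;
    return max !== null && n > max;
  });
}

function resolve(newIds, matched, types) {
  if (matched.length === 0) {
    return { action: 'create_profile' };               // no match
  }
  if (matched.length === 1) {
    const combined = uniqueIds([...matched[0].identifiers, ...newIds]);
    return violatesLimits(combined, types)
      ? { action: 'create_overflow' }                  // limit would be exceeded
      : { action: 'add_to_profile' };                  // safe to extend
  }
  const union = uniqueIds([...matched.flatMap(p => p.identifiers), ...newIds]);
  if (!violatesLimits(union, types)) {
    return { action: 'merge_all' };                    // union respects every limit
  }
  // Contested merge: approximate "matched via the highest-priority
  // identifier type" by each profile's best identifier type (null -> 0).
  const priorityOf = p =>
    Math.max(...p.identifiers.map(id => types[id.type].priority ?? 0));
  const winner = matched.reduce((a, b) => (priorityOf(b) > priorityOf(a) ? b : a));
  return { action: 'add_within_limits', winner };
}
```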
Recommended baseline config
email:      maxIdentifiers=2,  priority=300  // stable, rare; >2 is usually pathological
device_id:  maxIdentifiers=3,  priority=200  // phone + laptop + tablet
cookie:     maxIdentifiers=20, priority=100  // transient, many per real user
Overflow profiles

Overflow profiles start empty — they do not inherit historical event attribution. This is intentional: preventing false merges on shared devices is more valuable than complete attribution for edge-case profiles.

Pipes & Event Destinations

A Pipe connects one Source to one Event Destination via an optional JavaScript transform function. The transform receives the event batch and returns a filtered or reshaped array. An Event Destination receives the transformed batch in a custom send() function.

Pipe Transform Function

// Pipe transform — filter to purchase events only
function transform(payload) {
  return payload.events.filter(e => e.event_type === 'purchase');
}

Event Destination send() Function

// Event Destination — POST to a webhook
async function send(payload) {
  const url = env.secrets.WEBHOOK_URL;
  for (const e of payload.events) {
    const res = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(e)
    });
    if (!res.ok) throw new Error(`Delivery failed: ${res.status}`);
  }
}

| Capability | Detail |
|---|---|
| Secret encryption | AES-256-GCM, bound to source / pipe / destination by ID. Access via env.secrets.KEY_NAME |
| JS sandbox packages | 48 npm packages, including lodash, dayjs, crypto, uuid |
| Custom fetch | Permitted to any external HTTPS endpoint |
| Enable/disable | Every Pipe and Destination can be independently toggled without deleting config |
| Delivery history | Stored per Pipe, per Destination; queryable via dashboard |

Warehouse Modeling

Warehouse Models map tables in Snowflake, BigQuery, or Databricks to Identifier Types, making historical and batch data available to the same identity graph that processes real-time events.

Setup Sequence

  1. Create or confirm Identifier Types — identifier columns in the warehouse table must map to an existing type.
  2. Create the warehouse connection: provide host, credentials, catalog, and schema.
  3. Validate the connection before modeling.
  4. Create a Model: specify table name, schema, and identifierRules mapping column names to Identifier Types.
  5. Validate by checking whether known test profiles resolve against DWH-backed attributes.
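Step 4 might produce a definition like this (the configuration shape is an assumption; table and column names are illustrative):

```javascript
// Hypothetical Warehouse Model: maps Snowflake columns to existing
// Identifier Types. Connection name, table, and columns are illustrative.
const ordersModel = {
  connection: 'snowflake-prod',
  table: 'ANALYTICS.PUBLIC.ORDERS',
  identifierRules: [
    { column: 'EMAIL', identifierType: 'email' },
    { column: 'USER_ID', identifierType: 'user_id' }
  ]
};
```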
Zero-copy architecture

DWH attributes and DWH-backed audiences in Customer Studio draw directly from these models — no ETL, no export, no copy. Models are used by Customer Studio for DWH attributes, DWH audiences, and Reverse ETL activation.

Piper — AI Agent

Every Pipes instance ships with Piper, a context-aware AI agent with full read access to the instance configuration — sources, event types, transforms, destinations, pipes, secrets bindings, queue depths, error logs, and delivery history.

| Mode | Behavior |
|---|---|
| Autonomous monitoring | Runs heartbeat checks on schedule. Detects anomalies (volume drops, error-rate spikes, queue backlog) and posts a diagnosis with recommended action to Slack, Teams, Discord, Google Chat, or GitHub. |
| Human-approval edits | Proposes configuration changes: create a source, update a transform, add an Event Destination. Every edit requires human review and approval before execution; Piper cannot apply changes unilaterally. |
| On-demand queries | Answers operational questions in natural language, e.g. "Why are no events routing to Destination X?" or "Which sources have not received traffic in 24 hours?" Responses cite specific queue states, delivery logs, and config objects. |

Piper is accessible via the in-product chat and external AI agents that install the CDI skill.

Operations & Observability

Pipes exposes four queues with inspectable depth and worker health: identity resolution, event delivery, profile refresh, and profile delivery. The dashboard provides aggregate counts of events received, stored, and routed.

Troubleshooting Sequence

Follow this order before debugging downstream integrations:

  1. Is the source receiving traffic? Check dashboard event volume.
  2. Is the payload matching the correct Event Type? Check identifier extraction on representative events.
  3. Is the Pipe enabled? Is the Event Destination enabled?
  4. Is the transform logic dropping events unexpectedly? Test the transform function directly.
  5. What does delivery history show for the affected Pipe and Destination?
  6. Is there queue backpressure? Check /api/health/queues.

Version History & Soft Delete

Every entity in Pipes — sources, event types, pipes, destinations, models, identifier types — has version history and soft delete. Accidentally deleted or changed configuration can be restored or reverted without data loss.

CLI

Pipes ships with a CLI that mirrors the full product API — making it fully CI/CD-compatible. A quick taste:

mpcli status
mpcli api GET /api/health/queues

Platform Specifications

| Specification | Value | Notes |
|---|---|---|
| Identity resolution latency | 184 ms median | Background workers, async to ingestion |
| Collect endpoint body limit | 1 MB per request | HTTP 429 backpressure when queue is full |
| Max events per request | 10,000 | Across all event types |
| Max single event payload | 10 MB | After transform, before validation |
| JS sandbox packages | 48 npm packages | Includes lodash, dayjs, crypto, uuid |
| Secret encryption | AES-256-GCM | Bound to source / pipe / destination by ID |
| Deduplication | Content-based | On event type, time, and identifiers; automatic |
| Warehouse support | Snowflake · BigQuery · Databricks | Zero-copy, DWH-native, no ETL |
| Deployment targets | Self-hosted K8s · Any cloud/region · Meiro SaaS | Zero data egress in self-hosted and any-cloud |
| CLI | Full API surface | CI/CD-compatible |
| AI agent | Piper (ships with every instance) | Slack · Teams · Discord · GitHub |

Recommended Setup Order

  1. Define Identifier Types first — before any source, event type, or model.
  2. Create the Source and define Event Types with JSON Schema validation.
  3. Test with representative payloads before routing or activating.
  4. Add Pipes and Event Destinations only after ingestion is stable.
  5. Connect warehouse and create Models if historical data is needed.
  6. Enable Piper for autonomous operational monitoring.