Architecture Overview
Pipes is the core processing engine behind Meiro's customer data infrastructure. A single backend handles the full event lifecycle — collection, schema validation, identity resolution, routing, warehouse modeling, and real-time observability. Everything is API-driven, with a built-in JavaScript sandbox for custom transforms and delivery logic.
Data Processing Pipeline
Every event that enters Pipes passes through a seven-stage pipeline. Stages 1–3 are synchronous (in the request path). Stages 4–7 are asynchronous (background workers).
| Stage | Mechanism | Notes |
|---|---|---|
| 1 Collect | POST /collect/:sourceSlug | Webhook, Web SDK, or Scheduled Import |
| 2 Transform | Source's transform() function runs | Normalizes raw payload into typed events |
| 3 Validate | JSON Schema validation against Event Type schema | Rejects malformed payloads with field-level detail |
| 4 Deduplicate | Content-based deduplication on event type, time, and identifiers | Duplicate events are silently skipped |
| 5 Resolve | Background workers match identifiers to profiles | 184ms median resolution latency |
| 6 Route | Enabled Pipes filter and forward events | Pipe transform() → Event Destination send() |
| 7 Deliver | Delivery logged per Pipe and per Destination | Queryable via dashboard |
Sources
Sources are the entry points for event data. Each source has a slug, a JavaScript transform function, and bound secrets. The source's transform normalizes raw incoming payloads into arrays of structured events before they enter the processing pipeline.
Webhook Sources
Accepts HTTP POST at /collect/:sourceSlug. Authentication is handled upstream or via IP filtering at the infrastructure layer. The transform function receives the raw request body and headers and must return an array of event objects.
Each event object has the shape { event_type: string, event_time: ISO8601 string, event_payload: object }.
Web SDK (mpt.js)
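A minimal transform sketch, assuming the sender posts a JSON body of the hypothetical shape `{ events: [{ type, timestamp, data }] }` (the sender-side field names and the `payload.body` argument shape are assumptions, not the documented contract):

```javascript
// Sketch of a webhook source transform. The payload argument is assumed
// to expose the parsed request body as payload.body; the incoming field
// names (type, timestamp, data) are hypothetical.
function transform(payload) {
  const body = payload.body || {};
  return (body.events || []).map(e => ({
    event_type: e.type,
    event_time: new Date(e.timestamp).toISOString(),
    event_payload: e.data || {}
  }));
}
```

The return value is the array of typed events that enters stage 3 (validation).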
Served from your Pipes instance at /mpt.js with a 5-minute cache. The SDK manages four cookies: bp_device (2yr), bp_session (30min), bp_session_count (2yr), bp_first_visit (2yr).
Auto-tracked events (no config required): The SDK ships with over 40 built-in event types — including page views, sessions, clicks, form interactions, video engagement, search, file downloads, and more. All tracked automatically out of the box.
Custom event tracking is defined in SDK config served from /sdk-config/:sourceSlug, cached via ETags and localStorage. Config exposes: sdk.emit(), sdk.on.page(), sdk.on.click(), sdk.on.submit(), sdk.dom.text(), sdk.on.dataLayer().
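As an illustrative sketch, a config might wire the click and submit hooks to custom events. Only the sdk.* hook names come from the list above; the selectors, event names, and handler signatures are assumptions:

```javascript
// Hypothetical SDK config sketch. The '[data-track="cta"]' selector,
// the event names, and the element-argument handler signature are
// invented for illustration.
function configure(sdk) {
  // Emit a custom event whenever a tagged call-to-action is clicked
  sdk.on.click('[data-track="cta"]', (el) => {
    sdk.emit('cta_click', { label: sdk.dom.text(el) });
  });
  // Track form submissions as a named custom event
  sdk.on.submit('#signup-form', () => {
    sdk.emit('signup_submitted', {});
  });
}
```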
Collect Endpoint Limits
| Limit | Value |
|---|---|
| Max request body | 1 MB |
| Max events per request | 10,000 |
| Max single event payload | 10 MB |
| Backpressure response | HTTP 429 with code: backpressure |
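Clients that receive the 429 backpressure response should back off and retry. A sketch, with an illustrative retry schedule (the delays and retry count are not prescribed by Pipes):

```javascript
// Exponential backoff for the collect endpoint's HTTP 429 backpressure
// response. The schedule (500ms base, doubling) is illustrative.
function backoffMs(attempt, baseMs = 500) {
  return baseMs * 2 ** attempt; // 500, 1000, 2000, ...
}

async function postWithRetry(url, events, maxRetries = 3) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const res = await fetch(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ events })
    });
    if (res.status !== 429) return res; // success or a non-retryable error
    await new Promise(r => setTimeout(r, backoffMs(attempt)));
  }
  throw new Error('collect endpoint still backpressured after retries');
}
```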
Scheduled Imports
Run a JavaScript fetch function on a cron schedule. The function can call external APIs, read from object storage, or query any HTTP endpoint, then return arrays of events. Imports can also be triggered manually on demand.
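A hypothetical import sketch. The function name, the ORDERS_API_URL secret binding, and the order row fields are all assumptions:

```javascript
// Hypothetical scheduled import: pull orders from a REST API and return
// them as purchase events. ORDERS_API_URL is an assumed secret binding.
async function fetchEvents() {
  const res = await fetch(env.secrets.ORDERS_API_URL);
  const orders = await res.json();
  return orders.map(toPurchaseEvent);
}

// Pure mapping from an assumed order row to the Pipes event shape.
function toPurchaseEvent(order) {
  return {
    event_type: 'purchase',
    event_time: new Date(order.created_at).toISOString(),
    event_payload: { order_id: order.id, total: order.total }
  };
}
```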
Event Types & Validation
Event Types define the accepted vocabulary for a source — names like purchase, page_view, or subscription_changed. Each has three components:
| Field | Behavior |
|---|---|
name | Stable, business-meaningful string. Used in routing, attributes, and analysis. Changing names breaks downstream references. |
jsonSchema | Optional JSON Schema validation. When enabled, every incoming payload is validated before storage. Invalid payloads are rejected with field-level error messages. Configured via the product UI. |
identifierRules | Array of JSONPath rules mapping payload fields to Identifier Types. E.g., $.email → email identifier type. Extraction happens after validation. |
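For instance, a hypothetical purchase event type might combine all three components like this (the schema fields and the rule object's key names are illustrative, not the documented config format):

```javascript
// Hypothetical Event Type configuration for a purchase event.
// Schema fields and the identifierRules key names are illustrative.
const purchaseEventType = {
  name: 'purchase',
  jsonSchema: {
    type: 'object',
    required: ['order_id', 'email', 'total'],
    properties: {
      order_id: { type: 'string' },
      email: { type: 'string', format: 'email' },
      total: { type: 'number', minimum: 0 }
    }
  },
  identifierRules: [
    { path: '$.email', identifierType: 'email' } // JSONPath -> Identifier Type
  ]
};
```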
Events are automatically deduplicated based on event type, time, and extracted identifiers. Duplicate events are silently skipped. No configuration required.
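One way to picture the dedup key, as an illustration of the documented inputs rather than the actual internal algorithm:

```javascript
// Illustrative dedup key built from the three documented inputs:
// event type, event time, and extracted identifiers. Identifiers are
// sorted so ordering does not change the key.
function dedupKey(event, identifiers) {
  const ids = identifiers
    .map(i => `${i.type}:${i.value}`)
    .sort()
    .join(',');
  return `${event.event_type}|${event.event_time}|${ids}`;
}
```

Two events that produce the same key would be treated as duplicates, and the second silently skipped.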
Identity Resolution
Identifier Types are the keys that connect events, warehouse rows, and profiles. Each type has three properties controlling merge behavior:
| Property | Description |
|---|---|
name | Unique string — e.g., email, user_id, device_id, cookie. Defined before sources and event types are created. |
maxIdentifiers | null = unlimited. When adding a new identifier would exceed this limit, the system creates an overflow profile instead of merging. Must be >= 1 if set. |
priority | Controls which profile wins during a contested merge. Higher number = higher priority. null treated as 0. Gap by 100 for easy insertion. |
Resolution Algorithm
For each extracted identifier, background workers look up existing profiles. An identifier with no match creates a new profile. When an identifier matches an existing profile, the worker checks whether adding it would exceed the type's maxIdentifiers. If yes → create overflow profile. If no → add to the matched profile.
Example type configuration:
email: maxIdentifiers=2, priority=300 // stable, rare; >2 is usually pathological
device_id: maxIdentifiers=3, priority=200 // phone + laptop + tablet
cookie: maxIdentifiers=20, priority=100 // transient, many per real user
Overflow profiles start empty — they do not inherit historical event attribution. This is intentional: preventing false merges on shared devices is more valuable than complete attribution for edge-case profiles.
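The rules above can be sketched as a pair of decision functions. The profile shape and the way priority is carried on a profile are assumptions for illustration:

```javascript
// Illustrative resolution decision, following the documented rules:
// exceeding maxIdentifiers creates an overflow profile instead of merging.
function resolveAction(profile, identifierType) {
  const current = profile.identifiers.filter(
    i => i.type === identifierType.name
  ).length;
  const max = identifierType.maxIdentifiers; // null = unlimited
  if (max !== null && current + 1 > max) return 'overflow';
  return 'merge';
}

// In a contested merge the higher-priority profile wins; null is treated
// as 0. maxPriority here is an assumed per-profile summary field.
function winningProfile(a, b) {
  return (a.maxPriority ?? 0) >= (b.maxPriority ?? 0) ? a : b;
}
```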
Pipes & Event Destinations
A Pipe connects one Source to one Event Destination via an optional JavaScript transform function. The transform receives the event batch and returns a filtered or reshaped array. An Event Destination receives the transformed batch in a custom send() function.
Pipe Transform Function
// Pipe transform — filter to purchase events only
function transform(payload) {
return payload.events.filter(e => e.event_type === 'purchase');
}
Event Destination send() Function
// Event Destination — POST to a webhook
async function send(payload) {
const url = env.secrets.WEBHOOK_URL;
for (const e of payload.events) {
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(e)
});
if (!res.ok) throw new Error(`Delivery failed: ${res.status}`);
}
}
| Capability | Detail |
|---|---|
| Secret encryption | AES-256-GCM, bound to source / pipe / destination by ID. Access via env.secrets.KEY_NAME |
| JS sandbox packages | 48 npm packages including lodash, dayjs, crypto, uuid |
| Custom fetch | Permitted to any external HTTPS endpoint |
| Enable/disable | Every Pipe and Destination can be independently toggled without deleting config |
| Delivery history | Stored per Pipe, per Destination — queryable via dashboard |
Warehouse Modeling
Warehouse Models map tables in Snowflake, BigQuery, or Databricks to Identifier Types, making historical and batch data available to the same identity graph that processes real-time events.
Setup Sequence
- Create or confirm Identifier Types — identifier columns in the warehouse table must map to an existing type.
- Create the warehouse connection: provide host, credentials, catalog, and schema.
- Validate the connection before modeling.
- Create a Model: specify table name, schema, and identifierRules mapping column names to Identifier Types.
- Validate by checking whether known test profiles resolve against DWH-backed attributes.
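Putting the sequence together, a Model definition might look like the following sketch. The connection name, table, schema, column names, and rule field names are all hypothetical:

```javascript
// Hypothetical Warehouse Model mapping a Snowflake table's columns to
// existing Identifier Types. All names are illustrative.
const ordersModel = {
  connection: 'snowflake-prod',   // a previously validated connection
  table: 'ORDERS',
  schema: 'ANALYTICS',
  identifierRules: [
    { column: 'CUSTOMER_EMAIL', identifierType: 'email' },
    { column: 'USER_ID', identifierType: 'user_id' }
  ]
};
```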
DWH attributes, DWH-backed audiences, and Reverse ETL activation in Customer Studio draw directly from these models — no ETL, no export, no copy.
Piper — AI Agent
Every Pipes instance ships with Piper, a context-aware AI agent with full read access to the instance configuration — sources, event types, transforms, destinations, pipes, secrets bindings, queue depths, error logs, and delivery history.
| Mode | Behavior |
|---|---|
| Autonomous monitoring | Runs heartbeat checks on schedule. Detects anomalies (volume drops, error rate spikes, queue backlog) and posts a diagnosis with recommended action to Slack, Teams, Discord, Google Chat, or GitHub. |
| Human-approval edits | Proposes configuration changes — create a source, update a transform, add an Event Destination. Every edit requires human review and approval before execution. Piper cannot apply changes unilaterally. |
| On-demand queries | Answers operational questions in natural language: "Why are no events routing to Destination X?" or "Which sources have not received traffic in 24 hours?" Responses cite specific queue states, delivery logs, and config objects. |
Piper is accessible via the in-product chat, and via external AI agents that install the CDI skill.
Operations & Observability
Pipes exposes four queues with inspectable depth and worker health: identity resolution, event delivery, profile refresh, and profile delivery. The dashboard provides aggregate counts of events received, stored, and routed.
Troubleshooting Sequence
Follow this order before debugging downstream integrations:
- Is the source receiving traffic? Check dashboard event volume.
- Is the payload matching the correct Event Type? Check identifier extraction on representative events.
- Is the Pipe enabled? Is the Event Destination enabled?
- Is the transform logic dropping events unexpectedly? Test the transform function directly.
- What does delivery history show for the affected Pipe and Destination?
- Is there queue backpressure? Check /api/health/queues.
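When inspecting /api/health/queues, a small helper can flag backlogged queues. The response shape shown here ({ queues: [{ name, depth }] }) and the threshold are assumptions, not the documented schema:

```javascript
// Flag queues whose depth exceeds a threshold. The response shape is an
// assumed example for illustration.
function backloggedQueues(health, threshold = 1000) {
  return health.queues
    .filter(q => q.depth > threshold)
    .map(q => q.name);
}
```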
Version History & Soft Delete
Every entity in Pipes — sources, event types, pipes, destinations, models, identifier types — has version history and soft delete. Accidentally deleted or changed configuration can be restored or reverted without data loss.
CLI
Pipes ships with a CLI that mirrors the full product API — making it fully CI/CD-compatible. A quick taste:
mpcli status
mpcli api GET /api/health/queues
Platform Specifications
| Specification | Value | Notes |
|---|---|---|
| Identity resolution latency | 184ms median | Background workers, async to ingestion |
| Collect endpoint body limit | 1 MB per request | HTTP 429 backpressure when queue is full |
| Max events per request | 10,000 | Across all event types |
| Max single event payload | 10 MB | After transform, before validation |
| JS sandbox packages | 48 npm packages | Includes lodash, dayjs, crypto, uuid |
| Secret encryption | AES-256-GCM | Bound to source / pipe / destination by ID |
| Deduplication | Content-based | On event type, time, and identifiers — automatic |
| Warehouse support | Snowflake · BigQuery · Databricks | Zero-copy DWH-native, no ETL |
| Deployment targets | Self-hosted K8s · Any cloud/region · Meiro SaaS | Zero data egress in self-hosted and any-cloud |
| CLI | Full API surface | CI/CD-compatible |
| AI agent | Piper (ships with every instance) | Slack · Teams · Discord · GitHub |
Recommended Setup Order
- Define Identifier Types first — before any source, event type, or model.
- Create the Source and define Event Types with JSON Schema validation.
- Test with representative payloads before routing or activating.
- Add Pipes and Event Destinations only after ingestion is stable.
- Connect warehouse and create Models if historical data is needed.
- Enable Piper for autonomous operational monitoring.