Pipeline & Runtime

Every message flows through an ordered sequence of stages. Each stage is optional — the engine skips any stage that has no handler configured.

Source
→ Preprocessor
→ Validator
→ Source Filter
→ Transformer
→ Destination Filter
→ Destination Transformer
→ Send
→ Response Transformer
→ Postprocessor

| Stage | Purpose |
| --- | --- |
| Source | Receives raw bytes from the listener (TCP, HTTP, Kafka, file, etc.) and wraps them in an IntuMessage |
| Preprocessor | Runs before any business logic: decode character sets, decompress, strip wrappers |
| Validator | Checks structural and semantic validity (e.g. HL7v2 schema conformance, FHIR profile validation) |
| Source Filter | Decides whether to accept or drop the message based on source-side criteria |
| Transformer | Core business logic: map fields, translate codes, reshape payloads |
| Destination Filter | Per-destination routing: include or exclude the message for each configured destination |
| Destination Transformer | Per-destination payload adjustments (e.g. different FHIR profiles for different receivers) |
| Send | Delivers the message to the destination connector |
| Response Transformer | Processes the destination’s response before returning it upstream |
| Postprocessor | Runs after everything else: audit logging, metrics, cleanup |
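
The ordering and skip behaviour described above can be pictured with a small sketch. This is not intu's engine code: the stage identifiers, the `Handler` type, and `runPipeline` are hypothetical names, and the per-destination fan-out of the destination stages is left out for brevity.

```typescript
// Hypothetical model of the stage order; not intu's actual API.
// The Source stage is omitted because it produces the message.
const STAGE_ORDER = [
  "preprocessor",
  "validator",
  "sourceFilter",
  "transformer",
  "destinationFilter",
  "destinationTransformer",
  "send",
  "responseTransformer",
  "postprocessor",
] as const;

type Stage = (typeof STAGE_ORDER)[number];
type Handler = (payload: string) => string;

// Runs the configured handlers in pipeline order and returns the final
// payload together with the list of stages that actually ran.
function runPipeline(
  handlers: Partial<Record<Stage, Handler>>,
  payload: string,
): { payload: string; ran: Stage[] } {
  const ran: Stage[] = [];
  let current = payload;
  for (const stage of STAGE_ORDER) {
    const handler = handlers[stage];
    if (!handler) continue; // no handler configured: the stage is skipped
    current = handler(current);
    ran.push(stage);
  }
  return { payload: current, ran };
}

// Only two stages are configured; the order of the object keys is irrelevant.
const result = runPipeline(
  {
    transformer: (p) => p.toUpperCase(),
    preprocessor: (p) => p.trim(),
  },
  "  adt^a01  ",
);
console.log(result.ran, result.payload);
```

The point of the sketch is that the order comes from the fixed stage list, not from the order in which handlers are configured.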

When a stage throws an error or returns a failure status, the engine:

  1. Records the error on the message (status, error message, stack trace).
  2. Skips remaining stages — the message does not continue through the pipeline.
  3. Triggers the retry policy if one is configured on the channel.
  4. Returns an error response to the source connector (e.g. an HL7v2 AE ACK or an HTTP 500).

Common error status codes:

| Code | Meaning |
| --- | --- |
| VALIDATION_FAILED | The validator rejected the message |
| TRANSFORM_ERROR | The transformer threw an exception |
| FILTER_REJECTED | A filter explicitly dropped the message |
| DESTINATION_ERROR | The destination connector could not deliver |
| TIMEOUT | A stage exceeded its deadline |
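
As an illustration of the first two steps (record the error, skip the remaining stages), here is a minimal sketch. The `MessageState` and `NamedStage` shapes and the `runWithErrorHandling` function are assumptions made for this example; only the status code names come from the table above.

```typescript
// Status codes taken from the table above.
type ErrorCode =
  | "VALIDATION_FAILED"
  | "TRANSFORM_ERROR"
  | "FILTER_REJECTED"
  | "DESTINATION_ERROR"
  | "TIMEOUT";

// Hypothetical shapes for illustration; intu's real message type may differ.
interface MessageState {
  body: string;
  status: "OK" | ErrorCode;
  errorMessage?: string;
  stack?: string;
}

interface NamedStage {
  name: string;
  failureCode: ErrorCode;
  run: (body: string) => string;
}

// Runs stages in order. On the first exception the error is recorded on the
// message and the remaining stages are not executed.
function runWithErrorHandling(stages: NamedStage[], body: string): MessageState {
  const msg: MessageState = { body, status: "OK" };
  for (const stage of stages) {
    try {
      msg.body = stage.run(msg.body);
    } catch (err) {
      const e = err instanceof Error ? err : new Error(String(err));
      msg.status = stage.failureCode;
      msg.errorMessage = e.message;
      msg.stack = e.stack;
      break; // skip remaining stages
    }
  }
  return msg;
}

const executed: string[] = [];
const outcome = runWithErrorHandling(
  [
    {
      name: "validator",
      failureCode: "VALIDATION_FAILED",
      run: (b) => { executed.push("validator"); return b; },
    },
    {
      name: "transformer",
      failureCode: "TRANSFORM_ERROR",
      run: () => { executed.push("transformer"); throw new Error("unknown code system"); },
    },
    {
      name: "send",
      failureCode: "DESTINATION_ERROR",
      run: (b) => { executed.push("send"); return b; },
    },
  ],
  "MSH|...",
);
console.log(outcome.status, outcome.errorMessage, executed);
```

Retrying and building the error response for the source connector (steps 3 and 4) would sit on top of the returned `MessageState` and are not modelled here.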

Failed deliveries can be retried automatically. Configure the policy in channel.yaml:

retry:
  max_attempts: 5
  backoff: exponential   # constant | linear | exponential
  initial_delay_ms: 500
  max_delay_ms: 30000
  jitter: true

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| max_attempts | integer | 3 | Total delivery attempts (including the first) |
| backoff | string | constant | Delay strategy between attempts |
| initial_delay_ms | integer | 1000 | Delay before the first retry |
| max_delay_ms | integer | 60000 | Upper bound on delay (for linear/exponential) |
| jitter | boolean | false | Add random jitter to prevent thundering-herd |

Backoff strategies:

  • constant: wait initial_delay_ms between every attempt
  • linear: increase by initial_delay_ms each attempt (500, 1000, 1500 …), capped at max_delay_ms
  • exponential: double the delay each attempt (500, 1000, 2000, 4000 …), capped at max_delay_ms
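
The three strategies reduce to simple arithmetic on the retry number. The sketch below is one way to compute the delay, assuming retries are numbered from 1 and that the cap applies to both linear and exponential backoff as the key table states. `retryDelayMs` and the `RetryPolicy` interface are hypothetical names, and jitter is left out because the page does not say how it is applied.

```typescript
type Backoff = "constant" | "linear" | "exponential";

// Mirrors the keys of the retry block in channel.yaml (jitter omitted).
interface RetryPolicy {
  backoff: Backoff;
  initial_delay_ms: number;
  max_delay_ms: number;
}

// Delay in milliseconds before retry number `retry` (1 = first retry).
function retryDelayMs(policy: RetryPolicy, retry: number): number {
  const base = policy.initial_delay_ms;
  switch (policy.backoff) {
    case "constant":
      return base;
    case "linear":
      // base, 2*base, 3*base, ... up to the cap
      return Math.min(base * retry, policy.max_delay_ms);
    case "exponential":
      // base, 2*base, 4*base, ... up to the cap
      return Math.min(base * 2 ** (retry - 1), policy.max_delay_ms);
  }
}

const policy: RetryPolicy = {
  backoff: "exponential",
  initial_delay_ms: 500,
  max_delay_ms: 30000,
};
const delays = [1, 2, 3, 4, 5, 6, 7].map((n) => retryDelayMs(policy, n));
console.log(delays);
```

With the values from the example configuration, the seventh retry would be 32000 ms uncapped, so it is limited to the 30000 ms maximum.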

When all retry attempts are exhausted, the message is routed to the dead-letter queue (DLQ). The DLQ stores the original message, the last error, and all attempt metadata so operators can inspect and replay failures.

retry:
  max_attempts: 5
  backoff: exponential
  initial_delay_ms: 1000
  max_delay_ms: 30000
dlq:
  destination: dlq_store
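
The hand-off from retries to the DLQ can be sketched as a loop that collects attempt metadata and gives up after `max_attempts`. The `DlqEntry` layout and the `deliverOrDeadLetter` function are assumptions for illustration; the page does not specify the stored record format, and the backoff wait between attempts is omitted.

```typescript
// Hypothetical shapes; the real DLQ record layout is not specified here.
interface Attempt {
  attempt: number;
  error: string;
}

interface DlqEntry {
  original: string;
  lastError: string;
  attempts: Attempt[];
}

// Tries `send` up to maxAttempts times (including the first). Returns null on
// success, or a DLQ entry once every attempt has failed.
function deliverOrDeadLetter(
  original: string,
  maxAttempts: number,
  send: (payload: string) => void,
): DlqEntry | null {
  const attempts: Attempt[] = [];
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      send(original);
      return null; // delivered, nothing to dead-letter
    } catch (err) {
      const error = err instanceof Error ? err.message : String(err);
      attempts.push({ attempt, error });
      // A real engine would wait for the configured backoff delay here.
    }
  }
  const last = attempts[attempts.length - 1];
  return {
    original,
    lastError: last ? last.error : "no attempts made",
    attempts,
  };
}

const dead = deliverOrDeadLetter("MSH|...", 3, () => {
  throw new Error("connection refused");
});
console.log(dead?.attempts.length, dead?.lastError);
```

Keeping the original payload and every attempt in the entry is what makes inspection and replay possible later.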

When runtime.hot_reload is true, the engine uses fsnotify to watch for file changes:

  • channel.yaml — channel configuration changes trigger a graceful restart of only the affected channel.
  • .ts files — TypeScript transformer, validator, and filter files are recompiled automatically and hot-swapped.
  • intu.yaml — root config changes require a full engine restart.

Channels whose files did not change continue processing without interruption during a reload.
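
The file-type rules above amount to a small decision table. The sketch below models only that decision, not the file watching itself; `reloadActionFor` and the action names are hypothetical, and paths are assumed to use forward slashes.

```typescript
type ReloadAction =
  | "restart_channel"
  | "recompile_and_swap"
  | "full_restart_required"
  | "ignore";

// Maps a changed file path to the reaction described in the list above.
function reloadActionFor(path: string): ReloadAction {
  const file = path.split("/").pop() ?? "";
  if (file === "intu.yaml") return "full_restart_required";
  if (file === "channel.yaml") return "restart_channel";
  if (file.endsWith(".ts")) return "recompile_and_swap";
  return "ignore"; // assumption: other files do not trigger a reload
}

console.log(reloadActionFor("channels/adt-to-fhir/channel.yaml"));
console.log(reloadActionFor("channels/adt-to-fhir/transform.ts"));
console.log(reloadActionFor("intu.yaml"));
```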

Some sources deliver multiple messages in a single payload. intu can split these into individual messages using splitters:

listener:
  type: tcp
  port: 6661
  content_type: hl7v2
  splitter: hl7_batch

| Splitter | Input | Behaviour |
| --- | --- | --- |
| hl7_batch | HL7v2 batch (BHS/BTS wrapped) | Extracts each MSH segment group |
| fhir_bundle | FHIR Bundle resource | Extracts each Bundle.entry.resource |
| newline | Newline-delimited text | Splits on \n |
| xml_root | XML with repeated child elements | Splits on the first-level child element |

Each sub-message is wrapped in its own IntuMessage and processed independently through the full pipeline. Failures in one sub-message do not affect others.
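
To make the splitting and the failure isolation concrete, here is a rough sketch of two of the splitters and of independent per-message processing. These functions are illustrative stand-ins, not intu's implementation; in particular, dropping empty lines and stripping a trailing carriage return in `splitNewline` are assumptions.

```typescript
// Splits newline-delimited text into individual payloads.
// Assumption: empty lines are dropped and a trailing "\r" is stripped.
function splitNewline(payload: string): string[] {
  return payload
    .split("\n")
    .map((line) => line.replace(/\r$/, ""))
    .filter((line) => line.length > 0);
}

// Minimal view of a FHIR Bundle: only the fields the splitter touches.
interface BundleLike {
  resourceType: "Bundle";
  entry?: { resource?: unknown }[];
}

// Extracts each Bundle.entry.resource, skipping entries without a resource.
function splitFhirBundle(bundle: BundleLike): unknown[] {
  return (bundle.entry ?? [])
    .filter((e) => e.resource !== undefined)
    .map((e) => e.resource);
}

// Processes every sub-message on its own; one failure does not stop the rest.
function processEach(
  parts: string[],
  handle: (part: string) => void,
): ("ok" | "failed")[] {
  return parts.map((part): "ok" | "failed" => {
    try {
      handle(part);
      return "ok";
    } catch {
      return "failed";
    }
  });
}

const parts = splitNewline("first\nbad\r\n\nthird\n");
const statuses = processEach(parts, (p) => {
  if (p === "bad") throw new Error("rejected");
});
console.log(parts, statuses);
```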

intu provides four scoped key-value maps accessible from TypeScript code:

| Map | Scope | Lifetime | Use Case |
| --- | --- | --- | --- |
| globalMap | All channels | Engine lifetime | Shared lookup tables, cross-channel state |
| channelMap | Single channel | Channel lifetime | Channel-scoped counters, caches |
| connectorMap | Single connector | Connector lifetime | Connection-specific state (sequence numbers) |
| responseMap | Single message | Request → response cycle | Pass data from the transformer to the response transformer |

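export default function transform(msg: IntuMessage, ctx: IntuContext) {
  // Per-channel sequence counter; starts at 1 when no value is stored yet.
  const counter = ((ctx.channelMap.get("seq") as number | undefined) ?? 0) + 1;
  ctx.channelMap.set("seq", counter);

  // Shared lookup table; falls back to "UNKNOWN" if the table or key is missing.
  const lookup = ctx.globalMap.get("facilityNames") as Map<string, string> | undefined;
  const name = lookup?.get(msg.body.MSH[4]) ?? "UNKNOWN";

  // Make the timestamp available to the response transformer.
  ctx.responseMap.set("processedAt", new Date().toISOString());
  return msg;
}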
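
The other half of the responseMap hand-off might look like the sketch below, where a response transformer reads the value the transformer stored. The `IntuMessage` and `IntuContext` interfaces here are minimal stand-ins so the example compiles on its own, and the `transformResponse` name and signature are assumptions rather than intu's documented API.

```typescript
// Minimal stand-ins so the sketch is self-contained; the real IntuMessage and
// IntuContext types are provided by intu and will have more fields.
interface IntuMessage {
  body: Record<string, unknown>;
}

interface IntuContext {
  responseMap: Map<string, unknown>;
}

// Transformer side: stores a value for later stages of the same message.
function transform(msg: IntuMessage, ctx: IntuContext): IntuMessage {
  ctx.responseMap.set("processedAt", new Date().toISOString());
  return msg;
}

// Hypothetical response transformer: copies the stored value onto the
// response, or null if the transformer never set it.
function transformResponse(res: IntuMessage, ctx: IntuContext): IntuMessage {
  const processedAt = ctx.responseMap.get("processedAt");
  return {
    body: {
      ...res.body,
      processedAt: typeof processedAt === "string" ? processedAt : null,
    },
  };
}

const ctx: IntuContext = { responseMap: new Map() };
transform({ body: {} }, ctx);
const response = transformResponse({ body: { ack: "AA" } }, ctx);
console.log(response.body);
```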

intu natively parses and generates the following healthcare and general data formats:

| Type | Content Type | Description |
| --- | --- | --- |
| HL7v2 | hl7v2 | ADT, ORM, ORU, MDM, and all standard message types |
| FHIR R4 | fhir+json, fhir+xml | Resources, Bundles, Parameters |
| X12 | x12 | 837, 835, 270/271, 276/277 transaction sets |
| CDA/CCDA | cda+xml | Clinical Document Architecture |
| DICOM | dicom | Metadata extraction and routing (no pixel data) |
| JSON | json | Generic JSON payloads |
| XML | xml | Generic XML documents |
| CSV | csv | Delimited text with configurable separators |

Shared TypeScript utilities can be placed in code_templates/ directories at the project root or inside a channel folder:

my-project/
├── code_templates/              # project-wide shared code
│   ├── hl7-helpers.ts
│   └── fhir-mappers.ts
└── channels/
    └── adt-to-fhir/
        ├── channel.yaml
        ├── transform.ts
        └── code_templates/      # channel-specific shared code
            └── adt-utils.ts

Import shared code in your transformers using relative paths:

import { buildPatientResource } from "../code_templates/fhir-mappers";

Channel-level code_templates/ are resolved first, then project-level — allowing channels to override shared utilities.
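
The lookup order can be pictured as trying two candidate paths in sequence. This sketch only models the precedence rule; `resolveTemplate` is a hypothetical helper, the `.ts` suffix handling is an assumption, and the file-existence check is injected so the example does not touch the file system.

```typescript
// Returns the path a shared module would resolve to: the channel-level
// code_templates/ directory is tried first, then the project-level one.
function resolveTemplate(
  moduleName: string,
  channelDir: string,
  projectDir: string,
  exists: (path: string) => boolean,
): string | undefined {
  const candidates = [
    `${channelDir}/code_templates/${moduleName}.ts`,
    `${projectDir}/code_templates/${moduleName}.ts`,
  ];
  return candidates.find((candidate) => exists(candidate));
}

// Files from the directory tree shown above.
const files = new Set([
  "my-project/code_templates/hl7-helpers.ts",
  "my-project/code_templates/fhir-mappers.ts",
  "my-project/channels/adt-to-fhir/code_templates/adt-utils.ts",
]);
const has = (path: string) => files.has(path);

const channelDir = "my-project/channels/adt-to-fhir";
console.log(resolveTemplate("adt-utils", channelDir, "my-project", has));
console.log(resolveTemplate("fhir-mappers", channelDir, "my-project", has));
```

If a channel added its own `fhir-mappers.ts` under its `code_templates/` directory, the first candidate would match and shadow the project-wide file.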

Storage mode is configurable globally in intu.yaml and can be overridden per channel:

# Global default
storage:
  driver: postgres
  mode: status

# Per-channel override in channel.yaml
storage:
  mode: full

| Mode | What is Stored | Typical Use |
| --- | --- | --- |
| none | Nothing | High-throughput fire-and-forget pipelines |
| status | Message ID, status, timestamps, errors | Monitoring and alerting without payload overhead |
| full | Complete IntuMessage including body | Audit trails, debugging, message replay |
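
The effect of the three modes, and of a channel override, can be sketched as follows. The `ProcessedMessage` fields and both function names are assumptions chosen to mirror the table; the actual stored schema is not described on this page.

```typescript
type StorageMode = "none" | "status" | "full";

// Hypothetical message record used only for this illustration.
interface ProcessedMessage {
  id: string;
  status: string;
  receivedAt: string;
  error?: string;
  body: string;
}

// A per-channel setting, when present, wins over the global default.
function effectiveMode(globalMode: StorageMode, channelMode?: StorageMode): StorageMode {
  return channelMode ?? globalMode;
}

// Returns what would be persisted for a message under the given mode.
function recordFor(mode: StorageMode, msg: ProcessedMessage): Partial<ProcessedMessage> | null {
  switch (mode) {
    case "none":
      return null; // nothing is stored
    case "status": {
      // metadata only, no payload
      const record: Partial<ProcessedMessage> = {
        id: msg.id,
        status: msg.status,
        receivedAt: msg.receivedAt,
      };
      if (msg.error !== undefined) record.error = msg.error;
      return record;
    }
    case "full":
      return msg; // complete message including body
  }
}

const sample: ProcessedMessage = {
  id: "m-1",
  status: "SENT",
  receivedAt: "2024-01-01T00:00:00.000Z",
  body: "MSH|...",
};
console.log(recordFor(effectiveMode("status"), sample));
console.log(recordFor(effectiveMode("status", "full"), sample));
```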