Skip to main content
fi-fhir docs

Basic Structure

Workflow Configuration

Workflows route semantic events to destinations based on filters, transforms, and actions. This guide covers the complete workflow DSL.

Basic Structure

workflow:
  name: my_workflow
  version: '1.0'

  routes:
    - name: route_name
      filter:
        # Which events to match
      transform:
        # How to modify events
      actions:
        # Where to send events

Routes

Routes are processed in order. An event can match multiple routes.

routes:
  - name: critical_alerts
    filter:
      event_type: lab_result
      condition: event.interpretation == "critical"
    actions:
      - type: webhook
        url: https://alerts.example.com/critical

  - name: all_labs_to_fhir
    filter:
      event_type: lab_result
    actions:
      - type: fhir
        endpoint: https://fhir.example.com/r4

Filters

Event Type Filter

Match by semantic event type:

filter:
  event_type: patient_admit              # Single type

filter:
  event_type: [patient_admit, patient_discharge]  # Multiple types

Source Filter

Match by source system:

filter:
  source: epic_adt                       # Single source

filter:
  source: [epic_adt, cerner_adt]         # Multiple sources

CEL Expressions

Complex conditions using Common Expression Language (CEL):

filter:
  condition: event.patient.age >= 65

filter:
  condition: event.encounter.class == "inpatient"

filter:
  condition: event.observation.interpretation in ["critical", "HH", "LL"]

filter:
  condition: |
    event.patient.age >= 65 &&
    event.encounter.class == "inpatient"

Combined Filters

All filter conditions must match (AND logic):

filter:
  event_type: patient_admit
  source: epic_adt
  condition: event.encounter.class == "inpatient"

CEL Expression Reference

ExpressionDescription
event.typeEvent type string
event.patient.mrnPatient MRN
event.patient.ageCalculated age
event.encounter.classEncounter classification
event.observation.valueObservation value
has(event.patient.ssn)Check field exists
size(event.patient.identifiers)Collection size

Transforms

Transforms modify events before sending to actions.

set_field

Set or update a field:

transform:
  - set_field: patient.status = "active"
  - set_field: processed_at = now()
  - set_field: meta.custom_field = "value"

map_terminology

Map local codes to standard terminology:

transform:
  - map_terminology: patient.race # Use configured mapping
  - map_terminology:
      field: observation.code
      source: LOCAL
      target: LOINC

redact

Remove sensitive data:

transform:
  - redact: patient.ssn
  - redact: patient.address
  - redact:
      fields: [patient.ssn, patient.phone]
      replacement: '[REDACTED]'

copy_field

Copy value to another field:

transform:
  - copy_field:
      source: patient.mrn
      target: meta.patient_id

delete_field

Remove a field:

transform:
  - delete_field: patient.internal_id

explain_warnings

Add LLM-powered explanations to parse warnings:

transform:
  - explain_warnings:
      model: qwen3-8b-fast # Optional: model override
      include_fix: true # Include fix suggestions

Actions

Actions send events to destinations.

FHIR Action

Send to a FHIR R4 server:

actions:
  - type: fhir
    endpoint: https://fhir.example.com/r4
    resource: Patient # Resource type to create

    # Authentication (optional)
    auth:
      type: oauth2
      tokenUrl: https://auth.example.com/token
      clientId: ${CLIENT_ID}
      clientSecret: ${CLIENT_SECRET}
      scopes: [system/Patient.write]

    # Options
    validate_fhir: true # Validate before sending
    batch: true # Use batch endpoint

Webhook Action

HTTP POST to any endpoint:

actions:
  - type: webhook
    url: https://api.example.com/events
    method: POST # POST, PUT, PATCH

    headers:
      Authorization: Bearer ${API_KEY}
      Content-Type: application/json

    # Reliability
    retry:
      maxAttempts: 3
      backoffMultiplier: 2
      initialDelay: 1s

Database Action

Write to a relational database:

actions:
  - type: database
    driver: postgres # postgres, mysql, sqlite
    dsn: ${DATABASE_URL}

    operation: upsert # insert, upsert, update
    table: healthcare_events

    # Field mapping
    fields:
      id: '{{.Meta.ID}}'
      event_type: '{{.Meta.Type}}'
      patient_mrn: '{{.Patient.MRN}}'
      encounter_id: '{{.Encounter.Identifier}}'
      payload: '{{. | json}}'
      created_at: '{{now}}'

    # Upsert conflict handling
    conflictColumns: [id]

Queue Action

Publish to a message queue:

actions:
  - type: queue
    driver: kafka # kafka, rabbitmq, nats, sqs
    brokers: ${KAFKA_BROKERS}
    topic: healthcare-events

    # Message key (for partitioning)
    key: '{{.Patient.MRN}}'

    # Headers
    headers:
      event_type: '{{.Meta.Type}}'
      source: '{{.Meta.Source}}'

Email Action

Send email notifications:

actions:
  - type: email
    smtp:
      host: smtp.example.com
      port: 587
      username: ${SMTP_USER}
      password: ${SMTP_PASS}

    from: [email protected]
    to: [[email protected]]

    subject: 'Critical Lab Result: {{.Patient.Name.Family}}'
    body: |
      Patient: {{.Patient.Name.Given}} {{.Patient.Name.Family}}
      MRN: {{.Patient.MRN}}
      Test: {{.Observation.Code.Display}}
      Value: {{.Observation.Value}}

File Action

Write to disk:

actions:
  - type: file
    path: /data/events/{{.Meta.Type}}/{{.Meta.ID}}.json
    format: json # json, yaml, csv

    # Atomic writes (write to temp, then rename)
    atomic: true

    # Permissions
    mode: 0644

Log Action

Write to logs:

actions:
  - type: log
    level: info # debug, info, warn, error
    message: 'Processed: {{.Meta.Type}} for {{.Patient.MRN}}'

    # Include full event
    include_event: false

Event Store Action

Write to event sourcing store:

actions:
  - type: eventstore
    stream: 'patient-{{.Patient.MRN}}'

    # Metadata to include
    metadata:
      source: '{{.Meta.Source}}'
      correlation_id: '{{.Meta.ID}}'

Exec Action

Run external command (with allowlist):

actions:
  - type: exec
    command: /usr/local/bin/notify-script
    args:
      - '{{.Meta.Type}}'
      - '{{.Patient.MRN}}'

    timeout: 30s

    # Must be in allowlist
    allowlist:
      - /usr/local/bin/notify-script
      - /usr/local/bin/audit-script

LLM Extract Action

Extract clinical entities from document text using LLM:

actions:
  - type: llm_extract
    config:
      model: qwen3-14b-quality # Model to use
      document_type: progress_note # Hint: progress_note, discharge_summary, consult_note
      min_confidence: 0.7 # Minimum confidence threshold
      text_field: document.content # Field containing clinical text

Extracted entities are added to the event under extracted_entities:

  • Conditions (SNOMED CT, ICD-10)
  • Medications (RxNorm)
  • Vital Signs (LOINC)
  • Allergies, Procedures

LLM Quality Check Action

Analyze data quality and optionally fail the route:

actions:
  - type: llm_quality_check
    config:
      model: qwen3-8b-fast
      fail_below: 0.5 # Fail route if score below threshold

Quality dimensions: completeness, accuracy, consistency, conformance, timeliness.

Results are added to the event under quality_score.

Template Functions

Templates use Go text/template with additional functions:

FunctionDescriptionExample
nowCurrent timestamp{{now}}
jsonJSON encode{{. | json}}
upperUppercase{{.Patient.MRN | upper}}
lowerLowercase{{.Meta.Type | lower}}
replaceString replace{{.Value | replace "old" "new"}}
defaultDefault value{{.Field | default "N/A"}}

Reliability Features

Retry Configuration

actions:
  - type: webhook
    url: https://api.example.com
    retry:
      maxAttempts: 5
      initialDelay: 1s
      maxDelay: 30s
      backoffMultiplier: 2

Circuit Breaker

actions:
  - type: fhir
    endpoint: https://fhir.example.com
    circuit_breaker:
      threshold: 5 # Failures before opening
      timeout: 60s # Time before retry

Dead Letter Queue

workflow:
  name: with_dlq
  dlq:
    enabled: true
    type: file
    path: /data/dlq/

    # Or send to queue
    # type: queue
    # driver: kafka
    # topic: dlq-events

Rate Limiting

workflow:
  name: rate_limited
  rate_limit:
    requests_per_second: 100
    burst: 50

Environment Variables

Reference environment variables with ${VAR}:

actions:
  - type: fhir
    endpoint: ${FHIR_ENDPOINT}
    auth:
      clientId: ${FHIR_CLIENT_ID}
      clientSecret: ${FHIR_CLIENT_SECRET}

Complete Example

workflow:
  name: hospital_integration
  version: '2.0'

  rate_limit:
    requests_per_second: 100

  dlq:
    enabled: true
    type: file
    path: /data/dlq/

  routes:
    # Critical lab results - immediate alert
    - name: critical_labs
      filter:
        event_type: lab_result
        condition: event.observation.interpretation in ["critical", "HH", "LL"]
      transform:
        - set_field: priority = "CRITICAL"
      actions:
        - type: webhook
          url: ${ALERT_WEBHOOK_URL}
          retry:
            maxAttempts: 5
        - type: email
          from: [email protected]
          to: [[email protected]]
          subject: 'CRITICAL: Lab Result for {{.Patient.Name.Family}}'

    # All patient events to FHIR
    - name: patients_to_fhir
      filter:
        event_type: [patient_admit, patient_discharge, patient_update]
      transform:
        - redact: patient.ssn
      actions:
        - type: fhir
          endpoint: ${FHIR_ENDPOINT}
          auth:
            type: oauth2
            tokenUrl: ${FHIR_TOKEN_URL}
            clientId: ${FHIR_CLIENT_ID}
            clientSecret: ${FHIR_CLIENT_SECRET}
          circuit_breaker:
            threshold: 5
            timeout: 60s

    # All events to data warehouse
    - name: data_warehouse
      filter: {} # Match all
      transform:
        - redact: [patient.ssn, patient.address]
      actions:
        - type: database
          driver: postgres
          dsn: ${DW_DATABASE_URL}
          operation: insert
          table: raw_events
          fields:
            id: '{{.Meta.ID}}'
            type: '{{.Meta.Type}}'
            payload: '{{. | json}}'
            created_at: '{{now}}'

CLI Commands

Validate Workflow

fi-fhir workflow validate workflow.yaml

Run Workflow

# From stdin
cat events.json | fi-fhir workflow run --config workflow.yaml

# From file
fi-fhir workflow run --config workflow.yaml events.json

# Dry-run (no side effects)
fi-fhir workflow run --dry-run --config workflow.yaml events.json

Test with Simulation

fi-fhir workflow simulate --config workflow.yaml --events test_events.json

See Also