Workflow-движок, который создаёт иллюзию единого процесса: ты пишешь обычный последовательный код, а движок прячет за ним распределённую систему — шаги выполняются на разных машинах, переживают crash’и, ждут событий, ретраятся. У нас — основной оркестратор pipeline’ов recognize / process / interpret анализов крови, plus целевой substrate для inter-service-коммуникации / очередей / fallback’ов вместо ad-hoc Python-сервисов.
Почему именно Inngest (а не Temporal / Vercel WDK / job queue) — service-coordination-engine: мы — data pipeline (saga не нужна → Temporal оверкилл), TS-стек, минимальная инфра, лучший flow control, connect() для K8s. Эта страница — про сам Inngest и как мы им пользуемся.
Как мы используем (коротко)
- Recognize pipeline — оркестратор гоняет шаги извлечения данных из загруженного документа: crosscheck (сверка нескольких прогонов VLM), normalize (нормализация имён/единиц параметров), LOINC (маппинг в коды), ranges (reference-диапазоны), save-fhir (запись Observation/DiagnosticReport в Healthcare API), webhook (нотификация клиенту). Каждый шаг — со своими retries и под function-level concurrency. Триггер — событие
input/recognize.uploaded(или...legacy.uploadedдля старого PDF-native движка). - Process / interpret pipeline — отдельный flow для интерпретации (после recognize): параметр-анализ, enrichment-шаги (overview / follow-up / trends), сборка FHIR Composition/CarePlan, генерация PDF. У patient-scoped (V2.5) и test-scoped (legacy b2b) треков — разные Composition (см. patient-summary-composition-naming).
- Provisioning — на событие
org/createdсоздаёт для новой организации Healthcare API dataset + Organization-ресурс (AI-author) + GCS-префикс. - Scheduled functions — verification poll для CF Custom Hostnames (Phase 2), audit cleanup, evaluation-judge cron (
0 * * * *, см. CLAUDE.md § Evaluation Cron). - Что НЕ Inngest: model fallback / cascade между провайдерами живёт на LLM-прокси (Bifrost), не как Inngest-шаги — см. llm-proxy-choice. Тот же паттерн «инфраструктурный слой вместо самодельного»: Inngest для шагов pipeline, Bifrost для model-routing.
Базовая модель
inngest.createFunction(
{ id: "process-order" },
{ event: "order/created" },
async ({ event, step }) => {
const validated = await step.run("validate", () => validate(event.data))
const charged = await step.run("charge", () => charge(validated))
const receipt = await step.run("receipt", () => sendReceipt(charged))
return receipt
}
)Выглядит как обычная функция. Но каждый step.run() — это checkpoint: при crash между шагами 2 и 3 Inngest вызовет функцию заново, подставит результаты шагов 1–2 из state store (не выполняя их повторно), и выполнит только шаг 3 и дальше. Код вне step.run() (в теле функции напрямую) выполняется при каждом таком re-invoke — поэтому в нём не должно быть side-эффектов.
Function vs Step — два уровня
Их легко спутать.
Function — durable workflow, привязанный к триггеру (event или cron). Создаётся inngest.createFunction({ id, name, ... }, { event } | { cron }, async ({ event, step }) => {...}). Это единица работы целиком: один logical job. Конфиг живёт на функции: concurrency (сколько runs параллельно, опц. keyed), retries (по умолчанию 4), idempotency / debounce / rateLimit / throttle (дедуп и троттлинг входных событий), batchEvents, cancelOn, priority. Функцию можно вызвать извне (inngest.send(event)), из другой функции (step.invoke), запланировать (cron).
Step — step.run("id", fn) — это memoized retry-checkpoint ВНУТРИ функции. Каждый step.run атомарен: упала функция после шага N → при retry шаги 0..N-1 проигрываются из закэшированных результатов, повторно бежит только упавший шаг и дальше. Шаги — то, чем длинный workflow делается durable и режется на наблюдаемые куски (каждый виден в Inngest UI отдельно). id шага уникален внутри функции и стабилен — это ключ мемоизации. Ретраи у шага — это ретраи функции (function-level retries), не отдельный per-step счётчик; throw new NonRetriableError(...) чтобы не ретраить, RetryAfterError для backoff-хинта.
Прочие step-примитивы: step.sleep("id", "1h") / step.sleepUntil("id", date) (durable пауза), step.waitForEvent("id", { event, timeout, match }) (ждать события — HITL, например doctor review с timeout: "7d"), step.sendEvent("id", { name, data }) (эмитить события durable изнутри функции), step.invoke("id", { function, data }) (вызвать другую функцию и дождаться её return).
Когда отдельная функция, когда шаг: отдельная функция — когда у неё другой триггер, или нужен другой concurrency/retries/idempotency, или хочется чтобы она была независимо вызываемой / ретраимой / наблюдаемой. Шаг — когда это стадия одного logical workflow, которая при retry должна проиграться, а не перезапустить весь workflow с нуля.
Три инструмента: step.run vs step.invoke vs step.sendEvent
step.run() | step.invoke() | step.sendEvent() | |
|---|---|---|---|
| Что | inline-код в той же функции | вызвать другую Inngest-функцию | эмитить событие |
| Результат доступен | да | да | нет |
| Retry / concurrency | от parent-функции | свои (у вызванной функции) | свои (у обработчика) |
| При failure | parent ретраится | NonRetriableError → parent падает | функция-обработчик ретраится |
| Где код | тот же файл | отдельный файл | отдельный файл |
| Лимит | 1000 шагов на функцию | считается как 1 шаг parent | без лимита |
step.run() — простые операции (DB query, API call, трансформация), не нужна своя конфигурация. step.invoke() — переиспользуемые шаги или когда нужна своя config (например concurrency: { key: "event.data.testId", limit: 1 }); важно: вызванная функция, упав после всех своих retries, отдаёт parent’у NonRetriableError — это защита от compounding retries (invoke 3× × parent 3× = 9 вызовов, а не 3). step.sendEvent() — fan-out, decoupling, независимые обработчики; внутри функций всегда step.sendEvent (добавляет tracing/context текущего run’а), inngest.send — только из кода вне функций (API routes).
Orchestration vs Choreography — ключевое архитектурное решение
Orchestration (step.invoke) Choreography (step.sendEvent)
=========================== =============================
Orchestrator FnA --evt--> FnB --evt--> FnC
├─ invoke "recognize" → result слушает слушает слушает
├─ invoke "normalize" → result "uploaded" "recognized" "normalized"
├─ invoke "analyze" → result
└─ return final процесс размазан по N файлам,
вся логика в одном файле, никто не знает всей цепочки,
queryable state, failure = fan-out (1 событие → N функций),
весь оркестратор retry failure = одна функция, без лимита шагов
Orchestration — один оркестратор последовательно вызывает функции, получает результаты, управляет потоком. Видна вся логика, есть queryable state («где заказ / анализ?»), один владелец процесса. Минус: failure = ретрай всего оркестратора; добавить шаг = менять оркестратор; нет fan-out (один invoke = одна функция); лимит 1000 шагов.
Choreography — каждая функция слушает событие, делает работу, эмитит следующее. Слабая связность, fan-out (одно событие → N независимых функций, разные команды владеют), failure локализован одной функцией, без лимита шагов. Минус: control flow неочевиден, отлаживать тяжело, состояние размазано, результат недоступен вызывающему.
Что говорят: Temporal рекомендует orchestration — choreography «can make control flow unclear», «challenging to debug». Maxim Fateev (создатель Temporal): «Events and queues are not the right abstraction to build such systems. They’re very good runtime abstractions… but as a design choice, they actually create very brutal systems because everything is connected to everything and there are no clear APIs.» Inngest нейтральнее: step.invoke для coordinated, interdependent functions; step.sendEvent для fan-out и decoupling. Обычно лучший выбор — гибрид: оркестратор для основного последовательного pipeline, choreography для fan-out независимых реакций (у нас — enrichment-шаги после параметр-анализа), step.waitForEvent для HITL (doctor review).
У нас сделано и так, и так: фаза recognize — чистая orchestration (recognize.orchestrator.ts — step.invoke-цепочка image-crosscheck → fhir-context → normalize/LOINC → ranges → save-fhir-preliminary → save-fhir-enrich → webhook); всё ниже по потоку (interpret → enrichments → save-FHIR-AI → doctor-review → PDF) — choreography (функции слушают события enrichment/*.requested, fhir/ai-resources.save, doctor/review.pending, …). Граница «orchestration → choreography» проходит по линии recognize|interpret — сложилась, не выбрана. Что делать с этим — отдельный вопрос: inngest-pipeline-orchestration-vs-choreography (status: draft; ⊥ no-self-rolled-queues, biomarker-actuality-integration § Inngest-декомпозиция).
Connect vs Serve — две модели подключения
serve() — Push (HTTP) | connect() — Pull (WebSocket) | |
|---|---|---|
| Кто инициирует | Inngest вызывает твой HTTP endpoint | твой app открывает исходящий WS к Inngest |
| Публичный URL | нужен | не нужен (за NAT/firewall/private network) |
| Latency | HTTP roundtrip + возможный cold start; каждый шаг = отдельный HTTP-вызов (N шагов = N+1) | persistent connection — минимальная |
| Таймауты | ограничены платформой (Vercel 60s, Lambda 15m) | нет |
| Среда | serverless (Vercel, Lambda) | long-running сервисы (K8s, ECS, свой сервер) |
С connect() Inngest по execution-модели ближе к Temporal (pull через persistent connection), сохраняя простоту API. Наши воркеры (analysis-worker, fhir-services) — long-running в K8s, так что вероятно connect() — TBD verify.
Передача данных между шагами — через store, не через step output
Где живёт state между шагами workflow’а — отдельная дилемма. Inngest накладывает лимиты на размеры: step output — 4 MB, function run state (все step results + event data + metadata) — 32 MB, event payload — 256 KB (free) / 3 MB (paid). Большие данные (PDF, распознанные параметры, FHIR-ресурсы) физически нельзя гонять через step output. Паттерн — внешний store как state-store: шаг пишет в store, следующий читает по ID; через Inngest идут только ID + metadata.
Save FHIR step → returns { observationIds: ["o1","o2",...] }
AI Analysis step → читает Observation'ы из FHIR → обновляет note[] + interpretation
У нас этот store — FHIR (Healthcare API), не Prisma BloodTest.rawRecognitionResult JSON. Причина не только в лимитах: медицинские данные пациентов не должны оседать в логах / трейсах оркестратора (step state виден в Inngest UI и его storage) — в FHIR-store они под нашим контролем доступа (см. phi-in-fhir-not-sql), а через step output фактически бы дублировались в инфраструктуре оркестратора. Это и обоснование cleanup’а rawRecognitionResult после миграции на FHIR-as-source-of-truth.
Multi-tenant паттерн (наш)
organizationId пробрасывается через ВСЕ events в payload:
inngest.send({ name: "input/recognize.uploaded", data: { testId, organizationId, ...rest } });В каждом step: const { fhirClient, prisma } = await resolveTenantContext({ testId, organizationId }) (apps/analysis-worker/.../utils/tenant-context.ts — кэшированный tenant-aware FhirClient + tenant-scoped Prisma). Per-tenant concurrency через function-level limit: concurrency: { key: "event.data.organizationId", limit: 5 } — один tenant с burst-нагрузкой не блокирует остальных (noisy-neighbor protection). См. multi-tenant-fhir-storage для defense-in-depth контекста.
Лимиты
| Что | Лимит |
|---|---|
| Step output | 4 MB |
| Function run state | 32 MB (все step results + event data + metadata) |
| Event payload | 256 KB (free) / 3 MB (paid) |
| Events per request | 5000 |
| Event batch | 10 MB |
| Steps per function | 1000 |
| Step timeout | 2 часа |
sleep / waitForEvent timeout | 7 дней (free) / 1 год (paid) |
1000 шагов — ловушка при step.run() в цикле: 500 параметров × 2 шага = 1000. Решение: обрабатывать цикл внутри одного step.run(), либо fan-out через sendEvent. При serve() — добавляются таймауты платформы хостинга (Vercel 10–900s, Lambda 15min, GCF 10–60min); при connect() этих ограничений нет.
Deployment и pricing
Cloud (managed) — ничего не деплоишь, регистрируешь функции через serve() / connect(). Self-hosted — single binary (inngest start, включает Event API / Runner / Executor / Queue / State / Dashboard), SQLite по умолчанию (dev, single node) / PostgreSQL для production (+ опц. Redis для queue/state), Docker / Compose / Kubernetes Helm chart (с KEDA autoscaling); статус beta, нет явной доки про feature parity с cloud, dashboard включён.
Pricing (Cloud): Hobby (free) — 50K runs/мес, 5 concurrent steps, 3 workers; Pro — от 50 / 1M runs (1–5M) … 25 за 25. Один вызов функции = один run; внутри run’а любое число step.run — это один run; step.invoke создаёт ещё один run (оркестратор с 5 invoke = 6 runs).
У нас: в Cloud — платный тариф (какой именно — TBD verify); локально — self-hosted (поднимается через Tilt). Потенциально можем перейти на self-hosted и в проде, но это не зафиксировано.
Заменяет ad-hoc queue-логику в Python-сервисах
Старая normalization-service (Python) и loinc-harmonization-service (Python) каждая изобрела свой механизм: Async API + Redis queue (BZPOPMIN) + worker pod, job polling, hand-coded retry/fallback — дублирование того, что Inngest даёт из коробки. После TS-порта (см. loinc-unification-direction) pipeline-шаги становятся Inngest-функциями — queue/retry/fallback автоматически. Любая самодельная in-app очередь / ручной worker-pool / рекурсивная concurrency / retry-loop с бэкоффом — анти-паттерн; rationale, граница и зафиксированный direction — на no-self-rolled-queues. Текущий пример из кода: validity-классификатор на feat/v2-5 (classifyBatchWithSplit — next++ worker-pool + рекурсивная бисекция батчей), при порте переразложить на Inngest-шаги (см. validity-classifier, biomarker-actuality-integration).
Roadmap и недавние фичи
- Checkpointing (Dec 2025, developer preview) — near-zero inter-step latency: SDK оркестрирует шаги локально, checkpoint’ы пишутся в Inngest асинхронно; при failure — fallback на стандартную HTTP-модель.
checkpointing: true, требуетinngest@3.51.0+, ~−50% latency. Решает главную проблему HTTP-модели (latency между шагами) — даёт скорость Temporal при durability Inngest. - Connect (Jun 2025) — WebSocket pull-модель (см. выше).
- Realtime (May 2025) — стриминг обновлений из функций напрямую юзерам.
- step.fetch() (May 2025) — HTTP-запросы выполняются на стороне Inngest (экономия compute).
- AgentKit (2025) — фреймворк для AI-агентов (single/multi-agent, MCP tools, OpenAI/Anthropic/Gemini).
- Metrics Export (Jun 2025) — экспорт в Datadog / Prometheus / Grafana.
- Singleton Functions — предотвращение дублирующихся concurrent runs.
- Запланировано: per-step options (override retry/concurrency на уровне отдельного шага — сейчас для этого нужен
step.invokeс отдельной функцией), self-hosting improvements (запуск отдельных сервисов вместо single binary, HA guide), alerts (сейчас нет из коробки), multi-region data, lookbackwaitForEvent.
Открытые вопросы
- Orchestration vs choreography — как должно быть → вынесено в inngest-pipeline-orchestration-vs-choreography (status: draft): recognize-хребет — orchestration, всё downstream — choreography, граница сложилась исторически; решить, расширять orchestration вниз / кодифицировать критерий границы / уходить в choreography. ⊥ no-self-rolled-queues, biomarker-actuality-integration § Inngest-декомпозиция (батч =
step.run? fan-out черезsendEvent? дочерняя функция черезstep.invoke?). - Connect vs serve — какую модель подключения реально используют наши воркеры (analysis-worker, fhir-services) — TBD verify; вероятно
connect()(K8s, long-running). - Inngest Cloud vs self-hosted — сейчас Cloud (платный тариф, точный — TBD), локально self-hosted; можно ли / стоит ли перейти на self-hosted в проде — не зафиксировано (self-hosted beta, нет явной feature-parity доки, нужен HA-setup).
Связано
- google-healthcare-api — FHIR backend, которым Inngest-шаги оперируют (state-store между шагами)
- multi-tenant-fhir-storage — defense-in-depth + tenant context propagation
- fhir-bundle — transactional save в FHIR
- phi-in-fhir-not-sql — почему большие/медицинские данные между шагами — через FHIR-store, не step output
- ai-enrichment-separate-step — отдельный update-step после AI-анализа
- patient-summary-composition-naming — два interpret-трека (patient-scoped V2.5 / test-scoped legacy) → разные Composition
- normalization-service / loinc-harmonization-service — legacy Python-сервисы; queue-логика будет заменена Inngest
- loinc-unification-direction — TS-port direction; pipeline-шаги → Inngest-функции
- service-coordination-engine — почему Inngest (а не Temporal / Vercel WDK / job queue): мы data pipeline, TS-стек, минимальная инфра, flow control, connect()
- no-self-rolled-queues — оркестрация / очереди / concurrency / retry — через Inngest, не самодельные in-app механизмы (rationale + цитата direction’а)
- inngest-pipeline-orchestration-vs-choreography — как именно компоновать наш пайплайн на Inngest: один большой оркестратор vs event-choreography vs гибрид с явной границей (status: draft)
- llm-proxy-choice — Bifrost для model-routing/fallback ⟷ Inngest для шагов pipeline: один паттерн (инфраструктурный слой вместо самодельного)
- validity-classifier / biomarker-actuality-integration — пример кода с самодельным worker-pool + конкретный кейс Inngest-декомпозиции
- mastra — Mastra-агенты бегут как шаги внутри Inngest-функций
Источники
Сноски
-
Inngest docs, accessed 2026-05-17, https://www.inngest.com/docs — · Steps · How functions are executed · step.invoke · Sending events from functions · Connect (WebSocket) · Checkpointing / Improve Performance · Usage limits · Self-hosting · Pricing · Roadmap. ↩
-
[Temporal: Orchestrate or Choreograph Your Saga](, accessed 2026-05-17, https://temporal.io/blog/to-choreograph-or-orchestrate-your-saga-that-is-the-question — · SE Radio 596: Maxim Fateev on Durable Execution. ↩
-
Сессия
ildar/871a7608, 2026-02-13 — multi-tenant flow design. ↩