Давно зафиксированное направление, формулируется явно). У нас один orchestrator — inngest. Изобретать в коде свою очередь задач, свой worker-pool с ручной concurrency, свой retry-loop с бэкоффом, свой «фан-аут руками» — переизобретение того, что Inngest даёт by-design, плюс свои failure modes, плюс невидимость в observability. Существующий код с такими механизмами — миграционные цели, не образец.
Контекст
Паттерн «каждый сервис изобретает свою очередь» уже проявлялся дважды:
- Старые Python-сервисы (normalization-service, loinc-harmonization-service) — каждый сделал свой Async-API + Redis-queue + worker-pod + job-polling + руками-кодированные retry/fallback; был инцидент с задачами, терявшимися несколько дней (детали — в Источниках). Direction зафиксирован: TS-порт переводит pipeline-шаги в Inngest functions (см. loinc-unification-direction).
- Validity-классификатор на ветке Артура (
feat/v2-5):classifyBatchWithSplit/classifyPatient— это in-app job queue: режет наблюдения на батчи, гоняет до N параллельно через ручнойnext++worker-pool, на терминальный фейл батча рекурсивно бисекирует батч (Promise.allлевой/правой половины), на soft-фейл — свой retry с подсказкой, потом синтезирует fallback-строки. См. validity-classifier. При порте в наш стек это надо переразложить на Inngest-шаги — конкретные варианты в biomarker-actuality-integration.
Как ручное переписать на Inngest нативно
| Руками в коде | Нативно в Inngest |
|---|---|
next++ / семафор-loop / pool воркеров в процессе | Promise.all([step.run(...), ...]) — параллельные шаги в одной функции; или step.invoke ×N — параллельные дочерние функции |
Свой retry-loop: for (attempt) { try ... catch { sleep(backoff) } } | function-level retries: N + step-replay (упавший шаг ретраится, предыдущие проигрываются из кэша); NonRetriableError / RetryAfterError для тонкой настройки |
| Рекурсивная бисекция батча руками + ручной merge результатов | дробление на шаги; «ядовитый» элемент изолируется как 1-элементный шаг с отдельным ретраем (или fallback-шаг) |
| Свой in-memory job-buffer / Redis-queue | events + functions; платформа держит durability, ordering, retries |
| Ручная concurrency-throttle «один tenant не задушит остальных» | function-level concurrency: { key, limit } |
| Ручной дедуп «двойного клика» / повторных событий | function-level idempotency: "event.data.<key>" / debounce / rateLimit |
| Прогресс/статус run руками в БД-колонках | Inngest run status (UI / API); Postgres-handle только если действительно нужен внешним читателям |
Граница: речь про оркестрацию (что когда параллельно, что ретраить, что дедупить, durability между шагами). Обычный async-код внутри одного шага (Promise.all нескольких независимых I/O в пределах step.run, нормальная обработка ошибок там) — не очередь. Различие step vs function и какой fan-out когда — в inngest § Step vs Function.
Следствия
- Новый pipeline-код, который «надо запустить N единиц работы с ретраями» — пишется как Inngest function(s) + step(s), не как in-app pool.
- Существующие самодельные очереди (Python normalization/LOINC сервисы; validity
classifyBatchWithSplit) — миграционные цели; не копировать их форму при порте. - Порт validity-классификатора в
feat/v2-5-on-stagingобязан переразложитьclassifyBatchWithSplitна Inngest-шаги — это часть biomarker-actuality-integration, не «оставить как есть». - Status-handle прогресса — сначала смотреть на Inngest run status; Postgres-колонки/таблица только если внешние читатели реально не могут поллить Inngest.
- По аналогии: model-fallback / cascade на уровне моделей — это то, что делает LLM-прокси bifrost «за код» (как Inngest для оркестрации шагов) — кандидат на вынос туда, а не ручной retry-loop в
callOnce.
Открытые вопросы
- Inngest cloud vs self-hosted / платный аккаунт. У нас платный аккаунт Inngest cloud. Там свои компромиссы — число вызовов / скорость / как дробить работу на шаги. Не зафиксированы. См. inngest.
- Локальный запуск. Локально через Inngest может быть неудобно — особенно для экспериментов. Была надежда, что Mastra под капотом использует Inngest (тогда один путь и локально, и в проде) — вернуться к этому вопросу.
- Inngest step-overhead. У дробления на много шагов есть цена (latency, лимит на число шагов в function run). Плюсы (durability, observability, retries by-design) vs минусы (небольшое замедление) — общий вопрос «как Inngest работает с этим»; стоит расписать на inngest.
- Batch sizing — отдельная тема. Сколько сущностей идёт в одну LLM-генерацию, от чего считать размер батча (число элементов / их размер / токены на входе), какая тут математика — это общий вопрос нашего AI-инжиниринга (не только Inngest); достоин отдельной страницы про работу батчами. Здесь всплыл через «батч = шаг» в Inngest-разложении (для validity ~50 биомаркеров / 8 ≈ 7 шагов).
Связано
- inngest — наш orchestrator; step vs function, паттерны использования
- bifrost / llm-proxy-choice — LLM-прокси; model-fallback/cascade — кандидат на вынос туда (по аналогии)
- llm-call-failure-classes — транспорт + схема (generic, → прокси) vs семантика (task-specific, → код агента); переизобретённые транспорт-обходы (worker-pool, ручной retry-loop) = тот же анти-паттерн, что и самодельные очереди
- biomarker-actuality-integration — конкретный кейс: как переразложить
classifyBatchWithSplitvalidity-классификатора на Inngest-шаги - validity-classifier — где живёт самодельный worker-pool, который этот принцип адресует
- loinc-unification-direction — TS-порт Python-сервисов; pipeline-шаги → Inngest functions, queue/retry/fallback автоматически
- normalization-service, loinc-harmonization-service — legacy Python сервисы со своими Redis-очередями (миграционные цели)
- health-report-vocabulary — словарь стадий пайплайна (смежно: что вообще считается «шагом»)
Источники
Источники: 1.
Сноски
-
Inngest docs, accessed 2026-05-17, https://www.inngest.com/docs — ; concurrency https://www.inngest.com/docs/functions/concurrency ; steps https://www.inngest.com/docs/learn/inngest-steps. ↩