Motor de Flujos
Concepto
El analista funcional dibuja el flujo en el editor visual (React Flow) usando nodos de dominio. El sistema compila el grafo y lo ejecuta nativamente con motor propio (sin n8n ni dependencia externa).
Kuatia tiene dos motores independientes pero arquitectónicamente paralelos:
| Motor | Walk | Trigger | Estado |
|---|---|---|---|
Dossier-flow (WorkflowEngineService) | Se infiere del estado del Dossier (state machine) | Al crear un Dossier o reanudar desde una task | Versionado en flow_versions |
Doc-flow (DocumentWorkflowEngineService) — Layer 2 | Persiste en document_flow_walks (Mongo) con currentNodeId + context + tasks[] + history[] | Post-clasificación de cada Document, o POST /documents/:id/flow/start | Versionado en flow_versions por DocumentType |
Ambos motores comparten el shape de flowDefinition (nodes + edges de React Flow); todo lo demás es independiente.
Ciclo de vida de un flujo
Cada publish crea un snapshot inmutable en flow_versions. Las ejecuciones en curso usan la versión publicada al momento de su startFlow; cambios posteriores aplican a nuevas ejecuciones.
Catálogo de nodos
Fuente de verdad: api/src/flow/nodes.catalog.json. Hoy: 14 nodos.
Nodos para Dossier-flow (appliesTo: ['dossier'])
| Nodo | Categoría | Descripción |
|---|---|---|
inicio | Control | Punto de entrada (obligatorio, uno solo) |
fin | Control | Punto de salida con estado final configurable |
tarea_humana | Tareas | Asigna tarea a usuario o rol |
tarea_ui_externa | Tareas | Tarea con UI externa embebida (UI Endpoint) |
requerir_documento | Documentos | Exige que se suba un documento específico |
esperar_estado_documento | Documentos | Pausa hasta que un documento llegue a cierto estado |
decision | Control | Bifurca por condición sobre metadata |
emitir_webhook | Integración | POST a sistema externo |
esperar_webhook | Integración | Pausa hasta recibir POST externo (resume vía token) |
notificar | Integración | Email o notificación al usuario/rol |
consultar_documentos | Documentos | Consulta semántica RAG sobre los documentos del tenant |
Nodos para Doc-flow (appliesTo: ['document'])
| Nodo | Categoría | Descripción |
|---|---|---|
extract_with_processor | Extracción | Corre un SpecializedProcessor sobre el doc y persiste resultados en metadata.extracted[procName] |
manual_review_extracted_fields | Review | Pausa el walk con una task de revisión humana del extract; soporta editedFields para corregir |
ai_assistant_review | Review | Pausa el walk con una task de chat operador↔IA; el operador agrega atributos libres ad-hoc al doc (bag metadata.assistantAttributes) |
Ejecución del dossier-flow
Webhooks entrantes
Sistemas externos reanudan flujos pausados en esperar_webhook:
POST /flow/webhook/:token
El FlowWebhookController (público, validado por token) marca el FlowWait como RESOLVED y continúa la ejecución desde el edge sourceHandle=received del nodo en espera. Estados del wait: WAITING, RESOLVED, EXPIRED. Barrido de EXPIRED cada 60s por job repeatable de la cola flow-timeout.
Doc-flow Layer 2
Cuando un Document llega a su tipo (vía clasificación automática o reclasificación manual), el DocumentWorkflowEngineService arranca un walk si el DocumentType.flowStatus === 'PUBLISHED'. El walk persiste en document_flow_walks:
- 1 walk activo por documento (índice parcial único
(tenantId, documentId)sobre statusrunning|paused). - Cada walk tiene
tasks[]embebidas (id local + nodeId + status + payload + assignedTo) yhistory[]append-only. supersedecuando un nuevo walk debe reemplazar al activo (reclasificación, override manual).- Loop
advanceconMAX_STEPS=100contra ciclos.
Endpoints (sobre /documents/:id):
| Endpoint | Permiso | Notas |
|---|---|---|
POST /flow/start | document.Update | Body opcional con flowDefinitionOverride. Devuelve {walkId, status, currentNodeId} |
GET /flow | document.Read | Walk activo o último terminado (history truncado a 20 entries) |
POST /flow/tasks/:taskId/complete | document.Review | Body {outcome?: 'approved'|'rejected', editedFields?: object}. Mergea editedFields en metadata.extracted[procName].fields antes de avanzar |
Asistente AI en doc-flow (ai_assistant_review)
Pausa el walk con una task de chat operador↔IA. El operador agrega atributos libres ad-hoc al documento (bag metadata.assistantAttributes, separado de metadata.extracted).
| Concepto | Detalle |
|---|---|
| Atributos | key, value (string/number/date/boolean/enum/reference), source (ai_suggested / operator_added / ai_confirmed_by_operator), citation (página + snippet + bbox opcionales), addedBy, addedAt |
| Whitelist | Config del nodo allowedAttributeKeys restringe las keys aceptadas; vacío = libre |
| Cuota | messagesMax por task (default 20). Si se agota: 400 con mensaje accionable, el operador puede aprobar igual |
| Modelo | Tier local (gpt-oss:20b) o cloud (gpt-oss-120b) según config + budget |
| Greeting | auto (LLM inicia conversación) o manual (espera al operador) |
Endpoints (sobre /documents/:id/flow/tasks/:taskId/assistant):
| Endpoint | Permiso | Acción |
|---|---|---|
POST /message | document.AssistantReview | Envía mensaje del operador, devuelve respuesta del LLM + sugerencias |
POST /attribute | document.AssistantReview | Agrega un atributo; valida whitelist y coerciona al type |
DELETE /attribute/:key | document.AssistantReview | Rollback de atributos agregados en este task |
POST /complete | document.Review | Reutiliza el endpoint genérico; avanza por edge sourceHandle='approved' |
Las acciones destructivas requieren confirmación humana — el LLM nunca decide solo.
Contenedores como sub-Dossiers
Cuando un usuario sube un contenedor (zip/tar/tar.gz), el backend lo desempaca automáticamente y crea:
- Dossier raíz auto-clasificado por contenido (vía
DossierClassifierService). - Sub-Dossiers por cada carpeta interna (
parentId+pathpoblados). - Documents por cada archivo (vía
DocumentService.upload()— reusa SHA-256 dedup + MinIO + pipeline).
El contenedor original NO se conserva como Document — el upload responde con {dossier} en lugar de {document}.
Anti zip-bomb — 3 límites configurables por env:
| Var | Default | Notas |
|---|---|---|
CONTAINER_EXPAND_MAX_DEPTH | 10 | Carpetas anidadas |
CONTAINER_EXPAND_MAX_FILES | 10000 | Archivos totales |
CONTAINER_EXPAND_MAX_UNCOMPRESSED_BYTES | 2 GB | Tamaño total descomprimido |
Opt-out: header X-No-Container-Expand: true o body noContainerExpand=true en el multipart preserva el zip como único Document.
Formatos no soportados (caen a single-Document): RAR, 7z (no hay binarios en runtime).
Monitoreo
Cada processNode y completeTask abre un span de OpenTelemetry con tenantId, dossierId, dossierTypeId, taskId, outcome. El histograma kuatia_workflow_duration_seconds mide tiempo punta-a-punta por flujo. Las tasks BullMQ de flow-timeout se instrumentan con el helper único instrumentQueueJob. La alerta HighWorkflowLatencyP95 dispara si p95 > 30s durante 10m.