Saltar al contenido principal

Motor de Flujos

Concepto

El analista funcional dibuja el flujo en el editor visual (React Flow) usando nodos de dominio. El sistema compila el grafo y lo ejecuta nativamente con motor propio (sin n8n ni dependencia externa).

Kuatia tiene dos motores independientes pero arquitectónicamente paralelos:

MotorWalkTriggerEstado
Dossier-flow (WorkflowEngineService)Se infiere del estado del Dossier (state machine)Al crear un Dossier o reanudar desde una taskVersionado en flow_versions
Doc-flow (DocumentWorkflowEngineService) — Layer 2Persiste en document_flow_walks (Mongo) con currentNodeId + context + tasks[] + history[]Post-clasificación de cada Document, o POST /documents/:id/flow/startVersionado en flow_versions por DocumentType

Ambos motores comparten el shape de flowDefinition (nodes + edges de React Flow); todo lo demás es independiente.

Ciclo de vida de un flujo

Cada publish crea un snapshot inmutable en flow_versions. Las ejecuciones en curso usan la versión publicada al momento de su startFlow; cambios posteriores aplican a nuevas ejecuciones.

Catálogo de nodos

Fuente de verdad: api/src/flow/nodes.catalog.json. Hoy: 14 nodos.

Nodos para Dossier-flow (appliesTo: ['dossier'])

NodoCategoríaDescripción
inicioControlPunto de entrada (obligatorio, uno solo)
finControlPunto de salida con estado final configurable
tarea_humanaTareasAsigna tarea a usuario o rol
tarea_ui_externaTareasTarea con UI externa embebida (UI Endpoint)
requerir_documentoDocumentosExige que se suba un documento específico
esperar_estado_documentoDocumentosPausa hasta que un documento llegue a cierto estado
decisionControlBifurca por condición sobre metadata
emitir_webhookIntegraciónPOST a sistema externo
esperar_webhookIntegraciónPausa hasta recibir POST externo (resume vía token)
notificarIntegraciónEmail o notificación al usuario/rol
consultar_documentosDocumentosConsulta semántica RAG sobre los documentos del tenant

Nodos para Doc-flow (appliesTo: ['document'])

NodoCategoríaDescripción
extract_with_processorExtracciónCorre un SpecializedProcessor sobre el doc y persiste resultados en metadata.extracted[procName]
manual_review_extracted_fieldsReviewPausa el walk con una task de revisión humana del extract; soporta editedFields para corregir
ai_assistant_reviewReviewPausa el walk con una task de chat operador↔IA; el operador agrega atributos libres ad-hoc al doc (bag metadata.assistantAttributes)

Ejecución del dossier-flow

Webhooks entrantes

Sistemas externos reanudan flujos pausados en esperar_webhook:

POST /flow/webhook/:token

El FlowWebhookController (público, validado por token) marca el FlowWait como RESOLVED y continúa la ejecución desde el edge sourceHandle=received del nodo en espera. Estados del wait: WAITING, RESOLVED, EXPIRED. Barrido de EXPIRED cada 60s por job repeatable de la cola flow-timeout.

Doc-flow Layer 2

Cuando un Document llega a su tipo (vía clasificación automática o reclasificación manual), el DocumentWorkflowEngineService arranca un walk si el DocumentType.flowStatus === 'PUBLISHED'. El walk persiste en document_flow_walks:

  • 1 walk activo por documento (índice parcial único (tenantId, documentId) sobre status running|paused).
  • Cada walk tiene tasks[] embebidas (id local + nodeId + status + payload + assignedTo) y history[] append-only.
  • supersede cuando un nuevo walk debe reemplazar al activo (reclasificación, override manual).
  • Loop advance con MAX_STEPS=100 contra ciclos.

Endpoints (sobre /documents/:id):

EndpointPermisoNotas
POST /flow/startdocument.UpdateBody opcional con flowDefinitionOverride. Devuelve {walkId, status, currentNodeId}
GET /flowdocument.ReadWalk activo o último terminado (history truncado a 20 entries)
POST /flow/tasks/:taskId/completedocument.ReviewBody {outcome?: 'approved'|'rejected', editedFields?: object}. Mergea editedFields en metadata.extracted[procName].fields antes de avanzar

Asistente AI en doc-flow (ai_assistant_review)

Pausa el walk con una task de chat operador↔IA. El operador agrega atributos libres ad-hoc al documento (bag metadata.assistantAttributes, separado de metadata.extracted).

ConceptoDetalle
Atributoskey, value (string/number/date/boolean/enum/reference), source (ai_suggested / operator_added / ai_confirmed_by_operator), citation (página + snippet + bbox opcionales), addedBy, addedAt
WhitelistConfig del nodo allowedAttributeKeys restringe las keys aceptadas; vacío = libre
CuotamessagesMax por task (default 20). Si se agota: 400 con mensaje accionable, el operador puede aprobar igual
ModeloTier local (gpt-oss:20b) o cloud (gpt-oss-120b) según config + budget
Greetingauto (LLM inicia conversación) o manual (espera al operador)

Endpoints (sobre /documents/:id/flow/tasks/:taskId/assistant):

EndpointPermisoAcción
POST /messagedocument.AssistantReviewEnvía mensaje del operador, devuelve respuesta del LLM + sugerencias
POST /attributedocument.AssistantReviewAgrega un atributo; valida whitelist y coerciona al type
DELETE /attribute/:keydocument.AssistantReviewRollback de atributos agregados en este task
POST /completedocument.ReviewReutiliza el endpoint genérico; avanza por edge sourceHandle='approved'

Las acciones destructivas requieren confirmación humana — el LLM nunca decide solo.

Contenedores como sub-Dossiers

Cuando un usuario sube un contenedor (zip/tar/tar.gz), el backend lo desempaca automáticamente y crea:

  • Dossier raíz auto-clasificado por contenido (vía DossierClassifierService).
  • Sub-Dossiers por cada carpeta interna (parentId + path poblados).
  • Documents por cada archivo (vía DocumentService.upload() — reusa SHA-256 dedup + MinIO + pipeline).

El contenedor original NO se conserva como Document — el upload responde con {dossier} en lugar de {document}.

Anti zip-bomb — 3 límites configurables por env:

VarDefaultNotas
CONTAINER_EXPAND_MAX_DEPTH10Carpetas anidadas
CONTAINER_EXPAND_MAX_FILES10000Archivos totales
CONTAINER_EXPAND_MAX_UNCOMPRESSED_BYTES2 GBTamaño total descomprimido

Opt-out: header X-No-Container-Expand: true o body noContainerExpand=true en el multipart preserva el zip como único Document.

Formatos no soportados (caen a single-Document): RAR, 7z (no hay binarios en runtime).

Monitoreo

Cada processNode y completeTask abre un span de OpenTelemetry con tenantId, dossierId, dossierTypeId, taskId, outcome. El histograma kuatia_workflow_duration_seconds mide tiempo punta-a-punta por flujo. Las tasks BullMQ de flow-timeout se instrumentan con el helper único instrumentQueueJob. La alerta HighWorkflowLatencyP95 dispara si p95 > 30s durante 10m.