v2

Pipeline Steps (what each step does)

Step responsibilities

| Step | Primary responsibility | Main session mutations | Config/table dependencies |
| --- | --- | --- | --- |
| LoadOrCreateConversationStep | Fetch or bootstrap conversation row | conversation, intent/state/context sync | ce_conversation |
| CacheInspectAuditStep | Conditionally audit cache snapshot | none | convengine.audit.cache-inspector property |
| ResetConversationStep | Early explicit reset | intent/state/context/input params reset | input flags, command text |
| PersistConversationBootstrapStep | Ensure conversation row persisted | none/metadata | ce_conversation |
| AuditUserInputStep | Persist user input audit | none | ce_audit |
| PolicyEnforcementStep | Policy block and stop | payload + stop result on block | ce_policy |
| DialogueActStep | Classify user turn action type | dialogue_act, dialogue_act_confidence, standalone_query, resolved_user_input | ce_config (dialogue act mode), ce_audit |
| InteractionPolicyStep | Decide runtime policy before intent | policy_decision, skip_intent_resolution | ce_config, session pending state |
| CorrectionStep | Route confirmation turns and apply in-place correction patches | routing_decision, skip_schema_extraction, correction_applied | ce_output_schema, session context, ce_audit |
| ActionLifecycleStep | Maintain pending action runtime TTL/status | pending_action_runtime context | ce_pending_action, ce_audit |
| DisambiguationStep | Ask question when multiple actions fit | pending_clarification question/context | ce_pending_action, ce_config, ce_audit |
| GuardrailStep | Apply guardrails and approval rules | guardrail flags/sanitized text | ce_config, ce_audit |
| IntentResolutionStep | Resolve intent with classifier+agent | intent/state/clarification fields | ce_intent, ce_intent_classifier, ce_config |
| ResetResolvedIntentStep | Reset on configured reset intent | full reset | ce_config RESET_INTENT_CODES |
| FallbackIntentStateStep | Fill missing intent/state defaults | intent/state | none |
| AddContainerDataStep | Fetch and attach container data | containerData/context merge | ce_container_config |
| PendingActionStep | Execute/reject pending action task | pending_action_runtime status/result | ce_pending_action, CeTaskExecutor, ce_audit |
| ToolOrchestrationStep | Run tool_group based orchestration | tool_request/tool_result fields | ce_tool, ce_mcp_tool, ce_audit |
| McpToolStep | MCP planner/tool loop | context_json.mcp.*, mcp tool metadata | ce_mcp_tool, ce_mcp_db_tool, ce_mcp_planner (fallback ce_config) |
| SchemaExtractionStep | Schema-driven extraction and lock handling | schema facts/context/lock, POST_SCHEMA_EXTRACTION facts | ce_output_schema, ce_prompt_template |
| AutoAdvanceStep | Compute schema status facts | schemaComplete/hasAny | resolved schema + context |
| RulesStep | Match and apply transitions/actions | intent/state/input params | ce_rule |
| StateGraphStep | Validate state transition path | state_graph_valid/reason | ce_state_graph, ce_audit |
| ResponseResolutionStep | Resolve and generate output payload | payload/last assistant json | ce_response, ce_prompt_template |
| MemoryStep | Write memory/session summary | memory.session_summary in context | ce_memory, ce_audit |
| PersistConversationStep | Persist final conversation and result | finalResult | ce_conversation |
| PipelineEndGuardStep | Timing audit + terminal guard | timings | ce_audit |

Hook where you need precision
  • Before schema extraction: normalize inputs
  • After rules: inspect intent/state transition correctness
  • Before response resolution: inject display hints
Rule phases

Rules execute by phase. Available native phases include PRE_RESPONSE_RESOLUTION, POST_AGENT_INTENT, POST_SCHEMA_EXTRACTION, PRE_AGENT_MCP, POST_AGENT_MCP, and POST_TOOL_EXECUTION.
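
To make the phase idea concrete, here is a minimal sketch of phase-scoped rule selection. It is not the engine's RulesStep: the `Rule` record, the `Phase` enum, and the ascending-priority ordering are assumptions made for illustration; only the phase names come from the list above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class RulePhaseSketch {

    // Phase names are taken from the documentation; the enum itself is illustrative.
    enum Phase {
        POST_AGENT_INTENT, POST_SCHEMA_EXTRACTION, PRE_AGENT_MCP,
        POST_AGENT_MCP, POST_TOOL_EXECUTION, PRE_RESPONSE_RESOLUTION
    }

    // Hypothetical in-memory stand-in for a ce_rule row.
    record Rule(String name, Phase phase, int priority) {}

    // Keeps only the rules registered for the requested phase,
    // ordered by ascending priority (an assumed convention).
    static List<Rule> rulesFor(Phase phase, List<Rule> all) {
        List<Rule> selected = new ArrayList<>();
        for (Rule rule : all) {
            if (rule.phase() == phase) {
                selected.add(rule);
            }
        }
        selected.sort(Comparator.comparingInt(Rule::priority));
        return selected;
    }

    public static void main(String[] args) {
        List<Rule> all = List.of(
                new Rule("mark-complete", Phase.POST_SCHEMA_EXTRACTION, 20),
                new Rule("route-intent", Phase.POST_AGENT_INTENT, 1),
                new Rule("validate-amount", Phase.POST_SCHEMA_EXTRACTION, 10));
        System.out.println(rulesFor(Phase.POST_SCHEMA_EXTRACTION, all));
    }
}
```

The point of the sketch is that a rule tagged for one phase never fires during another, which is what lets each pipeline step trigger only its own slice of ce_rule.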

Don’t hardcode transitions in steps

Prefer transitions in ce_rule unless absolutely framework-level behavior is required. This keeps domain behavior data-driven and testable via audit traces.

Prompt-template interaction metadata

ce_prompt_template.interaction_mode and ce_prompt_template.interaction_contract are the preferred turn-semantics contract in v2.0.9+.

  • SchemaExtractionStep uses the scoped SCHEMA_JSON template for extraction prompts; COLLECT plus expects:["structured_input"] is the recommended shape.
  • CorrectionStep should rely on the active prompt template semantics (CONFIRM, PROCESSING, and interaction_contract capabilities such as affirm, edit, retry) instead of parsing state-name substrings.
  • ResponseResolutionStep still selects by intent_code + state_code + response_type, but interaction_mode helps document what kind of user interaction that template represents.
1. LoadOrCreateConversationStep

Responsibility: Fetch or bootstrap conversation row
Session Mutations: conversation, intent/state/context sync
Config/Table Dependencies: ce_conversation

Detailed Execution Logic

This is the initial bootstrap step of the runtime engine. It uses the conversationId provided in the HTTP request to look up an existing CeConversation row in the Postgres database.

If the conversation exists:

  • The context JSON is hydrated into the runtime EngineSession.
  • Previous intent and state codes are restored.
  • All stored memory summaries and pending actions are fetched from the database and loaded into memory.

If the conversation is new:

  • A new CeConversation entity is instantiated.
  • The intent and state default to UNKNOWN.

This ensures that regardless of scale, the API is entirely stateless and can route requests to any pod.

LoadOrCreateConversationStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/LoadOrCreateConversationStep.java
JAVA
public StepResult execute(EngineSession session) {
    UUID id = session.getConversationId();
    // Load the existing row, or bootstrap a new conversation for this id
    CeConversation convo = conversationRepo.findById(id)
            .orElseGet(() -> createNewConversation(id, conversationRepo));

    convo.setLastUserText(session.getUserText());
    convo.setUpdatedAt(OffsetDateTime.now());

    // Hydrate intent/state/context from the conversation into the session
    session.setConversation(convo);
    session.syncFromConversation();

    return new StepResult.Continue();
}

When the LLM path is used, DialogueActStep now audits:

  • DIALOGUE_ACT_LLM_INPUT
  • DIALOGUE_ACT_LLM_OUTPUT
  • DIALOGUE_ACT_LLM_ERROR

It still emits the final classification checkpoint as DIALOGUE_ACT_CLASSIFIED.


2. CacheInspectAuditStep

Responsibility: Conditionally audit cache snapshot
Session Mutations: none
Config/Table Dependencies: convengine.audit.cache-inspector property

Detailed Execution Logic

Runs only when convengine.audit.cache-inspector is true. It takes a complete JSON snapshot of the hydrated EngineSession conversation cache tree and logs it under the CACHE_INSPECTION audit stage, before the user input audit is written.

CacheInspectAuditStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/CacheInspectAuditStep.java
JAVA
public StepResult execute(EngineSession session) {
    Map<String, Object> payload = new LinkedHashMap<>();
    // Serialize the hydrated conversation into a JSON tree for the audit record
    payload.put("cache_snapshot", objectMapper.valueToTree(session.getConversation()));
    auditService.audit(ConvEngineAuditStage.CACHE_INSPECTION, session.getConversationId(), payload);
    return new StepResult.Continue();
}

3. ResetConversationStep

Responsibility: Early explicit reset
Session Mutations: intent/state/context/input params reset
Config/Table Dependencies: input flags, command text

Detailed Execution Logic

Checks EngineSession properties to see if an explicit reset has been triggered by the invoking consumer (this is usually passed as a param like _reset=true).

When triggered, it clears:

  • The intent and state trackers.
  • The contextJson (wiping all extracted schema facts).
  • The inputParamsJson.

The conversation status is set back to RUNNING with a completely fresh session, and a CONVERSATION_RESET audit event is logged.

ResetConversationStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/ResetConversationStep.java
JAVA
public StepResult execute(EngineSession session) {
    if (!shouldReset(session)) {
        return new StepResult.Continue();
    }

    String reason = resetReason(session);
    session.resetForConversationRestart();

    // Wipe the persisted conversation back to a clean RUNNING state
    session.getConversation().setStatus("RUNNING");
    session.getConversation().setIntentCode("UNKNOWN");
    session.getConversation().setStateCode("UNKNOWN");
    session.getConversation().setContextJson("{}");
    session.getConversation().setInputParamsJson("{}");
    session.getConversation().setLastAssistantJson(null);
    session.getConversation().setUpdatedAt(OffsetDateTime.now());
    conversationRepository.save(session.getConversation());

    // ConvEnginePayloadKey lives in com.github.salilvnair.convengine.engine.constants
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(ConvEnginePayloadKey.REASON, reason);
    payload.put(ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(ConvEnginePayloadKey.STATE, session.getState());
    payload.put(ConvEnginePayloadKey.CONTEXT, session.getContextJson());
    audit.audit(ConvEngineAuditStage.CONVERSATION_RESET, session.getConversationId(), payload);

    return new StepResult.Continue();
}
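
The shouldReset(...) helper called in the snippet is not shown on this page. The following is only a guess at its shape, assuming the trigger is either a truthy `_reset` input flag or a short reset command in the user text. The class name, the command list, and the use of a plain map in place of EngineSession are all hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

final class ResetTriggerSketch {

    // Hypothetical command phrases; the real engine may accept a different set.
    private static final Set<String> RESET_COMMANDS = Set.of("reset", "restart", "/reset");

    // True when the consumer passed _reset=true or the user typed a reset command.
    static boolean shouldReset(Map<String, Object> inputParams, String userText) {
        Object flag = inputParams.get("_reset");
        if (flag != null && Boolean.parseBoolean(String.valueOf(flag))) {
            return true;
        }
        if (userText == null) {
            return false;
        }
        return RESET_COMMANDS.contains(userText.trim().toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(shouldReset(Map.of("_reset", "true"), "hello"));
        System.out.println(shouldReset(Map.of(), "what is my balance?"));
    }
}
```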

4. PersistConversationBootstrapStep

Responsibility: Ensure conversation row persisted
Session Mutations: none/metadata
Config/Table Dependencies: ce_conversation

Detailed Execution Logic

A simple lifecycle checkpoint that ensures the conversation has a createdAt timestamp. If the conversation was just created in LoadOrCreateConversationStep, this step performs the initial INSERT into ce_conversation so that foreign-key dependencies (such as audit logs) don't fail later in the pipeline.

PersistConversationBootstrapStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/PersistConversationBootstrapStep.java
JAVA
public StepResult execute(EngineSession session) {
    // A missing createdAt means the row has never been persisted
    if (session.getConversation().getCreatedAt() == null) {
        session.getConversation().setCreatedAt(OffsetDateTime.now());
        session.getConversation().setUpdatedAt(OffsetDateTime.now());
        conversationRepo.save(session.getConversation());
    }
    return new StepResult.Continue();
}

5. AuditUserInputStep

Responsibility: Persist user input audit
Session Mutations: none
Config/Table Dependencies: ce_audit

Detailed Execution Logic

Records the raw text query the user typed on this turn into the ce_audit table. This is purely for debug tracing and business analytics. It binds the USER_INPUT audit stage with the conversation ID and the text payload.

AuditUserInputStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/AuditUserInputStep.java
JAVA
public StepResult execute(EngineSession session) {
    // ConvEnginePayloadKey lives in com.github.salilvnair.convengine.engine.constants
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(ConvEnginePayloadKey.TEXT, session.getUserText());
    audit.audit(ConvEngineAuditStage.USER_INPUT, session.getConversationId(), payload);
    return new StepResult.Continue();
}

6. PolicyEnforcementStep

Responsibility: Policy block and stop
Session Mutations: payload + stop result on block
Config/Table Dependencies: ce_policy

Detailed Execution Logic

Secures the pipeline against prohibited input using ce_policy.

It reads all active rows from ce_policy, executing either REGEX, EXACT, or LLM rules against the user's raw text. If a match occurs:

  • The conversation is forced to a BLOCKED status.
  • A StepResult.Stop() is returned immediately, skipping all remaining NLP and intent steps.
  • The ce_policy.response_text is loaded as the final payload shipped back to the consumer.
PolicyEnforcementStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/PolicyEnforcementStep.java
JAVA
public StepResult execute(EngineSession session) {
    String userText = session.getUserText();

    // Evaluate enabled policies in priority order; the first match blocks the turn
    for (CePolicy policy : policyRepo.findByEnabledTrueOrderByPriorityAsc()) {
        if (matches(policy.getRuleType(), policy.getPattern(), userText)) {
            // ConvEnginePayloadKey lives in com.github.salilvnair.convengine.engine.constants
            Map<String, Object> payload = new LinkedHashMap<>();
            payload.put(ConvEnginePayloadKey.POLICY_ID, policy.getPolicyId());
            payload.put(ConvEnginePayloadKey.RULE_TYPE, policy.getRuleType());
            payload.put(ConvEnginePayloadKey.PATTERN, policy.getPattern());
            audit.audit(ConvEngineAuditStage.POLICY_BLOCK, session.getConversationId(), payload);

            session.getConversation().setStatus("BLOCKED");
            session.getConversation().setLastAssistantJson(jsonText(policy.getResponseText()));
            session.getConversation().setUpdatedAt(OffsetDateTime.now());
            conversationRepo.save(session.getConversation());

            EngineResult out = new EngineResult(
                    session.getIntent(),
                    session.getState(),
                    new TextPayload(policy.getResponseText()),
                    session.getContextJson()
            );
            // Stop short-circuits every remaining step in the pipeline
            return new StepResult.Stop(out);
        }
    }

    return new StepResult.Continue();
}
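
The matches(...) helper used in the loop is not shown on this page. As a rough sketch, assuming REGEX rules use a case-insensitive find and EXACT rules compare trimmed text ignoring case, it could look like the following. Both behaviours are assumptions, and LLM rules are deliberately left out because they need a model call.

```java
import java.util.Locale;
import java.util.regex.Pattern;

final class PolicyMatchSketch {

    // Returns true when the user text violates the policy pattern.
    static boolean matches(String ruleType, String pattern, String userText) {
        if (ruleType == null || pattern == null || userText == null) {
            return false;
        }
        return switch (ruleType.trim().toUpperCase(Locale.ROOT)) {
            // Assumed: the regex may match anywhere in the text, ignoring case
            case "REGEX" -> Pattern.compile(pattern, Pattern.CASE_INSENSITIVE)
                    .matcher(userText)
                    .find();
            // Assumed: the whole trimmed input must equal the pattern, ignoring case
            case "EXACT" -> pattern.trim().equalsIgnoreCase(userText.trim());
            // LLM and unknown rule types are not handled in this sketch
            default -> false;
        };
    }

    public static void main(String[] args) {
        System.out.println(matches("REGEX", "\\bpassword\\b", "Tell me the admin Password"));
        System.out.println(matches("EXACT", "drop table", "please drop table"));
    }
}
```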

7. DialogueActStep

Responsibility: Classify user turn action type
Session Mutations: dialogue_act in input params
Config/Table Dependencies: ce_config (dialogue act mode), ce_audit

Detailed Execution Logic

This step classifies the raw user text into an explicit conversational "act", first by regex and then, optionally, by an LLM request as a fallback, depending on the mode set in convengine.flow.dialogue-act.resolute (e.g., REGEX_THEN_LLM).

Why this step exists:

  • it turns raw free-form phrases like yes, go ahead, change amount to 350000, or start over into stable engine signals
  • downstream steps should not branch on raw text because raw phrasing is inconsistent and expensive to reason about repeatedly
  • it reduces unnecessary intent/schema/LLM work for short operational turns
  • it makes ce_rule conditions deterministic by giving rules normalized fields instead of unstructured user text

If convengine.flow.queryRewrite.enabled=true and the conversionHistory is present, the step morphs its LLM request into a dual Classifier & RAG Context Optimizer. It supplies the ongoing conversation to the LLM to rewrite ambiguous pronouns into an explicit standalone search query, returning it as "standaloneQuery".

The core parameters for these prompts are loaded dynamically from ce_config, so administrators can hot-swap classifier behavior in production without recompiling the Java code.

Supported Dialogue Acts

| Enum Name | Purpose |
| --- | --- |
| AFFIRM | The user expressed agreement or confirmation (e.g., "yes", "go ahead"). Can override Guardrails or execute Pending Actions. |
| NEGATE | The user expressed rejection or cancellation (e.g., "no", "stop"). Can cancel active Interaction Policies. |
| EDIT | The user wants to change previously supplied context or entity slots. |
| RESET | The user wants to clear memory and start completely fresh. |
| QUESTION | The user is asking a direct conversational question (triggers fallback intent workflows). |
| NEW_REQUEST | The baseline generic classification. Engine routes normally. |
| GREETING | The user issued a pleasantry ("Hi", "Hello"). Typically bypassed by downstream Orchestrators to prevent expensive RAG queries. |

Source Execution Profile

Regex Guardrails

The runtime still keeps a conservative regex guard for destructive resets. RESET can be forced back to a REGEX_GUARD result when the user text does not clearly match the reset regex. EDIT is no longer globally downgraded in Java. Instead, the engine preserves regex and LLM candidate values, and a POST_DIALOGUE_ACT rule pass can apply a DB-driven override using SET_DIALOGUE_ACT when the workflow should trust the LLM candidate.

Why POST_DIALOGUE_ACT exists

The engine should stay conservative by default, but business workflows still need a safe override point.

Examples:

  • user says: Ohh wait, I missed one zero. Change amount to 350000.
  • regex may still classify this as NEW_REQUEST
  • the LLM candidate may classify it as EDIT
  • a POST_DIALOGUE_ACT rule can inspect:
    • inputParams.dialogue_act
    • inputParams.dialogue_act_source
    • inputParams.dialogue_act_llm_candidate
    • inputParams.dialogue_act_llm_standalone_query
  • then use SET_DIALOGUE_ACT to restore EDIT before InteractionPolicyStep

This keeps:

  • the default engine behavior safe
  • the override behavior DB-driven
  • the state model dynamic instead of hardcoded in Java
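
To illustrate the override described above, here is the decision such a rule encodes, written as plain Java purely for readability. In ConvEngine the condition and the SET_DIALOGUE_ACT action live in ce_rule, not in code; the class name and the map standing in for the session input params are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class PostDialogueActOverrideSketch {

    // Mimics a POST_DIALOGUE_ACT rule: when the guarded result is NEW_REQUEST
    // but the LLM candidate was EDIT, restore EDIT as the final dialogue act.
    static void apply(Map<String, Object> inputParams) {
        Object current = inputParams.get("dialogue_act");
        Object candidate = inputParams.get("dialogue_act_llm_candidate");
        if ("NEW_REQUEST".equals(current) && "EDIT".equals(candidate)) {
            inputParams.put("dialogue_act", "EDIT"); // effect of SET_DIALOGUE_ACT
        }
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("dialogue_act", "NEW_REQUEST");
        params.put("dialogue_act_llm_candidate", "EDIT");
        apply(params);
        System.out.println(params.get("dialogue_act"));
    }
}
```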
DialogueActStep.java (Simplified)
file: src/main/java/com/github/salilvnair/convengine/engine/steps/DialogueActStep.java
JAVA
@MustRunAfter(AuditUserInputStep.class)
@MustRunBefore(IntentResolutionStep.class)
public class DialogueActStep implements EngineStep {

    private final ConvEngineFlowConfig flowConfig;
    private final CeConfigResolver configResolver;

    // Extracted from ce_config during Application Initialization
    private Pattern REGEX_GREETING;
    private Pattern REGEX_AFFIRM;
    private Pattern REGEX_NEGATE;

    private String SYSTEM_PROMPT;
    private String QUERY_REWRITE_SYSTEM_PROMPT;
    // ...

    @PostConstruct
    public void init() {
        // Core Regex Evaluation Patterns mapped securely in ce_config.
        // Note the doubled backslash: "\\s" is the regex whitespace class,
        // whereas "\s" in a Java string literal is just a space character.
        REGEX_GREETING = Pattern.compile(
                configResolver.resolveString(this, "REGEX_GREETING", "^(\\s)*(hi|hello|hey|greetings|good morning|...)$"),
                Pattern.CASE_INSENSITIVE
        );

        // Loads from ce_config where config_type="DialogueActStep" and config_key="SYSTEM_PROMPT".
        SYSTEM_PROMPT = configResolver.resolveString(this, "SYSTEM_PROMPT", """
                You are a dialogue-act classifier.
                Return JSON only with:
                {"dialogueAct":"AFFIRM|NEGATE|EDIT|RESET|QUESTION|NEW_REQUEST|GREETING","confidence":0.0}
                """);

        // Loads the query-rewrite variant
        QUERY_REWRITE_SYSTEM_PROMPT = configResolver.resolveString(this, "QUERY_REWRITE_SYSTEM_PROMPT", """
                You are a dialogue-act classifier and intelligent query search rewriter.
                Using the conversation history, rewrite the user's text into an explicit, standalone query that perfectly describes their intent without needing the conversation history context.
                Also classify their dialogue act.
                Return JSON only matching the exact schema.
                """);
    }

    @Override
    public StepResult execute(EngineSession session) {
        String userText = session.getUserText();

        // 1. Regex Classification First
        DialogueActResult regexResult = classifyByRegex(userText);

        // 2. Resolve final intention while preserving regex + LLM candidates
        DialogueActResolution resolution = resolveByMode(session, userText, regexResult);
        DialogueActResult resolved = resolution.resolved();

        // 3. Persist regex, LLM candidate, guard metadata, and final decision
        session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_REGEX, regexResult.act().name());
        session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_LLM_CANDIDATE, ...);
        session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT, resolved.act().name());
        session.putInputParam(ConvEngineInputParamKey.DIALOGUE_ACT_CONFIDENCE, resolved.confidence());

        if (resolved.standaloneQuery() != null) {
            session.putInputParam(ConvEngineInputParamKey.STANDALONE_QUERY, resolved.standaloneQuery());
            session.setStandaloneQuery(resolved.standaloneQuery());
        }

        // 4. Let DB rules override the guarded result before policy routing
        rulesStep.applyRules(session, "DialogueActStep", RulePhase.POST_DIALOGUE_ACT.name());

        return new StepResult.Continue();
    }
}
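
The classifyByRegex(...) call above is not expanded on this page. A minimal sketch of a regex-first classifier follows, assuming whole-input, case-insensitive patterns checked in a fixed order with NEW_REQUEST as the fallback. The patterns, the reduced enum, and the class name are illustrative, not the defaults stored in ce_config.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class RegexDialogueActSketch {

    // Subset of the documented acts that short regexes can reasonably detect.
    enum Act { GREETING, AFFIRM, NEGATE, RESET, NEW_REQUEST }

    // Insertion order defines the evaluation order.
    private static final Map<Act, Pattern> PATTERNS = new LinkedHashMap<>();

    static {
        PATTERNS.put(Act.GREETING, compile("^\\s*(hi|hello|hey)\\s*[!.]?\\s*$"));
        PATTERNS.put(Act.AFFIRM, compile("^\\s*(yes|yep|ok|okay|go ahead|confirm)\\s*[!.]?\\s*$"));
        PATTERNS.put(Act.NEGATE, compile("^\\s*(no|nope|stop|cancel)\\s*[!.]?\\s*$"));
        PATTERNS.put(Act.RESET, compile("^\\s*(reset|start over)\\s*[!.]?\\s*$"));
    }

    private static Pattern compile(String regex) {
        return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
    }

    // First pattern that matches the whole input wins; anything else is NEW_REQUEST.
    static Act classify(String userText) {
        if (userText == null) {
            return Act.NEW_REQUEST;
        }
        for (Map.Entry<Act, Pattern> entry : PATTERNS.entrySet()) {
            if (entry.getValue().matcher(userText).matches()) {
                return entry.getKey();
            }
        }
        return Act.NEW_REQUEST;
    }

    public static void main(String[] args) {
        System.out.println(classify("go ahead"));
        System.out.println(classify("Change amount to 350000"));
    }
}
```

Anchoring every pattern to the whole input is what keeps this layer conservative: a longer sentence that merely contains "no" or "reset" falls through to NEW_REQUEST, which is where an LLM fallback or a POST_DIALOGUE_ACT rule can refine the result.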

8. InteractionPolicyStep

Responsibility: Decide runtime policy before intent
Session Mutations: policy_decision, skip_intent_resolution
Config/Table Dependencies: ce_config, session pending state

Detailed Execution Logic

Uses the identified DialogueAct to decide how the engine should route the turn. This step prevents the system from confusing follow-up answers (like saying "yes") with new intents.

Before this step runs, the engine now executes a POST_DIALOGUE_ACT rule pass. That lets DB rules override guarded dialogue-act outcomes using the regex result, the LLM candidate result, or the dedicated SET_DIALOGUE_ACT action without hardcoding state names in Java.

The output maps to the InteractionPolicyDecision enum:

  • EXECUTE_PENDING_ACTION: If the DialogueAct is AFFIRM and there's a background API task waiting.
  • REJECT_PENDING_ACTION: If NEGATE and an action is waiting.
  • FILL_PENDING_SLOT: If the user is currently answering a schema extraction question.
  • RECLASSIFY_INTENT: If this is a NEW_REQUEST.

Evaluation Matrix Flow:

  1. Checks for context hints: hasPendingAction, hasPendingSlot, hasResolvedIntent, hasResolvedState.
  2. First, it attempts a lookup using the resolveFromMatrix() method against any custom configurations in your YAML properties.
  3. If no custom matrix decision applies, it checks boolean flags from the config.
    • isExecutePendingOnAffirm(): If true, AFFIRM + hasPendingAction -> EXECUTE_PENDING_ACTION
    • isRejectPendingOnNegate(): If true, NEGATE + hasPendingAction -> REJECT_PENDING_ACTION
    • isFillPendingSlotOnNonNewRequest(): If true, != NEW_REQUEST && != GREETING + hasPendingSlot -> FILL_PENDING_SLOT
  4. Most crucially, if any of these policy decisions trigger, it sets skipIntentResolution = true. This stops the later IntentResolutionStep from overriding the sticky intent. The decision is written to the input params and to the audit payload as POLICY_DECISION.
InteractionPolicyStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/InteractionPolicyStep.java
JAVA
public StepResult execute(EngineSession session) {
    String dialogueActRaw = session.inputParamAsString(ConvEngineInputParamKey.DIALOGUE_ACT);
    DialogueAct dialogueAct = parseDialogueAct(dialogueActRaw);
    Map<String, Object> context = session.contextDict();
    Map<String, Object> inputParams = session.getInputParams();

    // Context hints: a pending action may be signalled from several places
    boolean hasPendingAction = hasValue(context.get("pending_action"))
            || hasValue(context.get("pendingAction"))
            || hasValue(inputParams.get("pending_action"))
            || hasValue(inputParams.get("pendingAction"))
            || hasValue(inputParams.get(ConvEngineInputParamKey.PENDING_ACTION_KEY))
            || hasValue(inputParams.get("pending_action_task"))
            || hasPendingActionFromRegistry(session);
    boolean hasPendingSlot = hasValue(context.get("pending_slot"))
            || hasValue(context.get("pendingSlot"));
    boolean hasResolvedIntent = session.getIntent() != null
            && !session.getIntent().isBlank()
            && !"UNKNOWN".equalsIgnoreCase(session.getIntent());
    boolean hasResolvedState = session.getState() != null
            && !session.getState().isBlank()
            && !"UNKNOWN".equalsIgnoreCase(session.getState());
    boolean requireResolvedIntentAndState = flowConfig.getInteractionPolicy().isRequireResolvedIntentAndState();
    boolean hasResolvedContext = !requireResolvedIntentAndState || (hasResolvedIntent && hasResolvedState);

    // Default: let intent resolution run as usual
    InteractionPolicyDecision decision = InteractionPolicyDecision.RECLASSIFY_INTENT;
    boolean skipIntentResolution = false;

    if (hasResolvedContext) {
        // A custom matrix entry takes precedence over the boolean flags
        InteractionPolicyDecision matrixDecision = resolveFromMatrix(hasPendingAction, hasPendingSlot, dialogueAct);
        if (matrixDecision != null) {
            decision = matrixDecision;
            skipIntentResolution = true;
        } else if (flowConfig.getInteractionPolicy().isExecutePendingOnAffirm()
                && hasPendingAction
                && dialogueAct == DialogueAct.AFFIRM) {
            decision = InteractionPolicyDecision.EXECUTE_PENDING_ACTION;
            skipIntentResolution = true;
        } else if (flowConfig.getInteractionPolicy().isRejectPendingOnNegate()
                && hasPendingAction
                && dialogueAct == DialogueAct.NEGATE) {
            decision = InteractionPolicyDecision.REJECT_PENDING_ACTION;
            skipIntentResolution = true;
        } else if (flowConfig.getInteractionPolicy().isFillPendingSlotOnNonNewRequest()
                && hasPendingSlot
                && dialogueAct != DialogueAct.NEW_REQUEST) {
            decision = InteractionPolicyDecision.FILL_PENDING_SLOT;
            skipIntentResolution = true;
        }
    }

    session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, decision.name());
    session.putInputParam(ConvEngineInputParamKey.SKIP_INTENT_RESOLUTION, skipIntentResolution);

    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(ConvEnginePayloadKey.DIALOGUE_ACT, dialogueAct.name());
    payload.put(ConvEnginePayloadKey.POLICY_DECISION, decision.name());
    payload.put(ConvEnginePayloadKey.SKIP_INTENT_RESOLUTION, skipIntentResolution);
    payload.put("hasPendingAction", hasPendingAction);
    payload.put("hasPendingSlot", hasPendingSlot);
    payload.put(ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(ConvEnginePayloadKey.STATE, session.getState());
    audit.audit(ConvEngineAuditStage.INTERACTION_POLICY_DECIDED, session.getConversationId(), payload);

    return new StepResult.Continue();
}
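
The resolveFromMatrix(...) lookup is not expanded on this page. One way to picture it, assuming the custom matrix is a flat map keyed by pending context plus dialogue act, is sketched below. The key format (`PENDING_ACTION:AFFIRM`), the extra map parameter, and the class name are hypothetical; the real configuration shape may differ.

```java
import java.util.Locale;
import java.util.Map;

final class PolicyMatrixSketch {

    enum DialogueAct { AFFIRM, NEGATE, EDIT, RESET, QUESTION, NEW_REQUEST, GREETING }

    enum Decision { EXECUTE_PENDING_ACTION, REJECT_PENDING_ACTION, FILL_PENDING_SLOT, RECLASSIFY_INTENT }

    // Returns the configured decision, or null so the caller falls back to the boolean flags.
    static Decision resolveFromMatrix(Map<String, String> matrix,
                                      boolean hasPendingAction,
                                      boolean hasPendingSlot,
                                      DialogueAct act) {
        if (matrix == null || act == null) {
            return null;
        }
        String raw = null;
        if (hasPendingAction) {
            raw = matrix.get("PENDING_ACTION:" + act.name());
        }
        if (raw == null && hasPendingSlot) {
            raw = matrix.get("PENDING_SLOT:" + act.name());
        }
        if (raw == null) {
            return null;
        }
        try {
            return Decision.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException ex) {
            return null; // unknown decision names are ignored rather than failing the turn
        }
    }

    public static void main(String[] args) {
        Map<String, String> matrix = Map.of("PENDING_ACTION:AFFIRM", "EXECUTE_PENDING_ACTION");
        System.out.println(resolveFromMatrix(matrix, true, false, DialogueAct.AFFIRM));
    }
}
```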

9. ActionLifecycleStep

Responsibility: Maintain pending action runtime TTL/status
Session Mutations: pending_action_runtime context
Config/Table Dependencies: ce_pending_action, ce_audit

Detailed Execution Logic

Tracks time-to-live (TTL) for CePendingAction rows. If the user was asked "Are you sure you want to cancel?" 3 turns ago, but started talking about the weather instead, this step will mark the pending_action_runtime as EXPIRED.

Status transitions (Enum PendingActionStatus):

  • OPEN: Task is created but waiting for user confirmation.
  • IN_PROGRESS: The user affirmed, and the task is ready to execute.
  • REJECTED: The user negated.
  • EXPIRED: The TTL turn limit was reached before the user confirmed.
ActionLifecycleStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/ActionLifecycleStep.java
JAVA
public StepResult execute(EngineSession session) {
    if (!flowConfig.getActionLifecycle().isEnabled()) {
        return new StepResult.Continue();
    }

    ObjectNode root = contextHelper.readRoot(session);
    ObjectNode runtime = contextHelper.ensureObject(root, RUNTIME_NODE);
    int currentTurn = session.conversionHistory().size() + 1;
    long now = Instant.now().toEpochMilli();

    // 1. Expire a stale OPEN / IN_PROGRESS runtime
    PendingActionStatus currentStatus = PendingActionStatus.from(runtime.path("status").asText(null), null);
    if (isExpired(runtime, currentTurn, now)
            && (currentStatus == PendingActionStatus.OPEN || currentStatus == PendingActionStatus.IN_PROGRESS)) {
        runtime.put("status", PendingActionStatus.EXPIRED.name());
        runtime.put("expired_turn", currentTurn);
        runtime.put("expired_at_epoch_ms", now);
        session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.EXPIRED.name());
        audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
                "event", "EXPIRED",
                "status", PendingActionStatus.EXPIRED.name(),
                "turn", currentTurn
        ));
    }

    // 2. Nothing to track when no pending action applies to this turn
    String actionKey = resolveActionKey(session);
    String actionRef = resolveActionReferenceFromTable(session, actionKey);
    if (actionRef == null || actionRef.isBlank()) {
        contextHelper.writeRoot(session, root);
        return new StepResult.Continue();
    }

    // 3. Open a new runtime and compute its turn-based and time-based TTLs (-1 disables a TTL)
    boolean isNewRuntime = isRuntimeNew(runtime, actionKey, actionRef);
    if (isNewRuntime) {
        runtime.put("action_key", actionKey == null ? "" : actionKey);
        runtime.put("action_ref", actionRef);
        runtime.put("status", PendingActionStatus.OPEN.name());
        runtime.put("created_turn", currentTurn);
        runtime.put("created_at_epoch_ms", now);
        runtime.put("expires_turn", flowConfig.getActionLifecycle().getTtlTurns() > 0
                ? currentTurn + flowConfig.getActionLifecycle().getTtlTurns()
                : -1);
        runtime.put("expires_at_epoch_ms", flowConfig.getActionLifecycle().getTtlMinutes() > 0
                ? now + (flowConfig.getActionLifecycle().getTtlMinutes() * 60_000L)
                : -1);
        session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.OPEN.name());
        audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
                "event", "OPEN",
                "status", PendingActionStatus.OPEN.name(),
                "actionKey", actionKey,
                "actionRef", actionRef
        ));
    }

    // 4. Apply the decision made earlier by InteractionPolicyStep
    InteractionPolicyDecision decision = parseDecision(session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
    if (decision == InteractionPolicyDecision.EXECUTE_PENDING_ACTION) {
        runtime.put("status", PendingActionStatus.IN_PROGRESS.name());
        runtime.put("in_progress_turn", currentTurn);
        runtime.put("in_progress_at_epoch_ms", now);
        session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.IN_PROGRESS.name());
        audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
                "event", "IN_PROGRESS",
                "status", PendingActionStatus.IN_PROGRESS.name(),
                "actionKey", actionKey,
                "actionRef", actionRef
        ));
    } else if (decision == InteractionPolicyDecision.REJECT_PENDING_ACTION) {
        runtime.put("status", PendingActionStatus.REJECTED.name());
        runtime.put("rejected_turn", currentTurn);
        runtime.put("rejected_at_epoch_ms", now);
        session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RUNTIME_STATUS, PendingActionStatus.REJECTED.name());
        audit.audit(ConvEngineAuditStage.PENDING_ACTION_LIFECYCLE, session.getConversationId(), mapOf(
                "event", "REJECTED",
                "status", PendingActionStatus.REJECTED.name(),
                "actionKey", actionKey,
                "actionRef", actionRef
        ));
    }

    contextHelper.writeRoot(session, root);
    return new StepResult.Continue();
}
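
The isExpired(...) check is not shown on this page. Based on the expires_turn and expires_at_epoch_ms fields written above, where -1 disables a TTL, a plausible sketch is given below. It takes plain values instead of the runtime ObjectNode, and the strict "greater than" comparison is an assumption; the engine may treat the boundary turn differently.

```java
final class PendingActionTtlSketch {

    // A runtime expires when either enabled TTL (turn-based or time-based) has been passed.
    static boolean isExpired(long expiresTurn, long expiresAtEpochMs, int currentTurn, long nowEpochMs) {
        boolean turnExpired = expiresTurn > 0 && currentTurn > expiresTurn;
        boolean timeExpired = expiresAtEpochMs > 0 && nowEpochMs > expiresAtEpochMs;
        return turnExpired || timeExpired;
    }

    public static void main(String[] args) {
        // Action opened on turn 2 with a 3-turn TTL, so expires_turn = 5; time TTL disabled.
        System.out.println(isExpired(5, -1, 5, 0L));
        System.out.println(isExpired(5, -1, 6, 0L));
    }
}
```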

10. DisambiguationStep

Responsibility: Ask question when multiple actions fit
Session Mutations: pending_clarification question/context
Config/Table Dependencies: ce_pending_action, ce_config, ce_audit

Detailed Execution Logic

A smart conversational router. If multiple pending actions apply to the current context (e.g., "Cancel flight" vs "Cancel hotel" both valid), it pauses the pipeline.

It dynamically builds a multiple-choice prompt (or LLM synthesis) asking the user to clarify which action they meant. It emits an ASSISTANT_OUTPUT step, stalling the pipeline until the user clarifies.

DisambiguationStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/DisambiguationStep.java
JAVA
public StepResult execute(EngineSession session) {
    if (!flowConfig.getDisambiguation().isEnabled()) {
        return new StepResult.Continue();
    }
    // Only relevant when the policy decided to execute a pending action
    InteractionPolicyDecision decision = parseDecision(session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
    if (decision != InteractionPolicyDecision.EXECUTE_PENDING_ACTION) {
        return new StepResult.Continue();
    }

    // An explicit action key means there is nothing to disambiguate
    String explicitActionKey = session.inputParamAsString(ConvEngineInputParamKey.PENDING_ACTION_KEY);
    if (explicitActionKey != null && !explicitActionKey.isBlank()) {
        return new StepResult.Continue();
    }

    List<CePendingAction> candidates = pendingActionRepository.findEligibleByIntentAndStateOrderByPriorityAsc(
            session.getIntent(),
            session.getState()
    );
    if (candidates == null || candidates.size() <= 1) {
        return new StepResult.Continue();
    }

    // Keep only candidates tied for the best (lowest) priority
    int bestPriority = candidates.getFirst().getPriority() == null ? Integer.MAX_VALUE : candidates.getFirst().getPriority();
    List<CePendingAction> top = candidates.stream()
            .filter(c -> (c.getPriority() == null ? Integer.MAX_VALUE : c.getPriority()) == bestPriority)
            .toList();
    if (top.size() <= 1) {
        return new StepResult.Continue();
    }

    // Build de-duplicated option labels, capped at the configured maximum
    Set<String> options = new LinkedHashSet<>();
    for (CePendingAction row : top) {
        if (row.getActionKey() == null || row.getActionKey().isBlank()) {
            continue;
        }
        String option = row.getActionKey().trim();
        if (row.getDescription() != null && !row.getDescription().isBlank()) {
            option = option + " (" + row.getDescription().trim() + ")";
        }
        options.add(option);
        if (options.size() >= Math.max(1, flowConfig.getDisambiguation().getMaxOptions())) {
            break;
        }
    }
    if (options.isEmpty()) {
        return new StepResult.Continue();
    }

    QuestionResult questionResult = buildQuestion(session, top, options);
    String question = questionResult.question();
    session.setPendingClarificationQuestion(question);
    session.setPendingClarificationReason("PENDING_ACTION_DISAMBIGUATION");
    // Downgrade the decision so no pending action executes on this turn
    session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, InteractionPolicyDecision.RECLASSIFY_INTENT.name());
    session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_DISAMBIGUATION_REQUIRED, true);

    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(ConvEnginePayloadKey.REASON, "MULTIPLE_PENDING_ACTIONS");
    payload.put(ConvEnginePayloadKey.QUESTION, question);
    payload.put(ConvEnginePayloadKey.CANDIDATE_COUNT, top.size());
    payload.put(ConvEnginePayloadKey.OPTIONS, new ArrayList<>(options));
    payload.put(ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(ConvEnginePayloadKey.STATE, session.getState());
    payload.put(ConvEnginePayloadKey.QUESTION_SOURCE, questionResult.source());
    audit.audit(ConvEngineAuditStage.DISAMBIGUATION_REQUIRED, session.getConversationId(), payload);
    return new StepResult.Continue();
}
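
The buildQuestion(...) helper is not shown on this page. For the non-LLM path, a deterministic multiple-choice prompt could be assembled roughly as follows. The wording of the question, the numbering style, and the class name are assumptions.

```java
import java.util.Collection;
import java.util.List;

final class DisambiguationQuestionSketch {

    // Turns the option labels into a numbered multiple-choice question.
    static String buildQuestion(Collection<String> options) {
        StringBuilder question = new StringBuilder("Which action did you mean?");
        int index = 1;
        for (String option : options) {
            question.append('\n').append(index++).append(". ").append(option);
        }
        return question.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildQuestion(List.of("CANCEL_FLIGHT (Cancel flight)", "CANCEL_HOTEL (Cancel hotel)")));
    }
}
```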

11. GuardrailStep

Responsibility: Apply guardrails and approval rules
Session Mutations: guardrail flags/sanitized text
Config/Table Dependencies: ce_config, ce_audit

Detailed Execution Logic

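The last line of defense before intent resolution. Reads the ce_config guardrail settings and sanitize instructions, then checks the sanitized user text against the sensitive-action patterns. If a command is flagged as "sensitive" (e.g., destructive actions like closing an account), approval is required, and approval has not been granted, the step sets GUARDRAIL_BLOCKED with the reason SENSITIVE_ACTION_APPROVAL_REQUIRED and raises the SKIP_TOOL_EXECUTION and SKIP_PENDING_ACTION_EXECUTION flags. The step itself still returns Continue; it is those flags that keep downstream steps from executing tools or pending actions until MFA or explicit user verification is acquired.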

GuardrailStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/GuardrailStep.java
JAVA
public StepResult execute(EngineSession session) {
if (!flowConfig.getGuardrail().isEnabled()) {
return new StepResult.Continue();
}

String originalUserText = session.getUserText() == null ? "" : session.getUserText();
String sanitizedUserText = sanitize(originalUserText);
if (flowConfig.getGuardrail().isSanitizeInput()) {
session.putInputParam(ConvEngineInputParamKey.SANITIZED_USER_TEXT, sanitizedUserText);
}

boolean sensitive = matchesSensitivePattern(sanitizedUserText);
boolean approvalRequired = flowConfig.getGuardrail().isRequireApprovalForSensitiveActions() && sensitive;
boolean approvalGranted = isApprovalGranted(session);
boolean failClosed = flowConfig.getGuardrail().isApprovalGateFailClosed();
boolean denied = approvalRequired && (!approvalGranted || failClosed && !approvalGranted);

if (denied) {
session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_BLOCKED, true);
session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_REASON, "SENSITIVE_ACTION_APPROVAL_REQUIRED");
session.putInputParam(ConvEngineInputParamKey.POLICY_DECISION, InteractionPolicyDecision.RECLASSIFY_INTENT.name());
session.putInputParam(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION, true);
session.putInputParam(ConvEngineInputParamKey.SKIP_PENDING_ACTION_EXECUTION, true);

Map<String, Object> payload = new LinkedHashMap<>();
payload.put("result", "DENY");
payload.put("reason", "SENSITIVE_ACTION_APPROVAL_REQUIRED");
payload.put("sensitive", true);
payload.put("approvalGranted", approvalGranted);
payload.put("userText", sanitizedUserText);
payload.put("intent", session.getIntent());
payload.put("state", session.getState());
audit.audit(ConvEngineAuditStage.GUARDRAIL_DENY, session.getConversationId(), payload);
return new StepResult.Continue();
}

session.putInputParam(ConvEngineInputParamKey.GUARDRAIL_BLOCKED, false);
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("result", "ALLOW");
payload.put("sensitive", sensitive);
payload.put("approvalRequired", approvalRequired);
payload.put("approvalGranted", approvalGranted);
payload.put("intent", session.getIntent());
payload.put("state", session.getState());
audit.audit(ConvEngineAuditStage.GUARDRAIL_ALLOW, session.getConversationId(), payload);
return new StepResult.Continue();
}
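
The approval gate in the listing above reduces to a small boolean function. The sketch below isolates it for illustration; the class and method names are hypothetical and are not part of ConvEngine. One observation it makes explicit: as the expression is written in the listing, `failClosed && !approvalGranted` is already implied by `!approvalGranted`, so the `failClosed` flag does not change the outcome.

```java
/** Hypothetical, standalone restatement of the approval gate; not ConvEngine code. */
final class GuardrailGateSketch {

    /** Mirrors the boolean expression in the listing, term for term. */
    static boolean deniedAsWritten(boolean approvalRequired, boolean approvalGranted, boolean failClosed) {
        return approvalRequired && (!approvalGranted || failClosed && !approvalGranted);
    }

    /** Logically equivalent form: the failClosed term drops out. */
    static boolean deniedSimplified(boolean approvalRequired, boolean approvalGranted) {
        return approvalRequired && !approvalGranted;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // Print the full truth table and confirm both forms agree on every row.
        for (boolean required : flags) {
            for (boolean granted : flags) {
                for (boolean failClosed : flags) {
                    boolean written = deniedAsWritten(required, granted, failClosed);
                    boolean simplified = deniedSimplified(required, granted);
                    System.out.printf("required=%b granted=%b failClosed=%b denied=%b same=%b%n",
                            required, granted, failClosed, written, written == simplified);
                }
            }
        }
    }
}
```

If fail-closed behaviour is meant to differ from fail-open, the difference would have to come from how `isApprovalGranted` treats a missing approval signal, which the listing does not show.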

12 IntentResolutionStep

Responsibility: Resolve intent with classifier+agent
Session Mutations: intent/state/clarification fields
Config/Table Dependencies: ce_intent, ce_intent_classifier, ce_config

Detailed Execution Logic

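The primary intent-matching gateway. Uses the CompositeIntentResolver (which merges Regex, Semantic Search, and LLM classifiers based on ce_intent_classifier).

If the interaction policy decided we are in FILL_PENDING_SLOT mode, this step is bypassed entirely (referred to as a "Locked Intent"). The listing below also skips resolution while schema collection is still active, when the policy decision retains the existing intent/state, and when the sticky-intent check applies. In every skip case the current intent and state are copied back onto the conversation row and a dedicated INTENT_RESOLVE_SKIPPED_* audit entry is written.

Otherwise:

  • Queries ce_intent_classifier for matches.
  • Writes an INTENT_RESOLVED audit entry, tagged with the resolver source, that records the resolved intentCode.
  • Sets the context state to IDLE (or whatever the initial configuration demands).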
IntentResolutionStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/IntentResolutionStep.java
JAVA
public StepResult execute(EngineSession session) {

String previousIntent = session.getIntent();

Map<String, Object> startPayload = new LinkedHashMap<>();
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.PREVIOUS_INTENT, previousIntent);
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCKED, session.isIntentLocked());
startPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCK_REASON, session.getIntentLockReason());
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_START, session.getConversationId(), startPayload);

if (session.isIntentLocked() || isActiveSchemaCollection(session)) {
if (!session.isIntentLocked()) {
session.lockIntent("SCHEMA_INCOMPLETE");
}
session.clearClarification();
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCKED, session.isIntentLocked());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT_LOCK_REASON, session.getIntentLockReason());
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_SCHEMA_COLLECTION, session.getConversationId(), payload);
return new StepResult.Continue();
}

if (shouldSkipResolutionForPolicy(session)) {
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.DIALOGUE_ACT, session.inputParamAsString(ConvEngineInputParamKey.DIALOGUE_ACT));
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.POLICY_DECISION, session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION));
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SKIP_INTENT_RESOLUTION, true);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "policy decision retained existing intent/state");
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_POLICY, session.getConversationId(), payload);
return new StepResult.Continue();
}

if (shouldSkipResolutionForStickyIntent(session)) {
if (session.getConversation() != null) {
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
}
Map<String, Object> payload = existingIntentRetainedAuditPayload(session);
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_SKIPPED_STICKY_INTENT, session.getConversationId(), payload);
return new StepResult.Continue();
}

CompositeIntentResolver.IntentResolutionResult result = intentResolver.resolveWithTrace(session);

if (result == null || result.resolvedIntent() == null) {
audit.audit(ConvEngineAuditStage.INTENT_RESOLVE_NO_CHANGE, session.getConversationId(), Map.of());
return new StepResult.Continue();
}

if (!result.resolvedIntent().equals(previousIntent)) {
session.setIntent(result.resolvedIntent());
}
session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());

audit.audit(
ConvEngineAuditStage.intentResolvedBy(result.source().name()),
session.getConversationId(),
result
);

return new StepResult.Continue();
}
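
The order of the skip checks in the listing matters, because the first one that applies wins. The sketch below restates that ordering as a pure function. The `Facts` record, the `Outcome` enum, and the class name are hypothetical stand-ins for the session state; in the real step the resolver is only called after the three skip checks, whereas here its result is passed in up front for simplicity.

```java
/** Hypothetical restatement of the branch order in IntentResolutionStep; not ConvEngine code. */
final class IntentResolutionOrderSketch {

    enum Outcome { SKIPPED_SCHEMA_COLLECTION, SKIPPED_POLICY, SKIPPED_STICKY_INTENT, NO_CHANGE, RESOLVED }

    /** Assumed snapshot of the facts the step consults; resolvedIntent stands in for the resolver result. */
    record Facts(boolean intentLocked, boolean activeSchemaCollection,
                 boolean policySkip, boolean stickySkip, String resolvedIntent) {}

    static Outcome decide(Facts f) {
        // 1. A locked intent or an unfinished schema collection keeps the current intent.
        if (f.intentLocked() || f.activeSchemaCollection()) return Outcome.SKIPPED_SCHEMA_COLLECTION;
        // 2. The interaction policy may have asked to retain intent/state.
        if (f.policySkip()) return Outcome.SKIPPED_POLICY;
        // 3. Sticky intent retains the existing intent.
        if (f.stickySkip()) return Outcome.SKIPPED_STICKY_INTENT;
        // 4. Only now does the resolver result matter.
        return f.resolvedIntent() == null ? Outcome.NO_CHANGE : Outcome.RESOLVED;
    }

    public static void main(String[] args) {
        System.out.println(decide(new Facts(true, false, true, true, "REFUND")));
        System.out.println(decide(new Facts(false, false, false, false, "REFUND")));
    }
}
```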

13 ResetResolvedIntentStep

Responsibility: Reset on configured reset intent
Session Mutations: full reset
Config/Table Dependencies: ce_config RESET_INTENT_CODES

Detailed Execution Logic

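A quality-of-life check. If the resolved intent, trimmed and upper-cased, matches one of the configured RESET_INTENT_CODES (e.g. START_OVER, RESET), this step immediately executes a session wipe akin to ResetConversationStep: intent and state return to UNKNOWN, the context and input params are emptied, the last assistant payload is cleared, and the conversation row is saved, returning the conversation to a clean slate.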

ResetResolvedIntentStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/ResetResolvedIntentStep.java
JAVA
public StepResult execute(EngineSession session) {
String intent = session.getIntent();
if (intent == null || !resetIntentCodes.contains(intent.trim().toUpperCase())) {
return new StepResult.Continue();
}

session.resetForConversationRestart();
session.getConversation().setStatus("RUNNING");
session.getConversation().setIntentCode("UNKNOWN");
session.getConversation().setStateCode("UNKNOWN");
session.getConversation().setContextJson("{}");
session.getConversation().setInputParamsJson("{}");
session.getConversation().setLastAssistantJson(null);
session.getConversation().setUpdatedAt(OffsetDateTime.now());
conversationRepository.save(session.getConversation());

Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "INTENT_RESOLVED_RESET");
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.MATCHED_RESET_INTENT_CODES, resetIntentCodes);
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.getContextJson());
audit.audit(ConvEngineAuditStage.CONVERSATION_RESET, session.getConversationId(), payload);

return new StepResult.Continue();
}
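
The matching rule at the top of the listing can be sketched on its own. The class and method names below are hypothetical. One deliberate deviation, labeled as an assumption: the sketch passes `Locale.ROOT` to `toUpperCase`, whereas the listing calls `toUpperCase()` with the default locale.

```java
import java.util.Locale;
import java.util.Set;

/** Hypothetical sketch of the reset-intent match; not ConvEngine code. */
final class ResetIntentMatchSketch {

    /**
     * True when the intent, trimmed and upper-cased, is one of the reset codes.
     * Assumption: Locale.ROOT is used here to avoid locale-specific casing surprises.
     */
    static boolean isResetIntent(String intent, Set<String> resetIntentCodes) {
        if (intent == null) {
            return false;
        }
        return resetIntentCodes.contains(intent.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Set<String> codes = Set.of("START_OVER", "RESET");
        System.out.println(isResetIntent(" start_over ", codes));
        System.out.println(isResetIntent("REFUND", codes));
    }
}
```

Because only the incoming intent is normalized, the configured codes themselves need to be stored in upper case for a match to succeed.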

14 FallbackIntentStateStep

Responsibility: Fill missing intent/state defaults
Session Mutations: intent/state
Config/Table Dependencies: none

Detailed Execution Logic

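A safety net. If intent or state is still null at this point (for example, because the classifier returned no match or an exception occurred upstream), this step audits the gap (INTENT_MISSING or STATE_MISSING) and assigns the engine's fallback values, UNKNOWN intent and UNKNOWN state, so that the ce_rule and ce_response tables can still define fallback messaging (e.g., "I didn't understand that"). Intent and state are handled independently, so a resolved intent is never overwritten just because the state is missing.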

FallbackIntentStateStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/FallbackIntentStateStep.java
JAVA
public StepResult execute(EngineSession session) {

if (session.getIntent() == null) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.USER_TEXT, session.getUserText());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FALLBACK_INTENT, fallbackIntent);
audit.audit(ConvEngineAuditStage.INTENT_MISSING, session.getConversationId(), payload);
session.setIntent(fallbackIntent);
}

if (session.getState() == null) {
Map<String, Object> payload = new LinkedHashMap<>();
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.USER_TEXT, session.getUserText());
payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FALLBACK_STATE, fallbackState);
audit.audit(ConvEngineAuditStage.STATE_MISSING, session.getConversationId(), payload);
session.setState(fallbackState);
}

session.getConversation().setIntentCode(session.getIntent());
session.getConversation().setStateCode(session.getState());
return new StepResult.Continue();
}
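
A minimal sketch of the same null-filling behaviour, using a hypothetical immutable `IntentState` record in place of the mutable session. The fallback values are passed in as parameters, since the listing does not show where `fallbackIntent` and `fallbackState` are configured.

```java
/** Hypothetical sketch of the fallback assignment; not ConvEngine code. */
final class FallbackDefaultsSketch {

    record IntentState(String intent, String state) {}

    /** Replaces only the null parts; non-null values are left untouched. */
    static IntentState applyFallback(IntentState current, String fallbackIntent, String fallbackState) {
        String intent = current.intent() == null ? fallbackIntent : current.intent();
        String state = current.state() == null ? fallbackState : current.state();
        return new IntentState(intent, state);
    }

    public static void main(String[] args) {
        System.out.println(applyFallback(new IntentState(null, "IDLE"), "UNKNOWN", "UNKNOWN"));
    }
}
```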

15 AddContainerDataStep

Responsibility: Fetch and attach container data
Session Mutations: containerData/context merge
Config/Table Dependencies: ce_container_config

Detailed Execution Logic

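Bridges static tenant/consumer configurations. Looks up ce_container_config for the current intent and state, falling back first to a state-only match and then to a global fallback. Each matching container is fetched, optionally transformed, and collected under its input parameter name; the combined result is then merged into session.contextJson under the container_data key. A failure in one container is audited as CONTAINER_DATA_FAILED and does not stop the others. This allows things like "Store Hours" or "Region Policies" to be attached to all LLM contexts without hardcoding.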

AddContainerDataStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/AddContainerDataStep.java
JAVA
public StepResult execute(EngineSession session) {

List<CeContainerConfig> configs =
containerConfigRepo.findByIntentAndState(
session.getIntent(),
session.getState()
);

if (configs.isEmpty()) {
configs = containerConfigRepo.findFallbackByState(session.getState());
}

if (configs.isEmpty()) {
configs = containerConfigRepo.findGlobalFallback();
}

if (configs.isEmpty()) {
Map<String, Object> reasonMap = new HashMap<>();
reasonMap.put("reason", "no container configs for intent/state");
reasonMap.put("intent", session.getIntent());
reasonMap.put("state", session.getState());
audit.audit(
"CONTAINER_DATA_SKIPPED",
session.getConversationId(),
reasonMap
);
return new StepResult.Continue();
}

ObjectNode containerRoot = mapper.createObjectNode();

for (CeContainerConfig cfg : configs) {

try {
Map<String, Object> inputParams = new HashMap<>();
String key = cfg.getInputParamName();
Object value = session.extractValueFromContext(key);
if(value == null) {
value = session.getUserText();
}
inputParams.put(key, value);
if (session.getInputParams() != null) {
inputParams.putAll(session.getInputParams());
}
if (session.getEngineContext().getInputParams() != null) {
inputParams.putAll(session.getEngineContext().getInputParams());
}

PageInfoRequest pageInfo = PageInfoRequest.builder()
.userId("convengine")
.loggedInUserId("convengine")
.pageId(cfg.getPageId())
.sectionId(cfg.getSectionId())
.containerId(cfg.getContainerId())
.inputParams(inputParams)
.build();

ContainerComponentRequest req = new ContainerComponentRequest();
req.setPageInfo(List.of(pageInfo));
req.setRequestTypes(List.of(RequestType.CONTAINER));
interceptorExecutor.beforeExecute(req, session);
ContainerComponentResponse resp = ccfCoreService.execute(req);
resp = interceptorExecutor.afterExecute(resp, session);
// find classes with @ContainerDataTransformer(state, intent) to transform resp if needed
Map<String, Object> transformedData = transformerService.transformIfApplicable(resp, session, inputParams);
JsonNode responseNode = transformedData == null ? mapper.valueToTree(resp) : mapper.valueToTree(transformedData);
session.setContainerData(responseNode);
containerRoot.set(cfg.getInputParamName(), responseNode);
Map<String, Object> jsonMap = Map.of(
"containerId", cfg.getContainerId(),
"pageId", cfg.getPageId(),
"sectionId", cfg.getSectionId(),
"inputParam", cfg.getInputParamName(),
"requestInput", inputParams,
"response", responseNode
);
audit.audit(
"CONTAINER_DATA_EXECUTED",
session.getConversationId(),
jsonMap
);

} catch (Exception e) {
Map<String, Object> errorJsonMap = new HashMap<>();
errorJsonMap.put("containerId", cfg.getContainerId());
errorJsonMap.put("error", e.getMessage());
audit.audit(
"CONTAINER_DATA_FAILED",
session.getConversationId(),
errorJsonMap
);
}
}

if (!containerRoot.isEmpty()) {

// attach to session
session.setContainerDataJson(containerRoot.toString());
session.setHasContainerData(true);

// merge into conversation context
try {
ObjectNode ctx = (ObjectNode) mapper.readTree(session.getContextJson());
ctx.set("container_data", containerRoot);
session.setContextJson(mapper.writeValueAsString(ctx));
session.getConversation().setContextJson(session.getContextJson());
} catch (Exception ignore) {
// context merge failure should not break pipeline
}

audit.audit(
"CONTAINER_DATA_ATTACHED",
session.getConversationId(),
containerRoot.toString()
);
}

return new StepResult.Continue();
}
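
The three-tier lookup at the top of the listing (exact intent and state, then state only, then global) can be sketched against an in-memory list. Everything here is an assumption made for illustration: the `Config` record, the class name, and in particular the use of `null` as a wildcard. The real repository methods (`findByIntentAndState`, `findFallbackByState`, `findGlobalFallback`) are not shown in the source, so their actual matching rules may differ.

```java
import java.util.List;

/** Hypothetical in-memory sketch of the tiered container-config lookup; not ConvEngine code. */
final class ContainerConfigLookupSketch {

    /** Assumed row shape; a null intent or state is treated as a wildcard in this sketch only. */
    record Config(String intent, String state, String containerId) {}

    static List<Config> resolve(List<Config> all, String intent, String state) {
        // Tier 1: rows scoped to exactly this intent and state.
        List<Config> exact = all.stream()
                .filter(c -> c.intent() != null && c.intent().equals(intent))
                .filter(c -> c.state() != null && c.state().equals(state))
                .toList();
        if (!exact.isEmpty()) {
            return exact;
        }
        // Tier 2: rows scoped to the state only.
        List<Config> byState = all.stream()
                .filter(c -> c.intent() == null)
                .filter(c -> c.state() != null && c.state().equals(state))
                .toList();
        if (!byState.isEmpty()) {
            return byState;
        }
        // Tier 3: global rows with no intent or state scope.
        return all.stream()
                .filter(c -> c.intent() == null && c.state() == null)
                .toList();
    }

    public static void main(String[] args) {
        List<Config> all = List.of(
                new Config("REFUND", "CONFIRM", "refund-confirm"),
                new Config(null, "CONFIRM", "any-confirm"),
                new Config(null, null, "global"));
        System.out.println(resolve(all, "BILLING", "CONFIRM"));
    }
}
```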

16 PendingActionStep

Responsibility: Execute/reject pending action task
Session Mutations: pending_action_runtime status/result
Config/Table Dependencies: ce_pending_action, CeTaskExecutor, ce_audit

Detailed Execution Logic

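Executes Java code. If the interaction policy decision is EXECUTE_PENDING_ACTION and the status is IN_PROGRESS, this step resolves the Spring bean reference attached to the ce_pending_action row. A REJECT_PENDING_ACTION decision marks the action REJECTED without running it, any other decision leaves the step a no-op, and a guardrail block skips the step entirely.

For an execute decision it invokes CeTaskExecutor.execute(), runs the backend transaction (e.g. a Stripe refund), and records the outcome (EXECUTED, REJECTED, or FAILED) under the PENDING_ACTION_RESULT input param in the EngineSession for downstream rules to evaluate.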

PendingActionStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/PendingActionStep.java
JAVA
public StepResult execute(EngineSession session) {
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_PENDING_ACTION_EXECUTION))
|| Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.GUARDRAIL_BLOCKED))) {
Map<String, Object> payload = basePayload(session, InteractionPolicyDecision.RECLASSIFY_INTENT, null);
payload.put(ConvEnginePayloadKey.REASON, "pending action skipped by guardrail");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_SKIPPED, session.getConversationId(), payload);
return new StepResult.Continue();
}

String decisionRaw = session.inputParamAsString(ConvEngineInputParamKey.POLICY_DECISION);
InteractionPolicyDecision decision = parseDecision(decisionRaw);
if (decision != InteractionPolicyDecision.EXECUTE_PENDING_ACTION
&& decision != InteractionPolicyDecision.REJECT_PENDING_ACTION) {
return new StepResult.Continue();
}

Map<String, Object> context = session.contextDict();
Object pendingAction = context.get("pending_action");
if (pendingAction == null) {
pendingAction = context.get("pendingAction");
}

String actionKey = resolveActionKey(session, context, pendingAction);
String actionRef = resolveActionReference(session, pendingAction, actionKey);
if (actionRef == null || actionRef.isBlank()) {
Map<String, Object> payload = basePayload(session, decision, null);
payload.put("actionKey", actionKey);
payload.put(ConvEnginePayloadKey.REASON, actionKey == null || actionKey.isBlank()
? "pending action reference not found or ambiguous registry mapping"
: "pending action reference not found");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_SKIPPED, session.getConversationId(), payload);
return new StepResult.Continue();
}

if (decision == InteractionPolicyDecision.REJECT_PENDING_ACTION) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "REJECTED");
updateRuntimeStatus(session, PendingActionStatus.REJECTED);
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "REJECTED");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_REJECTED, session.getConversationId(), payload);
return new StepResult.Continue();
}

String[] taskRef = parseTaskReference(actionRef);
if (taskRef == null) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "FAILED");
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "FAILED");
payload.put(ConvEnginePayloadKey.REASON, "invalid pending action reference");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_FAILED, session.getConversationId(), payload);
return new StepResult.Continue();
}

Object executionResult = ceTaskExecutor.execute(taskRef[0], taskRef[1], session);
if (executionResult == null) {
session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "FAILED");
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "FAILED");
payload.put(ConvEnginePayloadKey.REASON, "task execution returned null");
audit.audit(ConvEngineAuditStage.PENDING_ACTION_FAILED, session.getConversationId(), payload);
return new StepResult.Continue();
}

session.putInputParam(ConvEngineInputParamKey.PENDING_ACTION_RESULT, "EXECUTED");
updateRuntimeStatus(session, PendingActionStatus.EXECUTED);
Map<String, Object> payload = basePayload(session, decision, actionRef);
payload.put(ConvEnginePayloadKey.PENDING_ACTION_RESULT, "EXECUTED");
payload.put("taskBean", taskRef[0]);
payload.put("taskMethods", taskRef[1]);
audit.audit(ConvEngineAuditStage.PENDING_ACTION_EXECUTED, session.getConversationId(), payload);

return new StepResult.Continue();
}
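
The listing has several early exits, and it can help to see which value ends up stored as the pending action result in each case. The sketch below condenses the branches into one function. All names are hypothetical; the boolean `taskRefValid` stands in for `parseTaskReference` returning a non-null pair, whose reference format the source does not show.

```java
/** Hypothetical condensation of the PendingActionStep branches; not ConvEngine code. */
final class PendingActionOutcomeSketch {

    enum Decision { EXECUTE_PENDING_ACTION, REJECT_PENDING_ACTION, OTHER }

    /**
     * Returns the value the listing would store as the pending action result,
     * or null for the branches that only audit (or do nothing) and store no result.
     */
    static String outcome(boolean skippedByGuardrail, Decision decision, String actionRef,
                          boolean taskRefValid, Object executionResult) {
        if (skippedByGuardrail) return null;                        // audited as skipped
        if (decision == Decision.OTHER) return null;                // not this step's concern
        if (actionRef == null || actionRef.isBlank()) return null;  // audited as skipped
        if (decision == Decision.REJECT_PENDING_ACTION) return "REJECTED";
        if (!taskRefValid) return "FAILED";                         // reference could not be parsed
        if (executionResult == null) return "FAILED";               // task returned null
        return "EXECUTED";
    }

    public static void main(String[] args) {
        // "someRef" is a placeholder; the real reference format is not documented here.
        System.out.println(outcome(false, Decision.EXECUTE_PENDING_ACTION, "someRef", true, Boolean.TRUE));
        System.out.println(outcome(false, Decision.REJECT_PENDING_ACTION, "someRef", true, null));
    }
}
```

Note that a rejection is decided before the task reference is parsed, so a malformed reference still yields REJECTED rather than FAILED when the user declines.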

17 ToolOrchestrationStep

Responsibility: Run tool_group based orchestration
Session Mutations: tool_request/tool_result fields
Config/Table Dependencies: ce_tool, ce_mcp_tool, ce_audit

Detailed Execution Logic

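The gateway for Model Context Protocol (MCP) tooling. If ce_tool specifies that this intent requires a tool_group, this step binds the request and delegates to an external executor. It runs the backend SQL or REST fetch synchronously inside the pipeline and stores the JSON result, together with a status of SUCCESS, ERROR, or SKIPPED_SCOPE_MISMATCH, in the tool_result input param. On success it then applies the POST_TOOL_EXECUTION rules. The step is a no-op when tool orchestration is disabled or when SKIP_TOOL_EXECUTION has been set (for example by GuardrailStep).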

ToolOrchestrationStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/ToolOrchestrationStep.java
JAVA
public StepResult execute(EngineSession session) {
if (!flowConfig.getToolOrchestration().isEnabled()) {
return new StepResult.Continue();
}
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION))) {
return new StepResult.Continue();
}

ToolRequest request = resolveRequest(session);
if (request == null) {
return new StepResult.Continue();
}

session.putInputParam(ConvEngineInputParamKey.TOOL_REQUEST, request.toMap());
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_REQUEST, session.getConversationId(), request.toMap());

try {
CeMcpTool tool = request.toolCode() == null || request.toolCode().isBlank()
? null
: registry.requireTool(request.toolCode(), session.getIntent(), session.getState());
String group = request.toolGroup();
if ((group == null || group.isBlank()) && tool != null) {
group = registry.normalizeToolGroup(tool.getToolGroup());
}
if (group == null || group.isBlank()) {
throw new IllegalStateException("tool_group is required when tool_code is not resolvable");
}

McpToolExecutor executor = resolveExecutor(group);
String resultJson = executor.execute(tool, request.args(), session);

Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "SUCCESS");
result.put("tool_code", request.toolCode());
result.put("tool_group", group);
result.put("result", parseJsonOrString(resultJson));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "SUCCESS");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_RESULT, session.getConversationId(), result);

rulesStep.applyRules(session, "ToolOrchestrationStep PostTool", RulePhase.POST_TOOL_EXECUTION.name());
} catch (IllegalStateException e) {
if (e.getMessage() != null && e.getMessage().contains("Missing enabled MCP tool for current intent/state")) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "SKIPPED_SCOPE_MISMATCH");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("intent", session.getIntent());
result.put("state", session.getState());
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "SKIPPED_SCOPE_MISMATCH");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_RESULT, session.getConversationId(), result);
return new StepResult.Continue();
}
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "ERROR");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("error", String.valueOf(e.getMessage()));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "ERROR");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_ERROR, session.getConversationId(), result);
} catch (Exception e) {
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "ERROR");
result.put("tool_code", request.toolCode());
result.put("tool_group", request.toolGroup());
result.put("error", String.valueOf(e.getMessage()));
session.putInputParam(ConvEngineInputParamKey.TOOL_RESULT, result);
session.putInputParam(ConvEngineInputParamKey.TOOL_STATUS, "ERROR");
audit.audit(ConvEngineAuditStage.TOOL_ORCHESTRATION_ERROR, session.getConversationId(), result);
}
return new StepResult.Continue();
}
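
Two details of the listing are easy to miss: the request's own tool_group takes precedence over the group of the resolved tool, and every failure is converted into a result map instead of propagating. The sketch below shows both. The class and method names are hypothetical, and the registry's `normalizeToolGroup` call is omitted because its behaviour is not shown in the source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of tool-group resolution and the result envelope; not ConvEngine code. */
final class ToolGroupResolutionSketch {

    /** Prefers the group named in the request, then the group of the resolved tool. */
    static String resolveGroup(String requestedGroup, String groupOfResolvedTool) {
        String group = requestedGroup;
        if ((group == null || group.isBlank()) && groupOfResolvedTool != null) {
            group = groupOfResolvedTool;
        }
        if (group == null || group.isBlank()) {
            throw new IllegalStateException("tool_group is required when tool_code is not resolvable");
        }
        return group;
    }

    /** Wraps the resolution in the same status envelope shape the listing stores as the tool result. */
    static Map<String, Object> run(String requestedGroup, String groupOfResolvedTool) {
        Map<String, Object> result = new LinkedHashMap<>();
        try {
            result.put("tool_group", resolveGroup(requestedGroup, groupOfResolvedTool));
            result.put("status", "SUCCESS");
        } catch (IllegalStateException e) {
            result.put("status", "ERROR");
            result.put("error", String.valueOf(e.getMessage()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(null, "DB"));
        System.out.println(run(" ", null));
    }
}
```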

18 McpToolStep

Responsibility: MCP planner/tool loop
Session Mutations: context_json.mcp.*
Config/Table Dependencies: ce_mcp_tool, ce_mcp_db_tool, ce_mcp_planner (fallback ce_config)

Detailed Execution Logic

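Works from the ce_mcp_tool bindings enabled for the current intent and state. Instead of static grouped tools, this triggers an agent planner that interprets the input, selects an MCP tool, writes the payload, and executes it. The planner is re-invoked with the accumulated observations, up to MAX_LOOPS times, until it returns ANSWER; any action other than ANSWER or CALL_TOOL, or a tool error, ends the loop with a safe fallback message. The step is skipped when a guardrail has blocked tool execution or a clarification is pending. This is the core of dynamic tool use in ConvEngine V2.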

McpToolStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/McpToolStep.java
JAVA
public StepResult execute(EngineSession session) {
if (Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.SKIP_TOOL_EXECUTION))
|| Boolean.TRUE.equals(session.getInputParams().get(ConvEngineInputParamKey.GUARDRAIL_BLOCKED))) {
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "SKIPPED_BY_GUARDRAIL");
return new StepResult.Continue();
}

if (session.hasPendingClarification()) {
audit.audit(
ConvEngineAuditStage.MCP_SKIPPED_PENDING_CLARIFICATION,
session.getConversationId(),
mapOf(
"intent", session.getIntent(),
"state", session.getState()
)
);
return new StepResult.Continue();
}

List<CeMcpTool> tools = registry.listEnabledTools(session.getIntent(), session.getState());

if (CollectionUtils.isEmpty(tools)) {
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "NO_TOOLS_FOR_SCOPE");
audit.audit(ConvEngineAuditStage.MCP_NO_TOOLS_AVAILABLE, session.getConversationId(),
mapOf("intent", session.getIntent(), "state", session.getState()));
return new StepResult.Continue();
}

clearMcpContext(session);
List<McpObservation> observations = readObservationsFromContext(session);
boolean mcpTouched = false;

for (int i = 0; i < MAX_LOOPS; i++) {

McpPlan plan = planner.plan(session, tools, observations);
mcpTouched = true;
session.putInputParam(ConvEngineInputParamKey.MCP_ACTION, plan.action());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_CODE, plan.tool_code());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_ARGS, plan.args() == null ? Map.of() : plan.args());

if ("ANSWER".equalsIgnoreCase(plan.action())) {
// store final answer in contextJson; your ResponseResolutionStep can use it via derivation_hint
writeFinalAnswerToContext(session, plan.answer());
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, plan.answer() == null ? "" : plan.answer());
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "ANSWER");
audit.audit(
ConvEngineAuditStage.MCP_FINAL_ANSWER,
session.getConversationId(),
mapOf("answer", plan.answer())
);
break;
}

if (!"CALL_TOOL".equalsIgnoreCase(plan.action())) {
writeFinalAnswerToContext(session, "I couldn't decide the next tool step safely.");
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, "I couldn't decide the next tool step safely.");
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "FALLBACK");
break;
}

String toolCode = plan.tool_code();
Map<String, Object> args = (plan.args() == null) ? Map.of() : plan.args();

audit.audit(
ConvEngineAuditStage.MCP_TOOL_CALL,
session.getConversationId(),
mapOf("tool_code", toolCode, "args", args)
);

CeMcpTool tool = registry.requireTool(toolCode, session.getIntent(), session.getState());
String toolGroup = registry.normalizeToolGroup(tool.getToolGroup());
session.putInputParam(ConvEngineInputParamKey.MCP_TOOL_GROUP, toolGroup);

try {
McpToolExecutor executor = resolveExecutor(toolGroup);
String rowsJson = executor.execute(tool, args, session);

observations.add(new McpObservation(toolCode, rowsJson));
writeObservationsToContext(session, observations);
session.putInputParam(ConvEngineInputParamKey.MCP_OBSERVATIONS, observations);
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "TOOL_RESULT");

audit.audit(
ConvEngineAuditStage.MCP_TOOL_RESULT,
session.getConversationId(),
mapOf("tool_code", toolCode, "tool_group", toolGroup, "rows", rowsJson)
);

} catch (Exception e) {
audit.audit(
ConvEngineAuditStage.MCP_TOOL_ERROR,
session.getConversationId(),
mapOf("tool_code", toolCode, "tool_group", toolGroup, "error", String.valueOf(e.getMessage()))
);
writeFinalAnswerToContext(session, "Tool execution failed safely. Can you narrow the request?");
session.putInputParam(ConvEngineInputParamKey.MCP_FINAL_ANSWER, "Tool execution failed safely. Can you narrow the request?");
session.putInputParam(ConvEngineInputParamKey.MCP_STATUS, "TOOL_ERROR");
break;
}
}

if (mcpTouched) {
rulesStep.applyRules(session, "McpToolStep", RulePhase.POST_AGENT_MCP.name());
}

session.syncToConversation();
return new StepResult.Continue();
}
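
The control flow of the planner loop is easier to follow with the auditing and session plumbing stripped away. The sketch below keeps only the loop. The `Planner` and `ToolExecutor` interfaces, the records, and the `MAX_LOOPS` value of 5 are assumptions for illustration; the source does not show the real cap or the planner API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the bounded plan/act/observe loop; not ConvEngine code. */
final class McpLoopSketch {

    static final int MAX_LOOPS = 5; // assumed cap; the real value is not shown in the listing

    record Plan(String action, String toolCode, Map<String, Object> args, String answer) {}
    record Observation(String toolCode, String json) {}
    record Outcome(String status, String finalAnswer, List<Observation> observations) {}

    interface Planner { Plan plan(List<Observation> observations); }
    interface ToolExecutor { String execute(String toolCode, Map<String, Object> args) throws Exception; }

    static Outcome run(Planner planner, ToolExecutor executor) {
        List<Observation> observations = new ArrayList<>();
        String status = null;
        String answer = null;
        for (int i = 0; i < MAX_LOOPS; i++) {
            Plan plan = planner.plan(observations);
            if ("ANSWER".equalsIgnoreCase(plan.action())) {
                status = "ANSWER";
                answer = plan.answer() == null ? "" : plan.answer();
                break;
            }
            if (!"CALL_TOOL".equalsIgnoreCase(plan.action())) {
                // Unknown action: stop with a safe fallback instead of guessing.
                status = "FALLBACK";
                answer = "I couldn't decide the next tool step safely.";
                break;
            }
            try {
                Map<String, Object> args = plan.args() == null ? Map.of() : plan.args();
                String json = executor.execute(plan.toolCode(), args);
                observations.add(new Observation(plan.toolCode(), json)); // fed back to the planner
                status = "TOOL_RESULT";
            } catch (Exception e) {
                status = "TOOL_ERROR";
                answer = "Tool execution failed safely. Can you narrow the request?";
                break;
            }
        }
        return new Outcome(status, answer, observations);
    }

    public static void main(String[] args) {
        // Stub planner: call one tool, then answer once an observation exists.
        Outcome o = run(
                obs -> obs.isEmpty()
                        ? new Plan("CALL_TOOL", "lookup_order", Map.of("id", 42), null)
                        : new Plan("ANSWER", null, null, "Order 42 has shipped."),
                (code, toolArgs) -> "{\"status\":\"SHIPPED\"}");
        System.out.println(o.status() + ": " + o.finalAnswer());
    }
}
```

If the planner keeps requesting tools, the loop simply stops at the cap with the last status left at TOOL_RESULT and no final answer, which matches the listing.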

19 SchemaExtractionStep

Responsibility: Schema-driven extraction and lock handling
Session Mutations: schema facts/context/lock
Config/Table Dependencies: ce_output_schema, ce_prompt_template

Detailed Execution Logic

Evaluates ce_output_schema. It injects the missing required slots into an LLM extraction prompt built from ce_prompt_template. The LLM returns a structured JSON map, which this step merges into session.contextJson.

Prompt-template usage details:

  • selects ce_prompt_template with response_type=SCHEMA_JSON for the same intent_code + state_code
  • interaction_mode=COLLECT is the recommended semantic marker for these templates
  • interaction_contract can declare expects:["structured_input"] so the template contract remains explicit in configuration
  • after merge, POST_SCHEMA_EXTRACTION rules can move the state or set runtime flags

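It then runs missingFieldEvaluator.evaluate(). If fields are missing, it calls session.setSchemaLocked(true). When no enabled schema matches the current intent and state, the step instead unlocks the intent and clears the schema facts (resolved schema, completeness flags, and missing-field lists).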

SchemaExtractionStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/SchemaExtractionStep.java
JAVA
public StepResult execute(EngineSession session) {

String intent = session.getIntent();
String state = session.getState();

CeOutputSchema schema = outputSchemaRepo.findAll().stream()
.filter(s -> Boolean.TRUE.equals(s.getEnabled()))
.filter(s -> equalsIgnoreCase(s.getIntentCode(), intent))
.filter(s -> equalsIgnoreCase(s.getStateCode(), state) || equalsIgnoreCase(s.getStateCode(), "ANY"))
.min((a, b) -> Integer.compare(priorityOf(a), priorityOf(b)))
.orElse(null);

if (schema != null) {
runExtraction(session, schema);
} else {
session.unlockIntent();
session.setResolvedSchema(null);
session.setSchemaComplete(false);
session.setSchemaHasAnyValue(false);
session.setMissingRequiredFields(new ArrayList<>());
session.setMissingFieldOptions(new LinkedHashMap<>());
session.addPromptTemplateVars();
}

session.syncFromConversation(true);
return new StepResult.Continue();
}
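
The schema lookup at the top of the listing is a filter-then-minimum over all rows. The sketch below reproduces it against an in-memory list. The `Schema` record and class name are hypothetical, and `priorityOf` treating a null priority as lowest precedence is an assumption, since the source does not show that helper.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the output-schema selection; not ConvEngine code. */
final class SchemaSelectionSketch {

    record Schema(String intentCode, String stateCode, Boolean enabled, Integer priority) {}

    /** Assumption: a missing priority sorts last. */
    static int priorityOf(Schema s) {
        return s.priority() == null ? Integer.MAX_VALUE : s.priority();
    }

    static boolean equalsIgnoreCase(String a, String b) {
        return a != null && a.equalsIgnoreCase(b);
    }

    /** Enabled rows for the intent whose state matches exactly or is "ANY"; lowest priority number wins. */
    static Optional<Schema> select(List<Schema> all, String intent, String state) {
        return all.stream()
                .filter(s -> Boolean.TRUE.equals(s.enabled()))
                .filter(s -> equalsIgnoreCase(s.intentCode(), intent))
                .filter(s -> equalsIgnoreCase(s.stateCode(), state) || equalsIgnoreCase(s.stateCode(), "ANY"))
                .min(Comparator.comparingInt(SchemaSelectionSketch::priorityOf));
    }

    public static void main(String[] args) {
        List<Schema> all = List.of(
                new Schema("REFUND", "ANY", true, 10),
                new Schema("refund", "CONFIRM", true, 5),
                new Schema("REFUND", "CONFIRM", false, 1));
        System.out.println(select(all, "REFUND", "CONFIRM"));
    }
}
```

An exact state match is not preferred over "ANY" by itself; only the priority column decides, so an "ANY" row with a lower priority number would win over a state-specific one.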

20 AutoAdvanceStep

Responsibility: Compute schema status facts
Session Mutations: schemaComplete/hasAny
Config/Table Dependencies: resolved schema + context

Detailed Execution Logic

In V1, rules had to manually check if schema extraction was done. In V2, this step computes the boolean flags schemaComplete and hasAny and binds them to the session context. This allows ce_rule to simply trigger on schemaComplete == true.

AutoAdvanceStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/AutoAdvanceStep.java
JAVA
public StepResult execute(EngineSession session) {

    if (session.getResolvedSchema() == null) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.REASON, "no schema resolved");
        audit.audit(ConvEngineAuditStage.AUTO_ADVANCE_SKIPPED_NO_SCHEMA, session.getConversationId(), payload);
        return new StepResult.Continue();
    }

    String schemaJson = session.getResolvedSchema().getJsonSchema();
    String contextJson = session.getContextJson();

    boolean hasAnySchemaValue = JsonUtil.hasAnySchemaValue(contextJson, schemaJson);
    boolean schemaComplete = JsonUtil.isSchemaComplete(schemaJson, contextJson);
    session.setSchemaHasAnyValue(hasAnySchemaValue);
    session.setSchemaComplete(schemaComplete);

    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SCHEMA_COMPLETE, schemaComplete);
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.HAS_ANY_SCHEMA_VALUE, hasAnySchemaValue);
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    audit.audit(ConvEngineAuditStage.AUTO_ADVANCE_FACTS, session.getConversationId(), payload);
    return new StepResult.Continue();
}
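
The internals of JsonUtil.hasAnySchemaValue and JsonUtil.isSchemaComplete are not shown on this page. As a rough illustration of what the two facts mean, here is a sketch that works on plain maps and field lists rather than JSON strings. The class name, the isPresent rule (non-null and, for strings, non-blank), and the parameter shapes are all assumptions made for the example.

```java
import java.util.List;
import java.util.Map;

class SchemaFactsSketch {

    // Assumed rule: a value counts as present when it is non-null and, for strings, non-blank.
    static boolean isPresent(Object value) {
        if (value == null) return false;
        if (value instanceof String s) return !s.isBlank();
        return true;
    }

    // True when at least one schema field has a value in the context.
    static boolean hasAnySchemaValue(Map<String, Object> context, List<String> schemaFields) {
        return schemaFields.stream().anyMatch(f -> isPresent(context.get(f)));
    }

    // True when every required field has a value in the context.
    static boolean isSchemaComplete(List<String> requiredFields, Map<String, Object> context) {
        return requiredFields.stream().allMatch(f -> isPresent(context.get(f)));
    }
}
```

With a context holding orderId but a blank email, hasAnySchemaValue is true and isSchemaComplete is false, which is the combination a rule would typically use to keep asking for the missing field.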

21 RulesStep

Responsibility: Match and apply transitions/actions
Session Mutations: intent/state/input params
Config/Table Dependencies: ce_rule

Detailed Execution Logic

The core state-machine driver. It queries ce_rule for the current Intent and State. It evaluates expressions (like JSON_PATH or REGEX) against the session.contextJson.

If a rule matches, it executes the target action (e.g. SET_STATE to CONFIRMATION, or SET_TASK). It loops until no more rules match, effectively "auto-advancing" state machine nodes.

RulesStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/RulesStep.java
JAVA
public StepResult execute(EngineSession session) {
    applyRules(session, "RulesStep", RulePhase.PRE_RESPONSE_RESOLUTION.name());
    session.syncToConversation();
    return new StepResult.Continue();
}
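
The body of applyRules is not shown here. The "loop until no more rules match" behaviour described above can be sketched as a small fixpoint loop. Everything in this example is hypothetical: the Rule record, the use of a plain Predicate in place of JSON_PATH or REGEX evaluation, and the maxPasses cap, which is an assumed safeguard against cyclic rules.

```java
import java.util.List;
import java.util.function.Predicate;

class RuleLoopSketch {

    // Hypothetical rule: fires in fromState when the condition holds, then moves to toState.
    record Rule(String fromState, Predicate<String> condition, String toState) {}

    // Applies the first matching rule repeatedly until nothing matches.
    // maxPasses is an assumed safety cap so cyclic rules cannot loop forever.
    static String applyRules(List<Rule> rules, String state, String contextJson, int maxPasses) {
        String current = state;
        for (int pass = 0; pass < maxPasses; pass++) {
            final String s = current;
            Rule matched = rules.stream()
                    .filter(r -> r.fromState().equalsIgnoreCase(s))
                    .filter(r -> r.condition().test(contextJson))
                    .findFirst()
                    .orElse(null);
            if (matched == null || matched.toState().equalsIgnoreCase(current)) {
                break; // fixpoint: no rule fired, or the rule did not change the state
            }
            current = matched.toState();
        }
        return current;
    }
}
```

Starting in COLLECT with a context that satisfies both a COLLECT to CONFIRMATION rule and a CONFIRMATION to READY rule, a single call walks through both transitions, which is the auto-advance effect.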

22 StateGraphStep

Responsibility: Validate state transition path
Session Mutations: state_graph_valid/reason
Config/Table Dependencies: ce_state_graph, ce_audit

Detailed Execution Logic

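A strict validator. Checks ce_state_graph to see if the transition that just occurred in RulesStep was legally defined by the developer. If a rule jumped from IDLE to CANCELLED but there is no edge in the graph, this step audits a STATE_GRAPH_VIOLATION and sets state_graph_valid to false; when soft-block-on-violation is enabled, it also sets a soft-block flag. As the code below shows, the step validates only: it does not revert the state itself, and the pipeline continues in either case.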

StateGraphStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/StateGraphStep.java
JAVA
public StepResult execute(EngineSession session) {
    if (!flowConfig.getStateGraph().isEnabled()) {
        return new StepResult.Continue();
    }
    String fromState = session.getConversation() == null ? null : session.getConversation().getStateCode();
    String toState = session.getState();

    if (fromState == null || fromState.isBlank() || toState == null || toState.isBlank()
            || fromState.equalsIgnoreCase(toState)) {
        return new StepResult.Continue();
    }

    boolean allowed = isAllowedTransition(fromState, toState);
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("fromState", fromState);
    payload.put("toState", toState);
    payload.put("intent", session.getIntent());
    payload.put("validateOnly", true);

    if (allowed) {
        session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_VALID, true);
        payload.put("allowed", true);
        audit.audit(ConvEngineAuditStage.STATE_GRAPH_VALID, session.getConversationId(), payload);
        return new StepResult.Continue();
    }

    payload.put("allowed", false);
    payload.put("softBlock", flowConfig.getStateGraph().isSoftBlockOnViolation());
    audit.audit(ConvEngineAuditStage.STATE_GRAPH_VIOLATION, session.getConversationId(), payload);
    session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_VALID, false);
    if (flowConfig.getStateGraph().isSoftBlockOnViolation()) {
        session.putInputParam(ConvEngineInputParamKey.STATE_GRAPH_SOFT_BLOCK, true);
    }
    return new StepResult.Continue();
}
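
isAllowedTransition is not shown on this page. One plausible shape for it is an adjacency lookup over the configured edges. The sketch below assumes a hypothetical in-memory map keyed by upper-case state codes; in the engine the edges would come from ce_state_graph, and the real lookup may differ.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

class StateGraphSketch {

    // Hypothetical in-memory edges; in the engine these would come from ce_state_graph.
    private final Map<String, Set<String>> edges;

    StateGraphSketch(Map<String, Set<String>> edges) {
        this.edges = edges;
    }

    // Case-insensitive lookup, assuming the edge map is stored in upper case.
    boolean isAllowedTransition(String fromState, String toState) {
        Set<String> targets = edges.getOrDefault(fromState.toUpperCase(Locale.ROOT), Set.of());
        return targets.contains(toState.toUpperCase(Locale.ROOT));
    }
}
```

With edges IDLE to COLLECT and COLLECT to CONFIRMATION or CANCELLED, a direct jump from IDLE to CANCELLED is reported as not allowed, matching the example in the description.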

23 ResponseResolutionStep

Responsibility: Resolve and generate output payload
Session Mutations: payload/last assistant json
Config/Table Dependencies: ce_response, ce_prompt_template

Detailed Execution Logic

The final output generator. Queries ce_response for the current intent and state.

  • If TEXT: returns a hardcoded string.
  • If DERIVED: loads ce_prompt_template, injects the contextJson, tool_result, and schema, and asks the LLM to write a fluid, contextual response to the user.
  • interaction_mode does not change template lookup, but it should describe the intended turn semantics for the selected state (CONFIRM, PROCESSING, FINAL, etc.).
  • interaction_contract is where consumers should declare capabilities such as retry on PROCESSING prompts or affirm/edit on CONFIRM prompts.

Finally, the step sets session.getConversation().setLastAssistantJson() with the payload.

ResponseResolutionStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/ResponseResolutionStep.java
JAVA
public StepResult execute(EngineSession session) {

    if (AgentIntentResolver.INTENT_COLLISION_STATE.equals(session.getState())) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
        audit.audit(
                ConvEngineAuditStage.INTENT_COLLISION_DETECTED,
                session.getConversationId(),
                payload
        );
        agentIntentCollisionResolver.resolve(session);
        return new StepResult.Continue();
    }

    Optional<CeResponse> responseOptional = resolveResponse(session);

    if (responseOptional.isEmpty()) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
        payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
        audit.audit(
                ConvEngineAuditStage.RESPONSE_MAPPING_NOT_FOUND,
                session.getConversationId(),
                payload
        );
        throw new ConversationEngineException(
                ConversationEngineErrorCode.RESPONSE_MAPPING_NOT_FOUND,
                "No response found for intent=" + session.getIntent() + ", state=" + session.getState()
        );
    }
    CeResponse resp = responseOptional.get();
    if (!matches(resp.getStateCode(), session.getState()) && !matches(resp.getStateCode(), "ANY")) {
        session.setState(resp.getStateCode());
        session.getConversation().setStateCode(resp.getStateCode());
    }
    Map<String, Object> responsePayload = new LinkedHashMap<>();
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_ID, resp.getResponseId());
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    responsePayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    audit.audit(
            ConvEngineAuditStage.RESOLVE_RESPONSE,
            session.getConversationId(),
            responsePayload
    );
    List<ConversationTurn> conversationTurns = historyProvider.lastTurns(session.getConversationId(), 10);
    session.setConversationHistory(conversationTurns);

    CePromptTemplate template = null;
    if (ResponseType.DERIVED.name().equalsIgnoreCase(resp.getResponseType())) {
        template = promptRepo.findAll().stream()
                .filter(t -> Boolean.TRUE.equals(t.getEnabled()))
                .filter(t -> resp.getOutputFormat().equalsIgnoreCase(t.getResponseType()))
                .filter(t -> matchesOrNull(t.getIntentCode(), session.getIntent()))
                .filter(t -> matchesOrNull(t.getStateCode(), session.getState()) || matches(t.getStateCode(), "ANY"))
                .max(Comparator.comparingInt(t -> score(t, session)))
                .orElseThrow(() ->
                        new IllegalStateException(
                                "No ce_prompt_template found for response_type=" +
                                        resp.getOutputFormat() + ", intent=" + session.getIntent() + ", state=" + session.getState()
                        )
                );
    }
    typeFactory
            .get(resp.getResponseType())
            .resolve(session, PromptTemplate.initFrom(template), ResponseTemplate.initFrom(resp));

    OutputPayload transformedOutput = responseTransformerService.transformIfApplicable(session.getPayload(), session, session.getInputParams());
    session.setPayload(transformedOutput);

    Object payloadValue = switch (session.getPayload()) {
        case TextPayload(String text) -> text;
        case JsonPayload(String json) -> json;
        case null -> null;
    };

    Map<String, Object> outputPayload = new LinkedHashMap<>();
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.OUTPUT, payloadValue);
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.OUTPUT_FORMAT, resp.getOutputFormat());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_TYPE, resp.getResponseType());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.RESPONSE_ID, resp.getResponseId());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.CONTEXT, session.contextDict());
    outputPayload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.SCHEMA_JSON, session.schemaJson());
    audit.audit(ConvEngineAuditStage.ASSISTANT_OUTPUT, session.getConversationId(), outputPayload);

    return new StepResult.Continue();
}
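
The template lookup picks the highest-scoring candidate, but the score function itself is not shown on this page. The sketch below illustrates one way such a ranking could work. The Template record and the weights (2 for an exact intent match, 1 for an exact state match, 0 for a wildcard) are assumptions made for the example, not the engine's actual scoring.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class TemplateScoreSketch {

    // Hypothetical stand-in for CePromptTemplate; null codes act as wildcards.
    record Template(String name, String intentCode, String stateCode) {}

    static boolean matchesOrNull(String code, String value) {
        return code == null || code.equalsIgnoreCase(value);
    }

    // Assumed scoring: an exact intent match outweighs an exact state match.
    static int score(Template t, String intent, String state) {
        int score = 0;
        if (t.intentCode() != null && t.intentCode().equalsIgnoreCase(intent)) score += 2;
        if (t.stateCode() != null && t.stateCode().equalsIgnoreCase(state)) score += 1;
        return score;
    }

    // Filters to compatible templates, then keeps the most specific one.
    static Optional<Template> pick(List<Template> templates, String intent, String state) {
        return templates.stream()
                .filter(t -> matchesOrNull(t.intentCode(), intent))
                .filter(t -> matchesOrNull(t.stateCode(), state) || "ANY".equalsIgnoreCase(t.stateCode()))
                .max(Comparator.comparingInt(t -> score(t, intent, state)));
    }
}
```

Under these weights a template bound to both the intent and the state beats one bound only to the intent, which in turn beats a fully generic template.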

24 MemoryStep

Responsibility: Write memory/session summary
Session Mutations: memory.session_summary in context
Config/Table Dependencies: ce_memory, ce_audit

Detailed Execution Logic

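Evaluates the rolling history. If ce_memory is configured and recentTurns exceeds the threshold, this step fires off a summarization prompt to the LLM. It compresses the last N turns into a dense paragraph, truncates it to the configured maximum summary length, and saves it as memory.session_summary in the context JSON, so long conversations can retain context without exceeding token limits. If a registered memory store returns a previously saved summary, it is also written to the context as memory.recalled_summary.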

MemoryStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/MemoryStep.java
JAVA
public StepResult execute(EngineSession session) {
    if (!flowConfig.getMemory().isEnabled()) {
        return new StepResult.Continue();
    }

    String recalled = null;
    for (ConversationMemoryStore store : memoryStores) {
        try {
            String value = store.read(session);
            if (value != null && !value.isBlank()) {
                recalled = value;
                break;
            }
        } catch (Exception ignored) {
        }
    }
    if (recalled != null) {
        session.putInputParam(ConvEngineInputParamKey.MEMORY_RECALL, recalled);
    }

    String summary = buildSummary(session);
    if (summary.length() > flowConfig.getMemory().getSummaryMaxChars()) {
        summary = summary.substring(0, flowConfig.getMemory().getSummaryMaxChars());
    }
    session.putInputParam(ConvEngineInputParamKey.MEMORY_SESSION_SUMMARY, summary);

    ObjectNode root = contextHelper.readRoot(session);
    ObjectNode memoryNode = contextHelper.ensureObject(root, "memory");
    memoryNode.put("session_summary", summary);
    if (recalled != null) {
        memoryNode.put("recalled_summary", recalled);
    }
    contextHelper.writeRoot(session, root);

    for (ConversationMemoryStore store : memoryStores) {
        try {
            store.write(session, summary);
        } catch (Exception ignored) {
        }
    }

    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("summaryChars", summary.length());
    payload.put("recalled", recalled != null);
    payload.put("stores", memoryStores.size());
    payload.put("intent", session.getIntent());
    payload.put("state", session.getState());
    audit.audit(ConvEngineAuditStage.MEMORY_UPDATED, session.getConversationId(), payload);
    return new StepResult.Continue();
}
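
buildSummary is not shown on this page. To make the windowing and truncation concrete, here is a deliberately simple sketch that joins the most recent turns instead of calling an LLM. The class name, the " | " separator, and the plain-string turns are assumptions; only the cap at a maximum character count mirrors the substring(0, summaryMaxChars) call above.

```java
import java.util.List;

class SessionSummarySketch {

    // Joins the most recent turns into one line and caps the result at maxChars,
    // mirroring the substring(0, summaryMaxChars) truncation in the step.
    static String buildSummary(List<String> turns, int recentTurns, int maxChars) {
        int from = Math.max(0, turns.size() - recentTurns);
        String summary = String.join(" | ", turns.subList(from, turns.size()));
        return summary.length() > maxChars ? summary.substring(0, maxChars) : summary;
    }
}
```

A real implementation would replace the join with a summarization call, but the window over the last N turns and the hard character cap would work the same way.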

25 PersistConversationStep

Responsibility: Persist final conversation and result
Session Mutations: finalResult
Config/Table Dependencies: ce_conversation

Detailed Execution Logic

The database commit step. Writes the CeConversation row, saving the mutated contextJson, inputParams, new intentCode, and stateCode. The step runs at the end of the pipeline, so if an exception occurs in an earlier step, the partially mutated context is never saved. Before saving, it checks that a response payload exists and throws PIPELINE_NO_RESPONSE_PAYLOAD if it does not.

PersistConversationStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/PersistConversationStep.java
JAVA
public StepResult execute(EngineSession session) {

    // --- sanity check ---
    if (session.getPayload() == null) {
        throw new ConversationEngineException(
                ConversationEngineErrorCode.PIPELINE_NO_RESPONSE_PAYLOAD,
                "Engine pipeline ended without payload. ResponseResolutionStep did not run."
        );
    }

    // --- persist conversation ---
    sanitizeConversationForPostgres(session);
    session.getConversation().setStatus("RUNNING");
    session.getConversation().setUpdatedAt(OffsetDateTime.now());
    session.getConversation().setInputParamsJson(session.ejectInputParamsJson());
    conversationRepo.save(session.getConversation());

    // --- build FINAL EngineResult ---
    EngineResult result = new EngineResult(
            session.getIntent(),
            session.getState(),
            session.getPayload(),
            session.getContextJson()
    );

    session.setFinalResult(result);

    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.INTENT, session.getIntent());
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.STATE, session.getState());
    payload.put(com.github.salilvnair.convengine.engine.constants.ConvEnginePayloadKey.FINAL_RESULT, result);
    audit.audit(ConvEngineAuditStage.ENGINE_RETURN, session.getConversationId(), payload);

    return new StepResult.Continue();
}

26 PipelineEndGuardStep

Responsibility: Timing audit + terminal guard
Session Mutations: timings
Config/Table Dependencies: ce_audit

Detailed Execution Logic

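Timing and safety metrics. Sums the recorded per-step durations in milliseconds for Steps 1 to 25, logs a one-line per-step breakdown to the application log, and writes a single compact audit row. In the code below that row is written under the PIPELINE_TIMING stage, and the null-payload check is performed earlier, in PersistConversationStep.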

PipelineEndGuardStep.execute()
file: src/main/java/com/github/salilvnair/convengine/engine/steps/PipelineEndGuardStep.java
JAVA
public StepResult execute(EngineSession session) {

    // Sort by start time just in case
    session.getStepTimings().sort(Comparator.comparingLong(StepTiming::getStartedAtNs));

    long totalMs = session.getStepTimings().stream().mapToLong(StepTiming::getDurationMs).sum();

    // Log in app logs
    String timingLine = session.getStepTimings().stream()
            .map(t -> t.getStepName() + "=" + t.getDurationMs() + "ms" + (t.isSuccess() ? "" : "(ERR)"))
            .reduce((a, b) -> a + ", " + b)
            .orElse("");

    log.info("ConvEngine timings convId={} total={}ms [{}]",
            session.getConversationId(), totalMs, timingLine);

    // Optional audit row (single compact record)
    String payload = "{\"totalMs\":" + totalMs +
            ",\"steps\":\"" + JsonUtil.escape(timingLine) + "\"}";

    audit.audit(ConvEngineAuditStage.PIPELINE_TIMING, session.getConversationId(), payload);

    return new StepResult.Continue();
}
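
The timing payload can be reproduced outside the engine to see its exact shape. This is a self-contained sketch under stated assumptions: Timing is a hypothetical stand-in for StepTiming, and escape is a minimal replacement for JsonUtil.escape that handles only backslashes and double quotes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class TimingSketch {

    // Hypothetical stand-in for StepTiming.
    record Timing(String stepName, long startedAtNs, long durationMs, boolean success) {}

    // Minimal JSON string escaping for backslashes and double quotes only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the compact audit payload: total of step durations plus a per-step line.
    static String timingPayload(List<Timing> timings) {
        long totalMs = timings.stream().mapToLong(Timing::durationMs).sum();
        String line = timings.stream()
                .sorted(Comparator.comparingLong(Timing::startedAtNs))
                .map(t -> t.stepName() + "=" + t.durationMs() + "ms" + (t.success() ? "" : "(ERR)"))
                .collect(Collectors.joining(", "));
        return "{\"totalMs\":" + totalMs + ",\"steps\":\"" + escape(line) + "\"}";
    }
}
```

Note that the total here is the sum of the individual step durations, not wall-clock time, so any time spent between steps is not counted.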