import crypto from "node:crypto";
import fs from "node:fs/promises";
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
import {
  cleanupFailedAcpSpawn,
  type AcpSpawnRuntimeCloseHandle,
} from "../acp/control-plane/spawn.js";
import { isAcpEnabledByPolicy, resolveAcpAgentPolicyError } from "../acp/policy.js";
import {
  resolveAcpSessionCwd,
  resolveAcpThreadSessionDetailLines,
} from "../acp/runtime/session-identifiers.js";
import type { AcpRuntimeSessionMode } from "../acp/runtime/types.js";
import { DEFAULT_HEARTBEAT_EVERY } from "../auto-reply/heartbeat.js";
import {
  getChannelPlugin,
  getLoadedChannelPlugin,
  normalizeChannelId,
} from "../channels/plugins/index.js";
import {
  resolveBundledChannelThreadBindingDefaultPlacement,
  resolveBundledChannelThreadBindingInboundConversation,
} from "../channels/plugins/thread-binding-api.js";
import {
  resolveThreadBindingIntroText,
  resolveThreadBindingThreadName,
} from "../channels/thread-bindings-messages.js";
import {
  formatThreadBindingDisabledError,
  formatThreadBindingSpawnDisabledError,
  resolveThreadBindingIdleTimeoutMsForChannel,
  resolveThreadBindingMaxAgeMsForChannel,
  resolveThreadBindingSpawnPolicy,
} from "../channels/thread-bindings-policy.js";
import { parseDurationMs } from "../cli/parse-duration.js";
import { loadConfig } from "../config/config.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { loadSessionStore } from "../config/sessions/store.js";
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { callGateway } from "../gateway/call.js";
import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js";
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
import { normalizeConversationTargetRef } from "../infra/outbound/session-binding-normalization.js";
import {
  getSessionBindingService,
  isSessionBindingError,
  type SessionBindingRecord,
} from "../infra/outbound/session-binding-service.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import {
  isSubagentSessionKey,
  normalizeAgentId,
  parseAgentSessionKey,
  resolveAgentIdFromSessionKey,
} from "../routing/session-key.js";
import {
  normalizeOptionalLowercaseString,
  normalizeOptionalString,
} from "../shared/string-coerce.js";
import { createRunningTaskRun } from "../tasks/task-executor.js";
import {
  deliveryContextFromSession,
  formatConversationTarget,
  normalizeDeliveryContext,
  resolveConversationDeliveryTarget,
} from "../utils/delivery-context.js";
import {
  type AcpSpawnParentRelayHandle,
  resolveAcpSpawnStreamLogPath,
  startAcpSpawnParentStreamRelay,
} from "./acp-spawn-parent-stream.js";
import { resolveAgentConfig, resolveDefaultAgentId } from "./agent-scope.js";
import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js";
import { resolveSpawnedWorkspaceInheritance } from "./spawned-context.js";
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js";

// Logger for this module, created under the "agents/acp-spawn" subsystem name.
const log = createSubsystemLogger("agents/acp-spawn");

/** Spawn modes a caller may request; `SpawnAcpMode` is the union of these literals. */
export const ACP_SPAWN_MODES = ["run", "session"] as const;
export type SpawnAcpMode = (typeof ACP_SPAWN_MODES)[number];
/**
 * Sandbox modes a caller may pass. Note that `resolveAcpSpawnRuntimePolicyError`
 * in this file rejects `"require"` for ACP spawns.
 */
export const ACP_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const;
export type SpawnAcpSandboxMode = (typeof ACP_SPAWN_SANDBOX_MODES)[number];
/** Supported explicit stream targets; `"parent"` is currently the only one. */
export const ACP_SPAWN_STREAM_TARGETS = ["parent"] as const;
export type SpawnAcpStreamTarget = (typeof ACP_SPAWN_STREAM_TARGETS)[number];

/** Caller-supplied options for an ACP spawn request. */
export type SpawnAcpParams = {
  /** Task for the spawned session (the only required field). */
  task: string;
  /** Optional label; presumably used for thread naming/intro text — confirm against the spawn flow. */
  label?: string;
  /** Requested target agent id; presumably `acp.defaultAgent` is the fallback when omitted — confirm against the spawn flow. */
  agentId?: string;
  /** Existing session id to resume; presumably forwarded to runtime initialization — confirm against the spawn flow. */
  resumeSessionId?: string;
  /** Working directory requested by the caller. */
  cwd?: string;
  /** Spawn mode; when omitted it defaults to "session" for thread spawns and "run" otherwise. */
  mode?: SpawnAcpMode;
  /** Only a literal `true` requests a thread binding. */
  thread?: boolean;
  /** Sandbox mode; "require" is rejected by the runtime policy check. */
  sandbox?: SpawnAcpSandboxMode;
  /** "parent" explicitly requests streaming to the requester; it needs a requester session key. */
  streamTo?: SpawnAcpStreamTarget;
};

/** Requester-side context (session and delivery origin) accompanying a spawn request. */
export type SpawnAcpContext = {
  /** Session key of the requesting agent session, if any. */
  agentSessionKey?: string;
  /** Channel of the requester's delivery origin. */
  agentChannel?: string;
  /** Account id of the requester's delivery origin. */
  agentAccountId?: string;
  /** Delivery target ("to") of the requester's origin. */
  agentTo?: string;
  /** Thread id of the requester's origin; a non-blank string or any number counts as thread context. */
  agentThreadId?: string | number;
  /** Group chat ID for channels that distinguish group vs. topic (e.g. Telegram). */
  agentGroupId?: string;
  /** When `true`, the requester is treated as sandboxed regardless of the resolved runtime status. */
  sandboxed?: boolean;
};

/**
 * Machine-readable failure codes reported on failed spawn results.
 * `SpawnAcpErrorCode` is the union of these literals.
 */
export const ACP_SPAWN_ERROR_CODES = [
  "acp_disabled",
  "requester_session_required",
  "runtime_policy",
  "thread_required",
  "target_agent_required",
  "agent_forbidden",
  "cwd_resolution_failed",
  "thread_binding_invalid",
  "spawn_failed",
  "dispatch_failed",
] as const;
export type SpawnAcpErrorCode = (typeof ACP_SPAWN_ERROR_CODES)[number];

/** Optional fields shared by both the accepted and the failed result variants. */
type SpawnAcpResultFields = {
  childSessionKey?: string;
  runId?: string;
  mode?: SpawnAcpMode;
  streamLogPath?: string;
  note?: string;
};

/** Successful variant: `childSessionKey`, `runId` and `mode` become required. */
type SpawnAcpAcceptedResult = SpawnAcpResultFields & {
  status: "accepted";
  childSessionKey: string;
  runId: string;
  mode: SpawnAcpMode;
};

/** Failure variant: carries a human-readable `error` plus a machine-readable `errorCode`. */
type SpawnAcpFailedResult = SpawnAcpResultFields & {
  status: "forbidden" | "error";
  error: string;
  errorCode: SpawnAcpErrorCode;
};

/** Result of a spawn attempt, discriminated by `status`. */
export type SpawnAcpResult = SpawnAcpAcceptedResult | SpawnAcpFailedResult;

/** Type guard that narrows a spawn result to the accepted variant via its `status` tag. */
export function isSpawnAcpAcceptedResult(result: SpawnAcpResult): result is SpawnAcpAcceptedResult {
  const { status } = result;
  return status === "accepted";
}

// Note text for an accepted spawn whose initial task was queued.
// NOTE(review): presumably surfaced via the result's `note` field — confirm at the call site.
export const ACP_SPAWN_ACCEPTED_NOTE =
  "initial ACP task queued in isolated session; follow-ups continue in the bound thread.";
// Note text for an accepted thread-bound session spawn (same usage assumption as above).
export const ACP_SPAWN_SESSION_ACCEPTED_NOTE =
  "thread-bound ACP session stays active after this task; continue in-thread for follow-ups.";

/**
 * Check whether sandbox policy forbids an ACP spawn.
 *
 * The requester counts as sandboxed when the caller passes
 * `requesterSandboxed === true` or when the resolved runtime status for the
 * requester session reports `sandboxed`.
 *
 * @returns An error message when the spawn must be rejected (sandboxed
 *   requester, or `sandbox="require"`); otherwise `undefined`.
 */
export function resolveAcpSpawnRuntimePolicyError(params: {
  cfg: OpenClawConfig;
  requesterSessionKey?: string;
  requesterSandboxed?: boolean;
  sandbox?: SpawnAcpSandboxMode;
}): string | undefined {
  // The runtime status is always resolved, even when the caller already flagged the requester.
  const runtimeStatus = resolveSandboxRuntimeStatus({
    cfg: params.cfg,
    sessionKey: params.requesterSessionKey,
  });
  if (params.requesterSandboxed === true || runtimeStatus.sandboxed) {
    return 'Sandboxed sessions cannot spawn ACP sessions because runtime="acp" runs on the host. Use runtime="subagent" from sandboxed sessions.';
  }
  // Anything other than an explicit "require" behaves like "inherit".
  return params.sandbox === "require"
    ? 'sessions_spawn sandbox="require" is unsupported for runtime="acp" because ACP sessions run outside the sandbox. Use runtime="subagent" or sandbox="inherit".'
    : undefined;
}

/** Validated inputs for a thread binding, produced before the runtime is initialized. */
type PreparedAcpThreadBinding = {
  channel: string;
  accountId: string;
  /** "current" binds the existing conversation; "child" binds a newly created thread. */
  placement: "current" | "child";
  conversationId: string;
  parentConversationId?: string;
};

/** Resolved value of the ACP session manager's `initializeSession` call. */
type AcpSpawnInitializedSession = Awaited<
  ReturnType<ReturnType<typeof getAcpSessionManager>["initializeSession"]>
>;

/** Initialized runtime plus the session-store state loaded alongside it. */
type AcpSpawnInitializedRuntime = {
  initialized: AcpSpawnInitializedSession;
  /** Runtime/handle pair taken from `initialized`. */
  runtimeCloseHandle: AcpSpawnRuntimeCloseHandle;
  /** Session id found in the store entry for the session key, if one existed. */
  sessionId?: string;
  sessionEntry: SessionEntry | undefined;
  sessionStore: Record<string, SessionEntry>;
  storePath: string;
};

/** Facts about the requesting session that drive stream and delivery planning. */
type AcpSpawnRequesterState = {
  parentSessionKey?: string;
  isSubagentSession: boolean;
  /** True when the requester has a "subagent" binding whose status is not "ended". */
  hasActiveSubagentBinding: boolean;
  hasThreadContext: boolean;
  heartbeatEnabled: boolean;
  heartbeatRelayRouteUsable: boolean;
  /** Normalized delivery context built from the requester's channel/account/to/thread. */
  origin: ReturnType<typeof normalizeDeliveryContext>;
};

/** Outcome of the stream-to-parent decision. */
type AcpSpawnStreamPlan = {
  /** True when streaming to the parent was inferred rather than explicitly requested. */
  implicitStreamToParent: boolean;
  /** True when streaming to the parent applies, whether explicit or implicit. */
  effectiveStreamToParent: boolean;
};

/** Where bootstrap output is delivered; target fields are set only when `useInlineDelivery` is true. */
type AcpSpawnBootstrapDeliveryPlan = {
  useInlineDelivery: boolean;
  channel?: string;
  accountId?: string;
  to?: string;
  threadId?: string;
};

/**
 * Pick a binding placement when no channel plugin supplies a default.
 *
 * "line" and "telegram" always bind the current conversation. Every other
 * channel (including "discord" and "matrix") prefers a child thread when the
 * adapter capabilities list "child", and otherwise uses "current".
 */
function resolvePlacementWithoutChannelPlugin(params: {
  channel: string;
  capabilities: { placements: Array<"current" | "child"> };
}): "current" | "child" {
  const { channel, capabilities } = params;
  if (channel === "line" || channel === "telegram") {
    return "current";
  }
  const supportsChildThreads = capabilities.placements.includes("child");
  return supportsChildThreads ? "child" : "current";
}

/**
 * Fallback normalizer for LINE conversation ids.
 *
 * Strips a leading `line:` prefix (optionally followed by `user:`, `group:`
 * or `room:`) and trims the remainder. Values without that prefix are
 * returned as normalized by `normalizeOptionalString`.
 *
 * @returns The normalized id, or `undefined` for blank input.
 */
function normalizeLineConversationIdFallback(value: string | undefined): string | undefined {
  const candidate = normalizeOptionalString(value);
  if (!candidate) {
    return undefined;
  }
  const prefixed = /^line:(?:(?:user|group|room):)?(.+)$/i.exec(candidate);
  const conversationId = prefixed?.[1]?.trim() ?? candidate;
  return conversationId || undefined;
}

/**
 * Fallback normalizer for Telegram conversation ids.
 *
 * Resolution order:
 * 1. An explicit numeric group id plus numeric thread id yields
 *    `<groupId>:topic:<threadId>`.
 * 2. Otherwise `to` is used after removing a `telegram:` prefix (optionally
 *    followed by `group:`, `channel:` or `direct:`): a `<chat>:topic:<topic>`
 *    target is kept in that form, and a bare numeric chat id is returned as-is.
 *
 * @returns The conversation id, or `undefined` when nothing numeric can be derived.
 */
function normalizeTelegramConversationIdFallback(params: {
  to?: string;
  threadId?: string | number;
  groupId?: string;
}): string | undefined {
  const groupId = normalizeOptionalString(params.groupId);
  const threadId =
    params.threadId == null ? undefined : normalizeOptionalString(String(params.threadId));
  if (groupId && threadId) {
    // Group ids may be negative; topic/thread ids must be unsigned digits.
    if (/^-?\d+$/.test(groupId) && /^\d+$/.test(threadId)) {
      return `${groupId}:topic:${threadId}`;
    }
  }

  const target = normalizeOptionalString(params.to);
  if (!target) {
    return undefined;
  }
  const bareTarget = target.replace(/^telegram:(?:group:|channel:|direct:)?/i, "");
  const topicParts = /^(-?\d+):topic:(\d+)$/i.exec(bareTarget);
  const chatId = topicParts?.[1];
  const topicId = topicParts?.[2];
  if (chatId && topicId) {
    return `${chatId}:topic:${topicId}`;
  }
  if (/^-?\d+$/.test(bareTarget)) {
    return bareTarget;
  }
  return undefined;
}

/**
 * Per-channel compatibility resolvers, keyed by channel id. They derive a
 * conversation id from the raw delivery context when plugin-based resolution
 * yields nothing.
 */
const threadBindingFallbackConversationResolvers = {
  // Prefer the group id, but fall back to `to` when the group id is blank.
  // `groupId ?? to` only skips null/undefined, so an empty or whitespace-only
  // group id used to hide a perfectly valid `to` target.
  line: (params: { to?: string; groupId?: string }) =>
    normalizeLineConversationIdFallback(params.groupId) ??
    normalizeLineConversationIdFallback(params.to),
  telegram: (params: { to?: string; threadId?: string | number; groupId?: string }) =>
    normalizeTelegramConversationIdFallback(params),
} as const;

/**
 * Resolve a conversation reference through channel plugin code.
 *
 * Sources are consulted lazily, in this order:
 * 1. the already-loaded channel plugin's `messaging.resolveInboundConversation`;
 * 2. the bundled thread-binding API, only when step 1 produced `undefined`;
 * 3. the plugin returned by `getChannelPlugin`, only when step 1 produced a
 *    nullish value and step 2 produced `undefined` (or was not run).
 *
 * @returns The normalized conversation ref, or `null` when no source yields a
 *   non-blank `conversationId`.
 */
function resolvePluginConversationRefForThreadBinding(params: {
  channelId: string;
  to?: string;
  threadId?: string | number;
  groupId?: string;
}): { conversationId: string; parentConversationId?: string } | null {
  const resolverParams = {
    // Keep the live delivery target authoritative; conversationId is only a fallback hint.
    to: params.to,
    conversationId: params.groupId ?? params.to,
    threadId: params.threadId,
    isGroup: true,
  };
  const loadedPluginConversation = getLoadedChannelPlugin(
    params.channelId,
  )?.messaging?.resolveInboundConversation?.(resolverParams);
  // Strict `undefined` check: a `null` answer from the loaded plugin skips the bundled API.
  const bundledApiConversation =
    loadedPluginConversation === undefined
      ? resolveBundledChannelThreadBindingInboundConversation({
          channelId: params.channelId,
          ...resolverParams,
        })
      : undefined;
  // A defined bundled answer (including `null`) is final; only `undefined`
  // falls through to the getChannelPlugin lookup.
  const resolvedConversation =
    loadedPluginConversation ??
    (bundledApiConversation !== undefined
      ? bundledApiConversation
      : getChannelPlugin(params.channelId)?.messaging?.resolveInboundConversation?.(
          resolverParams,
        ));
  const conversationId = normalizeOptionalString(resolvedConversation?.conversationId);
  if (!conversationId) {
    return null;
  }
  return normalizeConversationTargetRef({
    conversationId,
    parentConversationId: resolvedConversation?.parentConversationId,
  });
}

/**
 * Decide the effective spawn mode.
 *
 * An explicit "run" or "session" request wins. Without one, thread-bound
 * spawns default to a persistent "session" and everything else to "run".
 */
function resolveSpawnMode(params: {
  requestedMode?: SpawnAcpMode;
  threadRequested: boolean;
}): SpawnAcpMode {
  const { requestedMode, threadRequested } = params;
  switch (requestedMode) {
    case "run":
    case "session":
      return requestedMode;
    default:
      return threadRequested ? "session" : "run";
  }
}

/** Map a spawn mode to the runtime session mode: "session" is persistent, anything else is oneshot. */
function resolveAcpSessionMode(mode: SpawnAcpMode): AcpRuntimeSessionMode {
  if (mode === "session") {
    return "persistent";
  }
  return "oneshot";
}

/**
 * Report whether heartbeats are effectively enabled for the agent that owns
 * the given session key.
 *
 * - Returns `false` when heartbeats are disabled globally.
 * - Returns `true` when no agent id can be parsed from the session key.
 * - If any configured agent declares a `heartbeat` block, the requester must
 *   be one of those agents; otherwise the requester must be the default agent.
 * - The resolved `every` interval (agent override, then defaults, then
 *   `DEFAULT_HEARTBEAT_EVERY`) must parse to a positive duration.
 */
function isHeartbeatEnabledForSessionAgent(params: {
  cfg: OpenClawConfig;
  sessionKey?: string;
}): boolean {
  if (!areHeartbeatsEnabled()) {
    return false;
  }
  const requesterAgentId = parseAgentSessionKey(params.sessionKey)?.agentId;
  if (!requesterAgentId) {
    return true;
  }

  const configuredAgents = params.cfg.agents?.list ?? [];
  const heartbeatAgents = configuredAgents.filter((entry) => Boolean(entry?.heartbeat));
  const allowedByPolicy =
    heartbeatAgents.length > 0
      ? heartbeatAgents.some((entry) => normalizeAgentId(entry?.id) === requesterAgentId)
      : requesterAgentId === resolveDefaultAgentId(params.cfg);
  if (!allowedByPolicy) {
    return false;
  }

  const configuredEvery =
    resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ??
    params.cfg.agents?.defaults?.heartbeat?.every ??
    DEFAULT_HEARTBEAT_EVERY;
  const every = normalizeOptionalString(configuredEvery);
  if (!every) {
    return false;
  }

  // An unparseable interval is treated the same as a disabled heartbeat.
  let intervalMs: number;
  try {
    intervalMs = parseDurationMs(every, { defaultUnit: "m" });
  } catch {
    return false;
  }
  return intervalMs > 0;
}

/**
 * Merge the default heartbeat config with the agent's own heartbeat config.
 * Agent values override defaults key by key.
 *
 * @returns The merged config, or `undefined` when neither source defines one.
 */
function resolveHeartbeatConfigForAgent(params: {
  cfg: OpenClawConfig;
  agentId: string;
}): NonNullable<NonNullable<OpenClawConfig["agents"]>["defaults"]>["heartbeat"] {
  const baseHeartbeat = params.cfg.agents?.defaults?.heartbeat;
  const agentHeartbeat = resolveAgentConfig(params.cfg, params.agentId)?.heartbeat;
  return baseHeartbeat || agentHeartbeat ? { ...baseHeartbeat, ...agentHeartbeat } : undefined;
}

/**
 * Report whether heartbeat relays for the requester would be delivered back
 * to the parent session's own last route.
 *
 * All of the following must hold:
 * - the session scope is not "global" (it defaults to "per-sender");
 * - the agent's heartbeat target is "last" (it defaults to "none");
 * - no explicit `to` or `accountId` override is configured;
 * - the parent session's stored delivery context has both a channel and a target.
 */
function hasSessionLocalHeartbeatRelayRoute(params: {
  cfg: OpenClawConfig;
  parentSessionKey: string;
  requesterAgentId: string;
}): boolean {
  if ((params.cfg.session?.scope ?? "per-sender") === "global") {
    return false;
  }

  const heartbeat = resolveHeartbeatConfigForAgent({
    cfg: params.cfg,
    agentId: params.requesterAgentId,
  });
  const heartbeatTarget = heartbeat?.target ?? "none";
  if (heartbeatTarget !== "last") {
    return false;
  }

  // Explicit delivery overrides are not session-local: they can send updates
  // to unrelated destinations such as a pinned ops channel.
  const hasExplicitOverride = Boolean(
    normalizeOptionalString(heartbeat?.to) || normalizeOptionalString(heartbeat?.accountId),
  );
  if (hasExplicitOverride) {
    return false;
  }

  const storePath = resolveStorePath(params.cfg.session?.store, {
    agentId: params.requesterAgentId,
  });
  const parentRoute = deliveryContextFromSession(
    loadSessionStore(storePath)[params.parentSessionKey],
  );
  return Boolean(parentRoute?.channel && parentRoute.to);
}

/**
 * Choose the ACP agent to spawn: the requested agent id when present,
 * otherwise the configured `acp.defaultAgent`.
 *
 * @returns `{ ok: true, agentId }` on success, or `{ ok: false, error }` when
 *   neither source provides an agent id.
 */
function resolveTargetAcpAgentId(params: {
  requestedAgentId?: string;
  cfg: OpenClawConfig;
}): { ok: true; agentId: string } | { ok: false; error: string } {
  // `||` keeps the original truthiness semantics: an empty normalized id falls through.
  const agentId =
    normalizeOptionalAgentId(params.requestedAgentId) ||
    normalizeOptionalAgentId(params.cfg.acp?.defaultAgent);
  if (agentId) {
    return { ok: true, agentId };
  }
  return {
    ok: false,
    error:
      "ACP target agent is not configured. Pass `agentId` in `sessions_spawn` or set `acp.defaultAgent` in config.",
  };
}

/** Normalize an agent id, returning `undefined` for missing or blank input. */
function normalizeOptionalAgentId(value: string | undefined | null): string | undefined {
  const candidate = normalizeOptionalString(value);
  return candidate ? normalizeAgentId(candidate) : undefined;
}

/**
 * Produce a short message for a thrown value: the `message` of an `Error`,
 * a thrown string unchanged, and the literal "error" for anything else.
 */
function summarizeError(err: unknown): string {
  if (typeof err === "string") {
    return err;
  }
  return err instanceof Error ? err.message : "error";
}

/**
 * Build a failed spawn result. `childSessionKey` is only included when it is
 * a non-empty string.
 */
function createAcpSpawnFailure(params: {
  status: "forbidden" | "error";
  errorCode: SpawnAcpErrorCode;
  error: string;
  childSessionKey?: string;
}): SpawnAcpFailedResult {
  const { status, errorCode, error, childSessionKey } = params;
  const failure: SpawnAcpFailedResult = { status, errorCode, error };
  if (childSessionKey) {
    failure.childSessionKey = childSessionKey;
  }
  return failure;
}

/** True when `error` is an `Error` whose `code` is "ENOENT" or "ENOTDIR". */
function isMissingPathError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  const { code } = error as NodeJS.ErrnoException;
  return code === "ENOENT" || code === "ENOTDIR";
}

/**
 * Decide which working directory to hand to the ACP runtime.
 *
 * - No resolved cwd: `undefined`.
 * - The caller supplied an explicit cwd: the resolved cwd is returned without
 *   a filesystem check.
 * - Otherwise the resolved cwd is returned only if it is accessible; a
 *   missing path (ENOENT/ENOTDIR) yields `undefined`.
 *
 * @throws Any other `fs.access` failure is rethrown.
 */
async function resolveRuntimeCwdForAcpSpawn(params: {
  resolvedCwd?: string;
  explicitCwd?: string;
}): Promise<string | undefined> {
  const { resolvedCwd, explicitCwd } = params;
  if (!resolvedCwd) {
    return undefined;
  }
  if (normalizeOptionalString(explicitCwd)) {
    return resolvedCwd;
  }
  try {
    await fs.access(resolvedCwd);
  } catch (error) {
    if (!isMissingPathError(error)) {
      throw error;
    }
    return undefined;
  }
  return resolvedCwd;
}

/**
 * Resolve the requester's internal session key. A blank or missing requester
 * key resolves to the main-session alias.
 */
function resolveRequesterInternalSessionKey(params: {
  cfg: OpenClawConfig;
  requesterSessionKey?: string;
}): string {
  const mainSession = resolveMainSessionAlias(params.cfg);
  const key = normalizeOptionalString(params.requesterSessionKey);
  if (!key) {
    return mainSession.alias;
  }
  return resolveInternalSessionKey({
    key,
    alias: mainSession.alias,
    mainKey: mainSession.mainKey,
  });
}

/**
 * Resolve the session transcript file for an ACP session without ever
 * failing the spawn.
 *
 * @returns The session entry reported by `resolveSessionTranscriptFile`. If
 *   that call throws, a warning is logged and the incoming `sessionEntry` is
 *   returned unchanged.
 */
async function persistAcpSpawnSessionFileBestEffort(params: {
  sessionId: string;
  sessionKey: string;
  sessionEntry: SessionEntry | undefined;
  sessionStore: Record<string, SessionEntry>;
  storePath: string;
  agentId: string;
  threadId?: string | number;
  stage: "spawn" | "thread-bind";
}): Promise<SessionEntry | undefined> {
  const { sessionId, sessionKey, sessionEntry, sessionStore, storePath, agentId, threadId, stage } =
    params;
  try {
    const { sessionEntry: persistedEntry } = await resolveSessionTranscriptFile({
      sessionId,
      sessionKey,
      sessionEntry,
      sessionStore,
      storePath,
      agentId,
      threadId,
    });
    return persistedEntry;
  } catch (error) {
    // Best effort: keep the spawn going with the entry we already had.
    const reason = summarizeError(error);
    log.warn(`ACP session-file persistence failed during ${stage} for ${sessionKey}: ${reason}`);
    return sessionEntry;
  }
}

/**
 * Resolve the conversation a thread binding should attach to.
 *
 * Strategies are tried in order and the first one that yields an id wins:
 * 1. plugin-based resolution (only when the channel normalizes to a known channel id);
 * 2. the per-channel compatibility resolvers in `threadBindingFallbackConversationResolvers`;
 * 3. generic resolution from `to` plus `threadId`, with the `to`-only id as
 *    the parent when a thread id was given.
 *
 * @returns The normalized conversation ref, or `null` when nothing resolves.
 */
function resolveConversationRefForThreadBinding(params: {
  channel?: string;
  to?: string;
  threadId?: string | number;
  groupId?: string;
}): { conversationId: string; parentConversationId?: string } | null {
  const channel = normalizeOptionalLowercaseString(params.channel);
  const pluginChannelId = channel ? normalizeChannelId(channel) : null;

  if (pluginChannelId) {
    const pluginRef = resolvePluginConversationRefForThreadBinding({
      channelId: pluginChannelId,
      to: params.to,
      threadId: params.threadId,
      groupId: params.groupId,
    });
    if (pluginRef) {
      return pluginRef;
    }
  }

  // The compatibility lookup also covers channels that did not normalize to a plugin id.
  const fallbackKey = pluginChannelId ?? channel ?? null;
  if (fallbackKey && Object.hasOwn(threadBindingFallbackConversationResolvers, fallbackKey)) {
    const fallbackConversationId =
      threadBindingFallbackConversationResolvers[
        fallbackKey as keyof typeof threadBindingFallbackConversationResolvers
      ](params);
    if (fallbackConversationId) {
      return normalizeConversationTargetRef({ conversationId: fallbackConversationId });
    }
  }

  const targetOnlyConversationId = resolveConversationIdFromTargets({
    targets: [params.to],
  });
  const conversationId = resolveConversationIdFromTargets({
    threadId: params.threadId,
    targets: [params.to],
  });
  if (!conversationId) {
    return null;
  }
  return normalizeConversationTargetRef({
    conversationId,
    parentConversationId: params.threadId != null ? targetOnlyConversationId : undefined,
  });
}

/**
 * Resolve the account id to use for a channel.
 *
 * Precedence: an explicit non-blank `accountId`, then the channel's
 * configured `defaultAccount`, then the literal "default".
 *
 * @returns `undefined` only when neither an explicit account id nor a channel is given.
 */
function resolveAcpSpawnChannelAccountId(params: {
  cfg: OpenClawConfig;
  channel?: string;
  accountId?: string;
}): string | undefined {
  const channelKey = normalizeOptionalLowercaseString(params.channel);
  const requestedAccountId = normalizeOptionalString(params.accountId);
  if (requestedAccountId) {
    return requestedAccountId;
  }
  if (!channelKey) {
    return undefined;
  }
  const channelConfigs = params.cfg.channels as Record<
    string,
    { defaultAccount?: unknown } | undefined
  >;
  return normalizeOptionalString(channelConfigs?.[channelKey]?.defaultAccount) ?? "default";
}

/**
 * Validate that a thread binding can be created for the requester's channel
 * and work out where it should attach.
 *
 * Checks run in this order: channel present, thread bindings enabled by
 * policy, ACP thread spawns enabled by policy, binding adapter available,
 * chosen placement supported, and a conversation id resolvable.
 *
 * The placement comes from the first source that provides one: the loaded
 * channel plugin, the bundled thread-binding API, the plugin returned by
 * `getChannelPlugin`, and finally the built-in per-channel default.
 *
 * @returns `{ ok: true, binding }`, or `{ ok: false, error }` naming the first failed check.
 */
function prepareAcpThreadBinding(params: {
  cfg: OpenClawConfig;
  channel?: string;
  accountId?: string;
  to?: string;
  threadId?: string | number;
  groupId?: string;
}): { ok: true; binding: PreparedAcpThreadBinding } | { ok: false; error: string } {
  const reject = (error: string): { ok: false; error: string } => ({ ok: false, error });

  const requestedChannel = normalizeOptionalLowercaseString(params.channel);
  if (!requestedChannel) {
    return reject("thread=true for ACP sessions requires a channel context.");
  }

  const policy = resolveThreadBindingSpawnPolicy({
    cfg: params.cfg,
    channel: requestedChannel,
    accountId: resolveAcpSpawnChannelAccountId({
      cfg: params.cfg,
      channel: requestedChannel,
      accountId: params.accountId,
    }),
    kind: "acp",
  });
  if (!policy.enabled) {
    return reject(
      formatThreadBindingDisabledError({
        channel: policy.channel,
        accountId: policy.accountId,
        kind: "acp",
      }),
    );
  }
  if (!policy.spawnEnabled) {
    return reject(
      formatThreadBindingSpawnDisabledError({
        channel: policy.channel,
        accountId: policy.accountId,
        kind: "acp",
      }),
    );
  }

  const capabilities = getSessionBindingService().getCapabilities({
    channel: policy.channel,
    accountId: policy.accountId,
  });
  if (!capabilities.adapterAvailable) {
    return reject(`Thread bindings are unavailable for ${policy.channel}.`);
  }

  // Each source is evaluated lazily, and only when the ones before it gave nothing.
  const placement =
    getLoadedChannelPlugin(policy.channel)?.conversationBindings?.defaultTopLevelPlacement ??
    resolveBundledChannelThreadBindingDefaultPlacement(policy.channel) ??
    getChannelPlugin(policy.channel)?.conversationBindings?.defaultTopLevelPlacement ??
    resolvePlacementWithoutChannelPlugin({
      channel: policy.channel,
      capabilities,
    });
  const placementSupported =
    capabilities.bindSupported && capabilities.placements.includes(placement);
  if (!placementSupported) {
    return reject(`Thread bindings do not support ${placement} placement for ${policy.channel}.`);
  }

  const conversationRef = resolveConversationRefForThreadBinding({
    channel: policy.channel,
    to: params.to,
    threadId: params.threadId,
    groupId: params.groupId,
  });
  if (!conversationRef?.conversationId) {
    return reject(`Could not resolve a ${policy.channel} conversation for ACP thread spawn.`);
  }

  const binding: PreparedAcpThreadBinding = {
    channel: policy.channel,
    accountId: policy.accountId,
    placement,
    conversationId: conversationRef.conversationId,
  };
  if (conversationRef.parentConversationId) {
    binding.parentConversationId = conversationRef.parentConversationId;
  }
  return { ok: true, binding };
}

/**
 * Collect the facts about the requesting session that the stream and
 * delivery planners need: whether it is a subagent session, whether it holds
 * a live subagent binding, whether it carries thread context, its heartbeat
 * status, and its normalized delivery origin.
 */
function resolveAcpSpawnRequesterState(params: {
  cfg: OpenClawConfig;
  parentSessionKey?: string;
  ctx: SpawnAcpContext;
}): AcpSpawnRequesterState {
  const { cfg, parentSessionKey, ctx } = params;
  const bindingService = getSessionBindingService();
  const parsedRequester = parseAgentSessionKey(parentSessionKey);
  const isSubagentSession = Boolean(parsedRequester) && isSubagentSessionKey(parentSessionKey);

  let hasActiveSubagentBinding = false;
  if (isSubagentSession && parentSessionKey) {
    hasActiveSubagentBinding = bindingService
      .listBySession(parentSessionKey)
      .some((record) => record.targetKind === "subagent" && record.status !== "ended");
  }

  // A string thread id must be non-blank; any non-null number counts as thread context.
  const threadId = ctx.agentThreadId;
  const hasThreadContext =
    typeof threadId === "string" ? Boolean(normalizeOptionalString(threadId)) : threadId != null;

  const requesterAgentId = parsedRequester?.agentId;
  const heartbeatEnabled = isHeartbeatEnabledForSessionAgent({
    cfg,
    sessionKey: parentSessionKey,
  });
  const heartbeatRelayRouteUsable =
    parentSessionKey && requesterAgentId
      ? hasSessionLocalHeartbeatRelayRoute({
          cfg,
          parentSessionKey,
          requesterAgentId,
        })
      : false;
  const origin = normalizeDeliveryContext({
    channel: ctx.agentChannel,
    accountId: ctx.agentAccountId,
    to: ctx.agentTo,
    threadId: ctx.agentThreadId,
  });

  return {
    parentSessionKey,
    isSubagentSession,
    hasActiveSubagentBinding,
    hasThreadContext,
    heartbeatEnabled,
    heartbeatRelayRouteUsable,
    origin,
  };
}

/**
 * Decide whether the spawned session's output is streamed to the parent.
 *
 * Streaming is implicit (not explicitly requested) only when all of these hold:
 * - the resolved spawn mode is "run" and no thread binding was requested
 *   (the resolved mode is used so default mode selection is respected);
 * - the requester is a subagent session with no active subagent binding and
 *   no thread context, so user-facing threads do not receive unsolicited
 *   ACP progress chatter;
 * - heartbeats are enabled and a session-local heartbeat relay route is usable.
 *
 * An explicit `streamTo="parent"` request always makes streaming effective.
 */
function resolveAcpSpawnStreamPlan(params: {
  spawnMode: SpawnAcpMode;
  requestThreadBinding: boolean;
  streamToParentRequested: boolean;
  requester: AcpSpawnRequesterState;
}): AcpSpawnStreamPlan {
  const { requester } = params;
  const isUnboundBackgroundRun = params.spawnMode === "run" && !params.requestThreadBinding;
  const requesterIsQuietSubagent =
    requester.isSubagentSession &&
    !requester.hasActiveSubagentBinding &&
    !requester.hasThreadContext;
  const heartbeatRelayReady = requester.heartbeatEnabled && requester.heartbeatRelayRouteUsable;

  const implicitStreamToParent =
    !params.streamToParentRequested &&
    isUnboundBackgroundRun &&
    requesterIsQuietSubagent &&
    heartbeatRelayReady;
  const effectiveStreamToParent = params.streamToParentRequested || implicitStreamToParent;

  return { implicitStreamToParent, effectiveStreamToParent };
}

/**
 * Load the target agent's session store and initialize the ACP runtime
 * session for `sessionKey`.
 *
 * If the store already has an entry with a session id for this key, its
 * transcript file is resolved first (best effort). The ACP session manager
 * then initializes the session using the configured `acp.backend`.
 *
 * @returns The initialized session, a runtime/handle pair for later cleanup,
 *   and the store state (`sessionId`, `sessionEntry`, `sessionStore`, `storePath`).
 */
async function initializeAcpSpawnRuntime(params: {
  cfg: OpenClawConfig;
  sessionKey: string;
  targetAgentId: string;
  runtimeMode: AcpRuntimeSessionMode;
  resumeSessionId?: string;
  cwd?: string;
}): Promise<AcpSpawnInitializedRuntime> {
  const { cfg, sessionKey, targetAgentId } = params;
  const storePath = resolveStorePath(cfg.session?.store, { agentId: targetAgentId });
  const sessionStore = loadSessionStore(storePath);
  const existingEntry: SessionEntry | undefined = sessionStore[sessionKey];
  const sessionId = existingEntry?.sessionId;

  const sessionEntry = sessionId
    ? await persistAcpSpawnSessionFileBestEffort({
        sessionId,
        sessionKey,
        sessionStore,
        storePath,
        sessionEntry: existingEntry,
        agentId: targetAgentId,
        stage: "spawn",
      })
    : existingEntry;

  const initialized = await getAcpSessionManager().initializeSession({
    cfg,
    sessionKey,
    agent: targetAgentId,
    mode: params.runtimeMode,
    resumeSessionId: params.resumeSessionId,
    cwd: params.cwd,
    backendId: cfg.acp?.backend,
  });
  const runtimeCloseHandle: AcpSpawnRuntimeCloseHandle = {
    runtime: initialized.runtime,
    handle: initialized.handle,
  };

  return {
    initialized,
    runtimeCloseHandle,
    sessionId,
    sessionEntry,
    sessionStore,
    storePath,
  };
}

/**
 * Bind the spawned ACP session to the prepared conversation and, for child
 * placements, re-resolve the session transcript file against the new thread.
 *
 * @returns The binding record plus the (possibly refreshed) session entry.
 * @throws Error when the binding service returns a record without a conversation id.
 */
async function bindPreparedAcpThread(params: {
  cfg: OpenClawConfig;
  sessionKey: string;
  targetAgentId: string;
  label?: string;
  preparedBinding: PreparedAcpThreadBinding;
  initializedRuntime: AcpSpawnInitializedRuntime;
}): Promise<{
  binding: SessionBindingRecord;
  sessionEntry: SessionEntry | undefined;
}> {
  const { cfg, sessionKey, targetAgentId, preparedBinding, initializedRuntime } = params;
  const { channel, accountId, placement } = preparedBinding;
  // An empty label is treated the same as no label.
  const optionalLabel = params.label || undefined;
  const sessionMeta = initializedRuntime.initialized.meta;

  const bindingService = getSessionBindingService();
  const threadName = resolveThreadBindingThreadName({
    agentId: targetAgentId,
    label: params.label || targetAgentId,
  });
  const idleTimeoutMs = resolveThreadBindingIdleTimeoutMsForChannel({ cfg, channel, accountId });
  const maxAgeMs = resolveThreadBindingMaxAgeMsForChannel({ cfg, channel, accountId });
  const sessionCwd = resolveAcpSessionCwd(sessionMeta);
  const sessionDetails = resolveAcpThreadSessionDetailLines({
    sessionKey,
    meta: sessionMeta,
  });
  const introText = resolveThreadBindingIntroText({
    agentId: targetAgentId,
    label: optionalLabel,
    idleTimeoutMs,
    maxAgeMs,
    sessionCwd,
    sessionDetails,
  });

  const binding = await bindingService.bind({
    targetSessionKey: sessionKey,
    targetKind: "session",
    conversation: {
      channel,
      accountId,
      conversationId: preparedBinding.conversationId,
      ...(preparedBinding.parentConversationId
        ? { parentConversationId: preparedBinding.parentConversationId }
        : {}),
    },
    placement,
    metadata: {
      threadName,
      agentId: targetAgentId,
      label: optionalLabel,
      boundBy: "system",
      introText,
    },
  });
  if (!binding.conversation.conversationId) {
    const failure =
      placement === "child"
        ? `Failed to create and bind a ${channel} thread for this ACP session.`
        : `Failed to bind the current ${channel} conversation for this ACP session.`;
    throw new Error(failure);
  }

  // Only child placements with a known session id need the transcript re-resolved.
  const { sessionId } = initializedRuntime;
  if (!sessionId || placement !== "child") {
    return { binding, sessionEntry: initializedRuntime.sessionEntry };
  }
  const boundThreadId = normalizeOptionalString(binding.conversation.conversationId);
  if (!boundThreadId) {
    return { binding, sessionEntry: initializedRuntime.sessionEntry };
  }
  const sessionEntry = await persistAcpSpawnSessionFileBestEffort({
    sessionId,
    sessionKey,
    sessionStore: initializedRuntime.sessionStore,
    storePath: initializedRuntime.storePath,
    sessionEntry: initializedRuntime.sessionEntry,
    agentId: targetAgentId,
    threadId: boundThreadId,
    stage: "thread-bind",
  });
  return { binding, sessionEntry };
}

/**
 * Decide where the bootstrap output of a freshly spawned ACP session is delivered.
 *
 * Inline delivery is used only for `spawnMode === "session"` when output is
 * not being streamed to the parent and a delivery target (requester channel
 * plus a resolvable `to`) exists. In every other case the returned plan has
 * `useInlineDelivery: false` and all target fields are `undefined`.
 */
function resolveAcpSpawnBootstrapDeliveryPlan(params: {
  cfg: OpenClawConfig;
  spawnMode: SpawnAcpMode;
  requestThreadBinding: boolean;
  effectiveStreamToParent: boolean;
  requester: AcpSpawnRequesterState;
  binding: SessionBindingRecord | null;
}): AcpSpawnBootstrapDeliveryPlan {
  // Child-thread ACP spawns deliver bootstrap output to the new thread; current-conversation
  // binds deliver back to the originating target.
  const boundThreadIdRaw = params.binding?.conversation.conversationId;
  const boundThreadId = boundThreadIdRaw ? normalizeOptionalString(boundThreadIdRaw) : undefined;
  const fallbackThreadIdRaw = params.requester.origin?.threadId;
  const fallbackThreadId =
    fallbackThreadIdRaw != null ? normalizeOptionalString(String(fallbackThreadIdRaw)) : undefined;
  // Prefer the bound conversation id; otherwise fall back to the requester's own thread id.
  const deliveryThreadId = boundThreadId ?? fallbackThreadId;
  // Resolve the requester's conversation the same way bindings are resolved so the two can be compared.
  const requesterConversationRef = resolveConversationRefForThreadBinding({
    channel: params.requester.origin?.channel,
    threadId: fallbackThreadId,
    to: params.requester.origin?.to,
  });
  const requesterAccountId = resolveAcpSpawnChannelAccountId({
    cfg: params.cfg,
    channel: params.requester.origin?.channel,
    accountId: params.requester.origin?.accountId,
  });
  // True when the binding points at exactly the conversation the request came from
  // (same channel, account, conversation id and parent conversation id).
  const bindingMatchesRequesterConversation = Boolean(
    params.requester.origin?.channel &&
    params.binding?.conversation.channel === params.requester.origin.channel &&
    params.binding?.conversation.accountId === requesterAccountId &&
    requesterConversationRef?.conversationId &&
    params.binding?.conversation.conversationId === requesterConversationRef.conversationId &&
    (params.binding?.conversation.parentConversationId ?? undefined) ===
      (requesterConversationRef.parentConversationId ?? undefined),
  );
  const boundDeliveryTarget = resolveConversationDeliveryTarget({
    channel: params.requester.origin?.channel ?? params.binding?.conversation.channel,
    conversationId: params.binding?.conversation.conversationId,
    parentConversationId: params.binding?.conversation.parentConversationId,
  });
  // Target precedence: the requester's own `to` when the binding matches its conversation,
  // then the bound conversation's target, then the requester's `to`, then a formatted
  // target built from the delivery thread id.
  const inferredDeliveryTo =
    (bindingMatchesRequesterConversation
      ? normalizeOptionalString(params.requester.origin?.to)
      : undefined) ??
    boundDeliveryTarget.to ??
    normalizeOptionalString(params.requester.origin?.to) ??
    formatConversationTarget({
      channel: params.requester.origin?.channel,
      conversationId: deliveryThreadId,
    });
  // When the binding matches the requester conversation, keep the requester's own thread id.
  const resolvedDeliveryThreadId = bindingMatchesRequesterConversation
    ? fallbackThreadId
    : (boundDeliveryTarget.threadId ?? deliveryThreadId);
  const hasDeliveryTarget = Boolean(params.requester.origin?.channel && inferredDeliveryTo);

  // Thread-bound session spawns always deliver inline to their bound thread.
  // Background run-mode spawns should stay internal and report back through
  // the parent task lifecycle notifier instead of letting the child ACP
  // session write raw output directly into the originating channel.
  const useInlineDelivery =
    hasDeliveryTarget && !params.effectiveStreamToParent && params.spawnMode === "session";

  return {
    useInlineDelivery,
    channel: useInlineDelivery ? params.requester.origin?.channel : undefined,
    accountId: useInlineDelivery ? requesterAccountId : undefined,
    to: useInlineDelivery ? inferredDeliveryTo : undefined,
    threadId: useInlineDelivery ? resolvedDeliveryThreadId : undefined,
  };
}

/**
 * Spawns an ACP child session on behalf of the requester and dispatches the
 * initial task to it.
 *
 * Steps, in order: policy/argument validation, target agent resolution,
 * working-directory resolution, optional thread-binding preparation, session
 * patch + runtime initialization (+ thread bind), and finally the gateway
 * `agent` dispatch. Failures detected along the way are returned as
 * `createAcpSpawnFailure(...)` results with one of these error codes:
 * `acp_disabled`, `requester_session_required`, `runtime_policy`,
 * `thread_required`, `target_agent_required`, `agent_forbidden`,
 * `cwd_resolution_failed`, `thread_binding_invalid`, `spawn_failed`,
 * `dispatch_failed`. Failures inside the session-setup and dispatch phases
 * also run `cleanupFailedAcpSpawn` before returning.
 *
 * @param params - Spawn request; this function reads `task`, `agentId`, `mode`,
 *   `thread`, `streamTo`, `sandbox`, `cwd`, `label` and `resumeSessionId`.
 * @param ctx - Requester context; this function reads `agentSessionKey`,
 *   `sandboxed`, `agentChannel`, `agentAccountId`, `agentTo`, `agentThreadId`
 *   and `agentGroupId`, and forwards the whole context to the requester-state
 *   resolver.
 * @returns An `accepted` result carrying the child session key, run id, mode
 *   and note (plus `streamLogPath` when streaming to the parent produced one),
 *   or a failure result as described above.
 */
export async function spawnAcpDirect(
  params: SpawnAcpParams,
  ctx: SpawnAcpContext,
): Promise<SpawnAcpResult> {
  const cfg = loadConfig();
  const requesterInternalKey = resolveRequesterInternalSessionKey({
    cfg,
    requesterSessionKey: ctx.agentSessionKey,
  });
  if (!isAcpEnabledByPolicy(cfg)) {
    return createAcpSpawnFailure({
      status: "forbidden",
      errorCode: "acp_disabled",
      error: "ACP is disabled by policy (`acp.enabled=false`).",
    });
  }
  const streamToParentRequested = params.streamTo === "parent";
  const parentSessionKey = normalizeOptionalString(ctx.agentSessionKey);
  if (streamToParentRequested && !parentSessionKey) {
    return createAcpSpawnFailure({
      status: "error",
      errorCode: "requester_session_required",
      error: 'sessions_spawn streamTo="parent" requires an active requester session context.',
    });
  }

  // Only an explicit `thread: true` requests a thread binding.
  const requestThreadBinding = params.thread === true;
  const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({
    cfg,
    requesterSessionKey: ctx.agentSessionKey,
    requesterSandboxed: ctx.sandboxed,
    sandbox: params.sandbox,
  });
  if (runtimePolicyError) {
    return createAcpSpawnFailure({
      status: "forbidden",
      errorCode: "runtime_policy",
      error: runtimePolicyError,
    });
  }

  const spawnMode = resolveSpawnMode({
    requestedMode: params.mode,
    threadRequested: requestThreadBinding,
  });
  if (spawnMode === "session" && !requestThreadBinding) {
    return createAcpSpawnFailure({
      status: "error",
      errorCode: "thread_required",
      error: 'mode="session" requires thread=true so the ACP session can stay bound to a thread.',
    });
  }

  const requesterState = resolveAcpSpawnRequesterState({
    cfg,
    parentSessionKey,
    ctx,
  });
  const { effectiveStreamToParent } = resolveAcpSpawnStreamPlan({
    spawnMode,
    requestThreadBinding,
    streamToParentRequested,
    requester: requesterState,
  });

  const targetAgentResult = resolveTargetAcpAgentId({
    requestedAgentId: params.agentId,
    cfg,
  });
  if (!targetAgentResult.ok) {
    return createAcpSpawnFailure({
      status: "error",
      errorCode: "target_agent_required",
      error: targetAgentResult.error,
    });
  }
  const targetAgentId = targetAgentResult.agentId;
  const agentPolicyError = resolveAcpAgentPolicyError(cfg, targetAgentId);
  if (agentPolicyError) {
    return createAcpSpawnFailure({
      status: "forbidden",
      errorCode: "agent_forbidden",
      error: agentPolicyError.message,
    });
  }

  // Each spawn gets a fresh, unique child session key.
  const sessionKey = `agent:${targetAgentId}:acp:${crypto.randomUUID()}`;
  const runtimeMode = resolveAcpSessionMode(spawnMode);
  const resolvedCwd = resolveSpawnedWorkspaceInheritance({
    config: cfg,
    targetAgentId,
    requesterSessionKey: ctx.agentSessionKey,
    explicitWorkspaceDir: params.cwd,
  });
  let runtimeCwd: string | undefined;
  try {
    runtimeCwd = await resolveRuntimeCwdForAcpSpawn({
      resolvedCwd,
      explicitCwd: params.cwd,
    });
  } catch (error) {
    return createAcpSpawnFailure({
      status: "error",
      errorCode: "cwd_resolution_failed",
      error: summarizeError(error),
    });
  }

  // Validate the thread binding up front so an invalid request fails before
  // any session is created.
  let preparedBinding: PreparedAcpThreadBinding | null = null;
  if (requestThreadBinding) {
    const prepared = prepareAcpThreadBinding({
      cfg,
      channel: ctx.agentChannel,
      accountId: ctx.agentAccountId,
      to: ctx.agentTo,
      threadId: ctx.agentThreadId,
      groupId: ctx.agentGroupId,
    });
    if (!prepared.ok) {
      return createAcpSpawnFailure({
        status: "error",
        errorCode: "thread_binding_invalid",
        error: prepared.error,
      });
    }
    preparedBinding = prepared.binding;
  }

  let binding: SessionBindingRecord | null = null;
  // Tracked so the failure path only deletes a session that was actually patched
  // and only closes a runtime that was actually initialized.
  let sessionCreated = false;
  let initializedRuntime: AcpSpawnRuntimeCloseHandle | undefined;
  try {
    await callGateway({
      method: "sessions.patch",
      params: {
        key: sessionKey,
        spawnedBy: requesterInternalKey,
        ...(params.label ? { label: params.label } : {}),
      },
      timeoutMs: 10_000,
    });
    sessionCreated = true;
    const initializedSession = await initializeAcpSpawnRuntime({
      cfg,
      sessionKey,
      targetAgentId,
      runtimeMode,
      resumeSessionId: params.resumeSessionId,
      cwd: runtimeCwd,
    });
    initializedRuntime = initializedSession.runtimeCloseHandle;

    if (preparedBinding) {
      ({ binding } = await bindPreparedAcpThread({
        cfg,
        sessionKey,
        targetAgentId,
        label: params.label,
        preparedBinding,
        initializedRuntime: initializedSession,
      }));
    }
  } catch (err) {
    await cleanupFailedAcpSpawn({
      cfg,
      sessionKey,
      shouldDeleteSession: sessionCreated,
      deleteTranscript: true,
      runtimeCloseHandle: initializedRuntime,
    });
    // Binding errors surface their own message; anything else is summarized.
    return createAcpSpawnFailure({
      status: "error",
      errorCode: isSessionBindingError(err) ? "thread_binding_invalid" : "spawn_failed",
      error: isSessionBindingError(err) ? err.message : summarizeError(err),
    });
  }

  const deliveryPlan = resolveAcpSpawnBootstrapDeliveryPlan({
    cfg,
    spawnMode,
    requestThreadBinding,
    effectiveStreamToParent,
    requester: requesterState,
    binding,
  });
  // The idempotency key doubles as the provisional run id until the gateway
  // reports a different one.
  const childIdem = crypto.randomUUID();
  let childRunId: string = childIdem;
  const streamLogPath =
    effectiveStreamToParent && parentSessionKey
      ? resolveAcpSpawnStreamLogPath({
          childSessionKey: sessionKey,
        })
      : undefined;
  // Resolve parent session delivery context so system events route to the
  // correct thread/topic instead of falling back to the main DM.
  const parentDeliveryCtx =
    effectiveStreamToParent && parentSessionKey
      ? deliveryContextFromSession(
          loadSessionStore(
            resolveStorePath(cfg.session?.store, {
              agentId: resolveAgentIdFromSessionKey(parentSessionKey),
            }),
          )[parentSessionKey],
        )
      : undefined;

  let parentRelay: AcpSpawnParentRelayHandle | undefined;
  if (effectiveStreamToParent && parentSessionKey) {
    // Register relay before dispatch so fast lifecycle failures are not missed.
    parentRelay = startAcpSpawnParentStreamRelay({
      runId: childIdem,
      parentSessionKey,
      childSessionKey: sessionKey,
      agentId: targetAgentId,
      logPath: streamLogPath,
      deliveryContext: parentDeliveryCtx,
      emitStartNotice: false,
    });
  }
  try {
    const response = await callGateway({
      method: "agent",
      params: {
        message: params.task,
        sessionKey,
        channel: deliveryPlan.channel,
        to: deliveryPlan.to,
        accountId: deliveryPlan.accountId,
        threadId: deliveryPlan.threadId,
        idempotencyKey: childIdem,
        deliver: deliveryPlan.useInlineDelivery,
        label: params.label || undefined,
      },
      timeoutMs: 10_000,
    });
    const responseRunId = normalizeOptionalString(response?.runId);
    if (responseRunId) {
      childRunId = responseRunId;
    }
  } catch (err) {
    parentRelay?.dispose();
    await cleanupFailedAcpSpawn({
      cfg,
      sessionKey,
      shouldDeleteSession: true,
      deleteTranscript: true,
    });
    return createAcpSpawnFailure({
      status: "error",
      errorCode: "dispatch_failed",
      error: summarizeError(err),
      childSessionKey: sessionKey,
    });
  }

  if (effectiveStreamToParent && parentSessionKey) {
    if (parentRelay && childRunId !== childIdem) {
      parentRelay.dispose();
      // Defensive fallback if gateway returns a runId that differs from idempotency key.
      parentRelay = startAcpSpawnParentStreamRelay({
        runId: childRunId,
        parentSessionKey,
        childSessionKey: sessionKey,
        agentId: targetAgentId,
        logPath: streamLogPath,
        deliveryContext: parentDeliveryCtx,
        emitStartNotice: false,
      });
    }
    parentRelay?.notifyStarted();
  }

  // Task registration is best-effort: the child run has already been
  // dispatched, so a registration failure is logged instead of failing the spawn.
  try {
    createRunningTaskRun({
      runtime: "acp",
      sourceId: childRunId,
      ownerKey: requesterInternalKey,
      scopeKind: "session",
      requesterOrigin: requesterState.origin,
      childSessionKey: sessionKey,
      runId: childRunId,
      label: params.label,
      task: params.task,
      preferMetadata: true,
      deliveryStatus: requesterInternalKey ? "pending" : "parent_missing",
      startedAt: Date.now(),
    });
  } catch (error) {
    log.warn("Failed to create background task for ACP spawn", {
      sessionKey,
      runId: childRunId,
      error,
    });
  }

  // `streamLogPath` is only ever set when streaming to the parent, so the
  // conditional spread leaves the non-streaming result unchanged.
  return {
    status: "accepted",
    childSessionKey: sessionKey,
    runId: childRunId,
    mode: spawnMode,
    ...(streamLogPath ? { streamLogPath } : {}),
    note: spawnMode === "session" ? ACP_SPAWN_SESSION_ACCEPTED_NOTE : ACP_SPAWN_ACCEPTED_NOTE,
  };
}
