import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { AcpRuntimeError } from "../../acp/runtime/errors.js";
import type { AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js";
import type { MediaUnderstandingSkipError } from "../../media-understanding/errors.js";
import { withFetchPreconnect } from "../../test-utils/fetch-mock.js";
import { resolveAcpAttachments } from "./dispatch-acp-attachments.js";
import { tryDispatchAcpReply } from "./dispatch-acp.js";
import type { ReplyDispatcher } from "./reply-dispatcher.js";
import { buildTestCtx } from "./test-ctx.js";
import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js";

const managerMocks = vi.hoisted(() => ({
  resolveSession: vi.fn(),
  runTurn: vi.fn(),
  getObservabilitySnapshot: vi.fn(() => ({
    turns: { queueDepth: 0 },
    runtimeCache: { activeSessions: 0 },
  })),
}));

const policyMocks = vi.hoisted(() => ({
  resolveAcpDispatchPolicyError: vi.fn<(cfg: OpenClawConfig) => AcpRuntimeError | null>(() => null),
  resolveAcpAgentPolicyError: vi.fn<(cfg: OpenClawConfig, agent: string) => AcpRuntimeError | null>(
    () => null,
  ),
}));

const routeMocks = vi.hoisted(() => ({
  routeReply: vi.fn(async (_params: unknown) => ({ ok: true, messageId: "mock" })),
}));

const channelPluginMocks = vi.hoisted(() => ({
  getChannelPlugin: vi.fn((channelId: string) => {
    if (channelId !== "discord" && channelId !== "slack" && channelId !== "telegram") {
      return undefined;
    }
    return {
      outbound: {
        shouldTreatDeliveredTextAsVisible: ({
          kind,
          text,
        }: {
          kind: "tool" | "block" | "final";
          text?: string;
        }) => kind === "block" && typeof text === "string" && text.trim().length > 0,
      },
    };
  }),
}));

const messageActionMocks = vi.hoisted(() => ({
  runMessageAction: vi.fn(async (_params: unknown) => ({ ok: true as const })),
}));

const ttsMocks = vi.hoisted(() => ({
  maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => {
    const params = paramsUnknown as { payload: unknown };
    return params.payload;
  }),
  resolveTtsConfig: vi.fn((_cfg: OpenClawConfig) => ({ mode: "final" })),
}));

const mediaUnderstandingMocks = vi.hoisted(() => ({
  applyMediaUnderstanding: vi.fn(async (_params: unknown) => undefined),
}));

const sessionMetaMocks = vi.hoisted(() => ({
  readAcpSessionEntry: vi.fn<
    (params: { sessionKey: string; cfg?: OpenClawConfig }) => AcpSessionStoreEntry | null
  >(() => null),
}));

const bindingServiceMocks = vi.hoisted(() => ({
  listBySession: vi.fn<(sessionKey: string) => SessionBindingRecord[]>(() => []),
  unbind: vi.fn<(input: unknown) => Promise<SessionBindingRecord[]>>(async () => []),
}));

vi.mock("./dispatch-acp-manager.runtime.js", () => ({
  getAcpSessionManager: () => managerMocks,
  getSessionBindingService: () => ({
    listBySession: (targetSessionKey: string) =>
      bindingServiceMocks.listBySession(targetSessionKey),
    unbind: (input: unknown) => bindingServiceMocks.unbind(input),
  }),
}));

vi.mock("../../acp/policy.js", () => ({
  resolveAcpDispatchPolicyError: (cfg: OpenClawConfig) =>
    policyMocks.resolveAcpDispatchPolicyError(cfg),
  resolveAcpAgentPolicyError: (cfg: OpenClawConfig, agent: string) =>
    policyMocks.resolveAcpAgentPolicyError(cfg, agent),
}));

vi.mock("./route-reply.runtime.js", () => ({
  routeReply: (params: unknown) => routeMocks.routeReply(params),
}));

vi.mock("../../channels/plugins/index.js", () => ({
  getChannelPlugin: (channelId: string) => channelPluginMocks.getChannelPlugin(channelId),
  getLoadedChannelPlugin: (channelId: string) => channelPluginMocks.getChannelPlugin(channelId),
  normalizeChannelId: (channelId?: string | null) => channelId?.trim().toLowerCase() || null,
}));

vi.mock("../../infra/outbound/message-action-runner.js", () => ({
  runMessageAction: (params: unknown) => messageActionMocks.runMessageAction(params),
}));

vi.mock("./dispatch-acp-tts.runtime.js", () => ({
  maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));

vi.mock("../../tts/status-config.js", () => ({
  resolveStatusTtsSnapshot: () => ({
    autoMode: "always",
    provider: "auto",
    maxLength: 1500,
    summarize: true,
  }),
}));

vi.mock("./dispatch-acp-media.runtime.js", () => ({
  applyMediaUnderstanding: (params: unknown) =>
    mediaUnderstandingMocks.applyMediaUnderstanding(params),
  isMediaUnderstandingSkipError: (error: unknown): error is MediaUnderstandingSkipError =>
    error instanceof Error && error.name === "MediaUnderstandingSkipError",
  normalizeAttachments: (ctx: { MediaPath?: string; MediaType?: string }) =>
    ctx.MediaPath
      ? [
          {
            path: ctx.MediaPath,
            mime: ctx.MediaType,
            index: 0,
          },
        ]
      : [],
  resolveMediaAttachmentLocalRoots: (params: {
    cfg: { channels?: Record<string, { attachmentRoots?: string[] } | undefined> };
    ctx: { Provider?: string; Surface?: string };
  }) => {
    const channel = params.ctx.Provider ?? params.ctx.Surface ?? "";
    return params.cfg.channels?.[channel]?.attachmentRoots ?? [];
  },
  MediaAttachmentCache: class {
    async getBuffer(): Promise<never> {
      const error = new Error("outside allowed roots");
      error.name = "MediaUnderstandingSkipError";
      throw error;
    }
  },
}));

vi.mock("./dispatch-acp-session.runtime.js", () => ({
  readAcpSessionEntry: (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
    sessionMetaMocks.readAcpSessionEntry(params),
}));

const sessionKey = "agent:codex-acp:session-1";
const originalFetch = globalThis.fetch;
type MockTtsReply = Awaited<ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>>;

function createDispatcher(): {
  dispatcher: ReplyDispatcher;
  counts: Record<"tool" | "block" | "final", number>;
} {
  const counts = { tool: 0, block: 0, final: 0 };
  const dispatcher: ReplyDispatcher = {
    sendToolResult: vi.fn(() => true),
    sendBlockReply: vi.fn(() => true),
    sendFinalReply: vi.fn(() => true),
    waitForIdle: vi.fn(async () => {}),
    getQueuedCounts: vi.fn(() => counts),
    getFailedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
    markComplete: vi.fn(),
  };
  return { dispatcher, counts };
}

function setReadyAcpResolution() {
  managerMocks.resolveSession.mockReturnValue({
    kind: "ready",
    sessionKey,
    meta: createAcpSessionMeta(),
  });
}

function createAcpConfigWithVisibleToolTags(): OpenClawConfig {
  return createAcpTestConfig({
    acp: {
      enabled: true,
      stream: {
        tagVisibility: {
          tool_call: true,
          tool_call_update: true,
        },
      },
    },
  });
}

async function runDispatch(params: {
  bodyForAgent: string;
  cfg?: OpenClawConfig;
  dispatcher?: ReplyDispatcher;
  shouldRouteToOriginating?: boolean;
  originatingChannel?: string;
  originatingTo?: string;
  onReplyStart?: () => void;
  ctxOverrides?: Record<string, unknown>;
  sessionKeyOverride?: string;
}) {
  const targetSessionKey = params.sessionKeyOverride ?? sessionKey;
  return tryDispatchAcpReply({
    ctx: buildTestCtx({
      Provider: "discord",
      Surface: "discord",
      SessionKey: targetSessionKey,
      BodyForAgent: params.bodyForAgent,
      ...params.ctxOverrides,
    }),
    cfg: params.cfg ?? createAcpTestConfig(),
    dispatcher: params.dispatcher ?? createDispatcher().dispatcher,
    sessionKey: targetSessionKey,
    inboundAudio: false,
    shouldRouteToOriginating: params.shouldRouteToOriginating ?? false,
    ...(params.shouldRouteToOriginating
      ? {
          originatingChannel: params.originatingChannel ?? "telegram",
          originatingTo: params.originatingTo ?? "telegram:thread-1",
        }
      : {}),
    shouldSendToolSummaries: true,
    bypassForCommand: false,
    ...(params.onReplyStart ? { onReplyStart: params.onReplyStart } : {}),
    recordProcessed: vi.fn(),
    markIdle: vi.fn(),
  });
}

async function emitToolLifecycleEvents(
  onEvent: (event: unknown) => Promise<void>,
  toolCallId: string,
) {
  await onEvent({
    type: "tool_call",
    tag: "tool_call",
    toolCallId,
    status: "in_progress",
    title: "Run command",
    text: "Run command (in_progress)",
  });
  await onEvent({
    type: "tool_call",
    tag: "tool_call_update",
    toolCallId,
    status: "completed",
    title: "Run command",
    text: "Run command (completed)",
  });
  await onEvent({ type: "done" });
}

function mockToolLifecycleTurn(toolCallId: string) {
  managerMocks.runTurn.mockImplementation(
    async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
      await emitToolLifecycleEvents(onEvent, toolCallId);
    },
  );
}

function mockVisibleTextTurn(text = "visible") {
  managerMocks.runTurn.mockImplementationOnce(
    async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
      await onEvent({ type: "text_delta", text, tag: "agent_message_chunk" });
      await onEvent({ type: "done" });
    },
  );
}

function mockRoutedTextTurn(text: string) {
  managerMocks.runTurn.mockImplementation(
    async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
      await onEvent({ type: "text_delta", text, tag: "agent_message_chunk" });
      await onEvent({ type: "done" });
    },
  );
}

async function dispatchVisibleTurn(onReplyStart: () => void) {
  await runDispatch({
    bodyForAgent: "visible",
    dispatcher: createDispatcher().dispatcher,
    onReplyStart,
  });
}

function queueTtsReplies(...replies: MockTtsReply[]) {
  for (const reply of replies) {
    ttsMocks.maybeApplyTtsToPayload.mockResolvedValueOnce(reply);
  }
}

async function runRoutedAcpTextTurn(text: string) {
  mockRoutedTextTurn(text);
  const { dispatcher } = createDispatcher();
  const result = await runDispatch({
    bodyForAgent: "run acp",
    dispatcher,
    shouldRouteToOriginating: true,
  });
  return { result };
}

function expectRoutedPayload(callIndex: number, payload: Partial<MockTtsReply>) {
  expect(routeMocks.routeReply).toHaveBeenNthCalledWith(
    callIndex,
    expect.objectContaining({
      payload: expect.objectContaining(payload),
    }),
  );
}

describe("tryDispatchAcpReply", () => {
  beforeEach(() => {
    managerMocks.resolveSession.mockReset();
    managerMocks.runTurn.mockReset();
    managerMocks.runTurn.mockImplementation(
      async ({ onEvent }: { onEvent?: (event: unknown) => Promise<void> }) => {
        await onEvent?.({ type: "done" });
      },
    );
    managerMocks.getObservabilitySnapshot.mockReset();
    managerMocks.getObservabilitySnapshot.mockReturnValue({
      turns: { queueDepth: 0 },
      runtimeCache: { activeSessions: 0 },
    });
    policyMocks.resolveAcpDispatchPolicyError.mockReset();
    policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(null);
    policyMocks.resolveAcpAgentPolicyError.mockReset();
    policyMocks.resolveAcpAgentPolicyError.mockReturnValue(null);
    routeMocks.routeReply.mockReset();
    routeMocks.routeReply.mockResolvedValue({ ok: true, messageId: "mock" });
    channelPluginMocks.getChannelPlugin.mockClear();
    messageActionMocks.runMessageAction.mockReset();
    messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const });
    ttsMocks.maybeApplyTtsToPayload.mockClear();
    ttsMocks.resolveTtsConfig.mockReset();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    mediaUnderstandingMocks.applyMediaUnderstanding.mockReset();
    mediaUnderstandingMocks.applyMediaUnderstanding.mockResolvedValue(undefined);
    sessionMetaMocks.readAcpSessionEntry.mockReset();
    sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null);
    bindingServiceMocks.listBySession.mockReset();
    bindingServiceMocks.listBySession.mockReturnValue([]);
    bindingServiceMocks.unbind.mockReset();
    bindingServiceMocks.unbind.mockResolvedValue([]);
    globalThis.fetch = originalFetch;
  });

  it("routes ACP block output to originating channel", async () => {
    setReadyAcpResolution();
    mockRoutedTextTurn("hello");

    const { dispatcher } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      shouldRouteToOriginating: true,
    });

    expect(result?.counts.block).toBe(1);
    expect(routeMocks.routeReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "telegram",
        to: "telegram:thread-1",
      }),
    );
    expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
  });

  it("edits ACP tool lifecycle updates in place when supported", async () => {
    setReadyAcpResolution();
    mockToolLifecycleTurn("call-1");
    routeMocks.routeReply.mockResolvedValueOnce({ ok: true, messageId: "tool-msg-1" });

    const { dispatcher } = createDispatcher();
    await runDispatch({
      bodyForAgent: "run tool",
      cfg: createAcpConfigWithVisibleToolTags(),
      dispatcher,
      shouldRouteToOriginating: true,
    });

    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
    expect(messageActionMocks.runMessageAction).toHaveBeenCalledWith(
      expect.objectContaining({
        action: "edit",
        params: expect.objectContaining({
          messageId: "tool-msg-1",
        }),
      }),
    );
  });

  it("falls back to new tool message when edit fails", async () => {
    setReadyAcpResolution();
    mockToolLifecycleTurn("call-2");
    routeMocks.routeReply
      .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2" })
      .mockResolvedValueOnce({ ok: true, messageId: "tool-msg-2-fallback" });
    messageActionMocks.runMessageAction.mockRejectedValueOnce(new Error("edit unsupported"));

    const { dispatcher } = createDispatcher();
    await runDispatch({
      bodyForAgent: "run tool",
      cfg: createAcpConfigWithVisibleToolTags(),
      dispatcher,
      shouldRouteToOriginating: true,
    });

    expect(messageActionMocks.runMessageAction).toHaveBeenCalledTimes(1);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(2);
  });

  it("starts reply lifecycle when ACP turn starts, including hidden-only turns", async () => {
    setReadyAcpResolution();
    const onReplyStart = vi.fn();
    const { dispatcher } = createDispatcher();

    managerMocks.runTurn.mockImplementationOnce(
      async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
        await onEvent({
          type: "status",
          tag: "usage_update",
          text: "usage updated: 1/100",
          used: 1,
          size: 100,
        });
        await onEvent({ type: "done" });
      },
    );
    await runDispatch({
      bodyForAgent: "hidden",
      dispatcher,
      onReplyStart,
    });
    expect(onReplyStart).toHaveBeenCalledTimes(1);

    mockVisibleTextTurn();
    await dispatchVisibleTurn(onReplyStart);
    expect(onReplyStart).toHaveBeenCalledTimes(2);
  });

  it("starts reply lifecycle once per turn when output is delivered", async () => {
    setReadyAcpResolution();
    const onReplyStart = vi.fn();

    mockVisibleTextTurn();
    await dispatchVisibleTurn(onReplyStart);

    expect(onReplyStart).toHaveBeenCalledTimes(1);
  });

  it("does not start reply lifecycle for empty ACP prompt", async () => {
    setReadyAcpResolution();
    const onReplyStart = vi.fn();
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "   ",
      dispatcher,
      onReplyStart,
    });

    expect(managerMocks.runTurn).not.toHaveBeenCalled();
    expect(onReplyStart).not.toHaveBeenCalled();
  });

  it("skips media understanding for text-only ACP turns", async () => {
    setReadyAcpResolution();
    mockVisibleTextTurn("text only");

    await runDispatch({
      bodyForAgent: "plain text prompt",
    });

    expect(mediaUnderstandingMocks.applyMediaUnderstanding).not.toHaveBeenCalled();
  });

  it("forwards normalized image attachments into ACP turns", async () => {
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const imagePath = path.join(tempDir, "inbound.png");
    try {
      await fs.writeFile(imagePath, "image-bytes");
      const attachments = await resolveAcpAttachments({
        cfg: createAcpTestConfig({
          channels: {
            imessage: {
              attachmentRoots: [tempDir],
            },
          },
        }),
        ctx: buildTestCtx({
          Provider: "imessage",
          Surface: "imessage",
          MediaPath: imagePath,
          MediaType: "image/png",
        }),
        runtime: {
          MediaAttachmentCache: class {
            async getBuffer() {
              return {
                buffer: Buffer.from("image-bytes"),
                mime: "image/png",
                fileName: "inbound.png",
                size: "image-bytes".length,
              };
            }
          } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache,
          isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError =>
            false,
          normalizeAttachments: (ctx) => [
            {
              path: ctx.MediaPath,
              mime: ctx.MediaType,
              index: 0,
            },
          ],
          resolveMediaAttachmentLocalRoots: () => [tempDir],
        },
      });

      expect(attachments).toEqual([
        {
          mediaType: "image/png",
          data: Buffer.from("image-bytes").toString("base64"),
        },
      ]);
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("skips ACP attachments outside allowed inbound roots", async () => {
    setReadyAcpResolution();
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const imagePath = path.join(tempDir, "outside-root.png");
    try {
      await fs.writeFile(imagePath, "image-bytes");
      managerMocks.runTurn.mockResolvedValue(undefined);

      await runDispatch({
        bodyForAgent: "   ",
        ctxOverrides: {
          MediaPath: imagePath,
          MediaType: "image/png",
        },
      });

      expect(managerMocks.runTurn).not.toHaveBeenCalled();
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("skips file URL ACP attachments outside allowed inbound roots", async () => {
    setReadyAcpResolution();
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const imagePath = path.join(tempDir, "outside-root.png");
    try {
      await fs.writeFile(imagePath, "image-bytes");
      managerMocks.runTurn.mockResolvedValue(undefined);

      await runDispatch({
        bodyForAgent: "   ",
        ctxOverrides: {
          MediaPath: `file://${imagePath}`,
          MediaType: "image/png",
        },
      });

      expect(managerMocks.runTurn).not.toHaveBeenCalled();
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("skips relative ACP attachment paths that resolve outside allowed inbound roots", async () => {
    setReadyAcpResolution();
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const imagePath = path.join(tempDir, "outside-root.png");
    try {
      await fs.writeFile(imagePath, "image-bytes");
      managerMocks.runTurn.mockResolvedValue(undefined);

      await runDispatch({
        bodyForAgent: "   ",
        ctxOverrides: {
          MediaPath: path.relative(process.cwd(), imagePath),
          MediaType: "image/png",
        },
      });

      expect(managerMocks.runTurn).not.toHaveBeenCalled();
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("does not fall back to remote URLs when ACP local attachment paths are blocked", async () => {
    setReadyAcpResolution();
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const imagePath = path.join(tempDir, "outside-root.png");
    const fetchSpy = vi.fn(
      async () =>
        new Response(Buffer.from("remote-image"), {
          headers: {
            "content-type": "image/png",
          },
        }),
    );
    globalThis.fetch = withFetchPreconnect(fetchSpy as typeof fetch);
    try {
      await fs.writeFile(imagePath, "image-bytes");
      managerMocks.runTurn.mockResolvedValue(undefined);

      await runDispatch({
        bodyForAgent: "   ",
        ctxOverrides: {
          MediaPath: imagePath,
          MediaUrl: "https://example.com/image.png",
          MediaType: "image/png",
        },
      });

      expect(fetchSpy).not.toHaveBeenCalled();
      expect(managerMocks.runTurn).not.toHaveBeenCalled();
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("skips ACP turns for non-image attachments when there is no text prompt", async () => {
    setReadyAcpResolution();
    const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
    const docPath = path.join(tempDir, "inbound.pdf");
    const { dispatcher } = createDispatcher();
    const onReplyStart = vi.fn();
    try {
      await fs.writeFile(docPath, "pdf-bytes");

      await runDispatch({
        bodyForAgent: "   ",
        dispatcher,
        onReplyStart,
        ctxOverrides: {
          MediaPath: docPath,
          MediaType: "application/pdf",
        },
      });

      expect(managerMocks.runTurn).not.toHaveBeenCalled();
      expect(onReplyStart).not.toHaveBeenCalled();
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("surfaces ACP policy errors as final error replies", async () => {
    setReadyAcpResolution();
    policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(
      new AcpRuntimeError("ACP_DISPATCH_DISABLED", "ACP dispatch is disabled by policy."),
    );
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
    });

    expect(managerMocks.runTurn).not.toHaveBeenCalled();
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        isError: true,
        text: expect.stringContaining("ACP dispatch is disabled by policy."),
      }),
    );
    expect(bindingServiceMocks.unbind).not.toHaveBeenCalled();
  });

  it("does not unbind stale bindings when ACP dispatch is disabled by policy", async () => {
    managerMocks.resolveSession.mockReturnValue({
      kind: "stale",
      sessionKey,
      error: new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP metadata is missing."),
    });
    policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(
      new AcpRuntimeError("ACP_DISPATCH_DISABLED", "ACP dispatch is disabled by policy."),
    );
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
    });

    expect(managerMocks.runTurn).not.toHaveBeenCalled();
    expect(bindingServiceMocks.unbind).not.toHaveBeenCalled();
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        isError: true,
        text: expect.stringContaining("ACP dispatch is disabled by policy."),
      }),
    );
  });

  it("unbinds stale bound conversations before surfacing stale ACP resolution errors", async () => {
    const aliasSessionKey = "main";
    const canonicalSessionKey = "agent:main:main";
    managerMocks.resolveSession.mockReturnValue({
      kind: "stale",
      sessionKey: canonicalSessionKey,
      error: new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP metadata is missing."),
    });
    bindingServiceMocks.unbind.mockResolvedValueOnce([
      {
        bindingId: "discord:default:thread-1",
        targetSessionKey: canonicalSessionKey,
        targetKind: "session",
        conversation: {
          channel: "discord",
          accountId: "default",
          conversationId: "thread-1",
        },
        status: "active",
        boundAt: 0,
      },
    ]);
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
      sessionKeyOverride: aliasSessionKey,
    });

    expect(managerMocks.runTurn).not.toHaveBeenCalled();
    expect(bindingServiceMocks.unbind).toHaveBeenCalledTimes(1);
    expect(bindingServiceMocks.unbind).toHaveBeenCalledWith({
      targetSessionKey: canonicalSessionKey,
      reason: "acp-session-init-failed",
    });
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        isError: true,
        text: expect.stringContaining("ACP metadata is missing."),
      }),
    );
  });

  it("does not unbind valid bindings on generic ACP runTurn init failure", async () => {
    setReadyAcpResolution();
    // Match the post-reset module instance so dispatch-acp preserves the ACP error code.
    managerMocks.runTurn.mockRejectedValueOnce(
      new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Could not initialize ACP session runtime."),
    );
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
    });

    expect(bindingServiceMocks.unbind).not.toHaveBeenCalled();
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        isError: true,
        text: expect.stringContaining("Could not initialize ACP session runtime."),
      }),
    );
  });

  it("unbinds stale bindings on ACP runTurn missing-metadata failures", async () => {
    const aliasSessionKey = "main";
    const canonicalSessionKey = "agent:main:main";
    managerMocks.resolveSession.mockReturnValue({
      kind: "ready",
      sessionKey: canonicalSessionKey,
      meta: createAcpSessionMeta(),
    });
    managerMocks.runTurn.mockRejectedValueOnce(
      new AcpRuntimeError(
        "ACP_SESSION_INIT_FAILED",
        `ACP metadata is missing for ${canonicalSessionKey}. Recreate this ACP session with /acp spawn and rebind the thread.`,
      ),
    );
    bindingServiceMocks.unbind.mockResolvedValueOnce([
      {
        bindingId: "discord:default:thread-1",
        targetSessionKey: canonicalSessionKey,
        targetKind: "session",
        conversation: {
          channel: "discord",
          accountId: "default",
          conversationId: "thread-1",
        },
        status: "active",
        boundAt: 0,
      },
    ]);
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
      sessionKeyOverride: aliasSessionKey,
    });

    expect(bindingServiceMocks.unbind).toHaveBeenCalledTimes(1);
    expect(bindingServiceMocks.unbind).toHaveBeenCalledWith({
      targetSessionKey: canonicalSessionKey,
      reason: "acp-session-init-failed",
    });
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        isError: true,
        text: expect.stringContaining("ACP metadata is missing"),
      }),
    );
  });

  it("uses canonical session keys for bound-session identity notices", async () => {
    const aliasSessionKey = "main";
    const canonicalSessionKey = "agent:main:main";
    managerMocks.resolveSession.mockReturnValue({
      kind: "ready",
      sessionKey: canonicalSessionKey,
      meta: createAcpSessionMeta({
        identity: {
          state: "pending",
          source: "ensure",
          lastUpdatedAt: Date.now(),
          acpxRecordId: "rec-main",
        },
      }),
    });
    bindingServiceMocks.listBySession.mockImplementation((targetSessionKey: string) =>
      targetSessionKey === canonicalSessionKey
        ? [
            {
              bindingId: "discord:default:thread-1",
              targetSessionKey: canonicalSessionKey,
              targetKind: "session",
              conversation: {
                channel: "discord",
                accountId: "default",
                conversationId: "thread-1",
              },
              status: "active",
              boundAt: 0,
            },
          ]
        : [],
    );
    sessionMetaMocks.readAcpSessionEntry.mockImplementation(
      (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
        params.sessionKey === canonicalSessionKey
          ? {
              cfg: params.cfg ?? createAcpTestConfig(),
              storePath: "/tmp/openclaw-session-store.json",
              sessionKey: canonicalSessionKey,
              storeSessionKey: canonicalSessionKey,
              acp: createAcpSessionMeta({
                identity: {
                  state: "resolved",
                  source: "status",
                  lastUpdatedAt: Date.now(),
                  acpxSessionId: "acpx-main",
                },
              }),
            }
          : null,
    );
    managerMocks.runTurn.mockResolvedValue(undefined);
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
      sessionKeyOverride: aliasSessionKey,
    });

    expect(bindingServiceMocks.listBySession).toHaveBeenCalledWith(canonicalSessionKey);
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        text: expect.stringContaining("Session ids resolved."),
      }),
    );
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        text: expect.stringContaining("acpx session id: acpx-main"),
      }),
    );
  });

  it("honors the configured default account when checking bound-session identity notices", async () => {
    const canonicalSessionKey = "agent:main:main";
    managerMocks.resolveSession.mockReturnValue({
      kind: "ready",
      sessionKey: canonicalSessionKey,
      meta: createAcpSessionMeta({
        identity: {
          state: "pending",
          source: "ensure",
          lastUpdatedAt: Date.now(),
          acpxRecordId: "rec-work",
        },
      }),
    });
    bindingServiceMocks.listBySession.mockImplementation((targetSessionKey: string) =>
      targetSessionKey === canonicalSessionKey
        ? [
            {
              bindingId: "discord:work:thread-1",
              targetSessionKey: canonicalSessionKey,
              targetKind: "session",
              conversation: {
                channel: "discord",
                accountId: "work",
                conversationId: "thread-1",
              },
              status: "active",
              boundAt: 0,
            },
          ]
        : [],
    );
    sessionMetaMocks.readAcpSessionEntry.mockImplementation(
      (params: { sessionKey: string; cfg?: OpenClawConfig }) =>
        params.sessionKey === canonicalSessionKey
          ? {
              cfg: params.cfg ?? createAcpTestConfig(),
              storePath: "/tmp/openclaw-session-store.json",
              sessionKey: canonicalSessionKey,
              storeSessionKey: canonicalSessionKey,
              acp: createAcpSessionMeta({
                identity: {
                  state: "resolved",
                  source: "status",
                  lastUpdatedAt: Date.now(),
                  acpxSessionId: "acpx-work",
                },
              }),
            }
          : null,
    );
    managerMocks.runTurn.mockResolvedValue(undefined);
    const { dispatcher } = createDispatcher();

    await runDispatch({
      bodyForAgent: "test",
      dispatcher,
      cfg: createAcpTestConfig({
        channels: {
          discord: {
            defaultAccount: "work",
          },
        },
      }),
      ctxOverrides: {
        Provider: "discord",
        Surface: "discord",
      },
      sessionKeyOverride: canonicalSessionKey,
    });

    expect(bindingServiceMocks.listBySession).toHaveBeenCalledWith(canonicalSessionKey);
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        text: expect.stringContaining("Session ids resolved."),
      }),
    );
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({
        text: expect.stringContaining("acpx session id: acpx-work"),
      }),
    );
  });

  it("does not deliver final fallback text when routed block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
    const { result } = await runRoutedAcpTextTurn("CODEX_OK");

    expect(result?.counts.block).toBe(1);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
  });

  it("does not deliver final fallback text when routed discord block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies(
      { text: "Received your test message." },
      {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
    );
    mockRoutedTextTurn("Received your test message.");

    const { dispatcher } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "run acp",
      dispatcher,
      shouldRouteToOriginating: true,
      originatingChannel: "discord",
      originatingTo: "channel:1478836151241412759",
    });

    expect(result?.counts.block).toBe(1);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
    expect(routeMocks.routeReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "discord",
        to: "channel:1478836151241412759",
        payload: expect.objectContaining({ text: "Received your test message." }),
      }),
    );
  });

  it("does not deliver final fallback text when routed Slack block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies(
      { text: "Shared update." },
      {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
    );
    mockRoutedTextTurn("Shared update.");

    const { dispatcher } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "run acp",
      dispatcher,
      shouldRouteToOriginating: true,
      originatingChannel: "slack",
      originatingTo: "channel:C123",
    });

    expect(result?.counts.block).toBe(1);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
    expect(routeMocks.routeReply).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "slack",
        to: "channel:C123",
        payload: expect.objectContaining({ text: "Shared update." }),
      }),
    );
  });

  it("does not deliver final fallback text when direct block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
    mockVisibleTextTurn("CODEX_OK");

    const { dispatcher, counts } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      ctxOverrides: {
        Provider: "telegram",
        Surface: "telegram",
      },
    });

    expect(result?.counts.block).toBe(0);
    expect(result?.counts.final).toBe(0);
    expect(counts.block).toBe(0);
    expect(counts.final).toBe(0);
    expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "CODEX_OK" }),
    );
    expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
  });

  it("does not deliver final fallback text when direct discord block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies(
      { text: "Received." },
      {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
    );
    mockVisibleTextTurn("Received.");

    const { dispatcher, counts } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      ctxOverrides: {
        Provider: "discord",
        Surface: "discord",
      },
    });

    expect(result?.counts.block).toBe(0);
    expect(result?.counts.final).toBe(0);
    expect(counts.block).toBe(0);
    expect(counts.final).toBe(0);
    expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "Received." }),
    );
    expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
  });

  it("does not deliver final fallback text when direct Slack block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies(
      { text: "Slack says hi." },
      {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
    );
    mockVisibleTextTurn("Slack says hi.");

    const { dispatcher, counts } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      ctxOverrides: {
        Provider: "slack",
        Surface: "slack",
      },
    });

    expect(result?.counts.block).toBe(0);
    expect(result?.counts.final).toBe(0);
    expect(counts.block).toBe(0);
    expect(counts.final).toBe(0);
    expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "Slack says hi." }),
    );
    expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
  });

  it("treats visible telegram ACP block delivery as a successful final response", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
    mockVisibleTextTurn("CODEX_OK");

    const { dispatcher } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      ctxOverrides: {
        Provider: "telegram",
        Surface: "telegram",
      },
    });

    expect(result?.queuedFinal).toBe(true);
    expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "CODEX_OK" }),
    );
    expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
  });

  it("preserves final fallback when direct block text is filtered by channels without a visibility override", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>);
    mockVisibleTextTurn("CODEX_OK");

    const { dispatcher, counts } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "reply",
      dispatcher,
      ctxOverrides: {
        Provider: "whatsapp",
        Surface: "whatsapp",
      },
    });

    expect(result?.counts.block).toBe(0);
    expect(result?.counts.final).toBe(0);
    expect(counts.block).toBe(0);
    expect(counts.final).toBe(0);
    expect(dispatcher.sendBlockReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "CODEX_OK" }),
    );
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "CODEX_OK" }),
    );
  });

  it("falls back to final text when a later telegram ACP block delivery fails", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies(
      { text: "First chunk. " },
      { text: "Second chunk." },
      {} as ReturnType<typeof ttsMocks.maybeApplyTtsToPayload>,
    );
    const cfg = createAcpTestConfig({
      acp: {
        enabled: true,
        stream: {
          deliveryMode: "live",
          coalesceIdleMs: 0,
          maxChunkChars: 64,
        },
      },
    });
    managerMocks.runTurn.mockImplementation(
      async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
        await onEvent({ type: "text_delta", text: "First chunk. ", tag: "agent_message_chunk" });
        await onEvent({ type: "text_delta", text: "Second chunk.", tag: "agent_message_chunk" });
        await onEvent({ type: "done" });
      },
    );

    const { dispatcher } = createDispatcher();
    (dispatcher.sendBlockReply as ReturnType<typeof vi.fn>)
      .mockReturnValueOnce(true)
      .mockReturnValueOnce(false);
    const result = await runDispatch({
      bodyForAgent: "reply",
      cfg,
      dispatcher,
      ctxOverrides: {
        Provider: "telegram",
        Surface: "telegram",
      },
    });

    expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
      1,
      expect.objectContaining({ text: "First chunk. " }),
    );
    expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
      2,
      expect.objectContaining({ text: "Second chunk." }),
    );
    expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(
      expect.objectContaining({ text: "First chunk. \nSecond chunk." }),
    );
    expect(result?.queuedFinal).toBe(true);
  });

  it("honors the configured default account for ACP projector chunking when AccountId is omitted", async () => {
    setReadyAcpResolution();
    const cfg = createAcpTestConfig({
      channels: {
        discord: {
          defaultAccount: "work",
          accounts: {
            work: {
              textChunkLimit: 5,
            },
          },
        },
      },
    });
    managerMocks.runTurn.mockImplementation(
      async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
        await onEvent({ type: "text_delta", text: "abcdef", tag: "agent_message_chunk" });
        await onEvent({ type: "done" });
      },
    );

    const { dispatcher } = createDispatcher();
    await runDispatch({
      bodyForAgent: "reply",
      cfg,
      dispatcher,
      ctxOverrides: {
        Provider: "discord",
        Surface: "discord",
      },
    });

    expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
      1,
      expect.objectContaining({ text: "abcde" }),
    );
    expect(dispatcher.sendBlockReply).toHaveBeenNthCalledWith(
      2,
      expect.objectContaining({ text: "f" }),
    );
  });

  it("does not add a second routed payload when routed block text was already visible", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
    queueTtsReplies({ text: "Task completed" }, {
      mediaUrl: "https://example.com/final.mp3",
      audioAsVoice: true,
    } as MockTtsReply);
    const { result } = await runRoutedAcpTextTurn("Task completed");

    expect(result?.counts.block).toBe(1);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
    expectRoutedPayload(1, {
      text: "Task completed",
    });
  });

  it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" });
    const { result } = await runRoutedAcpTextTurn("Response");

    expect(result?.counts.block).toBe(1);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).toHaveBeenCalledTimes(1);
  });

  it("skips final TTS and fallback when no block text was accumulated", async () => {
    setReadyAcpResolution();
    ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });

    managerMocks.runTurn.mockImplementation(
      async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
        await onEvent({ type: "done" });
      },
    );

    const { dispatcher } = createDispatcher();
    const result = await runDispatch({
      bodyForAgent: "run acp",
      dispatcher,
      shouldRouteToOriginating: true,
    });

    expect(result?.counts.block).toBe(0);
    expect(result?.counts.final).toBe(0);
    expect(routeMocks.routeReply).not.toHaveBeenCalled();
    expect(ttsMocks.maybeApplyTtsToPayload).not.toHaveBeenCalled();
  });
});
