diff --git a/README.md b/README.md index af7cbd6..695daee 100644 --- a/README.md +++ b/README.md @@ -39,8 +39,13 @@ What `install.sh` does: ## How it works 1. **Billing header**: SHA-256 signed `x-anthropic-billing-header` injected as `system[0]` 2. **System prompt relocation**: Non-identity system entries moved to the first user message as `` blocks -3. **Beta flag**: Adds `prompt-caching-scope-2026-01-05` -4. **Temperature fix**: Strips non-default temperature on Opus 4.6 adaptive thinking, which prevents HTTP 400 +3. **Beta flags**: Adds `prompt-caching-scope-2026-01-05` and `advisor-tool-2026-03-01` +4. **Stainless SDK spoof**: Lowercase `x-stainless-*` headers + `anthropic-dangerous-direct-browser-access` + `?beta=true` query param matching real Claude Code 2.1.112 +5. **Tool name namespacing**: Hermes's `mcp_bash` is rewritten to `mcp__hermes__Bash` outbound; the response normalizer unwraps it back to `bash` so hermes's tool dispatcher resolves the registered name without auto-repair noise +6. **Tool pair repair**: Orphaned `tool_use` / `tool_result` blocks (left by long conversations or partial summaries) are stripped before signing — prevents HTTP 400 (upstream PR #136) +7. **Haiku effort stripping**: `effort` parameter is removed for haiku models that reject it with HTTP 400 (upstream PR #126) +8. **Temperature fix**: Strips non-default `temperature` on Opus 4.6 adaptive thinking, which otherwise rejects with HTTP 400 +9. **Account metadata**: Maps `~/.claude.json::oauthAccount.accountUuid` to `metadata.user_id` (Anthropic rejected the older `account_uuid` key with HTTP 400 on 2026-04-29) Installed through a `sitecustomize.py` MetaPathFinder hook, so it runs at interpreter startup with no source modifications. diff --git a/anthropic_billing_bypass.py b/anthropic_billing_bypass.py index 2e5f2d9..b3a01ec 100644 --- a/anthropic_billing_bypass.py +++ b/anthropic_billing_bypass.py @@ -1,219 +1,833 @@ +""" +Claude Code OAuth bypass for hermes-agent. +========================================== + +Monkey-patches hermes-agent's anthropic adapter so OAuth-authenticated +requests pass Anthropic's server-side billing validator and route to the +Claude Max/Pro subscription tier. + +Tracks upstream ``griffinmartin/opencode-claude-auth`` (TypeScript) and +ports its bypass behaviors to Python. + +Version history +--------------- +- 1.5.0 (2026-05-06): Fix literal ``\\n`` escapes in system-reminder text, + lowercase Stainless headers (matches upstream JS SDK), restore Opus 4.6 + temperature stripping, port ``repair_tool_pairs`` (upstream PR #136) and + haiku effort stripping (upstream PR #126), lowercase tool names after + unwrap to silence hermes auto-repair (intent of commit 6d9cade), patch + ``normalize_response`` on both old and new hermes transports. +- 1.4.0-pr10 (2026-04-29): Hermes 0.11.0 ``AnthropicTransport`` support, + ``mcp__hermes__`` namespacing, accountUuid → user_id metadata. +- 1.1.1 (2026-04-22): macOS Keychain mirror in installer (no module change). +- 1.1.0 (2026-04-22): PascalCase ``mcp_`` tools, ``sdk-cli`` entrypoint, + ``advisor-tool-2026-03-01`` beta, Stainless headers, ``?beta=true``. +- 1.0.0 (2026-04-09): Billing header, system prompt relocation, prompt- + caching beta, Opus 4.6 temperature hook. + +References +---------- +- https://github.com/griffinmartin/opencode-claude-auth +- PR #126: strip ``effort`` for haiku models +- PR #136: repair orphaned tool_use / tool_result pairs +- PR #148: relocate non-identity system entries to first user message +- PR #191: PascalCase tool names after ``mcp_`` prefix +- PR #207: Claude Code 2.1.112 fingerprint + ``?beta=true`` +""" + +from __future__ import annotations + +__version__ = "1.5.0" + import hashlib import inspect +import json import logging +import os import platform import sys import traceback -import json -import os -from typing import Any, Dict, List +from typing import Any, Dict, List, Set -__version__ = "1.4.0-pr10" logger = logging.getLogger("anthropic_billing_bypass") +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Shared salt shipped in the Claude Code CLI binary; Anthropic's server uses +# this to verify billing-header signatures. _BILLING_SALT = "59cf53e54c78" + +# Claude Code 2.1.112+ reports ``sdk-cli`` instead of legacy ``cli``. A +# mismatch with x-stainless-* headers routes the request to third-party +# billing. _BILLING_ENTRYPOINT = "sdk-cli" + +# Sentinel strings — entries in system[] starting with these are kept; +# everything else is relocated to the first user message. _BILLING_PREFIX = "x-anthropic-billing-header" _SYSTEM_IDENTITY = "You are Claude Code, Anthropic's official CLI for Claude." + +# Hermes prefixes MCP tools with ``mcp_``. We rewrite that to the standard +# ``mcp____`` namespace Anthropic expects from real Claude Code, +# using ``hermes`` as the server name. _MCP_PREFIX = "mcp_" _MCP_HERMES_NAMESPACE = "mcp__hermes__" + +# Stainless-generated SDK headers Claude Code 2.1.112 sends. Lowercase to +# match the JS SDK output exactly (HTTP headers are case-insensitive but +# upstream's spoof uses lowercase, and so does our pre-merge code). _STAINLESS_PACKAGE_VERSION = "0.81.0" _STAINLESS_NODE_VERSION = "v22.11.0" -_EXTRA_OAUTH_BETAS = ["prompt-caching-scope-2026-01-05", "advisor-tool-2026-03-01"] -def _pascalcase_mcp_name(name: str) -> str: +# OAuth-only beta flags appended on top of hermes-agent's built-in +# ``claude-code-20250219`` and ``oauth-2025-04-20``. +_EXTRA_OAUTH_BETAS = [ + "prompt-caching-scope-2026-01-05", + "advisor-tool-2026-03-01", +] + + +# --------------------------------------------------------------------------- +# Tool name transforms (upstream PR #191 + hermes namespacing) +# --------------------------------------------------------------------------- + + +def _uppercase_first(name: str) -> str: if not isinstance(name, str) or not name: return name return name[0].upper() + name[1:] -def _wrap_tool_name_as_mcp_hermes(name: str) -> str: + +def _lowercase_first(name: str) -> str: + """Used after MCP-namespace unwrap so hermes's tool dispatcher resolves + the registered snake_case name without its auto-repair warning.""" if not isinstance(name, str) or not name: return name - return _MCP_HERMES_NAMESPACE + name + return name[0].lower() + name[1:] -def _unwrap_mcp_hermes_name(name: Any) -> Any: - if not isinstance(name, str): return name + +def _pascalcase_mcp_name(name: str) -> str: + """Rewrite ``mcp_foo_bar`` → ``mcp_Foo_bar``. Mirrors upstream PR #191 + exactly; exposed for tests. In-flight wrapping uses ``_wrap_tool_name`` + which adds the hermes namespace too. + """ + if not isinstance(name, str) or not name.startswith(_MCP_PREFIX): + return name + rest = name[len(_MCP_PREFIX):] + if not rest or not rest[0].islower(): + return name + return _MCP_PREFIX + rest[0].upper() + rest[1:] + + +def _wrap_tool_name(name: str) -> str: + if not isinstance(name, str) or not name: + return name if name.startswith(_MCP_HERMES_NAMESPACE): - return name[len(_MCP_HERMES_NAMESPACE):] - # Handle case where mcp_ was already stripped by native Hermes code - hermes_stripped_prefix = _MCP_HERMES_NAMESPACE[len(_MCP_PREFIX):] # "_hermes__" - if name.startswith(hermes_stripped_prefix): - return name[len(hermes_stripped_prefix):] + return name + base = name[len(_MCP_PREFIX):] if name.startswith(_MCP_PREFIX) else name + return _MCP_HERMES_NAMESPACE + _uppercase_first(base) + + +def _unwrap_tool_name(name: Any) -> Any: + if not isinstance(name, str): + return name + if name.startswith(_MCP_HERMES_NAMESPACE): + return _lowercase_first(name[len(_MCP_HERMES_NAMESPACE):]) + # Hermes's transport may already strip ``mcp_``, leaving ``_hermes__``. + fallback_prefix = _MCP_HERMES_NAMESPACE[len(_MCP_PREFIX):] # "_hermes__" + if name.startswith(fallback_prefix): + return _lowercase_first(name[len(fallback_prefix):]) return name -def _normalize_tool_name(name: str) -> str: - if not isinstance(name, str) or not name: - return name - if name.startswith(_MCP_PREFIX): - name = name[len(_MCP_PREFIX):] - return _wrap_tool_name_as_mcp_hermes(_pascalcase_mcp_name(name)) -def _read_claude_config() -> Dict[str, Any]: - path = os.path.expanduser("~/.claude.json") - if not os.path.exists(path): return {} - try: - with open(path, "r") as f: return json.load(f) - except Exception: return {} - -def _get_account_metadata() -> Dict[str, Any]: - config = _read_claude_config() - oauth = config.get("oauthAccount", {}) - metadata = {} - if "accountUuid" in oauth: - metadata["user_id"] = oauth["accountUuid"] - return metadata - -def _extract_first_user_message_text(messages: List[Dict[str, Any]]) -> str: - for msg in messages: - if not isinstance(msg, dict) or msg.get("role") != "user": continue - content = msg.get("content") - if isinstance(content, str): return content - if isinstance(content, list): - for block in content: - if isinstance(block, dict) and block.get("type") == "text": - text = block.get("text"); - if isinstance(text, str) and text: return text - return "" - -def _compute_cch(message_text: str) -> str: - return hashlib.sha256(message_text.encode("utf-8")).hexdigest()[:5] - -def _compute_version_suffix(message_text: str, version: str) -> str: - sampled = "".join(message_text[i] if i < len(message_text) else "0" for i in (4, 7, 20)) - input_str = f"{_BILLING_SALT}{sampled}{version}" - return hashlib.sha256(input_str.encode("utf-8")).hexdigest()[:3] - -def _build_billing_header_value(messages: List[Dict[str, Any]], version: str, entrypoint: str) -> str: - text = _extract_first_user_message_text(messages) - suffix = _compute_version_suffix(text, version) - cch = _compute_cch(text) - return f"x-anthropic-billing-header: cc_version={version}.{suffix}; cc_entrypoint={entrypoint}; cch={cch};" - -def _stainless_arch() -> str: - machine = (platform.machine() or "").lower() - if machine in ("x86_64", "amd64"): return "x64" - if machine in ("arm64", "aarch64"): return "arm64" - return machine or "unknown" - -def _stainless_os() -> str: - mapping = {"Darwin": "MacOS", "Linux": "Linux", "Windows": "Windows"} - return mapping.get(platform.system(), "Unknown") - -def _build_spoof_headers() -> Dict[str, str]: - return { - "anthropic-dangerous-direct-browser-access": "true", - "X-Stainless-Arch": _stainless_arch(), - "X-Stainless-Lang": "js", - "X-Stainless-OS": _stainless_os(), - "X-Stainless-Package-Version": _STAINLESS_PACKAGE_VERSION, - "X-Stainless-Retry-Count": "0", - "X-Stainless-Runtime": "node", - "X-Stainless-Runtime-Version": _STAINLESS_NODE_VERSION, - "X-Stainless-Timeout": "600", - } - -def _rewrite_tool_names_pascalcase(api_kwargs: Dict[str, Any]) -> None: +def _rewrite_tool_names(api_kwargs: Dict[str, Any]) -> None: tools = api_kwargs.get("tools") if isinstance(tools, list): for tool in tools: if isinstance(tool, dict) and "name" in tool: - tool["name"] = _normalize_tool_name(tool.get("name") or "") + tool["name"] = _wrap_tool_name(tool.get("name") or "") + messages = api_kwargs.get("messages") if isinstance(messages, list): for msg in messages: - if not isinstance(msg, dict): continue + if not isinstance(msg, dict): + continue content = msg.get("content") - if not isinstance(content, list): continue + if not isinstance(content, list): + continue for block in content: if isinstance(block, dict) and block.get("type") == "tool_use": - block["name"] = _normalize_tool_name(block.get("name") or "") + block["name"] = _wrap_tool_name(block.get("name") or "") + + +# --------------------------------------------------------------------------- +# Account metadata (commit f10468a — accountUuid → user_id) +# --------------------------------------------------------------------------- + + +def _read_claude_config() -> Dict[str, Any]: + path = os.path.expanduser("~/.claude.json") + if not os.path.exists(path): + return {} + try: + with open(path, "r") as f: + return json.load(f) + except Exception: + return {} + + +def _get_account_metadata() -> Dict[str, Any]: + """Return Anthropic-compatible request metadata. + + ``metadata.account_uuid`` was rejected with HTTP 400 in 2026-04-29; only + ``user_id`` is accepted. Returns ``{}`` when the config or oauthAccount + block is missing so the caller can skip injecting metadata entirely. + """ + config = _read_claude_config() + oauth = config.get("oauthAccount") if isinstance(config, dict) else None + metadata: Dict[str, Any] = {} + if isinstance(oauth, dict) and isinstance(oauth.get("accountUuid"), str): + metadata["user_id"] = oauth["accountUuid"] + return metadata + + +# --------------------------------------------------------------------------- +# Billing header signing (mirror upstream src/signing.ts) +# --------------------------------------------------------------------------- + + +def _extract_first_user_message_text(messages: List[Dict[str, Any]]) -> str: + """Mirrors Claude Code's K19() — first text block of the first user + message. Returns ``""`` when none exists; required for billing-header + signature determinism.""" + for msg in messages: + if not isinstance(msg, dict) or msg.get("role") != "user": + continue + content = msg.get("content") + if isinstance(content, str): + return content + if isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text = block.get("text") + if isinstance(text, str) and text: + return text + return "" + return "" + + +def _compute_cch(message_text: str) -> str: + return hashlib.sha256(message_text.encode("utf-8")).hexdigest()[:5] + + +def _compute_version_suffix(message_text: str, version: str) -> str: + """SHA-256(salt + chars[4,7,20] + version)[:3]; pads with ``"0"`` when + the message is shorter than each index. Matches Claude Code's signing + routine; deviations break OAuth billing routing.""" + sampled = "".join( + message_text[i] if i < len(message_text) else "0" for i in (4, 7, 20) + ) + input_str = f"{_BILLING_SALT}{sampled}{version}" + return hashlib.sha256(input_str.encode("utf-8")).hexdigest()[:3] + + +def _build_billing_header_value( + messages: List[Dict[str, Any]], + version: str, + entrypoint: str, +) -> str: + text = _extract_first_user_message_text(messages) + suffix = _compute_version_suffix(text, version) + cch = _compute_cch(text) + return ( + f"x-anthropic-billing-header: " + f"cc_version={version}.{suffix}; " + f"cc_entrypoint={entrypoint}; " + f"cch={cch};" + ) + + +# --------------------------------------------------------------------------- +# Stainless SDK spoof headers (lowercase, matches upstream src/index.ts) +# --------------------------------------------------------------------------- + + +def _stainless_arch() -> str: + machine = (platform.machine() or "").lower() + if machine in ("x86_64", "amd64"): + return "x64" + if machine in ("arm64", "aarch64"): + return "arm64" + if machine in ("i386", "i686"): + return "ia32" + return machine or "unknown" + + +def _stainless_os() -> str: + return {"Darwin": "MacOS", "Linux": "Linux", "Windows": "Windows"}.get( + platform.system(), platform.system() or "Unknown" + ) + + +def _build_spoof_headers() -> Dict[str, str]: + """Headers real Claude Code 2.1.112 sends that hermes-agent does not. + + The Anthropic SDK (Stainless-generated) automatically attaches + ``x-stainless-*`` identifying headers. The validator cross-references + these with the billing header's ``cc_entrypoint``; absent or mismatched + values flag the request as third-party. Lowercase to match upstream's + JS SDK output. + """ + return { + "anthropic-dangerous-direct-browser-access": "true", + "x-stainless-arch": _stainless_arch(), + "x-stainless-lang": "js", + "x-stainless-os": _stainless_os(), + "x-stainless-package-version": _STAINLESS_PACKAGE_VERSION, + "x-stainless-retry-count": "0", + "x-stainless-runtime": "node", + "x-stainless-runtime-version": _STAINLESS_NODE_VERSION, + "x-stainless-timeout": "600", + } + def _merge_spoof_extras(api_kwargs: Dict[str, Any]) -> None: - merged_headers = dict(_build_spoof_headers()) + """Existing extra_headers/extra_query take precedence so hermes's own + headers (e.g. fast-mode beta) survive — additive spoof only.""" + merged_headers: Dict[str, str] = dict(_build_spoof_headers()) existing_headers = api_kwargs.get("extra_headers") - if isinstance(existing_headers, dict): merged_headers.update(existing_headers) + if isinstance(existing_headers, dict): + for k, v in existing_headers.items(): + merged_headers[k] = v api_kwargs["extra_headers"] = merged_headers - merged_query = {"beta": "true"} + + merged_query: Dict[str, Any] = {"beta": "true"} existing_query = api_kwargs.get("extra_query") - if isinstance(existing_query, dict): merged_query.update(existing_query) + if isinstance(existing_query, dict): + for k, v in existing_query.items(): + merged_query[k] = v api_kwargs["extra_query"] = merged_query + +# --------------------------------------------------------------------------- +# Tool pair repair (upstream PR #136) +# --------------------------------------------------------------------------- + + +def _repair_tool_pairs(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Strip orphaned ``tool_use`` / ``tool_result`` blocks. + + Anthropic rejects requests where a ``tool_use`` has no matching + ``tool_result`` (or vice versa). Long conversations or partial summaries + can leave these orphans behind; this function removes them and drops + messages whose content becomes empty as a result. + + Mirrors upstream ``src/transforms.ts::repairToolPairs``. Returns the + original list when nothing needs repairing so callers can detect a no-op + via identity comparison. + """ + if not isinstance(messages, list): + return messages + + tool_use_ids: Set[str] = set() + tool_result_ids: Set[str] = set() + + for msg in messages: + if not isinstance(msg, dict): + continue + content = msg.get("content") + if not isinstance(content, list): + continue + for block in content: + if not isinstance(block, dict): + continue + if block.get("type") == "tool_use": + bid = block.get("id") + if isinstance(bid, str): + tool_use_ids.add(bid) + elif block.get("type") == "tool_result": + tuid = block.get("tool_use_id") + if isinstance(tuid, str): + tool_result_ids.add(tuid) + + orphaned_uses = tool_use_ids - tool_result_ids + orphaned_results = tool_result_ids - tool_use_ids + + if not orphaned_uses and not orphaned_results: + return messages + + repaired: List[Dict[str, Any]] = [] + for msg in messages: + if not isinstance(msg, dict): + repaired.append(msg) + continue + content = msg.get("content") + if not isinstance(content, list): + repaired.append(msg) + continue + filtered: List[Any] = [] + for block in content: + if not isinstance(block, dict): + filtered.append(block) + continue + if ( + block.get("type") == "tool_use" + and block.get("id") in orphaned_uses + ): + continue + if ( + block.get("type") == "tool_result" + and block.get("tool_use_id") in orphaned_results + ): + continue + filtered.append(block) + if filtered: + repaired.append({**msg, "content": filtered}) + return repaired + + +# --------------------------------------------------------------------------- +# Effort stripping for haiku (upstream PR #126) +# --------------------------------------------------------------------------- + + +def _model_disables_effort(model: str) -> bool: + if not isinstance(model, str): + return False + return "haiku" in model.lower() + + +def _strip_effort(api_kwargs: Dict[str, Any]) -> None: + """Remove ``effort`` for haiku (rejected with HTTP 400). Drops the + parent dict if it becomes empty so we don't send ``"output_config": {}`` + which trips a different validator. Mirrors upstream PR #126.""" + model = api_kwargs.get("model") or "" + if not _model_disables_effort(model): + return + + output_config = api_kwargs.get("output_config") + if isinstance(output_config, dict) and "effort" in output_config: + del output_config["effort"] + if not output_config: + del api_kwargs["output_config"] + + thinking = api_kwargs.get("thinking") + if isinstance(thinking, dict) and "effort" in thinking: + del thinking["effort"] + if not thinking: + del api_kwargs["thinking"] + + +# --------------------------------------------------------------------------- +# Temperature fix for Opus 4.6 adaptive thinking (preserved from 1.0.0) +# --------------------------------------------------------------------------- + + +def _model_supports_adaptive_thinking(model: str) -> bool: + if not isinstance(model, str): + return False + return "4-6" in model or "4.6" in model + + +def _fix_temperature_for_oauth_adaptive( + api_kwargs: Dict[str, Any], + *, + site: str, +) -> None: + """Strip non-default ``temperature`` from OAuth requests on Opus 4.6. + + Opus 4.6 with implicit adaptive thinking rejects ``temperature != 1`` + with HTTP 400; dropping the parameter lets the API use its default. + """ + if "temperature" not in api_kwargs: + return + temp = api_kwargs.get("temperature") + if temp == 1 or temp == 1.0: + return + model = api_kwargs.get("model") or "" + if not _model_supports_adaptive_thinking(model): + return + del api_kwargs["temperature"] + logger.info( + "Dropped temperature=%r for OAuth adaptive-thinking model %r (site=%s)", + temp, + model, + site, + ) + + +# --------------------------------------------------------------------------- +# System prompt relocation (upstream PR #148) +# --------------------------------------------------------------------------- + + +def _prepend_to_first_user_message( + messages: List[Dict[str, Any]], + texts: List[str], +) -> None: + if not texts: + return + combined = "\n\n".join( + f"\n{t}\n" for t in texts + ) + for i, msg in enumerate(messages): + if not isinstance(msg, dict) or msg.get("role") != "user": + continue + content = msg.get("content") + if isinstance(content, str): + new_text = f"{combined}\n\n{content}" if content else combined + messages[i] = {**msg, "content": [{"type": "text", "text": new_text}]} + return + if isinstance(content, list): + new_content = list(content) + for j, block in enumerate(new_content): + if isinstance(block, dict) and block.get("type") == "text": + existing = block.get("text") or "" + new_content[j] = { + **block, + "text": f"{combined}\n\n{existing}" if existing else combined, + } + messages[i] = {**msg, "content": new_content} + return + new_content.insert(0, {"type": "text", "text": combined}) + messages[i] = {**msg, "content": new_content} + return + messages[i] = {**msg, "content": [{"type": "text", "text": combined}]} + return + + +# --------------------------------------------------------------------------- +# Orchestrator +# --------------------------------------------------------------------------- + + def apply_claude_code_bypass(api_kwargs: Dict[str, Any], version: str) -> None: + """Apply all OAuth bypass transforms in place. + + Idempotent: stale billing headers are dropped before injecting the new + one and duplicate identity entries are removed. Safe to call on + requests that have already been bypassed. + """ messages = api_kwargs.get("messages") - if not isinstance(messages, list) or not messages: return - system = api_kwargs.get("system", []) - if isinstance(system, str): system = [{"type": "text", "text": system}] - billing_value = _build_billing_header_value(messages, version, _BILLING_ENTRYPOINT) + if not isinstance(messages, list) or not messages: + return + + # Repair orphaned tool pairs first; downstream transforms assume valid + # tool_use/tool_result pairing. + repaired = _repair_tool_pairs(messages) + if repaired is not messages: + api_kwargs["messages"] = repaired + messages = repaired + + raw_system = api_kwargs.get("system") + if raw_system is None: + system: List[Any] = [] + elif isinstance(raw_system, str): + system = [{"type": "text", "text": raw_system}] if raw_system else [] + elif isinstance(raw_system, list): + system = list(raw_system) + else: + logger.warning( + "Unexpected system type %s; skipping bypass", + type(raw_system).__name__, + ) + return + + # Build billing header from ORIGINAL messages (before relocation mutates). + try: + billing_value = _build_billing_header_value( + messages, version, _BILLING_ENTRYPOINT + ) + except Exception as exc: + logger.warning("Failed to build billing header: %s", exc) + return billing_entry = {"type": "text", "text": billing_value} - kept, moved_texts, identity_seen = [], [], False + + kept: List[Any] = [] + moved_texts: List[str] = [] + identity_seen = False + for entry in system: - if not isinstance(entry, dict) or entry.get("type") != "text": - kept.append(entry); continue - text = entry.get("text", "") - if text.startswith(_BILLING_PREFIX): continue + if not isinstance(entry, dict): + kept.append(entry) + continue + if entry.get("type") != "text": + kept.append(entry) + continue + text = entry.get("text") or "" + if text.startswith(_BILLING_PREFIX): + continue # stale billing header — drop if text.startswith(_SYSTEM_IDENTITY): - if identity_seen: continue + if identity_seen: + continue # duplicate — drop identity_seen = True rest = text[len(_SYSTEM_IDENTITY):].lstrip("\n") kept.append({"type": "text", "text": _SYSTEM_IDENTITY}) - if rest: moved_texts.append(rest) - elif text: moved_texts.append(text) - if not identity_seen: kept.insert(0, {"type": "text", "text": _SYSTEM_IDENTITY}) + if rest: + moved_texts.append(rest) + continue + if text: + moved_texts.append(text) + + if not identity_seen: + kept.insert(0, {"type": "text", "text": _SYSTEM_IDENTITY}) + api_kwargs["system"] = [billing_entry] + kept + if moved_texts: - combined = "\\n\\n".join(f"\\n{t}\\n" for t in moved_texts) - for i, msg in enumerate(messages): - if msg.get("role") == "user": - content = msg.get("content") - if isinstance(content, str): messages[i]["content"] = f"{combined}\\n\\n{content}" - elif isinstance(content, list): content.insert(0, {"type": "text", "text": combined}) - break - _rewrite_tool_names_pascalcase(api_kwargs) + _prepend_to_first_user_message(messages, moved_texts) + + _rewrite_tool_names(api_kwargs) _merge_spoof_extras(api_kwargs) + _strip_effort(api_kwargs) + _fix_temperature_for_oauth_adaptive(api_kwargs, site="build_kwargs") + metadata = _get_account_metadata() - if metadata: api_kwargs["metadata"] = metadata + if metadata: + existing_meta = api_kwargs.get("metadata") + if isinstance(existing_meta, dict): + for k, v in metadata.items(): + existing_meta.setdefault(k, v) + else: + api_kwargs["metadata"] = metadata -def _install_response_pascalcase_unhook(aa_module: Any) -> bool: + +# --------------------------------------------------------------------------- +# Monkey-patch installation +# --------------------------------------------------------------------------- + + +def _get_version_safely(aa_module: Any) -> str: + getter = getattr(aa_module, "_get_claude_code_version", None) + if callable(getter): + try: + version = getter() + if isinstance(version, str) and version and version[0].isdigit(): + return version + except Exception: + pass + fallback = getattr(aa_module, "_CLAUDE_CODE_VERSION_FALLBACK", None) + if isinstance(fallback, str) and fallback: + return fallback + return "2.1.112" + + +def _install_response_pascalcase_unhook( + aa_module: Any, force: bool = False +) -> bool: + """Patch hermes's response normalizer to unwrap ``mcp__hermes__Foo`` back + to ``foo`` and lowercase the first character so the tool dispatcher + resolves the original snake_case name without auto-repair noise. + + Patches both: + - ``aa_module.normalize_anthropic_response`` (pre-0.11 hermes) + - ``agent.transports.anthropic.AnthropicTransport.normalize_response`` + (hermes 0.11+) + + Returns True if at least one hook succeeded. + """ + any_installed = False + + # --- Old hermes: normalize_anthropic_response on the adapter module --- + original_normalize = getattr(aa_module, "normalize_anthropic_response", None) + already_old = getattr(aa_module, "_CLAUDE_CODE_RESPONSE_UNHOOK_APPLIED", False) + if callable(original_normalize) and (force or not already_old): + def patched_normalize( + response: Any, strip_tool_prefix: bool = False, **kwargs: Any + ) -> Any: + result = original_normalize( + response, strip_tool_prefix=strip_tool_prefix, **kwargs + ) + try: + assistant_message, _finish = result + except (TypeError, ValueError): + return result + tool_calls = getattr(assistant_message, "tool_calls", None) + if not tool_calls: + return result + for tc in tool_calls: + fn = getattr(tc, "function", None) + if fn is None: + name = getattr(tc, "name", None) + if isinstance(name, str): + try: + tc.name = _unwrap_tool_name(name) + except Exception: + pass + continue + fn_name = getattr(fn, "name", None) + if isinstance(fn_name, str): + try: + fn.name = _unwrap_tool_name(fn_name) + except Exception: + pass + return result + + patched_normalize.__name__ = original_normalize.__name__ + patched_normalize.__qualname__ = getattr( + original_normalize, "__qualname__", original_normalize.__name__ + ) + patched_normalize.__doc__ = original_normalize.__doc__ + patched_normalize.__wrapped__ = original_normalize # type: ignore[attr-defined] + + aa_module.normalize_anthropic_response = patched_normalize + aa_module._CLAUDE_CODE_RESPONSE_UNHOOK_APPLIED = True # type: ignore[attr-defined] + sys.stderr.write( + "[anthropic_billing_bypass] Adapter unwrap hook installed\n" + ) + any_installed = True + elif callable(original_normalize) and already_old: + any_installed = True # already installed in a previous call + + # --- New hermes: AnthropicTransport.normalize_response --- try: - from agent.transports import anthropic as at + from agent.transports import anthropic as at # type: ignore[import-not-found] cls = getattr(at, "AnthropicTransport", None) - if cls and not getattr(cls, "_HERMES_MCP_UNWRAP_APPLIED", False): - orig = cls.normalize_response - def patched(self, response: Any, **kwargs: Any) -> Any: - res = orig(self, response, **kwargs) - tcs = getattr(res, "tool_calls", None) - if tcs: - for tc in tcs: - tc.name = _unwrap_mcp_hermes_name(tc.name) - return res - cls.normalize_response = patched - cls._HERMES_MCP_UNWRAP_APPLIED = True - sys.stderr.write("[anthropic_billing_bypass] Transport unwrap hook installed\\n") - return True - except Exception: pass - return False + except Exception as exc: + logger.debug( + "AnthropicTransport not importable (%s); skipping transport hook", + exc, + ) + cls = None -def apply_patches(aa: Any = None) -> bool: + if cls is not None: + already_new = getattr(cls, "_HERMES_MCP_UNWRAP_APPLIED", False) + if force or not already_new: + original_transport_normalize = getattr(cls, "normalize_response", None) + if callable(original_transport_normalize): + def patched_transport_normalize( + self: Any, response: Any, *args: Any, **kwargs: Any + ) -> Any: + result = original_transport_normalize( + self, response, *args, **kwargs + ) + tool_calls = getattr(result, "tool_calls", None) + if tool_calls: + for tc in tool_calls: + name = getattr(tc, "name", None) + if isinstance(name, str): + try: + tc.name = _unwrap_tool_name(name) + except Exception: + pass + fn = getattr(tc, "function", None) + fn_name = ( + getattr(fn, "name", None) if fn is not None else None + ) + if isinstance(fn_name, str): + try: + fn.name = _unwrap_tool_name(fn_name) + except Exception: + pass + return result + + patched_transport_normalize.__name__ = ( + original_transport_normalize.__name__ + ) + patched_transport_normalize.__qualname__ = getattr( + original_transport_normalize, + "__qualname__", + original_transport_normalize.__name__, + ) + patched_transport_normalize.__doc__ = ( + original_transport_normalize.__doc__ + ) + patched_transport_normalize.__wrapped__ = ( # type: ignore[attr-defined] + original_transport_normalize + ) + + cls.normalize_response = patched_transport_normalize + cls._HERMES_MCP_UNWRAP_APPLIED = True # type: ignore[attr-defined] + sys.stderr.write( + "[anthropic_billing_bypass] Transport unwrap hook installed\n" + ) + any_installed = True + else: + any_installed = True + + return any_installed + + +def apply_patches(anthropic_adapter_module: Any = None) -> bool: + """Install the bypass on hermes-agent's anthropic adapter. + + Idempotent. Returns False if hermes-agent's API is incompatible with + this patch (e.g. ``build_anthropic_kwargs`` missing or signature changed). + """ + aa = anthropic_adapter_module if aa is None: - try: from agent import anthropic_adapter as aa - except ImportError: return False - if getattr(aa, "_CLAUDE_CODE_BYPASS_APPLIED", False): return True - betas = getattr(aa, "_OAUTH_ONLY_BETAS", []) - for b in _EXTRA_OAUTH_BETAS: - if b not in betas: betas.append(b) - orig_build = aa.build_anthropic_kwargs - def patched_build(*args, **kwargs): - res = orig_build(*args, **kwargs) - is_oauth = kwargs.get("is_oauth", False) - if not is_oauth and len(args) > 6: is_oauth = args[6] - if is_oauth and isinstance(res, dict): - version = "2.1.123" - try: version = aa._get_claude_code_version() - except: pass - apply_claude_code_bypass(res, version) - return res + try: + from agent import anthropic_adapter as aa # type: ignore[import-not-found,no-redef] + except ImportError as exc: + logger.warning("Cannot import agent.anthropic_adapter: %s", exc) + return False + + if getattr(aa, "_CLAUDE_CODE_BYPASS_APPLIED", False): + return True + + # 1. Add the OAuth-only beta flags. + oauth_betas = getattr(aa, "_OAUTH_ONLY_BETAS", None) + if isinstance(oauth_betas, list): + for new_beta in _EXTRA_OAUTH_BETAS: + if new_beta not in oauth_betas: + oauth_betas.append(new_beta) + logger.info("Appended beta flag: %s", new_beta) + + # 2. Verify build_anthropic_kwargs presence and signature. + original_build = getattr(aa, "build_anthropic_kwargs", None) + if not callable(original_build): + logger.warning( + "agent.anthropic_adapter.build_anthropic_kwargs missing; skipping" + ) + return False + + try: + sig = inspect.signature(original_build) + if "is_oauth" not in sig.parameters: + logger.warning( + "build_anthropic_kwargs lacks 'is_oauth' param; skipping" + ) + return False + except (TypeError, ValueError) as exc: + logger.warning("Cannot introspect build_anthropic_kwargs: %s", exc) + return False + + # 3. Wrap build_anthropic_kwargs to apply the bypass on OAuth requests. + def patched_build(*args: Any, **kwargs: Any) -> Dict[str, Any]: + result = original_build(*args, **kwargs) + + try: + bound = sig.bind_partial(*args, **kwargs) + bound.apply_defaults() + is_oauth = bool(bound.arguments.get("is_oauth", False)) + except TypeError: + is_oauth = bool(kwargs.get("is_oauth", False)) + + if is_oauth and isinstance(result, dict): + try: + apply_claude_code_bypass(result, _get_version_safely(aa)) + except Exception as exc: + logger.warning( + "apply_claude_code_bypass raised %s: %s", + type(exc).__name__, + exc, + ) + traceback.print_exc(file=sys.stderr) + return result + + patched_build.__name__ = original_build.__name__ + patched_build.__qualname__ = getattr( + original_build, "__qualname__", original_build.__name__ + ) + patched_build.__doc__ = original_build.__doc__ + patched_build.__module__ = getattr(original_build, "__module__", __name__) + patched_build.__wrapped__ = original_build # type: ignore[attr-defined] + aa.build_anthropic_kwargs = patched_build - aa._CLAUDE_CODE_BYPASS_APPLIED = True - sys.stderr.write("[anthropic_billing_bypass] Bypass installed\\n") + aa._CLAUDE_CODE_BYPASS_APPLIED = True # type: ignore[attr-defined] + sys.stderr.write("[anthropic_billing_bypass] Bypass installed\n") + _install_response_pascalcase_unhook(aa) return True diff --git a/tests/test_bypass.py b/tests/test_bypass.py index 25419b1..db18a19 100644 --- a/tests/test_bypass.py +++ b/tests/test_bypass.py @@ -5,10 +5,16 @@ from types import SimpleNamespace from anthropic_billing_bypass import ( _BILLING_ENTRYPOINT, + _MCP_HERMES_NAMESPACE, _SYSTEM_IDENTITY, _fix_temperature_for_oauth_adaptive, _install_response_pascalcase_unhook, + _model_disables_effort, _pascalcase_mcp_name, + _repair_tool_pairs, + _strip_effort, + _unwrap_tool_name, + _wrap_tool_name, apply_claude_code_bypass, ) @@ -42,6 +48,9 @@ def test_apply_claude_code_bypass_relocates_non_identity_system_text_to_first_us assert "\nStay helpful.\n" in text assert "\nExtra system guidance\n" in text assert text.endswith("hello world") + # Regression: PR #11 introduced literal \n escapes (issue #?). Real + # newlines must be present so Anthropic's content validator passes. + assert "\\n" not in text def test_apply_claude_code_bypass_is_idempotent(basic_api_kwargs): @@ -107,6 +116,17 @@ def test_fix_temperature_for_oauth_adaptive_without_temperature_is_noop(): assert api_kwargs == {"model": "claude-opus-4-6-20260101"} +def test_apply_claude_code_bypass_strips_temperature_for_opus46(simple_messages): + api_kwargs = { + "system": "plain", + "messages": [dict(message) for message in simple_messages], + "model": "claude-opus-4-6-20260101", + "temperature": 0.2, + } + apply_claude_code_bypass(api_kwargs, "2.1.112") + assert "temperature" not in api_kwargs + + def test_pascalcase_mcp_name_uppercases_first_char_after_prefix(): assert _pascalcase_mcp_name("mcp_bash") == "mcp_Bash" assert _pascalcase_mcp_name("mcp_read") == "mcp_Read" @@ -124,29 +144,71 @@ def test_pascalcase_mcp_name_ignores_unprefixed_names(): assert _pascalcase_mcp_name("") == "" -def test_apply_claude_code_bypass_rewrites_tool_names_to_pascalcase(basic_api_kwargs): +def test_wrap_tool_name_namespaces_to_mcp_hermes(): + assert _wrap_tool_name("mcp_bash") == "mcp__hermes__Bash" + assert _wrap_tool_name("mcp_background_output") == "mcp__hermes__Background_output" + assert _wrap_tool_name("bash") == "mcp__hermes__Bash" + + +def test_wrap_tool_name_is_idempotent_when_already_namespaced(): + assert _wrap_tool_name("mcp__hermes__Bash") == "mcp__hermes__Bash" + + +def test_unwrap_tool_name_lowercases_first_char(): + assert _unwrap_tool_name("mcp__hermes__Bash") == "bash" + assert _unwrap_tool_name("mcp__hermes__Background_output") == "background_output" + + +def test_unwrap_tool_name_handles_already_stripped_prefix(): + # When hermes's transport strips ``mcp_`` before our hook runs, the name + # arrives as ``_hermes__Bash``. + assert _unwrap_tool_name("_hermes__Bash") == "bash" + + +def test_unwrap_tool_name_returns_unknown_names_unchanged(): + assert _unwrap_tool_name("plain_bash") == "plain_bash" + assert _unwrap_tool_name("") == "" + + +def test_apply_claude_code_bypass_rewrites_tool_names_to_hermes_namespace( + basic_api_kwargs, +): basic_api_kwargs["tools"] = [ {"name": "mcp_bash"}, {"name": "mcp_background_output"}, - {"name": "mcp_Already_pascal"}, + {"name": "mcp__hermes__Already_done"}, ] + # Provide a paired tool_use / tool_result so _repair_tool_pairs doesn't + # eat the assistant message before tool-name rewriting runs. basic_api_kwargs["messages"] = [ + {"role": "user", "content": "hello world"}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "name": "mcp_bash", "id": "tool_1", "input": {}}, + ], + }, { "role": "user", "content": [ - {"type": "text", "text": "hello world"}, - {"type": "tool_use", "name": "mcp_bash", "id": "tool_1", "input": {}}, + {"type": "tool_result", "tool_use_id": "tool_1", "content": "ok"}, ], - } + }, ] apply_claude_code_bypass(basic_api_kwargs, "2.1.112") tool_names = [tool["name"] for tool in basic_api_kwargs["tools"]] - assert tool_names == ["mcp_Bash", "mcp_Background_output", "mcp_Already_pascal"] + assert tool_names == [ + "mcp__hermes__Bash", + "mcp__hermes__Background_output", + "mcp__hermes__Already_done", + ] + assert all(name.startswith(_MCP_HERMES_NAMESPACE) for name in tool_names) - tool_use_block = basic_api_kwargs["messages"][0]["content"][-1] - assert tool_use_block["name"] == "mcp_Bash" + assistant_msg = basic_api_kwargs["messages"][1] + tool_use_block = assistant_msg["content"][0] + assert tool_use_block["name"] == "mcp__hermes__Bash" def test_apply_claude_code_bypass_injects_stainless_and_direct_browser_headers( @@ -163,6 +225,9 @@ def test_apply_claude_code_bypass_injects_stainless_and_direct_browser_headers( assert extra_headers["x-stainless-timeout"] == "600" assert extra_headers["x-stainless-os"] in ("MacOS", "Linux", "Windows") assert extra_headers["x-stainless-arch"] in ("x64", "arm64", "ia32", "unknown") + # Regression: PR #11 capitalized these to X-Stainless-*; lowercase keys + # match upstream JS SDK output exactly. + assert "X-Stainless-Lang" not in extra_headers def test_apply_claude_code_bypass_sets_beta_true_query_param(basic_api_kwargs): @@ -183,29 +248,173 @@ def test_apply_claude_code_bypass_preserves_existing_extra_headers(basic_api_kwa ) -def _make_fake_adapter_module(tool_names): +def test_repair_tool_pairs_drops_orphaned_tool_use_and_result(): + messages = [ + {"role": "user", "content": [{"type": "text", "text": "hi"}]}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "t1", "name": "bash", "input": {}}, + {"type": "tool_use", "id": "t2", "name": "read", "input": {}}, + ], + }, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "t1", "content": "ok"}, + {"type": "tool_result", "tool_use_id": "tX", "content": "stale"}, + ], + }, + ] + + repaired = _repair_tool_pairs(messages) + + assert len(repaired) == 3 + assistant_blocks = repaired[1]["content"] + assert [b["id"] for b in assistant_blocks] == ["t1"] + user_results = repaired[2]["content"] + assert [b["tool_use_id"] for b in user_results] == ["t1"] + + +def test_repair_tool_pairs_drops_messages_with_only_orphans(): + messages = [ + {"role": "user", "content": [{"type": "text", "text": "hi"}]}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "tX", "name": "bash", "input": {}} + ], + }, + ] + + repaired = _repair_tool_pairs(messages) + + assert len(repaired) == 1 + assert repaired[0]["role"] == "user" + + +def test_repair_tool_pairs_returns_input_when_nothing_to_repair(): + messages = [ + {"role": "user", "content": [{"type": "text", "text": "hi"}]}, + { + "role": "assistant", + "content": [{"type": "tool_use", "id": "t1", "name": "bash", "input": {}}], + }, + { + "role": "user", + "content": [{"type": "tool_result", "tool_use_id": "t1", "content": "ok"}], + }, + ] + + repaired = _repair_tool_pairs(messages) + assert repaired is messages # identity-preserving no-op + + +def test_apply_claude_code_bypass_repairs_tool_pairs_before_signing(simple_messages): + api_kwargs = { + "system": "p", + "messages": [ + {"role": "user", "content": "hello world"}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "tX", "name": "mcp_read", "input": {}} + ], + }, + ], + "model": "claude-opus-4-6-20260101", + } + + apply_claude_code_bypass(api_kwargs, "2.1.112") + + # Orphan removed; assistant message dropped (now empty after filter). + assert len(api_kwargs["messages"]) == 1 + assert api_kwargs["messages"][0]["role"] == "user" + + +def test_model_disables_effort_for_haiku(): + assert _model_disables_effort("claude-haiku-3-5") + assert _model_disables_effort("claude-3-5-haiku-20241022") + assert not _model_disables_effort("claude-opus-4-6-20260101") + assert not _model_disables_effort("claude-sonnet-4-7") + + +def test_strip_effort_removes_from_output_config_for_haiku(): + api = { + "model": "claude-haiku-3-5", + "output_config": {"effort": "low", "other": True}, + } + _strip_effort(api) + assert api["output_config"] == {"other": True} + + +def test_strip_effort_drops_empty_parent_dict(): + api = { + "model": "claude-haiku-3-5", + "output_config": {"effort": "low"}, + "thinking": {"effort": "medium"}, + } + _strip_effort(api) + assert "output_config" not in api + assert "thinking" not in api + + +def test_strip_effort_preserves_other_thinking_keys(): + api = { + "model": "claude-haiku-3-5", + "thinking": {"effort": "low", "budget_tokens": 1024}, + } + _strip_effort(api) + assert api["thinking"] == {"budget_tokens": 1024} + + +def test_strip_effort_is_noop_for_non_haiku(): + api = { + "model": "claude-opus-4-6-20260101", + "output_config": {"effort": "low"}, + } + _strip_effort(api) + assert api["output_config"] == {"effort": "low"} + + +# --------------------------------------------------------------------------- +# Response unwrap hook tests +# --------------------------------------------------------------------------- + + +def _make_fake_adapter_module(tool_names, *, namespace=True): + """Build a SimpleNamespace mimicking ``agent.anthropic_adapter``. + + Tool names returned by ``original_normalize`` arrive in + ``mcp__hermes__Foo`` form because that's what we sent on the way out. + """ + wrapped_names = [ + (_MCP_HERMES_NAMESPACE + n[len("mcp_"):][0].upper() + n[len("mcp_") + 1:]) + if namespace and n.startswith("mcp_") + else n + for n in tool_names + ] + def original_normalize(response, strip_tool_prefix=False): tool_calls = [] - for name in tool_names: - stripped = name[len("mcp_"):] if strip_tool_prefix and name.startswith("mcp_") else name + for name in wrapped_names: tool_calls.append( SimpleNamespace( id="tool_1", type="function", - function=SimpleNamespace(name=stripped, arguments="{}"), + function=SimpleNamespace(name=name, arguments="{}"), ) ) msg = SimpleNamespace(content=None, tool_calls=tool_calls or None, reasoning=None) return msg, "tool_calls" - module = SimpleNamespace(normalize_anthropic_response=original_normalize) - return module + return SimpleNamespace(normalize_anthropic_response=original_normalize) -def test_response_unhook_lowercases_first_char_of_tool_names_after_strip(): - adapter = _make_fake_adapter_module(["mcp_Bash", "mcp_Background_output"]) +def test_response_unhook_lowercases_first_char_of_tool_names(): + adapter = _make_fake_adapter_module(["mcp_bash", "mcp_background_output"]) - assert _install_response_pascalcase_unhook(adapter) is True + assert _install_response_pascalcase_unhook(adapter, force=True) is True msg, _reason = adapter.normalize_anthropic_response( response=object(), strip_tool_prefix=True @@ -215,25 +424,31 @@ def test_response_unhook_lowercases_first_char_of_tool_names_after_strip(): assert names == ["bash", "background_output"] -def test_response_unhook_is_noop_when_strip_tool_prefix_false(): - adapter = _make_fake_adapter_module(["mcp_Bash"]) - - _install_response_pascalcase_unhook(adapter) - - msg, _reason = adapter.normalize_anthropic_response( - response=object(), strip_tool_prefix=False +def test_response_unhook_strips_hermes_namespace_when_present(): + adapter = _make_fake_adapter_module( + ["mcp__hermes__Bash"], namespace=False ) - assert msg.tool_calls[0].function.name == "mcp_Bash" + _install_response_pascalcase_unhook(adapter, force=True) + + msg, _reason = adapter.normalize_anthropic_response(response=object()) + assert msg.tool_calls[0].function.name == "bash" + + +def test_response_unhook_leaves_unknown_names_alone(): + adapter = _make_fake_adapter_module(["plain_tool"], namespace=False) + + _install_response_pascalcase_unhook(adapter, force=True) + + msg, _reason = adapter.normalize_anthropic_response(response=object()) + assert msg.tool_calls[0].function.name == "plain_tool" def test_response_unhook_is_idempotent(): - adapter = _make_fake_adapter_module(["mcp_Bash"]) + adapter = _make_fake_adapter_module(["mcp_bash"]) - assert _install_response_pascalcase_unhook(adapter) is True + assert _install_response_pascalcase_unhook(adapter, force=True) is True assert _install_response_pascalcase_unhook(adapter) is True - msg, _reason = adapter.normalize_anthropic_response( - response=object(), strip_tool_prefix=True - ) + msg, _reason = adapter.normalize_anthropic_response(response=object()) assert msg.tool_calls[0].function.name == "bash"