
Conversation

@henrypark133
Contributor

@henrypark133 henrypark133 commented Dec 9, 2025

Add graceful handling of client disconnects for streaming. When a client disconnects, we want to stop the vLLM inference and record usage based on the number of input and output tokens for that inference, so that we still charge for what was generated. To calculate usage, we call the vLLM tokenize API to count tokens with the model's tokenizer.
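For reference, a minimal sketch of that token-counting call, in the spirit of the count_tokens(text, model) provider method this PR adds. The endpoint path follows the PR summary (/v1/tokenize); the helper name, payload shape, and use of reqwest/anyhow are assumptions, not the PR's actual code. The logs below show this call succeeding for both the input and the partial output.

use std::time::Duration;

use reqwest::Client;
use serde::Deserialize;

// Response shape assumed for the tokenize API: only the token count is needed here.
#[derive(Deserialize)]
struct TokenizeResponse {
    count: usize,
}

// Hypothetical helper mirroring the count_tokens(text, model) method described
// in this PR; the endpoint path is taken from the PR summary, everything else is assumed.
async fn count_tokens(
    client: &Client,
    base_url: &str,
    model: &str,
    text: &str,
) -> anyhow::Result<usize> {
    let response = client
        .post(format!("{base_url}/v1/tokenize"))
        // Bound the request so cleanup paths cannot hang on a slow vLLM node.
        .timeout(Duration::from_secs(5))
        .json(&serde_json::json!({ "model": model, "prompt": text }))
        .send()
        .await?
        .error_for_status()?
        .json::<TokenizeResponse>()
        .await?;
    Ok(response.count)
}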

2025-12-09T06:01:43.850990Z ERROR Error processing response stream: InternalError("Failed to send event")

2025-12-09T06:01:43.851007Z ERROR Error sending error event: SendError { kind: Disconnected }

2025-12-09T06:01:43.851228Z DEBUG starting new connection: http://160.72.54.254:8002/

2025-12-09T06:01:43.851348Z DEBUG connecting to 160.72.54.254:8002

2025-12-09T06:01:43.943429Z DEBUG connected to 160.72.54.254:8002

2025-12-09T06:01:48.962943Z DEBUG pooling idle connection for ("http", 160.72.54.254:8002)

2025-12-09T06:01:48.962989Z DEBUG Successfully tokenized text, token count: 193

2025-12-09T06:01:48.963016Z DEBUG reuse idle connection for ("http", 160.72.54.254:8002)

2025-12-09T06:01:49.104481Z DEBUG pooling idle connection for ("http", 160.72.54.254:8002)

2025-12-09T06:01:49.104750Z DEBUG Successfully tokenized text, token count: 245

2025-12-09T06:01:49.104781Z DEBUG Starting database operation operation="get_model_by_id"

2025-12-09T06:01:49.104806Z DEBUG preparing query s20: 

                    SELECT

                        id, model_name, model_display_name, model_description, model_icon,

                        input_cost_per_token, output_cost_per_token,

                        context_length, verifiable, is_active, created_at, updated_at

                    FROM models

                    WHERE id = $1

                    

2025-12-09T06:01:49.106605Z DEBUG executing statement s20 with parameters: [b87d2b50-946b-4ea1-9f5e-3d2c2be050c3]

2025-12-09T06:01:49.107391Z DEBUG Starting database operation operation="record_organization_usage"

2025-12-09T06:01:49.107766Z DEBUG executing statement batch: START TRANSACTION

2025-12-09T06:01:49.108157Z DEBUG preparing query s21: 

                    INSERT INTO organization_usage_log (

                        id, organization_id, workspace_id, api_key_id,

                        model_id, model_name, input_tokens, output_tokens, total_tokens,

                        input_cost, output_cost, total_cost,

                        request_type, inference_type, created_at, ttft_ms, avg_itl_ms, inference_id

                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)

                    RETURNING *

Note

Records usage when clients disconnect mid-stream by counting tokens via provider tokenizers; services now return inference_id used by the API header.

  • Usage & Streaming:
    • Add InterceptStream accumulation and a Drop-based fallback that records usage if a client disconnects early, counting both input and partial output tokens via the model tokenizer (a sketch follows this summary).
    • Pre-generate inference_id (UUID v4) per request for deduplication; prevent double-recording with atomic flag.
  • Providers:
    • Extend InferenceProvider with count_tokens(text, model); implement in vLLM via /v1/tokenize and in MockProvider (approximate).
    • Add InferenceProviderPool::count_tokens_for_model with timeout and safe fallbacks.
    • Add ChatDelta::accumulate_into to collect all textual content (content, tool calls, reasoning, name) for accurate token counting.
  • Service API changes:
    • CompletionServiceTrait now returns wrappers (StreamingCompletionResult, ChatCompletionResult) containing inference_id.
    • Remove hashing/peeking for inference ID; API routes read inference_id from service and always set Inference-Id header.
  • Attestation/Signatures:
    • Keep request/response hash registration/update flow; unchanged behavior but simplified header handling.
  • Responses service:
    • Updated to use new streaming result type; no logic changes beyond type adaptation.
  • Tests:
    • Add unit tests for ChatDelta::accumulate_into; remove obsolete SSE inference ID extraction tests; adapt existing tests to new fields.

Written by Cursor Bugbot for commit d6e87e7.
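As a rough illustration of the accumulation, Drop fallback, and atomic deduplication flag summarized above (the sketch referenced in the first bullet): the type, field, and channel names below are hypothetical; the real InterceptStream lives in crates/services/src/completions/mod.rs and also implements Stream, forwarding chunks while accumulating their textual content.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use tokio::sync::mpsc;
use uuid::Uuid;

// Payload handed to a background task that tokenizes the text and writes the
// organization_usage_log row (names are hypothetical).
struct PendingUsage {
    inference_id: Uuid,
    input_text: String,
    output_text: String,
}

// Hypothetical stand-in for the stream wrapper described above.
struct InterceptStream {
    inference_id: Uuid,
    input_text: String,
    accumulated_text: String,
    // Set once usage has been recorded (normal completion or Drop fallback)
    // so the two paths cannot both bill the same inference.
    usage_recorded: Arc<AtomicBool>,
    usage_tx: mpsc::UnboundedSender<PendingUsage>,
}

impl InterceptStream {
    // Called on the normal completion path, after the final chunk is sent.
    fn finish(&mut self) {
        self.record_once();
    }

    fn record_once(&mut self) {
        // swap returns the previous value, so only the first caller proceeds.
        if self.usage_recorded.swap(true, Ordering::SeqCst) {
            return;
        }
        let _ = self.usage_tx.send(PendingUsage {
            inference_id: self.inference_id,
            input_text: std::mem::take(&mut self.input_text),
            output_text: std::mem::take(&mut self.accumulated_text),
        });
    }
}

impl Drop for InterceptStream {
    // Fallback for early client disconnects: the stream is dropped before the
    // final chunk, so record whatever output was accumulated so far.
    fn drop(&mut self) {
        self.record_once();
    }
}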

@henrypark133 henrypark133 marked this pull request as ready for review December 9, 2025 06:33

@henrypark133
Contributor Author

@claude review

@claude

claude bot commented Dec 12, 2025

Code Review - PR 234

I've reviewed this PR focusing on critical issues. Found 4 CRITICAL issues that need fixing before merge:

CRITICAL ISSUE 1: Race Condition - Double Billing Risk (crates/services/src/completions/mod.rs:248-308)

The Drop handler has a race condition that can cause double billing. When the stream completes normally, self.accumulated_text.clear() is called, but Drop always executes afterward. Between clearing and Drop execution, if another event comes in, the accumulated_text might be repopulated. This means customers could be charged twice for the same tokens.

Fix: Use an atomic flag to track if normal completion occurred.


CRITICAL ISSUE 2: Missing Timeout on Tokenize API Calls (crates/inference_providers/src/vllm/mod.rs:342-379)

The tokenize API calls in the Drop handler have no timeout. Drop handler spawns async task calling vLLM tokenize API without timeout. If vLLM is slow/hung, background tasks accumulate indefinitely leading to resource leak, memory exhaustion, and cascading failures in production.

Fix: Add timeout to tokenize requests (5-10 seconds).
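A minimal sketch of that fix using tokio::time::timeout; the wrapper name and the shape of the wrapped future are assumptions, not the PR's code.

use std::time::Duration;

use tokio::time::timeout;

// Bound any token-counting future so a slow or hung vLLM instance cannot keep
// Drop-spawned background tasks alive indefinitely. Returns None on error or
// timeout; the caller decides how to handle a missing count.
async fn count_tokens_with_timeout<F, E>(tokenize_call: F) -> Option<usize>
where
    F: std::future::Future<Output = Result<usize, E>>,
{
    match timeout(Duration::from_secs(5), tokenize_call).await {
        Ok(Ok(count)) => Some(count),
        Ok(Err(_)) => None, // tokenize API returned an error
        Err(_) => None,     // the 5-second deadline elapsed
    }
}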


CRITICAL ISSUE 3: Privacy Violation - Logging Customer Data (crates/services/src/completions/mod.rs:254-257)

The debug log exposes the length of customer conversation content via accumulated_text.len() and input_text.len(). This violates CLAUDE.md privacy requirements. Text length is customer metadata that could reveal conversation patterns.

Fix: Remove length logging or log only boolean has_accumulated flag.


CRITICAL ISSUE 4: Potential Panic in Production (crates/services/src/completions/mod.rs:266-308)

The Drop handler clones potentially large strings without error handling. If accumulated_text is huge (malicious client, long conversation), cloning could fail/OOM. Drop handlers should NEVER panic as this causes process crash and service disruption.

Fix: Add size check and truncation (e.g., 1MB limit).
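A sketch of the suggested size cap, assuming the 1MB limit mentioned above; the constant and helper names are hypothetical.

// Largest amount of accumulated text the Drop handler will hand off for
// tokenization; anything beyond this is cut off rather than risking large
// allocations on the Drop path. (1 MB is the limit suggested above.)
const MAX_ACCUMULATED_BYTES: usize = 1024 * 1024;

// Truncate to at most max_bytes, backing up to a char boundary so the slice
// stays valid UTF-8 and slicing never panics.
fn truncate_for_tokenizing(text: &str, max_bytes: usize) -> &str {
    if text.len() <= max_bytes {
        return text;
    }
    let mut end = max_bytes;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}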


Status: NEEDS CHANGES - The double-billing race condition and missing timeouts are particularly critical for production safety in a multi-tenant billing system.

…nstead of relying on id from vllm to create a hash deterministically and added a lock for guardrails for double writing to the database
@henrypark133
Contributor Author

@claude review

@claude

claude bot commented Dec 13, 2025

Code Review - PR #234

I've reviewed the implementation for tracking usage when clients disconnect mid-stream. CRITICAL issues found that must be addressed before merge.

CRITICAL ISSUES

1. Race Condition: Double Usage Recording

Location: crates/services/src/completions/mod.rs:145-174

The atomic flag is set to true BEFORE the async task actually records usage. If the stream is dropped between setting the flag and the spawned task completing, usage could be lost.

Better approach: Rely ONLY on database UNIQUE constraint for deduplication.

2. Missing Database UNIQUE Constraint

Location: crates/database/src/migrations/sql/V0029__add_inference_id_type.sql

The migration adds inference_id column but does NOT add a UNIQUE constraint. The PR description claims "DB UNIQUE constraint prevents duplicates" but no such constraint exists.

Impact: Double billing is possible if both normal path AND Drop handler execute.

Required Fix: Add migration:

CREATE UNIQUE INDEX IF NOT EXISTS idx_org_usage_inference_id_unique 
    ON organization_usage_log(inference_id) 
    WHERE inference_id IS NOT NULL;
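With that partial unique index in place, the insert itself can be made idempotent so a second attempt for the same inference_id becomes a no-op rather than a constraint-violation error. A sketch of the statement (abridged column list; whether the service should use ON CONFLICT here is a suggestion, not something in this PR):

// Hypothetical, abridged insert relying on the partial unique index above to
// deduplicate between the normal completion path and the Drop fallback.
// The real statement would list every organization_usage_log column.
const INSERT_USAGE_SQL: &str = "
    INSERT INTO organization_usage_log (id, inference_id, input_tokens, output_tokens)
    VALUES ($1, $2, $3, $4)
    ON CONFLICT (inference_id) WHERE inference_id IS NOT NULL DO NOTHING";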

3. Drop Handler Fire-and-Forget Async Work

Location: crates/services/src/completions/mod.rs:252-324

The Drop handler spawns async work (tokio::spawn) with no guarantee it completes before shutdown.

Impact: During rolling deployments/restarts, usage tracking for disconnected clients may be silently lost.

4. Fallback to 0 Tokens = Free Tier Abuse

When token counting fails, the code returns 0 tokens. This creates an abuse vector where users get free inference.

Recommendation: Track failures in metrics, consider estimated fallback instead of 0.

PRODUCTION SAFETY CONCERNS

  • Performance: Accumulating full response text in memory for long responses (10k+ tokens)
  • Token Counting Timeout: Ensure TOKENIZE_TIMEOUT_SECS is short (2-5s) to avoid blocking cleanup

POSITIVE ASPECTS

✅ Good test coverage for ChatDelta accumulation
✅ Timeout protection on tokenize calls
✅ Clear separation of concerns
✅ Proper handling of all content fields

REQUIRED ACTIONS

  1. Add UNIQUE constraint on inference_id in migration (CRITICAL)
  2. Fix race condition or rely solely on DB constraint (CRITICAL)
  3. Document Drop handler limitations re: shutdown behavior
  4. Add metrics for tokenization failures
  5. Verify TOKENIZE_TIMEOUT_SECS value (recommend 2-5s)

⚠️ Issues #1 and #2 together create real risk of double billing or lost usage tracking.

@PierreLeGuen PierreLeGuen self-requested a review December 13, 2025 00:43
Contributor

@PierreLeGuen PierreLeGuen left a comment


As discussed offline, we should use "stream_options": {"continuous_usage_stats": true} to track ongoing token usage.

Example:

curl https://cloud-api.near.ai/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $token" \
  -d '{
    "model": "zai-org/GLM-4.6",
    "messages": [
      {"role": "user", "content": "What's the weather in New York and Paris today?"}
    ],
    "tools": [
      {
        "type": "function",
        "function": {
          "name": "get_weather",
          "description": "Get the current weather for a city",
          "parameters": {
            "type": "object",
            "properties": {
              "location": {"type": "string", "description": "City name"}
            },
            "required": ["location"]
          }
        }
      }
    ],
    "tool_choice": "auto",
    "stream": true,
    "stream_options": {"continuous_usage_stats": true}
  }'
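If the gateway adopts this, the stream interceptor only needs to remember the most recent usage payload seen on the stream and record that on disconnect, with no separate tokenize call. A rough sketch, assuming chunks carry an OpenAI-style usage object (field names follow the OpenAI schema; the tracker itself is hypothetical):

use serde::Deserialize;

// OpenAI-style usage block; with continuous_usage_stats enabled, vLLM attaches
// running usage to streamed chunks. Treat the exact chunk shape as an assumption.
#[derive(Clone, Copy, Debug, Default, Deserialize)]
struct Usage {
    prompt_tokens: u64,
    completion_tokens: u64,
    total_tokens: u64,
}

// Keep the latest usage seen on the stream so a disconnect can be billed from
// provider-reported counts rather than a post-hoc tokenize call.
#[derive(Default)]
struct UsageTracker {
    latest: Option<Usage>,
}

impl UsageTracker {
    // Called for each streamed chunk; chunks without a usage field are ignored.
    fn observe(&mut self, chunk_usage: Option<Usage>) {
        if let Some(usage) = chunk_usage {
            self.latest = Some(usage);
        }
    }

    // Called from the disconnect/Drop path; None means no chunk carried usage
    // before the client went away.
    fn final_usage(&self) -> Option<Usage> {
        self.latest
    }
}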

