When creating a Message, you can set "stream": true to incrementally stream the response using server-sent events (SSE).

Streaming with SDKs

Our Python and TypeScript SDKs offer multiple ways of streaming. The Python SDK allows both sync and async streams. See the documentation in each SDK for details.

Event types

Each server-sent event includes a named event type and associated JSON data. Each event will use an SSE event name (e.g. event: message_stop), and include the matching event type in its data.
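
To make the wire format concrete, here is a minimal sketch of reading raw SSE text into (event name, data) pairs. The helper name parse_sse and the sample input are assumptions for illustration. The parser is deliberately simplified (no comment lines, no id or retry fields, no \r\n handling) and is not a replacement for an SDK or a full SSE client.

```python
import json

def parse_sse(raw: str):
    """Yield (event_name, data) pairs from raw SSE text (simplified parser)."""
    # Frames are separated by a blank line; each frame has "event:" and "data:" fields.
    for frame in raw.strip().split("\n\n"):
        name, data_lines = None, []
        for line in frame.split("\n"):
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        payload = "\n".join(data_lines)
        if name is not None:
            # Decode the JSON payload when present; an empty data field becomes None.
            yield name, (json.loads(payload) if payload else None)

# Hypothetical sample frame, modeled on the message_stop example above.
sample = 'event: message_stop\ndata: {"type": "message_stop"}\n\n'
for name, data in parse_sse(sample):
    # The SSE event name should match the "type" field inside the JSON data.
    print(name, data["type"] == name)
```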

Each stream uses the following event flow:

  1. message_start: contains a Message object with empty content.
  2. A series of content blocks, each of which has a content_block_start event, one or more content_block_delta events, and a content_block_stop event. Each content block has an index that corresponds to its position in the final Message content array.
  3. One or more message_delta events, indicating top-level changes to the final Message object.
  4. A final message_stop event.
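
The event flow above can be sketched as a small fold over already-decoded events. This is an illustrative sketch, not the SDK's implementation: build_message is a hypothetical helper, events are assumed to be plain dicts, and the text_delta shape and the stop_reason field are assumptions not spelled out on this page.

```python
def build_message(events):
    """Fold a sequence of stream event dicts into the final message dict."""
    message = None
    for ev in events:
        kind = ev["type"]
        if kind == "message_start":
            # Starts with empty content; blocks are appended as they open.
            message = dict(ev["message"], content=[])
        elif kind == "content_block_start":
            # Blocks arrive in order, so appending keeps index == list position.
            message["content"].append(dict(ev["content_block"]))
        elif kind == "content_block_delta":
            block = message["content"][ev["index"]]
            if ev["delta"]["type"] == "text_delta":  # assumed delta shape
                block["text"] += ev["delta"]["text"]
        elif kind == "message_delta":
            message.update(ev["delta"])  # top-level changes, e.g. stop_reason
        elif kind == "message_stop":
            return message
    raise ValueError("stream ended without message_stop")

# Hypothetical event sequence following the four steps listed above.
events = [
    {"type": "message_start", "message": {"role": "assistant", "content": [], "stop_reason": None}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}},
    {"type": "message_stop"},
]
final = build_message(events)
print(final["content"][0]["text"], final["stop_reason"])
```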

Ping events

Event streams may include any number of ping events. These are server-sent events with an event type of ping that carry no message content; the server sends them periodically to keep the connection alive. Your client should ignore them.
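
One way to ignore pings is to filter them out before any other handling. The sketch below assumes events have already been decoded into (event name, data) pairs; without_pings is a hypothetical helper name.

```python
def without_pings(events):
    """Yield (event_name, data) pairs, skipping keep-alive pings."""
    for name, data in events:
        if name == "ping":
            continue  # keep-alive only; nothing to process
        yield name, data

# Hypothetical decoded stream with a ping in the middle.
decoded = [
    ("message_start", {"type": "message_start"}),
    ("ping", None),
    ("message_stop", {"type": "message_stop"}),
]
kept = [name for name, _ in without_pings(decoded)]
print(kept)
```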

Error events

We may occasionally send error events when something goes wrong during streaming. For example, during periods of high usage, you may receive an overloaded_error, which would contain data like:

{
  "type": "error",
  "error": {
    "type": "overloaded_error",
    "message": "Overloaded"
  }
}
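
A client might surface such a payload as an exception so that calling code can decide whether to retry. The following is a minimal sketch under that assumption; StreamError and check_event are hypothetical names, not part of any SDK.

```python
class StreamError(Exception):
    """Hypothetical exception for an error event received mid-stream."""

    def __init__(self, error_type, message):
        super().__init__(f"{error_type}: {message}")
        self.error_type = error_type

def check_event(data):
    """Raise StreamError for an error payload; otherwise return it unchanged."""
    if data.get("type") == "error":
        err = data["error"]
        raise StreamError(err["type"], err["message"])
    return data

# The overloaded_error payload shown above.
payload = {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}}
try:
    check_event(payload)
except StreamError as exc:
    caught = exc
    print("stream failed:", exc)
```

Keeping the error type on the exception lets the caller treat overloaded_error differently from other failures, for example by retrying after a delay.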

Tool use with streaming

When using the streaming API with tool use, tool calls arrive as tool_use content blocks whose input is streamed as partial JSON, so your client needs to handle those blocks and their deltas.

Python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    messages=[{"role": "user", "content": "What's the weather in San Francisco?"}],
    model="claude-3-7-sonnet-20250219",
    max_tokens=1024,
    tools=[
        {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name"
                    }
                },
                "required": ["location"]
            }
        }
    ]
) as stream:
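    for event in stream:
        if event.type == "content_block_start":
            # A tool_use block announces the tool name before its input streams in.
            if event.content_block.type == "tool_use":
                print(f"Using tool: {event.content_block.name}")
        elif event.type == "content_block_delta":
            # Tool input arrives as input_json_delta events, each holding a JSON fragment.
            if event.delta.type == "input_json_delta":
                print(f"Tool input: {event.delta.partial_json}")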

When streaming tool use, you’ll receive:

  • content_block_start events for tool use blocks
  • content_block_delta events carrying partial JSON fragments of the tool input, which you concatenate and parse once the block ends
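
Because a single delta is generally not valid JSON on its own, one workable approach is to buffer the fragments per block and parse them when the block closes. The sketch below assumes plain dict events, a delta type of input_json_delta with a partial_json string, and fragments that are not cumulative; collect_tool_calls and the sample events are hypothetical.

```python
import json

def collect_tool_calls(events):
    """Return (tool_name, parsed_input) pairs for each completed tool_use block."""
    open_calls = {}  # block index -> {"name": ..., "buf": concatenated JSON text}
    finished = []
    for ev in events:
        kind = ev["type"]
        if kind == "content_block_start" and ev["content_block"]["type"] == "tool_use":
            open_calls[ev["index"]] = {"name": ev["content_block"]["name"], "buf": ""}
        elif kind == "content_block_delta" and ev["delta"].get("type") == "input_json_delta":
            # Assumption: each delta is a fragment, so fragments are concatenated.
            open_calls[ev["index"]]["buf"] += ev["delta"]["partial_json"]
        elif kind == "content_block_stop" and ev["index"] in open_calls:
            call = open_calls.pop(ev["index"])
            # Parse only once the block is complete; an empty buffer means no arguments.
            finished.append((call["name"], json.loads(call["buf"] or "{}")))
    return finished

# Hypothetical events for one get_weather call, split mid-string on purpose.
tool_events = [
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_example", "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"location": "San Fr'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'ancisco"}'}},
    {"type": "content_block_stop", "index": 1},
]
calls = collect_tool_calls(tool_events)
print(calls)
```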

Delta vs. snapshot streaming

The streaming API returns incremental deltas for each message, not full conversation snapshots. This design is bandwidth-efficient for long conversations.

If you require full snapshots, you’ll need to accumulate the deltas yourself as they arrive.
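
Accumulating snapshots yourself can be as simple as keeping a running string and emitting it after every delta. This sketch assumes dict events with a text_delta shape (an assumption not defined on this page) and, for brevity, merges text across all blocks; text_snapshots is a hypothetical helper.

```python
def text_snapshots(events):
    """Yield the full text received so far after each text delta."""
    text = ""
    for ev in events:
        if ev["type"] == "content_block_delta" and ev["delta"].get("type") == "text_delta":
            text += ev["delta"]["text"]
            yield text  # a snapshot, rebuilt client-side from deltas

# Hypothetical deltas for a single text block.
deltas = [
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "lo"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "!"}},
]
snapshots = list(text_snapshots(deltas))
print(snapshots)
```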
