Conversation
@codeflash-ai codeflash-ai bot commented Nov 13, 2025

📄 15% (0.15x) speedup for TextStreamer.__anext__ in litellm/llms/vertex_ai/vertex_ai_non_gemini.py

⏱️ Runtime: 1.23 milliseconds → 1.07 milliseconds (best of 90 runs)

📝 Explanation and details

The optimization applies two key micro-optimizations to the async iterator's hot path:

Key Changes:

  1. Pre-computed length caching: Store len(self.text) as self._len during initialization to eliminate repeated len() function calls
  2. Local variable optimization: Use local variable idx to cache self.index and reduce attribute lookups in the critical path
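
Taken together, the hot path plausibly looks like the sketch below. This is a hypothetical reconstruction from the description above and the word-level expectations in the tests, not the actual litellm source; names such as TextStreamerSketch and _demo are illustrative only.

import asyncio


class TextStreamerSketch:
    """Hypothetical reconstruction of the optimized async iterator."""

    def __init__(self, text: str):
        self.text = text.split()       # tokenize once up front, as the word-based tests expect
        self.index = 0
        self._len = len(self.text)     # change 1: pre-compute the length once

    def __aiter__(self):
        return self

    async def __anext__(self):
        idx = self.index               # change 2: cache the attribute in a local variable
        if idx < self._len:            # compare against the cached length, no len() call
            word = self.text[idx]
            self.index = idx + 1       # single attribute write per step
            return word
        raise StopAsyncIteration


async def _demo():
    async for word in TextStreamerSketch("hello world"):
        print(word)                    # prints "hello", then "world"

asyncio.run(_demo())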

Performance Impact:
The optimization achieves a 15% runtime improvement (1.23ms → 1.07ms) by reducing overhead in the __anext__ method. In Python, attribute access (self.index) is slower than local variable access (idx) because it requires dictionary lookups in the object's __dict__. The pre-computed length also eliminates the overhead of calling len() on each iteration.
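
As a rough standalone illustration of this effect (hypothetical, not part of the PR; absolute numbers vary by machine and interpreter), a micro-benchmark along these lines contrasts the two access patterns:

import timeit


class Counter:
    def __init__(self, n):
        self.items = list(range(n))
        self.index = 0


def step_with_attributes(c):
    # attribute reads/writes plus a len() call on every iteration
    c.index = 0
    while c.index < len(c.items):
        c.index += 1


def step_with_locals(c):
    # hoist the attribute and the length into locals; write back once
    idx, n = 0, len(c.items)
    while idx < n:
        idx += 1
    c.index = idx


c = Counter(1000)
print("attributes:", timeit.timeit(lambda: step_with_attributes(c), number=5000))
print("locals:    ", timeit.timeit(lambda: step_with_locals(c), number=5000))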

Workload Benefits:
This optimization is particularly effective for:

  • High-frequency streaming scenarios where __anext__ is called repeatedly
  • Concurrent streaming workloads as shown in the throughput tests with 100+ streamers
  • Large text processing where the iterator is called hundreds of times per instance

The test results show the optimization maintains correctness across all scenarios (single words, empty strings, concurrent access, large inputs) while providing consistent speedup. The micro-optimizations are most beneficial in tight loops or high-concurrency async contexts where the __anext__ method becomes a performance bottleneck.

Note: While individual runtime improved by 15%, the slight throughput decrease (-1.1%) suggests the optimization may have different effects under concurrent load, though the overall performance gain in sequential access patterns makes this a worthwhile improvement.

Correctness verification report:

Test                          Status
⚙️ Existing Unit Tests         🔘 None Found
🌀 Generated Regression Tests  4081 Passed
⏪ Replay Tests                🔘 None Found
🔎 Concolic Coverage Tests     🔘 None Found
📊 Tests Coverage              100.0%
🌀 Generated Regression Tests and Runtime
import asyncio  # used to run async functions

import pytest  # used for our unit tests
from litellm.llms.vertex_ai.vertex_ai_non_gemini import TextStreamer

# ------------------------
# Basic Test Cases
# ------------------------

@pytest.mark.asyncio
async def test_anext_basic_single_word():
    # Test with a single word input
    streamer = TextStreamer("hello")
    result = await streamer.__anext__()
    assert result == "hello"
    # Should raise StopAsyncIteration after the word is consumed
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_basic_multiple_words():
    # Test with multiple words input
    streamer = TextStreamer("hello world this is a test")
    expected = ["hello", "world", "this", "is", "a", "test"]
    actual = []
    for _ in expected:
        actual.append(await streamer.__anext__())
    assert actual == expected
    # Should raise StopAsyncIteration after all words are consumed
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_basic_empty_string():
    # Test with empty string input
    streamer = TextStreamer("")
    # Should immediately raise StopAsyncIteration
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

# ------------------------
# Edge Test Cases
# ------------------------

@pytest.mark.asyncio
async def test_anext_edge_concurrent_access():
    # Test concurrent access on the same streamer instance
    streamer = TextStreamer("a b c")
    # Start three concurrent __anext__ calls
    results = await asyncio.gather(
        streamer.__anext__(),
        streamer.__anext__(),
        streamer.__anext__(),
    )
    assert sorted(results) == ["a", "b", "c"]
    # Further call should raise StopAsyncIteration
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_TextStreamer__anext___throughput_small_load():
    # Throughput test: small load, multiple short streamers consumed in turn
    texts = ["a b", "c d", "e f", "g h"]
    streamers = [TextStreamer(t) for t in texts]
    # Consume both words from each streamer in turn
    results = []
    for s in streamers:
        results.append(await s.__anext__())
        results.append(await s.__anext__())
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()
    assert results == ["a", "b", "c", "d", "e", "f", "g", "h"]

@pytest.mark.asyncio
async def test_TextStreamer__anext___throughput_high_volume():
    # Throughput test: high volume, 100 streamers, 20 words each
    texts = [" ".join([f"item{j}" for j in range(20)]) for _ in range(100)]
    streamers = [TextStreamer(t) for t in texts]
    # Use asyncio.gather to consume the first word from all streamers concurrently
    first_words = await asyncio.gather(*(s.__anext__() for s in streamers))
    assert first_words == ["item0"] * 100
    # Consume the rest of the words from each streamer
    for i in range(1, 20):
        words = await asyncio.gather(*(s.__anext__() for s in streamers))
        assert words == [f"item{i}"] * 100
    # All streamers should now be exhausted
    for s in streamers:
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()

# ------------------------
# Async Iterator Protocol Test
# ------------------------

@pytest.mark.asyncio
async def test_anext_async_iterator_protocol():
    # Test using async for loop (async iterator protocol)
    streamer = TextStreamer("one two three")
    collected = []
    async for word in streamer:
        collected.append(word)
    assert collected == ["one", "two", "three"]

# ------------------------
# Async Iterator Exhaustion Test
# ------------------------

@pytest.mark.asyncio
async def test_anext_async_iterator_exhaustion():
    # Test that async for does not yield after exhaustion
    streamer = TextStreamer("foo")
    words = []
    async for w in streamer:
        words.append(w)
    assert words == ["foo"]
    # Further async for should not yield anything
    words2 = []
    async for w in streamer:
        words2.append(w)
    assert words2 == []
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import asyncio  # used to run async functions

import pytest  # used for our unit tests
from litellm.llms.vertex_ai.vertex_ai_non_gemini import TextStreamer

# unit tests

# 1. Basic Test Cases

@pytest.mark.asyncio
async def test_anext_basic_single_word():
    # Test that __anext__ returns the single word and then raises StopAsyncIteration
    streamer = TextStreamer("hello")
    result = await streamer.__anext__()
    assert result == "hello"
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_basic_multiple_words():
    # Test that __anext__ returns words in order and raises StopAsyncIteration at the end
    streamer = TextStreamer("hello world this is a test")
    expected = ["hello", "world", "this", "is", "a", "test"]
    for word in expected:
        result = await streamer.__anext__()
        assert result == word
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_empty_text():
    # Test that __anext__ immediately raises StopAsyncIteration for empty text
    streamer = TextStreamer("")
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

# 2. Edge Test Cases

@pytest.mark.asyncio
async def test_anext_concurrent_access():
    # Test concurrent calls to __anext__ on the same instance
    streamer = TextStreamer("a b c d")
    # Only four words, so only four successful calls, the rest should raise StopAsyncIteration
    tasks = [streamer.__anext__() for _ in range(4)]
    results = await asyncio.gather(*tasks)
    assert sorted(results) == ["a", "b", "c", "d"]
    # Further call should raise StopAsyncIteration
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_concurrent_on_multiple_instances():
    # Test concurrent calls to __anext__ on different instances
    texts = ["one two", "three four", "five six"]
    streamers = [TextStreamer(t) for t in texts]
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["one", "three", "five"]
    # Second word from each
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["two", "four", "six"]
    # All exhausted now
    for s in streamers:
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()

@pytest.mark.asyncio
async def test_anext_multiple_exhaustion():
    # After exhaustion, repeated __anext__ calls always raise StopAsyncIteration
    streamer = TextStreamer("foo bar")
    await streamer.__anext__()  # foo
    await streamer.__anext__()  # bar
    for _ in range(3):
        with pytest.raises(StopAsyncIteration):
            await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_non_ascii_and_whitespace():
    # Test with non-ASCII and irregular whitespace
    streamer = TextStreamer("héllo   世界\tfoo\nbar")
    expected = ["héllo", "世界", "foo", "bar"]
    for word in expected:
        result = await streamer.__anext__()
        assert result == word
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

# 3. Large Scale Test Cases

@pytest.mark.asyncio
async def test_anext_large_input():
    # Test with a large number of words
    words = [f"word{i}" for i in range(500)]
    text = " ".join(words)
    streamer = TextStreamer(text)
    for word in words:
        result = await streamer.__anext__()
        assert result == word
    with pytest.raises(StopAsyncIteration):
        await streamer.__anext__()

@pytest.mark.asyncio
async def test_anext_large_concurrent_gather():
    # Test concurrent access to multiple streamers, each with a moderate number of words
    streamers = [TextStreamer(f"w{i} x{i} y{i} z{i}") for i in range(50)]
    # Gather first word from each
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == [f"w{i}" for i in range(50)]
    # Gather second word from each
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == [f"x{i}" for i in range(50)]

# 4. Throughput Test Cases

@pytest.mark.asyncio
async def test_TextStreamer__anext___throughput_small_load():
    # Throughput test: small load, many streamers with short texts
    streamers = [TextStreamer("a b") for _ in range(20)]
    # Each streamer should yield 'a' then 'b'
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["a"] * 20
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["b"] * 20
    # All exhausted now
    for s in streamers:
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()

@pytest.mark.asyncio
async def test_TextStreamer__anext___throughput_high_concurrency():
    # Throughput test: high concurrency, many streamers and many __anext__ calls in parallel
    streamers = [TextStreamer("x y z") for _ in range(100)]
    # Gather all first words
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["x"] * 100
    # Gather all second words
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["y"] * 100
    # Gather all third words
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == ["z"] * 100
    # All exhausted now
    for s in streamers:
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()

@pytest.mark.asyncio
async def test_TextStreamer__anext___throughput_many_short_streams():
    # Throughput test: many short streams, each with a unique word
    words = [f"word{i}" for i in range(100)]
    streamers = [TextStreamer(word) for word in words]
    tasks = [s.__anext__() for s in streamers]
    results = await asyncio.gather(*tasks)
    assert results == words
    # All exhausted now
    for s in streamers:
        with pytest.raises(StopAsyncIteration):
            await s.__anext__()
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, run git checkout codeflash/optimize-TextStreamer.__anext__-mhx4abvu and push.


@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 13, 2025 07:38
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash (Optimization PR opened by Codeflash AI) and 🎯 Quality: Medium (Optimization Quality according to Codeflash) labels Nov 13, 2025