68 changes: 24 additions & 44 deletions litellm/proxy/db/db_transaction_queue/redis_update_buffer.py
@@ -61,8 +61,8 @@ def _should_commit_spend_updates_to_redis() -> bool:
"""
from litellm.proxy.proxy_server import general_settings

_use_redis_transaction_buffer: Optional[Union[bool, str]] = (
general_settings.get("use_redis_transaction_buffer", False)
_use_redis_transaction_buffer: Optional[Union[bool, str]] = general_settings.get(
"use_redis_transaction_buffer", False
)
if isinstance(_use_redis_transaction_buffer, str):
_use_redis_transaction_buffer = str_to_bool(_use_redis_transaction_buffer)
@@ -151,15 +151,11 @@ async def store_in_memory_spend_updates_in_redis(
```
"""
if self.redis_cache is None:
verbose_proxy_logger.debug(
"redis_cache is None, skipping store_in_memory_spend_updates_in_redis"
)
verbose_proxy_logger.debug("redis_cache is None, skipping store_in_memory_spend_updates_in_redis")
return

# Get all transactions
db_spend_update_transactions = (
await spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions()
)
db_spend_update_transactions = await spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions()
daily_spend_update_transactions = (
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
@@ -170,12 +166,8 @@ async def store_in_memory_spend_updates_in_redis(
await daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)

verbose_proxy_logger.debug(
"ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions
)
verbose_proxy_logger.debug(
"ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions
)
verbose_proxy_logger.debug("ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions)
verbose_proxy_logger.debug("ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions)

await self._store_transactions_in_redis(
transactions=db_spend_update_transactions,
@@ -295,9 +287,7 @@ async def get_all_daily_spend_update_transactions_from_redis_buffer(
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
list_of_daily_spend_update_transactions = [json.loads(transaction) for transaction in list_of_transactions]
return cast(
Dict[str, DailyUserSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
@@ -319,9 +309,7 @@ async def get_all_daily_team_spend_update_transactions_from_redis_buffer(
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
list_of_daily_spend_update_transactions = [json.loads(transaction) for transaction in list_of_transactions]
return cast(
Dict[str, DailyTeamSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
@@ -343,9 +331,7 @@ async def get_all_daily_tag_spend_update_transactions_from_redis_buffer(
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
list_of_daily_spend_update_transactions = [json.loads(transaction) for transaction in list_of_transactions]
return cast(
Dict[str, DailyTagSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
@@ -372,19 +358,9 @@ def _combine_list_of_transactions(
"""
Combines the list of transactions into a single DBSpendUpdateTransactions object
"""
# Initialize a new combined transaction object with empty dictionaries
combined_transaction = DBSpendUpdateTransactions(
user_list_transactions={},
end_user_list_transactions={},
key_list_transactions={},
team_list_transactions={},
team_member_list_transactions={},
org_list_transactions={},
tag_list_transactions={},
)

# Define the transaction fields to process
transaction_fields = [
# Initialize new dicts to accumulate results, reducing attribute lookups
field_names = [
"user_list_transactions",
"end_user_list_transactions",
"key_list_transactions",
@@ -395,16 +371,20 @@ def _combine_list_of_transactions(
]

# Loop through each transaction and combine the values
for transaction in list_of_transactions:
# Process each field type
for field in transaction_fields:
if transaction.get(field):
for entity_id, amount in transaction[field].items(): # type: ignore
combined_transaction[field][entity_id] = ( # type: ignore
combined_transaction[field].get(entity_id, 0) + amount # type: ignore
)
combined_data = {field: {} for field in field_names}

return combined_transaction
# Merge each transaction's per-entity amounts using plain dict key lookups
for transaction in list_of_transactions:
for field in field_names:
trans_dict = transaction.get(field)
if trans_dict:
combined_dict = combined_data[field]
for entity_id, amount in trans_dict.items():
# Add to the running total for this entity id, defaulting to 0 the first time it is seen
combined_dict[entity_id] = combined_dict.get(entity_id, 0) + amount

# Construct and return combined transaction object with the gathered dicts
return DBSpendUpdateTransactions(**combined_data)

async def _emit_new_item_added_to_redis_buffer_event(
self,
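
Most of the diff above is line-wrapping cleanup; the substantive change is the rewrite of `_combine_list_of_transactions`, which now accumulates amounts into plain dicts and builds the `DBSpendUpdateTransactions` object once at the end. The sketch below is a minimal, standalone illustration of that aggregation pattern: plain `Dict[str, Dict[str, float]]` values stand in for the proxy's real transaction objects, and the `combine` helper plus the sample data are hypothetical, not part of the PR.

```python
# Standalone sketch of the dict-accumulation pattern used by the rewritten
# _combine_list_of_transactions. Plain dicts stand in for the proxy's
# DBSpendUpdateTransactions TypedDict; `combine` and the sample data are
# illustrative assumptions only.
from typing import Dict, List

FIELD_NAMES = [
    "user_list_transactions",
    "end_user_list_transactions",
    "key_list_transactions",
    "team_list_transactions",
    "team_member_list_transactions",
    "org_list_transactions",
    "tag_list_transactions",
]


def combine(transactions: List[Dict[str, Dict[str, float]]]) -> Dict[str, Dict[str, float]]:
    # Accumulate into ordinary dicts first; the real method builds the
    # DBSpendUpdateTransactions object from these in a single constructor call.
    combined: Dict[str, Dict[str, float]] = {field: {} for field in FIELD_NAMES}
    for transaction in transactions:
        for field in FIELD_NAMES:
            per_entity = transaction.get(field)
            if not per_entity:
                continue
            target = combined[field]
            for entity_id, amount in per_entity.items():
                # Add to the running total, defaulting to 0 the first time an id is seen
                target[entity_id] = target.get(entity_id, 0) + amount
    return combined


if __name__ == "__main__":
    sample = [
        {"user_list_transactions": {"user_a": 0.25}, "key_list_transactions": {"key_1": 0.10}},
        {"user_list_transactions": {"user_a": 0.75, "user_b": 0.50}},
    ]
    result = combine(sample)
    assert result["user_list_transactions"] == {"user_a": 1.0, "user_b": 0.50}
    assert result["key_list_transactions"] == {"key_1": 0.10}
    print(result)
```

Accumulating into plain dicts and constructing the result object once at the end is also what lets the new version drop the `# type: ignore` annotations that the old per-field indexing of the TypedDict required.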