11import asyncio
22import random
33import weakref
4- from typing import AsyncIterator , Iterable , Mapping , Optional , Sequence , Tuple , Type
4+ from typing import (
5+ AsyncIterator ,
6+ Iterable ,
7+ Mapping ,
8+ Optional ,
9+ Sequence ,
10+ Tuple ,
11+ Type ,
12+ Union ,
13+ )
514
615from redis .asyncio .client import Redis
716from redis .asyncio .connection import (
17+ BlockingConnectionPool ,
818 Connection ,
919 ConnectionPool ,
1020 EncodableT ,
@@ -203,12 +213,38 @@ async def get_master_address(self):
203213 def rotate_slaves (self ) -> AsyncIterator :
204214 """Round-robin slave balancer"""
205215 return self .proxy .rotate_slaves ()
216+
217+
218+ class SentinelBlockingConnectionPool (BlockingConnectionPool ):
219+ """
220+ Sentinel blocking connection pool.
221+
222+ If ``check_connection`` flag is set to True, SentinelManagedConnection
223+ sends a PING command right after establishing the connection.
224+ """
225+
226+ def __init__ (self , service_name , sentinel_manager , ** kwargs ):
227+ kwargs ["connection_class" ] = kwargs .get (
228+ "connection_class" ,
229+ (
230+ SentinelManagedSSLConnection
231+ if kwargs .pop ("ssl" , False )
232+ else SentinelManagedConnection
233+ ),
234+ )
235+ self .is_master = kwargs .pop ("is_master" , True )
236+ self .check_connection = kwargs .pop ("check_connection" , False )
237+ self .proxy = SentinelConnectionPoolProxy (
238+ connection_pool = self ,
239+ is_master = self .is_master ,
240+ check_connection = self .check_connection ,
241+ service_name = service_name ,
242+ sentinel_manager = sentinel_manager ,
243+ )
206244 super ().__init__ (** kwargs )
207- self .connection_kwargs ["connection_pool" ] = weakref .proxy ( self )
245+ self .connection_kwargs ["connection_pool" ] = self .proxy
208246 self .service_name = service_name
209247 self .sentinel_manager = sentinel_manager
210- self .master_address = None
211- self .slave_rr_counter = None
212248
213249 def __repr__ (self ):
214250 return (
@@ -218,8 +254,11 @@ def __repr__(self):
218254
219255 def reset (self ):
220256 super ().reset ()
221- self .master_address = None
222- self .slave_rr_counter = None
257+ self .proxy .reset ()
258+
259+ @property
260+ def master_address (self ):
261+ return self .proxy .master_address
223262
224263 def owns_connection (self , connection : Connection ):
225264 check = not self .is_master or (
@@ -228,31 +267,11 @@ def owns_connection(self, connection: Connection):
228267 return check and super ().owns_connection (connection )
229268
230269 async def get_master_address (self ):
231- master_address = await self .sentinel_manager .discover_master (self .service_name )
232- if self .is_master :
233- if self .master_address != master_address :
234- self .master_address = master_address
235- # disconnect any idle connections so that they reconnect
236- # to the new master the next time that they are used.
237- await self .disconnect (inuse_connections = False )
238- return master_address
270+ return await self .proxy .get_master_address ()
239271
240- async def rotate_slaves (self ) -> AsyncIterator :
272+ def rotate_slaves (self ) -> AsyncIterator :
241273 """Round-robin slave balancer"""
242- slaves = await self .sentinel_manager .discover_slaves (self .service_name )
243- if slaves :
244- if self .slave_rr_counter is None :
245- self .slave_rr_counter = random .randint (0 , len (slaves ) - 1 )
246- for _ in range (len (slaves )):
247- self .slave_rr_counter = (self .slave_rr_counter + 1 ) % len (slaves )
248- slave = slaves [self .slave_rr_counter ]
249- yield slave
250- # Fallback to the master connection
251- try :
252- yield await self .get_master_address ()
253- except MasterNotFoundError :
254- pass
255- raise SlaveNotFoundError (f"No slave found for { self .service_name !r} " )
274+ return self .proxy .rotate_slaves ()
256275
257276
258277class Sentinel (AsyncSentinelCommands ):
@@ -405,7 +424,10 @@ def master_for(
405424 self ,
406425 service_name : str ,
407426 redis_class : Type [Redis ] = Redis ,
408- connection_pool_class : Type [SentinelConnectionPool ] = SentinelConnectionPool ,
427+ connection_pool_class : Union [
428+ Type [SentinelConnectionPool ],
429+ Type [SentinelBlockingConnectionPool ],
430+ ] = SentinelConnectionPool ,
409431 ** kwargs ,
410432 ):
411433 """
@@ -442,7 +464,10 @@ def slave_for(
442464 self ,
443465 service_name : str ,
444466 redis_class : Type [Redis ] = Redis ,
445- connection_pool_class : Type [SentinelConnectionPool ] = SentinelConnectionPool ,
467+ connection_pool_class : Union [
468+ Type [SentinelConnectionPool ],
469+ Type [SentinelBlockingConnectionPool ],
470+ ] = SentinelConnectionPool ,
446471 ** kwargs ,
447472 ):
448473 """
0 commit comments