From a01bc9aa9003509d5516b6f1fca4cfd7e4a38bb7 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Thu, 18 Mar 2021 21:15:46 +0300 Subject: [PATCH 01/20] sync: Reimplement CSemaphore for better performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing implementation uses a spinlock that falls back to a kernel semaphore. When the contention is high and cannot be alleviated by the spinlock, this approach fails to deliver an adequate throughput. The probable cause for that is the general slowness of the semaphore object created with `CreateSemaphore()`. The new implementation is based on the `WaitOnAddress()` primitive and delivers an up to 20x synthetic improvement. Since the CSemaphore is used as a building block for other synchronization primitives, such as CCriticalSection, this translates into the up to ~5.4x throughput improvement in a benchmark with multiple concurrent db writers. The benchmarks below were conducted in an environment with an i9-9900K CPU and an NVMe SSD. Synthetic benchmark based on the adjusted semaphoreperf.cpp test: 1 thread: 93850 KOps/Thread → 93950 KOps/Thread 2 threads: 20445 KOps/Thread → 30165 KOps/Thread (+48 %) 3 threads: 6083 KOps/Thread → 10897 KOps/Thread (+79 %) 8 threads: 102 KOps/Thread → 2058 KOps/Thread (+1917 %) 16 threads: 45 KOps/Thread → 719 KOps/Thread (+1498 %) Real benchmark with multiple concurrent db writers: (The numbers represent the amount of complete "write sequences" and are meaningful only as a relative measurement of throughput) 1 writer: 8443 Sequences → 8562 Sequences 2 writers: 14934 Sequences → 15331 Sequences 3 writers: 17434 Sequences → 17386 Sequences 4 writers: 16583 Sequences → 17997 Sequences 5 writers: 15924 Sequences → 17431 Sequences 6 writers: 8437 Sequences → 17223 Sequences (+104 %) 7 writers: 5020 Sequences → 18145 Sequences (+261 %) 8 writers: 4131 Sequences → 17670 Sequences (+339 %) 9 writers: 4168 Sequences → 17438 Sequences (+318 %) 10 writers: 3670 Sequences → 19709 Sequences (+437 %) On Win64, the size of CSemaphore remains unchanged (8 bytes). On Win32, the size of CSemaphore changes from 4 to 8 bytes, so the patch updates the size assertions for structures like PIB. --- dev/ese/published/inc/sync.hxx | 266 ++++++--------- dev/ese/src/inc/_bf.hxx | 2 +- dev/ese/src/inc/pib.hxx | 2 +- dev/ese/src/inc/tdb.hxx | 2 +- dev/ese/src/sync/CMakeLists.txt | 4 + dev/ese/src/sync/sync.cxx | 561 ++++++++++++-------------------- 6 files changed, 331 insertions(+), 506 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 0da6ab09..c766954f 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -256,6 +256,9 @@ extern const INT cbCacheLine; extern BOOL g_fSyncProcessAbort; +extern INT g_cSpinMax; + + inline const BOOL IsAtomicallyModifiable( LONG* plTarget ) @@ -1572,36 +1575,24 @@ class CSemaphoreState public: - - CSemaphoreState( const CSyncStateInitNull& null ) : m_cAvail( 0 ) {} - CSemaphoreState( const INT cAvail ); - CSemaphoreState( const INT cWait, const INT irksem ); - CSemaphoreState( const CSemaphoreState& state ) - { - C_ASSERT( OffsetOf( CSemaphoreState, m_irksem ) == OffsetOf( CSemaphoreState, m_cAvail ) ); - C_ASSERT( OffsetOf( CSemaphoreState, m_cWaitNeg ) > OffsetOf( CSemaphoreState, m_cAvail ) ); - C_ASSERT( ( OffsetOf( CSemaphoreState, m_cWaitNeg ) + sizeof ( m_cWaitNeg ) ) <= ( OffsetOf( CSemaphoreState, m_cAvail ) + sizeof ( m_cAvail ) ) ); - m_cAvail = state.m_cAvail; - } + CSemaphoreState( const CSyncStateInitNull& null ) : m_cAvail( 0 ), m_cWait( 0 ) {} + CSemaphoreState( const INT cAvail, const INT cWait ) : m_cAvail( cAvail ), m_cWait( cWait ) {} + CSemaphoreState( const CSemaphoreState& state ) : m_qwState( AtomicRead( (QWORD*)&state.m_qwState ) ) {} ~CSemaphoreState() {} - CSemaphoreState& operator=( CSemaphoreState& state ) { m_cAvail = state.m_cAvail; return *this; } + CSemaphoreState& operator=( CSemaphoreState& state ) { m_qwState = AtomicRead( (QWORD*)&state.m_qwState ); return *this; } const BOOL FChange( const CSemaphoreState& stateCur, const CSemaphoreState& stateNew ); - const BOOL FIncAvail( const INT cToInc ); + void IncAvail( const INT cToInc ); const BOOL FDecAvail(); + void IncWait(); + void DecWait(); - - const BOOL FNoWait() const { return m_cAvail >= 0; } - const BOOL FWait() const { return m_cAvail < 0; } - const BOOL FAvail() const { return m_cAvail > 0; } - const BOOL FNoWaitAndNoAvail() const { return m_cAvail == 0; } - - const INT CAvail() const { OSSYNCAssert( FNoWait() ); return m_cAvail; } - const INT CWait() const { OSSYNCAssert( FWait() ); return -m_cWaitNeg; } - const CKernelSemaphorePool::IRKSEM Irksem() const { OSSYNCAssert( FWait() ); return CKernelSemaphorePool::IRKSEM( m_irksem ); } + const INT CAvail() const { return (INT)m_cAvail; } + const INT CWait() const { return (INT)m_cWait; } + volatile void * PAvail() { return &m_cAvail; } void Dump( const CDumpContext& dc ) const; @@ -1613,52 +1604,31 @@ class CSemaphoreState union { - volatile LONG m_cAvail; + volatile QWORD m_qwState; struct { - volatile USHORT m_irksem; - volatile SHORT m_cWaitNeg; + volatile DWORD m_cAvail; + volatile DWORD m_cWait; }; }; }; -inline CSemaphoreState::CSemaphoreState( const INT cAvail ) +inline const BOOL CSemaphoreState::FChange( const CSemaphoreState& stateCur, const CSemaphoreState& stateNew ) { - - OSSYNCAssert( cAvail >= 0 ); - OSSYNCAssert( cAvail <= 0x7FFFFFFF ); - - - m_cAvail = LONG( cAvail ); + return AtomicCompareExchange( (QWORD*)&m_qwState, stateCur.m_qwState, stateNew.m_qwState ) == stateCur.m_qwState; } -inline CSemaphoreState::CSemaphoreState( const INT cWait, const INT irksem ) +__forceinline void CSemaphoreState::IncAvail( const INT cToInc ) { - - OSSYNCAssert( cWait > 0 ); - OSSYNCAssert( cWait <= 0x7FFF ); - OSSYNCAssert( irksem >= 0 ); - OSSYNCAssert( irksem <= 0xFFFE ); - - - m_cWaitNeg = SHORT( -cWait ); - - - m_irksem = (USHORT) irksem; + AtomicExchangeAdd( (LONG*)&m_cAvail, cToInc ); } -inline const BOOL CSemaphoreState::FChange( const CSemaphoreState& stateCur, const CSemaphoreState& stateNew ) -{ - return AtomicCompareExchange( (LONG*)&m_cAvail, stateCur.m_cAvail, stateNew.m_cAvail ) == stateCur.m_cAvail; -} - - -__forceinline const BOOL CSemaphoreState::FIncAvail( const INT cToInc ) +__forceinline const BOOL CSemaphoreState::FDecAvail() { OSSYNC_FOREVER @@ -1666,94 +1636,27 @@ __forceinline const BOOL CSemaphoreState::FIncAvail( const INT cToInc ) const LONG cAvail = m_cAvail; - - const LONG cAvailStart = cAvail & 0x7FFFFFFF; - - - const LONG cAvailEnd = cAvailStart + cToInc; - - - OSSYNCAssert( cAvail < 0 || ( cAvailStart >= 0 && cAvailEnd <= 0x7FFFFFFF && cAvailEnd == cAvailStart + cToInc ) ); - - - const LONG cAvailOld = AtomicCompareExchange( (LONG*)&m_cAvail, cAvailStart, cAvailEnd ); - - - if ( cAvailOld == cAvailStart ) + if ( cAvail == 0 ) { - - return fTrue; + return fFalse; } - - - else + else if ( AtomicCompareExchange( (LONG*)&m_cAvail, cAvail, cAvail - 1 ) == cAvail ) { - - if ( cAvailOld >= 0 ) - { - - continue; - } - - - else - { - - return fFalse; - } + return fTrue; } } } -__forceinline const BOOL CSemaphoreState::FDecAvail() +__forceinline void CSemaphoreState::IncWait( ) { - - OSSYNC_FOREVER - { - - const LONG cAvail = m_cAvail; - - - OSSYNCAssert( cAvail != 0x80000000 ); - - - const LONG cAvailEnd = ( cAvail - 1 ) & 0x7FFFFFFF; - - - const LONG cAvailStart = cAvailEnd + 1; - - - OSSYNCAssert( cAvail <= 0 || ( cAvailStart > 0 && cAvailEnd >= 0 && cAvailEnd == cAvail - 1 ) ); - - - const LONG cAvailOld = AtomicCompareExchange( (LONG*)&m_cAvail, cAvailStart, cAvailEnd ); - - - if ( cAvailOld == cAvailStart ) - { - - return fTrue; - } - - - else - { - - if ( cAvailOld > 0 ) - { - - continue; - } + AtomicIncrement( (LONG*)&m_cWait ); +} - else - { - - return fFalse; - } - } - } +__forceinline void CSemaphoreState::DecWait( ) +{ + AtomicDecrement( (LONG*)&m_cWait ); } @@ -1791,10 +1694,20 @@ class CSemaphore CSemaphore& operator=( CSemaphore& ) = delete; + // Resolves the internal timeout value to an OS level timeout. + static const DWORD _DwOSTimeout( const INT cmsecTimeout ); - const BOOL _FAcquire( const INT cmsecTimeout ); - const BOOL _FWait( const INT cmsecTimeout ); + const BOOL _FTryAcquire( const INT cSpin ); + const BOOL _FAcquire( const DWORD dwTimeout ); void _Release( const INT cToRelease ); + + // Waits until the semaphore counter value changes. + // This method has the same semantics as the WaitOnAddress() function and is + // guaranteed to return when the address is signaled, but it is also allowed + // to return for other reasons. The caller should compare the new value with + // the original. + const BOOL _FWait( const INT cAvail, const DWORD dwTimeout ); + const BOOL _FOSWait( const INT cAvail, const DWORD dwTimeout ); }; @@ -1816,69 +1729,104 @@ inline void CSemaphore::Wait() inline const BOOL CSemaphore::FTryAcquire() { - - const BOOL fAcquire = State().FDecAvail(); - - - if ( !fAcquire ) + if ( _FTryAcquire( 0 ) ) { - - State().SetContend(); + State().SetAcquire(); + return fTrue; } - - else { - - State().SetAcquire(); + State().SetContend(); + return fFalse; } - - return fAcquire; } inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) { - - return FTryAcquire() || _FAcquire( cmsecTimeout ); + if ( _FTryAcquire( g_cSpinMax ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) + { + State().SetAcquire(); + return fTrue; + } + else + { + State().SetContend(); + return fFalse; + } } inline const BOOL CSemaphore::FWait( const INT cmsecTimeout ) { - - return ( CAvail() > 0 ) || _FWait( cmsecTimeout ); + if ( _FTryAcquire( 0 ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) + { + _Release( 1 ); + State().SetAcquire(); + return fTrue; + } + else + { + State().SetContend(); + return fFalse; + } } inline void CSemaphore::Release( const INT cToRelease ) { - - if ( !State().FIncAvail( cToRelease ) ) - { - - _Release( cToRelease ); - } + _Release( cToRelease ); } inline const INT CSemaphore::CWait() const { + OSSYNC_FOREVER + { + const CSemaphoreState state = State(); - const CSemaphoreState stateCur = (CSemaphoreState&) State(); - - - return stateCur.FWait() ? stateCur.CWait() : 0; + if ( state.CAvail() > 0 && state.CWait() > 0 ) + { + // The existing waiters are in transition. + continue; + } + else + { + return state.CWait(); + } + } } inline const INT CSemaphore::CAvail() const { + return State().CAvail(); +} - const CSemaphoreState stateCur = (CSemaphoreState&) State(); +inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) +{ + INT cSpinCount = cSpin; - return stateCur.FNoWait() ? stateCur.CAvail() : 0; + OSSYNC_FOREVER + { + if ( State().CAvail() == 0 ) + { + if ( cSpinCount ) + { + cSpinCount--; + continue; + } + else + { + return fFalse; + } + } + else if ( State().FDecAvail() ) + { + return fTrue; + } + } } diff --git a/dev/ese/src/inc/_bf.hxx b/dev/ese/src/inc/_bf.hxx index 65ba426a..98c7638e 100644 --- a/dev/ese/src/inc/_bf.hxx +++ b/dev/ese/src/inc/_bf.hxx @@ -359,7 +359,7 @@ C_ASSERT( sizeof( BF::bfbitfield ) == sizeof( FLAG32 ) ); #ifdef _WIN64 C_ASSERT( sizeof(BF) == 192 ); #else -C_ASSERT( sizeof(BF) == 160 ); +C_ASSERT( sizeof(BF) == 176 ); #endif diff --git a/dev/ese/src/inc/pib.hxx b/dev/ese/src/inc/pib.hxx index 5085719b..f8d84db4 100644 --- a/dev/ese/src/inc/pib.hxx +++ b/dev/ese/src/inc/pib.hxx @@ -461,7 +461,7 @@ public: #ifdef _WIN64 C_ASSERT( sizeof(PIB) == 608 ); #else -C_ASSERT( sizeof(PIB) == 504 ); +C_ASSERT( sizeof(PIB) == 528 ); #endif INLINE SIZE_T OffsetOfTrxOldestILE() { return OffsetOf( PIB, m_ileTrxOldest ); } diff --git a/dev/ese/src/inc/tdb.hxx b/dev/ese/src/inc/tdb.hxx index f1b10f67..39dccc0d 100644 --- a/dev/ese/src/inc/tdb.hxx +++ b/dev/ese/src/inc/tdb.hxx @@ -432,7 +432,7 @@ class TDB #ifdef _AMD64_ BYTE m_bReserved2[8]; #else - BYTE m_bReserved2[4]; + #endif diff --git a/dev/ese/src/sync/CMakeLists.txt b/dev/ese/src/sync/CMakeLists.txt index da2e8da7..0dc560fd 100644 --- a/dev/ese/src/sync/CMakeLists.txt +++ b/dev/ese/src/sync/CMakeLists.txt @@ -10,4 +10,8 @@ add_library(sync STATIC target_include_directories(sync PRIVATE ${INC_OUTPUT_DIRECTORY}/jet +) + +target_link_libraries(sync + synchronization ) \ No newline at end of file diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index a935ed03..638a4a11 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -319,341 +319,6 @@ CSemaphore::~CSemaphore() } -void CSemaphore::ReleaseAllWaiters() -{ - - OSSYNC_FOREVER - { - - const CSemaphoreState stateCur = State(); - - - if ( stateCur.FNoWait() ) - { - - return; - } - - - else - { - OSSYNCAssert( stateCur.FWait() ); - - - if ( State().FChange( stateCur, CSemaphoreState( 0 ) ) ) - { - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Release( stateCur.CWait() ); - - - return; - } - } - } -} - - -const BOOL CSemaphore::_FAcquire( const INT cmsecTimeout ) -{ - - INT cSpin = g_cSpinMax; - - - CKernelSemaphorePool::IRKSEM irksemAlloc = CKernelSemaphorePool::irksemNil; - - - OSSYNC_FOREVER - { - - const CSemaphoreState stateCur = (CSemaphoreState&) State(); - - - if ( stateCur.FAvail() ) - { - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CAvail() - 1 ) ) ) - { - - if ( irksemAlloc != CKernelSemaphorePool::irksemNil ) - { - g_ksempoolGlobal.Unreference( irksemAlloc ); - } - - - State().SetAcquire(); - return fTrue; - } - } - - - else if ( cSpin ) - { - - cSpin--; - continue; - } - - - else if ( stateCur.FNoWaitAndNoAvail() ) - { - - if ( irksemAlloc == CKernelSemaphorePool::irksemNil ) - { - irksemAlloc = g_ksempoolGlobal.Allocate( this ); - } - - - if ( State().FChange( stateCur, CSemaphoreState( 1, irksemAlloc ) ) ) - { - - State().StartWait(); - const BOOL fCompleted = g_ksempoolGlobal.Ksem( irksemAlloc, this ).FAcquire( cmsecTimeout ); - State().StopWait(); - - - if ( fCompleted ) - { - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - - State().SetAcquire(); - return fTrue; - } - - - else - { - - OSSYNC_INNER_FOREVER - { - - const CSemaphoreState stateAfterWait = (CSemaphoreState&) State(); - - - if ( stateAfterWait.FNoWait() || stateAfterWait.Irksem() != irksemAlloc ) - { - - - g_ksempoolGlobal.Ksem( irksemAlloc, this ).Acquire(); - - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - - return fTrue; - } - - - else if ( stateAfterWait.CWait() == 1 ) - { - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == irksemAlloc ); - - - if ( State().FChange( stateAfterWait, CSemaphoreState( 0 ) ) ) - { - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - - return fFalse; - } - } - - - else - { - OSSYNCAssert( stateAfterWait.CWait() > 1 ); - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == irksemAlloc ); - - - if ( State().FChange( stateAfterWait, CSemaphoreState( stateAfterWait.CWait() - 1, stateAfterWait.Irksem() ) ) ) - { - - g_ksempoolGlobal.Unreference( irksemAlloc ); - - - return fFalse; - } - } - } - } - } - } - - - else - { - OSSYNCAssert( stateCur.FWait() ); - - - g_ksempoolGlobal.Reference( stateCur.Irksem() ); - - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CWait() + 1, stateCur.Irksem() ) ) ) - { - - if ( irksemAlloc != CKernelSemaphorePool::irksemNil ) - { - g_ksempoolGlobal.Unreference( irksemAlloc ); - } - - - State().StartWait(); - const BOOL fCompleted = g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).FAcquire( cmsecTimeout ); - State().StopWait(); - - - if ( fCompleted ) - { - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - - State().SetAcquire(); - return fTrue; - } - - - else - { - - OSSYNC_INNER_FOREVER - { - - const CSemaphoreState stateAfterWait = (CSemaphoreState&) State(); - - - if ( stateAfterWait.FNoWait() || stateAfterWait.Irksem() != stateCur.Irksem() ) - { - - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Acquire(); - - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - - return fTrue; - } - - - else if ( stateAfterWait.CWait() == 1 ) - { - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == stateCur.Irksem() ); - - - if ( State().FChange( stateAfterWait, CSemaphoreState( 0 ) ) ) - { - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - - return fFalse; - } - } - - - else - { - OSSYNCAssert( stateAfterWait.CWait() > 1 ); - OSSYNCAssert( stateAfterWait.FWait() ); - OSSYNCAssert( stateAfterWait.Irksem() == stateCur.Irksem() ); - - - if ( State().FChange( stateAfterWait, CSemaphoreState( stateAfterWait.CWait() - 1, stateAfterWait.Irksem() ) ) ) - { - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - - - return fFalse; - } - } - } - } - } - - - g_ksempoolGlobal.Unreference( stateCur.Irksem() ); - } - } -} - - -const BOOL CSemaphore::_FWait( const INT cmsecTimeout ) -{ - if ( _FAcquire( cmsecTimeout ) ) - { - Release(); - return fTrue; - } - - return fFalse; -} - - -void CSemaphore::_Release( const INT cToRelease ) -{ - - OSSYNC_FOREVER - { - - const CSemaphoreState stateCur = State(); - - - if ( stateCur.FNoWait() ) - { - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CAvail() + cToRelease ) ) ) - { - - return; - } - } - - - else - { - OSSYNCAssert( stateCur.FWait() ); - - - if ( stateCur.CWait() <= cToRelease ) - { - - if ( State().FChange( stateCur, CSemaphoreState( cToRelease - stateCur.CWait() ) ) ) - { - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Release( stateCur.CWait() ); - - - return; - } - } - - - else - { - OSSYNCAssert( stateCur.CWait() > cToRelease ); - - - if ( State().FChange( stateCur, CSemaphoreState( stateCur.CWait() - cToRelease, stateCur.Irksem() ) ) ) - { - - g_ksempoolGlobal.Ksem( stateCur.Irksem(), this ).Release( cToRelease ); - - - return; - } - } - } - } -} - - CAutoResetSignal::CAutoResetSignal( const CSyncBasicInfo& sbi ) @@ -3479,6 +3144,221 @@ void CKernelSemaphore::Release( const INT cToRelease ) +const DWORD CSemaphore::_DwOSTimeout( const INT cmsecTimeout ) +{ + if ( cmsecTimeout == cmsecInfinite || cmsecTimeout == cmsecInfiniteNoDeadlock ) + { + return INFINITE; + } + else if ( cmsecTimeout >= 0 ) + { + return cmsecTimeout; + } + else + { + return 0; + } +} + + +const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) +{ + const DWORD dwStart = DwOSSyncITickTime(); + DWORD dwRemaining = dwTimeout; + + State().IncWait(); + OSSYNC_FOREVER + { + CSemaphoreState state( syncstateNull ); + BOOL fTimedOut = fFalse; + + while ( fTrue ) + { + state = State(); + if ( state.CAvail() > 0 ) + { + break; + } + + if ( dwRemaining == 0 ) + { + fTimedOut = fTrue; + break; + } + + if ( !_FWait( state.CAvail(), dwRemaining ) ) + { + fTimedOut = fTrue; + break; + } + + const DWORD dwElapsed = DwOSSyncITickTime() - dwStart; + if ( dwElapsed > dwTimeout ) + { + fTimedOut = fTrue; + break; + } + + dwRemaining = dwTimeout - dwElapsed; + } + + if ( fTimedOut ) + { + State().DecWait(); + return fFalse; + } + + OSSYNCAssert( state.CAvail() > 0 ); + OSSYNCAssert( state.CWait() > 0 ); + + // Atomically acquire the semaphore and decrement the waiting counter. + const CSemaphoreState stateNew( state.CAvail() - 1, state.CWait() - 1 ); + if ( State().FChange( state, stateNew ) ) + { + return fTrue; + } + } +} + + +void CSemaphore::ReleaseAllWaiters() +{ + OSSYNC_FOREVER + { + const CSemaphoreState state = State(); + + if ( state.CAvail() > 0 && state.CWait() > 0 ) + { + // The existing waiters are in transition. + continue; + } + else + { + const CSemaphoreState stateNew( state.CWait(), state.CWait() ); + if ( State().FChange(state, stateNew ) ) + { + volatile void *pv = State().PAvail(); + + WakeByAddressAll( (void*)pv ); + + return; + } + } + } +} + + +void CSemaphore::_Release( const INT cToRelease ) +{ + if ( cToRelease <= 0 ) + { + return; + } + + State().IncAvail( cToRelease ); + + LONG cWait = State().CWait(); + if ( cWait == 0 ) + { + // No one is waiting. + } + else if ( cWait <= cToRelease ) + { + volatile void *pv = State().PAvail(); + // No more waiting threads than `cToRelease`, wake everyone. + WakeByAddressAll( (void*)pv ); + } + else + { + volatile void *pv = State().PAvail(); + // Wake at most `cToRelease` threads. The benefit from not waking unnecessary + // threads is expected to be greater than the loss on extra calls below. + for ( INT i = 0; i < cToRelease; i++ ) + { + WakeByAddressSingle( (void*)pv ); + } + } +} + + +const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) +{ + PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadBlock ) ); + State().StartWait(); + + BOOL fSuccess; + +#ifdef SYNC_DEADLOCK_DETECTION + if (dwTimeout > cmsecDeadlock ) + { + fSuccess = _FOSWait( cAvail, cmsecDeadlock ); + if ( !fSuccess ) + { +#ifdef DEBUG + SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; + OSSYNC_FOREVER + { + C_ASSERT( sizeof(g_sdltosState) == sizeof(LONG) ); + sdltosStatePre = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosEnabled, sdltosCheckInProgress ); + if ( sdltosStatePre != sdltosCheckInProgress ) + { + break; + } + Sleep( 16 ); + } +#else + SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; +#endif + + OSSYNCAssertSzRTL( fSuccess || sdltosStatePre == sdltosDisabled, "Potential Deadlock Detected (Timeout); FYI ed [dll]!OSSYNC::g_sdltosState to 0 to disable." ); + +#ifdef DEBUG + if ( sdltosStatePre != sdltosDisabled ) + { + OSSYNCAssert( sdltosStatePre != sdltosCheckInProgress ); + + + const SYNCDeadLockTimeOutState sdltosCheck = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosCheckInProgress, sdltosEnabled ); + OSSYNCAssertSzRTL( sdltosCheck != sdltosDisabled, "Devs, the debugger used to set g_sdltosState to 0 and disables further timeout detection asserts!? Just an FYI. If you did not, then code is confused." ); + } +#endif + + DWORD dwNewTimeout = dwTimeout; + + if ( dwNewTimeout < INFINITE ) + { + dwNewTimeout -= cmsecDeadlock; + } + + fSuccess = _FOSWait( cAvail, dwNewTimeout ); + } + } + else +#endif + { + fSuccess = _FOSWait( cAvail, dwTimeout ); + } + + State().StopWait(); + PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadResume ) ); + + return fSuccess; +} + + +const BOOL CSemaphore::_FOSWait( const INT cAvail, const DWORD dwTimeout ) +{ + volatile void *pv = State().PAvail(); + + OnThreadWaitBegin(); + BOOL fSuccess = WaitOnAddress( pv, (PVOID)&cAvail, sizeof(cAvail), dwTimeout ); + OnThreadWaitEnd(); + + return fSuccess; +} + + + #include #include #include @@ -4821,15 +4701,8 @@ void CKernelSemaphore::Dump( const CDumpContext& dc ) const void CSemaphoreState::Dump( const CDumpContext& dc ) const { - if ( FNoWait() ) - { - DumpMember( dc, m_cAvail ); - } - else - { - DumpMember( dc, m_irksem ); - DumpMember( dc, m_cWaitNeg ); - } + DumpMember( dc, m_cAvail ); + DumpMember( dc, m_cWait ); } void CSemaphoreInfo::Dump( const CDumpContext& dc ) const From 9dd322f9312eaa421fc6bf2a909bb8656aa7d60a Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Tue, 23 Mar 2021 16:14:19 +0300 Subject: [PATCH 02/20] sync: Block stealing the semaphore from woken threads Allowing to steal the semaphore from the threads that are woken up introduces a fairness issue and can lead to starvations. The underlying `WakeOnAddress()` primitive by itself allows for a fair implementation, because it guarantees that the threads are woken up in FIFO order [1]. However, if we don't block the other threads from stealing the semaphore from the waiters right after waking them up, those waiters may keep getting starved out. This behavior is shown by the new regression test, CSemaphoreFairnessTest. [1] https://docs.microsoft.com/windows/win32/api/synchapi/nf-synchapi-wakebyaddresssingle --- dev/ese/published/inc/sync.hxx | 4 +- .../sync/syncunit/semaphoreperf.cxx | 143 ++++++++++++++++++ 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index c766954f..1f450a67 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -1810,7 +1810,9 @@ inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) OSSYNC_FOREVER { - if ( State().CAvail() == 0 ) + // Do not acquire the semaphore with waiting threads to avoid inadvertently + // stealing it from those waiting threads themselves. + if ( State().CAvail() == 0 || State().CWait() > 0 ) { if ( cSpinCount ) { diff --git a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx index f98c7bfb..6e073473 100644 --- a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx +++ b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx @@ -736,3 +736,146 @@ ERR CSemaphorePerfTest::ErrTest() } +class CSemaphoreFairnessTest : public UNITTEST +{ +private: + static CSemaphoreFairnessTest s_instance; + +public: + const char * SzName() const; + const char * SzDescription() const; + bool FRunUnderESE98() const; + bool FRunUnderESENT() const; + bool FRunUnderESE97() const; + ERR ErrTest(); + void TestCase( const LONG cThreads ); +}; + +CSemaphoreFairnessTest CSemaphoreFairnessTest::s_instance; + +const char * CSemaphoreFairnessTest::SzName() const +{ + return "CSemaphoreFairnessTest"; +}; + +const char * CSemaphoreFairnessTest::SzDescription() const +{ + return "Tests the CSemaphore for fairness."; +} + +bool CSemaphoreFairnessTest::FRunUnderESE98() const +{ + return true; +} + +bool CSemaphoreFairnessTest::FRunUnderESENT() const +{ + return true; +} + +bool CSemaphoreFairnessTest::FRunUnderESE97() const +{ + return true; +} + +struct CSemaphoreFairnessTestContext +{ + HANDLE hThread; + CSemaphore *pSemaphore; + BOOL bAggressive; + volatile ULONG *pulExit; + volatile ULONG ulAcquire; +}; + +static DWORD WINAPI FairnessTestThread( LPVOID pvContext ) +{ + CSemaphoreFairnessTestContext *pContext = (CSemaphoreFairnessTestContext*)pvContext; + + while ( !InterlockedCompareExchange( pContext->pulExit, 0, 0 ) ) + { + if ( pContext->bAggressive ) + { + pContext->pSemaphore->Acquire(); + InterlockedIncrement( &pContext->ulAcquire ); + Sleep(1); + pContext->pSemaphore->Release(); + } + else + { + Sleep(1); + pContext->pSemaphore->Acquire(); + InterlockedIncrement( &pContext->ulAcquire ); + pContext->pSemaphore->Release(); + } + } + + return ERROR_SUCCESS; +} + +ERR CSemaphoreFairnessTest::ErrTest() +{ + TestAssert( FOSSyncPreinit() ); + + TestCase( 2 ); + TestCase( 3 ); + TestCase( 4 ); + TestCase( 6 ); + TestCase( 8 ); + + OSSyncPostterm(); + + return JET_errSuccess; +} + +void CSemaphoreFairnessTest::TestCase( const LONG cThreads ) +{ + CSemaphore semaphore( CSyncBasicInfo( "CSemaphoreFairnessTest" ) ); + volatile ULONG ulExit = 0; + + CSemaphoreFairnessTestContext *pContexts = new CSemaphoreFairnessTestContext[cThreads]; + for ( LONG i = 0; i < cThreads; i++ ) + { + CSemaphoreFairnessTestContext *pContext = &pContexts[i]; + + memset( pContext, 0, sizeof(*pContext) ); + pContext->pSemaphore = &semaphore; + pContext->pulExit = &ulExit; + pContext->bAggressive = i % 2; + pContext->hThread = CreateThread( NULL, 0, FairnessTestThread, pContext, 0, NULL ); + TestAssert( pContext->hThread ); + } + + semaphore.Release( 1 ); + Sleep( 3 * 1000 ); + InterlockedExchange( &ulExit, 1 ); + + ULONG ulAcquireMin = ULONG_MAX; + ULONG ulAcquireMax = 0; + for ( LONG i = 0; i < cThreads; i++ ) + { + CSemaphoreFairnessTestContext *pContext = &pContexts[i]; + + TestAssert( WaitForSingleObject( pContext->hThread, INFINITE ) == WAIT_OBJECT_0 ); + TestAssert( CloseHandle( pContext->hThread ) ); + + if ( pContext->ulAcquire < ulAcquireMin ) + { + ulAcquireMin = pContext->ulAcquire; + } + + if ( pContext->ulAcquire > ulAcquireMax ) + { + ulAcquireMax = pContext->ulAcquire; + } + + wprintf( L"\t%lu", pContext->ulAcquire ); + } + + wprintf( L"\n" ); + + TestAssert( ulAcquireMax < ulAcquireMin * 4 ); + + delete[] pContexts; +} + + From 1add999475af0263d93f947c1433fe435db214e0 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Tue, 23 Mar 2021 16:22:28 +0300 Subject: [PATCH 03/20] sync: Rewrite the semaphore wait loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify the code by switching to a single loop, instead of using a nested loop for the OS-level wait. Ensure that when returning from the OS-level wait, we try to acquire the semaphore as soon as possible, without any intermediate actions such as adjusting the remaining time — as this is better from the standpoint of working under contention. --- dev/ese/src/sync/sync.cxx | 55 +++++++++++++++------------------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 638a4a11..a544afcb 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3169,55 +3169,42 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) State().IncWait(); OSSYNC_FOREVER { - CSemaphoreState state( syncstateNull ); - BOOL fTimedOut = fFalse; + CSemaphoreState state = State(); - while ( fTrue ) + if ( state.CAvail() > 0 ) { - state = State(); - if ( state.CAvail() > 0 ) - { - break; - } - - if ( dwRemaining == 0 ) - { - fTimedOut = fTrue; - break; - } + OSSYNCAssert( state.CWait() > 0 ); - if ( !_FWait( state.CAvail(), dwRemaining ) ) + // Atomically acquire the semaphore and decrement the waiting counter. + const CSemaphoreState stateNew( state.CAvail() - 1, state.CWait() - 1 ); + if ( State().FChange( state, stateNew ) ) { - fTimedOut = fTrue; - break; + return fTrue; } - + } + else if ( dwRemaining == 0 ) + { + break; + } + else + { const DWORD dwElapsed = DwOSSyncITickTime() - dwStart; if ( dwElapsed > dwTimeout ) { - fTimedOut = fTrue; break; } dwRemaining = dwTimeout - dwElapsed; - } - - if ( fTimedOut ) - { - State().DecWait(); - return fFalse; - } - OSSYNCAssert( state.CAvail() > 0 ); - OSSYNCAssert( state.CWait() > 0 ); - - // Atomically acquire the semaphore and decrement the waiting counter. - const CSemaphoreState stateNew( state.CAvail() - 1, state.CWait() - 1 ); - if ( State().FChange( state, stateNew ) ) - { - return fTrue; + if ( !_FWait( state.CAvail(), dwRemaining ) ) + { + break; + } } } + State().DecWait(); + + return fFalse; } From fb08168c3aab1559ce2a3149ed1fbeab601929e1 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Tue, 23 Mar 2021 16:23:49 +0300 Subject: [PATCH 04/20] Fix formatting in sync.hxx --- dev/ese/published/inc/sync.hxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 1f450a67..9ee2d31b 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -1648,13 +1648,13 @@ __forceinline const BOOL CSemaphoreState::FDecAvail() } -__forceinline void CSemaphoreState::IncWait( ) +__forceinline void CSemaphoreState::IncWait() { AtomicIncrement( (LONG*)&m_cWait ); } -__forceinline void CSemaphoreState::DecWait( ) +__forceinline void CSemaphoreState::DecWait() { AtomicDecrement( (LONG*)&m_cWait ); } From cbb7c51cb34a726dadba30ff29c077e6b78ccf0a Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Tue, 23 Mar 2021 16:28:26 +0300 Subject: [PATCH 05/20] Use an appropriate type for a local variable Within the CSemaphore::_Release() method, use `const INT` for an immutable local variable instead of a `LONG`, to match the return type of the CSemaphoreState::CWait(). --- dev/ese/src/sync/sync.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index a544afcb..e7153cbe 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3244,7 +3244,7 @@ void CSemaphore::_Release( const INT cToRelease ) State().IncAvail( cToRelease ); - LONG cWait = State().CWait(); + const INT cWait = State().CWait(); if ( cWait == 0 ) { // No one is waiting. From 73c4e6210d05b679f6068a2169677b69d05f8343 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sat, 5 Jun 2021 23:36:14 +0300 Subject: [PATCH 06/20] Restore the easy-out check in CSemaphore::FWait() The new implementation inadvertently removed the check that allowed skipping the full acquire-release cycle when the semaphore is not contended. So let's bring it back. --- dev/ese/published/inc/sync.hxx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 544dc42d..2d1e8028 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -2193,6 +2193,11 @@ inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) inline const BOOL CSemaphore::FWait( const INT cmsecTimeout ) { + if ( State().CAvail() > 0 ) + { + return fTrue; + } + if ( _FTryAcquire( 0 ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) { _Release( 1 ); From c559a0ecffad8a4e3e5a3f7a5f35883b462276d2 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sat, 5 Jun 2021 23:44:02 +0300 Subject: [PATCH 07/20] Fix a potential race condition in CSemaphore::_FTryAcquire() Within that method, check the condition against a local copy of the state, as otherwise the state might change between the two parts of the condition. --- dev/ese/published/inc/sync.hxx | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 2d1e8028..8e2e48c0 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -2253,9 +2253,11 @@ inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) OSSYNC_FOREVER { + const CSemaphoreState state = State(); + // Do not acquire the semaphore with waiting threads to avoid inadvertently // stealing it from those waiting threads themselves. - if ( State().CAvail() == 0 || State().CWait() > 0 ) + if ( state.CAvail() == 0 || state.CWait() > 0 ) { if ( cSpinCount ) { From fc221ebc322544808414ca16cbf26eed011e7850 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sat, 5 Jun 2021 23:50:57 +0300 Subject: [PATCH 08/20] Move the CSemaphore's ctor and dtor below in sync.cxx The new implementation of the CSemaphore is OS dependent, so it is located in the bottom half of the file. Let's keep the CSemaphore's constructor and destructor together with that implementation in the file. --- dev/ese/src/sync/sync.cxx | 75 +++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 4cc0e60d..1013a8a2 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -332,44 +332,6 @@ CSyncPerfAcquire::~CSyncPerfAcquire() } -// Semaphore - -// ctor - -CSemaphore::CSemaphore( const CSyncBasicInfo& sbi ) - : CEnhancedStateContainer< CSemaphoreState, CSyncStateInitNull, CSemaphoreInfo, CSyncBasicInfo >( syncstateNull, sbi ) -{ - // further init of CSyncBasicInfo - - State().SetTypeName( "CSemaphore" ); - State().SetInstance( (CSyncObject*)this ); -} - -// dtor - -CSemaphore::~CSemaphore() -{ -#ifdef SYNC_ANALYZE_PERFORMANCE -#ifdef SYNC_DUMP_PERF_DATA - - // dump performance data - - OSSyncStatsDump( State().SzTypeName(), - State().SzInstanceName(), - State().Instance(), - (DWORD)-1, - State().CWaitTotal(), - State().CsecWaitElapsed(), - State().CAcquireTotal(), - State().CContendTotal(), - 0, - 0 ); - -#endif // SYNC_DUMP_PERF_DATA -#endif // SYNC_ANALYZE_PERFORMANCE -} - - // Auto-Reset Signal // ctor @@ -3855,6 +3817,43 @@ void CKernelSemaphore::Release( const INT cToRelease ) } +// Semaphore + +// ctor + +CSemaphore::CSemaphore( const CSyncBasicInfo& sbi ) + : CEnhancedStateContainer< CSemaphoreState, CSyncStateInitNull, CSemaphoreInfo, CSyncBasicInfo >( syncstateNull, sbi ) +{ + // further init of CSyncBasicInfo + + State().SetTypeName( "CSemaphore" ); + State().SetInstance( (CSyncObject*)this ); +} + +// dtor + +CSemaphore::~CSemaphore() +{ +#ifdef SYNC_ANALYZE_PERFORMANCE +#ifdef SYNC_DUMP_PERF_DATA + + // dump performance data + + OSSyncStatsDump( State().SzTypeName(), + State().SzInstanceName(), + State().Instance(), + (DWORD)-1, + State().CWaitTotal(), + State().CsecWaitElapsed(), + State().CAcquireTotal(), + State().CContendTotal(), + 0, + 0 ); + +#endif // SYNC_DUMP_PERF_DATA +#endif // SYNC_ANALYZE_PERFORMANCE +} + const DWORD CSemaphore::_DwOSTimeout( const INT cmsecTimeout ) { From 2aed4616604a9d55ee82738b22a19c4f27734107 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sat, 5 Jun 2021 23:55:11 +0300 Subject: [PATCH 09/20] Rename the local variable in CSemaphore::_FTryAcquire() Use a slightly more descriptive name `cSpinRemaining` instead of `cSpinCount`. That also makes the new name more distinctive from the method's parameter, `cSpin`. --- dev/ese/published/inc/sync.hxx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 8e2e48c0..355e9e1b 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -2249,7 +2249,7 @@ inline const INT CSemaphore::CAvail() const inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) { - INT cSpinCount = cSpin; + INT cSpinRemaining = cSpin; OSSYNC_FOREVER { @@ -2259,9 +2259,9 @@ inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) // stealing it from those waiting threads themselves. if ( state.CAvail() == 0 || state.CWait() > 0 ) { - if ( cSpinCount ) + if ( cSpinRemaining ) { - cSpinCount--; + cSpinRemaining--; continue; } else From aafdf9626013c39062b78a10594e73685e78f83b Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sat, 5 Jun 2021 23:56:35 +0300 Subject: [PATCH 10/20] Add a missing `const` in CSemaphore::_FAcquire() --- dev/ese/src/sync/sync.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 1013a8a2..45b43798 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3882,7 +3882,7 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) State().IncWait(); OSSYNC_FOREVER { - CSemaphoreState state = State(); + const CSemaphoreState state = State(); if ( state.CAvail() > 0 ) { From 880dab27c3d602220ddac02f5026d8c53207c779 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 00:05:08 +0300 Subject: [PATCH 11/20] Rename the local variables `state` to `stateCur` The original code used the latter name, so let's not unnecessarily change that. --- dev/ese/published/inc/sync.hxx | 10 +++++----- dev/ese/src/sync/sync.cxx | 20 ++++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 355e9e1b..d8cd00cc 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -2225,16 +2225,16 @@ inline const INT CSemaphore::CWait() const { OSSYNC_FOREVER { - const CSemaphoreState state = State(); + const CSemaphoreState stateCur = State(); - if ( state.CAvail() > 0 && state.CWait() > 0 ) + if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { // The existing waiters are in transition. continue; } else { - return state.CWait(); + return stateCur.CWait(); } } } @@ -2253,11 +2253,11 @@ inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) OSSYNC_FOREVER { - const CSemaphoreState state = State(); + const CSemaphoreState stateCur = State(); // Do not acquire the semaphore with waiting threads to avoid inadvertently // stealing it from those waiting threads themselves. - if ( state.CAvail() == 0 || state.CWait() > 0 ) + if ( stateCur.CAvail() == 0 || stateCur.CWait() > 0 ) { if ( cSpinRemaining ) { diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 45b43798..cde81f7b 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3882,15 +3882,15 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) State().IncWait(); OSSYNC_FOREVER { - const CSemaphoreState state = State(); + const CSemaphoreState stateCur = State(); - if ( state.CAvail() > 0 ) + if ( stateCur.CAvail() > 0 ) { - OSSYNCAssert( state.CWait() > 0 ); + OSSYNCAssert( stateCur.CWait() > 0 ); // Atomically acquire the semaphore and decrement the waiting counter. - const CSemaphoreState stateNew( state.CAvail() - 1, state.CWait() - 1 ); - if ( State().FChange( state, stateNew ) ) + const CSemaphoreState stateNew( stateCur.CAvail() - 1, stateCur.CWait() - 1 ); + if ( State().FChange( stateCur, stateNew ) ) { return fTrue; } @@ -3909,7 +3909,7 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) dwRemaining = dwTimeout - dwElapsed; - if ( !_FWait( state.CAvail(), dwRemaining ) ) + if ( !_FWait( stateCur.CAvail(), dwRemaining ) ) { break; } @@ -3926,17 +3926,17 @@ void CSemaphore::ReleaseAllWaiters() { OSSYNC_FOREVER { - const CSemaphoreState state = State(); + const CSemaphoreState stateCur = State(); - if ( state.CAvail() > 0 && state.CWait() > 0 ) + if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { // The existing waiters are in transition. continue; } else { - const CSemaphoreState stateNew( state.CWait(), state.CWait() ); - if ( State().FChange(state, stateNew ) ) + const CSemaphoreState stateNew( stateCur.CWait(), stateCur.CWait() ); + if ( State().FChange(stateCur, stateNew ) ) { volatile void *pv = State().PAvail(); From cf8ab1e02652b3cf091f197249a784739f7f495a Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 00:09:59 +0300 Subject: [PATCH 12/20] Restore the removed (CSemaphoreState&) casts The original code had such explicit casts, and they are also used everywhere within sync.cxx, so let's not unnecessarily change that. --- dev/ese/published/inc/sync.hxx | 4 ++-- dev/ese/src/sync/sync.cxx | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index d8cd00cc..6c722d19 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -2225,7 +2225,7 @@ inline const INT CSemaphore::CWait() const { OSSYNC_FOREVER { - const CSemaphoreState stateCur = State(); + const CSemaphoreState stateCur = (CSemaphoreState&) State(); if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { @@ -2253,7 +2253,7 @@ inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) OSSYNC_FOREVER { - const CSemaphoreState stateCur = State(); + const CSemaphoreState stateCur = (CSemaphoreState&) State(); // Do not acquire the semaphore with waiting threads to avoid inadvertently // stealing it from those waiting threads themselves. diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index cde81f7b..c133af03 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3882,7 +3882,7 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) State().IncWait(); OSSYNC_FOREVER { - const CSemaphoreState stateCur = State(); + const CSemaphoreState stateCur = (CSemaphoreState&) State(); if ( stateCur.CAvail() > 0 ) { @@ -3926,7 +3926,7 @@ void CSemaphore::ReleaseAllWaiters() { OSSYNC_FOREVER { - const CSemaphoreState stateCur = State(); + const CSemaphoreState stateCur = (CSemaphoreState&) State(); if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { From eac8c87db8de86ad95aea3aee1f2a627a4ac92ed Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 00:11:34 +0300 Subject: [PATCH 13/20] Drop the empty #else-#endif block in tdb.hxx --- dev/ese/src/inc/tdb.hxx | 1 - 1 file changed, 1 deletion(-) diff --git a/dev/ese/src/inc/tdb.hxx b/dev/ese/src/inc/tdb.hxx index 06880659..9101e59e 100644 --- a/dev/ese/src/inc/tdb.hxx +++ b/dev/ese/src/inc/tdb.hxx @@ -483,7 +483,6 @@ class TDB #ifdef _AMD64_ BYTE m_bReserved2[8]; // for alignment. fileopen.cxx: C_ASSERT( sizeof(TDB) % 16 == 0 ); -#else #endif // 208 / 272 bytes (amd64) From 54d0fad9d5f7dcbd6cd27ede2a74b20ae3bbbb36 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 02:26:49 +0300 Subject: [PATCH 14/20] Update the comments in sync.hxx/.cxx This changeset reworks the existing comments and adds the missing bits to match the style and the documentation level that is used in sync.hxx and sync.cxx. No functional changes intended. --- dev/ese/published/inc/sync.hxx | 81 ++++++++++++++++++++++++++----- dev/ese/src/sync/sync.cxx | 89 +++++++++++++++++++++++++++++----- 2 files changed, 147 insertions(+), 23 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 6c722d19..10dd9158 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -316,6 +316,7 @@ extern const INT cbCacheLine; extern BOOL g_fSyncProcessAbort; +// system max spin count extern INT g_cSpinMax; @@ -2030,8 +2031,7 @@ inline const BOOL CSemaphoreState::FChange( const CSemaphoreState& stateCur, con return AtomicCompareExchange( (QWORD*)&m_qwState, stateCur.m_qwState, stateNew.m_qwState ) == stateCur.m_qwState; } -// increase the available count on the semaphore by the count given using -// a transacted memory compare/exchange operation +// atomically increase the available count on the semaphore by the count given __forceinline void CSemaphoreState::IncAvail( const INT cToInc ) { @@ -2043,26 +2043,43 @@ __forceinline void CSemaphoreState::IncAvail( const INT cToInc ) __forceinline const BOOL CSemaphoreState::FDecAvail() { + // try forever to change the state of the semaphore + OSSYNC_FOREVER { + // get current value + const LONG cAvail = m_cAvail; + // see if we have an available count + if ( cAvail == 0 ) { + // we do not have an available count, return failure + return fFalse; } + + // we have an available count, attempt the transaction + else if ( AtomicCompareExchange( (LONG*)&m_cAvail, cAvail, cAvail - 1 ) == cAvail ) { + // the transaction succeeded, return success + return fTrue; } } } +// atomically increment the number of waiters + __forceinline void CSemaphoreState::IncWait() { AtomicIncrement( (LONG*)&m_cWait ); } +// atomically decrement the number of waiters + __forceinline void CSemaphoreState::DecWait() { AtomicDecrement( (LONG*)&m_cWait ); @@ -2113,18 +2130,13 @@ class CSemaphore // manipulators - // Resolves the internal timeout value to an OS level timeout. static const DWORD _DwOSTimeout( const INT cmsecTimeout ); + // NOTE: all private methods use the OS level (DWORD) timeout values + const BOOL _FTryAcquire( const INT cSpin ); const BOOL _FAcquire( const DWORD dwTimeout ); void _Release( const INT cToRelease ); - - // Waits until the semaphore counter value changes. - // This method has the same semantics as the WaitOnAddress() function and is - // guaranteed to return when the address is signaled, but it is also allowed - // to return for other reasons. The caller should compare the new value with - // the original. const BOOL _FWait( const INT cAvail, const DWORD dwTimeout ); const BOOL _FOSWait( const INT cAvail, const DWORD dwTimeout ); }; @@ -2157,11 +2169,15 @@ inline const BOOL CSemaphore::FTryAcquire() { if ( _FTryAcquire( 0 ) ) { + // we successfully acquired the semaphore + State().SetAcquire(); return fTrue; } else { + // we did not acquire the semaphore, this is a contention + State().SetContend(); return fFalse; } @@ -2177,11 +2193,15 @@ inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) if ( _FTryAcquire( g_cSpinMax ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) { + // we successfully acquired the semaphore + State().SetAcquire(); return fTrue; } else { + // we did not acquire the semaphore, this is a contention + State().SetContend(); return fFalse; } @@ -2193,19 +2213,29 @@ inline const BOOL CSemaphore::FAcquire( const INT cmsecTimeout ) inline const BOOL CSemaphore::FWait( const INT cmsecTimeout ) { + // first try to quickly check for an available count + if ( State().CAvail() > 0 ) { return fTrue; } + // if that doesn't work, try to grab an available count without spinning. + // if that also doesn't work, attempt acquiring using the full state machine + if ( _FTryAcquire( 0 ) || _FAcquire( _DwOSTimeout( cmsecTimeout ) ) ) { + // we successfully acquired the semaphore, release it + _Release( 1 ); + State().SetAcquire(); return fTrue; } else { + // we did not acquire the semaphore, this is a contention + State().SetContend(); return fFalse; } @@ -2223,17 +2253,24 @@ inline void CSemaphore::Release( const INT cToRelease ) inline const INT CSemaphore::CWait() const { + // try forever until we get a non-transitional state + OSSYNC_FOREVER { + // read the current state of the semaphore + const CSemaphoreState stateCur = (CSemaphoreState&) State(); if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { - // The existing waiters are in transition. + // the existing waiters are in transition, retry + continue; } else { + // return the waiter count + return stateCur.CWait(); } } @@ -2246,31 +2283,51 @@ inline const INT CSemaphore::CAvail() const return State().CAvail(); } +// try to acquire one count of the semaphore, entering a loop which iterates +// up to cSpin times. returns fFalse if a count could not be acquired inline const BOOL CSemaphore::_FTryAcquire( const INT cSpin ) { INT cSpinRemaining = cSpin; + // try forever to acquire the semaphore + OSSYNC_FOREVER { + // read the current state of the semaphore + const CSemaphoreState stateCur = (CSemaphoreState&) State(); - // Do not acquire the semaphore with waiting threads to avoid inadvertently - // stealing it from those waiting threads themselves. + // see if we have an available count + // + // NOTE: we do not acquire the semaphore with waiting threads to + // avoid stealing it from those waiting threads themselves + if ( stateCur.CAvail() == 0 || stateCur.CWait() > 0 ) { if ( cSpinRemaining ) { + // we do not have an available count, but can keep spinning + cSpinRemaining--; + continue; } else { + // we do not have an available count and have reached the + // spin limit, return failure + return fFalse; } } + + // we have an available count, attempt the transaction + else if ( State().FDecAvail() ) { + // the transaction succeeded, return success + return fTrue; } } diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index c133af03..1d5af262 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3854,6 +3854,7 @@ CSemaphore::~CSemaphore() #endif // SYNC_ANALYZE_PERFORMANCE } +// converts the internal timeout value to an OS level timeout const DWORD CSemaphore::_DwOSTimeout( const INT cmsecTimeout ) { @@ -3872,45 +3873,73 @@ const DWORD CSemaphore::_DwOSTimeout( const INT cmsecTimeout ) } // attempts to acquire a count from the semaphore, returning fFalse if unsuccessful -// in the time permitted. Infinite and Test-Only timeouts are supported. +// in the time permitted const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) { const DWORD dwStart = DwOSSyncITickTime(); DWORD dwRemaining = dwTimeout; + // try forever until we successfully change the state of the semaphore + State().IncWait(); OSSYNC_FOREVER { + // read the current state of the semaphore + const CSemaphoreState stateCur = (CSemaphoreState&) State(); + // see if we have an available count + if ( stateCur.CAvail() > 0 ) { + // we ourselves are waiting on the semaphore, so there had better be at least + // one waiter in the state + OSSYNCAssert( stateCur.CWait() > 0 ); - // Atomically acquire the semaphore and decrement the waiting counter. + // try to atomically acquire the semaphore and decrement the number of waiters + const CSemaphoreState stateNew( stateCur.CAvail() - 1, stateCur.CWait() - 1 ); + if ( State().FChange( stateCur, stateNew ) ) { + // the transaction succeeded, return success + return fTrue; } } else if ( dwRemaining == 0 ) { + // we were unable to acquire the semaphore in the time permitted + break; } else { + // check and update the remaining time + const DWORD dwElapsed = DwOSSyncITickTime() - dwStart; + if ( dwElapsed > dwTimeout ) { + // we were unable to acquire the semaphore in the time permitted + break; } dwRemaining = dwTimeout - dwElapsed; + // wait for the semaphore counter value to change + // + // NOTE: we may have exited the wait due to a spurious OS level wake up, + // so the state machine ensures that the available count is re-checked on + // each iteration + if ( !_FWait( stateCur.CAvail(), dwRemaining ) ) { + // we were unable to acquire the semaphore in the time permitted + break; } } @@ -3924,24 +3953,37 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) void CSemaphore::ReleaseAllWaiters() { + // try forever until we successfully change the state of the semaphore + OSSYNC_FOREVER { + // read the current state of the semaphore + const CSemaphoreState stateCur = (CSemaphoreState&) State(); if ( stateCur.CAvail() > 0 && stateCur.CWait() > 0 ) { - // The existing waiters are in transition. + // the existing waiters are in transition, retry + continue; } else { + // attempt to change the semaphore to have an available count equal + // to the number of waiters + const CSemaphoreState stateNew( stateCur.CWait(), stateCur.CWait() ); + if ( State().FChange(stateCur, stateNew ) ) { volatile void *pv = State().PAvail(); + // wake all waiting threads + WakeByAddressAll( (void*)pv ); + // we're done + return; } } @@ -3958,24 +4000,34 @@ void CSemaphore::_Release( const INT cToRelease ) return; } + // release the required number of counts + State().IncAvail( cToRelease ); + // check to see if we have any waiting threads to wake + const INT cWait = State().CWait(); + if ( cWait == 0 ) { - // No one is waiting. + // no one is waiting } else if ( cWait <= cToRelease ) { volatile void *pv = State().PAvail(); - // No more waiting threads than `cToRelease`, wake everyone. + + // no more waiting threads than cToRelease, wake everyone + WakeByAddressAll( (void*)pv ); } else { volatile void *pv = State().PAvail(); - // Wake at most `cToRelease` threads. The benefit from not waking unnecessary - // threads is expected to be greater than the loss on extra calls below. + + // wake at most cToRelease threads, as the benefit from not waking + // unnecessary threads is expected to be greater than the loss on + // extra calls below + for ( INT i = 0; i < cToRelease; i++ ) { WakeByAddressSingle( (void*)pv ); @@ -3983,12 +4035,18 @@ void CSemaphore::_Release( const INT cToRelease ) } } +// waits until the semaphore counter value changes. this method has the same +// semantics as the WaitOnAddress() function and is guaranteed to return when +// the address is signaled, but it is also allowed to return for other reasons. +// the caller should compare the new value with the original const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) { PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadBlock ) ); State().StartWait(); + // wait for semaphore + BOOL fSuccess; #ifdef SYNC_DEADLOCK_DETECTION @@ -3999,7 +4057,7 @@ const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) { #ifdef DEBUG SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; - OSSYNC_FOREVER + OSSYNC_FOREVER // spin until we get a non- check-in-progress state ... { C_ASSERT( sizeof(g_sdltosState) == sizeof(LONG) ); sdltosStatePre = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosEnabled, sdltosCheckInProgress ); @@ -4013,13 +4071,18 @@ const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) SYNCDeadLockTimeOutState sdltosStatePre = sdltosEnabled; #endif - OSSYNCAssertSzRTL( fSuccess || sdltosStatePre == sdltosDisabled, "Potential Deadlock Detected (Timeout); FYI ed [dll]!OSSYNC::g_sdltosState to 0 to disable." ); + OSSYNCAssertSzRTL( fSuccess /* superflous */ || sdltosStatePre == sdltosDisabled, "Potential Deadlock Detected (Timeout); FYI ed [dll]!OSSYNC::g_sdltosState to 0 to disable." ); #ifdef DEBUG if ( sdltosStatePre != sdltosDisabled ) { - OSSYNCAssert( sdltosStatePre != sdltosCheckInProgress ); + // needs re-enabling (if SOMEONE/debugger didn't play with state) + OSSYNCAssert( sdltosStatePre != sdltosCheckInProgress ); // that'd be wrong on convergence loop above. + // while it seems really simple this should alwyas be reset, this is designed to allow the user to kill + // timeout detection dynamically in the debugger by setting g_sdltosState = 0 (i.e. sdltosDisabled) via + // debugger, thus g_sdltosState won't == sdltosCheckInProgress and we won't reset it to sdltosEnabled / + // i.e. disabling deadlock detection for all subsequent hits. const SYNCDeadLockTimeOutState sdltosCheck = (SYNCDeadLockTimeOutState)AtomicCompareExchange( (LONG*)&g_sdltosState, sdltosCheckInProgress, sdltosEnabled ); OSSYNCAssertSzRTL( sdltosCheck != sdltosDisabled, "Devs, the debugger used to set g_sdltosState to 0 and disables further timeout detection asserts!? Just an FYI. If you did not, then code is confused." ); @@ -4037,7 +4100,7 @@ const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) } } else -#endif +#endif // SYNC_DEADLOCK_DETECTION { fSuccess = _FOSWait( cAvail, dwTimeout ); } @@ -4048,6 +4111,10 @@ const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) return fSuccess; } +// performs an OS level wait until the available count of the semaphore changes. +// this method is guaranteed to return when the corresponding address is signaled, +// but it is also allowed to return for other reasons. the caller should compare +// the new value with the original const BOOL CSemaphore::_FOSWait( const INT cAvail, const DWORD dwTimeout ) { From 041a98123936a59071fd088f7bc5621783018ed6 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 02:28:23 +0300 Subject: [PATCH 15/20] Add a comment to CSemaphoreFairnessTest's declaration --- test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx | 1 + 1 file changed, 1 insertion(+) diff --git a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx index 374c2525..94f8a9a5 100644 --- a/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx +++ b/test/ese/src/devlibtest/sync/syncunit/semaphoreperf.cxx @@ -751,6 +751,7 @@ ERR CSemaphorePerfTest::ErrTest() return JET_errSuccess; } +// Test fixture. class CSemaphoreFairnessTest : public UNITTEST { From 0b755685b5d6702c1a24b8a4b4b86f41d3ed7124 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Sun, 6 Jun 2021 02:36:48 +0300 Subject: [PATCH 16/20] Fix an issue in CSemaphore::ReleaseAllWaiters() Properly handle a case where this method is called in a state where no one is waiting on the semaphore. This should be a no-op operation. Add a regression test for this behavior, SyncSemaphoreNoopReleaseAllWaiters(). --- dev/ese/src/sync/sync.cxx | 6 +++++ .../devlibtest/sync/syncunit/semaphore.cxx | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 1d5af262..35a5193b 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3967,6 +3967,12 @@ void CSemaphore::ReleaseAllWaiters() continue; } + else if ( stateCur.CWait() <= 0 ) + { + // there are no waiters, we're done + + return; + } else { // attempt to change the semaphore to have an available count equal diff --git a/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx b/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx index 56285705..09a416f3 100644 --- a/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx +++ b/test/ese/src/devlibtest/sync/syncunit/semaphore.cxx @@ -675,3 +675,29 @@ ERR SyncSemaphorePerformsBasicUncontendedSemaphoreReleaseAndAcquireWithSemaphore SyncBasicTestTerm; return err; } + +CUnitTest( SyncSemaphoreNoopReleaseAllWaiters, 0x0, "" ); +ERR SyncSemaphoreNoopReleaseAllWaiters::ErrTest() +{ + SyncBasicTestInit; + + CSemaphore* psemaphore = new CSemaphore( CSyncBasicInfo( "CSemaphore test." ) ); + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 0 ); + + psemaphore->Release(); + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 1 ); + + psemaphore->ReleaseAllWaiters(); + + TestCheck( psemaphore->CWait() == 0 ); + TestCheck( psemaphore->CAvail() == 1 ); + +HandleError: + delete psemaphore; + SyncBasicTestTerm; + return err; +} From 161b0bcbed4fd9751e0c4576dcc5c88edfb673ec Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Wed, 7 Jul 2021 22:57:48 +0300 Subject: [PATCH 17/20] Ensure that we wait on values of the same type and size Adjust the CSemaphoreState so that it would return the typed pointer for the semaphore count value. In _FOSWait(), accept the value of the same type. Assert same sizes of the values in `Address` and `CompareAddress` when performing a WaitOnAddress(). --- dev/ese/published/inc/sync.hxx | 6 +++--- dev/ese/src/sync/sync.cxx | 24 +++++++++++++----------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 10dd9158..187ece35 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -1999,7 +1999,7 @@ class CSemaphoreState const INT CAvail() const { return (INT)m_cAvail; } const INT CWait() const { return (INT)m_cWait; } - volatile void * PAvail() { return &m_cAvail; } + volatile DWORD * GetAvailAddress() { return &m_cAvail; } // debugging support @@ -2137,8 +2137,8 @@ class CSemaphore const BOOL _FTryAcquire( const INT cSpin ); const BOOL _FAcquire( const DWORD dwTimeout ); void _Release( const INT cToRelease ); - const BOOL _FWait( const INT cAvail, const DWORD dwTimeout ); - const BOOL _FOSWait( const INT cAvail, const DWORD dwTimeout ); + const BOOL _FWait( const DWORD cAvail, const DWORD dwTimeout ); + const BOOL _FOSWait( const DWORD cAvail, const DWORD dwTimeout ); }; // acquire one count of the semaphore, waiting forever if necessary diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 35a5193b..c0ab32c9 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3936,7 +3936,7 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) // so the state machine ensures that the available count is re-checked on // each iteration - if ( !_FWait( stateCur.CAvail(), dwRemaining ) ) + if ( !_FWait( (DWORD)stateCur.CAvail(), dwRemaining ) ) { // we were unable to acquire the semaphore in the time permitted @@ -3982,11 +3982,11 @@ void CSemaphore::ReleaseAllWaiters() if ( State().FChange(stateCur, stateNew ) ) { - volatile void *pv = State().PAvail(); + volatile DWORD *pdwAvail = State().GetAvailAddress(); // wake all waiting threads - WakeByAddressAll( (void*)pv ); + WakeByAddressAll( (void*)pdwAvail ); // we're done @@ -4020,15 +4020,15 @@ void CSemaphore::_Release( const INT cToRelease ) } else if ( cWait <= cToRelease ) { - volatile void *pv = State().PAvail(); + volatile DWORD *pdwAvail = State().GetAvailAddress(); // no more waiting threads than cToRelease, wake everyone - WakeByAddressAll( (void*)pv ); + WakeByAddressAll( (void*)pdwAvail ); } else { - volatile void *pv = State().PAvail(); + volatile DWORD *pdwAvail = State().GetAvailAddress(); // wake at most cToRelease threads, as the benefit from not waking // unnecessary threads is expected to be greater than the loss on @@ -4036,7 +4036,7 @@ void CSemaphore::_Release( const INT cToRelease ) for ( INT i = 0; i < cToRelease; i++ ) { - WakeByAddressSingle( (void*)pv ); + WakeByAddressSingle( (void*)pdwAvail ); } } } @@ -4046,7 +4046,7 @@ void CSemaphore::_Release( const INT cToRelease ) // the address is signaled, but it is also allowed to return for other reasons. // the caller should compare the new value with the original -const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) +const BOOL CSemaphore::_FWait( const DWORD cAvail, const DWORD dwTimeout ) { PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadBlock ) ); State().StartWait(); @@ -4122,12 +4122,14 @@ const BOOL CSemaphore::_FWait( const INT cAvail, const DWORD dwTimeout ) // but it is also allowed to return for other reasons. the caller should compare // the new value with the original -const BOOL CSemaphore::_FOSWait( const INT cAvail, const DWORD dwTimeout ) +const BOOL CSemaphore::_FOSWait( const DWORD cAvail, const DWORD dwTimeout ) { - volatile void *pv = State().PAvail(); + volatile DWORD *pdwAvail = State().GetAvailAddress(); + + static_assert( sizeof(*pdwAvail) == sizeof(cAvail), "Should be of the same size." ); OnThreadWaitBegin(); - BOOL fSuccess = WaitOnAddress( pv, (PVOID)&cAvail, sizeof(cAvail), dwTimeout ); + BOOL fSuccess = WaitOnAddress( pdwAvail, (PVOID)&cAvail, sizeof(cAvail), dwTimeout ); OnThreadWaitEnd(); return fSuccess; From 4e4ff2b3cdc340047f27cba8fbdabd7a8bae208b Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Wed, 7 Jul 2021 22:58:42 +0300 Subject: [PATCH 18/20] Fix formatting in CSemaphore::ReleaseAllWaiters() --- dev/ese/src/sync/sync.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index c0ab32c9..c73f4d6c 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3980,7 +3980,7 @@ void CSemaphore::ReleaseAllWaiters() const CSemaphoreState stateNew( stateCur.CWait(), stateCur.CWait() ); - if ( State().FChange(stateCur, stateNew ) ) + if ( State().FChange( stateCur, stateNew ) ) { volatile DWORD *pdwAvail = State().GetAvailAddress(); From 7979bcc4ccf6d49461d67a0e5f1168cc53f5baaa Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Wed, 7 Jul 2021 23:00:49 +0300 Subject: [PATCH 19/20] Minor code cleanup in CSemaphore::_FOSWait() Cast to `void *` instead of `PVOID`, to be consistent with other similar casts around. --- dev/ese/src/sync/sync.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index c73f4d6c..6b72a89a 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -4129,7 +4129,7 @@ const BOOL CSemaphore::_FOSWait( const DWORD cAvail, const DWORD dwTimeout ) static_assert( sizeof(*pdwAvail) == sizeof(cAvail), "Should be of the same size." ); OnThreadWaitBegin(); - BOOL fSuccess = WaitOnAddress( pdwAvail, (PVOID)&cAvail, sizeof(cAvail), dwTimeout ); + BOOL fSuccess = WaitOnAddress( pdwAvail, (void*)&cAvail, sizeof(cAvail), dwTimeout ); OnThreadWaitEnd(); return fSuccess; From c95ce4734fbe95da50afd56e957f391b8466a343 Mon Sep 17 00:00:00 2001 From: Evgeny Kotkov Date: Thu, 8 Jul 2021 12:42:06 +0300 Subject: [PATCH 20/20] Use LONG instead of DWORD for semaphore and wait counts As the semaphore count returned by CSemaphoreState::CAvail() and m_cAvail have originally been signed, let's not unnecessarily switch to an unsigned type. Doing so also allows us to remove some type casts. --- dev/ese/published/inc/sync.hxx | 14 +++++++------- dev/ese/src/sync/sync.cxx | 24 ++++++++++++------------ 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dev/ese/published/inc/sync.hxx b/dev/ese/published/inc/sync.hxx index 187ece35..95495a1e 100644 --- a/dev/ese/published/inc/sync.hxx +++ b/dev/ese/published/inc/sync.hxx @@ -1997,9 +1997,9 @@ class CSemaphoreState // accessors - const INT CAvail() const { return (INT)m_cAvail; } - const INT CWait() const { return (INT)m_cWait; } - volatile DWORD * GetAvailAddress() { return &m_cAvail; } + const INT CAvail() const { return m_cAvail; } + const INT CWait() const { return m_cWait; } + volatile LONG * GetAvailAddress() { return &m_cAvail; } // debugging support @@ -2017,8 +2017,8 @@ class CSemaphoreState struct { - volatile DWORD m_cAvail; - volatile DWORD m_cWait; + volatile LONG m_cAvail; + volatile LONG m_cWait; }; }; }; @@ -2137,8 +2137,8 @@ class CSemaphore const BOOL _FTryAcquire( const INT cSpin ); const BOOL _FAcquire( const DWORD dwTimeout ); void _Release( const INT cToRelease ); - const BOOL _FWait( const DWORD cAvail, const DWORD dwTimeout ); - const BOOL _FOSWait( const DWORD cAvail, const DWORD dwTimeout ); + const BOOL _FWait( const LONG cAvail, const DWORD dwTimeout ); + const BOOL _FOSWait( const LONG cAvail, const DWORD dwTimeout ); }; // acquire one count of the semaphore, waiting forever if necessary diff --git a/dev/ese/src/sync/sync.cxx b/dev/ese/src/sync/sync.cxx index 6b72a89a..1e5cfd82 100644 --- a/dev/ese/src/sync/sync.cxx +++ b/dev/ese/src/sync/sync.cxx @@ -3936,7 +3936,7 @@ const BOOL CSemaphore::_FAcquire( const DWORD dwTimeout ) // so the state machine ensures that the available count is re-checked on // each iteration - if ( !_FWait( (DWORD)stateCur.CAvail(), dwRemaining ) ) + if ( !_FWait( stateCur.CAvail(), dwRemaining ) ) { // we were unable to acquire the semaphore in the time permitted @@ -3982,11 +3982,11 @@ void CSemaphore::ReleaseAllWaiters() if ( State().FChange( stateCur, stateNew ) ) { - volatile DWORD *pdwAvail = State().GetAvailAddress(); + volatile LONG *pcAvail = State().GetAvailAddress(); // wake all waiting threads - WakeByAddressAll( (void*)pdwAvail ); + WakeByAddressAll( (void*)pcAvail ); // we're done @@ -4020,15 +4020,15 @@ void CSemaphore::_Release( const INT cToRelease ) } else if ( cWait <= cToRelease ) { - volatile DWORD *pdwAvail = State().GetAvailAddress(); + volatile LONG *pcAvail = State().GetAvailAddress(); // no more waiting threads than cToRelease, wake everyone - WakeByAddressAll( (void*)pdwAvail ); + WakeByAddressAll( (void*)pcAvail ); } else { - volatile DWORD *pdwAvail = State().GetAvailAddress(); + volatile LONG *pcAvail = State().GetAvailAddress(); // wake at most cToRelease threads, as the benefit from not waking // unnecessary threads is expected to be greater than the loss on @@ -4036,7 +4036,7 @@ void CSemaphore::_Release( const INT cToRelease ) for ( INT i = 0; i < cToRelease; i++ ) { - WakeByAddressSingle( (void*)pdwAvail ); + WakeByAddressSingle( (void*)pcAvail ); } } } @@ -4046,7 +4046,7 @@ void CSemaphore::_Release( const INT cToRelease ) // the address is signaled, but it is also allowed to return for other reasons. // the caller should compare the new value with the original -const BOOL CSemaphore::_FWait( const DWORD cAvail, const DWORD dwTimeout ) +const BOOL CSemaphore::_FWait( const LONG cAvail, const DWORD dwTimeout ) { PERFOpt( AtomicIncrement( (LONG*)&g_cOSSYNCThreadBlock ) ); State().StartWait(); @@ -4122,14 +4122,14 @@ const BOOL CSemaphore::_FWait( const DWORD cAvail, const DWORD dwTimeout ) // but it is also allowed to return for other reasons. the caller should compare // the new value with the original -const BOOL CSemaphore::_FOSWait( const DWORD cAvail, const DWORD dwTimeout ) +const BOOL CSemaphore::_FOSWait( const LONG cAvail, const DWORD dwTimeout ) { - volatile DWORD *pdwAvail = State().GetAvailAddress(); + volatile LONG *pcAvail = State().GetAvailAddress(); - static_assert( sizeof(*pdwAvail) == sizeof(cAvail), "Should be of the same size." ); + static_assert( sizeof(*pcAvail) == sizeof(cAvail), "Should be of the same size." ); OnThreadWaitBegin(); - BOOL fSuccess = WaitOnAddress( pdwAvail, (void*)&cAvail, sizeof(cAvail), dwTimeout ); + BOOL fSuccess = WaitOnAddress( pcAvail, (void*)&cAvail, sizeof(cAvail), dwTimeout ); OnThreadWaitEnd(); return fSuccess;