diff --git a/gc/gc_lock.cc b/gc/gc_lock.cc
index 5c7768ca..a8c9df87 100644
--- a/gc/gc_lock.cc
+++ b/gc/gc_lock.cc
@@ -242,7 +242,8 @@ print() const
 
 inline GcLockBase::Data::
 Data() :
-    bits(0), bits2(0)
+    bits(0), bits2(0),
+    writeLock(0)
 {
     epoch = gcLockStartingEpoch; // makes it easier to test overflows.
     visibleEpoch = epoch;
@@ -253,6 +254,7 @@ Data(const Data & other)
 {
     //ML::ticks();
     q = other.q;
+    writeLock = other.writeLock;
     //ML::ticks();
 }
 
@@ -262,6 +264,7 @@ operator = (const Data & other)
 {
     //ML::ticks();
     this->q = other.q;
+    this->writeLock = other.writeLock;
     //ML::ticks();
     return *this;
 }
 
@@ -481,8 +484,6 @@ void GcLockBase::
 exitCS(ThreadGcInfoEntry * entry, RunDefer runDefer /* = true */)
 {
-    if (!entry) entry = &getEntry();
-
     if (entry->inEpoch == -1)
         throw ML::Exception("not in a CS");
 
@@ -514,8 +515,6 @@ void GcLockBase::
 enterCSExclusive(ThreadGcInfoEntry * entry)
 {
-    if (!entry) entry = &getEntry();
-
     ExcAssertEqual(entry->inEpoch, -1);
 
     Data current = *data, newValue;
@@ -629,6 +628,48 @@ exitCSExclusive(ThreadGcInfoEntry * entry)
     entry->inEpoch = -1;
 }
 
+void GcLockBase::
+enterCSWrite(GcInfo::PerThreadInfo * info,
+             RunDefer runDefer)
+{
+    Word oldVal, newVal;
+    GCLOCK_SPINCHECK_DECL
+
+    for (;;) {
+        GCLOCK_SPINCHECK;
+
+        oldVal = data->writeLock;
+
+        // Stop bit is set, meaning that it's locked.
+        if (oldVal & StopBitMask) {
+            sched_yield();
+            continue;
+        }
+
+        newVal = oldVal + 1;
+
+        // If the CAS fails, someone else in the meantime
+        // must have entered a CS and incremented the counter
+        // behind our back. We then retry.
+        if (ML::cmp_xchg(data->writeLock, oldVal, newVal))
+            break;
+    }
+
+    enterShared(info, runDefer);
+}
+
+void GcLockBase::
+exitCSWrite(GcInfo::PerThreadInfo * info,
+            RunDefer runDefer)
+{
+    ExcAssertGreater(data->writeLock, 0);
+    ML::atomic_dec(data->writeLock);
+
+    exitShared(info, runDefer);
+}
+
 void GcLockBase::
 visibleBarrier()
 
diff --git a/gc/gc_lock.h b/gc/gc_lock.h
index 9b17fce4..dc848087 100644
--- a/gc/gc_lock.h
+++ b/gc/gc_lock.h
@@ -10,19 +10,45 @@
 #define __mmap__gc_lock_h__
 
 #define GC_LOCK_DEBUG 0
+#define GC_LOCK_SPIN_DEBUG 0
 
 #include "jml/utils/exc_assert.h"
+#include "jml/utils/abort.h"
 #include "jml/arch/atomic_ops.h"
 #include "jml/arch/thread_specific.h"
 #include 
+#include 
 
 #if GC_LOCK_DEBUG
 # include 
 #endif
 
-namespace Datacratic {
+#include 
+
+#if GC_LOCK_SPIN_DEBUG
+
+# define GCLOCK_SPINCHECK_DECL \
+    std::chrono::time_point<std::chrono::system_clock> __start, __end; \
+    __start = std::chrono::system_clock::now();
+
+# define GCLOCK_SPINCHECK \
+    do { \
+        __end = std::chrono::system_clock::now(); \
+        std::chrono::duration<double> elapsed = __end - __start; \
+        if (elapsed > std::chrono::seconds(10)) { \
+            ML::do_abort(); \
+            throw ML::Exception("GCLOCK_SPINCHECK: spent more than 10" \
+                                " seconds spinning"); \
+        } \
+    } while (0)
+#else
+# define GCLOCK_SPINCHECK_DECL
+# define GCLOCK_SPINCHECK ((void) 0)
+#endif
 
+namespace Datacratic {
+
 /*****************************************************************************/
 /* GC LOCK BASE */
 /*****************************************************************************/
 
@@ -31,36 +57,174 @@ struct GcLockBase : public boost::noncopyable {
 
 public:
 
+    /** Enum for type-safe specification of whether or not we run deferrals on
+        entry or exit to a critical section. Those places that are latency
+        sensitive should use RD_NO.
+ */ + enum RunDefer { + RD_NO = 0, ///< Don't run deferred work on this call + RD_YES = 1 ///< Potentially run deferred work on this call + }; + + enum DoLock { + DONT_LOCK = 0, + DO_LOCK = 1 + }; + /// A thread's bookkeeping info about each GC area struct ThreadGcInfoEntry { ThreadGcInfoEntry() - : inEpoch(-1), readLocked(0), writeLocked(0) + : inEpoch(-1), readLocked(0), writeLocked(0), exclusiveLocked(0), + writeEntered(0), + owner(0) { } + ~ThreadGcInfoEntry() { + using namespace std; + + if (readLocked || writeLocked) + cerr << "Thread died but GcLock is still locked" << endl; + + } + + int inEpoch; // 0, 1, -1 = not in int readLocked; int writeLocked; + int exclusiveLocked; + int writeEntered; + + GcLockBase *owner; + + void init(const GcLockBase * const self) { + if (!owner) + owner = const_cast(self); + } + + void enterShared(RunDefer runDefer) { + if (!readLocked && !exclusiveLocked) + owner->enterCS(this, runDefer); + + ++readLocked; + } + + void exitShared(RunDefer runDefer) { + if (readLocked <= 0) + throw ML::Exception("Bad read lock nesting"); + + --readLocked; + if (!readLocked && !exclusiveLocked) + owner->exitCS(this, runDefer); + + } + + bool isLockedShared() { + return readLocked + exclusiveLocked; + } + + void lockExclusive() { + if (readLocked || writeLocked) + throw ML::Exception("Can not lock exclusive while holding " + "read or write lock"); + if (!exclusiveLocked) + owner->enterCSExclusive(this); + + ++exclusiveLocked; + } + + void unlockExclusive() { + if (exclusiveLocked <= 0) + throw ML::Exception("Bad exclusive lock nesting"); + + --exclusiveLocked; + if (!exclusiveLocked) + owner->exitCSExclusive(this); + } std::string print() const; + }; + typedef ML::ThreadSpecificInstanceInfo GcInfo; typedef typename GcInfo::PerThreadInfo ThreadGcInfo; - /** Enum for type safe specification of whether or not we run deferrals on - entry or exit to a critical sections. Thoss places that are latency - sensitive should use RD_NO. - */ - enum RunDefer { - RD_NO = 0, ///< Don't run deferred work on this call - RD_YES = 1 ///< Potentially run deferred work on this call - }; + typedef uint64_t Word; + static constexpr size_t Bits = sizeof(Word) * CHAR_BIT; + static constexpr Word StopBitMask = (1ULL << (Bits - 1)); + + struct Data { + Data(); + Data(const Data & other); + + Data & operator = (const Data & other); + + typedef uint64_t q2 __attribute__((__vector_size__(16))); + + volatile union { + struct { + int32_t epoch; ///< Current epoch number (could be smaller). + int16_t in[2]; ///< How many threads in each epoch + int32_t visibleEpoch;///< Lowest epoch number that's visible + int32_t exclusive; ///< Mutex value for exclusive lock + }; + struct { + uint64_t bits; + uint64_t bits2; + }; + struct { + q2 q; + }; + } JML_ALIGNED(16); + volatile Word writeLock; + + int16_t inCurrent() const { return in[epoch & 1]; } + int16_t inOld() const { return in[(epoch - 1)&1]; } + + void setIn(int32_t epoch, int val) + { + //if (epoch != this->epoch && epoch + 1 != this->epoch) + // throw ML::Exception("modifying wrong epoch"); + in[epoch & 1] = val; + } + + void addIn(int32_t epoch, int val) + { + //if (epoch != this->epoch && epoch + 1 != this->epoch) + // throw ML::Exception("modifying wrong epoch"); + in[epoch & 1] += val; + } + + /** Check that the invariants all hold. Throws an exception if not. */ + void validate() const; + + /** Calculate the appropriate value of visibleEpoch from the rest + of the fields. Returns true if waiters should be woken up. 
+ */ + bool calcVisibleEpoch(); + + /** Human readable string. */ + std::string print() const; + + bool operator == (const Data & other) const + { + return bits == other.bits && bits2 == other.bits2; + } + + bool operator != (const Data & other) const + { + return ! operator == (other); + } + + } JML_ALIGNED(16); void enterCS(ThreadGcInfoEntry * entry = 0, RunDefer runDefer = RD_YES); void exitCS(ThreadGcInfoEntry * entry = 0, RunDefer runDefer = RD_YES); + void enterCSWrite(GcInfo::PerThreadInfo * info = 0, RunDefer runDefer = RD_YES); + void exitCSWrite(GcInfo::PerThreadInfo * info = 0, RunDefer runDefer = RD_YES); void enterCSExclusive(ThreadGcInfoEntry * entry = 0); void exitCSExclusive(ThreadGcInfoEntry * entry = 0); @@ -77,7 +241,9 @@ struct GcLockBase : public boost::noncopyable { JML_ALWAYS_INLINE ThreadGcInfoEntry & getEntry(GcInfo::PerThreadInfo * info = 0) const { - return *gcInfo.get(info); + ThreadGcInfoEntry *entry = gcInfo.get(info); + entry->init(this); + return *entry; } GcLockBase(); @@ -87,54 +253,74 @@ struct GcLockBase : public boost::noncopyable { /** Permanently deletes any resources associated with this lock. */ virtual void unlink() = 0; - void lockShared(GcInfo::PerThreadInfo * info = 0, + void enterShared(GcInfo::PerThreadInfo * info = 0, RunDefer runDefer = RD_YES) { ThreadGcInfoEntry & entry = getEntry(info); - if (!entry.readLocked && !entry.writeLocked) - enterCS(&entry, runDefer); - - ++entry.readLocked; + entry.enterShared(runDefer); #if GC_LOCK_DEBUG using namespace std; - cerr << "lockShared " + cerr << "enterShared " << this << " index " << index << ": now " << entry.print() << " data " << data->print() << endl; #endif } - void unlockShared(GcInfo::PerThreadInfo * info = 0, - RunDefer runDefer = RD_YES) + void exitShared(GcInfo::PerThreadInfo * info = 0, + RunDefer runDefer = RD_YES) { ThreadGcInfoEntry & entry = getEntry(info); - if (entry.readLocked <= 0) - throw ML::Exception("bad read lock nesting"); - --entry.readLocked; - if (!entry.readLocked && !entry.writeLocked) - exitCS(&entry, runDefer); + entry.exitShared(runDefer); #if GC_LOCK_DEBUG using namespace std; - cerr << "unlockShared " + cerr << "exitShared " << this << " index " << index << ": now " << entry.print() << " data " << data->print() << endl; #endif } + void enterWriteShared(GcInfo::PerThreadInfo * info = 0, + RunDefer runDefer = RD_YES) + { + ThreadGcInfoEntry & entry = getEntry(info); + if (!entry.writeEntered) + enterCSWrite(info, runDefer); + + ++entry.writeEntered; + } + + void exitWriteShared(GcInfo::PerThreadInfo * info = 0, + RunDefer runDefer = RD_YES) + { + ThreadGcInfoEntry & entry = getEntry(info); + ExcAssertGreater(entry.writeEntered, 0); + + --entry.writeEntered; + if (!entry.writeEntered) + exitCSWrite(info, runDefer); + } + int isLockedShared(GcInfo::PerThreadInfo * info = 0) const { ThreadGcInfoEntry & entry = getEntry(info); - return entry.readLocked + entry.writeLocked; + + return entry.isLockedShared() || isLockedWrite(); + } + + bool isLockedWrite() const { + return getEntry().writeLocked; } int lockedInEpoch(GcInfo::PerThreadInfo * info = 0) const { ThreadGcInfoEntry & entry = getEntry(info); + return entry.inEpoch; } @@ -142,14 +328,7 @@ struct GcLockBase : public boost::noncopyable { { ThreadGcInfoEntry & entry = getEntry(info); - if (entry.readLocked) - throw ML::Exception("can't acquire write lock with read lock held"); - - if (!entry.writeLocked) - enterCSExclusive(&entry); - - ++entry.writeLocked; - + entry.lockExclusive(); #if GC_LOCK_DEBUG using namespace 
std; cerr << "lockExclusive " @@ -163,11 +342,7 @@ struct GcLockBase : public boost::noncopyable { { ThreadGcInfoEntry & entry = getEntry(info); - if (entry.writeLocked <= 0) - throw ML::Exception("bad write lock nesting"); - --entry.writeLocked; - if (!entry.writeLocked) - exitCSExclusive(&entry); + entry.unlockExclusive(); #if GC_LOCK_DEBUG using namespace std; @@ -181,13 +356,66 @@ struct GcLockBase : public boost::noncopyable { int isLockedExclusive(GcInfo::PerThreadInfo * info = 0) const { ThreadGcInfoEntry & entry = getEntry(info); - return entry.writeLocked; + + return entry.exclusiveLocked; + } + + void lockWrite() + { + ThreadGcInfoEntry &entry = getEntry(); + if (!entry.writeLocked) { + Word oldValue, newValue; + GCLOCK_SPINCHECK_DECL + for (;;) { + GCLOCK_SPINCHECK; + + oldValue = data->writeLock; + + // Stop bit is set, meaning that it's already locked. + if (oldValue & StopBitMask) + continue; + + newValue = oldValue | StopBitMask; + if (ML::cmp_xchg(data->writeLock, oldValue, newValue)) + break; + + } + + // Stop bit must be set + ExcAssertEqual((data->writeLock & StopBitMask), StopBitMask); + + // At this point, we stoped all the upcoming writes. However, + // ongoing writes might still be executing. Issuing a writeBarrier + // will wait for all writes to finish before continuing + writeBarrier(); + + // No writes must be ongoing + ExcAssertEqual((data->writeLock & ~StopBitMask), 0); + } + ++entry.writeLocked; + } + + void writeBarrier() { + // Busy-waiting for all writes to finish + while ((data->writeLock & ~StopBitMask) > 0) { } + } + + + void unlockWrite(GcInfo::PerThreadInfo * info = 0) + { + ThreadGcInfoEntry &entry = getEntry(); + --entry.writeLocked; + if (!entry.writeLocked) { + Word oldValue = data->writeLock; + Word newValue = oldValue & ~StopBitMask; + + if (!ML::cmp_xchg(data->writeLock, oldValue, newValue)) { + throw ML::Exception("Failed to unlockWrite"); + } + + } } - enum DoLock { - DONT_LOCK = 0, - DO_LOCK = 1 - }; struct SharedGuard { SharedGuard(GcLockBase & lock, @@ -198,13 +426,13 @@ struct GcLockBase : public boost::noncopyable { doLock_(doLock) { if (doLock_) - lock.lockShared(0, runDefer_); + lock.enterShared(0, runDefer_); } ~SharedGuard() { if (doLock_) - lock.unlockShared(0, runDefer_); + lock.exitShared(0, runDefer_); } GcLockBase & lock; @@ -212,6 +440,39 @@ struct GcLockBase : public boost::noncopyable { const DoLock doLock_; ///< Do we really lock? }; + + struct WriteSharedGuard { + WriteSharedGuard(GcLockBase & lock, RunDefer runDefer = RD_YES) + : lock(lock), + runDefer(runDefer) + { + lock.enterWriteShared(0, runDefer); + } + + ~WriteSharedGuard() + { + lock.exitWriteShared(0, runDefer); + } + + GcLockBase & lock; + const RunDefer runDefer; + }; + + struct WriteLockGuard { + WriteLockGuard(GcLockBase & lock) + : lock(lock) + { + lock.lockWrite(); + } + + ~WriteLockGuard() + { + lock.unlockWrite(); + } + + GcLockBase & lock; + }; + struct ExclusiveGuard { ExclusiveGuard(GcLockBase & lock) : lock(lock) @@ -284,70 +545,6 @@ struct GcLockBase : public boost::noncopyable { void dump(); - struct Data { - Data(); - Data(const Data & other); - - Data & operator = (const Data & other); - - typedef uint64_t q2 __attribute__((__vector_size__(16))); - - volatile union { - struct { - int32_t epoch; ///< Current epoch number (could be smaller). 
- int16_t in[2]; ///< How many threads in each epoch - int32_t visibleEpoch;///< Lowest epoch number that's visible - int32_t exclusive; ///< Mutex value to lock exclusively - }; - struct { - uint64_t bits; - uint64_t bits2; - }; - struct { - q2 q; - }; - } JML_ALIGNED(16); - - int16_t inCurrent() const { return in[epoch & 1]; } - int16_t inOld() const { return in[(epoch - 1)&1]; } - - void setIn(int32_t epoch, int val) - { - //if (epoch != this->epoch && epoch + 1 != this->epoch) - // throw ML::Exception("modifying wrong epoch"); - in[epoch & 1] = val; - } - - void addIn(int32_t epoch, int val) - { - //if (epoch != this->epoch && epoch + 1 != this->epoch) - // throw ML::Exception("modifying wrong epoch"); - in[epoch & 1] += val; - } - - /** Check that the invariants all hold. Throws an exception if not. */ - void validate() const; - - /** Calculate the appropriate value of visibleEpoch from the rest - of the fields. Returns true if waiters should be woken up. - */ - bool calcVisibleEpoch(); - - /** Human readable string. */ - std::string print() const; - - bool operator == (const Data & other) const - { - return bits == other.bits && bits2 == other.bits2; - } - - bool operator != (const Data & other) const - { - return ! operator == (other); - } - - } JML_ALIGNED(16); - protected: Data* data; @@ -437,5 +634,6 @@ struct SharedGcLock : public GcLockBase } // namespace Datacratic + #endif /* __mmap__gc_lock_h__ */ diff --git a/gc/rcu_protected.h b/gc/rcu_protected.h index b4f379ab..698179da 100644 --- a/gc/rcu_protected.h +++ b/gc/rcu_protected.h @@ -20,7 +20,7 @@ struct RcuLocked { : ptr(ptr), lock(lock) { if (lock) - lock->lockShared(); + lock->enterShared(); } /// Transfer from another lock @@ -37,7 +37,7 @@ struct RcuLocked { : ptr(ptr), lock(other.lock) { if (lock) - lock->lockShared(); + lock->enterShared(); } template @@ -72,7 +72,7 @@ struct RcuLocked { void unlock() { if (lock) { - lock->unlockShared(); + lock->exitShared(); lock = 0; } } diff --git a/gc/testing/gc_test.cc b/gc/testing/gc_test.cc index b7afcbbe..d62d2ba8 100644 --- a/gc/testing/gc_test.cc +++ b/gc/testing/gc_test.cc @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include @@ -41,7 +43,7 @@ extern int32_t gcLockStartingEpoch; BOOST_AUTO_TEST_CASE ( test_gc ) { GcLock gc; - gc.lockShared(); + gc.enterShared(); BOOST_CHECK(gc.isLockedShared()); @@ -55,7 +57,7 @@ BOOST_AUTO_TEST_CASE ( test_gc ) cerr << endl << "after defer" << endl; gc.dump(); - gc.unlockShared(); + gc.exitShared(); cerr << endl << "after unlock shared" << endl; gc.dump(); @@ -248,6 +250,136 @@ BOOST_AUTO_TEST_CASE(test_mutual_exclusion) } +BOOST_AUTO_TEST_CASE(test_write_exclusion) +{ + cerr << "Testing write exclusion" << endl; + + GcLock lock; + + std::atomic finished { false }; + + std::atomic errors { 0 }; + std::atomic numWrite { 0 }; + std::atomic numRead { 0 }; + std::atomic numLockWrite { 0 }; + + auto doWriteThread = [&]() { + while (!finished.load()) { + GcLock::WriteSharedGuard guard(lock); + + numWrite.fetch_add(1); + if (numWrite.load() > 0 && numLockWrite.load() > 0) { + cerr << "at least one write when locked write" << endl; + errors.fetch_add(1); + } + + numWrite.fetch_sub(1); + } + }; + + auto doLockWriteThread = [&]() { + GcLock::WriteLockGuard guard(lock); + numLockWrite.fetch_add(1); + + while (!finished.load()) { + + if (numWrite.load() > 0) { + cerr << "write after locked write" << endl; + errors.fetch_add(1); + } + + + } + + numLockWrite.fetch_sub(1); + }; + + auto doReadThread = [&]() { + while 
(!finished.load()) { + GcLock::SharedGuard guard(lock); + + numRead.fetch_add(1); + this_thread::sleep_for(chrono::milliseconds(10)); + } + }; + + { + size_t writeThreads = 4; + cerr << "single write lock, multi writes" << endl; + + boost::thread_group group; + for (size_t i = 0; i < writeThreads; ++i) { + group.create_thread(doWriteThread); + } + + this_thread::sleep_for(chrono::milliseconds(500)); + + group.create_thread(doLockWriteThread); + + this_thread::sleep_for(chrono::seconds(1)); + + finished.store(true); + group.join_all(); + + BOOST_CHECK_EQUAL(errors.load(), 0); + } + + { + size_t writeThreads = 16; + size_t writeLockThreads = 8; + cerr << "Multi write lock, multi writes" << endl; + + finished.store(false); + boost::thread_group group; + for (size_t i = 0; i < writeThreads; ++i) { + group.create_thread(doWriteThread); + } + + this_thread::sleep_for(chrono::milliseconds(500)); + + for (size_t i = 0; i < writeLockThreads; ++i) { + group.create_thread(doLockWriteThread); + } + + this_thread::sleep_for(chrono::seconds(1)); + + finished.store(true); + group.join_all(); + + BOOST_CHECK_EQUAL(errors.load(), 0); + } + + + { + size_t writeThreads = 4; + size_t readThreads = 8; + cerr << "mixed reads and write lock" << endl; + + finished.store(false); + boost::thread_group group; + for (size_t i = 0; i < writeThreads; ++i) { + group.create_thread(doWriteThread); + } + for (size_t i = 0; i < readThreads; ++i) { + group.create_thread(doReadThread); + } + + this_thread::sleep_for(chrono::milliseconds(200)); + group.create_thread(doLockWriteThread); + + this_thread::sleep_for(chrono::seconds(1)); + finished.store(true); + group.join_all(); + BOOST_CHECK_EQUAL(errors.load(), 0); + + const size_t minimumReads = 100 * readThreads; + cout << numRead.load() << " total reads" << endl; + BOOST_CHECK_GE(numRead.load(), minimumReads); + } + + +} + #endif #define USE_MALLOC 1 @@ -394,8 +526,8 @@ struct TestBase { void checkVisible(int threadNum, unsigned long long start) { // We're reading from someone else's pointers, so we need to lock here - gc.enterCS(); - //gc.lockShared(); + //gc.enterCS(); + gc.enterShared(); for (unsigned i = 0; i < nthreads; ++i) { for (unsigned j = 0; j < nblocks; ++j) { @@ -415,8 +547,8 @@ struct TestBase { } } - gc.exitCS(); - //gc.unlockShared(); + //gc.exitCS(); + gc.exitShared(); } void doReadThread(int threadNum)
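
Reviewer note: the guards introduced by this patch compose as sketched below. This is a minimal usage sketch, not part of the diff; GcLock and the three guard classes come from the modified gc/gc_lock.h, while the counter and the printed messages are made up for illustration. A WriteSharedGuard section may run concurrently with readers and with other write sections; a WriteLockGuard sets the stop bit on writeLock, spins in writeBarrier() until in-flight write sections drain, and excludes new ones, but it does not block SharedGuard readers (which is exactly what test_write_exclusion checks).

// Usage sketch only (assumes the patched gc/gc_lock.h); not part of the patch.
#include "gc/gc_lock.h"
#include <atomic>
#include <iostream>

using namespace Datacratic;

int main()
{
    GcLock gcLock;
    std::atomic<int> counter(0);

    {
        // Write-side critical section: concurrent with readers and with
        // other WriteSharedGuard holders, excluded only by WriteLockGuard.
        GcLock::WriteSharedGuard writer(gcLock);
        counter.fetch_add(1);
    }

    {
        // Ordinary RCU-style read-side critical section (semantics unchanged,
        // just renamed to enterShared/exitShared internally).
        GcLock::SharedGuard reader(gcLock);
        std::cout << "counter = " << counter.load() << std::endl;
    }

    {
        // Sets the stop bit and waits for in-flight write sections to drain;
        // readers are not blocked while this guard is held.
        GcLock::WriteLockGuard freeze(gcLock);
        std::cout << "no write section can be in flight here" << std::endl;
    }

    return 0;
}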