You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
voidRequestHandler::processRequest(const Request& request) {
requestQueue_.withWLock([&](auto& queue "&") {
// withWLock() automatically holds the lock for the// duration of this lambda function
queue.push_back(request);
});
}
// No. NO. NO!for (int& n : *vec.wlock()) {
n *= 2;
}
vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间 :)
这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:
vec.withLock([](auto& data "") {
for (int& n : data) {
n *= 2;
}
});
withLock 定义为(withRLock/withWLock 类似):
/** * Invoke a function while holding the lock. * * A reference to the datum will be passed into the function as its only * argument. * * This can be used with a lambda argument for easily defining small critical * sections in the code. For example: * * auto value = obj.withLock([](auto& data "") { * data.doStuff(); * return data.getValue(); * });*/template <classFunction>
autowithLock(Function&& function) {
returnfunction(*lock());
}
template <classFunction>
autowithLock(Function&& function) const {
returnfunction(*lock());
}
升级锁
ulock()和 withULockPtr()
Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。
/** * An enum to describe the "level" of a mutex. The supported levels are * Unique - a normal mutex that supports only exclusive locking * Shared - a shared mutex which has shared locking and unlocking functions; * Upgrade - a mutex that has all the methods of the two above along with * support for upgradable locking*/enumclassMutexLevel { UNIQUE, SHARED, UPGRADE };
升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。
升级锁可以通过 uclock()或者 withULockPtr()获得:
{
// only const access allowed to the underlying object when an upgrade lock// is acquiredauto ulock = vec.ulock();
auto newSize = ulock->size();
}
auto newSize = vec.withULockPtr([](auto ulock "") {
// only const access allowed to the underlying object when an upgrade lock// is acquiredreturn ulock->size();
});
Synchronized<vector<string>, std::mutex> vec;
std::condition_variable emptySignal;
// Assuming some other thread will put data on vec and signal// emptySignal, we can then wait on it as follows:auto locked = vec.lock();
emptySignal.wait(locked.getUniqueLock(),
[&] { return !locked->empty(); });
/** * Get a reference to the std::unique_lock. * * This is provided so that callers can use Synchronized<T, std::mutex> * with a std::condition_variable. * * While this API could be used to bypass the normal Synchronized APIs and * manually interact with the underlying unique_lock, this is strongly * discouraged.*/
std::unique_lock<std::mutex>& getUniqueLock() { return lock_; }
acquireLocked() —— 同时锁多个数据
假如需要将一个 vector 的数据拷贝到另一个 vector,wlock()可能会实现需求:
voidfun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto lockedA = a.wlock();
auto lockedB = b.wlock();
... use lockedA and lockedB ...
}
voidfun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto ret = folly::acquireLocked(a, b);
auto& lockedA = std::get<0>(ret);
auto& lockedB = std::get<1>(ret);
... use lockedA and lockedB ...
}
// 实现:通过比较锁地址的大小/** * Acquire locks for multiple Synchronized<T> objects, in a deadlock-safe * manner. * * The locks are acquired in order from lowest address to highest address. * (Note that this is not necessarily the same algorithm used by std::lock().) * For parameters that are const and support shared locks, a read lock is * acquired. Otherwise an exclusive lock is acquired. * * use lock() with folly::wlock(), folly::rlock() and folly::ulock() for * arbitrary locking without causing a deadlock (as much as possible), with the * same effects as std::lock()*/template <classSync1, classSync2>
std::tuple<detail::LockedPtrType<Sync1>, detail::LockedPtrType<Sync2>>
acquireLocked(Sync1& l1, Sync2& l2) {
if (static_cast<constvoid*>(&l1) < static_cast<constvoid*>(&l2)) {
auto p1 = l1.contextualLock();
auto p2 = l2.contextualLock();
returnstd::make_tuple(std::move(p1), std::move(p2));
} else {
auto p2 = l2.contextualLock();
auto p1 = l1.contextualLock();
returnstd::make_tuple(std::move(p1), std::move(p2));
}
}
C++17 引入了 structured binding syntax,可以使代码更简单:
voidfun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
auto [lockedA, lockedB] = folly::acquireLocked(a, b);
... use lockedA and lockedB ...
}
目录
folly/Synchronized.h 提供了一种更简单、更不容易出错的同步机制,可以用来替代传统 C++标准库中使用较复杂、较容易出错的同步机制。
传统同步方案的缺点
一般是将需要同步的数据和锁一一配对,即 —— associate mutexes with data, not code :
然而,操作这些数据成员,开发人员必须注意,正确的获取锁、获取正确的锁。
一些常见的错误包括:
一般在使用时,需要提醒开发人员:“别忘了 xxxx”,那一般都会出错,比如 new 的对象别忘了 delete : )
folly/Synchronized.h 简单使用
上面的代码可以用 folly/Synchronized.h 重写为:
为什么 folly/Synchronized.h 更加有效呢?
如果在临界区有多个操作,那么可以使用如下方法:
wlock 返回一个 LockedPtr 对象,这个对象可以被理解为指向数据成员的指针。只有这个对象存在,那么锁就会被锁住,所以最好为这个对象显示定义一个 scope.
更好的方式,是使用 lambdas :
使用 withWLock 配合 lambdas 强制定义了一个 scope,更清晰。
Synchronized的模板参数
Synchronized 有两个模板参数,数据类型和锁类型:
如果不指定第二个模板参数,默认是 folly::SharedMutex。只要被 folly::LockTraits 支持的都可以使用,比如 std::mutex、std::recursive_mutex、std::timed_mutex,。std::recursive_timed_mutex、folly::SharedMutex、folly::RWSpinLock、folly::SpinLock.
根据锁类型的不同,Synchronized 会提供不同的 API:
withLock()/withRLock()/withWLock() —— 更易用的加锁方式
withLock()在上面提到过了,可以用来替代 lock()。在持有锁的期间,执行一个 lambda 或者 function. withRLock()/withWLock()同理可以替代 rlock()/wlock().
我们再详细说一下这种方式的好处。下面的函数将 vector 里的所有元素都 double:
使用 lock()/wlock()/rlock()的一个重要注意事项:一个指向数据的指针或者引用,它的生命周期一定不要比 LockedPtr 对象长(lock()/wlock()/rlock()的返回值类型)。 如果我们将上面的例子这样写就会出问题:
vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间 :)
这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:
withLock 定义为(withRLock/withWLock 类似):
升级锁
ulock()和 withULockPtr()
Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。
升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。
升级锁可以通过 uclock()或者 withULockPtr()获得:
通过下面的函数可以进行升级或者降级:
调用这些函数的 LockedPtr 会被设置为 invalid null state,并返回另一个锁住特定锁的 LockedPtr。这些操作都是原子性的,中间不会出现 unlocked 状态。
比如现在有一个 cache,数据结构为 unordered_map,需求是先检查对应的 key 是否在 unordered_map 中,如果在则返回对应的 value,不在则初始化 value 为 0:
Timed Locking
如果初始化 Synchronized 的锁类型支持时间,lock()/wlock()/rlock()可以传入一个类型为 std::chrono::duration 的参数:
Synchronized 与 std::condition_variable
如果 Synchronized 的锁类型是 std::mutex,那么可以和 std::condition_variable 配合使用。
getUniqueLock()返回一个 std::unique_lockstd::mutex的引用。但是不推荐这么使用,因为这绕过了 Synchronized 的 API,可以直接操作对应的锁:
acquireLocked() —— 同时锁多个数据
假如需要将一个 vector 的数据拷贝到另一个 vector,wlock()可能会实现需求:
但是如果一个线程调用 fun(x,y),另一个线程调用 func(y,x),就很有可能出现死锁。经典的解决方式是,所有的线程以同样的顺序获取锁。许多库的实现是通过比较锁地址的大小来决定加锁顺序:
C++17 引入了 structured binding syntax,可以使代码更简单:
acquireLockedPair()返回 std::pair,在不支持 C++17 的编译器情况下,使用也很方便。
使用一把锁,锁多个数据
比如一个 bidirectional map,需要同时操作。一般有两个方案:
Struct
std::tuple
Benchmark
SynchronizedBenchmark.cpp
下篇文章写一下 Synchronized 的基本实现 :)
参考资料:
(完)
朋友们可以关注下我的公众号,获得最及时的更新:
The text was updated successfully, but these errors were encountered: