libprocess是mesos中非常重要的一个基础库,提供一些很方便的helper函数以及并发编程所需要的基本原语,例如下面我将重点讲的future/promise。
为了更好的解释future/promise是什么,我抽取了一段mesos中的代码作为例子:
Future<Socket> PollSocketImpl::accept() { return io::poll(get(), io::READ) .then(lambda::bind(&internal::accept, get())); }
这个函数的基本作用是:使用io::poll()注册io::READ事件,并且当事件ready的时候,调用internal::accept()。
显然,这是一个异步的accept()方法。
为什么说是异步的呢?因为调用了accept()方法之后,你会立即得到一个Future<Socket>值,它不是一个真正的Socket变量,而是Future<Socket>,顾名思义,这是一个未来的值,现在没有任何意义。
那么这个值什么时候会ready呢?不知道,但你可以有三种方式,在将来某个时刻取得这个值:
- 调用Future<Socket>::get(),如果值还没有就绪,get()函数会一直阻塞,直到能拿到一个Socket值,然后返回
- 通过Future<Socket>::isReady()等API定期判断,一旦isReady()返回true,即可调用Future<Socket>::get(),get()函数立即返回
- 注册回调函数,告诉生产者,当值就绪之后,触发我注册的回调。Future提供多种回调方式,后面会讲到
也就是说,借助Future/Promise,生产者和消费者之间可以做到完全异步。
future和promise的定义
以下是我摘自cplusplus.com中future/promise的定义,libprocess的思想与此类似:
- A future is an object that can retrieve a value from some
provider object or function, properly synchronizing this access if in
different threads. - A promise is an object that can store a value of typeT to be retrieved by a future object (possibly in another thread), offering a synchronization point.
你可以理解为这就是一个管道,future是读端,promise是写端。
future基本结构
future变量,主要有三个组成部分,状态,数据域,callbacks。由struct Data{} 结构维护,struct Data{}结构在future变量里是一个私有的成员变量,并且是shared_ptr<T>。
为什么是需要shared_ptr<T>呢?因为future本身是可以随意拷贝的,它既可以是函数的参数,也可以是函数的返回值,生产者和消费者同时持有同一个future变量,但数据是只有一份的,多线程之间访问需要互斥,这个统统由future的实现来完成,对开发者完全透明。
struct Data { Data(); ~Data(); void clearAllCallbacks(); int lock; State state; bool discard; bool associated; T* t; std::string* message; // Message associated with failure. std::vector<DiscardCallback> onDiscardCallbacks; std::vector<ReadyCallback> onReadyCallbacks; std::vector<FailedCallback> onFailedCallbacks; std::vector<DiscardedCallback> onDiscardedCallbacks; std::vector<AnyCallback> onAnyCallbacks; };
1. 状态
enum State { PENDING, READY, FAILED, DISCARDED, };
future变量有4种状态,分别是:
- PENDING表示值还没有ready,这个时候生产者还没调用promise<T>.set(),而消费者调用future<T>.get()的话会处于阻塞状态
- READY表示值已经就绪,future<T>.get()会立即返回结果
- FAILED表示值未就绪,已被破坏。通常生产者在处理过程出现异常时会设置这个状态
- DISCARDED表示值被丢失。生产者缺少某些条件无法计算这个值
2. 数据
struct Data{}中的T* t用来保存数据,生产者通过promise<T>.set()将值保存到这里,然后触发相应的回调函数,消费者通过future<T>.get()从这里读取,如下:
set过程
template <typename T> bool Promise<T>::set(const T& t) { if (!f.data->associated) { return f.set(t); } return false; } template <typename T> bool Future<T>::set(const T& _t) { bool result = false; internal::acquire(&data->lock); { if (data->state == PENDING) { data->t = new T(_t); data->state = READY; result = true; } } internal::release(&data->lock); // Invoke all callbacks associated with this future being READY. We // don't need a lock because the state is now in READY so there // should not be any concurrent modications. if (result) { internal::run(data->onReadyCallbacks, *data->t); internal::run(data->onAnyCallbacks, *this); data->clearAllCallbacks(); } return result; }
get过程如下:
template <typename T> const T& Future<T>::get() const { if (!isReady()) { await(); } CHECK(!isPending()) << "Future was in PENDING after await()"; // We can't use CHECK_READY here due to check.hpp depending on future.hpp. if (!isReady()) { CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure(); CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED"; } assert(data->t != NULL); return *data->t; }
3. callbacks
future支持非常多类型的callbacks,针对每种不同的状态都可以设置相应的callback,如struct Data{}里可以看到如下字段:
std::vector<DiscardCallback> onDiscardCallbacks; std::vector<ReadyCallback> onReadyCallbacks; std::vector<FailedCallback> onFailedCallbacks; std::vector<DiscardedCallback> onDiscardedCallbacks; std::vector<AnyCallback> onAnyCallbacks;
其中:
- 当生产者调用promise<T>.discard()时触发onDiscardCallbacks
- 当生产者调用promise<T>.set()时触发onReadyCallbacks
- 当生产者调用promise<T>.fail()时触发onFailedCallbacks
- 不管生产者触发何种动作,onAnyCallbacks都会触发
libprocess中的future<T>::then(f)
libprocess里的future实现支持then()方法,允许你通过future的callbacks来实现一些相当高级别的黑科技。例如,你的代码可以这样写:
Future<int> second(const bool& b) { return b ? 1 : 0; } Future<string> third(const int& s) { return s > 0 ? "good" : "bad"; } TEST(Process, chain) { Future<string> S = readyFuture() .then(lambda::bind(&second, lambda::_1)) .then(lambda::bind(&third, lambda::_1)); string s = S.get(); }
这里的意思是说,readyFuture()得到一个future变量A,当A就绪时,执行second(A)并得到一个future变量B,当B就绪时,执行third(B)得到一个future值S。上面的代码等同于:
TEST(Process, chain) { Future<bool> A = oneFuture(); Future<int> B = second(A.get()); Future<string> S = third(B.get()); string s = S.get(); }
注意,get()是同步的,会一直等到相应的future变量处于ready状态才返回。
可以看到,在第一种实现里,Future<string>是third()函数的返回值,而不是readyFuture()函数的返回值,注意返回值的类型。这段代码里,我们最终想要的,只是S的值,A/B对我们而言只是一个临时变量。既然如何,有什么办法能省掉它们呢?libprocess又是怎么做到的呢?
then的实现如下:
template <typename T> template <typename X> Future<X> Future<T>::then(const lambda::function<Future<X>(const T&)>& f) const { memory::shared_ptr<Promise<X>> promise(new Promise<X>()); lambda::function<void(const Future<T>&)> thenf = lambda::bind(&internal::thenf<T, X>, f, promise, lambda::_1); onAny(thenf); // Propagate discarding up the chain. To avoid cyclic dependencies, // we keep a weak future in the callback. promise->future().onDiscard( lambda::bind(&internal::discard<T>, WeakFuture<T>(*this))); return promise->future(); }
在这个函数里我们可以看到,实质上then只是创建了一个闭包thenf,这个闭包的上下文是:
- 当前的Future<T>
- f
- f 的返回值Future<X>对应的Promise<X>,闭包里只需要保存Promise<X>即可,Future<X>是作为函数返回值返回的,仅供caller读取
- internal::thenf<T, X> 函数
然后将这个闭包作为一个callbacks,注册到Future<T>里,当这个future有任何action时(ready/failed/discard),触发这个callback。这个callback所做的事情就是,调用f(T)得到Future<X>。但是问题又来了,因为thenf()必然是在Future<T>::then(f)之后某个时刻才执行的,这两个函数完全异步。但是Future<T>::then(f)函数返回时,caller拿到一个Future<X>,并希望从此得到期望的结果。但是thenf()执行时,说明T已经ready,所以调用f(T),也就是Future<T>::then(f)时注册的callback函数,f(T)也会产生一个Future<X>,但是此Future<X>非彼Future<X>,因为f(T)函数也可能是异步执行的。
显然,Future<T>::then(f)返回给caller的Future<X>是一个临时future,真正的Future<X>只有在f(T)函数被调用时才会得到,但是Future<T>::then()的caller显然并不之情,它只关心它所拿到的Future<X>,并期望从中得到结果。
这就需要一种手段,当Future<T>::then(f)注册的f函数被执行时,f的返回值能够直接透传给Future<T>::then(f)的caller,promise->associate()就是做这个事情的。
template <typename T, typename X> void thenf(const lambda::function<Future<X>(const T&)>& f, const memory::shared_ptr<Promise<X>>& promise, const Future<T>& future) { if (future.isReady()) { if (future.hasDiscard()) { promise->discard(); } else { promise->associate(f(future.get())); } } else if (future.isFailed()) { promise->fail(future.failure()); } else if (future.isDiscarded()) { promise->discard(); } }
promise是thenf闭包里的一个上下文,Future<T>::then(f)被调用之后,then()函数会创建一个临时的promise,并将promise关联的Future<X>返回给caller,也就是说,caller拿到的只是一个临时的Future<X>。当Future<T> ready之后,then(f)注册的f函数被执行,f(T)才返回一个真正的Future<X>,Promise<X>::associate()的作用就是当真正的Future<X> ready时,将其数据copy到自身关联的临时的Future<X>中去。