classProcessBase:publicEventVisitor{public:...UPIDself()const{returnpid;}...virtualvoidinitialize(){}virtualvoidfinalize(){}virtualvoidexited(constUPID&pid){}virtualvoidlost(constUPID&pid){}...// Puts a message at front of queue.voidinject(constUPID&from,conststd::string&name,constchar*data=NULL,size_tlength=0);// Sends a message with data to PID.voidsend(constUPID&to,conststd::string&name,constchar*data=NULL,size_tlength=0);...UPIDlink(constUPID&pid);...typedefstd::tr1::function<void(constUPID&,conststd::string&)>MessageHandler;// Setup a handler for a message.voidinstall(conststd::string&name,constMessageHandler&handler){handlers.message[name]=handler;}...private:...std::deque<Event*>events;// Delegates for messages.std::map<std::string,UPID>delegates;// Handlers for messages and HTTP requests.struct{std::map<std::string,MessageHandler>message;std::map<std::string,HttpRequestHandler>http;}handlers;// Active references.intrefs;// Process PID.UPIDpid;};
メッセージを送受信する API (send(), inject())や、ユーザ定義のメッセージ用コールバックを登録する API (install()) がある。
Process にメッセージを配送するのはフレームワークの仕事。
そのほか link() なんて API に Erlang 愛を見いだせる。libprocess は Erlang 風にアクター (Process) 監視の仕組みを持っている。
オブジェクトが持つ状態は受信したイベント(メッセージ)キューやメソッドハンドラのテーブル、そしてオブジェクトの ID たる PID など。
template<typenameT>classProcess:publicvirtualProcessBase{public:virtual~Process(){}// Returns pid of process; valid even before calling spawn.PID<T>self()const{returnPID<T>(dynamic_cast<constT*>(this));}protected:// Useful typedefs for dispatch/delay/defer to self()/this.typedefTSelf;typedefTThis;};
self() とかもうね…
アプリケーションはこの Process クラスを継承して適当なハンドラ関数を実装し、
それを install() して受信に備える。そのほか exited() などの lifecycle コールバックで後始末などをしてもよい。
send() や install() といった API のシグネチャから、 libprocess のメッセージは文字列をキーとした文字列なのがわかる。
メッセージの内部表現をみてみると・・・
ostream&operator<<(ostream&stream,constUPID&pid){// Call inet_ntop since inet_ntoa is not thread-safe!charip[INET_ADDRSTRLEN];if(inet_ntop(AF_INET,(in_addr*)&pid.ip,ip,INET_ADDRSTRLEN)==NULL)memset(ip,0,INET_ADDRSTRLEN);stream<<pid.id<<"@"<<ip<<":"<<pid.port;returnstream;}
HTTP
Process クラスはメッセージベースの通信とは別に HTTP をサポートしている。
process.hpp
12345678910111213141516171819
classProcessBase:publicEventVisitor{...typedefstd::tr1::function<Future<http::Response>(consthttp::Request&)>HttpRequestHandler;// Setup a handler for an HTTP request.boolroute(conststd::string&name,constHttpRequestHandler&handler){if(name.find('/')!=0){returnfalse;}handlers.http[name.substr(1)]=handler;returntrue;}...};
RPC-like
実際のアプリケーションが Process を直接継承することは多くない。
かわりにバイナリからオブジェクトへの復号化をフレームワークにまかせ、 RPC ぽく使う。
オブジェクトの直列化には・・・どういうわけか Protobuf をつかう。Thrift どこいった・・・。
戻り値のかわりに reply() で送信元にメッセージを送ることはできる。
ただし見ての通り、リクエストとレスポンスの対応をとるリクエスト ID みたいのはない。運用でカバーするものらしい。
protobuf.hpp
12345678910111213141516171819202122232425
template<typenameT>classProtobufProcess:publicprocess::Process<T>{...virtualvoidvisit(constprocess::MessageEvent&event){if(protobufHandlers.count(event.message->name)>0){from=event.message->from;// For 'reply'.protobufHandlers[event.message->name](event.message->body);from=process::UPID();}else{process::Process<T>::visit(event);}}...process::UPIDfrom;// Sender of "current" message, accessible by subclasses....voidreply(constgoogle::protobuf::Message&message){CHECK(from)<<"Attempting to reply without a sender";std::stringdata;message.SerializeToString(&data);send(from,message);}};
ProcessManager
Process クラスを継承して色々できるのはわかったけど、
それをどう使えばいいのだろう。ちいさいサンプル
が入っていたから眺めてみよう。
...// Active ProcessManager (eventually will probably be thread-local).staticProcessManager*process_manager=NULL;...UPIDspawn(ProcessBase*process,boolmanage){process::initialize();if(process!=NULL){...returnprocess_manager->spawn(process,manage);}else{returnUPID();}}
classProcessManager{public:ProcessManager(conststring&delegate);~ProcessManager();ProcessReferenceuse(constUPID&pid);boolhandle(constSocket&socket,Request*request);booldeliver(ProcessBase*receiver,Event*event,ProcessBase*sender=NULL);booldeliver(constUPID&to,Event*event,ProcessBase*sender=NULL);UPIDspawn(ProcessBase*process,boolmanage);voidresume(ProcessBase*process);voidcleanup(ProcessBase*process);voidlink(ProcessBase*process,constUPID&to);voidterminate(constUPID&pid,boolinject,ProcessBase*sender=NULL);boolwait(constUPID&pid);voidenqueue(ProcessBase*process);ProcessBase*dequeue();voidsettle();private:// Map of all local spawned and running processes.map<string,ProcessBase*>processes;synchronizable(processes);...// Queue of runnable processes (implemented using list).list<ProcessBase*>runq;synchronizable(runq);// Number of running processes, to support Clock::settle operation.intrunning;};
voidpost(constUPID&to,conststring&name,constchar*data,size_tlength){process::initialize();if(!to){return;}// Encode and transport outgoing message.transport(encode(UPID(),to,name,string(data,length)));}...staticvoidtransport(Message*message,ProcessBase*sender=NULL){if(message->to.ip==__ip__&&message->to.port==__port__){// Local message.process_manager->deliver(message->to,newMessageEvent(message),sender);}else{// Remote message.socket_manager->send(message);}}
voidProcessManager::enqueue(ProcessBase*process){...synchronized(runq){CHECK(find(runq.begin(),runq.end(),process)==runq.end());runq.push_back(process);}// Wake up the processing thread if necessary.gate->open();}
ProcessManager::enqueue() では受信したメッセージによって実行可能になった process を
処理待ちキュー runq に詰め、それからキューを消化するワーカースレッドに通知を送っている。(gate は pthread_cond_t のラッパです。)
ProcessBase*ProcessManager::dequeue(){...ProcessBase*process=NULL;synchronized(runq){if(!runq.empty()){process=runq.front();runq.pop_front();// Increment the running count of processes in order to support// the Clock::settle() operation (this must be done atomically// with removing the process from the runq).__sync_fetch_and_add(&running,1);}}returnprocess;}
void*schedule(void*arg){do{ProcessBase*process=process_manager->dequeue();if(process==NULL){Gate::state_told=gate->approach();process=process_manager->dequeue();if(process==NULL){gate->arrive(old);// Wait at gate if idle.continue;}else{gate->leave();}}process_manager->resume(process);}while(true);}
voidinitialize(conststring&delegate){...for(inti=0;i<cpus;i++){pthread_tthread;// For now, not saving handles on our threads.if(pthread_create(&thread,NULL,schedule,NULL)!=0){LOG(FATAL)<<"Failed to initialize, pthread_create";}}...}
ワーカスレッドは CPU の数だけつくられるようだ。
SEDA から脈々と生き続ける
ノンブロッキングでマルチスレッドの血筋・・・なのはいいとして、
起動したスレッドを止める様子がないのは心配。
サーバって graceful に停止できないと Valgrind や ASAN で
メモリリークをチェックするのが大変だとおもうんだけど、このスレッド自体は何も資源を持っていないので放置してプロセス抜けても大丈夫ということなのかなあ。男らしい・・・。
voidProcessManager::resume(ProcessBase*process){__process__=process;// __process__ はスレッドローカル変数...boolblocked=false;...while(!terminate&&!blocked){Event*event=NULL;process->lock();{if(process->events.size()>0){event=process->events.front();process->events.pop_front();process->state=ProcessBase::RUNNING;}else{process->state=ProcessBase::BLOCKED;blocked=true;}}process->unlock();if(!blocked){...// Now service the event.try{process->serve(*event);}catch(conststd::exception&e){std::cerr<<"libprocess: "<<process->pid<<" terminating due to "<<e.what()<<std::endl;terminate=true;}catch(...){std::cerr<<"libprocess: "<<process->pid<<" terminating due to unknown exception"<<std::endl;terminate=true;}deleteevent;if(terminate){cleanup(process);}}}__process__=NULL;CHECK_GE(running,1);__sync_fetch_and_sub(&running,1);}
classSocketManager{public:...voidsend(Encoder*encoder,ints,boolpersist);voidsend(constResponse&response,ints,boolpersist);voidsend(Message*message);...Socketaccepted(ints);voidclose(ints);...// Map from UPID (local/remote) to process.map<UPID,set<ProcessBase*>>links;// Collection of all actice sockets.map<int,Socket>sockets;// Collection of sockets that should be disposed when they are// finished being used (e.g., when there is no more data to send on// them).set<int>dispose;// Map from socket to node (ip, port).map<int,Node>nodes;// Maps from node (ip, port) to temporary sockets (i.e., they will// get closed once there is no more data to send on them).map<Node,int>temps;// Maps from node (ip, port) to persistent sockets (i.e., they will// remain open even if there is no more data to send on them). We// distinguish these from the 'temps' collection so we can tell when// a persistant socket has been lost (and thus generate// ExitedEvents).map<Node,int>persists;// Map from socket to outgoing queue.map<int,queue<Encoder*>>outgoing;...};
何種類かのテーブルを管理している。Boost には bimap あるで・・・とかまあそういう姑発言はさておき、
int はソケットの fd だとおもえばよい。 Node はリモートにある OS プロセスを抽象している。
void*serve(void*arg){ev_loop(((structev_loop*)arg),0);returnNULL;}...voidinitialize(conststring&delegate){...#ifdef __sun__loop=ev_default_loop(EVBACKEND_POLL|EVBACKEND_SELECT);#elseloop=ev_default_loop(EVFLAG_AUTO);#endif // __sun__ev_async_init(&async_watcher,handle_async);ev_async_start(loop,&async_watcher);ev_timer_init(&timeouts_watcher,handle_timeouts,0.,2100000.0);ev_timer_again(loop,&timeouts_watcher);ev_io_init(&server_watcher,accept,__s__,EV_READ);ev_io_start(loop,&server_watcher);...pthread_tthread;// For now, not saving handles on our threads.if(pthread_create(&thread,NULL,serve,loop)!=0){LOG(FATAL)<<"Failed to initialize, pthread_create";}...}
voidrecv_data(structev_loop*loop,ev_io*watcher,intrevents){DataDecoder*decoder=(DataDecoder*)watcher->data;...while(true){...length=recv(s,data,size,0);...// Decode as much of the data as possible into HTTP requests.constdeque<Request*>&requests=decoder->decode(data,length);if(!requests.empty()){foreach(Request*request,requests){process_manager->handle(decoder->socket(),request);}}...}}
boolProcessManager::handle(constSocket&socket,Request*request){CHECK(request!=NULL);// Check if this is a libprocess request (i.e., 'User-Agent:// libprocess/id@ip:port') and if so, parse as a message.if(libprocess(request)){Message*message=parse(request);if(message!=NULL){deleterequest;// TODO(benh): Use the sender PID in order to capture// happens-before timing relationships for testing.returndeliver(message->to,newMessageEvent(message));}VLOG(1)<<"Failed to handle libprocess request: "<<request->method<<" "<<request->path<<" (User-Agent: "<<request->headers["User-Agent"]<<")";deleterequest;returnfalse;}...}
classStatistics{public:Statistics(constDuration&window);~Statistics();// Returns the time series of a statistic.process::Future<std::map<Seconds,double>>timeseries(conststd::string&context,conststd::string&name,constOption<Seconds>&start=None(),constOption<Seconds>&stop=None());// Returns the latest value of a statistic.process::Future<Option<double>>get(conststd::string&context,conststd::string&name);...};
ちなみになぜ集計の結果取得みたいのが非同期かというと、
独立した Process である StatisticsProcessが集計処理をしているから。
アクター大活躍。libprocess の中にかぎらず、Mesos 自体の API も Future 主体で書かれている。
// Definition of a "shared" future. A future can hold any// copy-constructible value. A future is considered "shared" because// by default a future can be accessed concurrently.template<typenameT>classFuture{public:// Constructs a failed future.staticFuture<T>failed(conststd::string&message);// Helpers to get the current state of this future.boolisPending()const;boolisReady()const;boolisDiscarded()const;boolisFailed()const;...booldiscard();...// Waits for this future to become ready, discarded, or failed.boolawait(constDuration&duration=Seconds(-1.0))const;// Return the value associated with this future, waits indefinitely// until a value gets associated or until the future is discarded.Tget()const;// Returns the failure message associated with this future.std::stringfailure()const;// Installs callbacks for the specified events and returns a const// reference to 'this' in order to easily support chaining.constFuture<T>&onReady(constReadyCallback&callback)const;constFuture<T>&onFailed(constFailedCallback&callback)const;constFuture<T>&onDiscarded(constDiscardedCallback&callback)const;constFuture<T>&onAny(constAnyCallback&callback)const;// Installs callbacks that get executed when this future is ready// and associates the result of the callback with the future that is// returned to the caller (which may be of a different type).template<typenameX>Future<X>then(conststd::tr1::function<Future<X>(constT&)>&f)const;template<typenameX>Future<X>then(conststd::tr1::function<X(constT&)>&f)const;// Helpers for the compiler to be able to forward std::tr1::bind results.template<typenameX>Future<X>then(conststd::tr1::_Bind<X(*(void))(void)>&b)const{returnthen(std::tr1::function<X(constT&)>(b));}...private:friendclassPromise<T>;...enumState{PENDING,READY,FAILED,DISCARDED,};int*refs;int*lock;State*state;T**t;std::string**message;// Message associated with failure.std::queue<ReadyCallback>*onReadyCallbacks;std::queue<FailedCallback>*onFailedCallbacks;std::queue<DiscardedCallback>*onDiscardedCallbacks;std::queue<AnyCallback>*onAnyCallbacks;Latch*latch;};...// Promise<T> の定義がつづく...
// Represents a uniquely owned pointer.//// TODO(bmahler): For now, Owned only provides shared_ptr semantics.// When we make the switch to C++11, we will change to provide// unique_ptr semantics. Consequently, each usage of Owned that// invoked a copy will have to be adjusted to use move semantics.template<typenameT>classOwned:publicboost::shared_ptr<T>{public:Owned(T*t):boost::shared_ptr<T>(t){}};
manager にならんで古くからよく知られるダメな C++ のパターン、巨大スーパークラスも健在。Process クラス。
アプリケーションがこいつを継承する必然性ぜんぜんない。
コールバックをインストールすれば済む話でその API まで自分で用意してるのに、なぜ継承させるのか。
composition over inheritance とか今更過ぎてケチをつけるのも気が引ける。