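# Storm State Management
## State support in core Storm
Storm core provides abstractions that let bolts save and retrieve the state of their operations. There is a default in-memory state implementation, as well as a Redis-backed implementation that provides state persistence.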
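## State management
A bolt that needs its state to be managed and persisted by the framework should implement the `IStatefulBolt` interface or extend the `BaseStatefulBolt` class, and then implement the `void initState(T state)` method. The framework invokes `initState` while initializing the bolt with its previously saved state. It is called after `prepare` and before the bolt starts processing any tuples.

Currently the only supported `State` implementation is `KeyValueState`, which provides a key-value mapping.

For example, a word count bolt can use the key-value state abstraction to keep its word counts, as follows.

1. Extend the `BaseStatefulBolt` class and add a `KeyValueState` instance variable that stores the mapping from word to count.
2. In the `initState` method, initialize the bolt with its previously saved state. This state contains the word counts that the framework last committed during the previous run.
3. In the `execute` method, update the word count.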
```
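import java.util.Map;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseStatefulBolt<KeyValueState<String, Integer>> {
    // Word -> count mapping, managed and checkpointed by the framework.
    private KeyValueState<String, Integer> wordCounts;
    private OutputCollector collector;
    ...
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // Called after prepare() with the last committed state.
    @Override
    public void initState(KeyValueState<String, Integer> state) {
        wordCounts = state;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = wordCounts.get(word, 0);
        count++;
        wordCounts.put(word, count);
        // Anchor the emitted tuple to the input tuple, then ack the input.
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }
    ...
}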
```
4. The framework periodically checkpoints the state of the bolt (by default, once per second). The frequency can be changed by setting `topology.state.checkpoint.interval.ms` in the storm config.
5. For state persistence, set `topology.state.provider` in the storm config to a state provider that supports persistence. For example, to use the Redis-based key-value state implementation, set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider` in the storm.yaml file. The jar containing the provider implementation must be on the class path; in this example, that means placing `storm-redis-*.jar` in the extlib directory.
6. The state provider properties can be overridden by setting `topology.state.provider.config`. For the Redis state, this is a JSON string with the following properties.
```
{
  "keyClass": "Optional fully qualified class name of the Key type.",
  "valueClass": "Optional fully qualified class name of the Value type.",
  "keySerializerClass": "Optional Key serializer implementation class.",
  "valueSerializerClass": "Optional Value Serializer implementation class.",
  "jedisPoolConfig": {
    "host": "localhost",
    "port": 6379,
    "timeout": 2000,
    "database": 0,
    "password": "xyz"
  }
}
```
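
As a rough sketch, the three settings listed above can also be assembled in code. The example uses a plain `java.util.Map` so that it runs without Storm on the class path; Storm's `Config` class is itself a `HashMap<String, Object>`, so the entries could be copied into it with `putAll`. The class name `StateConfigSketch`, the 5000 ms interval and the Redis connection values are illustrative assumptions, not recommendations.

```java
import java.util.HashMap;
import java.util.Map;

final class StateConfigSketch {
    /** Builds the three state-related topology settings described above. */
    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        // Checkpoint every 5 seconds instead of the default 1 second (assumed value).
        conf.put("topology.state.checkpoint.interval.ms", 5000);
        // Persist state in Redis; storm-redis-*.jar must be on the class path.
        conf.put("topology.state.provider",
                "org.apache.storm.redis.state.RedisKeyValueStateProvider");
        // Provider-specific overrides are passed as a JSON string.
        String providerConfig = "{"
                + "\"jedisPoolConfig\": {"
                + "\"host\": \"localhost\", \"port\": 6379, "
                + "\"timeout\": 2000, \"database\": 0"
                + "}}";
        conf.put("topology.state.provider.config", providerConfig);
        return conf;
    }

    public static void main(String[] args) {
        buildConf().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Only the properties that differ from the provider defaults need to appear in the JSON string; the optional class-name properties are left out here.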
## Checkpoint mechanism
Checkpoints are triggered by an internal checkpoint spout at the interval specified by `topology.state.checkpoint.interval.ms`. If the topology contains at least one `IStatefulBolt`, the topology builder adds the checkpoint spout automatically. For stateful topologies, the topology builder wraps each `IStatefulBolt` in a `StatefulBoltExecutor`, which handles the state commit when a checkpoint tuple is received. Non-stateful bolts are wrapped in a `CheckpointTupleForwarder`, which only forwards checkpoint tuples so that they can flow through the whole topology DAG (directed acyclic graph). Checkpoint tuples flow through a separate internal stream named `$checkpoint`. The topology builder wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.
```
              default                         default               default
[spout1]   ---------------> [statefulbolt1] ----------> [bolt1] --------------> [statefulbolt2]
                          |                 ---------->         -------------->
                          |                   ($chpt)               ($chpt)
                          |
[$checkpointspout] _______| ($chpt)
```
At each checkpoint interval, the checkpoint spout emits checkpoint tuples. When a bolt receives a checkpoint tuple, it saves its state and then forwards the checkpoint tuple to the next component. Each bolt waits for the checkpoint to arrive on all of its input streams before saving its state, so that the saved state represents a consistent state across the whole topology. Once the checkpoint spout has received an ACK from all bolts, the state commit is complete and the checkpoint spout records the transaction as committed.
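
The alignment rule described above can be illustrated with a small, framework-free model. `CheckpointAligner` is a hypothetical class, not part of Storm: it tracks which input streams have delivered the checkpoint tuple for the current transaction and reports when the bolt may save its state and forward the checkpoint.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a bolt waiting for a checkpoint on all of its input streams. */
final class CheckpointAligner {
    private final Set<String> inputStreams;
    private final Set<String> arrived = new HashSet<>();
    private long currentTxid = -1;

    CheckpointAligner(Set<String> inputStreams) {
        this.inputStreams = new HashSet<>(inputStreams);
    }

    /**
     * Records a checkpoint tuple seen on one input stream.
     * Returns true once every input stream has delivered the checkpoint
     * for this txid, i.e. when state should be saved and the tuple forwarded.
     */
    boolean onCheckpoint(String stream, long txid) {
        if (!inputStreams.contains(stream)) {
            throw new IllegalArgumentException("unknown input stream: " + stream);
        }
        if (txid != currentTxid) {
            // A new checkpoint round starts; forget the previous one.
            currentTxid = txid;
            arrived.clear();
        }
        arrived.add(stream);
        if (arrived.size() == inputStreams.size()) {
            arrived.clear();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointAligner aligner = new CheckpointAligner(Set.of("spout1", "bolt1"));
        System.out.println(aligner.onCheckpoint("spout1", 1)); // false: still waiting for bolt1
        System.out.println(aligner.onCheckpoint("bolt1", 1));  // true: all inputs aligned
    }
}
```

The model leaves out what happens to ordinary tuples that arrive while the bolt is waiting.
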
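
Checkpointing does not currently checkpoint the state of spouts. However, once the state of all bolts has been checkpointed and the checkpoint tuples have been acked, the tuples emitted by the spout are acked as well. This also implies that `topology.state.checkpoint.interval.ms` must be lower than `topology.message.timeout.secs`.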
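
The timing constraint between the two settings can be checked before a topology is submitted. The helper below is a hypothetical sketch, not a Storm API: it reads both settings from a plain configuration map and assumes that both are present as numbers.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointIntervalCheck {
    /** Returns true if the checkpoint interval is shorter than the message timeout. */
    static boolean isValid(Map<String, Object> conf) {
        long intervalMs = ((Number) conf.get("topology.state.checkpoint.interval.ms")).longValue();
        long timeoutSecs = ((Number) conf.get("topology.message.timeout.secs")).longValue();
        // The timeout is configured in seconds, the interval in milliseconds.
        return intervalMs < timeoutSecs * 1000L;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.state.checkpoint.interval.ms", 1000);
        conf.put("topology.message.timeout.secs", 30);
        System.out.println(isValid(conf)); // true: 1000 ms < 30000 ms
    }
}
```
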

The state commit works like a three-phase commit protocol with a `prepare` phase and a `commit` phase, so that state across the whole topology is saved in a consistent and atomic manner.
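### Recovery
The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not prepared successfully, a `rollback` message is sent across the topology so that bolts discard any transactions they have already prepared. If the previous transaction was prepared successfully but not committed, a `commit` message is sent across the topology so that the prepared transactions can be committed. Once these steps are complete, the bolts are initialized with their state.

Recovery is also triggered if one of the bolts fails to acknowledge the checkpoint message, or if a worker crashes in the middle of a checkpoint. When the supervisor restarts the worker, the checkpoint mechanism ensures that the bolt is initialized with its previous state and that checkpointing continues from the point where it left off.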
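
The recovery decision can be summarized as a small lookup. The sketch below is a simplified model with hypothetical names (`LastTx`, `Action`, `recoveryPlan`); it does not reproduce Storm's internal classes and only encodes the three cases described in this section.

```java
import java.util.List;

final class RecoverySketch {
    /** Outcome of the last checkpoint transaction before the restart. */
    enum LastTx { NOT_PREPARED, PREPARED_NOT_COMMITTED, COMMITTED }

    /** Messages sent across the topology during recovery. */
    enum Action { ROLLBACK, COMMIT, INIT_STATE }

    /** Returns the recovery messages, in order, for the given outcome. */
    static List<Action> recoveryPlan(LastTx lastTx) {
        switch (lastTx) {
            case NOT_PREPARED:
                // Discard anything that bolts may already have prepared.
                return List.of(Action.ROLLBACK, Action.INIT_STATE);
            case PREPARED_NOT_COMMITTED:
                // Finish the commit that was interrupted.
                return List.of(Action.COMMIT, Action.INIT_STATE);
            default:
                // Nothing pending; bolts can load the committed state.
                return List.of(Action.INIT_STATE);
        }
    }

    public static void main(String[] args) {
        for (LastTx tx : LastTx.values()) {
            System.out.println(tx + " -> " + recoveryPlan(tx));
        }
    }
}
```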
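### Reliability
Storm relies on the acking mechanism to replay tuples when processing fails. It is possible for the state to be committed while the worker crashes before acking the tuples. In that case the tuples are replayed, which causes duplicate state updates. In addition, the `StatefulBoltExecutor` currently continues to fetch and process tuples from a stream after it has received a checkpoint tuple on that stream, while it waits for the checkpoint to arrive on the other input streams before saving the state. This can also cause duplicate state updates during recovery.

The state abstraction does not eliminate duplicates and currently provides only an at-least-once guarantee.

To provide the at-least-once guarantee, all bolts in a stateful topology are expected to anchor the tuples they emit and to ack the input tuples once processing is complete. For non-stateful bolts, anchoring and acking can be managed automatically by extending `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and to ack the tuple after processing, as the `WordCountBolt` in the "State management" section above does.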
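
To see why a replay leads to duplicate updates, consider the following framework-free simulation. It is an illustration only: the map stands in for committed bolt state, and `process` for the counting logic of `execute`. The state update for a tuple is committed, the worker is assumed to crash before the ack, and the spout replays the tuple.

```java
import java.util.HashMap;
import java.util.Map;

final class ReplaySketch {
    private final Map<String, Integer> committedCounts = new HashMap<>();

    /** Counts one word, as the word count bolt's execute method would. */
    void process(String word) {
        committedCounts.merge(word, 1, Integer::sum);
    }

    int count(String word) {
        return committedCounts.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        ReplaySketch bolt = new ReplaySketch();
        bolt.process("storm");   // the state update is checkpointed...
        // ...but the worker crashes before the tuple is acked,
        // so the spout replays the same tuple after the timeout.
        bolt.process("storm");
        System.out.println(bolt.count("storm")); // 2, although the word was sent once
    }
}
```

Applications that cannot tolerate such double counting need their own deduplication, for example by making updates idempotent.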
### IStateful bolt hooks
The IStateful bolt interface provides hook methods through which stateful bolts can implement custom actions.
```
/**
* This is a hook for the component to perform some actions just before the
* framework commits its state.
*/
void preCommit(long txid);
/**
* This is a hook for the component to perform some actions just before the
* framework prepares its state.
*/
void prePrepare(long txid);
/**
* This is a hook for the component to perform some actions just before the
* framework rolls back the prepared state.
*/
void preRollback();
```
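These hooks are optional, and stateful bolts are not expected to provide any implementation. They exist so that other system-level components can be built on top of the stateful abstractions, in cases where some action needs to be taken before a stateful bolt's state is prepared, committed or rolled back.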
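## Providing custom state implementations
Currently the only supported `State` implementation is `KeyValueState`, which provides a key-value mapping.

Custom state implementations should implement the methods defined in the `org.apache.storm.state.State` interface. These are the `void prepareCommit(long txid)`, `void commit(long txid)` and `void rollback()` methods. The `commit()` method is optional and is useful when a bolt manages its state on its own. It is currently used only by internal system components, for example by the CheckpointSpout to save its own state.

A `KeyValueState` implementation should also implement the methods defined in the `org.apache.storm.state.KeyValueState` interface.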
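
The following is a minimal in-memory sketch of the prepare/commit/rollback contract. It deliberately does not implement the Storm interfaces, so that it compiles on its own; `ToyKeyValueState` and its fields are hypothetical, and only the method names `get`, `put`, `prepareCommit`, `commit` and `rollback` mirror those mentioned in this document. A real implementation would write the prepared and committed data to a durable store.

```java
import java.util.HashMap;
import java.util.Map;

final class ToyKeyValueState<K, V> {
    private Map<K, V> committed = new HashMap<>(); // last committed state
    private Map<K, V> prepared = null;             // snapshot awaiting commit
    private Map<K, V> working = new HashMap<>();   // state seen by the bolt
    private long preparedTxid = -1;

    V get(K key, V defaultValue) {
        return working.getOrDefault(key, defaultValue);
    }

    void put(K key, V value) {
        working.put(key, value);
    }

    /** Phase 1: snapshot the current state under the given transaction id. */
    void prepareCommit(long txid) {
        prepared = new HashMap<>(working);
        preparedTxid = txid;
    }

    /** Phase 2: make the prepared snapshot the committed state. */
    void commit(long txid) {
        if (prepared == null || txid != preparedTxid) {
            throw new IllegalStateException("no prepared state for txid " + txid);
        }
        committed = prepared;
        prepared = null;
    }

    /** Discards any prepared snapshot and returns to the last committed state. */
    void rollback() {
        prepared = null;
        working = new HashMap<>(committed);
    }

    public static void main(String[] args) {
        ToyKeyValueState<String, Integer> state = new ToyKeyValueState<>();
        state.put("storm", 1);
        state.prepareCommit(1);
        state.commit(1);
        state.put("storm", 2);   // not yet prepared or committed
        state.rollback();
        System.out.println(state.get("storm", 0)); // 1
    }
}
```

Keeping the prepared snapshot separate from the working copy is what allows a rollback to discard a half-finished checkpoint without touching the last committed state.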
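### State provider
The framework instantiates state through the corresponding `StateProvider` implementation. A custom state should therefore also provide a `StateProvider` implementation that can load and return the state for a given namespace. Each state belongs to a unique namespace. The namespace is typically unique per task, so that each task can have its own state. The `StateProvider` and the corresponding `State` implementation should be available on Storm's class path (typically by placing them in the extlib directory).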
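
A provider's job can be sketched as a lookup keyed by namespace. The class below is a hypothetical in-memory stand-in and does not implement Storm's `StateProvider` interface; the namespace values such as `wordcount-1` are likewise assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ToyStateProvider {
    // One independent key-value store per namespace.
    private final Map<String, Map<String, Object>> states = new HashMap<>();

    /** Loads the state for a namespace, creating an empty one on first use. */
    Map<String, Object> newState(String namespace) {
        return states.computeIfAbsent(namespace, ns -> new HashMap<>());
    }

    public static void main(String[] args) {
        ToyStateProvider provider = new ToyStateProvider();
        // Each task uses its own namespace, so the states do not interfere.
        provider.newState("wordcount-1").put("storm", 3);
        provider.newState("wordcount-2").put("storm", 7);
        System.out.println(provider.newState("wordcount-1").get("storm")); // 3
    }
}
```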