# Storm OpenTSDB Integration

## Storm OpenTSDB Bolt and TridentState

OpenTSDB provides scalable, highly available storage for time series data. It consists of Time Series Daemon (TSD) servers and a set of command-line utilities. Each TSD connects to the configured HBase cluster to push and query data.

A time series data point consists of:

- a metric name.
- a UNIX timestamp (seconds or milliseconds since Epoch).
- a value (64 bit integer or single-precision floating point value).
- a set of tags (key-value pairs) that describe the time series the point belongs to.

The Storm bolt and Trident state create such data points from a tuple, based on the given `TupleMetricPointMapper`.

This module provides core Storm bolt and Trident state implementations for writing data to OpenTSDB.

Data points are written with at-least-once semantics, so the same point may be written more than once. Duplicate data points should be handled as described [here](http://opentsdb.net/docs/build/html/user_guide/writing.html#duplicate-data-points) in the OpenTSDB documentation.

## Examples

### Core Bolt

The following example shows how to use the core bolt, `org.apache.storm.opentsdb.bolt.OpenTsdbBolt`:

```
// Configure the client: synchronous writes, with details returned in the response.
OpenTsdbClient.Builder builder = OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();

// Create the bolt with the default tuple-to-data-point mapper.
final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder,
        Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
openTsdbBolt.withBatchSize(10).withFlushInterval(2000);

// Wire the bolt to the upstream "metric-gen" component.
topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
```

### Trident State

```
// Create the state factory with the default tuple-to-data-point mapper.
final OpenTsdbStateFactory openTsdbStateFactory =
        new OpenTsdbStateFactory(OpenTsdbClient.newBuilder(tsdbUrl),
                Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));

TridentTopology tridentTopology = new TridentTopology();
final Stream stream = tridentTopology.newStream("metric-tsdb-stream", new MetricGenSpout());

// Log each tuple, then persist it to OpenTSDB through the state updater.
stream.peek(new Consumer() {
    @Override
    public void accept(TridentTuple input) {
        LOG.info("########### Received tuple: [{}]", input);
    }
}).partitionPersist(openTsdbStateFactory, MetricGenSpout.DEFAULT_METRIC_FIELDS, new OpenTsdbStateUpdater());
```
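To make the four parts of a data point (metric name, timestamp, value, tags) concrete, here is a minimal sketch of a data point rendered as the kind of JSON object that OpenTSDB's HTTP `/api/put` endpoint accepts. It is only an illustration and is not part of this module: `DataPointSketch`, `toJson` and `quote` are hypothetical names, and the sample metric and tags are made up. In a real topology the mapper and `OpenTsdbClient` take care of this step.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical stand-in for a data point; not a class from storm-opentsdb. */
final class DataPointSketch {
    final String metric;
    final long timestamp;            // UNIX time, seconds or milliseconds since Epoch
    final Number value;              // 64-bit integer or single-precision float
    final Map<String, String> tags;  // sorted copy, so the JSON output is deterministic

    DataPointSketch(String metric, long timestamp, Number value, Map<String, String> tags) {
        if (metric == null || metric.isEmpty()) {
            throw new IllegalArgumentException("metric name is required");
        }
        if (tags == null || tags.isEmpty()) {
            // OpenTSDB expects at least one tag pair per data point.
            throw new IllegalArgumentException("at least one tag is required");
        }
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
        this.tags = new TreeMap<>(tags);
    }

    /** Renders the point as a single JSON object with metric, timestamp, value and tags. */
    String toJson() {
        StringJoiner tagJson = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            tagJson.add(quote(tag.getKey()) + ":" + quote(tag.getValue()));
        }
        return "{\"metric\":" + quote(metric)
                + ",\"timestamp\":" + timestamp
                + ",\"value\":" + value
                + ",\"tags\":" + tagJson + "}";
    }

    /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        DataPointSketch point = new DataPointSketch(
                "sys.cpu.nice", 1346846400L, 18L, Map.of("host", "web01", "dc", "lga"));
        System.out.println(point.toJson());
    }
}
```

Because writes are at-least-once, the same metric, timestamp and tag combination may reach OpenTSDB more than once; how such duplicates are resolved is governed by OpenTSDB itself, as described in the duplicate data points guide linked above.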