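Embulk has a mechanism called filter plugins. By writing your own, you can transform the records that Embulk has read and parsed in whatever way you need. It is well suited to defining common processing that does not depend on the type of log, for example "add a hostname to every log record".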
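However, after trying it out, I found that even for unusual logs like the ones covered in an earlier article, the code became easier to follow when I split the work as follows:
- The basic part of the parsing that rarely needs changes once it is finished → parser plugin
- The part that has to be reworked repeatedly through trial and error → filter plugin
Personally, I expect to write filter plugins about as often as parser plugins, so here are my notes on how to build one.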
A filter plugin that replaces strings in a specific column
As an example, suppose we are loading a CSV file like the one below, and the strings ": account=(name)" and " by alice" in the comment column get in the way of aggregation, so we want to remove them.
    id,account,time,comment
    1,,2015-01-27 19:23:49,"login failure: account=alice"
    2,,2015-01-27 19:01:23,"login failure: account=bob"
    3,alice,2015-01-28 02:20:02,"login by alice"
First, generate a skeleton for the filter plugin with the embulk new command.
    happyturn% embulk new ruby-filter myapp
    2015-05-30 16:38:29.963 +0900: Embulk v0.6.10
    Creating embulk-filter-myapp/
    Creating embulk-filter-myapp/README.md
    Creating embulk-filter-myapp/LICENSE.txt
    Creating embulk-filter-myapp/.gitignore
    Creating embulk-filter-myapp/Rakefile
    Creating embulk-filter-myapp/Gemfile
    Creating embulk-filter-myapp/embulk-filter-myapp.gemspec
    Creating embulk-filter-myapp/lib/embulk/filter/myapp.rb
Right after running the command, the skeleton in embulk-filter-myapp/lib/embulk/filter/myapp.rb looks like this.
    module Embulk
      module Filter

        class MyappFilterPlugin < FilterPlugin
          Plugin.register_filter("myapp", self)

          def self.transaction(config, in_schema, &control)
            # configuration code:
            task = {
              "property1" => config.param("property1", :string),
              "property2" => config.param("property2", :integer, default: 0),
            }

            yield(task, out_columns)
          end

          def init
            # initialization code:
            @property1 = task["property1"]
            @property2 = task["property2"]
          end

          def close
          end

          def add(page)
            # filtering code:
            page.each do |record|
              page_builder.add(record)
            end
          end

          def finish
            page_builder.finish
          end
        end

      end
    end
First, in the transaction method, define the columns as they will look after passing through the filter. If you are not adding or removing columns, simply assigning in_schema to out_columns is enough. The init method is left empty because this plugin takes no parameters.
    def self.transaction(config, in_schema, &control)
      # The assignment is spelled out for explanation;
      # passing in_schema straight to yield also works.
      task = {}
      out_columns = in_schema

      yield(task, out_columns)
    end

    def init
    end
Next, write the filtering code in the add method. The variable record is an array, so to refer to the comment column, which is the fourth column, you need to write record[3].
    def add(page)
      # filtering code:
      idx = 3
      page.each do |record|
        case record[idx]
        when /^login failure:/
          record[idx] = "login failure"
        when /^login by/
          record[idx] = "login"
        end
        page_builder.add(record)
      end
    end
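The rewrite rule itself can be tried outside Embulk before wiring it into the plugin. The following is a minimal sketch, assuming records arrive as plain Ruby arrays in column order. The names `normalize_comment` and `COMMENT_INDEX` are hypothetical, introduced here only for illustration, and are not part of Embulk.

```ruby
# Standalone sketch of the rewrite rule used in add(); no Embulk required.
# COMMENT_INDEX and normalize_comment are hypothetical names for illustration.
COMMENT_INDEX = 3 # column order: id, account, time, comment

def normalize_comment(record, idx = COMMENT_INDEX)
  case record[idx]
  when /^login failure:/
    record[idx] = "login failure"
  when /^login by/
    record[idx] = "login"
  end
  record # values that match neither pattern (including nil) are left untouched
end

# Rows taken from the sample CSV above, as already-parsed arrays.
records = [
  [1, nil, "2015-01-27 19:23:49", "login failure: account=alice"],
  [2, nil, "2015-01-27 19:01:23", "login failure: account=bob"],
  [3, "alice", "2015-01-28 02:20:02", "login by alice"],
]

records.each { |r| puts normalize_comment(r)[COMMENT_INDEX] }
```

Keeping the rule in a small method like this makes it easy to check the regular expressions in irb while they are still changing.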
Alternatively, if the column order might change and you really want to refer to the column by name, you can pull the schema out of page and look up the index of the comment column, as shown below.
    def add(page)
      # find index of "comment" column
      columns = page.schema.select{|c| c.name == "comment" }
      idx = columns[0].index

      # filtering code:
      page.each do |record|
        case record[idx]
        when /^login failure:/
          record[idx] = "login failure"
        when /^login by/
          record[idx] = "login"
        end
        page_builder.add(record)
      end
    end
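If the comment column is missing, `columns[0]` in the code above is nil and the failure surfaces as a NoMethodError on `index`. A minimal sketch of a lookup that fails with a clearer message is shown here. It assumes only that each schema entry responds to `name` and `index`, as the code above relies on. `FakeColumn` and `column_index` are hypothetical stand-ins for illustration, not Embulk classes or methods.

```ruby
# FakeColumn is a hypothetical stand-in for the column objects in page.schema;
# it only mimics the name and index readers used by the plugin code.
FakeColumn = Struct.new(:index, :name, :type)

# column_index is a hypothetical helper: look up a column by name and
# raise a descriptive error when the column does not exist.
def column_index(schema, name)
  column = schema.find { |c| c.name == name }
  raise ArgumentError, "column not found: #{name}" if column.nil?
  column.index
end

schema = [
  FakeColumn.new(0, "id", :long),
  FakeColumn.new(1, "account", :string),
  FakeColumn.new(2, "time", :timestamp),
  FakeColumn.new(3, "comment", :string),
]

puts column_index(schema, "comment")
```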
That said, this version is in turn fragile when a column is renamed, so each approach has its pros and cons. In most cases, record[3] is probably good enough.
After that, add the filter's name to the configuration file as shown below,

    filters:
      - type: myapp

and run the embulk preview command to confirm that the filter is in effect.
    happyturn% embulk preview config.yml -L embulk-filter-myapp
    2015-05-30 17:20:16.564 +0900: Embulk v0.6.10
    2015-05-30 17:20:17.351 +0900: Loaded plugin embulk-filter-myapp (0.1.0)
    2015-05-30 17:20:17.365 +0900 [INFO] (preview): Listing local files at directory '/Users/myoshiz/devel/try1/csv' filtering filename by prefix 'sample_'
    2015-05-30 17:20:17.369 +0900 [INFO] (preview): Loading files [/Users/myoshiz/devel/try1/csv/sample_01.csv.gz]
    +---------+----------------+-------------------------+----------------+
    | id:long | account:string | time:timestamp          | comment:string |
    +---------+----------------+-------------------------+----------------+
    | 1       |                | 2015-01-27 19:23:49 UTC | login failure  |
    | 2       |                | 2015-01-27 19:01:23 UTC | login failure  |
    | 3       | alice          | 2015-01-28 02:20:02 UTC | login          |
    +---------+----------------+-------------------------+----------------+
Sample code
I have also uploaded this sample to Gist.