ãPostgreSQL ã§å®ãã¦æ©ãã¦ãã¾ã MQ ä½ã£ã¦ããçããã·ã§ã³ãã©ãããã¨ããªãéã£ã¦ããã®ã§ãè²ã
調ã¹ã¦è©¦ãã¦ã¿ãã¨ãããæ¡å¤ããæ¹æ³ãè¦ã¤ãã£ãã®ã§ãã¾ã¨ãã¦ã¿ãã
- [4/24] è¤æ°ãã¥ã¼ãæ±ãå ´åã®æ¹æ³ã tableoid ãå©ç¨ããããã«æ´æ°ã
- [4/27] ãã£ã«ã¿æ¡ä»¶ãä»ããå ´åã®è¨æ³ã«ã¤ãã¦è¿½è¨(WHERE å¥å ã®è©ä¾¡é ã®æ示çæå®)
- [4/30] åªå é ä½ã« ORDER BY ã¯å©ç¨ã§ããªãæ¨è¿½è¨
éµã¯ advisory lock。PostgreSQL 8.2 以降の機能ã ã
ã¡ãã»ã¼ã¸ãã¥ã¼ã¨è¨ã£ã¦ããå®ãã¦æ©ãããã ãããã¡ãããã«ãã«ã®é«æ©è½ãªãããªããããããå¶éãããã
- receiver ã¯è¤æ°å¯ãã¡ãã»ã¼ã¸ã¯ receiver ã®ä¸ã¤ãåä¿¡ãããããã¼ããã£ã¹ã/ãã«ããã£ã¹ãã¯å¯¾è±¡å¤ã
- ack ãããack ããã« receiver ãè½ã¡ãå ´åãã¡ãã»ã¼ã¸ã¯å¥ã® receiver ãåä¿¡å¯è½ã«ãªãã
- receiver 㯠PULL ã§ã¡ãã»ã¼ã¸ãåä¿¡*1
- ã¡ãã»ã¼ã¸å¦çã«é·æéããããªãï¼ã¡ãã»ã¼ã¸åä¿¡ãã ack éä¿¡ã¾ã§ãDBã»ãã·ã§ã³ãæ¡ã£ãã¾ã¾ã§ããï¼
ã¤ã¾ããæä»æ§ã¨ receiver ã®ï¼ã¤ã«é害ãçºçãã¦ãå¦çãè¡ãããã¨ããè¦ä»¶ãæºãããã¨ãåªå
ãã¦ããã
ã¡ãã»ã¼ã¸ãæ ¼ç´ãããã¼ãã«
create table mq ( id serial, message text, primary key(id) );
å¿
é ãã£ã¼ã«ã㯠idãserial ã
ãã¥ã¼ãè¤æ°ãæã¡ããå ´åã¯ããããã対å¿ãããã¼ãã«ãä½æã
å¿
è¦ãªãã¼ã¿ã»ã¡ã¿ãã¼ã¿ãããã°é©å®ãã£ã¼ã«ãã追å ã
ãã¥ã¼ã«ã¡ãã»ã¼ã¸ãéä¿¡(send)
insert into mq (text) values ('abc');
ãã®ã¾ã¾ insert ã§ï¼¯ï¼«ã
ãã¥ã¼ããã¡ãã»ã¼ã¸ãï¼ä»¶åå¾(receive)
ä¸è¨ SQL ã«ãããã¼ãªã³ã°(PULL)ã
select * from mq where pg_try_advisory_lock(tableoid::int, id) limit 1;
PostgreSQL の advisory lock ã¯ã¢ããªãç¬èªã«å©ç¨å¯è½ãªããã¯ç®¡çæ©è½ã
pg_try_advisory_lock(tableoid::int, id) 㯠(tableoid, id) ããã¼ã«ã㦠advisory lock ã®åå¾ã«æåããã trueã失æããã false ãè¿ãã®ã§ããã®ï¼è¡ã§
ã対象ãã¥ã¼ãã¼ãã«ã®ãããã¯ããã¦ããªãã¡ãã»ã¼ã¸ãæ大ï¼ä»¶åå¾ããid ããã¼ã«ããã¯ããã
ã¨ããåä½ã«ãªãã
åãåãå¯è½ãªã¡ãã»ã¼ã¸ããªããã°ãï¼ä»¶ã
ãã¼ãã«ã«é©å½ãªã¡ã¿ãã¼ã¿è¿½å ï¼ receive ã® SQL ã« ORDER BY ã WHERE å¥è¿½å ã§ã¡ãã»ã¼ã¸ã®åªå
é ä½ããã£ã«ã¿ãå®ç¾ã§ãããå®å
æå®ããªãã¨ããªãããªã
ãã ããWHERE å¥å
ã« AND ã§ä¸¦ã¹ãå¼ã®è©ä¾¡é ã¯ä¿è¨¼ãããªãã®ã§ãå¯ä½ç¨ãæ㤠pg_try_advisory_lock() ãå«ãã¦ãã®ã¾ã¾ä¸¦ã¹ãã ãã§ã¯ãæå³ããã¨ããã«åä½ãããã©ããã¯æè¨ã§ããªãã
ä¾ãã°ãã£ã«ã¿æ¡ä»¶ããå
ã« pg_try_advisory_lock() ãè©ä¾¡ããã¦ãã¾ã£ãããåãåºããã¨ãã§ããã¡ãã»ã¼ã¸ä»¥å¤ã«ãããã¯ããã¾ãã£ã¦ãããªãã¦ç¾è±¡ãçºçãã¦ãã¾ã*2ã
ããããå ´åã«ã¯ CASE 文を使って明示的に評価順を記述する方法があるã㨠PostgreSQL ã®ããã¥ã¢ã«ã«æ¸ãã¦ãã£ãã
ã®ã§ããwhere case when [ãã£ã«ã¿æ¡ä»¶] then pg_try_advisory_lock() else false endãã®ããã«è¨è¿°ããã°ããã ãããã¡ãã£ã¨ BK ããããã©ã
select * from mq where case when priority < 10 then pg_try_advisory_lock(tableoid::int, id) else false end limit 1;
ã¾ããåªå
é ä½ã®å¦çã« ORDER BY ãå©ç¨ã§ããããã¨èãã¦ããã®ã ããid:sfujiwara さんのご指摘によりãORDER BY å¥ãã¤ããã¨ä¸¦ã¹æ¿ãã®ããã«å
¨ã¦ã®ã¬ã³ã¼ãã«å¯¾ã㦠WHERE æ¡ä»¶ãè©ä¾¡ãã¦ãã¾ããå
¨ã¬ã³ã¼ãåã®ããã¯ãåå¾ãã¦ãã¾ããã¨ãããã£ãã
ãµãã¯ã¨ãªãªã©ãé§ä½¿ããããvolatile ã§é¢æ°å®£è¨ããããã¦åé¿ã§ããªããè²ã
試ãã¦ã¿ãããã¡ã
ã¨ããããã§ãåªå
é ä½ãªã©ã§ä¸¦ã¹æ¿ãããããããã°ãã«ã¼ã½ã«ã§ï¼ã¬ã³ã¼ããã¤åã£ã¦ pg_try_advisory_lock() ãããããªããããæ®å¿µã
æ´æ°ããã¯ããããããã° for update ãã¤ãããããã©ãadvisory lock ã«ããã¢ããªã¬ã¤ã¤ã¼ã§ã¯æä»æ§ã¯ç¢ºä¿ã§ãã¦ããã®ã§ããã©ã³ã¶ã¯ã·ã§ã³ä¸è¦ã§ãããããããªããã¨*3ã
ãªããadvisory lock 㯠transaction ã¨ç¡é¢ä¿ãrollback ã§ã解æ¾ãããªãã
ããã»ãã·ã§ã³ãåããã°è§£æ¾ãããã
ã¡ãã»ã¼ã¸å¦çå®äº(ack)
delete from mq where id=12345 returning pg_advisory_unlock(tableoid::int, id);
ã¡ãã»ã¼ã¸ãåé¤ãã¤ã¤ãadvisory lock ã解æ¾ãæåããã° true ãè¿ãã
åé¤ããããªããã°å®äºãã©ã°çãªãã®ãç¨æãã¦ãDELETE ã®ãããã« UPDATE ã使ãã°ï¼¯ï¼«ã
ack ããã«ã»ãã·ã§ã³ãåããå ´å㯠advisory lock ãåæã«è§£æ¾ããã¦ãå¥ã® receiver ãåä¿¡å¯è½ã«(âãããå¬ãã)ã
ã¡ãã»ã¼ã¸ã®ãã£ã³ã»ã«(cancel)
ã¡ãã»ã¼ã¸ããã£ã³ã»ã«ããããªãã¦å ´åããããããããªãã
delete from mq where id=12345 and pg_try_advisory_lock(tableoid:int, id) returning pg_advisory_unlock(tableoid:int, id);
ãã£ã³ã»ã«ã«æåããã° trueã失æããã° false ãã¡ãã»ã¼ã¸ããã§ã«å¦çããã¦ãã¾ã£ã¦ãããï¼ä»¶ãè¿ãã
å¶é
é£ç¹ã¯ãadvisory lock ã®ãã¼ã¯ 32bit integerÃï¼(ã¾ã㯠64bit) ãªã®ã§ãå©ç¨ããã¢ããªéã§ããããã調æ´ãã¦ããªãã¨è¡çªã®å¯è½æ§ããããã¨ã
ã¨ã¯ãããadvisory lock ã®ãã¼ã¯ database ãã¨ã«ä¸ãããã(PostgreSQL ã® lock æ©æ§ã¯ããã¼ã« database oid ãå«ã)ãããåããã¼ã¿ãã¼ã¹ãå©ç¨ããã¢ããªéã§ãã調æ´ã§ãã¦ããã°ï¼¯ï¼«ãªã®ã§ããªãã¨ãç¾å®çãªç¯å²ã ããã*4
ã¾ããadvisory lock ã¯ã»ãã·ã§ã³åä½ãªã®ã§ã ack ããã¾ã§ã¯ã»ãã·ã§ã³ãæ¡ã£ã¦ãããªãã¨ãããªãã
ã¡ãã»ã¼ã¸ãå¦çãã¦ããéãã³ãã¯ã·ã§ã³éããããã¼ã«ã«è¿ãããããããã¡ã
ä½è«ï¼
ãQ4M 使ããããã©ããã¡ãã¹ã°ã¬ããããªããã¨ãã人ã«ã¯ã¡ãã£ã¨å¬ããï¼
Q4M ã ã£ããã¡ãã»ã¼ã¸ãå¾
ã£ã¦ããéããããã³ã°ãã¦ããããã ãã©ãâ¦â¦
ä½è«ï¼
æåã "UPDATE ã SET lock=1 WHERE lock=0 RETURNING *;" ã§ãããããããªããã¨æã£ã¦ããããPostgreSQL ã® UPDATE æã« LIMIT ããªãã¦â¦â¦
*1:LISTEN & NOTIFY ã§ã¿ã¤ãã³ã°ã®å¶å¾¡ããããªãå¥éãã£ã¦ããããã
*2: å¤ã®å¤§å°æ¯è¼ã¨ pg_try_advisory_lock() ãªããæ¯è¼ã®æ¹ãã³ã¹ããä½ã(ã¨æ¨æ¸¬ã§ãã)ããå ã«è©ä¾¡ãã¦ãããã ãããã¨ããããæå¾ ã§ãããããããªããã¨ã¯æããã ãã©
*3:advisory lock 㨠row lock 両æ¹ãã㦠pg_locks è¦ãããã両æ¹ã¯ããããªãã¨ããæ°æã¡ã«ãªãã
*4:åå空éãããã°è¯ãã£ããã ãã©ãã¼ã32bit æ´æ°ã®çæ¹ã enum çã«ä½¿ãæããããã©ãã©ã£ã¡ãèå¥ç¨ã«ç¨ãããã¨ããã£ã±ãã¢ããªéã§ã®èª¿æ´ãå¿ é