MyCSS

ラベル SNS の投稿を表示しています。 すべての投稿を表示
ラベル SNS の投稿を表示しています。 すべての投稿を表示

2012/03/15

Amazon SWFつかってみた(ながいよ) はてなブックマークに追加

それは突然にやってきた
AWS界隈ではDynamoDB祭りが開かれているまっただ中の2012年02月22日、いつものように何の前触れも無く、こっそりとManagement Consoleにタブが追加されました。Twitterで「何このタブは?」という情報が拡散してから公式ブログに発表になるという、最近おなじみのパターンですw
SWF?なにAWSでFlashが出来るの?などと一瞬思いましたが、もちろんそんなわけは無く、Simple Workflow Serviceの略でSWS、もとい、SWFでした。あれっw?

ワークフローと聞いて
自分が一番最初にイメージするのは、経費や稟議の申請処理なんかを思いつきますが、Amazon SWFはすでにNASAでも使われていると言う事で、火星への交通費の精算に使われたり、探査機が撮った膨大な写真を処理するのに使われているようです。 (一応言っておくと、前者は冗談です。)
SWFとはなんぞや?と言う事は、他でもイロイロ言われていると思いますので、詳細は割愛させて頂きます(汗)
※と言いつつ、現状AWSの公式ドキュメントくらいしか無いような気がする。

AWS Flow Framework
今回のJava SDKは気合いが入っています。DynamoDBの時もシャレオツなDataMapperがSDKに入っていて「おぉ!」と思ったけれども、SWFの場合はフレームワークが入っていて若干のけぞりました。その名もAWS Flow Frameworkです。

どれどれ、と思ってサンプルコードをダウンロードして手元のEclipseに取り込んだら、いきなりコンパイルできない。理由が、あるべきクラスが無いと言うもの。「これ、きっと中の人が入れ忘れたんじゃね(苦笑)?」などと思って調べてみたら、なんとAPT(Annotation Processing Tool)でクライアントコードが自動生成される事が判明。正直ビビりました(笑)

理解するまで結構時間かかりました(汗)
SWFはその他のAWSサービスと違って、結構複雑です。いや、API自体はシンプルなのは間違いないです。けれども、そもそもワークフローを扱うのですから、それなりに面倒な事になります。
※でも、そういうマンドクサイものを汎用的なAPIを仕立ててくるAWSはさすがです…。

まぁ、そうとはいえ、Flow Frameworkを使えばかなりスッキリしたコードが書ける事が分かりました。ただし、非同期な動きを良く理解しないと、振る舞いが意味不明な感じになります。java.util.concurrent.Future<V>を使ったプログラムを書いた事があれば、結構すんなり受け入れられるかな?とは思います。

そして、出来る事は沢山あります。自分もまだ全貌を把握しきれていません。とりあえず、最初の一発目を動かすのに結構苦労したので、自分がやってみた事を記事にしておきます。

間違い等あれば、ご指摘ください!!

サンプルワークフロー
実際に試すにあたり、こんなワークフローを考えてみました。
  1. だれかがS3へ画像をアップロードする
  2. S3から画像をダウンロードする
  3. モノクロに変換する
  4. 処理完了をメールでお知らせする
まぁ、AWSが提供しているサンプルに激しく近い感じですが、サンプルに適したよい例が思いつかなかったのでご了承くださいw

やったこと

1.まずはSWFドメインを作成する

ドメインが無い事には始まりませんので、Management Consoleから作成します。
Create a new Domainをクリックします。

以下のように設定します。Nameは何でもよいですが、プログラムから参照しますので分かりやすいものがよいでしょう。また、現状では一度登録したドメインが消せないようなので、注意が必要です。(Deprecate(廃止)する事は可能です)



2.SNSにトピックを作成する

今回はメール通知用にAmazon SNSを使う事にします。今回は新たに専用のトピックを作成してみます。Management ConsoleのSNSのタブに移動して、Create New Topicボタンをクリックします。
Topic Nameを入力。Display Nameは日本語が入らないんだなー。
 トピックが出来たら、メールの通知先を加えましょう。Create New Subscriptionをクリック。
 ProtocolにEmailを選択し、メールアドレスを入力。
Subscribeすると、AWSから確認メールが送られて来るので、Confirm subscriptionのURLをブラウザで開きます。
こんな感じで表示されればOKです。 
なお、TopicARNはプログラムから参照する必要があるので控えておきましょう。

3.Eclipseでプロジェクトを作る

Flow Frameworkを使う際、APTを使う設定をしないとどうにもなりませんので、ここはしっかりやる必要があります。

始めはMavenベースでやろうと思ったけど、aws-java-sdk-flow-build-tools.jarがリポジトリに見当たらなかったので、ここは素直にAWS Toolkit for Eclipseを使う事にしました。

Build PathにAWS SDK for Javaが入っている事を確認しましょう。


重要なのが、Annotation ProcessingがEnableになっている事です。

Eclipseのデフォルトでは.apt_generatedというディレクトリに生成されますが、自分の好きなものを指定してもよいかもしれません。ただ、srcには入れない方がよいでしょう。

4.Activityを定義する

ふぅ。ようやく本題に入る事が出来そうです(笑)

最初はワークフロー実行に必要なActivityを定義します。必要なものは以下です。
  • S3へのアップロード
  • S3からのダウンロード
  • 画像モノクロ変換
  • メール通知
ActivityはInterfaceとして定義して、@Activitiesアノテーションを加えるのが決まりです。こんな感じのソースを書きました。


5.Workflowを定義する

WorkflowもInterfaceとして定義して、@Workflowアノテーションを加えます。


6.Activityを実装する

実装の中身はそれほど重要じゃないと思うので、ここでは割愛しますね。コードはgithubに置いてありますので、ご興味あればどーぞ。


7.Workflowを実装する

Workflowの実装は独特なものがあります。実装コードはこんな感じです。



いきなりS3ActivitiesClientなどというクラスが出てきますが、これらはFlow Frameworkが自動生成したクラスになります。Workflowの実装はこのクライアントを利用するのがキモです。

このクライアントは、アクティビティで定義したメソッドにPromiseでラップした引数を加えたバージョンが多数追加されています。このPromiseですが、詳細な説明はAWSのドキュメントに譲るとして(今回はそんなのばっかりだなぁw)、ざっくり言うとメソッドの実行が非同期で行われ、結果を待たずして次のコードに移るというものです。コードは同期的に見えますが、実際は非同期な振る舞いをします。


8.ActivityWorker、WorkflowWorkerを実装する

さて、ActivityとWorkflowの準備ができましたので、これらを実際にホストするプログラムを書く必要があります。Flow Frameworkでは、ActivityWorker、WorkflowWorkerというものが用意されていますので、これらを素直に使いましょう。


それぞれ引数にドメイン名とタスクリスト名を渡しますが、ドメイン名は先ほどManagement Consoleで作ったドメイン名にします。このコードでは単にworker.start()してるだけですが、きちんと実装する際は不正な形で終了しないようにshutdownHookを引っ掛けてください。

現状のタイミングでActivityを実行するのかどうかを判断するのはWorkflowWorkerです。AWSの言う所のDeciderでしょうか?絵にするとこういうイメージかなぁ?
つ、伝わるかなぁ…っていうか、そもそもあってるかなぁ(汗)

9.ワークフローを実行するコードを書く

ようやくここまで来ました。な、長かった。こんな感じになります。


ここでもFlow Frameworkが自動生成してくれるWorkFlowExternalを使います。

10.実行してみる!

これで準備が整ったので、いざ実行してみましょう。
これには順番が大事です。初回は「8.ActivityWorker、WorkflowWorkerを実装する」で作ったActivityWorkerとWorkflowWorkerを実行します。実は、このWorkerは、ActivityとWorkflowの登録をSWFに対して自動でやってくれるんです。いちいちManagement Consoleで手動で登録する必要がありません。便利ですよ。

Workerを実行後にManagement Consoleで確認すると、ちゃんと登録されているのが分かります。

これでようやくワークフローを実行する事が出来ます。上記のWorkerを実行した状態で、「9.ワークフローを実行するコードを書く」で実装したStarter.javaを実行します。以下のようなログが起動済みの「ActivityHost」から吐き出されたら成功です!

Management Consoleで確認してみましょう。Workflow Executionsをクリックして、Execution StatusをClosedで指定します。

ちゃんとありました!
Workflow Execution IDをクリックすると、実行の履歴を見る事が出来ます。分かりやすい所で、Activityを見てみるとこんな感じです。ちゃんとワークフローが実行されているのが確認できます。


Management Consoleからは、Executionに対して「Signal」「Try-Cancel」「Terminate」「Re-Run」の操作をする事が可能です。分かりやすい所で「Re-Run」してみましょう。

すると、Execution Listに同じWorkflow Execution IDでRun ID違いのExecutionが追加されている事が分かります。ステータスはActiveです。


手元で起動しているWorkerが反応します。ActivityHostからは先ほど同じようなログが出ているのが確認できます。Management Consoleから手元のプログラムが動く感覚は、理屈は分かるとはいえ、実にミョ〜な感じですww


ワークフローが完了すると、ちゃんとCompletedになっている事が確認できます。


まとめ
というわけで、基本的な部分だけつまみ食いした感じですが、おおよその感じがつかんで頂ければ幸いです。

そして見て分かる通り、実際のアクティビティを実行するワーカーはクラウドだろうがオンプレだろうがモバイルだろうがどこにあっても良い、というのがキモだと思います。また、きちんと作れば容易にスケールする事も分かります。

AWSで非同期処理ならばSQSというのが定番ですが、二重処理を防ぐためにメッセージのステート管理を自前でやる必要があったり、SQSにポーリングするプログラムを別途仕込む必要があったりしましたが、こういった事はSWFを使うと一挙に解決されます。

DynamoDBは、SimpleDBで出来なかった事が増えてバンザイな感じでしたけども、SWFもSQSでちょっと面倒だった事が簡単にできるようになった気がします。


続くエントリはこちら:Amazon SWF Flow Frameworkのエラー処理について

2011/07/20

AWS SDK for Ruby はてなブックマークに追加

「七夕記念企画/雲(AWS)に願いを!」という事で、お願いしたらあっという間に願いがかなったわけでして。


【AWS発表】 AWS SDK for Rubyを提供開始 - Amazon Web Services ブログ

大変喜ばしい!というわけで、ちょっとだけ試してみたメモです。

AWS::SNSを試してみる
一年以上前に、Amazon SNSのRubyライブラリをRightAwsを元に作ったんですが、もう不要になりました。万歳!

SDKの導入方法はこの辺を見ていただくとして、例えばTopicを全て総ざらいして、その中のSubscriptionを取得するにはこんな感じになります。
# -*- coding: utf-8 -*-
require 'rubygems'
require 'aws-sdk'

AWS.config(:access_key_id => 'アクセスキー', 
  :secret_access_key => 'シークレットキー')
  
sns = AWS::SNS.new
sns.topics.each do |topic|
  puts topic.name
  topic.subscriptions.each do |sub|
    puts sub.arn
  end
end
実にいい感じですね。

Messageをpublishするには、Topicオブジェクトにpublishしてあげるだけです。
topic.publish('デフォルトメッセージ',
  :subject => 'Test',
  :email => 'ほげほげ')

送信先がメールの場合、:subject に日本語をそのまま与えるとエラーになりますんで、例のごとくJISエンコードしてあげます。
subject = '=?ISO-2022-JP?B?' + Kconv.tojis('日本語サブジェクト')
  .split(//,1).pack('m').chomp + '?='
topic.publish('デフォルトメッセージ',
  :subject => subject,
  :email => 'ほげほげ')

~を含むとpublishに失敗するぞ!
まだちゃんと検証していないのですが、問題を発見しましたので報告しておきますw
先のpublishですが、メッセージの本文に~(0x007E)があると、AWS::SNS::Errors::SignatureDoesNotMatchになってしまいます。
※ちなみに \(~0~)/ を試して気がつきましたw
topic.publish('デフォルトメッセージ',
  :subject => 'Test',
  :email => '~') 

以下、エラーメッセージ
E, [2011-07-20T20:45:38.321580 #3504] ERROR -- : [AWS SNS 403 0.870669] publish {:message_structure=>"json", :topic_arn=>"arn:aws:sns:us-east-1:XXXXXXXXXXXX:XXXXXXXX", :message=>"{\"default\":\"デフォルトメッセージ\",\"email\":\"~\"}", :subject=>"Test"} AWS::SNS::Errors::SignatureDoesNotMatch: The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.

これってAWS SDKの問題って言うよりはRubyの問題なのかな?それともJSONエンコード?ちなみに試した環境は以下です。
$ ruby --version
ruby 1.8.7 (2009-06-12 patchlevel 174) [universal-darwin10.0]

2010/06/01

Amazon SNS自腹実験人柱サイト、AWSWatch をリリースしました。 はてなブックマークに追加

Amazon Web Serviceを監視して、異常があったらAmazon Simple Notification Serviceを通してメールで通知するサイト、AWSWatchを作ってみました。 ロゴがいかにもやっつけでダッサイですけど。

作った動機
あまり大きな声で言いたくないのですが(笑)、2010年5月、立て続けにUS-EASTで障害が発生しました。


一発目はゴールデンウィーク中の5月4日で、こちらのブログでも言及されている通り、一部のEC2インスタンスが結構長い間ストップしてしまいました。個人的にも仕事で使っているインスタンスのうち1台が、外部からの強制リブートがかかった形跡を確認しています。

通常サービスを立てる際は、サーバーの監視というのをやると思いますけど、仮にトラブルが発生した際、原因がアプリケーションなのか、OSなのか、それとももっと低レイヤーのハードウェアなのか?という切り分けをするかと思います。そう言った事をクラウドでやろうとすると、ハード的な部分はそもそも仮想化されているので、直接見る事が出来ません。見えたところで、どうしようもないのですけど、基本的にはクラウドベンダーが公開する障害情報を頼りにするしかありません。

AWSであれば、Service Health Dashboardで随時確認する事が出来ます。ですがこのサイト、ぱっと見キレイにまとまって見やすい感じがしますが、いざ問題があった時、時系列を追うのが結構厳しいです。基本的にはRSSフィードを見てね、という作りでちょっと不親切です。また、時間表記が太平洋時間だったりしますので、日本人的には結構つらいものがあります。だいたい、障害があった時は、基本的に「心の余裕」がありませんので、この作りはあまりいただけません・・。

というわけで、
  • 障害があったらメールを受け取りたい
  • 自分の好きなタイムゾーンで時刻を表示したい
という事がやりたくて、AWSWatchを作りました。

Heroku
開発は帰宅してから空いた時間でボチボチと作っておりました。Amazon SNSのRubyクライアントを作る所から始めたので、なんだかんだで二週間くらいかかりました。完全に個人で運用するサイトですので、最初はお金のかからないGoogle App Engineを使おうかと思いましたが、一番手になじんでいるのがRailsですし、以前から興味があったHerokuを使ってみようと思い、あまり深く考えずにチョイスしました。

このHerokuですが、RubyでWebアプリが書ける人であれば、即利用可能です。herokuのGemをインストールして、gitが使えれば、こんな感じでサービスを公開出来ます。
gem install heroku
rails awswatch
cd awswatch
git init
git add .
git commit -m "First Release!"
heroku create
git push heroku master
gitでpushした時点でdeployが完了した事になります。

マイグレーションは
heroku rake db:migrate
consoleは
heroku console
とやれば、
script/console production
をやっている事になります。スバラシイ!!

Herokuに関しては、この辺を参考にさせていただきました。

個人的にここが素晴らしい!と思った点は、
  • 基本的に無料
  • Gemを入れるだけという気軽さ
  • Deployが簡単
  • SSLが無料で使える
  • 独自ドメイン使用可能
  • Add-onが充実している

特にAdd-onに関しては、
  • New Relicというアプリケーションモニターが利用可能
  • Exceptionalという例外を通知してくれるサービスが利用可能
  • Amazon RDSも利用可能!(RDSの利用料はかかります)
あたりが個人的にポイント高いです。

ただし、無料の範疇だとスレッドは1つですので、トラフィックが多いサイトになると厳しいです。その辺りが気になる方は、今のところGoogle App Engineが最強か?と思います。(Rubyだと厳しそうですが・・)

Amazon Simple Notification Service(Amazon SNS)
2010年4月5日に、Amazon Simple Notification Service(Amazon SNS)というサービスがリリースされました。(参考:アマゾン、クラウドサービスにメッセージ送信機能「Amazon SNS」を追加 - CNET Japan

Amazon SNS・・・実に誤解をはらみそうなネーミング(笑)ですが、SQS同様メッセージングサービスという事で、かゆいところに手が届く的なサービスになっています。

SNSとSQSの違いは、プッシュとプルの違いでしょうか。SNSはメッセージをEMail、HTTPなどに配信します(プッシュ)。SQSは、メッセージをQueueに取りに行く必要があります(プル)。SNSで真っ先に思いつくところとしては、メールの一斉配信です。これを自前でやろうとしたら大変な事は容易に想像出来ますが、その大変な部分をAmazonが肩代わりしてくれるという、非常にAmazonらしいサービスになっています。しかも、例によって激安です。メールは10万通送信しても、たった2ドルです。

この部分だけを見ると、スパマーにもってこいのサービスじゃね?と一瞬思ってしまいますが、そこはちゃんとしています。ユーザーがメールの配信を希望する(Subscribe)と、SNSは、ユーザーに確認メールを自動送信します。そのメールの中に書いてある確認用のリンクをクリック(Confirm)すると、はじめてメールが送信されるようになります。



まだ出たばかりのサービスですので、API的にもちょっとこれは・・という点がなきにしもあらずですが、それは今後の充実に期待!という事で。SNSに関しては、また追って記事を書きたいと思います。

突っ込み所が満載な件
という訳で、サービスはHerokuに置いています。識者の方はここで「?」となるはずです。
公開前日に発覚した、驚愕の事実がございます(笑)


Herokuが完璧なまでにEC2に乗っかっておりました!知りませんでした !(^^)! 事前に調べておけよ、って感じですが、HerokuにAmazon RDSのAdd-onがある時点で「?」と思うべきでしたね。監視のサービスが落っこちたら監視になりませんので、別のインフラに載せるのがエンジニアの嗜みですけど、最初から躓いてますね・・まぁ、そのうち気が向いたらGoogle App Engineにでも移行して、反省の意を表明したいと思います(笑)。でも、そもそもメール通知にSNSを使っている時点で企画倒れな様な気もしますね・・。

2010/05/19

Amazon SNSのRubyライブラリを(適当に)つくりました はてなブックマークに追加

Amazon Web ServiceのSimple Notification Serviceですが、Rubyのライブラリが見つからなかったので、適当に作りました。元ネタはRightScale提供のRightAwsで、これをSNS対応させた形です。

ソースを張っておきますので、ご自由にお使いください。動作無保証です。未実装のメソッドもあります。テストも不完全です。無い無い尽くしでアレですが、ご自身の責任範囲でお使いください。

本来であればGitHubからForkさせるべきなんでしょうけど、使い方を把握していないので・・すみません・・

※注:すでに同じ様な事やってらっしゃる方がいました。http://github.com/bemurphy/right_aws/blob/add_sns/lib/sns/right_sns_interface.rb

require "right_aws"

module RightAws

  class SnsInterface < RightAwsBase
    include RightAwsBaseInterface

    DEFAULT_HOST      = 'sns.us-east-1.amazonaws.com'
    DEFAULT_PORT      = 443
    DEFAULT_PROTOCOL  = 'https'
    DEFAULT_PATH      = '/'
    API_VERSION       = '2010-03-31'

    @@bench = AwsBenchmarkingBlock.new
    def self.bench_xml; @@bench.xml;     end
    def self.bench_sns; @@bench.service; end

    # Creates new RightSns instance.
    # 
    # Example:
    #
    #  sns = RightAws::SnsInterface.new(aws_access_key_id, aws_secret_key)
    #
    def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, params={})
      params.delete(:nil_representation)
      init({
          :name                => 'SNS',
          :default_host        => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).host    : DEFAULT_HOST,
          :default_port        => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).port    : DEFAULT_PORT,
          :default_service     => DEFAULT_PATH,
          :default_protocol    => ENV['SNS_URL'] ? URI.parse(ENV['SNS_URL']).scheme  : DEFAULT_PROTOCOL,
          :default_api_version => API_VERSION
        },
        aws_access_key_id     || ENV['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key || ENV['AWS_SECRET_ACCESS_KEY'],
        params)
    end


    #--------------------
    #      APIs
    #--------------------

    def add_permission()
      raise 'Sorry! Not Implemented!'
    end
    
    def confirm_subscription()
      raise 'Sorry! Not Implemented!'
    end
    
    def set_topic_attributes(topic_arn, attribute_name, attribute_value)
      request_params = {'TopicArn' => topic_arn, 'AttributeName' => attribute_name, 'AttributeValue' => attribute_value}
      link   = generate_request("SetTopicAttributes", request_params)
      result = request_info(link, QSnsSetTopicAttributesParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end
    
    def get_topic_attributes(topic_arn)
      request_params = {'TopicArn' => topic_arn}
      link   = generate_request("GetTopicAttributes", request_params)
      result = request_info(link, QSnsGetTopicAttributesParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end

    def create_topic(topic_name)
      request_params = {'Name' => topic_name}
      link   = generate_request("CreateTopic", request_params)
      result = request_info(link, QSnsCreateTopicParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end

    def delete_topic(topic_arn)
      request_params = {'TopicArn' => topic_arn}
      link   = generate_request("DeleteTopic", request_params)
      result = request_info(link, QSnsDeleteTopicParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end

    def list_topics(next_token = nil )
      request_params = {'NextToken' => next_token}
      link   = generate_request("ListTopics", request_params)
      result = request_info(link, QSnsListTopicsParser.new(:logger => @logger))
      # return result if no block given
      return result unless block_given?
      # loop if block if given
      begin
        # the block must return true if it wanna continue
        break unless yield(result) && result[:next_token]
        # make new request
        request_params['NextToken'] = result[:next_token]
        link   = generate_request("ListTopics", request_params)
        result = request_info(link, QSnsListTopicsParser.new(:logger => @logger))
      end while true
    rescue Exception
      on_exception
    end

    def subscribe(topic_arn, protocol, endpoint)
      request_params = {'TopicArn' => topic_arn, 'Protocol' => protocol, 'Endpoint' => endpoint}
      link   = generate_request("Subscribe", request_params)
      result = request_info(link, QSnsSubscribeParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end

    def unsubscribe(subscription_arn)
      request_params = {'SubscriptionArn' => subscription_arn}
      link   = generate_request("Unsubscribe", request_params)
      result = request_info(link, QSnsUnubscribeParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end

    # return {:subscritions => []}
    def list_subscriptions(next_token = nil )
      request_params = {'NextToken' => next_token }
      link   = generate_request("ListSubscriptions", request_params)
      result = request_info(link, QSnsListSubscriptionsParser.new(:logger => @logger))
      # return result if no block given
      return result unless block_given?
      # loop if block if given
      begin
        # the block must return true if it wanna continue
        break unless yield(result) && result[:next_token]
        # make new request
        request_params['NextToken'] = result[:next_token]
        link   = generate_request("ListSubscriptions", request_params)
        result = request_info(link, QSnsListSubscriptionsParser.new(:logger => @logger))
      end while true
    rescue Exception
      on_exception
    end

    # return {:subscritions => []}
    def list_subscriptions_by_topic(topic_arn, next_token = nil )
      request_params = {'TopicArn' => topic_arn, 'NextToken' => next_token}
      link   = generate_request("ListSubscriptionsByTopic", request_params)
      result = request_info(link, QSnsListSubscriptionsParser.new(:logger => @logger))
      # return result if no block given
      return result unless block_given?
      # loop if block if given
      begin
        # the block must return true if it wanna continue
        break unless yield(result) && result[:next_token]
        # make new request
        request_params['NextToken'] = result[:next_token]
        link   = generate_request("ListSubscriptionsByTopic", request_params)
        result = request_info(link, QSnsListSubscriptionsParser.new(:logger => @logger))
      end while true
    rescue Exception
      on_exception
    end

    def publish(topic_arn, subject, message)
      request_params = {'TopicArn' => topic_arn, 'Subject' => subject, 'Message' => message}
      link   = generate_request("Publish", request_params)
      result = request_info(link, QSnsPublishParser.new(:logger => @logger))
      return result
    rescue Exception
      on_exception
    end


    #--------------------
    #      Requests
    #--------------------

    def generate_request(action, params={}) #:nodoc:
      generate_request_impl(:get, action, params )
    end

    # Sends request to Amazon and parses the response
    # Raises AwsError if any banana happened
    def request_info(request, parser)  #:nodoc:
      request_info_impl(:sdb_connection, @@bench, request, parser)
    end

    
    #--------------------
    #      PARSERS:
    #--------------------

    class QSnsCreateTopicParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'TopicArn'  then @result[:topic_arn]  =  @text
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    class QSnsDeleteTopicParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    class QSnsListTopicsParser < RightAWSParser #:nodoc:
      def reset
        @result = { :topics => [] }
      end
      def tagend(name)
        case name
        when 'NextToken'  then @result[:next_token] =  @text
        when 'TopicArn'   then @result[:topics]     << @text
        end
      end
    end

    class QSnsSubscribeParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'SubscriptionArn' then @result[:subscription_arn] =  @text
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    # same with QSnsDeleteParser...
    class QSnsUnubscribeParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    class QSnsListSubscriptionsParser < RightAWSParser #:nodoc:
      def reset
        new_member()
        @result = {:subscriptions => [] }
      end
      def tagend(name)
        case name
        when 'NextToken'  then @result[:next_token] =  @text
        when 'TopicArn'   then @member[:topic_arn]  =  @text
        when 'Protocol'   then @member[:protocol]   =  @text
        when 'SubscriptionArn'   then @member[:subscription_arn] =  @text
        when 'Owner'   then @member[:owner]         = @text
        when 'Endpoint'   then @member[:endpoint]   =  @text; @result[:subscriptions] << @member; new_member();
        end
      end
      private
      def new_member
        @member = {:topic_arn => '', :protocol => '', :subscription_arn => '', :owner => '', :endpoint => ''}
      end
    end

    class QSnsPublishParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'MessageId' then @result[:message_id] =  @text
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    # same with QSnsDeleteTopicParser...
    class QSnsSetTopicAttributesParser < RightAWSParser #:nodoc:
      def reset
        @result = {}
      end
      def tagend(name)
        case name
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end

    class QSnsGetTopicAttributesParser < RightAWSParser #:nodoc:
      def reset
        @result = {:attributes => []}
      end
      def tagend(name)
        case name
        when 'key' then @key = @text
        when 'value' then @result[:attributes] << {:key => @key, :value => @text}
        when 'RequestId' then @result[:request_id] =  @text
        end
      end
    end
  end

end