サナギわさわさ.json

サナギさんとキルミーベイベーとプログラミングが好きです

phpでTwitterStreaming APIを取得してMySQLに突っ込む

今更感のあるテーマですが、ちょっと機会があってphpで実装することになったので残しておきます。

基本的には[PHP] ライブラリに頼らないTwitterAPI入門 - Qiita【php】twitter Streaming API の statuses/filter を試す at softelメモ を参考にしただけです。

実装していて気になった点としては、

  • fsockopenを用いた実装とcurlを用いた実装では取得できるツイートの件数が違ったこと
    curlの方が件数が多かったのでそちらを採用)

  • 一定時間ツイートが取得できない場合にMySQLとの接続が切れるので実行時にPDOException拾って適宜再接続する必要があった事

などが挙げられます。
特にfsockopenとcurlの件数違いに関しては気になるので、誰か分かる人いたら教えてください。

以下、PHP5.4でのサンプルコードです。オブジェクト指向無視した書き方なのであくまで参考までに。

<?php
date_default_timezone_set("Asia/Tokyo");
mb_internal_encoding("UTF-8");

function callback($ch, $data) {
    if(strlen($data)>2){
        $oData = json_decode($data);
        if(isset($oData->text)){
            insertTweetToSQL($oData);
        }
    }
    return strlen($data);
}

function insertTweetToSQL($tweetData) {
    $pdo = getDBConnection();
    $text = "";
    $createAt = "";
    $userId = "";
    $userName = "";
    if(isset($tweetData->text)){
        $text = $tweetData->text;
    }
    if(isset($tweetData->timestamp_ms)){
        $timestamp = $tweetData->timestamp_ms;
        $createAt = date("Y-m-d H:i:s", $timestamp/1000);
    }
    if(isset($tweetData->user) && isset($tweetData->user->screen_name)){
        $userName = $tweetData->user->screen_name;
    }
    if(isset($tweetData->user) && isset($tweetData->user->id_str)){
        $userId = $tweetData->user->id_str;
    }   
    $sql = "INSERT INTO `test` (`tweet`,`create_at`,`name_user`,`id_user`) VALUES(?,?,?,?)";
    $queryList = array();
    $queryList[] = $text;
    $queryList[] = $createAt;
    $queryList[] = $userName;
    $queryList[] = $userId;
    
    try {
        $stmt=$pdo->prepare($sql);
        if ($stmt) {
            $res=$stmt->execute($queryList);
        }
    } catch (PDOException $e){
        try {
            $pdo = reconnectDB();
            $stmt2=$pdo->prepare($sql);
            if ($stmt2) {
                $res2 = $stmt2->execute($queryList);
            }
        } catch (PDOException $e){
        }
    }
}

function getDBConnection() {
    $pdo = $GLOBALS["pdo"];
    if ($pdo === null) {
        $pdo = reconnectDB();
    }
    return $pdo;
}

function reconnectDB() {
    $url = "xxx";
    $db = "xxx";
    $user = "xxx";
    $pass = "xxx";
    $pdo = null;
    try {
        $pdo = new PDO('mysql:host='.$url.';dbname='.$db.';charset=utf8', $user, $pass);
        $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    } catch (PDOException $e){
        if( $e->errorInfo[1] == 2006 ){
            sleep(3);
            $pdo = reconnectDB();
        }
    }
    $GLOBALS["pdo"] = $pdo;
    return $pdo;
}

$consumer_key = "xx";
$consumer_secret = "xxx";
$oauth_token = "xxx";
$oauth_token_secret = "xxx";

$oauth_params = array(
    'oauth_consumer_key'     => $consumer_key,
    'oauth_signature_method' => 'HMAC-SHA1',
    'oauth_timestamp'        => time(),
    'oauth_version'          => '1.0a',
    'oauth_nonce'            => md5(mt_rand()),
    'oauth_token'            => $oauth_token,
);

$url = 'https://stream.twitter.com/1.1/statuses/filter.json';
$method = 'POST';
$searchWords = "ヤクルト,キルミーベイベー,サナギさん";

$params = array('track' => $searchWords,'language' => 'ja');

$base = array_merge($oauth_params,$params);
ksort($base);
$base = http_build_query($base, '', '&', PHP_QUERY_RFC3986);

$base = array($method, $url, $base);
$base = array_map('rawurlencode', $base);
$base = implode('&', $base);

$accesskey = array($consumer_secret, $oauth_token_secret);
$accesskey = array_map('rawurlencode', $accesskey);
$accesskey = implode('&', $accesskey);

$key = implode('&', array(rawurlencode($consumer_secret), rawurlencode($oauth_token_secret)));
$oauth_params['oauth_signature'] = base64_encode(hash_hmac('sha1', $base, $key, true));

foreach ($oauth_params as $name => $value) {
    $items[] = sprintf('%s="%s"', urlencode($name), urlencode($value));
}
$header = 'Authorization: OAuth ' . implode(', ', $items);

while(true) {
    $ch = curl_init();
    curl_setopt_array($ch, array(
        CURLOPT_URL            => $url,
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => http_build_query($params, '', '&'),
        CURLOPT_SSL_VERIFYPEER => false,
        CURLOPT_HTTPHEADER     => array($header),
        CURLOPT_ENCODING       => 'gzip',
        CURLOPT_TIMEOUT        => 0,
        CURLOPT_WRITEFUNCTION  => 'callback'
    ));
    curl_exec($ch);
}
?>

以上です。pdoはデフォルトで実行時のエラーを無視するのでそこで少しはまりました。
また、TwitterStreamから切断されるケースに関しては発生しなかったので対応していませんが、もしかすると一定時間callbackに入ってこない場合は再接続みたいな処理を書く必要があるかもしれません。