CLOVER🍀

That was when it all began.

RustでMySQLにアクセスしてみる

これは、なにをしたくて書いたもの?

RustからMySQLにアクセスしてみましょう、ということで。

mysqlクレート

RustでMySQLにアクセスするには、mysqlクレートを使うみたいです。

mysql - Rust

こちらを使ってみます。

トップページ書かれているサンプルなどを参考に、試していってみるとしましょう。

Crate mysql / Example

環境

今回の環境はこちら。

$ rustup --version
rustup 1.27.1 (54dd3d00f 2024-04-24)
info: This is the version for the rustup toolchain manager, not the rustc compiler.
info: The currently active `rustc` version is `rustc 1.83.0 (90b35a623 2024-11-26)`

MySQLは172.17.0.2でアクセスできるものとします。

 MySQL  localhost:3306 ssl  practice  SQL > select version();
+-----------+
| version() |
+-----------+
| 8.4.3     |
+-----------+
1 row in set (0.0006 sec)

準備

Cargoパッケージの作成。

$ cargo new --vcs none --lib mysql-getting-started
$ cd mysql-getting-started

テストコードのみの実装にするつもりなので、ライブラリークレートにしました。

mysqlクレートを使う

では、mysqlクレートを使っていきます。

依存関係の追加。

$ cargo add mysql

Cargo.toml

[package]
name = "mysql-getting-started"
version = "0.1.0"
edition = "2021"

[dependencies]
mysql = "25.0.1"

ところで、ソースコードをこの状態にして

src/lib.rs

#[cfg(test)]
mod tests {

}

ビルドするとうまくいきません。

$ cargo build

どうやら、OpenSSLの開発パッケージが必要なようです。

  Could not find openssl via pkg-config:

  pkg-config exited with status code 1
  > PKG_CONFIG_ALLOW_SYSTEM_CFLAGS=1 pkg-config --libs --cflags openssl

  The system library `openssl` required by crate `openssl-sys` was not found.
  The file `openssl.pc` needs to be installed and the PKG_CONFIG_PATH environment variable must contain its parent directory.
  The PKG_CONFIG_PATH environment variable is not set.

  HINT: if you have installed the library, try setting PKG_CONFIG_PATH to the directory containing `openssl.pc`.


  cargo:warning=Could not find directory of OpenSSL installation, and this `-sys` crate cannot proceed without this knowledge. If OpenSSL is installed and this crate had trouble finding it,  you can set the `OPENSSL_DIR` environment variable for the compilation process. See stderr section below for further information.

  --- stderr


  Could not find directory of OpenSSL installation, and this `-sys` crate cannot
  proceed without this knowledge. If OpenSSL is installed and this crate had
  trouble finding it,  you can set the `OPENSSL_DIR` environment variable for the
  compilation process.

  Make sure you also have the development packages of openssl installed.
  For example, `libssl-dev` on Ubuntu or `openssl-devel` on Fedora.

  If you're in a situation where you think the directory *should* be found
  automatically, please open a bug at https://github.com/sfackler/rust-openssl
  and include information about your system as well as this message.

  $HOST = x86_64-unknown-linux-gnu
  $TARGET = x86_64-unknown-linux-gnu
  openssl-sys = 0.9.104


warning: build failed, waiting for other jobs to finish...

メッセージに従ってlibssl-devをインストールしてもいいのですが、ここはSSL/TLSのバックエンドをデフォルトのnative-tlsからrustlsに
切り替えてみましょう。

$ cargo add mysql --no-default-features --features default-rustls

Crate mysql / SSL Support

rustlsはRustで実装されたTLSライブラリーです。

rustls - Rust

Cargo.toml

[package]
name = "mysql-getting-started"
version = "0.1.0"
edition = "2021"

[dependencies]
mysql = { version = "25.0.1", default-features = false, features = ["default-rustls"] }

これでOpenSSLに依存しなくなりました。

テストコードの雛形は以下のようにしました。

src/lib.rs

#[cfg(test)]
mod tests {
    use mysql::prelude::Queryable;
    use mysql::{params, Conn, Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, TxOpts};

    // ここにテストを書く!!
}

以降ではテスト関数を書きつつ、mysqlクレートを使っていこうと思います。

MySQLへ接続する

まずはMySQLへ接続してみます。

直接接続する方法と、コネクションプールを使う方法があります。

まずは直接接続する方法から。Connを使います。

URL指定で接続するパターン。

    #[test]
    fn connect_mysql() {
        let url = "mysql://kazuhira:[email protected]:3306/practice";
        let opts = Opts::from_url(url).unwrap();

        let conn = Conn::new(opts).unwrap();

        let version = conn.server_version();
        let version_string = format!("{}.{}.{}", version.0, version.1, version.2);
        assert_eq!(version_string, "8.4.3");
    }

Crate mysql / URL-based connection string

QueryStringでパラメーターを設定することもでき、その多くはOptsを見ればよいみたいです。

Opts in mysql - Rust

コネクションプールの設定内容も含まれていますね。これは接続数の最小値、最大値くらいが設定内容でしょうか。

PoolConstraints in mysql - Rust

OptsBuilderを使って、接続パラメーターを構築するパターン。

    #[test]
    fn connect_mysql2() {
        let opts = OptsBuilder::new()
            .ip_or_hostname(Some("172.17.0.2"))
            .tcp_port(3306)
            .user(Some("kazuhira"))
            .pass(Some("password"))
            .db_name(Some("practice"));

        let conn = Conn::new(opts).unwrap();

        let version = conn.server_version();
        let version_string = format!("{}.{}.{}", version.0, version.1, version.2);
        assert_eq!(version_string, "8.4.3");
    }

Crate mysql / OptsBuilder

コネクションプールを使う場合もURL指定、OptsBuilderそれぞれが利用できます。Poolを使います。

    #[test]
    fn connect_mysql_using_pool() {
        let url = "mysql://kazuhira:[email protected]:3306/practice";

        let pool = Pool::new(url).unwrap();

        let conn = pool.get_conn().unwrap();

        let version = conn.server_version();
        let version_string = format!("{}.{}.{}", version.0, version.1, version.2);
        assert_eq!(version_string, "8.4.3");
    }

    #[test]
    fn connect_mysql_using_pool2() {
        let opts = OptsBuilder::new()
            .ip_or_hostname(Some("172.17.0.2"))
            .tcp_port(3306)
            .user(Some("kazuhira"))
            .pass(Some("password"))
            .db_name(Some("practice"))
            .pool_opts(PoolOpts::new().with_constraints(PoolConstraints::new(10, 10).unwrap()));

        let pool = Pool::new(opts).unwrap();

        let conn = pool.get_conn().unwrap();

        let version = conn.server_version();
        let version_string = format!("{}.{}.{}", version.0, version.1, version.2);
        assert_eq!(version_string, "8.4.3");
    }

以降は、今回はコネクションプールを使う必要がないので

SQLを実行してみる

次はSQLを実行してみます。

接続を表すConnにSQLを実行するメソッドがいくつかあります。これはQueryableというトレイトを実装して実現しているようです。

Struct mysql::Conn / Trait Implementations / impl Queryable for Conn

たとえばquery_first。

    #[test]
    fn simple_query() {
        let url = "mysql://kazuhira:[email protected]:3306/practice";
        let opts = Opts::from_url(url).unwrap();

        let mut conn = Conn::new(opts).unwrap();

        let message = conn
            .query_first::<String, _>("select 'hello'")
            .unwrap()
            .unwrap();
        assert_eq!(message, "hello");
    }

どうも見ていると、末尾がdropのメソッドは結果を受け取らないものになるようです。

ただ、実際にはプリペアードステートメントを使うことになると思うので、Connから直接SQLを実行することはそうないでしょうね。

ちなみに、RustとMySQLとのデータ型のマッピングはこちらに記載があります。

Crate mysql_common / Supported rust types

追加の下準備

ここから先は、SQLの実行やトランザクションを扱います。

テーブルがあった方がよいので、例の内容を使うことにしましょう。

Crate mysql / Example

またテストを実行する際に、Connの作成およびテーブルの再作成を行う関数を作成しました。

    fn prepare_test<F>(consumer: F)
    where
        F: Fn(&mut Conn),
    {
        let url = "mysql://kazuhira:[email protected]:3306/practice";
        let opts = Opts::from_url(url).unwrap();

        let mut conn = Conn::new(opts).unwrap();

        let drop_stmt = conn.prep("drop table if exists payment").unwrap();
        conn.exec_drop(&drop_stmt, ()).unwrap();
        conn.close(drop_stmt).unwrap();

        let create_stmt = conn
            .prep(
                "create table payment(\n\
                      customer_id int,\n\
                      amount int not null,\n\
                      account_name text,\n\
                      primary key(customer_id)
                  )",
            )
            .unwrap();

        conn.exec_drop(&create_stmt, ()).unwrap();
        conn.close(create_stmt).unwrap();

        consumer(&mut conn);
    }

各テスト内ではこの関数を呼び出し、Connを受け取る関数でテストを実装することにします。

同じ名前のテーブルをdrop & createするので、テストの同時実行はできません。cargo testはデフォルトでテストが並列実行されるようなので、
以下のようにしてスレッド数を1にしておく必要があります。

$ cargo test -- --test-threads=1

insert&select

用意した関数を使って、insert文とselect文を実行してみます。

こんな感じになりました。

    #[test]
    fn insert_select() {
        prepare_test(|conn| {
            // insert
            let insert_stmt =
                conn.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();

            conn.exec_drop(
                &insert_stmt,
                params! {
                    "customer_id" => 1,
                    "amount" => 2,
                    "account_name" => None::<String>
                },
            )
            .unwrap();

            conn.exec_drop(
                &insert_stmt,
                params! {
                    "customer_id" => 3,
                    "amount" => 4,
                    "account_name" => Some::<String>("foo".into())
                },
            )
            .unwrap();

            conn.close(insert_stmt).unwrap();

            // select
            let select_stmt_simply = conn
                .prep("select customer_id, amount, account_name from payment where customer_id = ?")
                .unwrap();

            let result1 = conn
                .exec_first::<(i32, i32, Option<String>), _, _>(&select_stmt_simply, (1,))
                .unwrap()
                .map(|(customer_id, amount, account_name)| (customer_id, amount, account_name))
                .unwrap();
            conn.close(select_stmt_simply).unwrap();

            assert_eq!(result1, (1, 2, None::<String>));

            let select_stmt_named = conn
                .prep("select customer_id, amount, account_name from payment where customer_id = :customer_id")
                .unwrap();

            let result2 = conn
                .exec_first::<(i32, i32, Option<String>), _, _>(
                    &select_stmt_named,
                    params! { "customer_id" => 3},
                )
                .unwrap()
                .map(|(customer_id, amount, account_name)| (customer_id, amount, account_name))
                .unwrap();

            assert_eq!(result2, (3, 4, Some("foo".into())));

            let result3 = conn
                .exec_first::<(i32, i32, Option<String>), _, _>(
                    &select_stmt_named,
                    params! { "customer_id" => 99},
                )
                .unwrap();
            conn.close(select_stmt_named).unwrap();

            assert_eq!(result3, None);
        });
    }

プリペアードステートメントはConn.prepで作成します。プレースホルダーは名前付きパラメーターと?を使うことができます。

            // insert
            let insert_stmt =
                conn.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();


            // select
            let select_stmt_simply = conn
                .prep("select customer_id, amount, account_name from payment where customer_id = ?")
                .unwrap();

両方の指定方法を混在させることはできません。

Crate mysql / Named parameters

名前付きパラメーターはparams!マクロ、?の場合はタプルでパラメーターを指定します。

            conn.exec_drop(
                &insert_stmt,
                params! {
                    "customer_id" => 1,
                    "amount" => 2,
                    "account_name" => None::<String>
                },
            )
            .unwrap();


            let result1 = conn
                .exec_first::<(i32, i32, Option<String>), _, _>(&select_stmt_simply, (1,))
                .unwrap()
                .map(|(customer_id, amount, account_name)| (customer_id, amount, account_name))
                .unwrap();
            conn.close(select_stmt_simply).unwrap();

クエリーを実行する時は、Connを使っていた時とは異なりexec_〜メソッドを使うことが多くなります。

またStatementをクローズしているのですが、これはステートメントキャッシュが無効の場合にするべきだそうです。

            conn.close(insert_stmt).unwrap();

disabled statement cache means, that you have to close statements yourself using Conn::close, or they’ll exhaust server limits/resources;

Crate mysql / Statement cache

ステートメントキャッシュが有効な条件は以下のようなので、今回はやらなくてもいいはずなのですが。

  • Connを直接使っている時
  • PooledConn(コネクションプール)を使っている場合は以下のいずれかの時
    • PoolOpts::reset_connectionがtrue
    • PoolOpts::reset_connectionがfalseで、Connでラップされている時

また構造体を使ってバッチ更新、複数件取得のパターンも書いてみました。

    #[derive(Debug, PartialEq, Eq)]
    struct Payment {
        customer_id: i32,
        amount: i32,
        account_name: Option<String>,
    }

    #[test]
    fn insert_select2() {
        prepare_test(|conn| {
            let insert_data = vec![
                Payment {
                    customer_id: 1,
                    amount: 2,
                    account_name: None,
                },
                Payment {
                    customer_id: 3,
                    amount: 4,
                    account_name: Some("foo".into()),
                },
                Payment {
                    customer_id: 5,
                    amount: 6,
                    account_name: None,
                },
                Payment {
                    customer_id: 7,
                    amount: 8,
                    account_name: None,
                },
                Payment {
                    customer_id: 9,
                    amount: 10,
                    account_name: Some("bar".into()),
                },
            ];

            // insert
            let insert_stmt =
                conn.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();

            conn.exec_batch(
                &insert_stmt,
                insert_data.iter().map(|p| {
                    params! {
                        "customer_id" => p.customer_id,
                        "amount" => p.amount,
                        "account_name" => &p.account_name,
                    }
                }),
            )
            .unwrap();
            conn.close(insert_stmt).unwrap();

            // select
            let select_stmt =
                conn.prep("select customer_id, amount, account_name from payment where amount > :amount order by customer_id asc")
                .unwrap();

            let results = conn
                .exec_map(
                    &select_stmt,
                    params! {"amount" => 5},
                    |(customer_id, amount, account_name)| Payment {
                        customer_id,
                        amount,
                        account_name,
                    },
                )
                .unwrap();
            conn.close(select_stmt).unwrap();

            assert_eq!(
                results,
                vec![
                    Payment {
                        customer_id: 5,
                        amount: 6,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 7,
                        amount: 8,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 9,
                        amount: 10,
                        account_name: Some("bar".into()),
                    },
                ]
            );
        });
    }

バッチ更新を行っても、バルクinsertになるようなことはなかったです…。

トランザクション

トランザクションについてはこちら。

Crate mysql / Transaction

こんな感じで書いてみました。

    #[test]
    fn transaction() {
        prepare_test(|conn| {
            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            let insert_data = vec![
                Payment {
                    customer_id: 1,
                    amount: 2,
                    account_name: None,
                },
                Payment {
                    customer_id: 3,
                    amount: 4,
                    account_name: Some("foo".into()),
                },
                Payment {
                    customer_id: 5,
                    amount: 6,
                    account_name: None,
                },
                Payment {
                    customer_id: 7,
                    amount: 8,
                    account_name: None,
                },
                Payment {
                    customer_id: 9,
                    amount: 10,
                    account_name: Some("bar".into()),
                },
            ];

            // insert
            let insert_stmt =
                tx.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();

            tx.exec_batch(
                &insert_stmt,
                insert_data.iter().map(|p| {
                    params! {
                        "customer_id" => p.customer_id,
                        "amount" => p.amount,
                        "account_name" => &p.account_name,
                    }
                }),
            )
            .unwrap();
            tx.close(insert_stmt).unwrap();

            tx.rollback().unwrap(); // ここでtxはdropされる

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            let count_stmt = tx.prep("select count(*) from payment").unwrap();

            let count_result = tx
                .exec_first::<i32, _, _>(&count_stmt, ())
                .unwrap()
                .unwrap();
            tx.close(count_stmt).unwrap();

            assert_eq!(count_result, 0);

            tx.rollback().unwrap();

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            // insert
            let insert_stmt =
                tx.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();

            tx.exec_batch(
                &insert_stmt,
                insert_data.iter().map(|p| {
                    params! {
                        "customer_id" => p.customer_id,
                        "amount" => p.amount,
                        "account_name" => &p.account_name,
                    }
                }),
            )
            .unwrap();
            tx.close(insert_stmt).unwrap();

            tx.commit().unwrap();

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            // select
            let select_stmt =
                tx.prep("select customer_id, amount, account_name from payment where amount > :amount order by customer_id asc")
                .unwrap();

            let results = tx
                .exec_map(
                    &select_stmt,
                    params! {"amount" => 5},
                    |(customer_id, amount, account_name)| Payment {
                        customer_id,
                        amount,
                        account_name,
                    },
                )
                .unwrap();
            tx.close(select_stmt).unwrap();

            assert_eq!(
                results,
                vec![
                    Payment {
                        customer_id: 5,
                        amount: 6,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 7,
                        amount: 8,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 9,
                        amount: 10,
                        account_name: Some("bar".into()),
                    },
                ]
            );
        });
    }

Conn.start_transactionでトランザクションを開始します。

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

トランザクション内の操作は、Connの代わりにこのTransactionを使ってSQLを実行します。

            // insert
            let insert_stmt =
                tx.prep("insert into payment(customer_id, amount, account_name) values(:customer_id, :amount, :account_name)").unwrap();

            tx.exec_batch(
                &insert_stmt,
                insert_data.iter().map(|p| {
                    params! {
                        "customer_id" => p.customer_id,
                        "amount" => p.amount,
                        "account_name" => &p.account_name,
                    }
                }),
            )
            .unwrap();
            tx.close(insert_stmt).unwrap();

ちょっと変わったところとして、コミットやロールバックを行うとそのTransactionは使えなくなるので、続けてトランザクション内で
操作を行う場合はTransactionを開始し直す必要があります。

            tx.rollback().unwrap(); // ここでtxはdropされる

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

あとは操作するのがConnからTransactionに変わったくらいで、扱い方自体は大きく変わりません。

ロールバックした場合はデータが登録されていないことや

            tx.rollback().unwrap(); // ここでtxはdropされる

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            let count_stmt = tx.prep("select count(*) from payment").unwrap();

            let count_result = tx
                .exec_first::<i32, _, _>(&count_stmt, ())
                .unwrap()
                .unwrap();
            tx.close(count_stmt).unwrap();

            assert_eq!(count_result, 0);

            tx.rollback().unwrap();

コミットした場合はデータが取得できることも確認。

            tx.commit().unwrap();

            let mut tx = conn.start_transaction(TxOpts::default()).unwrap();

            // select
            let select_stmt =
                tx.prep("select customer_id, amount, account_name from payment where amount > :amount order by customer_id asc")
                .unwrap();

            let results = tx
                .exec_map(
                    &select_stmt,
                    params! {"amount" => 5},
                    |(customer_id, amount, account_name)| Payment {
                        customer_id,
                        amount,
                        account_name,
                    },
                )
                .unwrap();
            tx.close(select_stmt).unwrap();

            assert_eq!(
                results,
                vec![
                    Payment {
                        customer_id: 5,
                        amount: 6,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 7,
                        amount: 8,
                        account_name: None,
                    },
                    Payment {
                        customer_id: 9,
                        amount: 10,
                        account_name: Some("bar".into()),
                    },
                ]
            );

こんなところでしょうか。

おわりに

実は、とてもとても苦労しました。

たくさんあるquery_〜、exec_〜メソッドがよくわからなかったり、ドキュメントのサンプルをそのまま真似したらnullの扱いが
よくわからなかったり。テストで共通処理を作ろうと思ったら、関数を引数に取る関数の書き方がよくわからなかったり。

なんか本来の内容と全然関係ないところでもたくさん苦労した気がしますが、いいRustの勉強も兼ねられたかなと思います。