コネクタ

この章では、ldbc独自のMySQLコネクタを使用したデータベース接続について説明します。

ScalaでMySQLデータベースへの接続を行うためにはJDBCを使用する必要があります。JDBCはJavaの標準APIであり、Scalaでも使用することができます。 JDBCはJavaで実装が行われているためScalaで使用する場合でもJVM環境でのみ動作することができます。

昨今のScalaを取り巻く環境はJSやNativeなどの環境でも動作できるようプラグインの開発が盛んに行われています。 ScalaはJavaの資産を使用できるJVMのみで動作する言語から、マルチプラットフォーム環境でも動作できるよう進化を続けています。

しかし、JDBCはJavaの標準APIでありScalaのマルチプラットフォーム環境での動作をサポートしていません。

そのため、ScalaでアプリケーションをJS, Nativeなどで動作できるように作成を行ったとしてもJDBCを使用できないため、MySQLなどのデータベースへ接続を行うことができません。

Typelevel ProjectにはSkunkと呼ばれるPostgreSQL用のScalaライブラリが存在します。 このプロジェクトはJDBCを使用しておらず、純粋なScalaのみでPostgreSQLへの接続を実現しています。そのため、Skunkを使用すればJVM, JS, Native環境を問わずPostgreSQLへの接続を行うことができます。

ldbc コネクタはこのSkunkに影響を受けてJVM, JS, Native環境を問わずMySQLへの接続を行えるようにするために開発が行われてるプロジェクトです。

※ このコネクタは現在実験的な機能となります。そのため本番環境での使用しないでください。

ldbcコネクタは一番低レイヤーのAPIとなります。 今後このコネクタを使用してより高レイヤーのAPIを提供する予定です。また既存の高レイヤーのAPIとの互換性を持たせることも予定しています。

使用するにはプロジェクトに以下の依存関係を設定する必要があります。

JVM

libraryDependencies += "io.github.takapi327" %% "ldbc-connector" % "0.3.0-beta8"

JS/Native

libraryDependencies += "io.github.takapi327" %%% "ldbc-connector" % "0.3.0-beta8"

サポートバージョン

現在のバージョンは以下のバージョンのMySQLをサポートしています。

メインサポートはMySQL 8.xです。MySQL 5.7.xはサブサポートとなります。そのためMySQL 5.7.xでの動作には注意が必要です。 将来的にはMySQL 5.7.xのサポートは終了する予定です。

接続

ldbcコネクタを使用してMySQLへの接続を行うためには、Connectionを使用します。

また、Connectionはオブザーバビリティを意識した開発を行えるようにOtel4sを使用してテレメトリデータを収集できるようにしています。 そのため、Connectionを使用する際にはOtel4sTracerを設定する必要があります。

開発時やトレースを使用したテレメトリデータが不要な場合はTracer.noopを使用することを推奨します。

import cats.effect.IO
import org.typelevel.otel4s.trace.Tracer
import ldbc.connector.Connection

given Tracer[IO] = Tracer.noop[IO]

val connection = Connection[IO](
  host = "127.0.0.1",
  port = 3306,
  user = "root",
)

以下はConnection構築時に設定できるプロパティの一覧です。

プロパティ 用途
host String MySQLサーバーのホストを指定します
port Int MySQLサーバーのポート番号を指定します
user String MySQLサーバーへログインを行うユーザー名を指定します
password Option[String] MySQLサーバーへログインを行うユーザーのパスワードを指定します
database Option[String] MySQLサーバーへ接続後に使用するデータベース名を指定します
debug Boolean 処理のログを出力します。デフォルトはfalseです
ssl SSL MySQLサーバーとの通知んでSSL/TLSを使用するかを指定します。デフォルトはSSL.Noneです
socketOptions List[SocketOption] TCP/UDP ソケットのソケットオプションを指定します。
readTimeout Duration MySQLサーバーへの接続を試みるまでのタイムアウトを指定します。デフォルトはDuration.Infです。
allowPublicKeyRetrieval Boolean MySQLサーバーとの認証時にRSA公開鍵を使用するかを指定します。デフォルトはfalseです。

ConnectionResourceを使用してリソース管理を行います。そのためコネクション情報を使用する場合はuseメソッドを使用してリソースの管理を行います。

connection.use { conn =>
  // コードを記述
}

認証

MySQLでの認証は、クライアントがMySQLサーバーへ接続するときにLoginRequestというフェーズでユーザ情報を送信します。そして、サーバー側では送られたユーザがmysql.userテーブルに存在するか検索を行い、どの認証プラグインを使用するかを決定します。認証プラグインが決定した後にサーバーはそのプラグインを呼び出してユーザー認証を開始し、その結果をクライアント側に送信します。このようにMySQLでは認証がプラガブル(様々なタイプのプラグインを付け外しできる)になっています。

MySQLでサポートされている認証プラグインは公式ページに記載されています。

ldbcは現時点で以下の認証プラグインをサポートしています。

※ ネイティブプラガブル認証とSHA-256 プラガブル認証はMySQL 8.xから非推奨となったプラグインです。特段理由がない場合はSHA-2 プラガブル認証のキャッシュを使用することを推奨します。

ldbcのアプリケーションコード上で認証プラグインを意識する必要はありません。ユーザーはMySQLのデータベース上で使用したい認証プラグインで作成されたユーザーを作成し、ldbcのアプリケーションコード上ではそのユーザーを使用してMySQLへの接続を試みるだけで問題ありません。 ldbcが内部で認証プラグインを判断し、適切な認証プラグインを使用してMySQLへの接続を行います。

実行

以降の処理では以下テーブルを使用しているものとします。

CREATE TABLE users (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  age INT NULL
);

Statement

Statementは動的なパラメーターを使用しないSQLを実行するためのAPIです。

Statementは動的なパラメーターを使用しないため、使い方によってはSQLインジェクションのリスクがあります。そのため、動的なパラメーターを使用する場合はPreparedStatementを使用することを推奨します。

ConnectioncreateStatementメソッドを使用してStatementを構築します。

読み取りクエリ

読み取り専用のSQLを実行する場合はexecuteQueryメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値はResultSetに格納されて戻り値として返却されます。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    result <- statement.executeQuery("SELECT * FROM users")
  yield
    // ResultSetを使用した処理
}

書き込みクエリ

書き込みを行うSQLを実行する場合はexecuteUpdateメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値は影響を受けた行数が戻り値として返却されます。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    result <- statement.executeUpdate("INSERT INTO users (name, age) VALUES ('Alice', 20)")
  yield
}

AUTO_INCREMENTの値を取得

Statementを使用してクエリ実行後にAUTO_INCREMENTの値を取得する場合はgetGeneratedKeysメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値はAUTO_INCREMENTに生成された値が戻り値として返却されます。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    _ <- statement.executeUpdate("INSERT INTO users (name, age) VALUES ('Alice', 20)", Statement.RETURN_GENERATED_KEYS)
    gereatedKeys <- statement.getGeneratedKeys()
  yield
}

Client/Server PreparedStatement

ldbcではPreparedStatementClient PreparedStatementServer PreparedStatementに分けて提供しています。

Client PreparedStatementは動的なパラメーターを使用してアプリケーション上でSQLの構築を行い、MySQLサーバーに送信を行うためのAPIです。 そのためMySQLサーバーへのクエリ送信方法はStatementと同じになります。

このAPIはJDBCのPreparedStatementに相当します。

より安全なMySQLサーバー内でクエリを構築するためのPreparedStatementServer PreparedStatementで提供されますので、そちらを使用してください。

Server PreparedStatementは実行を行うクエリをMySQLサーバー内で事前に準備を行い、アプリケーション上でパラメーターを設定して実行を行うためのAPIです。

Server PreparedStatementでは実行するクエリの送信とパラメーターの送信が分けて行われるため、クエリの再利用が可能となります。

Server PreparedStatementを使用する場合事前にクエリをMySQLサーバーで準備します。格納するためにMySQLサーバーはメモリを使用しますが、クエリの再利用が可能となるため、パフォーマンスの向上が期待できます。

しかし、事前準備されたクエリは解放されるまでメモリを使用し続けるため、メモリリークのリスクがあります。

Server PreparedStatementを使用する場合はcloseメソッドを使用して適切にクエリの解放を行う必要があります。

Client PreparedStatement

ConnectionclientPreparedStatementメソッドを使用してClient PreparedStatementを構築します。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("SELECT * FROM users WHERE id = ?")
    ...
  yield ...
}

Server PreparedStatement

ConnectionserverPreparedStatementメソッドを使用してServer PreparedStatementを構築します。

connection.use { conn =>
  for 
    statement <- conn.serverPreparedStatement("SELECT * FROM users WHERE id = ?")
    ...
  yield ...
}

読み取りクエリ

読み取り専用のSQLを実行する場合はexecuteQueryメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値はResultSetに格納されて戻り値として返却されます。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("SELECT * FROM users WHERE id = ?") // or conn.serverPreparedStatement("SELECT * FROM users WHERE id = ?")
    _ <- statement.setLong(1, 1)
    result <- statement.executeQuery()
  yield
    // ResultSetを使用した処理
}

動的なパラメーターを使用する場合はsetXXXメソッドを使用してパラメーターを設定します。 setXXXメソッドはOption型を使用することもできます。Noneが渡された場合パラメーターにはNULLがセットされます。

setXXXメソッドはパラメーターのインデックスとパラメーターの値を指定します。

statement.setLong(1, 1)

現在のバージョンでは以下のメソッドがサポートされています。

メソッド 備考
setNull パラメーターにNULLをセットします
setBoolean Boolean/Option[Boolean]
setByte Byte/Option[Byte]
setShort Short/Option[Short]
setInt Int/Option[Int]
setLong Long/Option[Long]
setBigInt BigInt/Option[BigInt]
setFloat Float/Option[Float]
setDouble Double/Option[Double]
setBigDecimal BigDecimal/Option[BigDecimal]
setString String/Option[String]
setBytes Array[Byte]/Option[Array[Byte]]
setDate LocalDate/Option[LocalDate] java.sqlではなくjava.timeを直接扱います。
setTime LocalTime/Option[LocalTime] java.sqlではなくjava.timeを直接扱います。
setTimestamp LocalDateTime/Option[LocalDateTime] java.sqlではなくjava.timeを直接扱います。
setYear Year/Option[Year] java.sqlではなくjava.timeを直接扱います。

書き込みクエリ

書き込みを行うSQLを実行する場合はexecuteUpdateメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値は影響を受けた行数が戻り値として返却されます。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)") // or conn.serverPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)")
    _ <- statement.setString(1, "Alice")
    _ <- statement.setInt(2, 20)
    result <- statement.executeUpdate()
  yield result
}

AUTO_INCREMENTの値を取得

クエリ実行後にAUTO_INCREMENTの値を取得する場合はgetGeneratedKeysメソッドを使用します。

クエリを実行した結果MySQLサーバーから返される値はAUTO_INCREMENTに生成された値が戻り値として返却されます。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)", Statement.RETURN_GENERATED_KEYS) // or conn.serverPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)", Statement.RETURN_GENERATED_KEYS)
    _ <- statement.setString(1, "Alice")
    _ <- statement.setInt(2, 20)
    _ <- statement.executeUpdate()
    getGeneratedKeys <- statement.getGeneratedKeys()
  yield getGeneratedKeys
}

ResultSet

ResultSetはクエリ実行後にMySQLサーバーから返された値を格納するためのAPIです。

SQLを実行して取得したレコードをResultSetから取得するにはJDBCと同じようにnextメソッドとgetXXXメソッドを使用して取得する方法と、ldbc独自のdecodeメソッドを使用する方法があります。

next/getXXX

nextメソッドは次のレコードが存在する場合はtrueを返却し、次のレコードが存在しない場合はfalseを返却します。

getXXXメソッドはレコードから値を取得するためのAPIです。

getXXXメソッドは取得するカラムのインデックスを指定する方法とカラム名を指定する方法があります。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("SELECT `id`, `name`, `age` FROM users WHERE id = ?")
    _ <- statement.setLong(1, 1)
    result <- statement.executeQuery()
    records <- Monad[IO].whileM(result.next()) {
      for
        id <- result.getLong(1)
        name <- result.getString("name")
        age <- result.getInt(3)  
      yield (id, name, age)
  }
  yield records
}

decode

decodeメソッドはResultSetから取得した値をScalaの型に変換して取得するためのAPIです。

取得するカラムの数に応じて*:演算子を使用して変換する型を指定します。

例では、usersテーブルのid, name, ageカラムを取得する場合を示しておりそれぞれのカラムの型を指定しています。

result.decode(bigint *: varchar *: int.opt)

NULL許容のカラムを取得する場合はOption型に変換するためにoptメソッドを使用します。 これによりレコードがNULLの場合はNoneとして取得することができます。

クエリ実行からレコード取得までの一連の流れは以下のようになります。

connection.use { conn =>
  for 
    statement <- conn.clientPreparedStatement("SELECT * FROM users WHERE id = ?") // or conn.serverPreparedStatement("SELECT * FROM users WHERE id = ?")
    _ <- statement.setLong(1, 1)
    result <- statement.executeQuery()
    decodes <- result.decode(bigint *: varchar *: int.opt)
  yield decodes
}

ResultSetから取得するレコードは常に配列になります。 これはMySQLで実行するクエリの結果が常に複数のレコードを返す可能性があるからです。

単一のレコードを取得する場合はdecode処理後に、headheadOptionメソッドを使用して取得を行なってください。

現在のバージョンでは以下のデータ型がサポートされています。

Codec データ型 Scala 型
boolean BOOLEAN Boolean
tinyint TINYINT Byte
utinyint unsigned TINYINT Short
smallint SMALLINT Short
usmallint unsigned SMALLINT Int
int INT Int
uint unsigned INT Long
bigint BIGINT Long
ubigint unsigned BIGINT BigInt
float FLOAT Float
double DOUBLE Double
decimal DECIMAL BigDecimal
char CHAR String
varchar VARCHAR String
binary BINARY Array[Byte]
varbinary VARBINARY String
tinyblob TINYBLOB String
blob BLOB String
mediumblob MEDIUMBLOB String
longblob LONGBLOB String
tinytext TINYTEXT String
text TEXT String
mediumtext MEDIUMTEXT String
longtext LONGTEXT String
enum ENUM String
set SET List[String]
json JSON String
date DATE LocalDate
time TIME LocalTime
timetz TIME OffsetTime
datetime DATETIME LocalDateTime
timestamp TIMESTAMP LocalDateTime
timestamptz TIMESTAMP OffsetDateTime
year YEAR Year

※ 現在MySQLのデータ型を指定して値を取得するような作りとなっていますが、将来的にはより簡潔にScalaの型を指定して値を取得するような作りに変更する可能性があります。

以下サポートされていないデータ型があります。

トランザクション

Connectionを使用してトランザクションを実行するためにはsetAutoCommitメソッドとcommitメソッド、rollbackメソッドを組み合わせて使用します。

まず、setAutoCommitメソッドを使用してトランザクションの自動コミットを無効にします。

conn.setAutoCommit(false)

何かしらの処理を行った後にcommitメソッドを使用してトランザクションをコミットします。

for
  statement <- conn.clientPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)")
  _ <- statement.setString(1, "Alice")
  _ <- statement.setInt(2, 20)
  result <- statement.executeUpdate()
  _ <- conn.commit()
yield

もしくは、rollbackメソッドを使用してトランザクションをロールバックします。

for
  statement <- conn.clientPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)")
  _ <- statement.setString(1, "Alice")
  _ <- statement.setInt(2, 20)
  result <- statement.executeUpdate()
  _ <- conn.rollback()
yield

setAutoCommitメソッドを使用してトランザクションの自動コミットを無効にした場合、コネクションのResourceを解放する際に自動的にロールバックが行われます。

トランザクション分離レベル

ldbcではトランザクション分離レベルの設定を行うことができます。

トランザクション分離レベルはsetTransactionIsolationメソッドを使用して設定を行います。

MySQLでは以下のトランザクション分離レベルがサポートされています。

MySQLのトランザクション分離レベルについては公式ドキュメントを参照してください。

import ldbc.connector.Connection.TransactionIsolationLevel

conn.setTransactionIsolation(TransactionIsolationLevel.REPEATABLE_READ)

現在設定されているトランザクション分離レベルを取得するにはgetTransactionIsolationメソッドを使用します。

for
  isolationLevel <- conn.getTransactionIsolation()
yield

セーブポイント

より高度なトランザクション管理のために「Savepoint機能」を使用することができます。これにより、データベース操作中に特定のポイントをマークしておくことが可能になり、何か問題が発生した場合にも、そのポイントまでデータベースの状態を巻き戻すことができます。これは、複雑なデータベース操作や、長いトランザクションの中での安全なポイント設定を必要とする場合に特に役立ちます。

特徴:

この機能を活用することで、あなたのアプリケーションはより堅牢で信頼性の高いデータベース操作を実現できるようになります。

セーブポイントの設定

Savepointを設定するには、setSavepointメソッドを使用します。このメソッドは、Savepointの名前を指定することができます。 Savepointの名前を指定しない場合、デフォルトの名前としてUUIDで生成された値が設定されます。

getSavepointNameメソッドを使用して、設定されたSavepointの名前を取得することができます。

※ MySQLではデフォルトで自動コミットが有効になっているため、Savepointを使用する場合は自動コミットを無効にする必要があります。そうしないと全ての処理が都度コミットされてしまうため、Savepointを使用したトランザクションのロールバックを行うことができなくなるためです。

for
  _ <- conn.setAutoCommit(false)
  savepoint <- conn.setSavepoint("savepoint1")
yield savepoint.getSavepointName

セーブポイントのロールバック

Savepointを使用してトランザクションの一部をロールバックするには、rollbackメソッドにSavepointを渡すことでロールバックを行います。 Savepointを使用して部分的にロールバックをした後、トランザクション全体をコミットするとそのSavepoint以降のトランザクションはコミットされません。

for
  _ <- conn.setAutoCommit(false)
  savepoint <- conn.setSavepoint("savepoint1")
  _ <- conn.rollback(savepoint)
  _ <- conn.commit()
yield

セーブポイントのリリース

Savepointをリリースするには、releaseSavepointメソッドにSavepointを渡すことでリリースを行います。 Savepointをリリースした後、トランザクション全体をコミットするとそのSavepoint以降のトランザクションはコミットされます。

for
  _ <- conn.setAutoCommit(false)
  savepoint <- conn.setSavepoint("savepoint1")
  _ <- conn.releaseSavepoint(savepoint)
  _ <- conn.commit()
yield

ユーティリティコマンド

MySQLにはいくつかのユーティリティコマンドがあります。(参照)

ldbcではこれらのコマンドを使用するためのAPIを提供しています。

コマンド 用途 サポート
COM_QUIT クライアントが接続を閉じることをサーバーに要求していることを伝える。
COM_INIT_DB 接続のデフォルト・スキーマを変更する
COM_STATISTICS 内部ステータスの文字列を可読形式で取得する。
COM_DEBUG サーバーの標準出力にデバッグ情報をダンプする
COM_PING サーバーが生きているかチェックする
COM_CHANGE_USER 現在の接続のユーザーを変更する
COM_RESET_CONNECTION セッションの状態をリセットする
COM_SET_OPTION 現在の接続のオプションを設定する

COM QUIT

COM_QUITはクライアントが接続を閉じることをサーバーに要求していることを伝えるためのコマンドです。

ldbcではConnectioncloseメソッドを使用して接続を閉じることができます。 closeメソッドを使用すると接続が閉じられるため、その後の処理で接続を使用することはできません。

ConnectionResourceを使用してリソース管理を行います。そのためcloseメソッドを使用してリソースの解放を行う必要はありません。

connection.use { conn =>
  conn.close()
}

COM INIT DB

COM_INIT_DBは接続のデフォルト・スキーマを変更するためのコマンドです。

ldbcではConnectionsetSchemaメソッドを使用してデフォルト・スキーマを変更することができます。

connection.use { conn =>
  conn.setSchema("test")
}

COM STATISTICS

COM_STATISTICSは内部ステータスの文字列を可読形式で取得するためのコマンドです。

ldbcではConnectiongetStatisticsメソッドを使用して内部ステータスの文字列を取得することができます。

connection.use { conn =>
  conn.getStatistics
}

取得できるステータスは以下のようになります。

COM PING

COM_PINGはサーバーが生きているかチェックするためのコマンドです。

ldbcではConnectionisValidメソッドを使用してサーバーが生きているかチェックすることができます。 サーバーが生きている場合はtrueを返却し、生きていない場合はfalseを返却します。

connection.use { conn =>
  conn.isValid
}

COM CHANGE USER

COM_CHANGE_USERは現在の接続のユーザーを変更するためのコマンドです。 また、以下の接続状態をリセットします。

ldbcではConnectionchangeUserメソッドを使用してユーザーを変更することができます。

connection.use { conn =>
  conn.changeUser("root", "password")
}

COM RESET CONNECTION

COM_RESET_CONNECTIONはセッションの状態をリセットするためのコマンドです。

COM_RESET_CONNECTIONCOM_CHANGE_USERをより軽量化したもので、セッションの状態をクリーンアップする機能はほぼ同じだが、次のような機能がある

ldbcではConnectionresetServerStateメソッドを使用してセッションの状態をリセットすることができます。

connection.use { conn =>
  conn.resetServerState
}

COM SET OPTION

COM_SET_OPTIONは現在の接続のオプションを設定するためのコマンドです。

ldbcではConnectionenableMultiQueriesメソッドとdisableMultiQueriesメソッドを使用してオプションを設定することができます。

enableMultiQueriesメソッドを使用すると、複数のクエリを一度に実行することができます。 disableMultiQueriesメソッドを使用すると、複数のクエリを一度に実行することができなくなります。

※ これは、Insert、Update、および Delete ステートメントによるバッチ処理にのみ使用できます。Selectステートメントで使用を行なったとしても、最初のクエリの結果のみが返されます。

connection.use { conn =>
  conn.enableMultiQueries *> conn.disableMultiQueries
}

バッチコマンド

ldbcではバッチコマンドを使用して複数のクエリを一度に実行することができます。 バッチコマンドを使用することで、複数のクエリを一度に実行することができるため、ネットワークラウンドトリップの回数を減らすことができます。

バッチコマンドを使用するにはStatementまたはPreparedStatementaddBatchメソッドを使用してクエリを追加し、executeBatchメソッドを使用してクエリを実行します。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Alice', 20)")
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Bob', 30)")
    result <- statement.executeBatch()
  yield result
}

上記の例では、AliceBobのデータを一度に追加することができます。 実行されるクエリは以下のようになります。

INSERT INTO users (name, age) VALUES ('Alice', 20);INSERT INTO users (name, age) VALUES ('Bob', 30);

バッチコマンド実行後の戻り値は、実行したクエリそれぞれの影響を受けた行数の配列となります。

上記の例では、Aliceのデータは1行追加され、Bobのデータも1行追加されるため、戻り値はList(1, 1)となります。

バッチコマンドを実行した後は、今までaddBatchメソッドで追加したクエリがクリアされます。

手動でクリアする場合はclearBatchメソッドを使用してクリアを行います。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Alice', 20)")
    _ <- statement.clearBatch()
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Bob', 30)")
    _ <- statement.executeBatch()
  yield
}

上記の例では、Aliceのデータは追加されませんが、Bobのデータは追加されます。

StatementとPreparedStatementの違い

StatementPreparedStatementではバッチコマンドで実行されるクエリが異なる場合があります。

Statementを使用してINSERT文をバッチコマンドで実行した場合、複数のクエリが一度に実行されます。 しかし、PreparedStatementを使用してINSERT文をバッチコマンドで実行した場合、1つのクエリが実行されます。

例えば、以下のクエリをバッチコマンドで実行した場合、Statementを使用しているため、複数のクエリが一度に実行されます。

connection.use { conn =>
  for
    statement <- conn.createStatement()
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Alice', 20)")
    _ <- statement.addBatch("INSERT INTO users (name, age) VALUES ('Bob', 30)")
    result <- statement.executeBatch()
  yield result
}

// 実行されるクエリ
// INSERT INTO users (name, age) VALUES ('Alice', 20);INSERT INTO users (name, age) VALUES ('Bob', 30);

しかし、以下のクエリをバッチコマンドで実行した場合、PreparedStatementを使用しているため、1つのクエリが実行されます。

connection.use { conn =>
  for
    statement <- conn.clientPreparedStatement("INSERT INTO users (name, age) VALUES (?, ?)")
    _ <- statement.setString(1, "Alice")
    _ <- statement.setInt(2, 20)
    _ <- statement.addBatch()
    _ <- statement.setString(1, "Bob")
    _ <- statement.setInt(2, 30)
    _ <- statement.addBatch()
    result <- statement.executeBatch()
  yield result
}

// 実行されるクエリ
// INSERT INTO users (name, age) VALUES ('Alice', 20), ('Bob', 30);

これは、PreparedStatementを使用している場合、クエリのパラメーターを設定した後にaddBatchメソッドを使用することで、1つのクエリに複数のパラメーターを設定することができるためです。

ストアドプロシージャの実行

ldbcではストアドプロシージャを実行するためのAPIを提供しています。

ストアドプロシージャを実行するにはConnectionprepareCallメソッドを使用してCallableStatementを構築します。

※ 使用するストアドプロシージャは公式ドキュメント記載のものを使用しています。

CREATE PROCEDURE demoSp(IN inputParam VARCHAR(255), INOUT inOutParam INT)
BEGIN
    DECLARE z INT;
    SET z = inOutParam + 1;
    SET inOutParam = z;

    SELECT inputParam;

    SELECT CONCAT('zyxw', inputParam);
END

上記のストアドプロシージャを実行する場合は以下のようになります。

connection.use { conn =>
  for
    callableStatement <- conn.prepareCall("CALL demoSp(?, ?)")
    _ <- callableStatement.setString(1, "abcdefg")
    _ <- callableStatement.setInt(2, 1)
    hasResult <- callableStatement.execute()
    values <- Monad[IO].whileM[List, Option[String]](callableStatement.getMoreResults()) {
      for
        resultSet <- callableStatement.getResultSet().flatMap {
          case Some(rs) => IO.pure(rs)
          case None     => IO.raiseError(new Exception("No result set"))
        }
        value <- resultSet.getString(1)
      yield value
    }
  yield values // List(Some("abcdefg"), Some("zyxwabcdefg"))
}

出力パラメータ(ストアド・プロシージャを作成したときにOUTまたはINOUTとして指定したパラメータ)の値を取得するには、JDBCでは、CallableStatementインターフェイスのさまざまなregisterOutputParameter()メソッドを使用して、ステートメント実行前にパラメータを指定する必要がありますが、ldbcではsetXXXメソッドを使用してパラメータを設定することだけクエリ実行時にパラメーターの設定も行なってくれます。

ただし、ldbcでもregisterOutputParameter()メソッドを使用してパラメータを指定することもできます。

connection.use { conn =>
  for
    callableStatement <- conn.prepareCall("CALL demoSp(?, ?)")
    _ <- callableStatement.setString(1, "abcdefg")
    _ <- callableStatement.setInt(2, 1)
    _ <- callableStatement.registerOutParameter(2, ldbc.connector.data.Types.INTEGER)
    hasResult <- callableStatement.execute()
    value <- callableStatement.getInt(2)
  yield value // 2
}

registerOutParameterでOutパラメータを指定する場合、同じindex値を使用してsetXXXメソッドでパラメータを設定していない場合サーバーにはNullで値が設定されることに注意してください。

未対応機能

ldbcコネクタは現在実験的な機能となります。そのため、以下の機能はサポートされていません。 機能提供は順次行っていく予定です。