データベース接続

この章では、LDBCで構築したクエリを使用して、データベースへの接続処理を行うための方法について説明します。

プロジェクトに以下の依存関係を設定する必要があります。

libraryDependencies ++= Seq(
  "io.github.takapi327" %% "ldbc-dsl" % "0.3.0-beta8",
  "com.mysql" % "mysql-connector-j" % "8.4.0"
)

LDBCでのクエリ構築方法をまだ読んでいない場合は、型安全なクエリ構築の章を先に読むことをオススメしましす。

以下のコード例では、以下のimportを想定しています。

import com.mysql.cj.jdbc.MysqlDataSource

import cats.effect.IO
// This is just for testing. Consider using cats.effect.IOApp instead of calling
// unsafe methods directly.
import cats.effect.unsafe.implicits.global

import ldbc.sql.*
import ldbc.dsl.io.*
import ldbc.dsl.logging.ConsoleLogHandler
import ldbc.query.builder.TableQuery

テーブル定義は以下を使用します。

case class User(
  id: Long,
  name: String,
  age: Option[Int],
)

val table = Table[User]("user")(
  column("id", BIGINT, AUTO_INCREMENT, PRIMARY_KEY),
  column("name", VARCHAR(255)),
  column("age", INT.UNSIGNED.DEFAULT(None)),
)

val userQuery = TableQuery[User](table)

DataSourceの使用

LDBCはデータベース接続にJDBCのDataSourceを使用します。LDBCにはこのDataSourceを構築する実装は提供されていないため、mysqlやHikariCPなどのライブラリを使用する必要があります。今回の例ではMysqlDataSourceを使用してDataSourceの構築を行います。

private val dataSource = new MysqlDataSource()
dataSource.setServerName("127.0.0.1")
dataSource.setPortNumber(3306)
dataSource.setDatabaseName("database name")
dataSource.setUser("user name")
dataSource.setPassword("password")

ログ

LDBCではDatabase接続の実行ログやエラーログを任意のロギングライブラリを使用して任意の形式で書き出すことができます。

標準ではCats EffectのConsoleを使用したロガーが提供されているため開発時はこちらを使用することができます。

given LogHandler[IO] = ConsoleLogHandler[IO]

カスタマイズ

任意のロギングライブラリを使用してログをカスタマイズする場合はldbc.dsl.logging.LogHandlerを使用します。

以下は標準実装のログ実装です。LDBCではデータベース接続で以下3種類のイベントが発生します。

それぞれのイベントでどのようなログを書き込むかをパターンマッチングによって振り分けを行います。

def consoleLogger[F[_]: Console: Sync]: LogHandler[F] =
  case LogEvent.Success(sql, args) =>
    Console[F].println(
      s"""Successful Statement Execution:
         |  $sql
         |
         | arguments = [${ args.mkString(",") }]
         |""".stripMargin
    )
  case LogEvent.ProcessingFailure(sql, args, failure) =>
    Console[F].errorln(
      s"""Failed ResultSet Processing:
         |  $sql
         |
         | arguments = [${ args.mkString(",") }]
         |""".stripMargin
    ) >> Console[F].printStackTrace(failure)
  case LogEvent.ExecFailure(sql, args, failure) =>
    Console[F].errorln(
      s"""Failed Statement Execution:
         |  $sql
         |
         | arguments = [${ args.mkString(",") }]
         |""".stripMargin
    ) >> Console[F].printStackTrace(failure)

Query

select文を構築するとtoList/headOption/unsafeメソッドを使用できるようになります。これらのメソッドは取得後のデータ形式を決定するために使用します。特段何も型を指定しない場合はselectメソッドで指定したカラムの型がTupleとして返却されます。

toList

クエリを実行した結果データの一覧を取得したい場合は、toListメソッドを使用します。toListメソッドを使用してデータベース処理を行なった結果、データ取得件数が0件であった場合空の配列が返されます。

val query1 = userQuery.selectAll.toList // List[(Long, String, Option[Int])]

toListメソッドにモデルを指定すると取得後のデータを指定したモデルに変換することができます。

val query = userQuery.selectAll.toList[User] // User

toListメソッドで指定するモデルの型はselectメソッドで指定したTupleの型と一致するか、Tupleの型から指定したモデルへの型変換が可能なものでなければなりません。

val query1 = userQuery.select(user => (user.name, user.age)).toList[User] // Compile error

case class Test(name: String, age: Option[Int])
val query2 = userQuery.select(user => (user.name, user.age)).toList[Test] // Test

headOption

クエリを実行した結果最初の1件のデータをOptionalで取得したい場合は、headOptionメソッドを使用します。headOptionメソッドを使用してデータベース処理を行なった結果データ取得件数が0件であった場合Noneが返されます。

headOptionメソッドを使用した場合、複数のデータを取得するクエリを実行したとしても最初のデータのみ返されることに注意してください。

val query1 = userQuery.selectAll.headOption // Option[(Long, String, Option[Int])]
val query2 = userQuery.selectAll.headOption[User] // Option[User]

unsafe

unsafeメソッドを使用した場合、取得したデータの最初の1件のみ返されることはheadOptionメソッドと同じですが、データはOptionalにはならずそのままのデータが返却されます。もし取得したデータの件数が0件であった場合は例外が発生するため適切な例外ハンドリングを行う必要があります。

実行時に例外を発生する可能性が高いためunsafeという名前になっています。

val query1 = userQuery.selectAll.unsafe // (Long, String, Option[Int])
val query2 = userQuery.selectAll.unsafe[User] // User

Update

insert/update/delete文を構築するとupdateメソッドを使用できるようになります。updateメソッドはデータベースへの書き込み処理件数を返却します。

val insert = userQuery.insert((1L, "name", None)).update // Int
val update = userQuery.update("name", "update name").update // Int
val delete = userQuery.delete.update // Int

insert文の場合データ挿入時にAutoIncrementで生成された値を返却させたい場合があります。その場合はupdateメソッドではなくreturningメソッドを使用して返却したいカラムを指定します。

val insert = userQuery.insert((1L, "name", None)).returning("id") // Long

returningメソッドで指定する値はモデルが持つプロパティ名である必要があります。また、指定したプロパティがテーブル定義上でAutoIncrementの属性が設定されていなければエラーとなってしまいます。

MySQLではデータ挿入時に返却できる値はAutoIncrementのカラムのみであるため、LDBCでも同じような仕様となっています。

データベース操作の実行

データベース接続を行う前にコミットのタイミングや読み書き専用などの設定を行う必要があります。

読み取り専用

readOnlyメソッドを使用することで実行するクエリの処理を読み込み専用にすることができます。readOnlyメソッドはinsert/update/delete文でも使用することができますが、書き込み処理を行うので実行時にエラーとなります。

val read = userQuery.selectAll.toList.readOnly(dataSource)

自動コミット

autoCommitメソッドを使用することで実行するクエリの処理をクエリ実行時ごとにコミットするように設定することができます。

val read = userQuery.insert((1L, "name", None)).update.autoCommit(dataSource)

トランザクション

transactionメソッドを使用することで複数のデータベース接続処理を1つのトランザクションにまとめることができます。

toList/headOption/unsafe/returning/updateメソッドの戻り値はKleisli[F, Connection[F], T]型となっています。そのためmapやflatMapを使用して処理を1つにまとめることができます。

1つにまとめたKleisli[F, Connection[F], T]に対してtransactionメソッドを使用することで、中で行われる全てのデータベース接続処理は1つのトランザクションにまとめて実行されます。

(for
  result1 <- userQuery.insert((1L, "name", None)).returning("id")
  result2 <- userQuery.update("name", "update name").update
  ...
yield ...).transaction(dataSource)

Database Action

データベース処理を実行する方法としてデータベースへの接続情報を持ったDatabaseを使用して行う方法も存在します。

Databaseを構築する方法はDriverManagerを使用した方法と、DataSourceから生成する方法の2種類があります。以下はMySQLのドライバーを使用してデータベースへの接続情報を持ったDatabaseを構築する例です。

val db = Database.fromMySQLDriver[IO]("database name", "host", "port number", "user name", "password")

Databaseを使用してデータベース処理を実行するメリットは以下になります。

Databaseを使用する方法は、DataSourceを受け渡す方法を簡略化しただけにすぎないため、どちらを使用しても実行結果に差が出ることはありません。 flatMapなどで処理を結合しメソッドチェーンで実行するか、結合した処理をDatabaseを使用して実行するかの違いでしかありません。そのため実行方法はユーザーの好きの方法を選択できます。

Read Only

val user: Option[User] = db.readOnly(userQuery.selectAll.headOption[User]).unsafeRunSync()

Auto Commit

val result = db.autoCommit(userQuery.insert((1L, "name", None)).update).unsafeRunSync()

Transaction

db.transaction(for
  result1 <- userQuery.insert((1L, "name", None)).returning("id")
  result2 <- userQuery.update("name", "update name").update
  ...
yield ...).unsafeRunSync()

Database model

LDBCではDatabaseモデルはデータベースの接続情報を持つ以外の用途でも使用されます。他の用途としてSchemaSPYのドキュメント生成に使用されることです。SchemaSPYのドキュメント生成に関してはこちらを参照してください。

すでにDatabaseモデルを別の用途で生成している場合は、そのモデルを使用してデータベースの接続情報を持ったDatabaseを構築することができます。

import ldbc.dsl.io.*

val database: Database = ???

val db = database.fromDriverManager()
// or
val db = database.fromDriverManager("user name", "password")

メソッドチェーンでの使用

DatabaseモデルはTableQueryのメソッドでDataSourceの代わりに使用することもできます。

val read = userQuery.selectAll.toList.readOnly(db)
val commit = userQuery.insert((1L, "name", None)).update.autoCommit(db)
val transaction = (for
  result1 <- userQuery.insert((1L, "name", None)).returning("id")
  result2 <- userQuery.update("name", "update name").update
  ...
yield ...).transaction(db)

HikariCPコネクションプールの使用

ldbc-hikariは、HikariCP接続プールを構築するためのHikariConfigおよびHikariDataSourceを構築するためのビルダーを提供します。

libraryDependencies ++= Seq(
  "io.github.takapi327" %% "ldbc-hikari" % "0.3.0-beta8",
)

HikariConfigBuilderは名前の通りHikariCPのHikariConfigを構築するためのビルダーです。

val hikariConfig: com.zaxxer.hikari.HikariConfig = HikariConfigBuilder.default.build()

HikariConfigBuilderにはdefaultfromメソッドがありdefaultを使用した場合、LDBC指定のパスを元にConfigから対象の値を取得してHikariConfigの構築を行います。

ldbc.hikari {
  jdbc_url = ...
  username = ...
  password = ...
}

ユーザー独自のパスを指定したい場合はfromメソッドを使用して引数に取得したいパスを渡す必要があります。

val hikariConfig: com.zaxxer.hikari.HikariConfig = HikariConfigBuilder.from("custom.path").build()

// custom.path {
//   jdbc_url = ...
//   username = ...
//   password = ...
// }

HikariCPに設定できる内容は公式を参照してください。

Configに設定できるキーの一覧は以下になります。

キー名 説明
catalog 接続時に設定するデフォルトのカタログ名 String
connection_timeout クライアントがプールからの接続を待機する最大ミリ秒数 Duration
idle_timeout 接続がプール内でアイドル状態であることを許可される最大時間 (ミリ秒単位) Duration
leak_detection_threshold 接続漏れの可能性を示すメッセージがログに記録されるまでに、接続がプールから外れる時間 Duration
maximum_pool_size アイドル接続と使用中の接続の両方を含め、プールが許容する最大サイズ Int
max_lifetime プール内の接続の最大寿命 Duration
minimum_idle アイドル接続と使用中接続の両方を含め、HikariCPがプール内に維持しようとするアイドル接続の最小数 Int
pool_name 接続プールの名前 String
allow_pool_suspension プール・サスペンドを許可するかどうか Boolean
auto_commit プール内の接続のデフォルトの自動コミット動作 Boolean
connection_init_sql 新しい接続が作成されたときに、その接続がプールに追加される前に実行されるSQL文字列 String
connection_test_query 接続の有効性をテストするために実行する SQL クエリ String
data_source_classname Connections の作成に使用する JDBC DataSourceの完全修飾クラス名 String
initialization_fail_timeout プール初期化の失敗タイムアウト Duration
isolate_internal_queries 内部プール・クエリ (主に有効性チェック)を、Connection.rollback()によって独自のトランザクションで分離するかどうか Boolean
jdbc_url JDBCのURL String
readonly プールに追加する接続を読み取り専用接続として設定するかどうか Boolean
register_mbeans HikariCPがJMXにHikariConfigMXBeanとHikariPoolMXBeanを自己登録するかどうか Boolean
schema 接続時に設定するデフォルトのスキーマ名 String
username DataSource.getConnection(username,password)の呼び出しに使用されるデフォルトのユーザ名 String
password DataSource.getConnection(username,password)の呼び出しに使用するデフォルトのパスワード String
driver_class_name 使用するDriverのクラス名 String
transaction_isolation デフォルトのトランザクション分離レベル String

HikariDataSourceBuilderを使用することで、HikariCPのHikariDataSourceを構築することができます。

接続プールはライフタイムで管理されるオブジェクトでありきれいにシャットダウンする必要があるため、ビルダーによって構築されたHikariDataSourceResourceとして管理されます。

val dataSource: Resource[IO, HikariDataSource] = HikariDataSourceBuilder.default[IO].buildDataSource()

buildDataSource経由で構築されたHikariDataSourceは、内部でLDBC指定のパスを元にConfigから設定を取得し構築されたHikariConfigを使用しています。 これはHikariConfigBuilderdefault経由で生成されたHikariConfigと同等のものです。

もしユーザー指定のHikariConfigを使用したい場合は、buildFromConfigを使用することでHikariDataSourceを構築することができます。

val hikariConfig = ???
val dataSource = HikariDataSourceBuilder.default[IO].buildFromConfig(hikariConfig)

HikariDataSourceBuilderを使用して構築されたHikariDataSourceは通常IOAppを使用して実行します。

object HikariApp extends IOApp:

  val dataSourceResource: Resource[IO, HikariDataSource] = HikariDataSourceBuilder.default[IO].buildDataSource()

  def run(args: List[String]): IO[ExitCode] =
    dataSourceResource.use { dataSource =>
       ...
    }

HikariDatabase

HikariCPのコネクション情報を持ったDatabaseを構築する方法も存在します。

HikariDatabaseHikariDataSourceと同様にResourceとして管理されます。 そのため通常はIOAppを使用して実行します。

object HikariApp extends IOApp:

  val hikariConfig = ???
  val databaseResource: Resource[F, Database[F]] = HikariDatabase.fromHikariConfig[IO](hikariConfig)

  def run(args: List[String]): IO[ExitCode] =
    databaseResource.use { database =>
       for
         result <- database.readOnly(...)
        yield ExitCode.Success
    }