ruby / db

I wrap the pg Postgres driver in a small set of lightweight methods instead of using an ORM.

Configuration

Add the pg and connection_pool gems:

gem "connection_pool"
gem "pg"

Configure the pool on process boot:

# config/puma.rb
# Runs once in each forked Puma worker, so every worker configures its own pool.
on_worker_boot do
  DB.configure do |c|
    # NOTE(review): `workers` and `threads` are assumed to come from the
    # surrounding Puma config DSL. A per-worker pool usually only needs
    # `threads` connections — confirm `workers * threads` is intentional.
    c.pool_size = workers * threads
    c.reap = true
  end
end

Wrapper

The DB class manages connection pooling and provides a simple interface:

# lib/db.rb
require "connection_pool"
require "pg"
require_relative "db/instrumented_connection"

class DB
  # Boot-time settings; filled in by DB.configure (see config/puma.rb).
  Config = Struct.new(:pool_size, :reap)

  class << self
    # Yields a fresh Config for the caller to fill in. Must run before the
    # first call to DB.pool.
    def configure
      @config = Config.new
      yield(@config)
    end

    # Process-wide memoized DB instance backed by a ConnectionPool.
    def pool
      @pool ||= begin
        # Fail with a clear message instead of a NoMethodError on nil.
        raise "DB.configure must be called before DB.pool" if @config.nil?

        new(pool_size: @config.pool_size, reap: @config.reap)
      end
    end
  end

  # pool_size:    max simultaneous PG connections held by this instance.
  # reap:         when true, run a background thread that closes idle conns.
  # raise_errors: when true, queries re-raise errors even in Sentry envs.
  def initialize(pool_size: 1, reap: false, raise_errors: false)
    @env = ENV.fetch("APP_ENV")
    @raise_errors = raise_errors
    @pool = ConnectionPool.new(size: pool_size, timeout: 5) { build_pg_conn }

    start_reaper_thread if reap
  end

  # Executes a single statement; returns rows as an array of hashes.
  def exec(sql, params = [])
    @pool.with do |conn|
      iconn = DB::InstrumentedConnection.new(conn, @env, raise_errors: @raise_errors)
      iconn.exec(sql, params)
    end
  end

  # Yields an instrumented connection inside a transaction. Forwards
  # raise_errors so behavior matches #exec (previously it was silently
  # dropped on this path).
  def transaction
    @pool.with do |conn|
      iconn = DB::InstrumentedConnection.new(conn, @env, raise_errors: @raise_errors)
      iconn.transaction { yield(iconn) }
    end
  end

  def escape_string(string)
    @pool.with do |conn|
      conn.escape_string(string)
    end
  end

  # Turns free text into a safe ILIKE pattern: strips trailing backslashes,
  # escapes % and _, and lets spaces match anything ("foo bar" => "%foo%bar%").
  # Returns "" for blank input.
  def fuzzy_like(query)
    normalized = query.to_s.strip.sub(/\\+$/, "")
    return "" if normalized == ""

    pattern = Regexp.union("%", "_")
    escape_char = "\\"
    escaped = normalized
      .gsub(pattern) { |x| [escape_char, x].join }
      .tr(" ", "%")

    "%#{escaped}%"
  end

  # Deployed envs read DATABASE_URL; everything else hits local Postgres.
  def self.db_url_for(env)
    case env
    when "staging", "production"
      ENV.fetch("DATABASE_URL")
    else
      "postgres://postgres:postgres@localhost:5432/myapp_#{env}"
    end
  end

  private def build_pg_conn
    conn = PG.connect(DB.db_url_for(@env))

    # Decode result columns into Ruby types, falling back to plain strings
    # for any OID without a registered decoder.
    map = PG::BasicTypeMapForResults.new(conn)
    map.default_type_map = PG::TypeMapAllStrings.new

    conn.type_map_for_results = map
    conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(conn)

    conn
  end

  private def start_reaper_thread
    Thread.new do
      Thread.current.name = "db-reaper"

      loop do
        # Swallow per-iteration errors so one transient failure can't kill
        # the reaper thread for the life of the process.
        begin
          # NOTE(review): confirm this matches the installed connection_pool
          # version — some releases take idle_seconds positionally.
          @pool.reap(idle_seconds: 300) { |conn| conn&.close }
        rescue StandardError
          nil
        end

        sleep 60
      end
    end
  end
end

Each connection is wrapped with instrumentation for observability:

# lib/db/instrumented_connection.rb
require "forwardable"
require "json"
require "pg"
require "sentry-ruby"

class DB
  # Wraps a raw PG::Connection with Sentry tracing, error reporting, and
  # bounded reconnect-retry behavior.
  class InstrumentedConnection
    SENTRY_ENVS = ["staging", "production"].freeze

    # How many times to retry after the connection drops, and how long to
    # wait between attempts (gives an HA standby time to come online).
    MAX_CONNECTION_RETRIES = 5
    CONNECTION_RETRY_DELAY = 5

    extend Forwardable

    # Forward everything except the methods we instrument below.
    def_delegators :@conn, *(
      PG::Connection.public_instance_methods(false) - [
        :exec,
        :exec_params,
        :transaction
      ]
    )

    def initialize(conn, env, raise_errors: false)
      @conn = conn
      @env = env
      @raise_errors = raise_errors
    end

    # Executes sql and returns rows as an array of hashes. In Sentry envs,
    # errors are reported and [] is returned unless raise_errors was set;
    # everywhere else errors always raise.
    def exec(sql, params = [])
      attempts ||= 0
      rows = []

      with_sentry(sql, params) do
        result = execute_query(sql, params)
        rows = result.to_a
      end

      rows
    rescue PG::ConnectionBad
      # Previously retried forever; bound it so a permanently-down database
      # can't wedge the caller in an infinite sleep/retry loop.
      attempts += 1
      raise if attempts > MAX_CONNECTION_RETRIES

      sleep CONNECTION_RETRY_DELAY
      retry
    rescue => err
      report_error(err, operation: "DB#exec", sql: sql, params: params)

      if @raise_errors || !SENTRY_ENVS.include?(@env)
        raise err
      end

      []
    end

    # Yields self inside a transaction; nested calls reuse the transaction
    # already in progress. NOTE: a retry after PG::ConnectionBad re-runs
    # the whole block, so the block must be safe to replay.
    def transaction
      attempts ||= 0

      if in_transaction?
        yield(self)
      else
        @conn.transaction { yield(self) }
      end
    rescue PG::ConnectionBad
      attempts += 1
      raise if attempts > MAX_CONNECTION_RETRIES

      sleep CONNECTION_RETRY_DELAY
      retry
    end

    # PQTRANS_IDLE is 0; any other status means a transaction is open.
    private def in_transaction?
      @conn.transaction_status != 0
    end

    private def execute_query(sql, params)
      if params.empty?
        @conn.exec(sql)
      else
        # Hash params are serialized to JSON (for json/jsonb columns).
        params = params.map { |p| p.is_a?(Hash) ? p.to_json : p }
        @conn.exec_params(sql, params)
      end
    end

    # Runs the block inside a db.sql.execute child span when a Sentry span
    # or transaction is available; otherwise just runs it.
    private def with_sentry(sql, params = [])
      tx = Sentry.get_current_scope&.get_span ||
        Sentry.get_current_scope&.get_transaction ||
        Sentry.start_transaction(name: "DB#exec")

      if tx
        tx.with_child_span(op: "db.sql.execute", description: sql) do |span|
          span.set_data("SQL Params", params)
          yield
        end
      else
        yield
      end
    end

    private def report_error(err, context = {})
      if SENTRY_ENVS.include?(@env)
        Sentry.capture_exception(err, extra: context)
      end
    end
  end
end

Usage

In controllers, access the pool:

class ApplicationController < ActionController::Base
  private

  # Controllers read from the process-wide connection pool.
  def db
    DB.pool
  end
end

class SuggestionsController < ApplicationController
  # Company typeahead endpoint; responds with JSON rows.
  def new
    suggestions = Search::SuggestCompany.new(db).call(query: params[:query])
    render json: suggestions
  end
end

Pass db into constructors and write queries with <<~SQL heredocs:

module Search
  # Typeahead lookup: returns up to 50 companies whose name or alias
  # fuzzily matches the query, best-scored first.
  class SuggestCompany
    QUERY_SQL = <<~SQL
      SELECT
        companies.id,
        companies.name,
        companies.status
      FROM
        companies
      WHERE
        companies.name ILIKE $1
        OR companies.also_known_as ILIKE $1
      ORDER BY
        companies.score DESC
      LIMIT 50
    SQL

    def initialize(db)
      @db = db
    end

    def call(query:)
      pattern = @db.fuzzy_like(query)
      @db.exec(QUERY_SQL, [pattern])
    end
  end
end

For scripts, instantiate directly:

# Smoke-test entry point: run the file directly to exercise the query.
if __FILE__ == $PROGRAM_NAME
  require_relative "../db"
  pp Search::SuggestCompany.new(DB.new).call(query: "Data")
end

Background processes

Initialize DB connections in background processes:

# Each forked worker gets its own connection
# DB.new (not DB.pool) inside the fork: each child builds its own
# connection instead of sharing one with the parent process.
children = workers.map do |worker|
  fork { worker.new(DB.new).poll }
end

See ruby / job queues and ruby / clock for complete examples.

Benefits

Migrations

Generate migrations with bin/db-gen-migration:

#!/usr/bin/env ruby
# bin/db-gen-migration
#
# Generates a timestamped, empty migration skeleton in db/migrate/.
# Usage: bin/db-gen-migration add_score_to_companies

require "time"
require "fileutils"

name = ARGV[0].to_s.strip
if name == ""
  puts "err: migration name required"
  puts "usage: bin/db-gen-migration migration_name"
  exit 1 # was a bare top-level `return`, which exited with status 0
end

# Accept CamelCase, kebab-case, or snake_case input.
snake_name = name
  .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
  .gsub(/([a-z\d])([A-Z])/, '\1_\2')
  .tr("-", "_")
  .downcase
camel_name = snake_name
  .split("_")
  .map(&:capitalize)
  .join

filename = "#{Time.now.strftime("%Y%m%d%H%M%S")}_#{snake_name}.rb"
migrate_dir = File.join(__dir__, "..", "db", "migrate")
# Create db/migrate on first use (this is why fileutils is required).
FileUtils.mkdir_p(migrate_dir)
filepath = File.join(migrate_dir, filename)
if File.exist?(filepath)
  puts "err: migration file already exists: #{filepath}"
  exit 1
end

File.write(filepath, <<~RUBY)
  class #{camel_name}
    def initialize(db)
      @db = db
    end

    def up
      @db.transaction do |conn|
        conn.exec <<~SQL

        SQL
      end
    end

    def down
      @db.transaction do |conn|
        conn.exec <<~SQL

        SQL
      end
    end
  end
RUBY

puts "created: db/migrate/#{filename}"
puts "see: db/README.md#safe-migrations"

This produces timestamped files in db/migrate/:

# db/migrate/20260122153000_add_score_to_companies.rb
#
# Adds companies.score with a default of 0; fully reversible.
class AddScoreToCompanies
  UP_SQL = <<~SQL
    ALTER TABLE companies ADD COLUMN score integer DEFAULT 0
  SQL
  DOWN_SQL = <<~SQL
    ALTER TABLE companies DROP COLUMN score
  SQL

  def initialize(db)
    @db = db
  end

  def up
    @db.transaction { |conn| conn.exec(UP_SQL) }
  end

  def down
    @db.transaction { |conn| conn.exec(DOWN_SQL) }
  end
end

For DDL that can't run in a transaction, remove the transaction wrapper:

# db/migrate/20260122153555_add_index_foo.rb
#
# CONCURRENTLY cannot run inside a transaction, so this migration issues
# plain exec calls rather than wrapping them in @db.transaction.
class AddIndexFoo
  def initialize(db)
    @db = db
  end

  def up
    create_index
  end

  def down
    drop_index
  end

  private def create_index
    @db.exec <<~SQL
      CREATE INDEX CONCURRENTLY index_foo ON bar (col)
    SQL
  end

  private def drop_index
    @db.exec <<~SQL
      DROP INDEX IF EXISTS index_foo
    SQL
  end
end

The runner sets safe timeouts:

# lib/db.rb
class DB
  class Migrations
    class Runner
      # Applies every pending migration in version order, recording each in
      # schema_migrations. Helpers used here (db, pending_versions,
      # migration_class, dump) are defined elsewhere in this class.
      def migrate
        # Fail fast if can't acquire lock
        db.exec("SET lock_timeout = '10s'")
        # Allow migrations to run for a while
        db.exec("SET statement_timeout = '10min'")

        pending_versions.each do |ver|
          migration_class(ver).new(db).up
          # Record success so this version is never replayed.
          db.exec(<<~SQL, [ver])
            INSERT INTO schema_migrations (version) VALUES ($1)
          SQL
        end

        dump # schema.sql for code review diffs
      end
    end
  end
end

Check for pending migrations on boot:

# config.ru
# Check for pending migrations before serving requests; the ! convention
# suggests this raises on a stale schema — confirm in DB::Migrations.
DB::Migrations.check_pending!

Migration linter

A linter checks migrations for unsafe patterns. See postgres / safe migrations for DDL patterns.

# lib/db/linter.rb
class DB
  # Static analysis over db/migrate/*.rb for DDL patterns that would take
  # long locks. Each check is a regex heuristic, not a SQL parser.
  class Linter
    CHECKS = [
      {
        name: "non-concurrent index",
        pattern: /CREATE\s+(UNIQUE\s+)?INDEX\s+(?!CONCURRENTLY)/i,
        # Indexes created alongside their table are safe: nothing queries
        # the new table yet, so the lock is harmless.
        skip_if: ->(content) { content.include?("CREATE TABLE") },
        message: "Use CREATE INDEX CONCURRENTLY outside a transaction."
      },
      {
        name: "foreign key without NOT VALID",
        # NOTE(review): without /m the lookahead only scans to end of line,
        # so a NOT VALID on a following line is still flagged. Acceptable
        # for a heuristic, but worth knowing.
        pattern: /REFERENCES\s+\w+\s*\([^)]+\)(?!.*NOT\s+VALID)/i,
        message: "Add NOT VALID, then VALIDATE in a separate migration."
      },
      {
        name: "SET NOT NULL directly",
        pattern: /ALTER\s+(TABLE\s+\w+\s+)?ALTER\s+(COLUMN\s+)?\w+\s+SET\s+NOT\s+NULL/i,
        message: "Use a CHECK constraint with NOT VALID instead."
      },
      {
        name: "check constraint without NOT VALID",
        pattern: /ADD\s+CONSTRAINT\s+\w+\s+CHECK\s*\([^)]+\)(?!.*NOT\s+VALID)/i,
        message: "Add NOT VALID, then VALIDATE in a separate migration."
      }
    ].each(&:freeze).freeze # was a mutable constant; freeze so checks can't drift

    def initialize
      @migrate_dir = File.expand_path("../db/migrate", __dir__)
    end

    # Lints every migration file. Prints findings and returns true when all
    # files pass, false otherwise.
    def run
      errors = []

      Dir[File.join(@migrate_dir, "*.rb")].sort.each do |file|
        content = File.read(file, encoding: "UTF-8")
        filename = File.basename(file)

        CHECKS.each do |check|
          next if check[:skip_if]&.call(content)

          if content.match?(check[:pattern])
            errors << {file: filename, check: check[:name], message: check[:message]}
          end
        end
      end

      if errors.empty?
        puts "dblint: all migrations ok"
        return true
      end

      errors.each do |err|
        puts "#{err[:file]}: #{err[:check]}"
        puts "  #{err[:message]}"
      end

      false
    end
  end
end

Run with bin/dblint:

#!/usr/bin/env ruby
# Exits 0 when every migration passes the linter, 1 otherwise (CI-friendly).
require_relative "../lib/db/linter"

ok = DB::Linter.new.run
exit(ok ? 0 : 1)

Query analyzer

Wrap DB with the Analyzer decorator to print EXPLAIN ANALYZE plans for each query:

# lib/db/analyzer.rb
class DB
  # Decorator over DB that prints an EXPLAIN plan before running each
  # statement for real. Intended for local debugging.
  class Analyzer
    # Statement types worth explaining.
    EXPLAINABLE = %w(SELECT INSERT UPDATE DELETE MERGE WITH).freeze

    # db:  anything that responds to the DB interface (exec, transaction, ...)
    # out: IO to print plans to; defaults to stdout.
    def initialize(db, out: $stdout)
      @db = db
      @out = out
    end

    # Prints the plan when the statement is explainable, then executes the
    # statement and returns its rows.
    def exec(sql, params = [])
      statement = sql.strip
      # &. guards empty/whitespace SQL, which previously raised
      # NoMethodError on nil.upcase.
      type = statement.split.first&.upcase

      if type && EXPLAINABLE.include?(type)
        # EXPLAIN ANALYZE actually runs the statement, so reserve it for
        # SELECTs; writes get a plain (non-executing) EXPLAIN.
        explain_sql =
          if type == "SELECT"
            "EXPLAIN ANALYZE #{sql}"
          else
            "EXPLAIN #{sql}"
          end

        rows = @db.exec(explain_sql, params)
        @out.puts explain_sql
        @out.puts rows.map { |row| row.values.first }.join("\n")
      end

      @db.exec(sql, params)
    end

    def transaction(&)
      @db.transaction(&)
    end

    def fuzzy_like(query)
      @db.fuzzy_like(query)
    end

    def escape_string(string)
      @db.escape_string(string)
    end
  end
end

Usage:

# Wrap any DB instance; callers use it exactly like DB, plans get printed.
db = DB::Analyzer.new(DB.new)
Search::SuggestCompany.new(db).call(query: "Data")

← All articles