ruby / db
I wrap the pg Postgres driver in a thin set of helper methods instead of using an ORM.
Configuration
Add the pg and connection_pool gems:
gem "connection_pool"
gem "pg"
Configure the pool on process boot:
# config/puma.rb
on_worker_boot do
  DB.configure do |c|
    # Each worker builds its own pool, so size it for that
    # worker's thread count, not the whole cluster.
    c.pool_size = Integer(ENV.fetch("RAILS_MAX_THREADS", 5))
    c.reap = true
  end
end
Wrapper
The DB class manages connection pooling and provides a simple interface:
# lib/db.rb
require "connection_pool"
require "pg"
require_relative "db/instrumented_connection"
class DB
Config = Struct.new(:pool_size, :reap)
class << self
def configure
@config = Config.new
yield(@config)
end
def pool
@pool ||= new(
pool_size: @config.pool_size,
reap: @config.reap
)
end
end
def initialize(pool_size: 1, reap: false, raise_errors: false)
@env = ENV.fetch("APP_ENV")
@raise_errors = raise_errors
@pool = ConnectionPool.new(size: pool_size, timeout: 5) { build_pg_conn }
if reap
start_reaper_thread
end
end
def exec(sql, params = [])
@pool.with do |conn|
iconn = DB::InstrumentedConnection.new(conn, @env, raise_errors: @raise_errors)
iconn.exec(sql, params)
end
end
  def transaction
    @pool.with do |conn|
      iconn = DB::InstrumentedConnection.new(conn, @env, raise_errors: @raise_errors)
      iconn.transaction { yield(iconn) }
    end
  end
def escape_string(string)
@pool.with do |conn|
conn.escape_string(string)
end
end
  def fuzzy_like(query)
    # Turns user input into an ILIKE pattern:
    #   fuzzy_like("Data Corp") # => "%Data%Corp%"
    #   fuzzy_like("100%")      # => "%100\\%%"
    normalized = query.to_s.strip.sub(/\\+$/, "")
    return "" if normalized == ""
    # Escape the escape character itself as well as LIKE metacharacters.
    pattern = Regexp.union("\\", "%", "_")
    escape_char = "\\"
    escaped = normalized
      .gsub(pattern) { |x| [escape_char, x].join }
      .tr(" ", "%")
    "%#{escaped}%"
  end
def self.db_url_for(env)
case env
when "staging", "production"
ENV.fetch("DATABASE_URL")
else
"postgres://postgres:postgres@localhost:5432/myapp_#{env}"
end
end
private def build_pg_conn
conn = PG.connect(DB.db_url_for(@env))
map = PG::BasicTypeMapForResults.new(conn)
map.default_type_map = PG::TypeMapAllStrings.new
conn.type_map_for_results = map
conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(conn)
conn
end
  private def start_reaper_thread
    Thread.new do
      Thread.current.name = "db-reaper"
      loop do
        # Close connections that have sat idle for 5+ minutes.
        @pool.reap(300) { |conn| conn&.close }
        sleep 60
      end
    end
  end
end
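With the type maps in place, results come back as Ruby values rather than raw strings. A quick sketch, assuming a reachable local database:

db = DB.new
row = db.exec("SELECT 1 AS n, now() AS ts").first
row["n"]  # => 1 (Integer, via PG::BasicTypeMapForResults)
row["ts"] # => Time instance
db.fuzzy_like("Data Corp") # => "%Data%Corp%"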
Each connection is wrapped with instrumentation for observability:
# lib/db/instrumented_connection.rb
require "forwardable"
require "json" # Hash#to_json in execute_query below
require "pg"
require "sentry-ruby"
class DB
class InstrumentedConnection
SENTRY_ENVS = ["staging", "production"].freeze
    extend Forwardable
    # Delegate everything we don't instrument straight to the raw connection.
    def_delegators :@conn, *(
      PG::Connection.public_instance_methods(false) - [
        :exec,
        :exec_params,
        :transaction
      ]
    )
def initialize(conn, env, raise_errors: false)
@conn = conn
@env = env
@raise_errors = raise_errors
end
def exec(sql, params = [])
rows = []
with_sentry(sql, params) do
result = execute_query(sql, params)
rows = result.to_a
end
rows
    rescue PG::ConnectionBad
      sleep 5 # give the HA standby time to come online
      @conn.reset rescue nil # re-establish the connection before retrying
      retry
rescue => err
report_error(err, operation: "DB#exec", sql: sql, params: params)
if @raise_errors || !SENTRY_ENVS.include?(@env)
raise err
end
[]
end
def transaction
if in_transaction?
yield(self)
else
@conn.transaction { yield(self) }
end
    rescue PG::ConnectionBad
      sleep 5
      @conn.reset rescue nil
      retry
end
    private def in_transaction?
      @conn.transaction_status != PG::PQTRANS_IDLE
    end
    private def execute_query(sql, params)
      if params.empty?
        @conn.exec(sql)
      else
        # Hashes become JSON strings, for json/jsonb columns.
        params = params.map { |p| p.is_a?(Hash) ? p.to_json : p }
        @conn.exec_params(sql, params)
      end
    end
private def with_sentry(sql, params = [])
tx = Sentry.get_current_scope&.get_span ||
Sentry.get_current_scope&.get_transaction ||
Sentry.start_transaction(name: "DB#exec")
if tx
tx.with_child_span(op: "db.sql.execute", description: sql) do |span|
span.set_data("SQL Params", params)
yield
end
else
yield
end
end
private def report_error(err, context = {})
if SENTRY_ENVS.include?(@env)
Sentry.capture_exception(err, extra: context)
end
end
end
end
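The net effect of the rescues: development and test raise, staging and production report to Sentry and return an empty result, and raise_errors forces a raise everywhere. A hypothetical session, where pg_conn is a raw PG::Connection:

conn = DB::InstrumentedConnection.new(pg_conn, "development")
conn.exec("SELECT * FROM missing") # raises PG::UndefinedTable

conn = DB::InstrumentedConnection.new(pg_conn, "production")
conn.exec("SELECT * FROM missing") # => [] (exception captured by Sentry)

conn = DB::InstrumentedConnection.new(pg_conn, "production", raise_errors: true)
conn.exec("SELECT * FROM missing") # raises, after reporting to Sentry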
Usage
In controllers, access the pool:
class ApplicationController < ActionController::Base
private def db
DB.pool
end
end
class SuggestionsController < ApplicationController
def new
render json: Search::SuggestCompany.new(db).call(
query: params[:query]
)
end
end
Pass db through constructors and write queries with <<~SQL heredocs:
module Search
class SuggestCompany
def initialize(db)
@db = db
end
def call(query:)
@db.exec(<<~SQL, [@db.fuzzy_like(query)])
SELECT
companies.id,
companies.name,
companies.status
FROM
companies
WHERE
companies.name ILIKE $1
OR companies.also_known_as ILIKE $1
ORDER BY
companies.score DESC
LIMIT 50
SQL
end
end
end
For scripts, instantiate directly:
if $0 == __FILE__
require_relative "../db"
pp Search::SuggestCompany.new(DB.new).call(query: "Data")
end
Background processes
Give each forked background process its own DB instance; Postgres connections can't be shared across forks:
# Each forked worker gets its own connection
children = workers.map do |worker|
fork { worker.new(DB.new).poll }
end
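A minimal worker shape, with EmailWorker and the jobs table as stand-ins:

class EmailWorker
  def initialize(db)
    @db = db
  end

  def poll
    loop do
      jobs = @db.exec("SELECT id FROM jobs WHERE state = 'pending' LIMIT 10")
      sleep 1 if jobs.empty?
      # ... work through jobs ...
    end
  end
end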
See ruby / job queues and ruby / clock for complete examples.
Benefits
- Direct SQL with parameterized queries
- Connection pooling with automatic reaping
- Automatic retries for connection failures
- Sentry integration for error tracking and performance monitoring
- JSON serialization for hash parameters
- No DSL to learn or maintain
- Full control over every query
Migrations
Generate migrations with bin/db-gen-migration:
#!/usr/bin/env ruby
# bin/db-gen-migration
require "fileutils"
name = ARGV[0].to_s.strip
if name == ""
  puts "err: migration name required"
  puts "usage: bin/db-gen-migration migration_name"
  exit 1
end
snake_name = name
.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
.gsub(/([a-z\d])([A-Z])/, '\1_\2')
.tr("-", "_")
.downcase
camel_name = snake_name
.split("_")
.map(&:capitalize)
.join
filename = "#{Time.now.strftime("%Y%m%d%H%M%S")}_#{snake_name}.rb"
filepath = File.join(__dir__, "..", "db", "migrate", filename)
FileUtils.mkdir_p(File.dirname(filepath))
if File.exist?(filepath)
puts "err: migration file already exists: #{filename}"
exit 1
end
File.write(filepath, <<~RUBY)
class #{camel_name}
def initialize(db)
@db = db
end
def up
@db.transaction do |conn|
conn.exec <<~SQL
SQL
end
end
def down
@db.transaction do |conn|
conn.exec <<~SQL
SQL
end
end
end
RUBY
puts "created: db/migrate/#{filename}"
puts "see: db/README.md#safe-migrations"
This produces timestamped files in db/migrate/:
# db/migrate/20260122153000_add_score_to_companies.rb
class AddScoreToCompanies
def initialize(db)
@db = db
end
def up
@db.transaction do |conn|
conn.exec <<~SQL
ALTER TABLE companies ADD COLUMN score integer DEFAULT 0
SQL
end
end
def down
@db.transaction do |conn|
conn.exec <<~SQL
ALTER TABLE companies DROP COLUMN score
SQL
end
end
end
For DDL that can't run in a transaction, remove the transaction wrapper:
# db/migrate/20260122153555_add_index_foo.rb
class AddIndexFoo
def initialize(db)
@db = db
end
def up
@db.exec <<~SQL
CREATE INDEX CONCURRENTLY index_foo ON bar (col)
SQL
end
def down
@db.exec <<~SQL
DROP INDEX IF EXISTS index_foo
SQL
end
end
The runner sets safe timeouts:
# lib/db.rb
class DB
class Migrations
class Runner
def migrate
# Fail fast if can't acquire lock
db.exec("SET lock_timeout = '10s'")
# Allow migrations to run for a while
db.exec("SET statement_timeout = '10min'")
pending_versions.each do |ver|
migration_class(ver).new(db).up
db.exec(<<~SQL, [ver])
INSERT INTO schema_migrations (version) VALUES ($1)
SQL
end
dump # schema.sql for code review diffs
end
end
end
end
Check for pending migrations on boot:
# config.ru
DB::Migrations.check_pending!
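check_pending! can be a thin guard on top of the runner. A sketch, assuming Runner.new takes no arguments and exposes pending_versions:

class DB
  class Migrations
    def self.check_pending!
      pending = Runner.new.pending_versions
      return if pending.empty?
      abort "pending migrations: #{pending.join(", ")}"
    end
  end
end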
Migration linter
A linter checks migrations for unsafe patterns. See postgres / safe migrations for DDL patterns.
# lib/db/linter.rb
class DB
class Linter
CHECKS = [
{
name: "non-concurrent index",
pattern: /CREATE\s+(UNIQUE\s+)?INDEX\s+(?!CONCURRENTLY)/i,
skip_if: ->(content) { content.include?("CREATE TABLE") },
message: "Use CREATE INDEX CONCURRENTLY outside a transaction."
},
{
name: "foreign key without NOT VALID",
pattern: /REFERENCES\s+\w+\s*\([^)]+\)(?!.*NOT\s+VALID)/i,
message: "Add NOT VALID, then VALIDATE in a separate migration."
},
{
name: "SET NOT NULL directly",
pattern: /ALTER\s+(TABLE\s+\w+\s+)?ALTER\s+(COLUMN\s+)?\w+\s+SET\s+NOT\s+NULL/i,
message: "Use a CHECK constraint with NOT VALID instead."
},
{
name: "check constraint without NOT VALID",
pattern: /ADD\s+CONSTRAINT\s+\w+\s+CHECK\s*\([^)]+\)(?!.*NOT\s+VALID)/i,
message: "Add NOT VALID, then VALIDATE in a separate migration."
}
]
def initialize
      @migrate_dir = File.expand_path("../../db/migrate", __dir__)
end
def run
errors = []
Dir[File.join(@migrate_dir, "*.rb")].sort.each do |file|
content = File.read(file, encoding: "UTF-8")
filename = File.basename(file)
CHECKS.each do |check|
if check[:skip_if]&.call(content)
next
end
if content.match?(check[:pattern])
errors << {file: filename, check: check[:name], message: check[:message]}
end
end
end
if errors.empty?
puts "dblint: all migrations ok"
return true
end
errors.each do |err|
puts "#{err[:file]}: #{err[:check]}"
puts " #{err[:message]}"
end
false
end
end
end
Run with bin/dblint:
#!/usr/bin/env ruby
require_relative "../lib/db/linter"
exit DB::Linter.new.run ? 0 : 1
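A failing run prints one block per violation and exits nonzero; a hypothetical example:

$ bin/dblint
20260123100000_add_users_fk.rb: foreign key without NOT VALID
  Add NOT VALID, then VALIDATE in a separate migration.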
Query analyzer
Wrap DB in an analyzer that prints EXPLAIN output for each query:
# lib/db/analyzer.rb
class DB
class Analyzer
def initialize(db, out: $stdout)
@db = db
@out = out
end
    def exec(sql, params = [])
      statement = sql.strip
      type = statement.split.first.to_s.upcase
      if %w(SELECT INSERT UPDATE DELETE MERGE WITH).include?(type)
        # EXPLAIN ANALYZE executes the statement, so reserve it for reads;
        # writes get a plain EXPLAIN to avoid running them twice.
        explain_sql =
          if type == "SELECT"
            "EXPLAIN ANALYZE #{sql}"
          else
            "EXPLAIN #{sql}"
          end
        rows = @db.exec(explain_sql, params)
        @out.puts explain_sql
        @out.puts rows.map { |row| row.values.first }.join("\n")
      end
      @db.exec(sql, params)
    end
def transaction(&)
@db.transaction(&)
end
def fuzzy_like(query)
@db.fuzzy_like(query)
end
def escape_string(string)
@db.escape_string(string)
end
end
end
Usage:
db = DB::Analyzer.new(DB.new)
Search::SuggestCompany.new(db).call(query: "Data")
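To flip the analyzer on for a one-off script, an env-var toggle works; DB_ANALYZE is an assumed convention, not part of the app:

db = ENV["DB_ANALYZE"] ? DB::Analyzer.new(DB.new) : DB.new
Search::SuggestCompany.new(db).call(query: "Data")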