lpgar/lib/lpgar.rb

396 lines
12 KiB
Ruby

# frozen_string_literal: true
require 'pg'
require 'digest'
require 'weakref'
module LPGAR
PREPARED = {
'lpgar_get_columns' => <<~COMM,
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = $1;
COMM
'lpgar_get_pk' => <<~COMM
SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid
AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary;
COMM
}.freeze
# ActiveRecord object.
# @abstract this class is subclassed automatically to create new records.
class Record
# All currently existing instances of records, hashed by row identity.
# Used for record deduplication.
# @return [Hash{String => Record}]
@instances = {}
class << self
# Name of the table in the database that represents this record.
# @return [String]
attr_reader :table_name
# Postgres connection.
# @return [PG::Connection]
attr_reader :conn
# Valid columns.
# @return [Array(String)]
attr_reader :cols
# Primary key columns.
# @return [Array(String)]
attr_reader :cols_pk
# Delete a row from the database
# @param [Object] record instance of this class
def delete(record)
unless record.instance_of? self
raise StandardError, "not a row in #{table_name}"
end
keys = record.instance_variable_get("@data").values_at(*cols_pk)
# @sg-ignore
conn.exec(<<~QUERY, keys)
DELETE
FROM #{table_name}
WHERE #{record.send(:_identity_condition)[0]}
QUERY
unmake(record)
end
# Return either an existing Record or create a new one.
# @param data [Hash{String => Object}] row data
# @param syncless [Boolean] do not perform any communication with PG
# @return [Object] new record object
def new(data, syncless: false)
check_instantiable
ident = Digest::MD5.hexdigest(data.values_at(*cols_pk).join)
if @instances[ident] and @instances[ident].weakref_alive?
@instances[ident].sync
@instances[ident]
end
new_record = super(data)
unless syncless
synced = new_record.sync
insert_query(new_record) unless synced
end
track_instance(new_record)
new_record
end
# Explicitly create a new record. Returns nil if exists.
# @param data [Hash{String => Object}] row data
# @return [(Object,nil)] new record object or nil if exists
def create(data)
instance = new(data, syncless: true)
return if instance.sync
insert_query(instance)
instance
end
# Explicitly request an existing record. Returns nil if doesn't exist.
# @param data [Hash{String => Object}] row data
# @return [(Object,nil)] new record object or nil if doesn't exist.
def get(data)
instance = new(data, syncless: true)
instance if instance.sync
end
# Map returned rows from a query to an array of objects of this table.
# @param query [String] raw postgresql query
# @return [Array(Object)] array of records of this table
def map(query, params = [])
# @sg-ignore
conn.exec(query, params).map do |row|
new(row, syncless: true)
end
end
# Change row identity
# Should not be called directly in most cases.
# @param original [String] original row identity
# @param changed [String] new row identity
def reident(original, changed)
@instances[changed] = @instances[original]
@instances.delete(original)
end
# Create sync query.
# @return [String]
def sync_query
return @sync_query if @sync_query
selector = @cols_pk.map.with_index do |key, index|
"#{key} = $#{index + 1}"
end.join " AND "
@sync_query = <<~QUERY
SELECT * FROM #{@table_name}
WHERE #{selector}
LIMIT 1
QUERY
end
# Returns transaction class for this record.
# @return [Class]
# rubocop:disable Metrics/MethodLength, Metrics/AbcSize
def transaction_class
return @transaction if @transaction
columns = cols.filter_map do |x|
x.to_sym if x.match?(/\A[a-z_][a-z0-9_]*\Z/)
end
all_columns = cols
@transaction = Class.new do
@cols = all_columns
class << self
attr_reader :cols
end
define_method(:initialize) do
@data = {}
end
define_method(:[]) do |key|
@data[key] if self.class.cols.include? key
end
define_method(:[]=) do |key, value|
@data[key] = value if self.class.cols.include? key
end
columns.each do |column|
define_method(column) do
@data[column.to_s]
end
define_method("#{column}=".to_sym) do |value|
@data[column.to_s] = value
end
end
end
end
# rubocop:enable Metrics/MethodLength, Metrics/AbcSize
private
# Check if Record is properly set up.
# @raise [StandardError] rasied if class is not properly set up
def check_instantiable
unless [cols,
cols_pk,
conn,
table_name].map { |x| !x.nil? }.all? true
raise StandardError, "Invalid ActiveRecord class"
end
end
# Add a new Record instance to track.
# @param instance [Record] Instance of the Record (sub)class.
def track_instance(ins)
@instances[ins.identity] = WeakRef.new(ins)
end
# Replace writing and syncing methods with error stubs.
def unmake(record)
[:[]=, :sync, :transact, :commit].each do |method|
record.define_singleton_method(method) do |*_args, **_params|
raise StandardError, "row destroyed; cannot use #{method}"
end
end
end
# Create a new record by "INSERT"
def insert_query(record)
recbody = record.instance_variable_get("@data")
# @sg-ignore
conn.exec(<<~QUERY, recbody.values)
INSERT INTO #{table_name} (#{recbody.keys.join ', '})
VALUES (#{recbody.keys.map.with_index do |_, index|
'$%d' % (index + 1)
end.join ', '})
QUERY
end
end
def initialize(data)
@data = data
end
# Return record data by column name
# @param key [String] column name
def [](key)
@data[key]
end
# Set record data
# @param key [String] column name
# @param value [Object] row value
def []=(key, value)
unless self.class.cols.include? key
raise StandardError,
"invalid column #{key} for table #{self.class.table_name}"
end
original_identity = identity
_update({ key => value })
_check_identity(original_identity, identity)
end
# Return row identity, calculated as MD5 hash of primary key data.
# @return [String]
def identity
Digest::MD5.hexdigest(@data.values_at(*self.class.cols_pk).join)
end
# Attempt to synchronize row data.
# @return [Boolean] true if synchronization was successful.
def sync
done = false
pkvals = @data.values_at(*self.class.cols_pk)
# @sg-ignore
self.class.conn.exec(self.class.sync_query, pkvals).each do |row|
@data = row
done = true
end
done
end
# Change values of the record
# @param &block [#call] optional block to commit transaction inline.
# @return [Object] transaction object
def transact
transaction = _create_transaction
if block_given?
yield transaction
commit(transaction)
return
end
transaction
end
# Commit transaction changes.
# @param transaction [Object] transaction object
def commit(transaction)
original_identity = identity
transaction_data = transaction.instance_variable_get("@data")
_update(transaction_data)
_check_identity(original_identity, identity)
end
private
# Create a new transaction object.
# @return [Object]
def _create_transaction
transaction = self.class.transaction_class.new
@data.filter { |k, _| self.class.cols.include? k }.each do |k, v|
transaction[k] = v
end
transaction
end
# Check own identity and reident self if identity changed.
# @param original [String]
# @param changed [String]
def _check_identity(original, changed)
self.class.reident(original, changed) if original != changed
end
# Update row representation in database and on object.
# Should be used for most write operations.
# Does not check whether keys are valid.
# @param data [Hash{String => Object}]
def _update(data)
keys = @data.values_at(*self.class.cols_pk)
# @sg-ignore
self.class.conn.exec(<<~QUERY, data.values + keys).inspect
UPDATE #{self.class.table_name}
SET #{
query, last_index = _construct_set_query(data)
query
}
WHERE #{_identity_condition(last_index)[0]}
QUERY
data.each { |k, v| @data[k] = v }
end
# Returns identity condition for SQL queries.
# @param offset [Integer] placeholders index start
# @return [String, Integer] query part and last index
def _identity_condition(offset = 0)
last_index = 0
query = self.class.cols_pk.map.with_index do |key, index|
last_index = offset + index + 1
"#{key} = $#{last_index}"
end.join ' AND '
[query, last_index]
end
# Returns update assignment string with placeholders
# @param data [Hash]
# @param offset [Integer] placeholders index start
# @return [String, Integer] query part and last index
def _construct_set_query(data, offset = 0)
last_index = 0
query = data.keys.map.with_index do |key, index|
last_index = offset + index + 1
"#{key} = $#{last_index}"
end.join ', '
[query, last_index]
end
end
# Database connection to Postgres
class Database
# @see PG.connect
def initialize(*args, **params)
@conn = PG.connect(*args, **params)
@conn.type_map_for_queries = PG::BasicTypeMapForQueries.new @conn
@conn.type_map_for_results = PG::BasicTypeMapForResults.new @conn
PREPARED.each do |name, statement|
@conn.prepare name, statement
end
end
# Create class from table name
# @param name [String]
# @param block [#call]
# @return [Class<Record>]
def table(name, &block)
conn = @conn
new_class = Class.new(Record) do
@table_name = name
@cols = conn.exec_prepared('lpgar_get_columns', [name]).map do |row|
row['column_name']
end
raise StandardError, "table #{name} doesn't exist" if @cols.empty?
@cols_pk = conn.exec_prepared('lpgar_get_pk', [name]).map do |row|
row['attname']
end
if @cols_pk.empty?
raise StandardError, "table #{name} has no primary keys"
end
@conn = conn
@instances = {}
end
new_class.class_exec(&block) if block
new_class
end
# Close Postgres connection
def close
@conn.close
end
alias disconnect :close
# Raw Postgres connection
# @return PG::Connection
attr_reader :conn
end
end