396 lines
12 KiB
Ruby
396 lines
12 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'pg'
|
|
require 'digest'
|
|
require 'weakref'
|
|
|
|
module LPGAR
|
|
# Statements prepared on every new connection, keyed by statement name.
# Both take the table name as their only parameter ($1).
PREPARED = {
  # Column names and types of a table, from information_schema.
  'lpgar_get_columns' => <<~SQL,
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = $1;
  SQL
  # Primary key columns and types of a table, from the system catalogs.
  'lpgar_get_pk' => <<~SQL
    SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type
    FROM pg_index i
    JOIN pg_attribute a ON a.attrelid = i.indrelid
    AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = $1::regclass
    AND i.indisprimary;
  SQL
}.freeze
|
|
|
|
# ActiveRecord object.
# Each instance mirrors one row of a table; writes go straight to the
# database and the in-memory copy is refreshed with {#sync}.
# @abstract this class is subclassed automatically to create new records.
class Record
  # All currently existing instances of records, hashed by row identity.
  # Used for record deduplication. Values are weak references so that
  # tracking does not keep otherwise unused records alive.
  # @return [Hash{String => WeakRef}]
  @instances = {}

  class << self
    # Name of the table in the database that represents this record.
    # @return [String]
    attr_reader :table_name

    # Postgres connection.
    # @return [PG::Connection]
    attr_reader :conn

    # Valid columns.
    # @return [Array<String>]
    attr_reader :cols

    # Primary key columns.
    # @return [Array<String>]
    attr_reader :cols_pk

    # Compute row identity from primary key values.
    # Values are joined with a NUL separator so that composite keys such as
    # ("1", "23") and ("12", "3") do not collapse into the same identity.
    # For a single-column key the result equals the MD5 of the value itself.
    # @param values [Array<Object>] primary key values, in {cols_pk} order
    # @return [String] hexadecimal MD5 digest
    def identity_of(values)
      Digest::MD5.hexdigest(values.join("\0"))
    end

    # Delete a row from the database.
    # The record is removed from instance tracking and its writing and
    # syncing methods are replaced with stubs that raise.
    # @param [Object] record instance of this class
    # @raise [StandardError] if record is not an instance of this class
    def delete(record)
      unless record.instance_of? self
        raise StandardError, "not a row in #{table_name}"
      end

      keys = record.instance_variable_get("@data").values_at(*cols_pk)
      ident = record.identity
      # @sg-ignore
      conn.exec(<<~QUERY, keys)
        DELETE
        FROM #{table_name}
        WHERE #{record.send(:_identity_condition)[0]}
      QUERY
      # Untrack the record: otherwise a later lookup of the same key would
      # find this destroyed object and fail when trying to sync it.
      @instances.delete(ident)
      unmake(record)
    end

    # Return either an existing Record or create a new one.
    # A tracked instance with the same primary key is synced and returned
    # when its row still exists. Otherwise a new object is built; unless
    # +syncless+ is set it is synced, and inserted if no such row exists.
    # @param data [Hash{String => Object}] row data
    # @param syncless [Boolean] do not sync or insert the newly built object
    # @return [Object] record object
    # @raise [StandardError] if the class is not properly set up
    def new(data, syncless: false)
      check_instantiable
      existing = live_instance(identity_of(data.values_at(*cols_pk)))
      # Reuse the tracked object only while its row is still present;
      # a vanished row falls through to building a fresh record.
      return existing if existing&.sync

      new_record = super(data)
      unless syncless
        synced = new_record.sync
        insert_query(new_record) unless synced
      end
      track_instance(new_record)
      new_record
    end

    # Explicitly create a new record. Returns nil if exists.
    # @param data [Hash{String => Object}] row data
    # @return [(Object,nil)] new record object or nil if exists
    def create(data)
      instance = new(data, syncless: true)
      return if instance.sync

      insert_query(instance)
      instance
    end

    # Explicitly request an existing record. Returns nil if doesn't exist.
    # @param data [Hash{String => Object}] row data
    # @return [(Object,nil)] record object or nil if doesn't exist.
    def get(data)
      instance = new(data, syncless: true)
      instance if instance.sync
    end

    # Map returned rows from a query to an array of objects of this table.
    # @param query [String] raw postgresql query
    # @param params [Array<Object>] values for the query placeholders
    # @return [Array<Object>] array of records of this table
    def map(query, params = [])
      # @sg-ignore
      conn.exec(query, params).map do |row|
        new(row, syncless: true)
      end
    end

    # Change row identity.
    # Should not be called directly in most cases.
    # @param original [String] original row identity
    # @param changed [String] new row identity
    def reident(original, changed)
      # Moving an entry onto itself would otherwise drop it from tracking.
      return if original == changed

      @instances[changed] = @instances[original]
      @instances.delete(original)
    end

    # Create sync query. The query is built once and memoized.
    # @return [String]
    def sync_query
      return @sync_query if @sync_query

      selector = @cols_pk.map.with_index do |key, index|
        "#{key} = $#{index + 1}"
      end.join " AND "
      @sync_query = <<~QUERY
        SELECT * FROM #{@table_name}
        WHERE #{selector}
        LIMIT 1
      QUERY
    end

    # Returns transaction class for this record.
    # The class is built once and memoized. Its instances accept values via
    # +[]+/+[]=+ for any valid column, and additionally via reader/writer
    # methods for columns whose names are plain lowercase identifiers.
    # @return [Class]
    # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
    def transaction_class
      return @transaction if @transaction

      # \z (not \Z) so that a name with a trailing newline is rejected.
      columns = cols.filter_map do |x|
        x.to_sym if x.match?(/\A[a-z_][a-z0-9_]*\z/)
      end
      all_columns = cols
      @transaction = Class.new do
        @cols = all_columns
        class << self
          attr_reader :cols
        end
        define_method(:initialize) do
          @data = {}
        end
        define_method(:[]) do |key|
          @data[key] if self.class.cols.include? key
        end
        define_method(:[]=) do |key, value|
          @data[key] = value if self.class.cols.include? key
        end
        columns.each do |column|
          define_method(column) do
            @data[column.to_s]
          end
          define_method("#{column}=".to_sym) do |value|
            @data[column.to_s] = value
          end
        end
      end
    end
    # rubocop:enable Metrics/MethodLength, Metrics/AbcSize

    private

    # Check if Record is properly set up.
    # @raise [StandardError] raised if class is not properly set up
    def check_instantiable
      return unless [cols, cols_pk, conn, table_name].any?(&:nil?)

      raise StandardError, "Invalid ActiveRecord class"
    end

    # Add a new Record instance to track.
    # @param ins [Record] Instance of the Record (sub)class.
    def track_instance(ins)
      @instances[ins.identity] = WeakRef.new(ins)
    end

    # Look up a tracked instance that has not been garbage collected.
    # Dead entries are dropped from tracking.
    # @param ident [String] row identity
    # @return [(Record,nil)] the tracked object itself, or nil
    def live_instance(ident)
      ref = @instances[ident]
      return unless ref

      unless ref.weakref_alive?
        @instances.delete(ident)
        return
      end
      # Return the object itself so callers hold a strong reference.
      ref.__getobj__
    rescue WeakRef::RefError
      # Collected between the liveness check and the dereference.
      @instances.delete(ident)
      nil
    end

    # Replace writing and syncing methods with error stubs.
    # @param record [Record] record whose row was destroyed
    def unmake(record)
      [:[]=, :sync, :transact, :commit].each do |method|
        record.define_singleton_method(method) do |*_args, **_params|
          raise StandardError, "row destroyed; cannot use #{method}"
        end
      end
    end

    # Create a new record by "INSERT".
    # Every key of the record's data becomes a column of the statement.
    # @param record [Record] record to insert
    def insert_query(record)
      recbody = record.instance_variable_get("@data")
      columns = recbody.keys
      placeholders = (1..columns.size).map { |index| "$#{index}" }
      # @sg-ignore
      conn.exec(<<~QUERY, recbody.values)
        INSERT INTO #{table_name} (#{columns.join(', ')})
        VALUES (#{placeholders.join(', ')})
      QUERY
    end
  end

  # @param data [Hash{String => Object}] row data, stored as given
  def initialize(data)
    @data = data
  end

  # Return record data by column name
  # @param key [String] column name
  # @return [Object] value held in memory for that column
  def [](key)
    @data[key]
  end

  # Set record data. The change is written to the database immediately.
  # @param key [String] column name
  # @param value [Object] row value
  # @raise [StandardError] if key is not a column of the table
  def []=(key, value)
    unless self.class.cols.include? key
      raise StandardError,
            "invalid column #{key} for table #{self.class.table_name}"
    end

    original_identity = identity
    _update({ key => value })
    _check_identity(original_identity, identity)
  end

  # Return row identity, calculated as MD5 hash of primary key data.
  # @return [String]
  def identity
    self.class.identity_of(@data.values_at(*self.class.cols_pk))
  end

  # Attempt to synchronize row data.
  # Replaces the in-memory data with the row fetched by primary key.
  # @return [Boolean] true if synchronization was successful.
  def sync
    done = false
    pkvals = @data.values_at(*self.class.cols_pk)
    # @sg-ignore
    self.class.conn.exec(self.class.sync_query, pkvals).each do |row|
      @data = row
      done = true
    end
    done
  end

  # Change values of the record
  # @yieldparam transaction [Object] transaction object; committed after
  #   the block returns
  # @return [(Object,nil)] transaction object, or nil when a block is given
  def transact
    transaction = _create_transaction
    if block_given?
      yield transaction
      commit(transaction)
      return
    end
    transaction
  end

  # Commit transaction changes.
  # @param transaction [Object] transaction object
  def commit(transaction)
    original_identity = identity
    transaction_data = transaction.instance_variable_get("@data")
    _update(transaction_data)
    _check_identity(original_identity, identity)
  end

  private

  # Create a new transaction object pre-filled with the current values
  # of all valid columns.
  # @return [Object]
  def _create_transaction
    transaction = self.class.transaction_class.new
    @data.filter { |k, _| self.class.cols.include? k }.each do |k, v|
      transaction[k] = v
    end
    transaction
  end

  # Check own identity and reident self if identity changed.
  # @param original [String]
  # @param changed [String]
  def _check_identity(original, changed)
    self.class.reident(original, changed) if original != changed
  end

  # Update row representation in database and on object.
  # Should be used for most write operations.
  # Does not check whether keys are valid.
  # @param data [Hash{String => Object}]
  # @return [Hash{String => Object}] the data that was passed in
  def _update(data)
    # Nothing to write; an UPDATE with an empty SET list is invalid SQL.
    return data if data.empty?

    set_clause, last_index = _construct_set_query(data)
    # WHERE placeholders continue numbering after the SET placeholders.
    where_clause, = _identity_condition(last_index)
    keys = @data.values_at(*self.class.cols_pk)
    # @sg-ignore
    self.class.conn.exec(<<~QUERY, data.values + keys)
      UPDATE #{self.class.table_name}
      SET #{set_clause}
      WHERE #{where_clause}
    QUERY
    data.each { |k, v| @data[k] = v }
  end

  # Returns identity condition for SQL queries.
  # @param offset [Integer] placeholders index start
  # @return [String, Integer] query part and last index
  def _identity_condition(offset = 0)
    last_index = 0
    query = self.class.cols_pk.map.with_index do |key, index|
      last_index = offset + index + 1
      "#{key} = $#{last_index}"
    end.join ' AND '
    [query, last_index]
  end

  # Returns update assignment string with placeholders
  # @param data [Hash]
  # @param offset [Integer] placeholders index start
  # @return [String, Integer] query part and last index
  def _construct_set_query(data, offset = 0)
    last_index = 0
    query = data.keys.map.with_index do |key, index|
      last_index = offset + index + 1
      "#{key} = $#{last_index}"
    end.join ', '
    [query, last_index]
  end
end
|
|
|
|
# Database connection to Postgres
class Database
  # Raw Postgres connection
  # @return [PG::Connection]
  attr_reader :conn

  # Connect, install the basic type maps for queries and results, and
  # prepare every statement listed in PREPARED.
  # @see PG.connect
  def initialize(*args, **params)
    @conn = PG.connect(*args, **params)
    @conn.type_map_for_queries = PG::BasicTypeMapForQueries.new(@conn)
    @conn.type_map_for_results = PG::BasicTypeMapForResults.new(@conn)
    PREPARED.each_pair do |statement_name, sql|
      @conn.prepare(statement_name, sql)
    end
  end

  # Create class from table name
  # @param name [String]
  # @param block [#call] optional body evaluated in the new class
  # @return [Class<Record>]
  # @raise [StandardError] if the table has no columns or no primary key
  def table(name, &block)
    columns = names_from('lpgar_get_columns', name, 'column_name')
    raise StandardError, "table #{name} doesn't exist" if columns.empty?

    primary = names_from('lpgar_get_pk', name, 'attname')
    raise StandardError, "table #{name} has no primary keys" if primary.empty?

    # Locals are captured because the block below runs with the new class
    # as self, where this object's @conn is not reachable.
    connection = @conn
    record_class = Class.new(Record) do
      @table_name = name
      @cols = columns
      @cols_pk = primary
      @conn = connection
      @instances = {}
    end
    record_class.class_exec(&block) if block
    record_class
  end

  # Close Postgres connection
  def close
    @conn.close
  end

  alias_method :disconnect, :close

  private

  # Run a prepared statement for a table and collect one field per row.
  # @param statement [String] prepared statement name
  # @param table_name [String] table passed as the statement's parameter
  # @param field [String] result field to collect
  # @return [Array<Object>]
  def names_from(statement, table_name, field)
    @conn.exec_prepared(statement, [table_name]).map { |row| row[field] }
  end
end
|
|
end
|