Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/queue_classic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def self.default_queue
end
end

def self.default_conn_adapter
@conn_adapter ||= ConnAdapter.new
end

def self.log_yield(data)
begin
t0 = Time.now
Expand Down
121 changes: 0 additions & 121 deletions lib/queue_classic/conn.rb

This file was deleted.

106 changes: 106 additions & 0 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
require 'thread'
require 'uri'
require 'pg'

module QC
class ConnAdapter

attr_accessor :connection
def initialize(c=nil)
@connection = c.nil? ? establish_new : validate!(c)
@mutex = Mutex.new
end

def execute(stmt, *params)
@mutex.synchronize do
QC.log(:at => "exec_sql", :sql => stmt.inspect)
begin
params = nil if params.empty?
r = @connection.exec(stmt, params)
result = []
r.each {|t| result << t}
result.length > 1 ? result : result.pop
rescue PGError => e
QC.log(:error => e.inspect)
@connection.reset
raise
end
end
end

def wait(time, *channels)
@mutex.synchronize do
listen_cmds = channels.map {|c| 'LISTEN "' + c + '"'}
@connection.exec(listen_cmds.join(';'))
wait_for_notify(time)
unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c +'"'}
@connection.exec(unlisten_cmds.join(';'))
drain_notify
end
end

def disconnect
@mutex.synchronize do
begin
@connection.close
rescue => e
QC.log(:at => 'disconnect', :error => e.message)
end
end
end

private

def wait_for_notify(t)
Array.new.tap do |msgs|
@connection.wait_for_notify(t) {|event, pid, msg| msgs << msg}
end
end

def drain_notify
until @connection.notifies.nil?
QC.log(:at => "drain_notifications")
end
end

def validate!(c)
return c if c.is_a?(PG::Connection)
klass = c.class
err = "connection must be an instance of PG::Connection, but was #{c}"
raise(ArgumentError, err)
end

def establish_new
QC.log(:at => "establish_conn")
conn = PGconn.connect(*normalize_db_url(db_url))
if conn.status != PGconn::CONNECTION_OK
QC.log(:error => conn.error)
end
conn.exec("SET application_name = '#{QC::APP_NAME}'")
conn
end

def normalize_db_url(url)
host = url.host
host = host.gsub(/%2F/i, '/') if host

[
host, # host or percent-encoded socket path
url.port || 5432,
nil, '', #opts, tty
url.path.gsub("/",""), # database name
url.user,
url.password
]
end

def db_url
return @db_url if @db_url
url = ENV["QC_DATABASE_URL"] ||
ENV["DATABASE_URL"] ||
raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")
@db_url = URI.parse(url)
end

end
end
28 changes: 16 additions & 12 deletions lib/queue_classic/queue.rb
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
require 'queue_classic'
require 'queue_classic/conn'
require 'queue_classic/conn_adapter'
require 'json'

module QC
class Queue

def self.delete(id)
QC.log_yield(:measure => 'queue.delete') do
Conn.execute("DELETE FROM #{TABLE_NAME} where id = $1", id)
end
end

attr_reader :name, :top_bound
def initialize(name, top_bound=nil)
@name = name
@top_bound = top_bound || QC::TOP_BOUND
end

def conn_adapter=(a)
@adapter = a
end

def conn_adapter
@adapter ||= QC.default_conn_adapter
end

def enqueue(method, *args)
QC.log_yield(:measure => 'queue.enqueue') do
s="INSERT INTO #{TABLE_NAME} (q_name, method, args) VALUES ($1, $2, $3)"
res = Conn.execute(s, name, method, JSON.dump(args))
res = conn_adapter.execute(s, name, method, JSON.dump(args))
end
end

def lock
QC.log_yield(:measure => 'queue.lock') do
s = "SELECT * FROM lock_head($1, $2)"
if r = Conn.execute(s, name, top_bound)
if r = conn_adapter.execute(s, name, top_bound)
{:id => r["id"],
:method => r["method"],
:args => JSON.parse(r["args"])}
Expand All @@ -36,20 +38,22 @@ def lock
end

def delete(id)
self.class.delete(id)
QC.log_yield(:measure => 'queue.delete') do
conn_adapter.execute("DELETE FROM #{TABLE_NAME} where id = $1", id)
end
end

def delete_all
QC.log_yield(:measure => 'queue.delete_all') do
s = "DELETE FROM #{TABLE_NAME} WHERE q_name = $1"
Conn.execute(s, name)
conn_adapter.execute(s, name)
end
end

def count
QC.log_yield(:measure => 'queue.count') do
s = "SELECT COUNT(*) FROM #{TABLE_NAME} WHERE q_name = $1"
r = Conn.execute(s, name)
r = conn_adapter.execute(s, name)
r["count"].to_i
end
end
Expand Down
16 changes: 10 additions & 6 deletions lib/queue_classic/setup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ module Setup
CreateTable = File.join(Root, "/sql/create_table.sql")
DropSqlFunctions = File.join(Root, "/sql/drop_ddl.sql")

def self.create
Conn.execute(File.read(CreateTable))
Conn.execute(File.read(SqlFunctions))
def self.create(c=nil)
conn = QC::ConnAdapter.new(c)
conn.execute(File.read(CreateTable))
conn.execute(File.read(SqlFunctions))
conn.disconnect if c.nil? #Don't close a conn we didn't create.
end

def self.drop
Conn.execute("DROP TABLE IF EXISTS queue_classic_jobs CASCADE")
Conn.execute(File.read(DropSqlFunctions))
def self.drop(c=nil)
conn = QC::ConnAdapter.new(c)
conn.execute("DROP TABLE IF EXISTS queue_classic_jobs CASCADE")
conn.execute(File.read(DropSqlFunctions))
conn.disconnect if c.nil? #Don't close a conn we didn't create.
end
end
end
Loading