Miah Petersen
2010-Mar-25  17:42 UTC
Threading and it is more of a ruby question, than a rails question
I needed an email queueing system for a project I am writing.  The script
below runs every minute via a cron job on one of my servers.  I am using a
table in MySQL to keep track of the number of email bot threads to spawn and
their current state.  They need to be threaded (or at least run concurrently),
since some email lists will be longer than others and some emails have a
higher priority.  The thought is that this way I can quickly add more email
threads as needed, and also have a simple web UI to check their state and
turn them on or off as necessary.

It works perfectly most of the time.  But sometimes, seemingly at random, one
of the threads does not finish and an email bot's state is left as
processing.  In the log file it actually says that it completes, but the
entry in the database still says processing.  Any ideas?

======== Email Bots Table ==========
+----+--------+----------------------------------------------+-------+---------------------+
| id | status | query_constraints                            | state | updated_at          |
+----+--------+----------------------------------------------+-------+---------------------+
|  1 | on     | select * from email_queue where priority = 0 | idle  | 2010-03-25 17:33:14 |
|  2 | on     | select * from email_queue WHERE priority > 0 | idle  | 2010-03-25 17:33:14 |
+----+--------+----------------------------------------------+-------+---------------------+

======== Email bot Script that runs via cronjob ==========
require 'rubygems'
require 'net/smtp'
require 'timeout'
require 'mysql'
require 'activesupport'
require 'activerecord'

begin
  require 'openssl'
rescue LoadError
end

ActiveRecord::Base.establish_connection(
  :adapter => 'mysql',
  :host => 'localhost',
  :database => 'adevelopment',
  :username => 'username',
  :password => 'password'
)

  def send_email(from, to, subject, message)
  msg = <<END_OF_MESSAGE
From: #{from}
To: #{to}
MIME-Version: 1.0
Content-type: text/html
Subject: #{subject}

#{message}
END_OF_MESSAGE

    Net::SMTP.start('server', 25, '<sending domain>', 'username', 'password',
                    :login) do |smtp|
      smtp.send_message msg, from.gsub(/[^<]+</,'').gsub(/>/,''),
                        to.gsub(/[^<]+</,'').gsub(/>/,'')
    end
  end

  # gather email bots
  mysql = ActiveRecord::Base.connection.execute("SELECT * FROM email_bots")
  emailbots = {}
  mysql.each_hash do |p|
    emailbots[p['id']] = p
  end
  mysql.free

  threads = []
  emailbots.each do |emailbot|
    if emailbot[1]['status'] == 'on' && emailbot[1]['state'] == 'idle'
      threads << Thread.new{
        ebot = emailbot[1]
        ActiveRecord::Base.connection.execute("UPDATE email_bots SET
          state='processing', updated_at=UTC_TIMESTAMP() WHERE id=#{ebot['id']}")
        sent = ""
        mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
        i = 1
        mysql.each_hash do |email|
          puts email['id']
          begin
            send_email(email['sender'], email['recipient'],
                       email['subject'], email['message'])
            sent += email['id'] + ","
          rescue Exception => e
            ActiveRecord::Base.connection.execute(
              ActiveRecord::Base.send("sanitize_sql_array",
                ["UPDATE email_queue SET error='%s' WHERE id=#{email['id']}",
                 "#{e.message} #{e.backtrace.inspect}"]))
          end
          i = i + 1
          if i % 100 == 0
            if sent != ""
              ActiveRecord::Base.connection.execute("INSERT INTO email_sent
                (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject,
                message, error, UTC_TIMESTAMP FROM email_queue WHERE id in
                (#{sent.chop!}))")
              ActiveRecord::Base.connection.execute("DELETE FROM email_queue
                WHERE id in (#{sent})")
              sent = ""
            end
          end
        end
        mysql.free
        if sent != ""
          ActiveRecord::Base.connection.execute("INSERT INTO email_sent
            (SELECT priority, user_id, #{ebot['id']}, sender, recipient, subject,
            message, error, UTC_TIMESTAMP FROM email_queue WHERE id in
            (#{sent.chop!}))")
          ActiveRecord::Base.connection.execute("DELETE FROM email_queue
            WHERE id in (#{sent})")
        end
        ActiveRecord::Base.connection.execute("UPDATE email_bots SET
          state='idle' WHERE id=#{ebot['id']}")
      }
    else
#  This is a potential workaround which I am not too excited about.
#      mysql = ActiveRecord::Base.connection.execute(ebot['query_constraints'])
#      if mysql.num_rows == 0
#        ActiveRecord::Base.connection.execute("UPDATE email_bots SET current_action='idle' WHERE id=#{ebot['id']}")
#      end
    end
  end
  threads.each { |t|
    t.join
  }
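
For readers following along: in Ruby, an exception that escapes a thread's block ends that thread, and anything after the failing line (here, the final UPDATE back to idle) never runs. The following is only a minimal sketch of one way to make the state reset unconditional with `ensure`. The `STATES` hash, `set_state`, and `run_bot` are hypothetical stand-ins for the email_bots table and the sending loop, not part of the script above, and whether a failed bot should really go back to idle is a design decision this sketch does not settle.

```ruby
# Hypothetical in-memory stand-in for the email_bots.state column.
STATES = { 1 => 'idle', 2 => 'idle' }
STATE_LOCK = Mutex.new

def set_state(id, state)
  STATE_LOCK.synchronize { STATES[id] = state }
end

# Runs one bot. The block stands in for the sending loop.
def run_bot(id)
  set_state(id, 'processing')
  yield
ensure
  # Runs whether the block finished normally or raised.
  set_state(id, 'idle')
end

threads = [
  Thread.new { run_bot(1) { :ok } },
  Thread.new { run_bot(2) { raise 'simulated failure' } }
]

# Thread#join re-raises an exception that killed the thread, so collect
# the messages instead of letting the first one abort the script.
failures = threads.map do |t|
  begin
    t.join
    nil
  rescue RuntimeError => e
    e.message
  end
end

p failures
p STATES
```

With this shape, the second bot still fails, but its state is reset and the failure surfaces at `join` rather than being lost.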
-- 
You received this message because you are subscribed to the Google Groups
"Ruby on Rails: Talk" group.
To post to this group, send email to
rubyonrails-talk-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
To unsubscribe from this group, send email to
rubyonrails-talk+unsubscribe-/JYPxA39Uh5TLH3MbocFF+G/Ez6ZCGd0@public.gmane.org
For more options, visit this group at
http://groups.google.com/group/rubyonrails-talk?hl=en.
miah
2010-Mar-25  18:07 UTC
Re: Threading and it is more of a ruby question, than a rails question
Actually, looking at it more closely, the thread is terminating when I call
mysql.free after it has finished walking through all of the sends.

On Mar 25, 12:42 pm, Miah Petersen <nadan...-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org> wrote:
> [...]
>         mysql.each_hash do |email|
> [...]
>         end
>         # the mysql.free is called, but it doesn't run past this.
>         mysql.free
>         if sent != ""
> [...]
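
One possible explanation, not confirmed anywhere in this thread: in the script as posted, `mysql` is first assigned at the top level (for the email_bots query) and then reassigned inside each `Thread.new` block. A Ruby block closes over local variables that already exist outside it, so both threads would be writing to the same `mysql` variable, and one thread could end up calling `free` on the other thread's result. The sketch below reproduces that pattern with a hypothetical `FakeResult` class (an assumption standing in for a result handle that cannot be freed twice) and uses queues to force the unlucky interleaving deterministically.

```ruby
require 'thread'

# Hypothetical stand-in for a result handle that may only be freed once.
class FakeResult
  def initialize(name)
    @name = name
    @freed = false
  end

  def free
    raise "#{@name} already freed" if @freed
    @freed = true
  end
end

# `result` exists before the blocks are created, so both threads close
# over the same variable. The queues only force a fixed interleaving.
def shared_variable_errors
  errors = []
  result = nil
  assigned = Queue.new
  overwritten = Queue.new
  freed = Queue.new

  first = Thread.new do
    result = FakeResult.new('first')
    assigned.push(true)
    overwritten.pop   # wait until the other thread has replaced `result`
    result.free       # frees 'second', not 'first'
    freed.push(true)
  end

  second = Thread.new do
    assigned.pop
    result = FakeResult.new('second')
    overwritten.push(true)
    freed.pop
    begin
      result.free     # second free of the same object
    rescue RuntimeError => e
      errors << e.message
    end
  end

  [first, second].each(&:join)
  errors
end

# Same work, but each block uses a variable that is first assigned inside
# the block, so nothing is shared between the threads.
def block_local_errors
  errors = Queue.new
  threads = %w[first second].map do |name|
    Thread.new(name) do |n|
      own_result = FakeResult.new(n)
      begin
        own_result.free
      rescue RuntimeError => e
        errors << e.message
      end
    end
  end
  threads.each(&:join)
  Array.new(errors.size) { errors.pop }
end

p shared_variable_errors  # => ["second already freed"]
p block_local_errors      # => []
```

If this is what is happening in the real script, giving the per-thread result a name that is not used outside the block (for example `emails` instead of `mysql`) would be the smallest change to try.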