I'm trying to run a single worker that could perform a periodic task for a given user.

From a controller, I imagine something like:

    def start_job
      MiddleMan.ask_work(:worker => :foo_worker, :worker_method => :perform_task,
                         :data => { :user_id => current_user.id })
    end

    def check_job
      @status = MiddleMan.ask_status(:worker => :foo_worker)[current_user.id]
    end

My worker is something like:

    class FooWorker < BackgrounDRb::MetaWorker
      set_worker_name :foo_worker

      def create(args=nil)
        @mutex = Mutex.new
        @mutex.synchronize do
          @statuses = {}
          register_status(@statuses)
        end
      end

      def do_task(some_user_id)
        thread_pool.defer(some_user_id) do |user_id|
          user = User.find user_id
          save_status user_id, :progress, "Starting Task"
          user.do_some_database_stuff
          save_status user_id, :progress, "Task Stage 2"
          user.do_some_other_database_stuff
          save_status user_id, :progress, "Task Stage 3"
          save_status user_id, :completed, true
        end
      end

      # Record one status entry for a user and publish the whole hash.
      def save_status(user_id, key, data)
        @mutex.synchronize do
          if @statuses[user_id].nil?
            @statuses[user_id] = {}
            @statuses[user_id][:completed] = false
          end
          @statuses[user_id][key] = data
          register_status(@statuses)
          logger.info "statuses synced for #{user_id}, #{key}, #{data}"
        end
      end
    end

Problem is, when I use thread_pool, it gets as far as the first part of the task and then just dies, and the backgroundrb script just outputs "going to sleep for a while" over and over until I kill it. I tried taking out the mutex and the register_status bits and the same problem happens.

When I use "Thread.new" in place of the thread_pool line, everything works. However, I get some "Some read error" messages during the "do_task" work. Everything happens like it should - I even ran two sessions concurrently and initiated the task with two different users at the same time. The work gets done, although a bit slowly and with those strange errors.
Oddly enough, when I run the jobs from the Rails irb console, they go without any error - even if I start a couple of jobs for multiple users and spam "MiddleMan.ask_status" while the jobs are still running. (Of course, when I start the server using the 'Thread.new' method, I get a "no marshal_dump is defined for class Thread" exception in the backgroundrb_server log. I'm guessing this has something to do with why I'm supposed to be using thread_pool instead of Thread.)

When I get those read errors, the backgroundrb_debug.log shows "Client disconected" (sic). I don't know if this is normal for Rails talking to bgrb or if it's an error. I don't see it on every line, but it seems to coincide with the "some read error" messages in the script.

Am I doing something wrong? Anyone have a working example using thread_pool? Also, I've never used Mutex before, so if my usage is off, please let me know.

Thanks very much,

- Jason L.

--
My Rails and Linux Blog: http://offtheline.net

On Dec 19, 2007 6:58 AM, Jason LaPier <jason.lapier at gmail.com> wrote:
> When I use "Thread.new" in place of the thread_pool line, everything
> works. However, I get some "Some read error" messages during the
> "do_task" work.

Hmm,

I am having a look at the problem. Do not worry about the "some read error" messages; they just indicate that the connection Rails had with the backgroundrb server has been closed. This happens quite often, because whenever you do:

    MiddleMan.xxx

a new connection to the backgroundrb server is initiated and the older one is closed, and at that point you get that message. I put those "puts" calls in for debugging purposes.

--
Let them talk of their oriental summer climes of everlasting conservatories; give me the privilege of making my own summer with my own coals.
http://gnufied.org

On Dec 19, 2007 1:54 PM, hemant <gethemant at gmail.com> wrote:
> I am having a look at the problem.

Hi Jason,

Looks like there is an exception being thrown that's not handled in your code. Here is my sample worker code, similar to yours. I also got behaviour similar to what you described, but then I set "Thread.abort_on_exception = true" and also ran backgroundrb in foreground mode (see the README file for details).

It then turned out that there was an exception being thrown in my code, and hence it was not going beyond the first stage. Also, you don't need to protect register_status() with a mutex; rather, you should protect the status variable with a mutex, as demonstrated below.

    class ErrorWorker < BackgrounDRb::MetaWorker
      set_worker_name :error_worker
      attr_accessor :worker_status

      def create(args = nil)
        @worker_status = { }
        @status_mutex = Mutex.new
        register_status(@worker_status)
        # Make exceptions in pool threads visible instead of dying silently.
        Thread.abort_on_exception = true
      end

      def do_task(user_id)
        thread_pool.defer(user_id) do |user_id|
          save_status(user_id,:progress,"Started")
          t_user = World.find_by_id(user_id)
          t_user.change_message
          logger.info "message changed"
          save_status(user_id,:progress,"message_changed")
          t_user.change_name
          logger.info "name changed"
          save_status(user_id,:progress,"name changed")
          t_user.save
        end
      end

      def save_status(user_id,key,data)
        # Only the shared hash needs the lock, not register_status.
        @status_mutex.synchronize do
          @worker_status[user_id] = { :key => key, :data => data}
        end
        register_status(@worker_status)
        logger.info "status synced for user #{user_id} #{key} #{data}"
      end
    end

To stress test I did this:

    require File.join(File.dirname(__FILE__) + "/../config/environment")
    p MiddleMan.ask_status(:worker => :error_worker)

    (7..200).to_a.each do |world_id|
      MiddleMan.ask_work(:worker => :error_worker, :worker_method => :do_task,
                         :data => world_id)
      p MiddleMan.ask_status(:worker => :error_worker)
    end

and it worked flawlessly.

See if this helps.

--
http://gnufied.org
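
For readers who have not met the flag mentioned in this message, here is a minimal plain-Ruby sketch (no BackgrounDRb involved) of what Thread.abort_on_exception changes. The helper name surfaced_error is made up for illustration. The report_on_exception line is an assumption about the reader's Ruby: it only exists on newer versions, where it is switched off here to mimic the silent failure described in this thread.

```ruby
# Starts a background thread that fails, then reports whether the failure
# reached the calling thread. Returns the exception object, or nil if the
# thread died without the caller noticing.
def surfaced_error(abort_flag)
  previous = Thread.abort_on_exception
  Thread.abort_on_exception = abort_flag
  begin
    Thread.new do
      # Newer Rubies print a report for dying threads by default; turn it
      # off so the "silent" case really is silent.
      if Thread.current.respond_to?(:report_on_exception=)
        Thread.current.report_on_exception = false
      end
      raise ArgumentError, "failure inside the background thread"
    end
    sleep 0.2 # give the background thread time to fail
    nil
  rescue ArgumentError => error
    error
  ensure
    Thread.abort_on_exception = previous
  end
end

silent = surfaced_error(false) # flag off: the caller never sees the error
loud   = surfaced_error(true)  # flag on: the error is re-raised in the caller
p silent
p loud
```

With the flag off, the work simply stops partway, which matches the "gets as far as the first part of the task and then just dies" symptom. With the flag on, the same failure interrupts the calling thread, so it shows up when the server runs in the foreground.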

On Dec 19, 2007 1:18 AM, hemant <gethemant at gmail.com> wrote:
> Looks like there is an exception being thrown that's not handled in
> your code.

Ok, awesome - thanks very much for your help. With the abort_on_exception flag and the foreground logging, I was able to track down the problem. If you try something like:

    def do_task(some_user_id)
      logger.info "do_task: some_user_id: #{some_user_id.class}"
      logger.info "starting task - defer to the thread pool"

      thread_pool.defer(some_user_id) do |user_id|
        logger.info "user_id (in pool): #{user_id.class}"
        # and the rest of it
      end
    end

You'll see in the log:

    do_task: some_user_id: Fixnum
    starting task - defer to the thread pool
    user_id (in pool): Array

So some_user_id is being converted to [user_id] for the block. Calling AR's find method with an array gives you back an array (which was the source of my original exception - trying to act on an Array as if it were a User object). When you use User.find_by_id in your example, AR is taking just the first element in the array and doing a find on that.

I suspect that's a bug in thread_pool, or at least if it's not, it's a little confusing syntactically. Now that I know how it works, I've got my code working, so if it's not a bug, no problem :)

- Jason L.
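
The Fixnum-to-Array conversion described in this message can be reproduced in plain Ruby. The sketch below shows one way such a bug can arise: arguments collected with a splat and then passed to the block as a single value. ToyPool and its two methods are hypothetical names invented for this illustration; this is an assumption about the kind of mistake involved, not BackgrounDRb's actual source.

```ruby
# Hypothetical stand-in for a thread pool, for illustration only.
class ToyPool
  # Buggy variant: *args collects the arguments into an array, and the
  # whole array is handed to the block as one value.
  def defer_buggy(*args, &block)
    Thread.new { block.call(args) }.value
  end

  # Fixed variant: the collected arguments are splatted back out, so the
  # block receives exactly what the caller passed in.
  def defer_fixed(*args, &block)
    Thread.new { block.call(*args) }.value
  end
end

pool = ToyPool.new
buggy_class = pool.defer_buggy(42) { |user_id| user_id.class }
fixed_class = pool.defer_fixed(42) { |user_id| user_id.class }

puts "buggy pool passes: #{buggy_class}"
puts "fixed pool passes: #{fixed_class}"
```

A block with a single parameter does not unpack an array argument, which is why user_id ends up as the one-element array rather than the id itself.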

On Dec 19, 2007 10:58 PM, Jason LaPier <jason.lapier at gmail.com> wrote:
> So some_user_id is being converted to [user_id] for the block.
> [...]
> I suspect that's a bug in thread_pool, or at least if it's not, it's a
> little confusing syntactically.

Glad that you solved the problem. Yes, the block being passed its value as an Array is a bug, and it's fixed now. What's more, if your block throws an exception that you are not handling, it will now be handled by bdrb and logged in backgroundrb.log.

Sync with the latest code for all this goodness.

--
http://gnufied.org
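
The behaviour described in this message, where an unhandled exception in a deferred block is caught and logged rather than silently killing the work, can be sketched roughly as follows. LoggingPool is a hypothetical class written for this illustration and the log message format is made up; it is not bdrb's code, only the general rescue-and-log pattern.

```ruby
require "logger"
require "stringio"

# Hypothetical pool that rescues errors raised by deferred blocks.
class LoggingPool
  def initialize(logger)
    @logger = logger
  end

  # Runs the block in a thread; any StandardError is logged and the
  # thread finishes with nil instead of dying with the exception.
  def defer(*args, &block)
    Thread.new do
      begin
        block.call(*args)
      rescue StandardError => error
        @logger.error("#{error.class}: #{error.message}")
        nil
      end
    end
  end
end

log_output = StringIO.new # stands in for a log file such as backgroundrb.log
pool = LoggingPool.new(Logger.new(log_output))

result = pool.defer(7) { |user_id| raise "no user #{user_id}" }.value
puts log_output.string
```

The trade-off is that the caller no longer sees the failure directly, so the log becomes the place to look when a task stops partway through.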

On Dec 19, 2007 1:08 PM, hemant <gethemant at gmail.com> wrote:
> Glad that you solved the problem. Yes the block is getting passed
> value as an Array is a bug and its fixed now.
> Whats more now, if your block throws an exception which you are not
> handling, it will be handled by bdrb and logged in backgroundrb.log.
>
> Sync with latest code for all this goodness.

Great! Thanks for fixing this stuff so quickly. I just synced and everything is working great.

- Jason L.

Yes, Hemant is quick! BackgrounDRb is one of the best pieces of software for Rails so far! Thanks for your work, Hemant! I have implemented a little chat over XMPP with your help (with thread_pool too), thanks!

Mickael.

On Dec 19, 2007 10:42 PM, Jason LaPier <jason.lapier at gmail.com> wrote:
> Great! Thanks for fixing this stuff so quickly. I just synced and
> everything is working great.