Hello all,

I've come up w/ a worker class that manages queued jobs using a fixed number of child workers. Well, that's not quite true -- a new worker is spawned for each job, but you set the total number that may exist at once.

There are three components:

1) queue_worker.rb: The singleton worker that manages the child workers. You probably want to auto start this. Make sure you give it :singleton=>true. Read this file for the methods to interact with your children (e.g. queue_job(), delete_job(), job_progress()).

2) backgroundrb_rails_queue.rb: The super class for the "child workers" (it uses backgroundrb_rails.rb in turn). This file needs to be included in background.rb

3) Your child worker, which should be a subclass of BackgrounDRb::RailsQueue, is otherwise about the same as a normal worker. If it's a big loop, you probably want to use terminate?() on each iteration and update @progress. Use suicide() at the end to make room for the next child.

Options: (probably in your backgroundrb.yml)

autostart:
  :queue_key:
    class: queue_worker
    args:
      :num_child_workers: 2
      :child_class: :cost_calculator_worker
      :reQ_on_finish: true
      :singleton: true

:queue_key can be changed to what you want, but it is the permanent key of the QueueWorker
:num_child_workers: is up to you!
:child_class: the worker class you want as child workers.
:reQ_on_finish: do you want results to be stored in the queue until you call job_progress!() ?

Note: to be able to access your child jobs w/ the QueueWorker methods, include a unique :id in your {args} when you queue_job({args})

I'll attach the files. If they don't go through, I'll resend as text.

BTW, this works well enough for me, but I'm learning as I go too, so no guarantees :) I don't use the fancy timing options, so YMMV for :next_start and :interval.

Let me know if you find any issues (though I'm off-line for a week after this post). I'm wondering myself if it might be better to reuse child workers instead of re-spawning new ones. Another day maybe.

cheers,
David Lemstra

-------------- next part --------------
An embedded and charset-unspecified text was scrubbed...
Name: queue_worker.rb
Url: http://rubyforge.org/pipermail/backgroundrb-devel/attachments/20060824/2349e2f4/attachment.ksh
-------------- next part --------------
An embedded and charset-unspecified text was scrubbed...
Name: backgroundrb_rails_queue.rb
Url: http://rubyforge.org/pipermail/backgroundrb-devel/attachments/20060824/2349e2f4/attachment-0001.ksh
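
The queueing idea described in this post (a fixed number of slots, with a fresh worker started for each job) can be sketched without BackgrounDRb at all. What follows is only a minimal illustration under that assumption: TinyJobQueue, wait_until_done and the block-taking queue_job signature are made up for the example and are not the attached code. It also assumes every job carries an :id.

```ruby
require 'monitor'

# Hypothetical stand-in for the queueing idea; not BackgrounDRb code.
class TinyJobQueue
  attr_reader :statuses

  def initialize(max_workers)
    @jobs = []
    @jobs.extend(MonitorMixin)        # the array doubles as its own monitor
    @job_ready = @jobs.new_cond
    @statuses = {}
    @status_lock = Mutex.new
    # One long-lived slot thread per allowed concurrent worker.
    @slots = Array.new(max_workers) { Thread.new { slot_loop } }
  end

  # Returns nil when the :id is already known, true otherwise.
  def queue_job(args, &work)
    @status_lock.synchronize do
      return nil if @statuses.key?(args[:id])
      @statuses[args[:id]] = :queued
    end
    @jobs.synchronize do
      @jobs.push([args, work])
      @job_ready.signal
    end
    true
  end

  # Simple polling helper so a caller can wait for one job.
  def wait_until_done(id)
    sleep 0.01 until @status_lock.synchronize { @statuses[id] } == :done
  end

  private

  def slot_loop
    loop do
      args, work = @jobs.synchronize do
        @job_ready.wait_while { @jobs.empty? }
        @jobs.shift
      end
      @status_lock.synchronize { @statuses[args[:id]] = :running }
      Thread.new { work.call(args) }.join   # a fresh worker thread per job
      @status_lock.synchronize { @statuses[args[:id]] = :done }
    end
  end
end
```

With TinyJobQueue.new(2), at most two jobs run at once; anything queued beyond that waits until a slot thread loops back for more work.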
Wow! That's pretty thick ;) I'll work my way through it this weekend and see how it works.

Thanks
-Ezra

On Aug 23, 2006, at 9:43 PM, David Lemstra wrote:
> # Put your code that runs your task inside the do_work method
> # it will be run automatically in a thread. You have access to
> # all of your rails models if you set load_rails to true in the
> # config file. You also get @logger inside of this class by default.
> require 'monitor.rb'
>
> class QueueWorker < BackgrounDRb::Rails
>
>   attr_reader :q, :id_hash, :completed
>   def initialize(key, args={})
>     super(key,args)
>     @num_child_workers = args[:num_child_workers] ? args[:num_child_workers] : 1
>     @child_workers = Array.new(@num_child_workers) {|i|
>       Hash[:job_key,nil,:args,nil,:s_thread,nil, :s_mutex,Mutex.new, :child, i] }
>
>     @q = []
>     @q.extend(MonitorMixin)
>     @q_loaded_cv = @q.new_cond
>     @id_hash = {}
>     @id_hash_mutex = Mutex.new
>
>     raise ArgumentError unless args.has_key?(:child_class)
>     @child_class = args[:child_class]
>     @reQ_on_finish = args[:reQ_on_finish] || false
>     @completed = 0
>   end
>
>   def queue_job(args)
>     return nil if @id_hash && args[:id] && @id_hash.has_key?(args[:id])
>     @q.synchronize do
>       @q.push args
>       @id_hash_mutex.synchronize {
>         @id_hash[args[:id]] = {:status=>:queued, :job_key=>nil, :results=>nil }
>       } if args[:id]
>       @q_loaded_cv.signal
>     end
>     return true
>   end
>
>   def job_in_progress?(job_id)
>     @id_hash.has_key?(job_id)
>   end
>
>   def job_status?(job_id)
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       return @id_hash[job_id][:status]
>     end
>   end
>
>   def job_progress(job_id)
>     report_hsh = {}
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       report_hsh[:status] = @id_hash[job_id][:status]
>       report_hsh[:progress] = case @id_hash[job_id][:status]
>         when :queued then
>           ahead = 0
>           @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
>           ahead
>         when :running
>           w = self[@id_hash[job_id][:job_key]]
>           w.nil? ? nil : w.progress
>         when :done
>           @id_hash[job_id][:results]
>         else nil
>       end
>     end
>     return report_hsh
>   end
>
>   def job_progress!(job_id)
>     report_hsh = {}
>     @id_hash_mutex.synchronize do
>       return nil unless @id_hash.has_key?(job_id)
>       report_hsh[:status] = @id_hash[job_id][:status]
>       report_hsh[:progress] = case @id_hash[job_id][:status]
>         when :queued then
>           ahead = 0
>           @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
>           ahead
>         when :running
>           w = self[@id_hash[job_id][:job_key]]
>           if w.nil?
>             @id_hash.delete(job_id)[:results]
>           else
>             w.progress
>           end
>         when :done
>           @id_hash.delete(job_id)[:results]
>         else nil
>       end
>     end
>     return report_hsh
>   end
>
>   def delete_job(job_id)
>     args = nil
>     @q.synchronize do @id_hash_mutex.synchronize do
>         args = @id_hash[job_id]
>         return true if args.nil?
>         if args[:status] == :queued
>           @q.delete_if {|h| h[:id] == job_id }
>           @id_hash.delete(job_id)
>           return true
>         elsif args[:status] == :done
>           @id_hash.delete(job_id)
>           return true
>         end
>       end
>     end
>     ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key]) if args[:status] == :running
>     return true
>   end
>
>   def do_work(args)
>     # You probably don't want to mess with this method unless you know what's what.
>     @child_workers.each do |child_hash|
>       child_hash[:s_thread] = Thread.start do
>         loop do
>           # Wait for a new job in the @q
>           child_hash[:s_mutex].synchronize do
>             # get the Q mutex and wait for a job
>             @q.synchronize do
>               tl = Thread.list
>               @q_loaded_cv.wait_while { @q.empty? }
>               child_hash[:args] = @q.shift
>               if child_hash[:args][:id]
>                 @id_hash_mutex.synchronize do
>                   @id_hash[child_hash[:args][:id]][:status] = :running
>                   child_hash[:job_key] = spawn_worker({:args=>child_hash[:args],:class=>@child_class})
>                   @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
>                 end
>               else
>                 # jobs without an :id are spawned the same way but are not tracked in @id_hash
>                 child_hash[:job_key] = spawn_worker({:args=>child_hash[:args],:class=>@child_class})
>               end
>             end
>             self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
>             # grab and store the results
>             if child_hash[:args][:id]
>               @id_hash_mutex.synchronize do
>                 if @reQ_on_finish
>                   r = self[child_hash[:job_key]].results
>                   @id_hash[child_hash[:args][:id]][:results] = r if r
>                   @id_hash[child_hash[:args][:id]][:status] = :done
>                   @id_hash[child_hash[:args][:id]][:done_at] = Time.now
>                 else
>                   @id_hash.delete(child_hash[:args][:id])
>                 end
>               end
>             end
>             self[child_hash[:job_key]].thread[:DQed].signal
>             @completed += 1
>             [:args,:job_key].each {|k| child_hash[k] = nil }
>           end
>           # Loop back and wait for the job_key to get killed again....
>         end
>       end
>     end
>   end
>
>   private
>
>   def [](key)
>     # Use jobs to avoid the access time update w/ []
>     ::BackgrounDRb::MiddleMan.instance.jobs[key]
>   end
>
> end
>
> module BackgrounDRb
>
>   class RailsQueue < BackgrounDRb::Rails
>     attr_reader :progress
>     def initialize(key, args={})
>       super(key,args)
>       @job_ctrl = true
>     end
>
>     def start_process
>       return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i
>       @thread = Thread.new do
>         Thread.current[:safe_to_kill] = ConditionVariable.new
>         Thread.current[:kill] = false
>         Thread.current[:DQ_request] = ConditionVariable.new
>         Thread.current[:DQed] = ConditionVariable.new
>         Thread.current[:mutex] = Mutex.new
>         begin
>           Thread.current[:mutex].synchronize do
>             do_work(@args)
>           end
>         rescue Exception => e
>           @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n")
>         end
>       end
>       @next_start = @interval.from_now if schedule_repeat
>     end
>
>     def results
>       # Overwrite this method and set reQ_on_finish = true (in the queue worker args)
>       # to have a process put its results back in the queue
>       # for pickup before being killed
>       nil
>     end
>
>     def before_DQ(args=nil)
>       # stub method that gets called before dequeue is run.
>       # Overwrite in your class instance
>       true
>     end
>
>     def terminate(args=nil)
>       do_DQ(args)
>       super(args)
>     end
>
>     def suicide(args=nil)
>       do_DQ(args)
>       kill
>       Thread.exit
>     end
>
>     private
>     def do_DQ(args=nil)
>       before_DQ(args)
>       Thread.current[:DQ_request].signal
>       Thread.current[:DQed].wait(Thread.current[:mutex])
>     end
>   end
> end
> _______________________________________________
> Backgroundrb-devel mailing list
> Backgroundrb-devel at rubyforge.org
> http://rubyforge.org/mailman/listinfo/backgroundrb-devel
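
The difference between job_progress() and job_progress!() in the QueueWorker listing is easy to miss: both report status, but the bang version also removes a finished entry from the id hash. Here is a minimal stand-alone sketch of that peek-versus-take behaviour. ResultShelf and its store method are hypothetical names for the illustration and are not part of BackgrounDRb; the sketch assumes results are only kept around when :reQ_on_finish is true.

```ruby
# Hypothetical illustration only; ResultShelf is not part of BackgrounDRb.
class ResultShelf
  def initialize
    @id_hash = {}
    @lock = Mutex.new
  end

  # Record a finished job, roughly what the queue worker does
  # when :reQ_on_finish is set.
  def store(job_id, results)
    @lock.synchronize do
      @id_hash[job_id] = { :status => :done, :results => results }
    end
  end

  # Peek: report status and results, leave the entry in place.
  def job_progress(job_id)
    @lock.synchronize do
      entry = @id_hash[job_id]
      entry && { :status => entry[:status], :progress => entry[:results] }
    end
  end

  # Take: same report, but a :done entry is removed afterwards.
  def job_progress!(job_id)
    @lock.synchronize do
      entry = @id_hash[job_id]
      next nil if entry.nil?
      @id_hash.delete(job_id) if entry[:status] == :done
      { :status => entry[:status], :progress => entry[:results] }
    end
  end
end
```

A caller that only wants to display progress would poll the non-bang method; the bang method suits the final pickup, since the :id becomes free for reuse afterwards.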
Hey David,

I'm currently trying out your QueueWorker and I am experiencing a little problem: it seems the worker (my worker) is never started. Just to make sure I got it right:

MiddleMan[:queue].queue_job({
  :in_filename => self.getFullPath,
  :out_filename => self.getPreviewPath,
  :width => 300,
  :height => 600
})

This should put another instance of my worker into the queue and run it, if there are enough threads "free". Is that how you meant it?

I modified the configuration a little bit to fit my needs:

autostart:
  :queue:
    class: queue_worker
    args:
      :num_child_workers: 1
      :child_class: :generate_preview_worker
      :reQ_on_finish: false
      :singleton: true

Any ideas what I might be doing wrong?

regards,
Micha

--
Michael Siebert <info at siebert-wd.de>
www.siebert-wd.de - reading minds
www.stellar-legends.de - space browser game in alpha stage
Michael,

It all looks good to me (but some error messages might help). Maybe make :generate_preview_worker a subclass of BackgrounDRb::Rails first and make sure it works on its own w/o the Q baggage.

Maybe setting :id manually in the arg hash is needed. I'm not sure if I've tried it without.

David Lemstra
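
On the point about error messages: an exception raised inside a background thread can go unnoticed unless something rescues it and writes it down. RailsQueue#start_process does this for child workers by rescuing and logging. The same pattern, as a stand-alone sketch, might look like the following; start_logged_thread is a hypothetical helper name, and a StringIO stands in for @logger so the example runs on its own.

```ruby
require 'stringio'

# Hypothetical helper: run a block in a thread and write any exception
# it raises to the given IO instead of letting it vanish.
def start_logged_thread(log_io, &work)
  Thread.new do
    begin
      work.call
    rescue Exception => e
      log_io.puts "#{e.message} - (#{e.class})"
      log_io.puts((e.backtrace || []).join("\n"))
    end
  end
end

log_output = StringIO.new
thread = start_logged_thread(log_output) { raise ArgumentError, "missing :child_class" }
thread.join
puts log_output.string
```

Wrapping the body of a worker's thread this way is one option for finding out why a queued job never appears to start.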