Hi,

I'm going to implement a syndication service which will fetch lists in XML
with some meta-data and enclosed video files, which get encoded at the end.
The syndication run will be started five minutes past every full hour.

So I thought I would build four workers: one for checking which feeds to
syndicate at a specific time (syndication_worker), one for processing the
list (import_worker), one for fetching the clips (download_worker) and one
for encoding (encoder_worker).

In my tests all went fine, all my jobs are invoked properly.

Q1)
Is this procedure in line with the intention of backgroundrb? I mean, in
the worst case up to about a hundred download_worker jobs will be started
in one syndication run. Is bdrb able to handle such a queue without losing
a single job?

Q2)
Some downloads are really slow, so I would rather run about 5 downloads in
parallel and funnel the encoding back into a single queue.

                     syndication
                          |
                        import
           /       /      |      \       \
    download  download  download  download  download
           \       \      |      /       /
                       encoder

Is this possible?

I know I can pass a different job_key, maybe the name of the content
source, but that could mean 20 or more download queues in one syndication
run, which is too much.

Thanks in advance!

Cheers,

Matze

------------------------------------------------

backgroundrb.yml
- - - - - - -
:backgroundrb:
  :ip: 0.0.0.0
  :port: 11006
  :environment: production

:schedules:
  :syndication_worker:
    :checksyndicate:
      :trigger_args: 0 5 * * * * *
- - - - - - -

syndication_worker.rb
- - - - - - -
class SyndicationWorker < BackgrounDRb::MetaWorker
  set_worker_name :syndication_worker

  def create(args = nil)
    # this method is called when the worker is loaded for the first time
  end

  def checksyndicate
    syndications = # all Syndications due in this hour
    syndications.each do |syndication|
      MiddleMan.ask_work(:worker => :import_worker,
                         :worker_method => :import,
                         :data => syndication.id)
    end
  end
end
- - - - - - -

import_worker.rb
- - - - - - -
class ImportWorker < BackgrounDRb::MetaWorker
  set_worker_name :import_worker

  def create(args = nil)
    # this method is called when the worker is loaded for the first time
  end

  def import(feedid)
    feed = Feed.find_by_id(feedid)
    # fetch feed
    feed.items.each do |item|
      # create item
      MiddleMan.ask_work(:worker => :download_worker,
                         :worker_method => :download,
                         :data => item.id)
    end
  end
end
- - - - - - -

download_worker.rb
- - - - - - -
class DownloadWorker < BackgrounDRb::MetaWorker
  set_worker_name :download_worker

  def create(args = nil)
    # this method is called when the worker is loaded for the first time
  end

  def download(itemid)
    item = Item.find_by_id(itemid)
    # fetch enclosure
    MiddleMan.ask_work(:worker => :encoder_worker,
                       :worker_method => :encode,
                       :data => item.id)
  end
end
- - - - - - -

encoder_worker.rb
- - - - - - -
class EncoderWorker < BackgrounDRb::MetaWorker
  set_worker_name :encoder_worker

  def create(args = nil)
    # this method is called when the worker is loaded for the first time
  end

  def encode(itemid)
    item = Item.find_by_id(itemid)
    # encode item
  end
end
- - - - - -
Hi Matthias,

On Mon, 2008-01-14 at 00:19 +0100, Matthias Heigl wrote:
> [...original message trimmed...]
> Is this possible?

Yup, but let me suggest an alternative architecture and see for yourself
whether it will work better or not. Here is the deal:

class SyndicationWorker < BackgrounDRb::MetaWorker
  def create(args = nil)
    add_periodic_timer(300) { search_for_syndication }
  end

  def search_for_syndication
    # When you save the syndications which are to be imported, downloaded
    # and encoded: rather than calling another worker from here, set a flag
    # on each Feed and on each Item indicating that it still needs to be
    # imported, downloaded and encoded.
    syndications
  end
end

class ImportWorker < BackgrounDRb::MetaWorker
  def create(args = nil)
    add_periodic_timer(300) { import_syndication }
  end

  def import_syndication
    new_feeds = Feed.find(:all, :conditions => { :to_be_imported => true })
    new_feeds.each do |feed|
      # Fetch the feed and set two flags on each associated item
      # indicating that the item is to be downloaded and encoded.
      fetch_feed(feed)
      # Update the 'to_be_imported' flag for the given feed.
      # Generally all this logic should go into the model itself
      # (remember: thick model, thin controllers), but I am writing
      # it here for demonstration.
      feed.to_be_imported = false
      feed.save
    end
  end
end

class DownloadWorker < BackgrounDRb::MetaWorker
  # Assuming you want 5 downloads to run concurrently, I am setting a
  # thread pool size of 5. The default pool size is 20 threads.
  pool_size 5

  def create(args = nil)
    add_periodic_timer(300) { download_syndication }
  end

  def download_syndication
    new_items = Item.find(:all, :conditions => { :to_be_downloaded => true })
    new_items.each do |item|
      thread_pool.defer(item.id) do |item_id|
        # Pass the id rather than the item itself, so each thread works
        # on a thread-local object with its own underlying db connection.
        local_item = Item.find_by_id(item_id)
        download_video(local_item)
        # Update the to_be_downloaded flag for the item.
      end
    end
  end
end

class EncodeWorker < BackgrounDRb::MetaWorker
  def create(args = nil)
    add_periodic_timer(300) { encode_video }
  end

  def encode_video
    new_encodes = Item.find(:all, :conditions => { :to_be_encoded => true })
    new_encodes.each do |item|
      encode_item(item)
      # Update the to_be_encoded flag and save the model.
    end
  end
end

I guess the above architecture will work quite well; it avoids
interdependence between the workers and instead uses the table itself as a
queue. That has multiple advantages.
Tables are persistent, so you will always know which feeds are still to be
fetched, downloaded and encoded and which are already done.
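
[A minimal sketch of the flag columns the reply above relies on, assuming
standard Rails models Feed and Item backed by "feeds" and "items" tables;
the migration class name and column defaults are illustrative and not part
of the original thread:]

class AddSyndicationFlags < ActiveRecord::Migration
  def self.up
    # one flag per stage of the pipeline, used as the work queue
    add_column :feeds, :to_be_imported,   :boolean, :default => false
    add_column :items, :to_be_downloaded, :boolean, :default => false
    add_column :items, :to_be_encoded,    :boolean, :default => false
  end

  def self.down
    remove_column :feeds, :to_be_imported
    remove_column :items, :to_be_downloaded
    remove_column :items, :to_be_encoded
  end
end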
Hi Matthias,

On Mon, 2008-01-14 at 08:52 +0530, hemant kumar wrote:
> [...previous message trimmed...]
> I guess the above architecture will work quite well; it avoids
> interdependence between the workers and instead uses the table itself
> as a queue.

Okay, I take that back. The above architecture will not work well when you
have multiple copies of the same worker running in parallel.

So you can use your existing code, combined with what I suggested for the
download worker. Also, if you want to queue jobs invoked with ask_work, I
suggest you use defer. What I am saying is:

class ImportWorker < BackgrounDRb::MetaWorker
  pool_size 1

  def import(feed_id)
    thread_pool.defer(feed_id) { |t_feed_id| start_import(t_feed_id) }
  end
end

And invoke the above job as usual:

MiddleMan.ask_work(:worker => :import_worker,
                   :worker_method => :import,
                   :data => feed_id)

# defer makes sure that your task is queued, and a thread pool size of 1
# makes sure that only one job runs concurrently.
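
[Following the same pattern for the original Q2 — five parallel downloads
funnelled into a single encoding queue — here is a sketch that combines the
ask_work-driven flow from the first message with the pool_size/defer idea
above; download_video and encode_video are placeholders for the actual
fetching and encoding code, not part of the thread:]

class DownloadWorker < BackgrounDRb::MetaWorker
  set_worker_name :download_worker
  pool_size 5   # at most five downloads run concurrently

  def download(item_id)
    # defer queues the job; the pool of 5 threads drains the queue
    thread_pool.defer(item_id) do |id|
      item = Item.find_by_id(id)
      download_video(item)          # placeholder for the actual fetch
      # hand the finished item over to the single-threaded encoder queue,
      # using the same MiddleMan.ask_work call as in the original workers
      MiddleMan.ask_work(:worker => :encoder_worker,
                         :worker_method => :encode,
                         :data => id)
    end
  end
end

class EncoderWorker < BackgrounDRb::MetaWorker
  set_worker_name :encoder_worker
  pool_size 1   # one encode at a time, i.e. a single queue

  def encode(item_id)
    thread_pool.defer(item_id) do |id|
      item = Item.find_by_id(id)
      encode_video(item)            # placeholder for the actual encoding
    end
  end
end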
hemant kumar wrote:
> [...previous messages and code trimmed...]
> So you can use your existing code, combined with what I suggested for the
> download worker. Also, if you want to queue jobs invoked with ask_work, I
> suggest you use defer.
> What I am saying is:
> [...ImportWorker example with pool_size 1 and thread_pool.defer trimmed...]
> # defer makes sure that your task is queued, and a thread pool size of 1
> # makes sure that only one job runs concurrently.

Thanks for the tip, I will give thread_pool a try.

The only problem with using the periodic_timer is that under some
circumstances it can take up to 20 minutes before the first encoding
starts, but it really will be reliable even if the process crashes or
something else goes wrong. If it takes too long after a content manager
adds a new content source, my telephone will ring: "is the import still
running?" :)

Thanks for your quick reply!

Matze
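
[One way to get both properties mentioned here — imports starting right
away via ask_work, and crash recovery via the table flags — is to combine
the two suggestions from the thread. This is only a sketch under those
assumptions; the 300-second catch-up timer and the to_be_imported flag come
from the earlier examples, and start_import is a placeholder:]

class ImportWorker < BackgrounDRb::MetaWorker
  set_worker_name :import_worker
  pool_size 1

  def create(args = nil)
    # catch-up pass: re-queue anything still flagged, e.g. after a crash
    add_periodic_timer(300) { requeue_pending }
  end

  # called via MiddleMan.ask_work as soon as a feed is added, so the
  # import starts right away instead of waiting for the next timer tick
  def import(feed_id)
    thread_pool.defer(feed_id) { |id| start_import(id) }
  end

  def requeue_pending
    Feed.find(:all, :conditions => { :to_be_imported => true }).each do |feed|
      # note: a feed still in flight may be re-queued here; clearing the
      # flag early or tracking an "in progress" state avoids duplicate work
      thread_pool.defer(feed.id) { |id| start_import(id) }
    end
  end
end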