Creating your own Ubiquo Job Managers

Sometimes it can be useful to create different managers, for example when you want to run different kinds of jobs in different circumstances.

Ubiquo Jobs provides a default manager which fetches ActiveJob jobs according to their priority and scheduled time:

def self.get(runner)
  recovery(runner)
  candidate_jobs = job_class.all(
    :conditions => [
      'planified_at <= ? AND state = ?',
      Time.now.utc,
      UbiquoJobs::Jobs::Base::STATES[:waiting]
    ],
    :order => 'priority asc'
  )
  job = first_without_dependencies(candidate_jobs)
  job.update_attributes({
      :state => UbiquoJobs::Jobs::Base::STATES[:instantiated],
      :runner => runner
    }) if job
  job
end
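
To make the selection rule concrete, here is a minimal plain-Ruby sketch that mimics the query above without ActiveRecord. The FakeJob struct and the pick_job helper are hypothetical names used only for illustration. The dependency check (a job is eligible once every job it depends on has finished) is an assumption about what first_without_dependencies does, not something taken from the Ubiquo source.

```ruby
# Hypothetical stand-in for a job record; not part of Ubiquo Jobs.
FakeJob = Struct.new(:name, :priority, :planified_at, :state, :dependencies)

# Mimics the default manager's selection: waiting jobs whose planned time
# has passed, ordered by ascending priority, skipping jobs whose
# dependencies have not finished (assumed semantics).
def pick_job(jobs, now = Time.now.utc)
  candidates = jobs
    .select { |j| j.state == :waiting && j.planified_at <= now }
    .sort_by(&:priority)
  candidates.find { |j| j.dependencies.all? { |d| d.state == :finished } }
end

now = Time.now.utc
import = FakeJob.new('import', 1, now - 60, :waiting, [])
report = FakeJob.new('report', 0, now - 60, :waiting, [import]) # blocked by import
future = FakeJob.new('future', 0, now + 3600, :waiting, [])     # not due yet

picked = pick_job([import, report, future], now)
puts picked.name
```

Here report sorts first but is skipped because import has not finished, and future is not yet due, so import is the job that gets picked.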

The job_class method returns UbiquoJobs::Jobs::ActiveJob by default. If you want to handle special jobs, or change the way jobs are picked, the best approach is to implement your own manager. A nice Rails-like way to do that is to put it in the lib/ folder of your Ubiquo project. The class you should inherit from is UbiquoJobs::Managers::ActiveManager. If you want the manager to pick up only a specific subclass of Ubiquo jobs, it suffices to reimplement the self.job_class class method so that it returns your own kind of job:

def self.job_class
  UbiquoJobs::Jobs::YourJobClass
end
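
The reason a single override is enough can be shown with a small self-contained sketch: inherited class methods call job_class on whichever class receives the message, so the subclass's version is used. BaseManager, SpecialManager and describe are hypothetical stand-ins, not the real Ubiquo classes.

```ruby
# Hypothetical stand-ins that only illustrate the class-method override
# mechanism; they are not the real Ubiquo classes.
class BaseManager
  def self.job_class
    :default_job
  end

  # Inherited code calls job_class on the class that receives the message.
  def self.describe
    "picks #{job_class}"
  end
end

class SpecialManager < BaseManager
  # Only job_class is redefined; describe is inherited unchanged.
  def self.job_class
    :special_job
  end
end

puts BaseManager.describe
puts SpecialManager.describe
```

In the same way, a manager that inherits from UbiquoJobs::Managers::ActiveManager keeps the inherited get method and only changes which class it queries.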

However, there is a better way to do this. For this special case, the default job class provides a special attribute that stores the job’s class name, which lets you select instances of ActiveJob subclasses by class name. For example, imagine you have a kind of job for special tasks that you know will take a long time to complete. It seems reasonable to have a separate manager handle those jobs. You would create a new job in the file app/jobs/very_long_job.rb:

class VeryLongJob < UbiquoJobs::Jobs::ActiveJob
  def do_job_work
    # Do what needs to be done here
    return 0
  end
end

Then you can create a manager that handles only that kind of job by implementing your own subclass of UbiquoJobs::Managers::ActiveManager:

module JobManagers
  class VeryLongJobManager < UbiquoJobs::Managers::ActiveManager
    def self.get(runner)
      recovery(runner)
      candidate_jobs = job_class.all(
        :conditions => [
          'planified_at <= ? AND state = ? AND type = ?',
          Time.now.utc,
          UbiquoJobs::Jobs::Base::STATES[:waiting],
          'VeryLongJob'
        ],
        :order => 'priority asc'
      )
      job = first_without_dependencies(candidate_jobs)
      job.update_attributes({
          :state => UbiquoJobs::Jobs::Base::STATES[:instantiated],
          :runner => runner
        }) if job
      job
    end
  end
end

The code is exactly the same as in the default ActiveManager class, except that the finder takes an extra condition, type = 'VeryLongJob', so that only ActiveJob objects of the VeryLongJob subclass are picked.

After that, you need to either modify the task that starts the workers so that it uses your manager, or create a new task that runs your manager. The default task that starts a worker looks like this:

desc "Starts a new ubiquo worker"
task :start, [:name, :interval] => [:environment] do |t, args|
  options = {
    :sleep_time => args.interval.to_f
  }.delete_if { |k,v| v.blank? }
  UbiquoWorker.init(args.name, options)
end

The worker uses a configuration option to determine which manager to use. The option is stored in Ubiquo::Config.context(:ubiquo_jobs), is named :job_manager_class, and takes the manager class as its value. So, to create a task that uses your manager, you would write something like this:

desc "Starts a new ubiquo worker for very long jobs"
task :start_very_long_jobs, [:name, :interval] => [:environment] do |t, args|
  options = {
    :sleep_time => args.interval.to_f
  }.delete_if { |k,v| v.blank? }
  Ubiquo::Config.context(:ubiquo_jobs).set(:job_manager_class, JobManagers::VeryLongJobManager)
  UbiquoWorker.init(args.name, options)
end

You should call this task like this (assuming it is in the same namespace as the default task):

rake ubiquo:worker:start_very_long_jobs[name,interval]
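
To see how the bracketed values reach the task, the following sketch defines a task with the same argument signature and invokes it programmatically. The task name start_demo, the received hash and the values long_worker and 5 are hypothetical. The :environment prerequisite is left out so the sketch runs outside a Rails project.

```ruby
require 'rake'

# Records what the task received so the mapping can be inspected.
received = {}

# Same argument signature as the tasks in this guide, but without the
# :environment prerequisite, so it runs outside a Rails project.
Rake::Task.define_task(:start_demo, [:name, :interval]) do |_task, args|
  received[:name] = args.name
  received[:sleep_time] = args.interval.to_f
end

# Equivalent to running: rake start_demo[long_worker,5]
Rake::Task[:start_demo].invoke('long_worker', '5')

puts received.inspect
```

Arguments arrive as strings in the order they are listed, which is why the tasks above convert interval with to_f. Some shells (zsh, for instance) treat square brackets as glob patterns, so the task name and its arguments may need to be quoted on the command line.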