Class: Opee::Collector

Inherits:
Actor
  • Object
show all
Defined in:
lib/opee/collector.rb

Overview

This class is used to collect multiple paths together before proceeding in a system. Two approaches are supported, either let the Collector subclass identify when it is time to move on or put the logic in the Job that is passed between the various Actors.

To use a Collector, pass the same data along each path of Actors. All paths should terminate at the Collector. From there the Collector keeps a cache of the data's key and a token that is used to track arrivals. When all paths have converged the next Actor in the process is called.

Constant Summary

Constant Summary

Constants inherited from Actor

Actor::CLOSING, Actor::RUNNING, Actor::STEP, Actor::STOPPED

Instance Attribute Summary

Attributes inherited from Actor

#name, #state

Instance Method Summary (collapse)

Methods inherited from Actor

#ask, #ask_timeout, #ask_timeout=, #busy?, #close, #max_queue_count, #max_queue_count=, #method_missing, #on_idle, #priority_ask, #queue_count, #start, #step, #stop, #timeout_ask, #wakeup

Constructor Details

- (Collector) initialize(options = {})

A new instance of Collector



14
15
16
17
18
19
# File 'lib/opee/collector.rb', line 14

def initialize(options={})
  @cache = {}
  @next_actor = nil
  @next_method = nil
  super(options)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Opee::Actor

Instance Method Details

- (Fixnum) cache_size

Returns the number of Jobs currently waiting to be matched.

Returns:

  • (Fixnum)

    current number of Jobs waiting to finish.



23
24
25
# File 'lib/opee/collector.rb', line 23

def cache_size()
  @cache.size()
end

- (Object) collect(job, path_id = nil) (private)

Collects a job and deternines if the job should be moved on to the next Actor or if it should wait until more processing paths have finished. This method is executed asynchronously.

Parameters:

  • job (Job|Object)

    data to process or pass on

  • path_id (Object) (defaults to: nil)

    identifier of the path the request came from



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/opee/collector.rb', line 45

def collect(job, path_id=nil)
  key = job_key(job)
  token = @cache[key]
  token = update_token(job, token, path_id)
  if complete?(job, token)
    @cache.delete(key)
    keep_going(job)
  else
    @cache[key] = token
  end
end

- (true|false) complete?(job, token) (private)

Returns true if the job has been processed by all paths converging on the collector. This can be implemented in the Collector subclass or in the Job. This method is executed asynchronously.

Parameters:

  • job (Object)

    data to get the key for

  • token (Object)

    current token value or nil for the first token value

Returns:

  • (true|false)

    an indication of wether the job has completed all paths

Raises:

  • (NotImplementedError)


85
86
87
88
# File 'lib/opee/collector.rb', line 85

def complete?(job, token)
  raise NotImplementedError.new("neither Collector.complete?() nor Job.complete?() are implemented") unless job.respond_to?(:complete?)
  job.complete?(token)
end

- (Object) job_key(job) (private)

Returns the key associated with the job. If the job responds to :key then that method is called, otherwise the subclass should implement this method. This method is executed asynchronously.

Parameters:

  • job (Object)

    data to get the key for

Returns:

  • (Object)

    a key for looking up the token in the cache

Raises:

  • (NotImplementedError)


62
63
64
65
# File 'lib/opee/collector.rb', line 62

def job_key(job)
  raise NotImplementedError.new("neither Collector.job_key() nor Job.key() are implemented") unless job.respond_to?(:key)
  job.key()
end

- (Object) keep_going(job) (private)

Moves the job onto the next Actor. If the job responds to :keep_going that is used, otherwise the @next_actor and @next_method care used to continue. This method is executed asynchronously.

Parameters:

  • job (Object)

    data to get the key for



94
95
96
97
98
99
100
101
# File 'lib/opee/collector.rb', line 94

def keep_going(job)
  if job.respond_to?(:keep_going)
    job.keep_going()
  else
    # TBD @next_actor = Env.find_actor(@next_actor) if @next_actor.is_a?(Symbol)
    @next_actor.send(@next_method, job) unless @next_actor.nil? || @next_method.nil?
  end
end

- (Object) set_options(options) (private)

Processes the initialize() options. Subclasses should call super.

Parameters:

  • options (Hash)

    options to be used for initialization

Options Hash (options):

  • :next_actor (Actor)

    Actor to ask to continue when ready

  • :next_method (Symbol)

    method to ask of the next_actor to continue when ready



33
34
35
36
37
38
# File 'lib/opee/collector.rb', line 33

def set_options(options)
  super(options)
  @next_actor = options[:next_actor]
  @next_method = options[:next_method]
  @next_method = @next_method.to_sym if @next_method.is_a?(String)
end

- (Object) update_token(job, token, path_id) (private)

Updates the token associated with the job. The job or the Collector subclass can use any data desired to keep track of the job's paths that have been completed. This method is executed asynchronously.

Parameters:

  • job (Object)

    data to get the key for

  • token (Object)

    current token value or nil for the first token value

  • path_id (Object)

    an indicator of the path if used

Returns:

  • (Object)

    a token to keep track of the progress of the job

Raises:

  • (NotImplementedError)


74
75
76
77
# File 'lib/opee/collector.rb', line 74

def update_token(job, token, path_id)
  raise NotImplementedError.new("neither Collector.update_token() nor Job.update_token() are implemented") unless job.respond_to?(:update_token)
  job.update_token(token, path_id)
end