Harbor Documentation

Attributes

  • worker_count [RW]

    CONFUGURATION OPTIONS

  • daemonize [RW] (Not documented)
  • sleep_time [RW] (Not documented)
  • log_level [RW] (Not documented)
  • log_file [RW] (Not documented)

Public Class Methods

new(*args)

      # File lib/harbor/processor.rb, line 6
 6:     def self.new(*args)
 7:       raise "You must subclass Harbor::Processor and implement #reserve and #process" if self == Harbor::Processor
 8:       raise "You must implement #{self}#reserve" unless instance_methods(false).include?("reserve")
 9:       raise "You must implement #{self}#process" unless instance_methods(false).include?("process")
10: 
11:       processor = allocate
12: 
13:       processor.worker_count = 2
14:       processor.daemonize    = true
15:       processor.sleep_time   = 60
16:       processor.log_level    = :info
17:       processor.log_file     = "log/processor.log"
18: 
19:       processor.send(:initialize, *args)
20:       processor
21:     end

Public Instance Methods

handle_exception(task, exception)

Method which can be over-ridden to define special behavior for unhandled exceptions, such as marking a task as failed in the database.

      # File lib/harbor/processor.rb, line 79
79:     def handle_exception(task, exception)
80:       logger.error("#{exception}\n#{exception.backtrace.join("\n")}")
81:     end

handle_interrupt(task)

This method can be over-ridden to define special behavior for when a task is interrupted, such as updating a value in the database.

      # File lib/harbor/processor.rb, line 72
72:     def handle_interrupt(task)
73:     end

interruptible()

Sometimes when running commands in a subshell (with backticks, system, etc.) interrupts will be sent, but the process will exit silently. Processor#interruptible swaps out the :INT handler to allow us to know when an action is interrupted, and optionally rescue.

      # File lib/harbor/processor.rb, line 89
89:     def interruptible
90:       original_handler = trap(:INT, "DEFAULT")
91:       yield
92:     ensure
93:       trap(:INT, original_handler)
94:     end

logger()

       # File lib/harbor/processor.rb, line 146
146:     def logger
147:       @logger ||= begin
148:         logger = Logging::Logger[self.class]
149:         logger.additive = false
150:         logger.level = log_level
151:         layout = Logging::Layouts::Pattern.new(:pattern => "%-5l %d: %m\n")
152: 
153:         if daemonize
154:           logger.add_appenders(Logging::Appenders::File.new(log_file, :layout => layout))
155:         else
156:           logger.add_appenders(Logging::Appenders::Stdout.new(:layout => layout))
157:         end
158:       end
159:     end

options(optparse)

Accepts an instance of OptionParser for wiring up options, like:

  processor = MyProcessor.new
  OptionParser.new do |opts|
    opts.banner = "Usage: my_processor [options]"
    processor.options(opts)
  end.parse!
      # File lib/harbor/processor.rb, line 42
42:     def options(optparse)
43:       optparse.on("-n", "--no-daemon", "Run in the foreground") { self.daemonize = false }
44:       optparse.on("-w", "--workers=COUNT", Integer, "Number of workers to spawn (default: #{worker_count})") { |count| self.worker_count = count }
45:       optparse.on("-l", "--log-level=LEVEL", [:debug, :info, :error], "Set log level (default: #{log_level})") { |log_level| self.log_level = log_level }
46:       optparse.on("-L", "--log-file=FILE", "Log file (default: #{log_file})") { |file| self.log_file = file }
47:       optparse.on("-s", "--sleep=SECONDS", Integer, "Sleep s seconds between runs (default: #{sleep_time})") { |sleep_time| self.sleep_time = sleep_time }
48:     end

process(task)

This method must be over-ridden in your implementation of Harbor::Processor. It will be called within the forked worker process, and accepts as its only argument a task returned by reserve.

      # File lib/harbor/processor.rb, line 64
64:     def process(task)
65:       raise NotImplementedError.new("You must implement #{self.class}#process(task)")
66:     end

reserve()

This method must be over-ridden in your implementation of Harbor::Processor, and should return a task to be performed inside of a forked worker process. It should return nil when there are no available tasks.

      # File lib/harbor/processor.rb, line 55
55:     def reserve
56:       raise NotImplementedError.new("You must implement #{self.class}#reserve")
57:     end

start()

       # File lib/harbor/processor.rb, line 96
 96:     def start
 97:       detach if daemonize
 98: 
 99:       logger.info "running at %s" % [Process.pid]
100:       logger.info "workers = #{worker_count}"
101: 
102:       trap_signals
103: 
104:       @workers = {}
105: 
106:       while alive?
107:         begin
108:           loop do
109:             break if worker_count == 0 && @workers.empty?
110: 
111:             (worker_count - @workers.size).times do
112:               break unless task = reserve
113:               @workers[spawn_worker(task)] = task
114:             end
115: 
116:             pid = Process.wait
117: 
118:             task = @workers.delete(pid)
119: 
120:             case $?.exitstatus
121:             when 0   # success
122:               logger.info "[worker#%-5s] %s: completed" % [Process.pid, task.inspect]
123:             when 1   # exception
124:               logger.error "[worker#%-5s] %s: failed" % [Process.pid, task.inspect]
125:             when 130 # interrupt
126:               logger.warn "[worker#%-5s] %s: interrupted" % [Process.pid, task.inspect]
127:             end
128:           end
129:         rescue Errno::ECHILD
130:         end
131: 
132:         if alive?
133:           begin
134:             interruptible { sleep(sleep_time) }
135:           rescue Interrupt
136:             @alive = false
137:           end
138:         end
139:       end
140: 
141:       logger.info "shutting down"
142:     rescue => e
143:       logger.error("#{e}\n#{e.backtrace.join("\n")}")
144:     end

Private Instance Methods

alive?()

       # File lib/harbor/processor.rb, line 163
163:     def alive?
164:       defined?(@alive) ? @alive : (@alive = true)
165:     end

detach()

       # File lib/harbor/processor.rb, line 183
183:     def detach
184:       srand
185:       fork and exit
186:       Process.setsid # detach -- we want to be able to close our shell!
187: 
188:       log_directory = ::File.dirname(log_file)
189:       FileUtils.mkdir_p(log_directory) unless ::File.directory?(log_directory)
190: 
191:       redirect_io(log_file)
192:     end

ignore_signals()

       # File lib/harbor/processor.rb, line 231
231:     def ignore_signals
232:       trap(:QUIT, "")
233:       trap(:TERM, "")
234:       trap(:TTIN, "")
235:       trap(:TTOU, "")
236:       trap(:INT, "")
237:     end

redirect_io(file = nil)

       # File lib/harbor/processor.rb, line 194
194:     def redirect_io(file = nil)
195:       STDIN.reopen "/dev/null"
196:       STDOUT.reopen file || "/dev/null"
197:       STDOUT.sync = true
198: 
199:       STDERR.reopen STDOUT
200:       STDERR.sync = true
201:     end

spawn_worker(task)

       # File lib/harbor/processor.rb, line 167
167:     def spawn_worker(task)
168:       fork do
169:         begin
170:           $0 = "#{$0}[#{Time.now.strftime("%H:%M:%S")}] #{task.inspect}"
171:           srand
172:           ignore_signals
173: 
174:           process(task)
175:         rescue => e
176:           handle_exception(task, e)
177: 
178:           raise
179:         end
180:       end
181:     end

trap_signals()

       # File lib/harbor/processor.rb, line 203
203:     def trap_signals
204:       trap(:TTIN) do
205:         self.worker_count += 1
206:         logger.info "worker_count = #{worker_count}"
207:       end
208: 
209:       trap(:TTOU) do
210:         self.worker_count -= 1
211:         logger.info "worker_count = #{worker_count}"
212:       end
213: 
214:       graceful_shutdown = lambda do
215:         @alive = false
216:         self.worker_count = 0
217:         logger.info "gracefully shutting down."
218:       end
219: 
220:       trap(:QUIT, graceful_shutdown)
221:       trap(:TERM, graceful_shutdown)
222: 
223:       shutdown = lambda do
224:         @alive = false
225:         self.worker_count = 0
226:         logger.info "stopping workers and shutting down."
227:       end
228:       trap(:INT, shutdown)
229:     end