I wrote the following Ruby script several years ago and have been using it often ever since on a Bioinformatics computer cluster.
It pulls out a list of hosts from the Torque queuing system qnodes
. It ssh
'es and runs a command on all the nodes. Then it prints the output and/or errors in a defined order (alphabetical sort of the hostnames).
A nice feature: results are printed immediately for the host that is next in the order.
I would like to use it as an example for a Ruby workshop. Could you please suggest best-practice and design pattern improvements?
#!/usr/bin/ruby
EXCLUDE = [/girkelab/, /biocluster/, /parrot/, /owl/]
require "open3"
# Non-interactive, no password asking, and seasonable timeouts
SSH_OPTIONS = ["-o PreferredAuthentications=publickey,hostbased,gssapi,gssapi-with-mic",
"-o ForwardX11=no",
"-o BatchMode=yes",
"-o SetupTimeOut=5",
"-o ServerAliveInterval=5",
"-o ServerAliveCountMax=2"
].join(" ")
SSH = "/usr/bin/ssh #{SSH_OPTIONS}"
MKDIR = "/bin/mkdir"
raise "Pleae give this command at least one argument" if ARGV.size < 1
COMMAND = ARGV[0..-1].join(' ')
output_o = {}
output_e = {}
IO_CONNECTIONS_TO_REMOTE_PROCESSES = {}
def on_all_nodes(&block)
nodes = []
Kernel.open('|qnodes | grep -v "^ " | grep -v "^$"') do |f|
while line = f.gets
i = line.split(' ').first
nodes.push(i) if EXCLUDE.select{|x| i =~ x}.empty?
end
end
nodes.sort.each {|n| block.call(n)}
end
# Create processes
on_all_nodes do |node|
stdin, stdout, stderr = Open3.popen3("#{SSH} #{node} \"#{COMMAND}\"")
IO_CONNECTIONS_TO_REMOTE_PROCESSES[node] = [stdin, stdout, stderr]
end
has_remote_errors = false
# Collect results
on_all_nodes do |node|
stdin, stdout, stderr = IO_CONNECTIONS_TO_REMOTE_PROCESSES[node]
stdin.close
e_thread = Thread.new do
while line = stderr.gets
line.chomp!
STDERR.puts "#{node} ERROR: #{line}"
has_remote_errors = true
end
end
o_thread = Thread.new do
while line = stdout.gets
line.chomp!
puts "#{node} : #{line}"
end
end
# Let the threads finish
t1 = nil
t2 = nil
while [t1, t2].include? nil
if t1.nil?
t1 = e_thread.join(0.1) # Gives 1/10 of a second to STDERR
end
if t2.nil?
t2 = o_thread.join(0.1) # Give 1/10 of a second to STDOUT
end
end
end
exit(1) if has_remote_errors