2023-09-30

Process Spawning with Ruby ⚙

Building a hot tub

In 2015, I needed a way to record audio streams on a scheduled basis. To accomplish this I created a Rails app called Radioshifter that served as a UI for editing and storing schedules. Under the hood, Radioshifter did a lot of things. One of those things was spawning new processes whenever it was time to capture an audio stream.

Building a hot tub

Radioshifter - R.I.P. 🪦☠

Methods

There are a few ways to spawn a process with Ruby - system, fork, exec, Process.spawn, and loads more options in Open3 and PTY. Each method has its own use case.

The Method I Chose

I chose Process.spawn because there was no need to wait for the process to finish. I just needed the code to spawn a new process, then detach from the current process with Process.detach, and let the current process finish while the spawned process continued to run.

The Spawned Process

To give you some context, the spawned process was responsible for recording an audio stream. It orchestrated the recording process by calling serveral other system level commands. Some later stage processes even observed other processes, ending them at specific times, or restarting them if they crashed prematurely.

The spawned process was the starting point for a chain of events that involved the following:

  • capturing the audio stream to disk using mplayer
  • converting the audio file(s) to an MP3 file using ffmpeg
  • uploading the MP3 to a user’s Dropbox account
  • cleaning up local recording files

Caveats

While some of the priciples still hold true, the application was made in 2015 and my approach might be a bit different today.

I also would probably name the classes a bit differently now. I’m not a huge fan of the -er -or naming convention that I used for certain classes, but I digress!

The Code

Here is the starting point:

module Scheduler
  class RecordingProcess
    attr_reader :recording_schedule

    def initialize(recording_schedule)
      @recording_schedule = recording_schedule
      @pid_file_name = @recording_schedule.pid_file_name
    end

    def spawn
      return if running?
      logfile_path_option = if ENV['RECORDINGS_LOG_FOLDER']
        "-l #{ENV['RECORDINGS_LOG_FOLDER']}/#{recording_schedule.id}/recording.log"
      end

      pid_file_option = "-p #{@pid_file_name}"

      command = "cd #{Rails.root}/bin && FFMPEG_CMD=#{ ENV['FFMPEG_CMD'] } FFPROBE_CMD=#{ ENV['FFPROBE_CMD'] } ./record_stream -i #{ @recording_schedule.id } -u #{ @recording_schedule.stream_url } -d #{ @recording_schedule.duration } -f /tmp/radio_shifter_recording/#{ @recording_schedule.id } #{logfile_path_option} #{pid_file_option} > /dev/null"
      pid = Process.spawn(command)
      Process.detach(pid)
    end

    # TODO - Stop using PID to prevent duplicate recordings.
    # Have the recorder get a fail lock and exit on fail
    # We still want the PID for other reasons, so have the recorder
    # send it's PID to message queue when it starts, and wipe the PID
    # when it stops.
    def running?
      if File.exists?(@pid_file_name)
        pid = File.read(@pid_file_name).strip.to_i
        begin
          Process.kill(0, pid) == 1
        rescue Errno::ESRCH
          false
        end
      else
        false
      end
    end
  end
end
#!/usr/bin/env ruby
require 'logger'
require 'backburner'
require 'stalker'
require 'optparse'
require 'etc'
require 'fileutils'

class Rails
  class EnvString < String
    def development?
      self == 'development'
    end

    def production?
      self == 'production'
    end

    def test?
      self == 'test'
    end
  end

  def self.env
    @env ||= EnvString.new(ENV["RAILS_ENV"] || 'development')
  end
end

require_relative '../app/extras/services/recording_uploader'
require_relative '../config/initializers/backburner'
require_relative '../lib/scheduler/audio_concatenator'
require_relative '../lib/scheduler/audio_prober'
require_relative '../lib/scheduler/command_line_error'
require_relative '../lib/scheduler/mp3_audio_converter'
require_relative '../lib/scheduler/recorder'
require_relative '../lib/scheduler/recording_process_manager'
require_relative '../lib/scheduler/wav_audio_converter'

ENV['FFMPEG_CMD'] ||= 'ffmpeg'
ENV['FFPROBE_CMD'] ||= 'ffprobe'

# TODO - Research changing the process user and group.
#        This doesn't seem to work on Ubunto.
if Rails.env.production?
  u = Etc.getpwnam('rails')
  Process.uid = u.uid
  Process.gid = u.gid
end

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: record_stream [options]"
  opts.on('-i', '--schedule_id SCHEDULE_ID', 'The Schedule ID this recording belongs to') { |v| options[:schedule_id] = v }
  opts.on('-u', '--url URL', 'url to record from') { |v| options[:url] = v }
  opts.on('-d', '--duration SECONDS', 'Number of seconds to record') { |v| options[:duration] = v.to_i }
  opts.on('-f', '--folder FOLDER', 'The folder to place the recording in') { |v| options[:folder] = v }
  opts.on('-l', '--logfile FILE', 'The log file ') { |v| options[:log_file] = v }
  opts.on('-p', '--pidfile FILE', 'The PID file ') { |v| options[:pid_file] = v }

end.parse!

Process.daemon

logger = if options[:log_file]
  File.truncate(options[:log_file], 0) if File.exists?(options[:log_file])
  FileUtils::mkdir_p File.dirname(options[:log_file])
  Logger.new(options[:log_file])
end

recorder = Scheduler::Recorder.new(
  options[:url],
  options[:duration],
  options[:folder]
)

FileUtils::mkdir_p options[:folder]
FileUtils::mkdir_p File.dirname(options[:pid_file])
# TODO This is a stop gap for the above process permission issue.
if ENV["RAILS_ENV"] == 'production'
  FileUtils.chown 'rails', 'www-data', options[:folder]
  FileUtils.chown 'rails', 'www-data', File.dirname(options[:pid_file])
end

# TODO This is a stop gap for the above process permission issue
File.open(options[:pid_file], 'w') {|f| f.write(Process.pid) }
if ENV["RAILS_ENV"] == 'production'
  FileUtils.chown 'rails', 'www-data', options[:pid_file]
end

recording_manager = Scheduler::RecordingProcessManager.new(
    options[:schedule_id],
    recorder,
    ENV['FFMPEG_CMD'],
    ENV['FFPROBE_CMD'],
    logger
  )

recording_manager.start
logger.close

if File.exists?(options[:pid_file])
  File.delete(options[:pid_file])
end
module Scheduler
  class RecordingProcessManager
    def initialize(schedule_id, recorder, ffmpeg_path, ffprobe_path, logger = nil)
      @schedule_id = schedule_id
      @recorder = recorder
      @ffmpeg_path = ffmpeg_path
      @ffprobe_path = ffprobe_path
      @logger = logger
    end

    def start
      begin
        run
      rescue CommandLineError => e
      rescue CommandLineError => e
        if @logger
          @logger.error "Command line error."
          @logger.error "#{e.message}"
          @logger.error "#{e.command_output}"
          @logger.error e.backtrace.join("\n")
        end
      rescue Exception => e
        if @logger
          @logger.error "#{e.message}"
          @logger.error e.backtrace.join("\n")
        end
      end
    end

    def run
      success = false
      @recorder.start
      sleep 5
      # TODO - put the following steps into a separate
      # async action
      wav_converter = Scheduler::WavAudioConverter.new(@ffmpeg_path, @recorder.dump_files)
      wav_files = wav_converter.convert
      finalized_wav_file = get_finalized_wav_file(wav_files)
      if finalized_wav_file
        mp3_bitrate = wave_bitrate_to_mp3_bitrate(
          Scheduler::AudioProber.new(@ffprobe_path, finalized_wav_file, @logger).bitrate
        )
        mp3_file = finalized_wav_file.gsub('.wav', '.mp3')
        mp3_converter = Scheduler::Mp3AudioConverter.new(@ffmpeg_path, finalized_wav_file, mp3_file, mp3_bitrate, @logger)
        if mp3_converter.convert
          Services::RecordingUploader.async.upload(@schedule_id, mp3_file)
          cleanup_recording_folder(@recorder.folder)
          success = true
        end
      end
      success
    end

    private

    def cleanup_recording_folder(folder)
      files = Dir[folder + '/*.*'].select{|f| !File.directory?(f) &&  File.extname(f) != '.mp3' }
      File.delete(*files)
    end

    def get_finalized_wav_file(wav_files)
      if wav_files.size > 1
        output_file = "#{@recorder.folder}/combined.wav"
        concatenator = Scheduler::AudioConcatenator.new(@ffmpeg_path, wav_files, output_file)
        return output_file if concatenator.concatenate
      else
        wav_files.first
      end
    end

    def wave_bitrate_to_mp3_bitrate(wav_bitrate)
      if wav_bitrate <= 500000
        '48k'
      elsif wav_bitrate <= 800000
        '64k'
      elsif wav_bitrate <= 1199999
        '96k'
      else
        '128k'
      end
    end

  end
end

Conclusion

When spawning a new process with Ruby, the method you choose will depend on your use case. In my case, I needed to spawn a new process and detach from the current process. In other cases someone might want to wait for the spawned process to finish. Then there are times you way want to read the process output as it happens, and even send it input while it’s running. I hope this article was is helpful in deciding which method to use when spawning a new process with Ruby.