require 'socket' require 'optparse' class Dropper def initialize(loss, period, amplitude, phase) @loss = loss @t0 = Time.now - period * phase if period != 0 @omega = 2 * Math::PI / period else @omega = 0 end @amplitude = amplitude puts self.inspect if $DEBUG end def setLossRate(newRate) if newRate >= 0.0 && newRate <= 1.0 puts "Time: " + Time.now.to_s + " Loss Rate: " + @loss.to_s @loss = newRate end end def drop? p = @loss * (1 + (Math.sin((Time.now - @t0) * @omega) * @amplitude)) STDOUT.puts Time.now.to_s + " #{p}" if $DEBUG dropping = rand < p STDOUT.puts dropping ? "Dropping (#{p})" : "Keeping (#{p})" if $DEBUG dropping end end class ShaperQueue OVERHEAD = 28 # IP + UDP def initialize(delay, rate) @delay = Float(delay) @rate = Float(rate) @q = [] @next = Time.now end def enqueue(buf) t = Time.now + @delay t = @next if @next > t @q.push([t, buf]) @next = t + (buf.size+OVERHEAD)*8/@rate end def when @q.first.first unless @q.empty? end def dequeue @q.shift[1] end end class UDPReplySocket < UDPSocket attr_accessor :q, :rev def initialize(hp, af = Socket::AF_INET) super(af) bind(*hp) @peer = nil end def urecv d, si = recvfrom(65536) if @peer != si STDOUT.puts "Fixing to #{si[3]}:#{si[1]}" if $DEBUG @peer = si end d end def usend(d) raise "Unset peer" unless @peer send(d, 0, @peer[3], @peer[1]) end end class UDPBoundSocket < UDPSocket attr_accessor :q, :rev def initialize(hp, af = Socket::AF_INET) super(af) connect(*hp) @peer = nil @hp = hp end def urecv d, si = recvfrom(65536) if @peer != si STDOUT.puts "Receiving from to #{si[3]}:#{si[1]} -- expected #{@hp[0]}:#{@hp[1]}" @peer = si end d end def usend(d) send(d, 0) end end class LossValueUpdater def initialize(lossVar, lossModule) @lm = lossModule @text = File.readlines(lossVar) @baseTime = Time.now @numbers = @text[0].scan /[-+]?\d*\.?\d+/ @pos = 1 end def updatetimeLossParms if @numbers[0] != nil currTime = Time.now diffTime = currTime.to_i - @baseTime.to_i if diffTime >= @numbers[0].to_i @lm.setLossRate(@numbers[1].to_f) if @text[@pos] != nil @numbers = @text[@pos].scan /[-+]?\d*\.?\d+/ else @numbers[0] = nil end @pos += 1 end end end end if $0 == __FILE__ $dropper = nil $fileName = nil $lossTimevar = nil OptionParser.new do |opts| opts.banner = "Usage: #{$0} options" opts.on('-p LOSSFRACTION[:MODPERIOD:MODAMP[:MODPHASE]]', String, /([\d.]+)(?::([\d.]+):([\d.]+)(?::([\d.]+))?)?/, "Loss fraction (0.0..1.0), optionally\nfollowed by frequency (in 1/s), amplitude (fraction of fraction) and\noptionally phase of modulation function") { |value| loss = value[1].to_f raise OptionParser::ParseError, "$loss must be between 0 and 1" unless loss.between?(0.0, 1.0) $dropper = Dropper.new(loss, *value[2..4].map {|x| x.to_f }) } opts.on('-l [HOST:]PORT', String, /(?:(.*):)?(\d+)/, "The host/port the initiator sends to") { |value| $lhp = value[1, 2] } opts.on('-c [HOST:]PORT', String, /(?:(.*):)?(\d+)/, "The host/port where the respondent is waiting") { |value| $chp = value[1, 2] } opts.on('-b BITRATE', Float, "Bitrate in bit/s (float)") { | $rate | } opts.on('-d DELAY', Float, "Delay in s (float)") { | $delay | } opts.on('-f FILENAME', String, "File representing loss variations") { |$fileName | } begin opts.parse! raise OptionParser::ParseError, "All options (except -d) must be given" unless $lhp and $chp and $rate and $delay rescue OptionParser::ParseError => e puts "ERROR: #{e.to_s}" puts puts opts exit -1 end end so = [UDPReplySocket.new($lhp), UDPBoundSocket.new($chp)] 2.times do |i| so[i].q = ShaperQueue.new($delay, $rate) so[1-i].rev = so[i] end $lossTimevar = LossValueUpdater.new($fileName, $dropper) if $fileName != nil loop do t = Time.now t1 = nil $lossTimevar.updatetimeLossParms if $lossTimevar != nil so.each do |s| s.rev.usend(s.q.dequeue) while s.q.when and s.q.when < t if t2 = s.q.when t1 = t2 if !t1 or t1 > t2 end end t1 -= t if t1 puts "Delaying for #{t1}" if $DEBUG r, w, e = select(so, nil, nil, t1) r.each do |s| p = s.urecv s.q.enqueue(p) unless $dropper && $dropper.drop? end if r end end