EXAMPLE_03_STOCKS

Path: doc/EXAMPLE_03_STOCKS
Last Update: Mon May 10 06:42:50 +0000 2010

Stocks Example

  $:.unshift File.dirname(__FILE__) + '/../../lib'
  require 'mq'

  AMQP.start(:host => 'localhost') do

    def log *args
      p [ Time.now, *args ]
    end

    def publish_stock_prices
      mq = MQ.new
      EM.add_periodic_timer(1){
        puts

        { :appl => 170+rand(1000)/100.0,
          :msft => 22+rand(500)/100.0
        }.each do |stock, price|
          stock = "usd.#{stock}"

          log :publishing, stock, price
          mq.topic('stocks').publish(price, :key => stock)
        end
      }
    end

    def watch_appl_stock
      mq = MQ.new
      mq.queue('apple stock').bind(mq.topic('stocks'), :key => 'usd.appl').subscribe{ |price|
        log 'apple stock', price
      }
    end

    def watch_us_stocks
      mq = MQ.new
      mq.queue('us stocks').bind(mq.topic('stocks'), :key => 'usd.*').subscribe{ |info, price|
        log 'us stock', info.routing_key, price
      }
    end

    publish_stock_prices
    watch_appl_stock
    watch_us_stocks

  end

  __END__

  [Fri Aug 15 01:39:00 -0700 2008, :publishing, "usd.appl", 173.45]
  [Fri Aug 15 01:39:00 -0700 2008, :publishing, "usd.msft", 26.98]
  [Fri Aug 15 01:39:00 -0700 2008, "apple stock", "173.45"]
  [Fri Aug 15 01:39:00 -0700 2008, "us stock", "usd.appl", "173.45"]
  [Fri Aug 15 01:39:00 -0700 2008, "us stock", "usd.msft", "26.98"]

  [Fri Aug 15 01:39:01 -0700 2008, :publishing, "usd.appl", 179.72]
  [Fri Aug 15 01:39:01 -0700 2008, :publishing, "usd.msft", 26.56]
  [Fri Aug 15 01:39:01 -0700 2008, "apple stock", "179.72"]
  [Fri Aug 15 01:39:01 -0700 2008, "us stock", "usd.appl", "179.72"]
  [Fri Aug 15 01:39:01 -0700 2008, "us stock", "usd.msft", "26.56"]

[Validate]