def process_frame frame
log :received, frame
case frame
when Frame::Header
@header = frame.payload
@body = ''
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
@header.properties.update(@method.arguments)
@consumer.receive @header, @body if @consumer
@body = @header = @consumer = @method = nil
end
when Frame::Method
case method = frame.payload
when Protocol::Channel::OpenOk
send Protocol::Access::Request.new(:realm => '/data',
:read => true,
:write => true,
:active => true,
:passive => true)
when Protocol::Access::RequestOk
@ticket = method.ticket
callback{
send Protocol::Channel::Close.new(:reply_code => 200,
:reply_text => 'bye',
:method_id => 0,
:class_id => 0)
} if @closing
succeed
when Protocol::Basic::CancelOk
if @consumer = consumers[ method.consumer_tag ]
@consumer.cancelled
else
MQ.error "Basic.CancelOk for invalid consumer tag: #{method.consumer_tag}"
end
when Protocol::Queue::DeclareOk
queues[ method.queue ].receive_status method
when Protocol::Basic::Deliver, Protocol::Basic::GetOk
@method = method
@header = nil
@body = ''
if method.is_a? Protocol::Basic::GetOk
@consumer = get_queue{|q| q.shift }
MQ.error "No pending Basic.GetOk requests" unless @consumer
else
@consumer = consumers[ method.consumer_tag ]
MQ.error "Basic.Deliver for invalid consumer tag: #{method.consumer_tag}" unless @consumer
end
when Protocol::Basic::GetEmpty
if @consumer = get_queue{|q| q.shift }
@consumer.receive nil, nil
else
MQ.error "Basic.GetEmpty for invalid consumer"
end
when Protocol::Channel::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
when Protocol::Channel::CloseOk
@closing = false
conn.callback{ |c|
c.channels.delete @channel
c.close if c.channels.empty?
}
when Protocol::Basic::ConsumeOk
if @consumer = consumers[ method.consumer_tag ]
@consumer.confirm_subscribe
else
MQ.error "Basic.ConsumeOk for invalid consumer tag: #{method.consumer_tag}"
end
end
end
end