001/* 002 * Copyright 2009 Red Hat, Inc. 003 * Red Hat licenses this file to you under the Apache License, version 004 * 2.0 (the "License"); you may not use this file except in compliance 005 * with the License. You may obtain a copy of the License at 006 * http://www.apache.org/licenses/LICENSE-2.0 007 * Unless required by applicable law or agreed to in writing, software 008 * distributed under the License is distributed on an "AS IS" BASIS, 009 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 010 * implied. See the License for the specific language governing 011 * permissions and limitations under the License. 012 */ 013package org.hornetq.api.core.client; 014 015import java.util.UUID; 016 017import org.hornetq.api.core.SimpleString; 018import org.hornetq.core.client.impl.ClientMessageImpl; 019 020/** 021 * The ClientRequestor class helps making requests. 022 * <br> 023 * The ClientRequestor constructor is given a ClientSession and a request address. 024 * It creates a temporary queue for the responses and provides a request method that sends the request message and waits for its reply. 025 * 026 * @apiviz.uses org.hornetq.api.core.client.ClientSession 027 * @apiviz.owns org.hornetq.api.core.client.ClientProducer 028 * @apiviz.owns org.hornetq.api.core.client.ClientConsumer 029 * 030 * @author <a href="jmesnil@redhat.com">Jeff Mesnil</a> 031 */ 032public class ClientRequestor 033{ 034 private final ClientSession queueSession; 035 036 private final ClientProducer requestProducer; 037 038 private final ClientConsumer replyConsumer; 039 040 private final SimpleString replyQueue; 041 042 /** 043 * Constructor for the ClientRequestor. 044 * 045 * The implementation expects a ClientSession with automatic commits of sends and acknowledgements 046 * 047 * @param session a ClientSession uses to handle requests and replies 048 * @param requestAddress the address to send request messages to 049 * @throws Exception 050 */ 051 public ClientRequestor(final ClientSession session, final SimpleString requestAddress) throws Exception 052 { 053 queueSession = session; 054 055 requestProducer = queueSession.createProducer(requestAddress); 056 replyQueue = new SimpleString(requestAddress + "." + UUID.randomUUID().toString()); 057 queueSession.createTemporaryQueue(replyQueue, replyQueue); 058 replyConsumer = queueSession.createConsumer(replyQueue); 059 } 060 061 /** 062 * @see ClientRequestor#ClientRequestor(ClientSession, SimpleString) 063 */ 064 public ClientRequestor(final ClientSession session, final String requestAddress) throws Exception 065 { 066 this(session, SimpleString.toSimpleString(requestAddress)); 067 } 068 069 /** 070 * Sends a message to the request address and wait indefinitely for a reply. 071 * The temporary queue is used for the REPLYTO_HEADER_NAME, and only one reply per request is expected 072 * 073 * @param request the message to send 074 * @return the reply message 075 * @throws Exception 076 */ 077 public ClientMessage request(final ClientMessage request) throws Exception 078 { 079 return request(request, 0); 080 } 081 082 /** 083 * Sends a message to the request address and wait for the given timeout for a reply. 084 * The temporary queue is used for the REPLYTO_HEADER_NAME, and only one reply per request is expected 085 * 086 * @param request the message to send 087 * @param timeout the timeout to wait for a reply (in milliseconds) 088 * @return the reply message or <code>null</code> if no message is replied before the timeout elapses 089 * @throws Exception 090 */ 091 public ClientMessage request(final ClientMessage request, final long timeout) throws Exception 092 { 093 request.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyQueue); 094 requestProducer.send(request); 095 return replyConsumer.receive(timeout); 096 } 097 098 /** 099 * Closes the ClientRequestor and its session. 100 * 101 * @throws Exception if an exception occurs while closing the ClientRequestor 102 */ 103 public void close() throws Exception 104 { 105 replyConsumer.close(); 106 queueSession.deleteQueue(replyQueue); 107 queueSession.close(); 108 } 109 110}