Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module request_manager
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.request_manager

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  from flumotion.common import log 
 24  from flumotion.component.misc.httpserver.httpcached import common 
 25  from flumotion.component.misc.httpserver.httpcached import http_utils 
 26  from flumotion.component.misc.httpserver.httpcached import server_selection 
 27   
 28   
 29  LOG_CATEGORY = "request-manager" 
class RequestManager(log.Loggable):
    """
    Front end for retrieving a URL through the servers offered by a
    server selector.

    Every call to L{retrieve} asks the selector for its servers and hands
    them, together with the client, to a new L{ConsumerManager} which
    performs the actual retrieval.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, selector, client):
        """
        @param selector: a ServerSelector
        @param client:   an HttpClient (StreamRequester)
        """
        self.client = client
        self.selector = selector

    def retrieve(self, consumer, url,
                 ifModifiedSince=None, ifUnmodifiedSince=None,
                 start=None, size=None):
        """
        Start retrieving a URL on behalf of a consumer.

        @param consumer:          a StreamConsumer
        @param url:               the URL to retrieve
        @param ifModifiedSince:   passed through unchanged to the
                                  ConsumerManager
        @param ifUnmodifiedSince: passed through unchanged to the
                                  ConsumerManager
        @param start:             position from which to start the download
        @param size:              number of bytes to download

        @return: the result of ConsumerManager.retrieve()
        """
        # The selector is queried on every call, so each retrieval gets
        # its own set of candidate servers.
        manager = ConsumerManager(
            consumer=consumer,
            url=url,
            start=start,
            size=size,
            ifModifiedSince=ifModifiedSince,
            ifUnmodifiedSince=ifUnmodifiedSince,
            servers=self.selector.getServers(),
            client=self.client)
        return manager.retrieve()

    def setup(self):
        """Delegate to the selector's setup() and return its result."""
        selector = self.selector
        return selector.setup()

    def cleanup(self):
        """Delegate to the selector's cleanup() and return its result."""
        selector = self.selector
        return selector.cleanup()
66
class ConsumerManager(common.StreamConsumer, log.Loggable):
    """
    Mediator between a consumer and the HTTP client for one retrieval,
    trying the supplied servers one after another.

    The instance registers itself with the client as the stream consumer
    and presents itself to the real consumer as the getter: every callback
    received from the client is relayed to the consumer with C{self} in
    place of the client's request object.
    """

    logCategory = LOG_CATEGORY

    def __init__(self, consumer, url, start, size, ifModifiedSince,
                 ifUnmodifiedSince, servers, client):
        """
        @param consumer:          object receiving the relayed callbacks
                                  (serverError, conditionFail,
                                  streamNotAvailable, onInfo, onData,
                                  streamDone)
        @param url:               the URL to retrieve
        @param start:             position from which to start the
                                  download, or None
        @param size:              number of bytes to download, or None
        @param ifModifiedSince:   passed through unchanged to the client
        @param ifUnmodifiedSince: passed through unchanged to the client
        @param servers:           iterator of server objects exposing
                                  C{ip}, C{port} and C{reportError(code)}
        @param client:            object whose C{retrieve()} starts the
                                  actual request
        """
        self.consumer = consumer
        self.url = url
        self.start = start
        self.size = size
        self.ifModifiedSince = ifModifiedSince
        self.ifUnmodifiedSince = ifUnmodifiedSince
        self.servers = servers
        self.client = client
        self.current_server = None
        # None until the first request is started and again after cancel().
        self.current_request = None
        # Most recent error reported by the client; used as the final
        # error once the server iterator is exhausted.
        self.last_error = None
        self.last_message = None

        self.logName = common.log_id(self)  # To be able to track the instance

    @property
    def host(self):
        """Host of the current request, or None when there is none."""
        if self.current_request:
            return self.current_request.host
        return None

    @property
    def port(self):
        """Port of the current request, or None when there is none."""
        if self.current_request:
            return self.current_request.port
        return None

    def retrieve(self):
        """
        Start the request on the next server of the iterator.

        When the iterator is exhausted the consumer's serverError() is
        called with the last recorded error, or with
        common.SERVER_UNAVAILABLE and an empty message if no error was
        recorded.

        @return: this ConsumerManager, in both cases
        """
        # Only the iterator advance is guarded, so that a StopIteration
        # escaping from the client or from logging is not mistaken for
        # "no more servers".
        try:
            server = next(self.servers)
        except StopIteration:
            code = self.last_error or common.SERVER_UNAVAILABLE
            message = self.last_message or ""
            self.consumer.serverError(self, code, message)
            return self

        self.current_server = server
        if self.size is None or self.start is None:
            self.debug("Retrieving %s from %s:%s", self.url,
                       server.ip, server.port)
        else:
            self.debug("Retrieving range %s-%s (%s B) of %s from %s:%s",
                       self.start, self.start + self.size, self.size,
                       self.url, server.ip, server.port)
        # The selected server is handed to the client as the proxy to use.
        self.current_request = self.client.retrieve(
            self, self.url,
            proxyAddress=server.ip,
            proxyPort=server.port,
            ifModifiedSince=self.ifModifiedSince,
            ifUnmodifiedSince=self.ifUnmodifiedSince,
            start=self.start, size=self.size)
        self.log("Retrieving data using %s", self.current_request.logName)
        return self

    def pause(self):
        """Pause the current request; does nothing if there is none."""
        self.log("Pausing request %s", self.url)
        if self.current_request is not None:
            self.current_request.pause()

    def resume(self):
        """Resume the current request; does nothing if there is none."""
        self.log("Resuming request %s", self.url)
        if self.current_request is not None:
            self.current_request.resume()

    def cancel(self):
        """
        Cancel the current request and forget it.

        Once the request is forgotten, conditionFail, streamNotAvailable,
        onInfo, onData and streamDone are no longer relayed to the
        consumer. Calling this without a current request (never started,
        or already canceled) does nothing.
        """
        self.debug("Canceling request %s", self.url)
        if self.current_request is None:
            return
        # Cancel first and clear afterwards, so that callbacks fired
        # during the cancellation itself are still relayed.
        self.current_request.cancel()
        self.current_request = None

    def serverError(self, getter, code, message):
        """
        Handle an error reported by the client for the current request.

        common.SERVER_DISCONNECTED and common.SERVER_TIMEOUT are forwarded
        to the consumer without retrying. Any other code is reported to
        the current server and the retrieval is retried on the next one.

        NOTE(review): unlike the other callbacks this one does not check
        whether the request was canceled - confirm the client never
        reports an error after cancel().
        """
        self.debug("Server Error %s (%s) for %s using %s:%s",
                   message, code, self.url, getter.host, getter.port)
        self.last_error = code
        self.last_message = message
        if code in (common.SERVER_DISCONNECTED,
                    common.SERVER_TIMEOUT):
            # The connection was established
            # and data may have already been received.
            self.consumer.serverError(self, code, message)
            return
        self.current_server.reportError(code)
        self.retrieve()

    def conditionFail(self, getter, code, message):
        """Relay a condition failure unless there is no current request."""
        if self.current_request is None:
            return
        self.log("Condition Error %s (%s) for %s",
                 message, code, self.url)
        self.consumer.conditionFail(self, code, message)

    def streamNotAvailable(self, getter, code, message):
        """Relay "stream not available" unless there is no current request."""
        if self.current_request is None:
            return
        self.log("Stream not available \"%s\" for %s", message, self.url)
        self.consumer.streamNotAvailable(self, code, message)

    def onInfo(self, getter, info):
        """Relay stream information unless there is no current request."""
        if self.current_request is None:
            return
        self.consumer.onInfo(self, info)

    def onData(self, getter, data):
        """Relay received data unless there is no current request."""
        if self.current_request is None:
            return
        self.consumer.onData(self, data)

    def streamDone(self, getter):
        """Relay end of stream unless there is no current request."""
        if self.current_request is None:
            return
        self.consumer.streamDone(self)
184