Package flumotion :: Package manager :: Module admin
[hide private]

Source Code for Module flumotion.manager.admin

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_admin -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects to handle administrative clients 
 24  """ 
 25   
 26  import re 
 27  import os 
 28  import errno 
 29  from StringIO import StringIO 
 30   
 31  from twisted.internet import reactor 
 32  from twisted.python import failure 
 33  from twisted.spread import pb 
 34  from zope.interface import implements 
 35   
 36  from flumotion.manager import base 
 37  from flumotion.common import errors, interfaces, log, planet, registry, debug 
 38  from flumotion.common.python import makedirs 
 39   
 40  # make Result and Message proxyable 
 41  from flumotion.common import messages 
 42   
 43  # make ComponentState proxyable 
 44  from flumotion.twisted import flavors 
 45  from flumotion.common import componentui 
 46   
 47  __version__ = "$Rev: 7981 $" 
 48   
 49   
 50  # FIXME: rename to Avatar since we are in the admin. namespace ? 
 51   
 52   
53 -class AdminAvatar(base.ManagerAvatar):
54 """ 55 I am an avatar created for an administrative client interface. 56 A reference to me is given (for example, to gui.AdminInterface) 57 when logging in and requesting an "admin" avatar. 58 I live in the manager. 59 """ 60 logCategory = 'admin-avatar' 61 62 # override pb.Avatar implementation so we can run admin actions 63
64 - def perspectiveMessageReceived(self, broker, message, args, kwargs):
65 benignMethods = ('ping', ) 66 67 args = broker.unserialize(args) 68 kwargs = broker.unserialize(kwargs) 69 70 if message not in benignMethods: 71 self.vishnu.adminAction(self.remoteIdentity, message, args, kwargs) 72 73 return base.ManagerAvatar.perspectiveMessageReceivedUnserialised( 74 self, broker, message, args, kwargs)
75 76 ### pb.Avatar IPerspective methods 77
79 """ 80 Get the planet state. 81 82 @rtype: L{flumotion.common.planet.ManagerPlanetState} 83 """ 84 self.debug("returning planet state %r" % self.vishnu.state) 85 return self.vishnu.state
86
88 """ 89 Get the worker heaven state. 90 91 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState} 92 """ 93 self.debug("returning worker heaven state %r" % self.vishnu.state) 94 return self.vishnu.workerHeaven.state
95
96 - def perspective_componentStart(self, componentState):
97 """ 98 Start the given component. The component should be sleeping before 99 this. 100 101 @type componentState: L{planet.ManagerComponentState} 102 """ 103 self.debug('perspective_componentStart(%r)' % componentState) 104 return self.vishnu.componentCreate(componentState)
105
106 - def perspective_componentStop(self, componentState):
107 """ 108 Stop the given component. 109 If the component was sad, we clear its sad state as well, 110 since the stop was explicitly requested by the admin. 111 112 @type componentState: L{planet.ManagerComponentState} 113 """ 114 self.debug('perspective_componentStop(%r)' % componentState) 115 return self.vishnu.componentStop(componentState)
116
117 - def perspective_componentRestart(self, componentState):
118 """ 119 Restart the given component. 120 121 @type componentState: L{planet.ManagerComponentState} 122 """ 123 self.debug('perspective_componentRestart(%r)' % componentState) 124 d = self.perspective_componentStop(componentState) 125 d.addCallback(lambda *x: self.perspective_componentStart( 126 componentState)) 127 return d
128 129 # Generic interface to call into a component 130
131 - def perspective_componentCallRemote(self, componentState, methodName, 132 *args, **kwargs):
133 """ 134 Call a method on the given component on behalf of an admin client. 135 136 @param componentState: state of the component to call the method on 137 @type componentState: L{planet.ManagerComponentState} 138 @param methodName: name of the method to call. Gets proxied to 139 L{flumotion.component.component.""" \ 140 """BaseComponentMedium}'s remote_(methodName) 141 @type methodName: str 142 143 @rtype: L{twisted.internet.defer.Deferred} 144 """ 145 assert isinstance(componentState, planet.ManagerComponentState), \ 146 "%r is not a componentState" % componentState 147 148 if methodName == "start": 149 self.warning('forwarding "start" to perspective_componentStart') 150 return self.perspective_componentStart(componentState) 151 152 m = self.vishnu.getComponentMapper(componentState) 153 if not m: 154 self.warning('Component not mapped. Maybe deleted.') 155 raise errors.UnknownComponentError(componentState) 156 157 avatar = m.avatar 158 159 if not avatar: 160 self.warning('No avatar for %s, cannot call remote' % 161 componentState.get('name')) 162 raise errors.SleepingComponentError(componentState) 163 164 # XXX: Maybe we need to have a prefix, so we can limit what an 165 # admin interface can call on a component 166 try: 167 return avatar.mindCallRemote(methodName, *args, **kwargs) 168 except Exception, e: 169 msg = "exception on remote call %s: %s" % (methodName, 170 log.getExceptionMessage(e)) 171 self.warning(msg) 172 raise errors.RemoteMethodError(methodName, 173 log.getExceptionMessage(e))
174
175 - def perspective_workerCallRemote(self, workerName, methodName, 176 *args, **kwargs):
177 """ 178 Call a remote method on the worker. 179 This is used so that admin clients can call methods from the interface 180 to the worker. 181 182 @param workerName: the worker to call 183 @type workerName: str 184 @param methodName: Name of the method to call. Gets proxied to 185 L{flumotion.worker.medium.WorkerMedium} 's 186 remote_(methodName) 187 @type methodName: str 188 """ 189 190 self.debug('AdminAvatar.workerCallRemote(%r, %r)' % ( 191 workerName, methodName)) 192 workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName) 193 194 # XXX: Maybe we need to a prefix, so we can limit what an admin 195 # interface can call on a worker 196 try: 197 return workerAvatar.mindCallRemote(methodName, *args, **kwargs) 198 except Exception, e: 199 self.warning("exception on remote call: %s" % 200 log.getExceptionMessage(e)) 201 return failure.Failure(errors.RemoteMethodError(methodName, 202 log.getExceptionMessage(e)))
203
204 - def perspective_getEntryByType(self, componentType, entryType):
205 """ 206 Get the entry point for a piece of bundled code in a component by type. 207 @param componentType: the component 208 @type componentType: a string 209 @param entryType: location of the entry point 210 @type entryType: a string 211 Returns: a (filename, methodName) tuple, or raises:: 212 - NoBundleError if the entry location does not exist 213 """ 214 assert componentType is not None 215 216 self.debug('getting entry of type %s for component type %s', 217 entryType, componentType) 218 219 try: 220 componentRegistryEntry = registry.getRegistry().getComponent( 221 componentType) 222 # FIXME: add logic here for default entry points and functions 223 entry = componentRegistryEntry.getEntryByType(entryType) 224 except KeyError: 225 self.warning("Could not find bundle for %s(%s)" % ( 226 componentType, entryType)) 227 raise errors.NoBundleError("entry type %s in component type %s" % 228 (entryType, componentType)) 229 230 filename = os.path.join(componentRegistryEntry.base, entry.location) 231 self.debug('entry point is in file path %s and function %s' % ( 232 filename, entry.function)) 233 return (filename, entry.function)
234
235 - def perspective_getPlugEntry(self, plugType, entryType):
236 """ 237 Get the entry point for a piece of bundled code in a plug by type. 238 @param plugType: the plug 239 @type plugType: a string 240 @param entryType: location of the entry point 241 @type entryType: a string 242 Returns: a (filename, methodName) tuple, or raises:: 243 - NoBundleError if the entry location does not exist 244 """ 245 assert plugType is not None 246 247 self.debug('getting entry of type %s for plug type %s', 248 entryType, plugType) 249 250 try: 251 plugRegistryEntry = registry.getRegistry().getPlug(plugType) 252 entry = plugRegistryEntry.getEntryByType(entryType) 253 except KeyError: 254 self.warning("Could not find bundle for %s(%s)" % ( 255 plugType, entryType)) 256 raise errors.NoBundleError("entry type %s in plug type %s" % 257 (entryType, plugType)) 258 259 self.debug('entry point is in file path %s and function %s' % ( 260 entry.location, entry.function)) 261 return (entry.location, entry.function)
262
264 """ 265 Get the configuration of the manager as an XML string. 266 267 @rtype: str 268 """ 269 return self.vishnu.getConfiguration()
270
271 - def perspective_getScenarioByType(self, scenarioType, entryType):
272 """ 273 Remote method that gets the scenario of a given type. 274 275 @param scenarioType: the component 276 @type scenarioType: a string 277 Returns: a (filename, methodName) tuple, or raises:: 278 - NoBundleError if the entry location does not exist 279 """ 280 assert scenarioType is not None 281 282 self.debug('getting entry of type %s for scenario type %s', 283 entryType, scenarioType) 284 285 try: 286 scenarioRegistryEntry = registry.getRegistry().getScenarioByType( 287 scenarioType) 288 # FIXME: add logic here for default entry points and functions 289 entry = scenarioRegistryEntry.getEntryByType(entryType) 290 except KeyError: 291 self.warning("Could not find bundle for %s(%s)" % ( 292 scenarioType, entryType)) 293 raise errors.NoBundleError("entry type %s in component type %s" % 294 (entryType, scenarioType)) 295 296 filename = os.path.join(scenarioRegistryEntry.getBase(), 297 entry.getLocation()) 298 self.debug('entry point is in file path %s and function %s' % ( 299 filename, entry.function)) 300 301 return (filename, entry.getFunction())
302
303 - def perspective_getScenarios(self):
304 """ 305 Get all the scenarios defined on the registry. 306 307 @rtype : List of L{IScenarioAssistantPlugin} 308 """ 309 r = registry.getRegistry() 310 return r.getScenarios()
311
312 - def _saveFlowFile(self, filename):
313 """Opens a file that the flow should be written to. 314 315 Note that the returned file object might be an existing file, 316 opened in append mode; if the loadConfiguration operation 317 succeeds, the file should first be truncated before writing. 318 """ 319 self.vishnu.adminAction(self.remoteIdentity, 320 '_saveFlowFile', (), {}) 321 322 def ensure_sane(name, extra=''): 323 if not re.match('^[a-zA-Z0-9_' + extra + '-]+$', name): 324 raise errors.ConfigError, \ 325 'Invalid planet or saveAs name: %s' % name
326 327 ensure_sane(self.vishnu.configDir, '/') 328 ensure_sane(filename) 329 directory = os.path.join(self.vishnu.configDir, "flows") 330 self.debug('told to save flow as %s/%s.xml', directory, filename) 331 try: 332 makedirs(directory, 0770) 333 except OSError, e: 334 if e.errno != errno.EEXIST: 335 raise e 336 prev = os.umask(0007) 337 output = open(os.path.join(directory, filename + '.xml'), 'a') 338 os.umask(prev) 339 return output
340
341 - def perspective_loadConfiguration(self, xml, saveAs=None):
342 """ 343 Load the given XML configuration into the manager. If the 344 optional saveAs parameter is passed, the XML snippet will be 345 saved to disk in the manager's flows directory. 346 347 @param xml: the XML configuration snippet. 348 @type xml: str 349 @param saveAs: The name of a file to save the XML as. 350 @type saveAs: str 351 """ 352 353 if saveAs: 354 output = self._saveFlowFile(saveAs) 355 356 # Update the registry if needed, so that new/changed component types 357 # can be parsed. 358 registry.getRegistry().verify() 359 360 f = StringIO(xml) 361 res = self.vishnu.loadComponentConfigurationXML(f, self.remoteIdentity) 362 f.close() 363 364 if saveAs: 365 366 def success(res): 367 self.debug('loadConfiguration succeeded, writing flow to %r', 368 output) 369 output.truncate(0) 370 output.write(xml) 371 output.close() 372 return res
373 374 def failure(res): 375 self.debug('loadConfiguration failed, leaving %r as it was', 376 output) 377 output.close() 378 return res 379 res.addCallbacks(success, failure) 380 381 return res 382
383 - def perspective_loadComponent(self, componentType, componentId, 384 componentLabel, properties, workerName, 385 plugs=None, eaters=None, 386 isClockMaster=None, virtualFeeds=None):
387 """ 388 Load a component into the manager configuration. 389 Returns a deferred that will be called with the component state. 390 391 @param componentType: The registered type of the component to be added 392 @type componentType: str 393 @param componentId: The identifier of the component to add, 394 should be created by the function 395 L{flumotion.common.common.componentId} 396 @type componentId: str 397 @param componentLabel: The human-readable label of the component. 398 if None, no label will be set. 399 @type componentLabel: str or None 400 @param properties: List of property name-value pairs. 401 See L{flumotion.common.config.buildPropertyDict} 402 @type properties: list of (str, object) 403 @param workerName: the name of the worker where the added 404 component should run. 405 @type workerName: str 406 @param plugs: List of plugs, as type-propertyList pairs. 407 See {flumotion.manager.config.buildPlugsSet}. 408 @type plugs: [(str, [(str, object)])] 409 @param eaters: List of (eater name, feed ID) pairs. 410 See L{flumotion.manager.config.buildEatersDict} 411 @type eaters: [(str, str)] 412 @param isClockMaster: True if the component to be added must be 413 a clock master. Passing False here means 414 that the manager will choose what 415 component, if any, will be clock master 416 for this flow. 417 @type isClockMaster: bool 418 @param virtualFeeds: List of (virtual feed, feeder name) pairs. 419 See L{flumotion.manager.config.buildVirtualFeeds} 420 @type virtualFeeds: [(str, str)] 421 """ 422 return self.vishnu.loadComponent(self.remoteIdentity, componentType, 423 componentId, componentLabel, 424 properties, workerName, 425 plugs or [], eaters or [], 426 isClockMaster, virtualFeeds or [])
427
428 - def perspective_deleteFlow(self, flowName):
429 return self.vishnu.deleteFlow(flowName)
430
431 - def perspective_deleteComponent(self, componentState):
432 """Delete a component from the manager. 433 434 A component can only be deleted when it is sleeping or sad. It 435 is the caller's job to ensure this is the case; calling this 436 function on a running component will raise a ComponentBusyError. 437 438 @returns: a deferred that will fire when all listeners have been 439 notified of the component removal 440 """ 441 return self.vishnu.deleteComponent(componentState)
442
443 - def perspective_getVersions(self):
444 return debug.getVersions()
445
446 - def perspective_cleanComponents(self):
447 return self.vishnu.emptyPlanet()
448
449 - def perspective_getWizardEntries(self, types=None, provides=None, 450 accepts=None):
451 """ 452 Fetches the wizard entries which matches the parameters sent in 453 454 @param types: list of component types to fetch, is usually 455 something like ['video-producer'] or ['audio-encoder'] 456 @type types: list of strings 457 @param provides: formats provided, eg ['jpeg', 'speex'] 458 @type provides: list of strings 459 @param accepts: formats accepted, eg ['theora'] 460 @type accepts: list of strings 461 @returns: L{componentui.WizardEntryState} 462 """ 463 464 def extract(wizards): 465 for wizard in wizards: 466 if types is not None: 467 if wizard.type not in types: 468 continue 469 if provides is not None: 470 for format in wizard.provides: 471 if format.media_type in provides: 472 break 473 else: 474 continue 475 if accepts is not None: 476 for format in wizard.accepts: 477 if format.media_type in accepts: 478 break 479 else: 480 continue 481 yield wizard
482 483 retval = [] 484 r = registry.getRegistry() 485 for component in r.getComponents(): 486 retval += extract(component.wizards) 487 for plug in r.getPlugs(): 488 retval += extract(plug.wizards) 489 del r 490 491 return retval 492
493 - def perspective_getComponentEntry(self, componentType):
494 """Fetches a ComponentRegistryEntry given a componentType 495 @param componentType: component type 496 @type componentType: string 497 @returns: the component 498 @rtype: L{ComponentRegistryEntry} 499 """ 500 try: 501 componentRegistryEntry = registry.getRegistry().getComponent( 502 componentType) 503 except KeyError: 504 return None 505 return componentRegistryEntry
506 507
508 -class AdminHeaven(base.ManagerHeaven):
509 """ 510 I interface between the Manager and administrative clients. 511 For each client I create an L{AdminAvatar} to handle requests. 512 I live in the manager. 513 """ 514 515 logCategory = "admin-heaven" 516 implements(interfaces.IHeaven) 517 avatarClass = AdminAvatar
518