1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects to handle administrative clients
24 """
25
26 import re
27 import os
28 import errno
29 from StringIO import StringIO
30
31 from twisted.internet import reactor
32 from twisted.python import failure
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.manager import base
37 from flumotion.common import errors, interfaces, log, planet, registry, debug
38 from flumotion.common import common
39 from flumotion.common.python import makedirs
40 from flumotion.monitor.nagios import util
41
42
43 from flumotion.common import messages
44
45
46 from flumotion.twisted import flavors
47 from flumotion.common import componentui
48
49 __version__ = "$Rev$"
50
51
52
53
54
56 """
57 I am an avatar created for an administrative client interface.
58 A reference to me is given (for example, to gui.AdminInterface)
59 when logging in and requesting an "admin" avatar.
60 I live in the manager.
61 """
62 logCategory = 'admin-avatar'
63
64
65
77
78
79
81 """
82 Get the planet state.
83
84 @rtype: L{flumotion.common.planet.ManagerPlanetState}
85 """
86 self.debug("returning planet state %r" % self.vishnu.state)
87 return self.vishnu.state
88
90 """
91 Get the worker heaven state.
92
93 @rtype: L{flumotion.common.worker.ManagerWorkerHeavenState}
94 """
95 self.debug("returning worker heaven state %r" % self.vishnu.state)
96 return self.vishnu.workerHeaven.state
97
99 """
100 Start the given component. The component should be sleeping before
101 this.
102
103 @type componentState: L{planet.ManagerComponentState}
104 """
105 self.debug('perspective_componentStart(%r)' % componentState)
106 return self.vishnu.componentCreate(componentState)
107
109 """
110 Stop the given component.
111 If the component was sad, we clear its sad state as well,
112 since the stop was explicitly requested by the admin.
113
114 @type componentState: L{planet.ManagerComponentState}
115 """
116 self.debug('perspective_componentStop(%r)' % componentState)
117 return self.vishnu.componentStop(componentState)
118
130
131
132
176
178 """
179 List components in the planet. Returns a list of avatar ids.
180 """
181 componentStates = self.vishnu.state.getComponents()
182 avatar_ids = [common.componentId(c.get('parent').get('name'),
183 c.get('name'))
184 for c in componentStates]
185 return avatar_ids
186
203
205 """
206 List a component and its upstream components along with
207 types and worker hosts.
208
209 @param avatarId: the component avatar id
210 @type avatarId: str
211 """
212
def get_eaters_ids(eaters_dic):
    """
    Build avatar ids from an eater configuration mapping.

    @param eaters_dic: mapping from a key (used as the first path
                       segment of the id) to a sequence of entries;
                       item 0 of each entry is a 'name:feed' string
    @type  eaters_dic: dict

    @returns: one '/<key>/<name>' string per entry
    @rtype:   list of str
    """
    ids = []
    for flow, feeds in eaters_dic.items():
        for feed in feeds:
            # The unpacking insists on exactly one ':' in the string;
            # anything else raises ValueError.
            name, _feed_name = feed[0].split(':')
            ids.append('/%s/%s' % (flow, name))
    return ids
221
def create_response(components, workers):
    """
    Pair every component with the host of the worker it is assigned to.

    @param components: objects answering get('name'), get('type') and
                       get('workerName')
    @param workers:    objects answering get('name') and get('host')

    @returns: one (name, type, host) tuple per component, in input
              order; host is "unknown" when no worker is named like the
              component's workerName
    @rtype:   list of tuples
    """
    response = []
    for component in components:
        wanted = component.get('workerName')
        for worker in workers:
            if wanted == worker.get('name'):
                # the first worker with a matching name wins
                host = worker.get('host')
                break
        else:
            host = "unknown"
        response.append(
            (component.get('name'), component.get('type'), host))
    return response
233
234 component = util.findComponent(self.vishnu.state, avatarId)
235 if not component:
236 self.warning('No component with avatar id %s' % avatarId)
237 raise errors.UnknownComponentError(avatarId)
238
239 eaters = component.get('config').get('eater', {})
240 eaters_id = get_eaters_ids(eaters)
241 comps = [component]
242 while len(eaters_id) > 0:
243 eaters = {}
244 for i in eaters_id:
245 try:
246 compState = util.findComponent(self.vishnu.state, i)
247 comps.append(compState)
248 eaters.update(compState.get('config').get('eater', {}))
249 except Exception, e:
250 self.debug(log.getExceptionMessage(e))
251 emsg = "Error retrieving component '%s'" % i
252 raise errors.UnknownComponentError(emsg)
253 eaters_id = get_eaters_ids(eaters)
254
255 workers = self.vishnu.workerHeaven.state.get('workers')
256 return create_response(comps, workers)
257
260 """
261 Call a remote method on the worker.
262 This is used so that admin clients can call methods from the interface
263 to the worker.
264
265 @param workerName: the worker to call
266 @type workerName: str
267 @param methodName: Name of the method to call. Gets proxied to
268 L{flumotion.worker.medium.WorkerMedium} 's
269 remote_(methodName)
270 @type methodName: str
271 """
272
273 self.debug('AdminAvatar.workerCallRemote(%r, %r)' % (
274 workerName, methodName))
275 workerAvatar = self.vishnu.workerHeaven.getAvatar(workerName)
276
277
278
279 try:
280 return workerAvatar.mindCallRemote(methodName, *args, **kwargs)
281 except Exception, e:
282 self.warning("exception on remote call: %s" %
283 log.getExceptionMessage(e))
284 return failure.Failure(errors.RemoteMethodError(methodName,
285 log.getExceptionMessage(e)))
286
def perspective_getEntryByType(self, componentType, entryType):
    """
    Look up the entry point of a piece of bundled code for a component
    type.

    @param componentType: the component type to look up in the registry
    @type  componentType: a string
    @param entryType:     the type of entry point to fetch
    @type  entryType:     a string

    @returns: a (filename, methodName) tuple; filename is the entry's
              location joined onto the registry entry's base
    @raises errors.NoBundleError: if the registry lookup raises KeyError
    """
    assert componentType is not None

    self.debug('getting entry of type %s for component type %s',
               entryType, componentType)

    try:
        regEntry = registry.getRegistry().getComponent(componentType)
        entry = regEntry.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            componentType, entryType))
        raise errors.NoBundleError("entry type %s in component type %s" %
                                   (entryType, componentType))

    path = os.path.join(regEntry.base, entry.location)
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        path, function))
    return (path, function)
317
def perspective_getPlugEntry(self, plugType, entryType):
    """
    Look up the entry point of a piece of bundled code for a plug type.

    @param plugType:  the plug type to look up in the registry
    @type  plugType:  a string
    @param entryType: the type of entry point to fetch
    @type  entryType: a string

    @returns: a (filename, methodName) tuple taken from the entry's
              location and function attributes
    @raises errors.NoBundleError: if the registry lookup raises KeyError
    """
    assert plugType is not None

    self.debug('getting entry of type %s for plug type %s',
               entryType, plugType)

    try:
        plugEntry = registry.getRegistry().getPlug(plugType)
        entry = plugEntry.getEntryByType(entryType)
    except KeyError:
        self.warning("Could not find bundle for %s(%s)" % (
            plugType, entryType))
        raise errors.NoBundleError("entry type %s in plug type %s" %
                                   (entryType, plugType))

    # Unlike the component variant, the location is returned as-is,
    # without joining it onto a base directory.
    location = entry.location
    function = entry.function
    self.debug('entry point is in file path %s and function %s' % (
        location, function))
    return (location, function)
345
347 """
348 Get the configuration of the manager as an XML string.
349
350 @rtype: str
351 """
352 return self.vishnu.getConfiguration()
353
355 """
356 Remote method that gets the scenario of a given type.
357
358 @param scenarioType: the scenario type
359 @type scenarioType: a string
360 Returns: a (filename, methodName) tuple, or raises::
361 - NoBundleError if the entry location does not exist
362 """
363 assert scenarioType is not None
364
365 self.debug('getting entry of type %s for scenario type %s',
366 entryType, scenarioType)
367
368 try:
369 scenarioRegistryEntry = registry.getRegistry().getScenarioByType(
370 scenarioType)
371
372 entry = scenarioRegistryEntry.getEntryByType(entryType)
373 except KeyError:
374 self.warning("Could not find bundle for %s(%s)" % (
375 scenarioType, entryType))
376 raise errors.NoBundleError("entry type %s in component type %s" %
377 (entryType, scenarioType))
378
379 filename = os.path.join(scenarioRegistryEntry.getBase(),
380 entry.getLocation())
381 self.debug('entry point is in file path %s and function %s' % (
382 filename, entry.function))
383
384 return (filename, entry.getFunction())
385
387 """
388 Get all the scenarios defined on the registry.
389
390 @rtype : List of L{IScenarioAssistantPlugin}
391 """
392 r = registry.getRegistry()
393 return r.getScenarios()
394
396 """Opens a file that the flow should be written to.
397
398 Note that the returned file object might be an existing file,
399 opened in append mode; if the loadConfiguration operation
400 succeeds, the file should first be truncated before writing.
401 """
402 self.vishnu.adminAction(self.remoteIdentity,
403 '_saveFlowFile', (), {})
404
def ensure_sane(name, extra=''):
    """
    Check that a name is made only of characters safe for a file path.

    @param name:  the string to validate
    @type  name:  str
    @param extra: additional characters to allow; inserted verbatim
                  into the regular expression's character class
    @type  extra: str

    @raises errors.ConfigError: if name is empty or contains anything
                                other than ASCII letters, digits, '_',
                                '-' and the characters in extra
    """
    # \Z anchors at the very end of the string.  '$' would also match
    # just before a trailing newline, letting a name such as 'flow\n'
    # through and into the file name built from it.
    if not re.match('^[a-zA-Z0-9_' + extra + r'-]+\Z', name):
        raise errors.ConfigError(
            'Invalid planet or saveAs name: %s' % name)
409
410 ensure_sane(self.vishnu.configDir, '/')
411 ensure_sane(filename)
412 directory = os.path.join(self.vishnu.configDir, "flows")
413 self.debug('told to save flow as %s/%s.xml', directory, filename)
414 try:
415 makedirs(directory, 0770)
416 except OSError, e:
417 if e.errno != errno.EEXIST:
418 raise e
419 prev = os.umask(0007)
420 output = open(os.path.join(directory, filename + '.xml'), 'a')
421 os.umask(prev)
422 return output
423
425 """
426 Load the given XML configuration into the manager. If the
427 optional saveAs parameter is passed, the XML snippet will be
428 saved to disk in the manager's flows directory.
429
430 @param xml: the XML configuration snippet.
431 @type xml: str
432 @param saveAs: The name of a file to save the XML as.
433 @type saveAs: str
434 """
435
436 if saveAs:
437 output = self._saveFlowFile(saveAs)
438
439
440
441 registry.getRegistry().verify()
442
443 f = StringIO(xml)
444 res = self.vishnu.loadComponentConfigurationXML(f, self.remoteIdentity)
445 f.close()
446
447 if saveAs:
448
449 def success(res):
450 self.debug('loadConfiguration succeeded, writing flow to %r',
451 output)
452 output.truncate(0)
453 output.write(xml)
454 output.close()
455 return res
456
457 def failure(res):
458 self.debug('loadConfiguration failed, leaving %r as it was',
459 output)
460 output.close()
461 return res
462 res.addCallbacks(success, failure)
463
464 return res
465
def perspective_loadComponent(self, componentType, componentId,
                              componentLabel, properties, workerName,
                              plugs=None, eaters=None,
                              isClockMaster=None, virtualFeeds=None):
    """
    Load a component into the manager configuration.
    Returns a deferred that will be called with the component state.

    The call is forwarded to the manager's loadComponent together with
    this avatar's remote identity.

    @param componentType:  The registered type of the component to be
                           added
    @type  componentType:  str
    @param componentId:    The identifier of the component to add,
                           should be created by the function
                           L{flumotion.common.common.componentId}
    @type  componentId:    str
    @param componentLabel: The human-readable label of the component.
                           if None, no label will be set.
    @type  componentLabel: str or None
    @param properties:     List of property name-value pairs.
                           See L{flumotion.common.config.buildPropertyDict}
    @type  properties:     list of (str, object)
    @param workerName:     the name of the worker where the added
                           component should run.
    @type  workerName:     str
    @param plugs:          List of plugs, as type-propertyList pairs.
                           See L{flumotion.manager.config.buildPlugsSet}.
    @type  plugs:          [(str, [(str, object)])]
    @param eaters:         List of (eater name, feed ID) pairs.
                           See L{flumotion.manager.config.buildEatersDict}
    @type  eaters:         [(str, str)]
    @param isClockMaster:  True if the component to be added must be
                           a clock master. Passing False here means
                           that the manager will choose what
                           component, if any, will be clock master
                           for this flow.
    @type  isClockMaster:  bool
    @param virtualFeeds:   List of (virtual feed, feeder name) pairs.
                           See L{flumotion.manager.config.buildVirtualFeeds}
    @type  virtualFeeds:   [(str, str)]
    """
    # Omitted (or otherwise falsy) optional collections are forwarded
    # as empty lists.
    plugs = plugs or []
    eaters = eaters or []
    virtualFeeds = virtualFeeds or []
    return self.vishnu.loadComponent(
        self.remoteIdentity, componentType, componentId, componentLabel,
        properties, workerName, plugs, eaters, isClockMaster,
        virtualFeeds)
510
513
515 """Delete a component from the manager.
516
517 A component can only be deleted when it is sleeping or sad. It
518 is the caller's job to ensure this is the case; calling this
519 function on a running component will raise a ComponentBusyError.
520
521 @returns: a deferred that will fire when all listeners have been
522 notified of the component removal
523 """
524 return self.vishnu.deleteComponent(componentState)
525
528
531
534 """
535 Fetches the wizard entries which matches the parameters sent in
536
537 @param types: list of component types to fetch, is usually
538 something like ['video-producer'] or ['audio-encoder']
539 @type types: list of strings
540 @param provides: formats provided, eg ['jpeg', 'speex']
541 @type provides: list of strings
542 @param accepts: formats accepted, eg ['theora']
543 @type accepts: list of strings
544 @returns: L{componentui.WizardEntryState}
545 """
546
def extract(wizards):
    """
    Yield, in input order, the wizards that pass every active filter.

    The filters are the names types, provides and accepts taken from
    the enclosing scope; a filter that is None is not applied.

    @param wizards: iterable of wizard entries exposing type, provides
                    and accepts attributes
    """

    def has_media_type(formats, wanted):
        # True as soon as one format's media_type is among the wanted
        for fmt in formats:
            if fmt.media_type in wanted:
                return True
        return False

    for wizard in wizards:
        if types is not None and wizard.type not in types:
            continue
        if provides is not None:
            if not has_media_type(wizard.provides, provides):
                continue
        if accepts is not None:
            if not has_media_type(wizard.accepts, accepts):
                continue
        yield wizard
565
566 retval = []
567 r = registry.getRegistry()
568 for component in r.getComponents():
569 retval += extract(component.wizards)
570 for plug in r.getPlugs():
571 retval += extract(plug.wizards)
572 del r
573
574 return retval
575
def perspective_getComponentEntry(self, componentType):
    """Look up the registry entry for a component type.

    @param componentType: component type
    @type  componentType: string
    @returns: the registry entry, or None if the lookup raises KeyError
    @rtype:   L{ComponentRegistryEntry} or None
    """
    try:
        return registry.getRegistry().getComponent(componentType)
    except KeyError:
        return None
589
594
595
597 """
598 I interface between the Manager and administrative clients.
599 For each client I create an L{AdminAvatar} to handle requests.
600 I live in the manager.
601 """
602
603 logCategory = "admin-heaven"
604 implements(interfaces.IHeaven)
605 avatarClass = AdminAvatar
606