# -*- test-case-name: mv3d.test.service.test_overseer -*- # Copyright (C) 2010-2012 Mortal Coil Games # See LICENSE for details. """ Overseer and Subordinate services are for cluster management. They can handle starting and stopping multiple MV3D processes per server. They are also responsible for configuration of those services along with giving remote console access. @author: mike """ import sys import os from time import time from ConfigParser import ConfigParser try: import psutil except ImportError: psutil = None from zope.interface import implements #@UnresolvedImport from twisted.application.service import Service from twisted.internet import protocol, reactor from twisted.internet.defer import inlineCallbacks, returnValue, Deferred from twisted.internet.error import ProcessDone, ProcessTerminated from twisted.spread import pb from twisted.python import log import mv3d from mv3d.net.security import Securable, requirePermissions from mv3d.util.modifier import Modifiable from mv3d.util.iservice import IIterableService, IPBServer from mv3d.server.iserver import IOverseerService, ISubordinateService from mv3d.server.service import checkServicePermissions, viewed, \ viewedWithClient from mv3d.util.conductor import parsePermissionConfig, getHostIp from mv3d.net.client import ServiceLoc from twisted.conch.manhole import ManholeInterpreter from twisted.spread.pb import RemoteReference, DeadReferenceError from mv3d.server.worldgen import WorldGenerator from mv3d.util.profiler import theProfiler from mv3d.util.debugvis import StatVisualizer, RemoteStatVisualizer from mv3d.util.guide import ChangeNotifier from twisted.python.failure import Failure try: import multiprocessing cpuCount = multiprocessing.cpu_count except ImportError: # you aren't running >= 2.6 def cpuCount(): """ Detects the number of CPUs on a system. Cribbed from pp. """ # Linux, Unix and MacOS: if hasattr(os, "sysconf"): if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"): #@UndefinedVariable # Linux & Unix: ncpus = os.sysconf("SC_NPROCESSORS_ONLN") if isinstance(ncpus, int) and ncpus > 0: return ncpus else: # OSX: return int(os.popen2("sysctl -n hw.ncpu")[1].read()) # Windows: if os.environ.has_key("NUMBER_OF_PROCESSORS"): ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]); if ncpus > 0: return ncpus return 1 # Default baseConfig = """ [%(name)s] name=%(name)s serviceNames=PBServer, Subordinate connectionFactories=PBClient, HttpsClient, HttpClient logLevel=10 grantPermissions=modify:admin directoryServices=%(directory)s dataRoot=%(dataRoot)s logService=%(logService)s delayedInitialization=true # If set, append to this log file each run. logFile=server-%(name)s.log profile=%(profile)s [PBServer] type=mv3d.server.network.PBServer interfaces=mv3d.server.network.PBServerView port=%(port)d authenticators=%(authenticators)s publish=Subordinate insecureLocalAccounts=%(localAccounts)s [PBClient] type=mv3d.net.client.PBConFactory defaultCredentials=%(defaultCredentials)s localAliases=pb://localhost:%(port)d, %(localAliases)s loginServices=%(login)s [HttpsClient] type=mv3d.net.client.JSONConFactory protocol=https [HttpClient] type=mv3d.net.client.JSONConFactory protocol=http [Subordinate] type=mv3d.server.overseer.SubordinateService interfaces=mv3d.server.overseer.SubordinateServiceView grantPermissions=all:admin svnExe=%(svnExe)s """ def fixEnvironment(): """ Importing some things can cause unicode items in your environment dict """ for key, value in os.environ.items(): os.environ[key] = str(value) class MV3DProcessProtocol(protocol.ProcessProtocol): """ Protocol for interacting with a MV3D server """ def __init__(self, overseer, name, config, printLog=False): self.overseer = overseer self.name = name self.config = config self.readyDeferred = Deferred() self.log = [] self.exitCalls = [] self.printLog = printLog def callOnExit(self, func, *args, **kwargs): """ Request a callback to be called when the process exits. """ self.exitCalls.append((func, args, kwargs)) def stopCallOnExit(self, func, *args, **kwargs): """ Remove a callback that would have been called when the process exited """ self.exitCalls.remove((func, args, kwargs)) def connectionMade(self): """ Called when the process is up and running """ self.transport.write(self.name + "\n") self.transport.write(self.config) self.transport.closeStdin() def processExited(self, status): """ The process has exited """ if self.readyDeferred is not None: self.readyDeferred.errback(status) self.readyDeferred = None for func, args, kwargs in self.exitCalls: func(status, *args, **kwargs) def outReceived(self, data): """ Data was received on stdout """ if self.readyDeferred is not None and self.readyDeferred is not None: if "twisted.spread.pb.PBServerFactory starting on" in data: self.readyDeferred.callback(True) self.readyDeferred = None elif "Starting factory 0.0: cpuUse = (cpuTot / totTime) * 100.0 else: cpuUse = 0.0 res["CPUUsage"] = "%.2f%%" % cpuUse res["IPS"] = self.parent.ips if self.parent is not None: res["services"] = len(self.parent.namedServices) if psutil is not None: proc = psutil.Process(os.getpid()) counters = proc.get_io_counters() res["diskIO"] = counters.read_bytes + counters.write_bytes res["procMemoryBytes"] = sum(proc.get_memory_info()) res["procMemoryPercent"] = proc.get_memory_percent() res["systemCPU"] = psutil.cpu_percent() res["systemRAMPercent"] = psutil.phymem_usage().percent res["systemSwapPercent"] = psutil.virtmem_usage().percent return res def getConfig(self): """ Return a dict of config values """ return self.config def getProcesses(self): """ Returns a dict of processes """ procs = self.processes.copy() sub = self.parent.getLocalService(ISubordinateService) procs["local"] = ChildServer(None, None, None, sub) return procs @requirePermissions("reference") def getInterface(self, _client, protocol): """ Hand out public interfaces """ return self.interfaces[protocol] def isLocal(self): """ We are local so return true """ return True class SubordinateServiceView(pb.Viewable): """ View of the subordinate service """ def __init__(self, service): self.service = service def getProtocol(self): """ Return what protocol we implement """ return "pb" @checkServicePermissions("modify") @viewed def view_startNewService(self, client, name, config): """ Start a new service on this server. """ @checkServicePermissions("modify") @viewed def view_shutdownService(self, client, name): """ Shut down an existing service on this server. """ @checkServicePermissions("modify") @viewed def view_addLogObserver(self, client, observer, logLevel=0): """ Add a log observer (remote reference) """ @checkServicePermissions("read") @viewed def view_removeLogObserver(self, client, observer): """ Remove a log observer. """ @checkServicePermissions("modify") @viewed def view_shellEval(self, client, line): """ The user has given us a line to evaluate """ @checkServicePermissions("modify") @viewed def view_svnUpdate(self, client, branch=None, revision="HEAD"): """ Update svn for this server """ @checkServicePermissions("read") @viewed def view_getStatistics(self, client): """ Get various info about this system """ @checkServicePermissions("read") @viewed def view_getStatVisualizer(self, client): """ Get the stat visualizer """ @checkServicePermissions("read") @viewedWithClient def view_getServices(self, client): """ Returns a dict of local services """ @checkServicePermissions("modify") @viewed def view_initializeWorld(self, client, options): """ Initialize basic world data on this server. Generally, only should be done on master servers for a cluster. """ @checkServicePermissions("read") @viewed def view_getProfile(self, client, resultCount, sortField, reverse): """ Returns some profiling information. """ @checkServicePermissions("modify") @viewed def view_startProfiling(self, client, filename): """ Starts profiling with the cProfiler """ @checkServicePermissions("modify") @viewed def view_stopProfiling(self, client): """ Stops profiling with cProfile and writes the results to a file. """ @checkServicePermissions("modify") @viewed def view_resetProfile(self, client): """ Resets the profile information. """ @checkServicePermissions("modify") @viewed def view_getConfig(self, client): """ Returns the current config as a dict """ @checkServicePermissions("modify") @viewed def view_markInitialized(self): """ Called when all services are started. """ class RemoteLogObserver(object): """ Handles passing logs to a remote listener """ def __init__(self, observer, logLevel=0): self.observer = observer self.logLevel = logLevel def emit(self, eventDict): """ Pass this on to the observer """ if eventDict.has_key("logLevel"): if eventDict["logLevel"] < self.logLevel: return eventDict = eventDict.copy() if eventDict.has_key("failure") and isinstance(eventDict["failure"], Failure): eventDict["failure"] = eventDict["failure"].getTraceback() if self.observer.broker.disconnected: log.removeObserver(self.emit) self.observer.callRemote("emit", eventDict) class SVNProcessProtocol(protocol.ProcessProtocol): """ Protocol for interacting with a MV3D server """ def __init__(self): self.exitCalls = [] self.log = "" self.errLog = "" def callOnExit(self, func, *args, **kwargs): """ Request a callback to be called when the process exits. """ self.exitCalls.append((func, args, kwargs)) def stopCallOnExit(self, func, *args, **kwargs): """ Remove a callback that would have been called when the process exited """ self.exitCalls.remove((func, args, kwargs)) def connectionMade(self): """ Called when the process is up and running """ def processExited(self, status): """ The process has exited """ for func, args, kwargs in self.exitCalls: func(status, *args, **kwargs) def outReceived(self, data): """ Data was received on stdout """ self.log += data def errReceived(self, data): """ Data was received on stderr """ self.log += data self.errLog += data class SubordinateService(Service, Securable, Modifiable): """ Slave service to the overseer service. A server will run one or the other. """ implements(ISubordinateService) interpreter = None cpuUse = 0 def __init__(self): Securable.__init__(self) Modifiable.__init__(self) self.logObservers = [] self.interfaces = {} self.svnExe = "svn" self.consoleOut = "" self.lastTime = time() self.lastCPUTime = sum(os.times()[:4]) self.config = {} def configure(self, name, cfg): """ Configure this service """ self.config.update(dict(cfg.items(name))) if cfg.has_option(name, "interfaces"): ii = cfg.get(name, "interfaces") cc = ii.strip().split(".")[-1] mm = ".".join(ii.strip().split(".")[:-1]) i = getattr(__import__(mm, globals(), locals(), [cc]), cc)(self) self.interfaces[i.getProtocol()] = i if cfg.has_option(name, "grantPermissions"): parsePermissionConfig(cfg.get(name, "grantPermissions"), self, True) if cfg.has_option(name, "denyPermissions"): parsePermissionConfig(cfg.get(name, "denyPermissions"), self, False) if cfg.has_option(name, "svnExe"): self.svnExe = cfg.get(name, "svnExe") def startService(self): """ Init the interpreter """ Service.startService(self) clocals = dict(conductor=self.parent) for name, service in self.parent.namedServices.items(): clocals[name] = service self.interpreter = ManholeInterpreter(self, clocals) self.interpreter.push("from mv3d.server.console import *") def addOutput(self, data, _async=False): """ Add output to the interpreter """ self.consoleOut += data def startNewService(self, name, config): """ Start a new service on this server. """ cfg = ConfigParser() cfg.add_section(name) for key, value in config.items(): cfg.set(name, key, value) stype = cfg.get(name, "type") smod = ".".join(stype.split(".")[:-1]) scls = stype.split(".")[-1] svc = getattr(__import__(smod, globals(), locals(), [scls]), scls)() svc.setName(name) svc.parent = self.parent svc.configure(name, cfg) svc.parent = None # HACK ? svc.setServiceParent(self.parent) if IIterableService.providedBy(svc): #@UndefinedVariable self.parent.iterServices.append(name) if cfg.has_option(name, "publishOn"): servers = cfg.get(name, "publishOn").split(",") for server in servers: server = server.strip() self.parent.getServiceNamed(server).publish(name) if self.interpreter is not None: self.interpreter.push("%s = conductor.getServiceNamed('%s')" % ( name, name)) def shutdownService(self, name): """ Shut down an existing service on this server. """ svc = self.parent.getServiceNamed(name) if svc.name in self.parent.iterServices: self.parent.iterServices.remove(svc) return self.parent.removeService(svc) def addLogObserver(self, observer, logLevel=0): """ Add a log observer (remote reference) """ if isinstance(observer, RemoteReference): logger = RemoteLogObserver(observer, logLevel) else: logger = observer log.addObserver(logger.emit) self.logObservers.append(logger) def removeLogObserver(self, observer): """ Remove a log observer. """ for logger in self.logObservers: if logger.observer == observer: log.removeObserver(logger.emit) self.logObservers.remove(logger) break else: raise ValueError("No such observer %s" % observer) def shellEval(self, line): """ The user has given us a line to evaluate """ _more = self.interpreter.push(line) result, self.consoleOut = self.consoleOut, "" return result @inlineCallbacks def svnUpdate(self, branch=None, revision="HEAD", extraArgs=None): """ Update svn for this server """ args = [self.svnExe] path = os.path.abspath(os.path.join(os.path.dirname(mv3d.__file__), "..")) + os.path.sep if extraArgs is not None: args = extraArgs + args args.append("--non-interactive") if branch is None: args.append("update") args.append("%s@%s" % (path, revision)) else: args.append("switch") args.append("%s@%s" % (branch, revision)) args.append(path) protocol = SVNProcessProtocol() dfrd = Deferred() protocol.callOnExit(dfrd.callback) fixEnvironment() if sys.platform == "win32": reactor.spawnProcess(protocol, self.svnExe, args) #@UndefinedVariable else: reactor.spawnProcess(protocol, self.svnExe, args, env=None) #@UndefinedVariable try: yield dfrd except ProcessDone: pass except ProcessTerminated: pass returnValue(protocol.log) def getStatistics(self): """ Return a dict of stats """ res = dict() curTime = time() cpuTime = sum(os.times()[:4]) cpuTot = cpuTime - self.lastCPUTime totTime = curTime - self.lastTime self.lastCPUTime = cpuTime self.lastTime = curTime if totTime > 0.0: self.cpuUse = (cpuTot / totTime) * 100.0 else: self.cpuUse = 0.0 res["CPUUsage"] = "%.2f%%" % self.cpuUse res["Platform"] = sys.platform res["cpuCount"] = cpuCount() res["IPS"] = self.parent.ips if self.parent is not None: res["services"] = len(self.parent.namedServices) if psutil is not None: proc = psutil.Process(os.getpid()) counters = proc.get_io_counters() res["diskIO"] = counters.read_bytes + counters.write_bytes res["procMemoryBytes"] = sum(proc.get_memory_info()) res["procMemoryPercent"] = proc.get_memory_percent() res["systemCPU"] = psutil.cpu_percent() res["systemRAMPercent"] = psutil.phymem_usage().percent res["systemSwapPercent"] = psutil.virtmem_usage().percent return res def getCPUUse(self): """ Gets the cpu usage """ return self.cpuUse def registerStats(self, stats): """ Register available stats with the visualizer """ stats.addStat("cpuTime", 0.3, self.getCPUUse) if psutil is not None: stats.addStat("systemCPU", 0.3, lambda: psutil.cpu_percent()) stats.addStat("systemRAMPercent", 0.3, lambda: psutil.phymem_usage().percent) stats.addStat("systemSwapPercent", 0.3, lambda: psutil.virtmem_usage().percent) proc = psutil.Process(os.getpid()) def getDiskIO(): counters = proc.get_io_counters() return counters.read_bytes + counters.write_bytes stats.addStat("diskIO", 0.3, getDiskIO) stats.addStat("procMemoryBytes", 0.3, lambda: sum(proc.get_memory_info())) stats.addStat("procMemoryPercent", 0.3, lambda: proc.get_memory_percent()) def getConfig(self): """ Returns the config for this service. """ return self.config def getServices(self, client=None): """ Returns a dict of local services """ client = client or dict(username="", groups=["admin"]) try: pbsvc = self.parent.getLocalService(IPBServer) except KeyError: pbsvc = None result = {} for name, service in self.parent.namedServices.items(): try: if pbsvc is not None and name not in pbsvc.services: continue svcClass = ".".join([service.__class__.__module__, service.__class__.__name__]) if pbsvc is not None: sloc = pbsvc.getLocation() else: sloc = ServiceLoc("self") sloc.path = name result[name] = svcClass, sloc except KeyError: continue if result[name] is None: del result[name] return result def initializeWorld(self, options): """ Initialize basic world data on this server. Generally, only should be done on master servers for a cluster. """ wgen = WorldGenerator(self.parent) wgen.giveInput(options) return wgen.produce() def getProfile(self, resultCount, sortField, reverse): """ Returns results from the realtime profiler. """ return theProfiler.top(resultCount, sortField, reverse) def startProfiling(self, filename): """ Starts profiling with the cProfiler """ self.parent.startProfiling(filename) def stopProfiling(self): """ Stops profiling with cProfile and writes the results to a file. """ self.parent.stopProfiling() def getStatVisualizer(self): """ Return a remote reference to the stat visualizer """ return self.parent.statVisualizer def resetProfile(self): """ Resets the profile information """ theProfiler.reset() def markInitialized(self): """ Called when all services are started. """ return self.parent.postInit() @requirePermissions("reference") def getInterface(self, _client, protocol): """ Hand out public interfaces """ return self.interfaces[protocol] def isLocal(self): """ We are local so return true """ return True