# -*- test-case-name: mv3d.test.net.test_net -*- # Copyright (C) 2006-2012 Mortal Coil Games # See LICENSE for details. """ """ import logging from time import time from twisted.spread import pb from twisted.internet import defer from twisted.python.util import mergeFunctionMetadata from mv3d.net.security import Securable from mv3d.util.persist import Integer, Text, MapAttribute, Pickled from twisted.internet.defer import gatherResults pb.MAX_BROKER_REFS = 65536 Priorities = {"Master":1, "Slave":2, "Cache":3} class CacheableError(Exception): """ Raised when there is a problem related to cacheable things """ def withClientUpdate(func): """ Update all clients of this cacheable by remotely calling this function on them. """ def inner(self, *args, **kw): result = func(self, *args, **kw) if hasattr(self, "updateAllClients"): self.updateAllClients(func.__name__, *args, **kw) else: self.updateAllSlaves(func.__name__, *args, **kw) return result mergeFunctionMetadata(func, inner) return inner def runOnMaster(func): """ Make sure this function gets run on the master version of this object """ def wrapper(self, *args, **keys): if hasattr(self, "setupCacheable"): self.setupCacheable() if self.priority != 1: if (hasattr(self.getManipulator(), "broker") and self.getManipulator().broker.disconnected): raise CacheableError("Disconnected") return self.getManipulator().callRemote( "manipulate", func.__name__, *args, **keys) result = func(self, *args, **keys) if hasattr(self, "updateAllClients"): self.updateAllClients(func.__name__, *args, **keys) else: self.updateAllSlaves(func.__name__, *args, **keys) return result mergeFunctionMetadata(func, wrapper) return wrapper def onlyOnSlave(func): """ Make sure this function gets run on a slave and not on the master """ def wrapper(self, *args, **kw): if hasattr(self, "getPriority"): assert self.getPriority() != 1, "This must only be called on a slave" return func(self, *args, **kw) mergeFunctionMetadata(func, wrapper) return wrapper def onlyOnMaster(func): """ Make sure this function gets run on the master and not on the slave """ def wrapper(self, *args, **kw): assert self.getPriority() == 1, "This must only be called on a master" return func(self, *args, **kw) mergeFunctionMetadata(func, wrapper) return wrapper def observable(func): """ This func runs only on the master but is also observable on clients. Returns two functions, the func and then the observer. Usage: def foo(bar): ... foo, observe_foo = observable(foo) """ return runOnMaster(func), onlyOnSlave(withClientUpdate(func)) class Manipulator(Securable, pb.Viewable): """ Manipulators provide a method of updating the master of a cacheable object. They should be phased out in favor of HighlyAvailable objects since they don't adhere to any sort of security policy. """ def __init__(self, parent): Securable.__init__(self) self.parent = parent def view_getManipulator(self, _con, nm): """ Returns a manipulator instance with the given name. """ return self.parent.getManipulator(nm)(self.parent) def view_manipulate(self, con, what, *a, **kw): """ This is a hack. It probably shouldn't exist. It allows you to take the manipulator and call any random method on the parent object. """ self.parent.checkPermissions(con, "modify", raiseException=True) return getattr(self.parent, what)(*a, **kw) class Cacheable(pb.Cacheable, pb.RemoteCache): """ A convenience Cacheable class """ copylevel = Integer(default=0) defaultmanipulator = Text(default="Manipulator") priority = Integer(default=Priorities["Master"]) lastuse = Integer() manipulatorclasses = MapAttribute(Text(), Pickled()) server = None from_connection = None manipulator = None exclude_from_state = None clients = None replacefunctions = None allowedState = None owner = None def __init__(self): self.exclude_from_state = ["clients", "broker", "from_connection", "server", "manipulator", "owner", "exclude_from_state", "replacefunctions", "datastore", "store"] self.lastuse = time() self.masterservers = [] self.slaveservers = [] self.replacefunctions = [] self.priority = 1 self.copylevel = 0 self.manipulatorclasses = {} self.defaultmanipulator = "Manipulator" from mv3d.util.classgen import ClassGenerator self.addManipulatorClass("Manipulator", ClassGenerator(classname="Manipulator", modulename="mv3d.net.pb")) self.clients = {} def setupCacheable(self): if self.exclude_from_state is None: self.exclude_from_state = ["clients", "broker", "from_connection", "server", "manipulator", "owner", "exclude_from_state", "replacefunctions", "datastore", "store"] self.lastuse = time() if self.replacefunctions is None: self.replacefunctions = [] self.priority = 1 if self.manipulatorclasses is None: self.manipulatorclasses = {} from mv3d.util.classgen import ClassGenerator self.addManipulatorClass("Manipulator", ClassGenerator(classname="Manipulator", modulename="mv3d.net.pb")) if self.clients is None: self.clients = {} def __getstate__(self): self.setupCacheable() state = self.__dict__.copy() def killit(k): if state.has_key(k): del state[k] for xclude in self.exclude_from_state: # if not xclude in self.persist_ok: killit(xclude) return state def __setstate__(self, state): self.__init__() for key, value in state.items(): setattr(self, key, value) #self.__dict__.update(state) def getManipulator(self, nm=None): """ returns the default manipulator if nm is None otherwise, return a new manipulator """ self.setupCacheable() if nm is None: if self.manipulator is None: self.manipulator = ( self.manipulatorclasses[self.defaultmanipulator].construct(self)) return self.manipulator if self.manipulatorclasses.has_key(nm): return self.manipulatorclasses[nm].construct(self) raise CacheableError("We don't have a %s manipulator in %s" % ( nm, self.__class__.__name__)) def addManipulatorClass(self, nm, m): """ Adds a manipulator class- nm is a string/name m is the manipulator class generator """ self.setupCacheable() self.manipulatorclasses[nm] = m if hasattr(self, "save"): self.queueSave(selectAttributes=["manipulatorclasses"]) def remManipulatorClass(self, nm): """ Try to remove a manipulator class named nm (str) """ self.setupCacheable() del self.manipulatorclasses[nm] if hasattr(self, "save"): self.queueSave(selectAttributes=["manipulatorclasses"]) def getDefaultManipulator(self): """ Get the name of the default manipulator (str) """ return self.defaultmanipulator def setDefaultManipulator(self, nm): """ Sets the name of the default manipulator (str) """ self.defaultmanipulator = nm if hasattr(self, "save"): self.queueSave(selectAttributes=["defaultmanipulator"]) setDefaultManipulator, observe_setDefaultManipulator = observable(setDefaultManipulator) def setOwner(self, ownerAccount): """ Sets the owner perspective """ if self.getPriority() != 1: return self.getManipulator().callRemote("manipulate", "setOwner", ownerAccount) self.owner = ownerAccount return self.updateAllClients("setOwner", ownerAccount) def observe_setOwner(self, ownerAccount): """ Sets the owner perspective """ self.owner = ownerAccount return self.updateAllClients("setOwner", ownerAccount) def getManipulatorClass(self, nm=None): """ Returns manipulator class named nm-- if not specified, returns the default manipulator class. """ self.setupCacheable() if nm is None: nm = self.defaultmanipulator return self.manipulatorclasses[nm] def getCopyLevel(self): """ Returns the amount of caches this object is in between here and the master server """ return self.copylevel def getParentConnection(self): """ returns the connection instance that we came in over. """ return self.from_connection def getServer(self): """ Returns the server we belong to """ return self.server def setServer(self, s): """ Set the server that we belong to """ self.server = s def setPriority(self, p): """ Sets whether this instance of the cache is a master, slave, or just a plain old cache p can be either an int or a str """ if isinstance(p, str): p = Priorities[p] self.priority = p if hasattr(self, "save"): self.queueSave(selectAttributes=["priority"]) def getPriority(self): """ Return the priority (int) that we are set to """ return self.priority def addExclude(self, *elist): """ Add an item to be excluded from transferring over the network """ self.setupCacheable() if isinstance(elist[0], list) or isinstance(elist[0], tuple): for e in elist[0]: if e not in self.exclude_from_state: self.exclude_from_state.append(e) return for e in elist: if e not in self.exclude_from_state: self.exclude_from_state.append(e) def remExclude(self, e): """ Remove a previously added item so that it will now be transferred over the network. """ self.setupCacheable() self.exclude_from_state.remove(e) def addMasterServer(self, s): """ Add a master server to this cache s should be a ServiceLoc instance Really, there should be only one master server """ self.setupCacheable() self.masterservers.append(s) addMasterServer, observe_addMasterServer = observable(addMasterServer) def remMasterServer(self, s): """ Remove a master server from the list. s is a ServiceLoc instance returns 1 if s in masterservers """ self.masterservers.remove(s) remMasterServer, observe_remMasterServer = observable(remMasterServer) def getMasterServers(self): """ returns the master server list """ return self.masterservers def addSlaveServer(self, s): """ Add s as a slave server of this cache. s is a ServiceLoc instance """ self.slaveservers.append(s) addSlaveServer, observe_addSlaveServer = observable(addSlaveServer) def remSlaveServer(self, s): """ Remove s from the list of slave servers. s is a ServiceLoc instance returns 1 if s in slaveservers """ self.slaveservers.remove(s) remSlaveServer, observe_remSlaveServer = observable(remSlaveServer) def getSlaveServers(self): """ Returns the list of slave servers """ return self.slaveservers def updateAllClients(self, remoteCall, *args, **kwargs): """ Call this to update all the clients that are caching us. rc, a, and kw are basically the same args you'd pass to callRemote, so rc=a string of the remote procedure to run This function also updates the lastuse variable. That is used to determine if this cache is needed. """ self.setupCacheable() dd = [] badc = [] for persp, client in self.clients.items(): if self.server is not None: self.server.cupdates += 1 try: dd.append(client.callRemote(remoteCall, *args, **kwargs)) except AttributeError: badc.append(persp) except pb.DeadReferenceError: badc.append(persp) for persp in badc: del self.clients[persp] if self.owner == persp: self.owner = None return defer.DeferredList(dd) def updateOtherClients(self, remoteCall, *args, **kwargs): """ Call this to update the clients that are caching us other then the owner client. remoteCall, args, and kwargs are basically the same args you'd pass to callRemote, so rc=a string of the remote procedure to run This function also updates the lastuse variable. That is used to determine if this cache is needed. """ self.setupCacheable() dd = [] badc = [] for persp, client in self.clients.items(): if persp.accountInfo == self.owner: continue if self.server is not None: self.server.cupdates += 1 try: dd.append(client.callRemote(remoteCall, *args, **kwargs)) except AttributeError: badc.append(persp) except pb.DeadReferenceError: badc.append(persp) for persp in badc: del self.clients[persp] return defer.DeferredList(dd) def updateOwnerClient(self, remoteCall, *args, **kwargs): """ Call this to update the registered owner client. remoteCall, args, and kwargs are basically the same args you'd pass to callRemote, so rc=a string of the remote procedure to run This function also updates the lastuse variable. That is used to determine if this cache is needed. """ if self.owner is None: return defer.succeed(None) self.setupCacheable() owners = [persp for persp in self.clients.keys() if persp.accountInfo == self.owner] try: return self.clients[owners[0]].callRemote(remoteCall, *args, **kwargs) except IndexError: pass # don't currently have this client. TODO: Security issue? except AttributeError: del self.clients[owners[0]] self.owner = None raise except pb.DeadReferenceError: del self.clients[owners[0]] self.owner = None raise def readyClassOnClients(self, cgen): """ Ready the specified class or classes specified by a class generator or a list of class generators on the clients. """ dlist = [] for client in self.clients.keys(): dlist.append(client.readyClass(cgen)) return gatherResults(dlist) def hasClients(self): """ Return 1 if there are attached clients """ self.setupCacheable() return len(self.clients) != 0 def getLastUse(self): """ Return the last usage time """ return self.lastuse def timeSinceLastUse(self): """ Return how much time has passed since the last use """ return time() - self.lastuse def canPrune(self): """ This function will return 1 if it believes this cache is not needed any more. It will only return 1 for Cache priority objects that have no clients. ToDo: It should also check the last use time. """ if self.priority != Priorities["Cache"]: return False if self.hasClients(): return False return True def countClients(self): """ Return the number of connected clients """ return len(self.clients) def getClients(self): """ return the list of clients (which are RemoteReferences """ return self.clients def addClient(self, persp, con): """ Add a client persp=perspective con=observer (in twisted speak) """ self.setupCacheable() self.clients[persp] = con def remClient(self, persp): """ Remote a client persp=perspective """ self.setupCacheable() try: del self.clients[persp] except KeyError: pass def hasLostMaster(self): """ You need to add a docstring here ! """ # if we are the master, then # everything is all good if self.priority == 1: return False if not hasattr(self.getManipulator(), "broker"): return False return self.broker.disconnected def getState(self): """ Taking into account the exclude list, this function grabs the __dict___ and culls excluded items before returning it. """ self.setupCacheable() if self.allowedState is None: # old style state = self.__dict__.copy() for r in self.exclude_from_state: if state.has_key(r): del state[r] else: state = dict(zip(self.allowedState, [getattr(self, attr) for attr in self.allowedState])) # increase the copy level, set the top server if needed # and set the priority down to cache. # note that the other end will need to manyally # select Slave as a priority level if it is desired. state["copylevel"] = self.copylevel + 1 state["priority"] = Priorities["Cache"] from mv3d.util.classgen import ClassGenerator if not isinstance(self, ClassGenerator): try: state["manipulator"] = self.manipulatorclasses[self.defaultmanipulator].construct(self) except KeyError: state["manipulator"] = None else: state["manipulator"] = None return state def setState(self, state): """ This simply updates __dict___ with state """ for key, value in state.items(): setattr(self, key, value) # self.__dict__.update(state) def getStateToCacheAndObserveFor(self, perspective, observer): """ Called when this object is sent over the network. First update the last use. Then Check to make sure the remote user has permissions to reference us. If not, return {} Else, add her as a client and return the state after adding a item that will give the other end a reference to the receiving connection """ assert perspective is not None, ( "Perspective of %s is None" % self.__class__.__name__) self.lastuse = time() if hasattr(self, "checkPermissions") and not self.checkPermissions( perspective, "reference"): if perspective != None: if hasattr(self, "perspective"): pp = self.perspective else: pp = "Unknown" perspective.parent.parent.log("Access denied to " "%s for reference of %s" % (pp, self.__class__.__name__), logging.ERROR) raise CacheableError("Permission denied sending %s!" % self.__class__.__name__) self.addClient(perspective, observer) st = self.getState() if hasattr(perspective, "controller"): st["cacheable_con"] = perspective.controller return st def setCopyableState(self, state): """ Called when this item is received over the netwokr. First, update the last use. Then Get the special cacheable_con item and grab the Connection we came in over and set that as from_connection. Then from that connection, get its parent and set that as server. Continue on to SetState """ self.lastuse = time() if (state.has_key("cacheable_con") and state["cacheable_con"] is not None): self.from_connection = state["cacheable_con"].object self.parent = self.from_connection.parent else: if hasattr(self.broker, "mv3d_server"): self.server = self.broker.mv3d_server g = self.setState(state) self.onCached() return g def onCached(self): """ Override on subclasses to be notified when a new cached copy is created. This happens on the recipient. """ def stoppedObserving(self, perspective, _observer): """ Called when a client is disconnecting from us for whatever reason. """ self.lastuse = time() self.remClient(perspective) def remoteMessageReceived(self, broker, message, args, kw): """ Override this method from pb.RemoteCache so that we can do a sanity check to make sure that the priority of this object >1 """ if self.priority == 1: raise CacheableError( "Attempted to issue commands to master copy of %s." % self.__class__.__name__) return pb.RemoteCache.remoteMessageReceived(self, broker, message, args, kw) class Copyable(pb.Copyable, pb.RemoteCopy): """ Defines an object that can be copied across the network via twisted.spread """ exclude_from_state = None allowedState = None isCopy = False def setupCopyable(self): """ Set this instance up if needed """ if self.exclude_from_state is None: self.exclude_from_state = ["broker"] def addExclude(self, *elist): """ Add an item to be excluded from transferring over the network """ self.setupCopyable() if isinstance(elist[0], list) or isinstance(elist[0], tuple): for e in elist[0]: if e not in self.exclude_from_state: self.exclude_from_state.append(e) return for e in elist: if e not in self.exclude_from_state: self.exclude_from_state.append(e) def remExclude(self, e): """ You need to add a docstring here ! """ self.setupCopyable() self.exclude_from_state.remove(e) def getStateToCopy(self): """ Get the state of this object excluding previously mentioned exclude items """ self.setupCopyable() if self.allowedState is None: state = self.__dict__.copy() for r in self.exclude_from_state: if state.has_key(r): del state[r] else: state = dict(zip(self.allowedState, [getattr(self, attrib) for attrib in self.allowedState])) return state def setCopyableState(self, state): """ Set the state of this object to what was sent over the wire """ try: self.__init__() except TypeError: pass for key, value in state.items(): setattr(self, key, value) self.isCopy = True self.onCopied() # self.__dict__.update(state) def onCopied(self): """ Called when the object is copied to another process. """