# -*- test-case-name: mv3d.test.service.test_assetserver -*- # Copyright (C) 2006-2012 Mortal Coil Games # See LICENSE for details. """ Interface and Code for Asset Servers """ import os from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, returnValue, gatherResults from twisted.spread import pb from twisted.application.service import Service from twisted.python.log import deferr from zope.interface import implements #@UnresolvedImport import mv3d from mv3d.net.client import ServiceLoc from mv3d.net.security import requirePermissions, Securable from mv3d.util.conductor import parseInterfaceConfig, parsePermissionConfig from mv3d.util.classgen import ClassGenerator from mv3d.util.modifier import Modifiable from mv3d.server.service import checkServicePermissions, viewed from mv3d.server.iserver import IAssetService from mv3d.server.cluster import Cluster, WithPools, ViewWithPools from mv3d.net.ha import HighlyAvailable, RedundantPool, Locator, \ CopiedPoolMember from mv3d.util.persist import Reference, Integer, Text, Persistable from mv3d.server.persist import SQLiteStore from mv3d.util.guide import NotifierProperty, ChangeNotifier from mv3d.net.pb import Copyable class AssetServerError(Exception): """ Raised when there's an issue with the asset server """ class AssetGroup(Persistable, HighlyAvailable, Modifiable): """ The class to hold a whole assetgroup """ agid = Integer(autoSave=True, partialSave=True) name = Text(autoSave=True, partialSave=True) description = Text(autoSave=True, partialSave=True) cluster = None classModifiers = dict(web=[ ClassGenerator("mv3d.server.editor.AssetGroupEditor"), ClassGenerator("mv3d.server.editor.GroupAsset"), ClassGenerator("mv3d.server.editor.DeleteAssetGroup"), ]) def __init__(self, name=None, description=None, store=None): """ Set initial parameters The asset hash holds asset ids as the key and AssetRevisionSets as the value """ Persistable.__init__(self) HighlyAvailable.__init__(self) Modifiable.__init__(self) self.name = name self.description = description self.store = store def getID(self): """ returns my id """ return self.agid def setID(self, iid): """ Sets my id to i """ self.agid = iid def getName(self): """ return my name (str) """ return self.name def setName(self, name): """ Set my name to n (str) """ self.name = name def getDescription(self): """ return my description """ return self.description def setDescription(self, desc): """ Set the description to desc (str) """ self.description = desc def getAsset(self, conductor, aid): """ Return the asset specified by id. """ assert aid[0] == self.agid return self.cluster.getItem(aid).getItem(conductor) @inlineCallbacks def newAsset(self, conductor, cgen, storedData=None): """ Convenience function to create an asset """ poolID = self.cluster.mechanism.getLocationFor(-1) asvc = conductor.getLocalService(IAssetService) locations = asvc.getPool(poolID).getMembers() aid = yield self.cluster.newItem(conductor, poolID, locations) yield asvc.getPool(poolID).newItem(aid, cgen, storedData) asset = yield asvc.getAsset(aid) asset.parent = asvc returnValue(asset) @inlineCallbacks def remAsset(self, aid): """ Convenience function to remove an asset. """ self.parent.removeItem(aid[0], aid) class AssetGroupHolder(Cluster): """ Holds a realm """ _schemaVersion = 1 group = Reference(AssetGroup) groupID = Integer(autoSave=True, partialSave=True) shardType = RedundantPool setMechanism = False uid = None @inlineCallbacks def new(self, service): """ Initializes this cluster and installs it on the specified service. The service is assumed to have an addPool method to add new pools. The cluster itself is a sharded pool but it contains a special sharded pool per master shard that has a mapping of id -> (redundant) pool. It doesn't have to be a redundant pool, but that is likely to be the most common setup. The cluster also contains a typical redundant pool to store things that are common throughout the cluster. In the most basic case, that would just be an id dispenser. Other implementations of clusters may also hold a Realm object or AssetGroup. """ shardID = yield Cluster.new(self, service) self.iddisp.setParentID(self.groupID) returnValue(shardID) @inlineCallbacks def sync(self, conductor): """ Synchronize with an existing realm holder. Grab the id from the realm and get a pointer to it locally. """ yield Cluster.sync(self, conductor) master = self.getHighestPriority() agloc = yield master.pool.callRemote("getGroupInfo") realmSvc = yield self.getLocalService(conductor) localPool = realmSvc.getPool(self.sharedPoolID) self.group = localPool.getItem(agloc) self.group.cluster = self def onLoad(self): """ Called when we've been loaded from the datastore. Set up some attributes on our group. """ Cluster.onLoad(self) self.group.cluster = self def view_getGroupInfo(self, _client): """ Return the local ID of the asset group that this holds. """ return self.group.getID() @inlineCallbacks def view_getAssetGroup(self, client): """ Return the actual asset group object """ cgs = self.group.getClassGenerator() yield client.readyClass(cgs) returnValue(self.group) def view_getInfo(self, _client): """ Return some basic info about the group """ return (self.uid, self.group.name, self.group.description) @inlineCallbacks def addShard(self, conductor, poolID, location, range_=None): """ Make sure to set the """ poolID = yield Cluster.addShard(self, conductor, poolID, location, range_) asvc = yield self.getLocalService(conductor) asvc.getPool(poolID).datastore = self.datastore returnValue(poolID) def getItem(self, itemID, _active=True): """ Returns an instance of Locator to inform the user that the item exists in another pool. This pool does not directly store any items. """ if itemID == self.groupID: return self pool = self.guards[self.mechanism.getLocationFor(itemID)] return Locator(pool.locations, pool.poolID, itemID) class AssetServiceView(pb.Viewable, ViewWithPools): """ A service that deals with assets and asset groups """ def __init__(self, service): self.service = service def getProtocol(self): """ Return what protocol we implement """ return "pb" @checkServicePermissions("read") @viewed def view_getAssetGroup(self, client, aid): """ See getAssetGroup # TODO: check asset group permissions """ @checkServicePermissions("read") def view_getAsset(self, client, aid, _local=False): """ Just get the asset and return it except, we have to ready it first """ d = self.service.getAsset(aid) def gotAsset(a): d = client.readyClass(a.getClassGenerator()) return d.addCallback(lambda _: a) return d.addCallback(gotAsset) @checkServicePermissions("read") @viewed def view_getStatistics(self, client): """ Retrieves the statistics from the realm server """ @checkServicePermissions("read") @viewed def view_getConfig(self, client): """ Returns the config for this asset service. """ @checkServicePermissions("modify") @viewed def view_setStore(self, client, store): """ Sets the datastore this asset service uses """ @checkServicePermissions("modify") @viewed def view_setPublicLocation(self, client, location): """ Sets the public location of the asset service """ @checkServicePermissions("modify") @viewed def view_listAssetGroups(self, client): """ Return a list of [id, name] of all local asset groups """ @checkServicePermissions("modify") @viewed def view_listAssets(self, client, groupID, offset=0, limit=0): """ Return a list of [id, name] of assets in the specified group. Up to count will be returned if availble and specified (0 = unlimited). If specified, the asset ids will begin at startID. """ @checkServicePermissions("modify") @viewed def view_newAsset(self, client, agid, cgen, storedData=None): """ Create a new asset of type cgen optionally from the specified stored data. """ @checkServicePermissions("modify") @viewed def view_newMasterAssetGroup(self, client, cgen): """ Create a new asset group that we will be the master for. """ @checkServicePermissions("modify") @viewed def view_remAssetGroup(self, client, agid): """ Remove a local asset group from the cluster. """ @checkServicePermissions("modify") @viewed def view_updateAssetGroup(self, client, agid, name, description): """ Set name and description of an asset group """ @checkServicePermissions("read") @viewed def view_getGroupInfo(self, client, agid): """ Returns a dict with some attributes for the given group """ class AssetService(Service, Securable, Modifiable, WithPools): """ """ implements(IAssetService) store = None publicLocation = None classModifiers = dict( web=[ClassGenerator("mv3d.server.editor.Stats"), ClassGenerator("mv3d.server.editor.ViewAssetGroups")], guide=[ClassGenerator("mv3d.server.asset.AssetConfig")] ) def __init__(self): Securable.__init__(self) Modifiable.__init__(self) self.pools = {} self.assetCache = {} self.store = SQLiteStore() self.store.open() self.interfaces = {} self.config = {} def configure(self, nm, cf): """ Configure this interface """ self.config.update(dict(cf.items(nm))) if cf.has_option(nm, "store"): self.store = SQLiteStore() fname = cf.get(nm, "store") if os.path.abspath(fname) != fname: fname = os.path.join(self.parent.dataRoot, fname) dirname = os.path.dirname(fname) if not os.path.exists(dirname): os.makedirs(dirname) self.store.open(fname) if cf.has_option(nm, "grantPermissions"): parsePermissionConfig(cf.get(nm, "grantPermissions"), self, True) if cf.has_option(nm, "denyPermissions"): parsePermissionConfig(cf.get(nm, "denyPermissions"), self, False) if cf.has_option(nm, "publicLocation"): self.publicLocation = ServiceLoc(cf.get(nm, "publicLocation")) if cf.has_option(nm, "interfaces"): parseInterfaceConfig(cf.get(nm, "interfaces"), self) @inlineCallbacks def startService(self): """ Initialize the interface, start the coiterator, and join any realms needed. """ yield self.loadPools() for pool in self.pools.values(): if isinstance(pool, AssetGroupHolder): pool.group.parent = self @inlineCallbacks def stopService(self): """ Shut down the service. """ yield self.onShutdown() self.store.close() def hasAssetGroup(self, agid): """ Check if we have this asset group Returns True or False """ if self.pools.has_key(agid): return True return False def getDirectoryServers(self): """ return all of our directory servers """ return self.parent.directoryServices def getAssetGroup(self, agid, local=True): """ If the asset group is local, then return it. If local is False, then return a remote pool when it's not local. """ if local: return self.pools[agid] pool = self.pools.get(agid) if pool is not None: return pool return self.getRemoteAssetGroup(agid) @inlineCallbacks def getRemoteAssetGroup(self, agid): """ Returns a remote pool to the asset group holder """ dsvc = yield self.parent.getOneService(self.getDirectoryServers()) locator = yield dsvc.getItem("AssetGroups", agid) pool = yield locator.getItem(self.parent) returnValue(pool) @inlineCallbacks def getAsset(self, aid, local=False): """ Get an asset. If the group is local and the asset exists, return it. If the group isn't local, connect to a server that has it and get it from there before returning it. """ assert isinstance(aid, tuple), "aid is %r, not tuple" % aid if self.assetCache.has_key(aid): returnValue(self.assetCache[aid]) agid = aid[0] group = None try: group = self.getAssetGroup(agid) except KeyError: if local: raise if group is not None: locator = yield group.getItem(aid) asset = yield locator.getItem(self.parent) asset.parent = self returnValue(asset) # now we must search elsewhere... # directory servers should have "AssetGroup" # directories, so we query one now. dsvc = yield self.parent.getOneService(self.getDirectoryServers()) locator = yield dsvc.getItem("AssetGroups", agid) pool = yield locator.getItem(self.parent) locator = yield pool.pool.callRemote("getItem", aid) asset = yield locator.getItem(self.parent) asset.parent = self self.assetCache[asset.getID()] = asset returnValue(asset) def getAssets(self, assetIDs): """ Gets a list of asset ids. """ dfrds = [] for aid in assetIDs: dfrds.append(self.getAsset(aid)) return gatherResults(dfrds) @inlineCallbacks def acquireAsset(self, aid, downloadList=None, **keys): """ Download this asset. """ asset = yield self.getAsset(aid) yield asset.acquireAsset(downloadList, **keys) returnValue(asset) @inlineCallbacks def newMasterAssetGroup(self, cgen): """ Create a new asset group that we will be the master for. """ transactionID = self.store.startTransaction() try: group = yield defer.maybeDeferred(cgen.construct) dsvc = yield self.parent.getOneService(self.getDirectoryServers()) group.agid = yield dsvc.newID("AssetGroups") holder = AssetGroupHolder(self.publicLocation, self.store, group.agid) holder.groupID = group.agid holder.group = group group.cluster = holder self.addPool(holder) yield dsvc.setLocation("AssetGroups", group.agid, holder.uid, [self.publicLocation]) yield holder.new(self) rPool = self.getPool(holder.sharedPoolID) yield rPool.addNewItem(holder.group) holder.group.clusterID = holder.uid holder.group.grantPermission(u"reference", u"all") holder.group.grantPermission(u"read", u"all") group.parent = self except: self.store.rollbackTransaction(transactionID) raise self.store.commitTransaction(transactionID) returnValue(holder.group.agid) @inlineCallbacks def remAssetGroup(self, agid): """ Remove a local asset group from the cluster. """ transactionID = self.store.startTransaction() try: group = self.getAssetGroup(agid) dsvc = yield self.parent.getOneService(self.getDirectoryServers()) yield dsvc.removeItem("AssetGroups", agid) group.destroy(self.parent) del self.pools[agid] except: self.store.rollbackTransaction(transactionID) raise self.store.commitTransaction(transactionID) @inlineCallbacks def newAsset(self, agid, cgen, storedData=None): """ Create a new asset of type cgen optionally from the specified stored data. """ transactionID = self.store.startTransaction() try: group = self.getAssetGroup(agid) asset = yield group.group.newAsset(self.parent, cgen, storedData) except: self.store.rollbackTransaction(transactionID) raise self.store.commitTransaction(transactionID) returnValue(asset.aid) @inlineCallbacks def listAssetGroups(self, offset=0, limit=10): """ Return a list of [id, name] of all local asset groups """ dsvc = yield self.parent.getOneService(self.getDirectoryServers()) groupIDs = yield dsvc.listItems("AssetGroups", offset, limit) groups = [] for groupID in groupIDs: pool = self.pools.get(groupID) if pool is not None: groups.append((pool.uid, pool.group.name)) continue pool = yield self.getAssetGroup(groupID, False) groups.append((yield pool.pool.callRemote("getInfo"))) returnValue(groups) @inlineCallbacks def listAssets(self, groupID, offset=0, limit=0): """ Return a list of [id, name, type] of assets in the specified group. Up to limit will be returned if availble and specified (0 = unlimited). If specified, the asset ids will begin at offset. """ group = yield self.getAssetGroup(groupID, False) if isinstance(group, CopiedPoolMember): assetIDs = yield group.pool.callRemote("listItems", offset, limit) assets = [] for assetID in assetIDs: assets.append((yield self.getAsset(assetID))) else: assets = yield group.getItems(self.parent, offset, limit) result = [] for asset in assets: result.append((asset.aid, asset.name, asset.__class__.__name__)) returnValue(result) def getStatistics(self): """ Return some general statistics about this service as a dict """ stats = {} ags = AssetGroup.query(self.store) stats["Cache entries"] = len(self.assetCache) stats["Local groups"] = len(ags) #for ag in ags: # stats["Assets in %s (group %d)" % (ag.name, ag.agid)] = len( # ag.getAssets()) return stats def getGroupInfo(self, agid): """ Returns a dict with some attributes for the given group """ group = self.getAssetGroup(agid) return dict(agid=agid, name=group.group.name, description=group.group.description) @inlineCallbacks def getAssetInfo(self, aid): """ Returns a dict with some attributes for the given asset """ asset = yield self.getAsset(aid) returnValue(dict(aid=aid, name=asset.name, type=asset.__class__.__name__, category=asset.category, tags=asset.tags)) def updateAssetGroup(self, agid, name, description): """ Set name and description of an asset group """ group = self.getAssetGroup(agid).group group.name = name group.description = description def getConfig(self): """ Returns a dict with the config of this sim. """ return self.config def setStore(self, store): """ Sets the datastore this asset service uses """ if store != self.store.filename: self.store.close() self.store = SQLiteStore() self.store.open(store) def setPublicLocation(self, location): """ Sets the public location of the asset service """ self.publicLocation = ServiceLoc(location) @requirePermissions("reference") def getInterface(self, _client, protocol): """ Hand out public interfaces """ return self.interfaces[protocol] def isLocal(self): """ We are local so return true """ return True def getLocation(self): """ Return our local location """ return ServiceLoc("self/%s" % self.name) class AssetConfig(ChangeNotifier): """ Guide interface for changing asset properties """ store = NotifierProperty("_store", "store/asset") publicLocation = NotifierProperty("_publicLocation", "self/Asset") window = None def __init__(self, asset): self.asset = asset self.getConfig() self.doneDeferred = defer.Deferred() @inlineCallbacks def getConfig(self): """ Called on startup to load everything """ config = yield self.asset.getConfig() self.store = config.get("store", None) self.publicLocation = config.get("publicLocation", None) try: from mv3d.tools.guidewx import WxParser parser = WxParser() fname = os.path.abspath(os.path.join(os.path.dirname(mv3d.__file__), "..", "templates", "guide", "assetconfig.xml")) self.window = parser.load(fname, self) except: deferr() self.window.show() def cancel(self, _obj, _property): """ Called when the user hits the cancel button """ if self.doneDeferred is not None: self.doneDeferred.callback(False) self.doneDeferred = None self.window.destroy() @inlineCallbacks def submit(self, _obj, _property): """ Called when the user hits the submit button. """ if self.doneDeferred is not None: yield self.asset.setStore(self.store) yield self.asset.setPublicLocation(self.publicLocation) self.doneDeferred.callback(True) self.doneDeferred = None self.window.destroy()