# -*- test-case-name: mv3d.test.service.test_persistservice -*-
# Copyright (C) 2010-2012 Mortal Coil Games
# See LICENSE for details.
"""
@author: mike
"""
import os
import sqlite3
import uuid
from tempfile import mkstemp
from time import time

from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
from zope.interface import Interface, implements #@UnresolvedImport

from mv3d.server.iserver import IAssetService, IRealmService, ISimulationService
from mv3d.util.classgen import ClassGenerator
from mv3d.util.enum import Enum
from mv3d.util.math3d import Vector
from mv3d.util.persist import Persistable, UntypedReference, Text, IDTuple, List, \
    AutoAttrib
from mv3d.util.profiler import timed


class SaveScheduler(object):
    """
    Helper object to coordinate saving persistables at a specific time
    interval.
    """
    # LoopingCall driving the periodic save, or None while not scheduled.
    saveLoop = None

    def __init__(self, store):
        self.store = store
        # List of (obj, partial, selectAttributes) tuples to save.
        self.objs = []

    def schedule(self, dTime):
        """
        Start a recurring save schedule based on dTime (seconds). Does
        nothing if a schedule is already active.
        """
        if self.saveLoop is None:
            self.saveLoop = LoopingCall(self.save)
            self.saveLoop.start(dTime)

    def stop(self):
        """
        Stop saving automatically. Safe to call when nothing is scheduled.
        """
        # Forget the loop so that schedule() can start a fresh one later.
        loop, self.saveLoop = self.saveLoop, None
        # LoopingCall.stop() asserts that the loop is running, so only stop
        # a loop that actually is.
        if loop is not None and loop.running:
            loop.stop()

    def append(self, obj, partial=False, selectAttributes=None):
        """
        Adds an additional object to the list that will be saved. If
        partial is True, only dirty attributes are saved unless
        selectAttributes is specified. If it is, then only selectAttributes
        will be saved. An object that is already scheduled has its previous
        entry replaced.
        """
        for sched in self.objs:
            if sched[0] == obj:
                self.objs.remove(sched)
                break
        self.objs.append((obj, partial, selectAttributes))

    def remove(self, obj):
        """
        Removes an item from automatic saving. Raises ValueError if the
        item was never scheduled.
        """
        for sched in self.objs:
            if sched[0] == obj:
                self.objs.remove(sched)
                return
        raise ValueError("No such item!")

    def save(self):
        """
        Actually save all the objects to the store.
        """
        # Iterate over a snapshot so a save that (un)schedules objects does
        # not disturb this loop.
        for obj, partial, selectAttributes in list(self.objs):
            obj.save(self.store, partial=partial,
                     selectAttributes=selectAttributes)


class FreezeCategories(Enum):
    """
    Categories of data that a Freezer stores in a FrozenItem.
    """
    assetGroup = 0
    asset = 1
    realm = 2
    area = 3
    object = 4


class FrozenItem(Persistable):
    """
    Holds a reference to an item that was frozen.
    """
    category = Text(autoSave=True, partialSave=True)
    itemID = IDTuple(autoSave=True, partialSave=True)
    item = UntypedReference(autoSave=True, partialSave=True)
    subItems = List(UntypedReference())

    def __init__(self, category=None, itemID=None, item=None):
        Persistable.__init__(self)
        self.category = category
        self.itemID = itemID
        self.item = item


class Freezer(object):
    """
    Utility to import/export all types of MV3D data. It can be used to save
    worlds for version control, or to load multiple copies of areas for
    instances.
    """
    def __init__(self, store):
        self.store = store

    def _dumpToBlob(self, persistable):
        """
        Save persistable into a throwaway SQLite file and return the raw
        contents of that file. The temporary file is always removed, even
        if saving fails.
        """
        handle, tmpFile = mkstemp()
        try:
            newStore = SQLiteStore()
            newStore.open(tmpFile)
            try:
                persistable.save(newStore)
            finally:
                newStore.close()
            # A SQLite database is binary data; read it as such.
            fil = os.fdopen(handle, "rb")
            handle = None  # the file object owns the descriptor now
            try:
                return fil.read()
            finally:
                fil.close()
        finally:
            if handle is not None:
                os.close(handle)
            os.remove(tmpFile)

    def freezeObject(self, item, positionOffset=None):
        """
        Freeze a single item. If specified, positionOffset will be added to
        the frozen item's position. Returns the FrozenItem that was saved.
        """
        frozen = FrozenItem(FreezeCategories.object, item.getID(), item)
        frozen.save(self.store)
        oldStore = item.store
        oldPosition = item.getPosition()
        if positionOffset is not None:
            item.setPosition(oldPosition + positionOffset)
        try:
            item.save(self.store)
        finally:
            # Put the live item back the way it was, in its own store.
            if positionOffset is not None:
                item.setPosition(oldPosition)
            item.save(oldStore)
        return frozen

    @inlineCallbacks
    def thawObject(self, frozen, area, positionOffset=None):
        """
        Thaw a single item and place it in the specified area. If
        positionOffset is specified, it will be added to the frozen item's
        position. Returns the item's new id.
        """
        item = frozen.item
        item.containedby = []
        data = self._dumpToBlob(item)
        pos = Vector(item.getPosition()) + (positionOffset or (0, 0, 0))
        itemID = yield area.newItem(pos, ClassGenerator().setFrom(item), data)
        returnValue(itemID)

    def freezeAsset(self, asset):
        """
        Freeze an asset. Returns the FrozenItem that was saved.
        """
        frozen = FrozenItem(FreezeCategories.asset, asset.getID(), asset)
        frozen.save(self.store)
        oldStore = asset.store
        try:
            asset.save(self.store)
        finally:
            asset.save(oldStore)
        return frozen

    @inlineCallbacks
    def thawAsset(self, frozen, group, conductor):
        """
        Thaw a single asset and add it to the specified group. The asset's
        new id will be returned.
        """
        asset = frozen.item
        data = self._dumpToBlob(asset)
        asset = yield group.newAsset(conductor,
                                     ClassGenerator().setFrom(asset), data)
        returnValue(asset.getID())

    @inlineCallbacks
    def freezeArea(self, area, includeObjects=True, positionOffset=None):
        """
        Freeze a whole area optionally including all the objects within it.
        If specified, all frozen objects will be translated by
        positionOffset. Returns the FrozenItem that was saved.
        """
        frozen = FrozenItem(FreezeCategories.area, area.getID(), area)
        frozen.save(self.store)
        oldStore = area.store
        oldItems = area.items
        oldShard = area.simShardId
        try:
            # The frozen copy is written without a shard id and, when
            # objects are excluded, without its item list.
            if not includeObjects:
                area.items = []
            area.simShardId = None
            try:
                area.save(self.store)
            finally:
                area.items = oldItems
                area.simShardId = oldShard
            if includeObjects:
                for itemID in area.items:
                    item = yield area.sim.getItem(itemID)
                    self.freezeObject(item, positionOffset)
        finally:
            # Always re-save the live area into its original store.
            area.save(oldStore)
        returnValue(frozen)

    @inlineCallbacks
    def thawArea(self, frozen, realm, pool, sim, positionOffset=None):
        """
        Thaw a whole area including any objects frozen within it. If
        specified, all frozen objects will be translated by positionOffset.
        Returns the id of the newly created area.
        """
        area = frozen.item
        aid = yield realm.newItem(sim.parent, pool.uid, pool.getMembers())
        area.uid = aid
        yield area.create(sim, pool.uid, area.center, area.size)
        pool.addMasterPool(sim.parent, ["self/Sim"], aid)
        yield pool.updateMechanism("addItem", aid, aid)
        # The item list is emptied before the frozen objects are thawed
        # back into the area one by one.
        items = area.items
        area.items = []
        for itemID in items:
            frozenItem = FrozenItem.get(self.store,
                (FrozenItem.category == FreezeCategories.object) &
                (FrozenItem.itemID == itemID))
            yield self.thawObject(frozenItem, area, positionOffset)
        returnValue(aid)

    @inlineCallbacks
    def freezeRealm(self, realm, includeAreas=True, positionOffset=None):
        """
        Freeze an entire realm, optionally including all areas and objects.
        If specified, all frozen objects will be translated by
        positionOffset. Returns the FrozenItem that was saved.
        """
        frozen = FrozenItem(FreezeCategories.realm, realm.realm.rid,
                            realm.realm)
        frozen.save(self.store)
        oldStore = realm.store
        try:
            realm.save(self.store)
            if includeAreas:
                for guard in realm.guards.values():
                    pool = guard.pool.pool
                    # TODO: this will break with multiple servers!
                    for simClusterGuard in pool.guards.values():
                        simPool = simClusterGuard.pool.pool
                        for areaGuard in simPool.guards.values():
                            area = areaGuard.pool.pool
                            yield self.freezeArea(
                                area, positionOffset=positionOffset)
        finally:
            # Always re-save the live realm into its original store.
            realm.save(oldStore)
        returnValue(frozen)

    @inlineCallbacks
    def thawRealm(self, frozen, simCluster, conductor, positionOffset=None):
        """
        Thaw an entire realm. If specified, all frozen objects will be
        translated by positionOffset. Returns the id of the new realm.
        """
        loadedRealm = frozen.item
        oldRealmID = loadedRealm.rid
        rsvc = conductor.getLocalService(IRealmService)
        sim = conductor.getLocalService(ISimulationService)
        rid = yield rsvc.newMasterRealm(None, loadedRealm)
        realm = yield rsvc.getRealm(rid)
        realmHolder = rsvc.getPool(realm.clusterID)
        areas = FrozenItem.query(self.store,
                                 FrozenItem.category == FreezeCategories.area)
        for frozenArea in areas:
            # Only areas whose id starts with the frozen realm's id are
            # thawed.
            if frozenArea.itemID[0] == oldRealmID:
                yield self.thawArea(frozenArea, realmHolder, simCluster, sim,
                                    positionOffset)
        returnValue(rid)

    @inlineCallbacks
    def freezeAssetGroup(self, conductor, group, includeAssets=True):
        """
        Freeze a whole AssetGroup optionally including all assets within
        it. Returns the FrozenItem that was saved.
        """
        frozen = FrozenItem(FreezeCategories.assetGroup, group.groupID, group)
        frozen.save(self.store)
        oldStore = group.store
        try:
            group.save(self.store)
            if includeAssets:
                items = yield group.getItems(conductor)
                for item in items:
                    self.freezeAsset(item)
        finally:
            # Always re-save the live group into its original store.
            group.save(oldStore)
        returnValue(frozen)

    @inlineCallbacks
    def thawAssetGroup(self, frozen, conductor):
        """
        Thaw a whole AssetGroup including all assets frozen within it.
        Returns the id of the new group.
        """
        loadedGroup = frozen.item
        asvc = conductor.getLocalService(IAssetService)
        grpid = yield asvc.newMasterAssetGroup(ClassGenerator(
            "mv3d.server.asset.AssetGroup"))
        group = yield asvc.getAssetGroup(grpid)
        group.group.name = loadedGroup.group.name
        group.group.description = loadedGroup.group.description
        assets = FrozenItem.query(self.store,
                                  FrozenItem.category == FreezeCategories.asset)
        for frozenAsset in assets:
            # Only assets whose id starts with the frozen group's id are
            # thawed.
            if frozenAsset.itemID[0] == loadedGroup.groupID:
                yield self.thawAsset(frozenAsset, group.group, conductor)
        returnValue(grpid)


class IPersistStore(Interface):
    """
    Interface for a datastore on the persist service
    """
    def open(**kwargs): #@NoSelf
        """
        Open the store. This will optionally create it if it doesn't exist.
        If the filename parameter is not specified, then the store will be
        a memory only store.
        """

    def registerSchema(typeName, version, attributeList, force=False): #@NoSelf
        """
        Install new schema into the store. This must be run once on the
        store for each type you expect to store there. typeName must be
        unique. Version is an integer specifying the version number of the
        type. attributeList includes all the schema data to register, and
        force will force registration of that schema.
        """

    def startTransaction(): #@NoSelf
        """
        Begin a transaction on the store. This will return a transaction ID
        which can be passed to other methods in order to execute operations
        within the same transaction.
        """

    def commitTransaction(transactionID): #@NoSelf
        """
        Successfully complete a transaction. Call with the transaction ID
        that was given out in startTransaction.
        """

    def rollbackTransaction(transactionID): #@NoSelf
        """
        Revert any changes made during the specified transaction. The
        transaction is closed and the ID is no longer valid after this
        point.
        """

    def new(typeName, data, transactionID=None): #@NoSelf
        """
        Store a new item. typeName must match a type which has had its
        schema registered with this store.
        """

    def update(typeName, parameters, data, transactionID=None): #@NoSelf
        """
        Update an existing item in the store. Parameters specifies a query
        that will match one or more records to be updated. data includes
        what fields to update and does not necessarily have to include all
        fields in the item. See query for more information on parameters.
        """

    def delete(typeName, parameters, specificVersion=None, #@NoSelf
               transactionID=None):
        """
        Remove an item or items from the store. As with update, parameters
        specifies a query that will match one or more records which will be
        deleted. If this is not done in a transaction, it can not be
        undone. See query for more information on parameters.
        """

    def query(typeName, parameters, fields=None, transactionID=None): #@NoSelf
        """
        Executes a query to return 0 or more results. If fields is not
        specified, all fields are returned including a version number for
        the schema from which the result came. When fields is specified,
        results are only returned from the most recent schema version. This
        is because fields in previous schema versions may not be the same.

        parameters is a possibly nested set of 3 length tuples. Each one
        contains (left, operator, right). For boolean operators like
        and/or, left and right should be another 3 length tuple. Example:
        (("name", "=", "mike"), " and ", ("age", ">", "20"))

        All items are expected to be in store format already.
        """


class SQLiteStore(object):
    """
    Store that uses SQLite as a back end.
    """
    implements(IPersistStore)

    store = None
    schemas = None
    queryLogFile = None#"querylog.txt"
    querylog = None
    printLog = False
    isOpen = False
    filename = None
    recordQueryTimes = False
    # Nesting depth of startTransaction calls not yet committed.
    transactionCount = 0

    def __init__(self):
        # Maps persist attribute type names to SQLite column types.
        self.datatypeMap = dict(Text="text", Stringable="text",
                                Integer="integer", Float="real", UUID="text",
                                Pickled="text", Boolean="boolean")
        # Maps transaction id -> cursor the transaction runs on.
        self.transactions = {}
        # Maps query text -> (call count, total seconds).
        self.queryTimes = {}
        if self.queryLogFile is not None:
            self.querylog = open("querylog-%s.txt" % id(self), "w")

    def _getRegisteredSchemas(self):
        """
        Grabs all the registered schemas and versions out of the database
        or initializes a new database if needed.
        """
        self.schemas = []
        try:
            self.schemas = self._execute(
                "select typeName, version from schemas")
        except sqlite3.OperationalError: #@UndefinedVariable
            # The select failed, which is treated as "schemas table does
            # not exist yet".
            self._initializeNewDB()

    def _initializeNewDB(self):
        """
        Gets a new database ready for use.
        """
        self._execute("create table schemas (typeName text, version integer)")
        if self.transactions:
            # unlikely we're in a transaction but possible
            transactionID = next(iter(self.transactions))
            self.transactions[transactionID] = self.store.cursor()
            self.transactions[transactionID].execute("begin transaction")

    def _getHighestSchemaVersion(self, typeName):
        """
        Returns the highest schema version that is installed for the
        specified type, or 0 if none is installed.
        """
        highVersion = 0
        for tname, ver in self.schemas:
            if tname == typeName and ver > highVersion:
                highVersion = ver
        return highVersion

    def _getAllSchemaVersions(self, typeName):
        """
        Returns a list of all versions of schema that are installed for the
        specified type
        """
        return [ver for tname, ver in self.schemas if tname == typeName]

    def open(self, filename=":memory:"):
        """
        Open the store. This will optionally create it if it doesn't exist.
        If the filename parameter is not specified, then the store will be
        a memory only store.
        """
        self.filename = filename
        self.store = sqlite3.connect(filename)
        if self.queryLogFile is not None:
            self.querylog.write("Opened file %s\n" % filename)
        # isolation_level None: the sqlite3 module does not open
        # transactions implicitly; they are started explicitly below.
        self.store.isolation_level = None
        self.store.execute("pragma journal_mode = PERSIST")
        self._getRegisteredSchemas()
        self.isOpen = True

    def close(self):
        """
        Close the store. Does nothing if the store is not open.
        """
        # TODO: this is a bit of a hack mostly for unit tests.
        if not self.isOpen:
            return
        AutoAttrib.saveAll(self)
        if self.queryLogFile is not None:
            self.querylog.write("Closed file %s\n" % self.filename)
        self.store.close()
        self.isOpen = False

    def configure(self, cfg, section):
        """
        Configure and open the store based on the config and section within
        that config given. The optional "fileName" option selects the
        database file; without it an in-memory store is opened.
        """
        fileName = ":memory:"
        if cfg.has_option(section, "fileName"):
            fileName = cfg.get(section, "fileName")
        self.open(fileName)

    def registerSchema(self, typeName, version, attributeList, force=False,
                       transactionID=None):
        """
        Install new schema into the store. This must be run once on the
        store for each type you expect to store there. typeName must be
        unique. Version is an integer specifying the version number of the
        type. attributeList includes all the schema data to register, and
        force will force registration of that schema.

        Raises ValueError when a higher version is already installed and
        force is not set.
        """
        if (typeName, version) in self.schemas and not force:
            return
        highVersion = self._getHighestSchemaVersion(typeName)
        if highVersion > 0:
            if highVersion > version and not force:
                raise ValueError(
                    "Version %d already installed of %s when attempting to"
                    " install version %d" % (highVersion, typeName, version))
            # Keep the previous version's rows around as <type>_<version>.
            self._execute("alter table %s rename to %s" %
                          (typeName, typeName + "_%d" % highVersion),
                          transactionID=transactionID)
        columns = []
        indexes = []
        for attrib in attributeList:
            columns.append('"%s" %s' % (attrib["name"],
                                        self.datatypeMap[attrib["type"]]))
            if attrib["index"]:
                indexes.append(attrib["name"])
        query = "create table %s (%s)" % (typeName, ", ".join(columns))
        self._execute(query, transactionID=transactionID)
        for index in indexes:
            self._execute("create index %s%d_%s_index on %s (\"%s\")" % (
                          typeName, version, index, typeName, index),
                          transactionID=transactionID)
        # Values are bound as parameters rather than pasted into the SQL.
        self._execute("insert into schemas (typeName, version) values (?, ?)",
                      (typeName, version), transactionID=transactionID)
        self.schemas.append((typeName, version))
        # sqlite sometimes breaks transactions on create table statements.
        if transactionID is None and self.transactions:
            transactionID = next(iter(self.transactions))
        if transactionID is not None:
            curs = self.store.cursor()
            try:
                curs.execute("begin transaction")
                self.transactions[transactionID] = curs
            except sqlite3.OperationalError:
                pass # sometimes it doesn't break the transaction.

    def startTransaction(self):
        """
        Begin a transaction on the store. This will return a transaction ID
        which can be passed to other methods in order to execute operations
        within the same transaction.

        Unfortunately, SQLite doesn't support multiple transactions, so
        while one is open its ID is handed out again and only the nesting
        count is increased.
        """
        self.transactionCount += 1
        if self.transactions:
            return next(iter(self.transactions))
        transactionID = uuid.uuid1()
        if self.printLog:
            print("begin transaction (%s)" % transactionID)
        if self.queryLogFile:
            self.querylog.write("begin transaction (%s)\n" % transactionID)
        self.transactions[transactionID] = self.store.cursor()
        self.transactions[transactionID].execute("begin transaction")
        return transactionID

    def commitTransaction(self, transactionID):
        """
        Successfully complete a transaction. Call with the transaction ID
        that was given out in startTransaction. The commit is only issued
        once every nested startTransaction has been committed.
        """
        self.transactionCount -= 1
        if self.transactionCount > 0:
            return
        # Never leave the counter negative; that would make a later nested
        # transaction commit too early.
        self.transactionCount = 0
        if self.printLog:
            print("commit transaction (%s)" % transactionID)
        if self.queryLogFile:
            self.querylog.write("commit transaction (%s)\n" % transactionID)
        try:
            self.transactions[transactionID].execute("commit")
        except sqlite3.OperationalError as exc:
            # SQLite may already have ended the transaction on its own.
            if "no transaction is active" not in str(exc):
                raise
        del self.transactions[transactionID]

    def rollbackTransaction(self, transactionID):
        """
        Revert any changes made during the specified transaction. The
        transaction is closed and the ID is no longer valid after this
        point.
        """
        self.transactionCount = 0
        self.transactions[transactionID].execute("rollback")
        del self.transactions[transactionID]
        if self.printLog:
            print("rollback transaction (%s)" % transactionID)
        if self.queryLogFile:
            self.querylog.write("rollback transaction (%s)\n" % transactionID)

    def new(self, typeName, data, transactionID=None):
        """
        Store a new item. typeName must match a type which has had its
        schema registered with this store.
        """
        columns = ",".join(['"%s"' % key for key in data])
        placeholders = ",".join(["?"] * len(data))
        query = "insert into %s (%s) values (%s)" % (typeName, columns,
                                                     placeholders)
        self._execute(query, list(data.values()), transactionID)

    def _generateQueryParameters(self, parameters, subs=None):
        """
        Recursively transform a set of tuple based query parameters into an
        actual sql query. Returns (sql text, substitution list); values are
        appended to subs in the order their placeholders appear.
        """
        if subs is None:
            subs = []
        left, comparison, right = parameters
        if isinstance(left, (tuple, list)):
            # Boolean node: both sides are sub-expressions.
            left, _ = self._generateQueryParameters(left, subs)
            if isinstance(right, (tuple, list)):
                right, _ = self._generateQueryParameters(right, subs)
            return "%s%s%s" % (left, comparison, right), subs
        else:
            # Leaf node: column <op> bound value.
            subs.append(right)
            return '"%s"%s?' % (left, comparison), subs

    def update(self, typeName, parameters, data, transactionID=None):
        """
        Update an existing item in the store. Parameters specifies a query
        that will match one or more records to be updated. data includes
        what fields to update and does not necessarily have to include all
        fields in the item. See query for more information on parameters.
        """
        assignments = []
        subs = []
        for key, value in data.items():
            assignments.append('"%s" = ?' % key)
            subs.append(value)
        # The where-clause values follow the assignment values in subs.
        parmString, _ = self._generateQueryParameters(parameters, subs)
        query = "update %s set %s where %s" % (
            typeName, ", ".join(assignments), parmString)
        self._execute(query, subs, transactionID)

    def delete(self, typeName, parameters, specificVersion=None,
               transactionID=None):
        """
        Remove an item or items from the store. As with update, parameters
        specifies a query that will match one or more records which will be
        deleted. If this is not done in a transaction, it can not be
        undone. See query for more information on parameters.

        If specificVersion is given, rows are deleted from the table that
        holds that older schema version.
        """
        table = typeName
        if specificVersion is not None:
            table = "%s_%d" % (table, specificVersion)
        parmString, subs = self._generateQueryParameters(parameters)
        query = "delete from %s where %s" % (table, parmString)
        self._execute(query, subs, transactionID)

    def query(self, typeName, parameters, fields=None, transactionID=None,
              offset=0, limit= -1, order=None, count=False):
        """
        Executes a query to return 0 or more results. If fields is not
        specified, all fields are returned including a version number for
        the schema from which the result came. When fields is specified,
        results are only returned from the most recent schema version. This
        is because fields in previous schema versions may not be the same.

        parameters is a possibly nested set of 3 length tuples. Each one
        contains (left, operator, right). For boolean operators like
        and/or, left and right should be another 3 length tuple. Example:
        (("name", "=", "mike"), " and ", ("age", ">", "20"))

        All items are expected to be in store format already.

        order, when given, is a 2-tuple formatted into "order by %s %s".
        offset and limit are applied to each schema version's table
        separately. With count set, each result row holds "count(*)".
        Returns a list of dicts, each with a "_schemaVersion" key added.
        """
        highVersion = self._getHighestSchemaVersion(typeName)
        if count:
            versions = self._getAllSchemaVersions(typeName)
            fieldList = "count(*)"
        elif fields is None:
            versions = self._getAllSchemaVersions(typeName)
            fieldList = "*"
        else:
            versions = [highVersion]
            fieldList = ",".join(fields)
        result = []
        for version in versions:
            # Older versions live in tables named <type>_<version>.
            table = typeName
            if version != highVersion:
                table += "_%d" % version
            query = "select %s from %s " % (fieldList, table)
            if parameters is not None:
                parms, subs = self._generateQueryParameters(parameters)
                query += "where %s " % parms
            else:
                subs = []
            if order is not None: # else chaos!
                query += "order by %s %s " % order
            # Always emitted: SQLite only accepts "offset" after "limit",
            # and a limit of -1 means no limit.
            query += "limit %d " % limit
            if offset != 0:
                query += "offset %d " % offset
            if transactionID is None:
                cursor = self.store.cursor()
            else:
                cursor = self.transactions[transactionID]
            try:
                if self.printLog:
                    print("%s %s" % (query, subs))
                if self.querylog is not None:
                    self.querylog.write("%s %r\n" % (query, subs))
                cursor.execute(query, subs)
            except sqlite3.OperationalError: #@UndefinedVariable
                # Best effort: a version whose table cannot be queried is
                # skipped rather than failing the whole query.
                continue
            data = cursor.fetchall()
            desc = [col[0] for col in cursor.description]
            for row in data:
                record = dict(zip(desc, row))
                record["_schemaVersion"] = version
                result.append(record)
        return result

    def _execute(self, query, subs=None, transactionID=None):
        """
        Execute a bunch of SQL and return the results as a list of rows.
        Runs on the transaction's cursor when transactionID is given,
        otherwise on a fresh cursor followed by a commit.
        """
        if self.querylog is not None:
            # No newline here; the elapsed time is appended below.
            self.querylog.write("%s %r" % (query, subs))
        if self.printLog:
            print("%s %s" % (query, subs))
        start = time()
        if transactionID is None:
            cursor = self.store.cursor()
        else:
            cursor = self.transactions[transactionID]
        if subs is not None:
            cursor.execute(query, subs)
        else:
            cursor.execute(query)
        res = cursor.fetchall()
        if transactionID is None:
            self.store.commit()
        if self.recordQueryTimes:
            count, total = self.queryTimes.get(query, (0, 0.0))
            self.queryTimes[query] = count + 1, total + (time() - start)
        if self.querylog is not None:
            self.querylog.write("in %.2f\n" % (time() - start))
        return res

    def analyze(self, classes=None, fix=False):
        """
        Check for errors and optionally fix them. Returns a list of log
        message strings describing what was found.

        ** WARNING ** Setting fix to True may cause data loss in the case
        of errors. Use with care!
        """
        log = []
        for cls in classes or []:
            log.extend(self._checkClass(cls, fix))
        for schema, _version in self.schemas:
            try:
                owners = self._execute("select owningObject from %s" % schema)
            except sqlite3.OperationalError: #@UndefinedVariable
                # The select failed for this table (for example it has no
                # owningObject column); nothing to check.
                continue
            for owner in owners:
                owner = owner[0]
                if owner is None:
                    continue
                # owner has the form "<module path>.<class name>,<id>".
                cpath, oid = owner.split(",")
                mname, _, cname = cpath.rpartition(".")
                mod = __import__(mname, fromlist=[cname])
                cls = getattr(mod, cname)
                idColumn = cls._idAttribute.schema["name"]
                count = self._execute(
                    "select count(*) from %s where %s=?" %
                    (cls._typeName, idColumn), (oid,))[0][0]
                if count == 0:
                    log.append("Error: Orphaned Object %s %s" % (schema,
                                                                 owner))
                    if fix:
                        self._execute(
                            "delete from %s where owningObject=?" % schema,
                            (owner,))
                elif count > 1:
                    log.append("Error: %d Extra Child Objects %s %s" %
                               (count, schema, owner))
                    if fix:
                        rows = self._execute(
                            "select ROWID from %s where %s=?" %
                            (cls._typeName, idColumn), (oid,))
                        rowids = [row[0] for row in rows]
                        # Delete all but the last matching row.
                        for rowid in rowids[:-1]:
                            self._execute(
                                "delete from %s where ROWID=?" %
                                cls._typeName, (rowid,))
        return log

    def _checkClass(self, cls, fix):
        """
        Check the consistency of a specific class: look for duplicated ids
        and for objects that fail to load. Returns a list of log messages.
        """
        log = []
        count = cls.count(self)
        idColumn = cls._idAttribute.schema["name"]
        ids = self._execute("select %s from %s" % (idColumn, cls._typeName))
        uniqueIds = set(ids)
        if len(uniqueIds) < count:
            log.append("Found %d duplicate entries for %s" % (
                       count - len(uniqueIds), cls._typeName))
            if fix:
                # Remove one occurrence of every id; what is left over is
                # one entry per surplus row.
                dupes = ids[:]
                for iid in uniqueIds:
                    dupes.remove(iid)
                for (dupe,) in dupes:
                    rids = self._execute(
                        "select rowid from %s where %s=?" %
                        (cls._typeName, idColumn), (dupe,))
                    assert len(rids) > 1
                    self._execute("delete from %s where rowid=?" %
                                  cls._typeName, (rids[-1][0],))
        for offset in range(count):
            try:
                _obj = cls.query(self, offset=offset, limit=1, useCache=False)
            except Exception as exc:
                # Any failure to load counts as a broken record.
                iid = self._execute("select %s from %s limit 1 offset %d" % (
                                    idColumn, cls._typeName, offset))
                log.append("Error loading %s with id %s (%s)" %
                           (cls._typeName, iid, exc))
                if fix:
                    # item is in cache
                    obj = cls.query(self, offset=offset, limit=1)
                    obj[0].delete()
        return log

    def getTopQueryTimes(self, count=20):
        """
        Returns a list of the top query times as (total time, average time,
        call count, query) tuples, largest total first.
        """
        times = [(ttime, ttime / float(cnt), cnt, query)
                 for query, (cnt, ttime) in self.queryTimes.items()]
        times.sort(reverse=True)
        return times[:count]


class PersistService(object):
    """
    Handles persisting things to an object store. This particular store
    uses sqlite under the hood.
    """
    def __init__(self):
        # Maps store name -> mounted store.
        self._stores = {}

    def mount(self, storeName, store):
        """
        Mount the given store
        """
        self._stores[storeName] = store

    def mountByConfig(self, cfg, name):
        """
        Read the configuration to get the store type and then create it and
        open it with the config. The "type" option of section name holds
        the dotted path of the store class. Returns the mounted store.
        """
        type_ = cfg.get(name, "type")
        moduleName, _, className = type_.rpartition(".")
        module = __import__(moduleName, fromlist=[className])
        cls = getattr(module, className)
        store = cls()
        store.configure(cfg, name)
        self.mount(name, store)
        return store

    def unmount(self, storeName):
        """
        Unmount a store
        """
        del self._stores[storeName]

    def getStore(self, storeName):
        """
        Returns the store with the given name.
        """
        return self._stores[storeName]