# -*- test-case-name: mv3d.test.test_clientfaker -*-
# Copyright (C) 2006-2012 Mortal Coil Games
# See LICENSE for details.

"""
A display-less fake client (L{ClientFaker}) built on stand-ins for the Ogre
renderer, the tests that exercise it against a real server, and a small
load-testing helper (L{LoadTest}).
"""

from time import time

from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks
from twisted.spread.pb import PBConnectionLost
from zope.interface import implements #@UnresolvedImport

from mv3d.util.conductor import Conductor
from mv3d.phys.body import IVisualObjectView
from mv3d.net.client import PBConFactory
from mv3d.net.client import JSONConFactory, ServiceLoc
from mv3d.net.pb import Manipulator, Cacheable
from mv3d.test.server import WithRealServer
from mv3d.util.persist import AutoAttrib
from mv3d.client.sim import SimService
from mv3d.client.asset import AssetService
from mv3d.client.ui.irenderer import ISceneNode, IStaticMesh, IAnimatedMesh, \
    ISky

# The Ogre bindings are optional.  When they cannot be imported, ``noOgre``
# holds the skip message used by ClientFakerTests; the names imported in the
# ``try`` body are then undefined.
try:
    from mv3d.client.ui.ogre3d import OgreNode, OgreObject
    from mv3d.client.ui.ogre3d import CaelumSky
    from mv3d.client.player import PlayerClient
    from mv3d.client.ui import ogre3d
    noOgre = ""
except ImportError:
    noOgre = "Ogre is not Available"

# Marks "the attribute did not exist" when saving values before monkeypatching.
_MISSING = object()


class FakeNode:
    """
    Fake a node in ogre.  Every mutator is a no-op; child and parent lookups
    hand back a fresh L{FakeNode}.
    """
    def attachObject(self, o):
        pass

    def setPosition(self, p):
        pass

    def setOrientation(self, o):
        pass

    def setScale(self, o):
        pass

    def createChildSceneNode(self, _nm):
        return FakeNode()

    def getParent(self):
        return FakeNode()

    def removeChild(self, child):
        pass

    def addChild(self, child):
        pass

    def detachAllObjects(self, *a):
        pass


class FakeRenderer:
    """
    Fake the ogre renderer.  The instance stands in for its own scene
    manager, root scene node, camera and root.
    """
    def __init__(self):
        self.sceneManager = self
        self.rootSceneNode = self
        self.camera = self
        self.root = self

    def createChildSceneNode(self, _nm):
        return FakeNode()

    def addNode(self, node):
        pass

    def setFog(self, *a):
        pass

    def getRootSceneNode(self):
        """
        return it
        """
        return self.rootSceneNode

    def getRendererClass(self, interface):
        """
        Returns the specific class that implements the interface.

        @param interface: one of ISceneNode, IStaticMesh, IAnimatedMesh, ISky
        @raise ValueError: if the interface is not one of those four
        """
        if interface is ISceneNode:
            return OgreNode
        if interface is IStaticMesh:
            return OgreObject
        if interface is IAnimatedMesh:
            return OgreObject
        if interface is ISky:
            return CaelumSky
        raise ValueError("Unknown interface: %s" % interface)

    def getType(self):
        return None

    def imageToTexture(self, image, name, group):
        return self


class FakeOgreObject:
    """
    Make a fake object.  Declares IOgreObject only when the Ogre bindings
    were importable.
    """
    if not noOgre:
        implements(ogre3d.IOgreObject)

    def __init__(self, *_a):
        self.plane = None

    def create(self, *a):
        pass

    def createPlane(self, *a, **kw):
        pass

    def createManual(self, *a, **kw):
        pass

    def createFromMeshGen(self, *_a, **_kw):
        return defer.succeed(True)

    def getObject(self):
        return self

    def getName(self):
        return "BOO"


class WaterPlaneVisualObjectView(IVisualObjectView, Cacheable):
    """
    A view of water.  Installed over visual.WaterPlaneVisualObjectView by
    L{ClientFaker.monkeyPatch}.
    """
    def __init__(self):
        IVisualObjectView.__init__(self)
        Cacheable.__init__(self)

    def getID(self):
        return self.iid

    def create(self, area, pos, rot):
        """
        Create the graphical representation (does nothing here)
        """

    def destroy(self):
        """
        Destroy the graphical representation
        """
        # NOTE(review): self.node is never assigned in this class; presumably
        # a base class or the caller provides it - confirm.
        self.node.finalize(1)

    def enterArea(self, area):
        """
        We've entered a new area; reparent our node under the area's node
        """
        node = area.getNode()
        self.node.setParent(node)
    observe_enterArea = enterArea


class FakeAssetClient(AssetService):
    """
    We really shouldn't download anything: both acquire calls succeed
    immediately without doing any work.
    """
    def acquireAssets(self, *_a, **_kw):
        return defer.succeed([])

    def acquireAsset(self, *_a, **_kw):
        return defer.succeed(True)


class ClientFaker(Conductor):
    """
    This class fakes a client without needing a display (or ogre)

    @ivar moveQueue: list of (delay, action, args, kwargs) tuples
    @ivar loopMoves: when true the move queue restarts after its last move
    @ivar currentMove: index into moveQueue of the move last started
    @ivar nextMove: the pending delayed call for the next move, or None
    @ivar moveDeferred: Deferred fired when the queue finishes or is stopped;
        None until startQueue has been called
    """
    loopMoves = True
    currentMove = None
    nextMove = None
    moveDeferred = None

    def __init__(self, loginProtocol="http"):
        Conductor.__init__(self)
        self.sc = SimService()
        self.ac = FakeAssetClient()
        self.pc = PlayerClient()
        self.pbc = PBConFactory(self)
        self.hc = JSONConFactory(self)
        self.hc.protocol = loginProtocol
        self.sc.setServiceParent(self)
        self.ac.setServiceParent(self)
        self.pc.setServiceParent(self)
        self.addConnectionFactory(self.pbc)
        self.addConnectionFactory(self.hc)
        self.pc.renderer = FakeRenderer()
        self.moveQueue = []

    def monkeyPatch(self):
        """
        Monkeypatch ftw. These are necessary for running this against a
        normal server.  The replaced values are remembered on the instance
        so that unMonkeyPatch can restore them.
        """
        self.CaelumSky = ogre3d.CaelumSky
        self.OgreObject = ogre3d.OgreObject
        ogre3d.CaelumSky = FakeOgreObject
        ogre3d.OgreObject = FakeOgreObject
        # Imported only after ogre3d has been patched, as before.
        from mv3d.client.view import realm
        from mv3d.client.view import visual
        previous = getattr(realm, "CaelumSky", _MISSING)
        if previous is FakeOgreObject:
            # realm already holds the fake (for instance it was first
            # imported after the patch above), so the value to put back is
            # the class saved from ogre3d.
            previous = self.CaelumSky
        self.realmCaelumSky = previous
        realm.CaelumSky = FakeOgreObject
        self.WaterPlaneVisualObjectView = visual.WaterPlaneVisualObjectView
        visual.WaterPlaneVisualObjectView = WaterPlaneVisualObjectView

    def unMonkeyPatch(self):
        """
        Put things back the way they were
        """
        from mv3d.client.view import realm
        from mv3d.client.ui import ogre3d
        ogre3d.CaelumSky = self.CaelumSky
        ogre3d.OgreObject = self.OgreObject
        if self.realmCaelumSky is _MISSING:
            # realm had no such attribute before we patched it.
            if hasattr(realm, "CaelumSky"):
                del realm.CaelumSky
        else:
            realm.CaelumSky = self.realmCaelumSky
        from mv3d.client.view import visual
        visual.WaterPlaneVisualObjectView = self.WaterPlaneVisualObjectView

    @inlineCallbacks
    def connect(self, login="http://localhost:8181/service",
                svc="pb://test:user@localhost:8282/Player", pcid=None):
        """
        Connect to a player service

        @param login: login service URL, a list of login services, or None
            to leave the login services untouched
        @param svc: URL of the player service
        @param pcid: id of the PC to connect to; when None only the service
            itself is fetched
        @return: a Deferred (via inlineCallbacks) fired once connected
        """
        self.pcid = pcid
        if login is not None:
            if not isinstance(login, list):
                login = [ServiceLoc(login)]
            self.pbc.loginServices = login
        self.pc.loginservers = [svc]
        service = yield self.getService(ServiceLoc(svc))
        if pcid is not None:
            self.pc.pc, agg = yield service.connectPC(
                pcid, self.sc.getNotifier(service))
            self.pc.aggregators.append(agg)
            self.pc.pcid = pcid
            self.sc.views.append(service)
            pco = yield service.viewObjects([pcid])
            yield self.sc.addItem(pco[0])
#            camera = self.pc.getRenderer().camera
#            self.pc.camera = PointPhysical(self, self.pc.pcid, camera)
#            self.pc.camera.start()

    def do(self, action, *a, **kw):
        """
        Perform an action by delegating to the player client

        @return: whatever the player client's do() returns (used as a
            Deferred by startQueue)
        """
        return self.pc.do(action, *a, **kw)

    def getPosition(self):
        """
        Returns the position of the client's PC
        """
        pcob = self.sc.items[self.pc.pcid]
        return pcob.body.getPosition()

    def addMove(self, delay, action, *a, **kw):
        """
        Add a move to the queue

        @param delay: seconds to wait before this move when it follows
            another move (the first move of a run starts immediately)
        @param action: action name passed to do()
        """
        self.moveQueue.append((delay, action, a, kw))

    def startQueue(self):
        """
        Start running the move queue

        The first move is performed immediately; each later move is
        scheduled with reactor.callLater using its own delay once the
        previous action's Deferred fires.

        @return: the first action's Deferred with a callback attached that
            returns self.moveDeferred, so it waits for the queue to finish
            (loopMoves false) or for stopQueue
        """
        assert len(self.moveQueue)
        assert self.nextMove is None
        self.currentMove = -1
        self.moveDeferred = defer.Deferred()

        def runMove():
            # Advance to the next queue entry, wrapping at the end.
            self.currentMove += 1
            if self.currentMove == len(self.moveQueue):
                self.currentMove = 0
            cur = self.moveQueue[self.currentMove]
            d = self.do(cur[1], *cur[2], **cur[3])

            def done(_r):
                nxt = self.currentMove + 1
                if nxt == len(self.moveQueue):
                    if self.loopMoves:
                        nxt = 0
                    else:
                        # Last move of a non-looping queue: finish up.
                        self.nextMove = None
                        self.moveDeferred.callback(True)
                        return self.moveDeferred
                self.nextMove = reactor.callLater(#@UndefinedVariable
                    self.moveQueue[nxt][0], runMove)
                return self.moveDeferred
            return d.addCallback(done)
        return runMove()

    def stopQueue(self):
        """
        Stop running the queue

        Cancels the pending delayed call if it has not yet run or been
        cancelled, and fires moveDeferred unless it has already fired.
        Safe to call when the queue was never started or has already
        finished.
        """
        pending = self.nextMove
        # Cleared so that startQueue's assertion allows a restart.
        self.nextMove = None
        # active() is false for calls that already ran or were cancelled;
        # cancel() would raise on either.
        if pending is not None and pending.active():
            pending.cancel()
        finished = self.moveDeferred
        if finished is not None and not finished.called:
            finished.callback(True)


class FakePlayerManip(Manipulator):
    """
    A fake player manipulator that records the received argument on its
    parent
    """
    def view_test(self, _client, arg):
        """
        Store arg as parent.test
        """
        self.parent.test = arg

    def view_spam(self, _client, arg):
        self.parent.spam = arg


class ClientFakerTests(WithRealServer):
    """
    Tests for the client faker
    """
    if noOgre:
        skip = noOgre

    @inlineCallbacks
    def setUp(self):
        AutoAttrib.saveQueue = None
        yield WithRealServer.setUp(self)
        self.client = ClientFaker()
        self.client.monkeyPatch()
        self.realm.setPhysicsProperty("gravity", (0, 0, 0))
        yield self.object.deactivate()
        self.pid = 0, 1

    @inlineCallbacks
    def tearDown(self):
        """
        todo: we shouldn't have to flush the errors.
        """
        self.client.unMonkeyPatch()
        yield WithRealServer.tearDown(self)
        yield self.client.pbc.disconnectAll()
        # Wait half a second before flushing the logged connection-lost
        # errors.
        dfr = defer.Deferred()
        reactor.callLater(0.5, dfr.callback, True) #@UndefinedVariable
        yield dfr
        self.flushLoggedErrors(PBConnectionLost)
        AutoAttrib.saveQueue = []

    def test_connect(self):
        """
        Test connecting to the server
        """
        return self.client.connect()

    @inlineCallbacks
    def test_connectPC(self):
        """
        Test connecting to a pc
        """
        yield self.client.connect(pcid=self.pid)
#        d = defer.Deferred()
#        reactor.callLater(0.5, d.callback, True)
#        yield d
#        self.assertNoOgre()

    @inlineCallbacks
    def test_doSomething(self):
        """
        Test actually performing an action
        """
        yield self.client.connect(pcid=self.pid)
        yield self.client.do("test", 12345)
        self.assertEqual(self.object.test, 12345)
#        self.assertNoOgre()

    @inlineCallbacks
    def test_getPosition(self):
        """
        Test getting the position of an object
        """
        self.object.body.setPosition((10, 10, 10))
        yield self.client.connect(pcid=self.pid)
        d = defer.Deferred()
        reactor.callLater(0.5, d.callback, True) #@UndefinedVariable
        yield d
        self.assertEqual(self.client.getPosition(), (10, 10, 10))
#        self.assertNoOgre()

    @inlineCallbacks
    def test_moveQueue(self):
        """
        Test adding a move queue to the client
        """
        self.client.addMove(1, "test", 9999)
        self.client.addMove(0.5, "spam", 11)
        self.client.loopMoves = False
        yield self.client.connect(pcid=self.pid)
        start = time()
        yield self.client.startQueue()
        self.assertEqual(self.object.test, 9999)
        self.assertEqual(self.object.spam, 11)
        # The second move is delayed by 0.5 seconds.
        self.failUnless(time() - start >= 0.5)


class LoadTest:
    """
    Convenience class to load test a server with one L{ClientFaker} per PC

    @ivar pcs: sequence of PC ids, one per client
    @ivar moves: (delay, action, *args) tuples queued on every client
    """
    def __init__(self, pcs, login="https://localhost:8080/service/",
                 player="pb://mike:pass@localhost:1999/Player"):
        self.pcs = pcs
        self.login = login
        self.player = player
        self.clients = [ClientFaker("https") for _ in pcs]
        self.moves = [(0.2, "Wave"), (0.2, "Walk", 1), (5, "Walk", -1),
                      (0.5, "Turn", 1), (1, "Turn", 0)]

    @inlineCallbacks
    def start(self):
        """
        Start the load test: connect every client in turn, then queue the
        moves on each and start its queue
        """
        for num, client in enumerate(self.clients):
            # Single-argument form prints the same text under Python 2 and 3.
            print("Connecting %d" % num)
            yield client.connect(self.login, self.player, self.pcs[num])
        for client in self.clients:
            for move in self.moves:
                client.addMove(move[0], move[1], *move[2:])
            client.startQueue()

    def go(self):
        """
        Start the reactor
        """
        # monkeyPatch replaces module attributes, so patching through one
        # client covers all of them.
        self.clients[0].monkeyPatch()
        self.start()
        reactor.run() #@UndefinedVariable

    def stop(self):
        """
        Stop the test by stopping every client's move queue
        """
        for client in self.clients:
            client.stopQueue()