[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Finish anydbm-wrapper refactoring; add tests for wrappers.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv557/lib/mixminion
Modified Files:
ClientMain.py Filestore.py Fragments.py test.py
Log Message:
Finish anydbm-wrapper refactoring; add tests for wrappers.
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.105
retrieving revision 1.106
diff -u -d -r1.105 -r1.106
--- ClientMain.py 8 Aug 2003 21:40:41 -0000 1.105
+++ ClientMain.py 17 Aug 2003 21:09:56 -0000 1.106
@@ -9,7 +9,6 @@
__all__ = [ 'Address', 'ClientKeyring', 'ClientDirectory', 'MixminionClient',
'parsePath', ]
-import anydbm
import binascii
import errno
import cPickle
@@ -1177,87 +1176,10 @@
""")
class SURBLog(mixminion.Filestore.DBBase):
- def __init__(self, filename, forceClean=0):
- clientLock()
- mixminion.Filestore.DBBase.__init__(self, filename, "SURB log")
- try:
- lastCleaned = int(self.log['LAST_CLEANED'])
- except (KeyError, ValueError):
- lastCleaned = 0
-
- if lastCleaned < time.time()-24*60*60 or forceClean:
- self.clean()
-
- def findUnusedSURB(self, surbList, verbose=0, now=None):
- if now is None:
- now = time.time()
- nUsed = nExpired = nShortlived = 0
- result = None
- for surb in surbList:
- expiry = surb.timestamp
- timeLeft = expiry - now
- if self.isSURBUsed(surb):
- nUsed += 1
- elif timeLeft < 60:
- nExpired += 1
- elif timeLeft < 3*60*60:
- nShortlived += 1
- else:
- result = surb
- break
-
- if verbose:
- if nUsed:
- LOG.warn("Skipping %s used reply blocks", nUsed)
- if nExpired:
- LOG.warn("Skipping %s expired reply blocks", nExpired)
- if nShortlived:
- LOG.warn("Skipping %s soon-to-expire reply blocks", nShortlived)
-
- return result
-
- def close(self):
- mixminion.Filestore.DBBase.close(self)
- clientUnlock()
-
- def isSURBUsed(self, surb):
- return self.has_key[surb]
-
- def markSURBUsed(self, surb):
- self[surb] = surb.timestamp
-
- def clean(self, now=None):
- if now is None:
- now = time.time() + 60*60
- allHashes = self.log.keys()
- removed = []
- for hash in allHashes:
- if self._decodeVal(self.log[hash]) < now:
- removed.append(hash)
- del allHashes
- for hash in removed:
- del self.log[hash]
- self.log['LAST_CLEANED'] = str(int(now))
- self.sync()
-
- def _encodeKey(self, surb):
- return sha1(surb.pack())
- def _encodeVal(self, timestamp):
- return str(timestamp)
- def _decodeVal(self, timestamp):
- try:
- return int(timestamp)
- except ValueError:
- return 0
-
-class XSURBLog:
"""A SURBLog manipulates a database on disk to remember which SURBs we've
used, so we don't reuse them accidentally.
"""
#FFFF Using this feature should be optional.
-
- ##Fields
- # log -- a database, as returned by anydbm.open.
## Format:
# The database holds two kinds of keys:
# "LAST_CLEANED" -> an integer of the last time self.clean() was called.
@@ -1267,10 +1189,7 @@
forceClean is true, remove expired entries on startup.
"""
clientLock()
- parent, shortfn = os.path.split(filename)
- createPrivateDir(parent)
- LOG.debug("Opening SURB log")
- self.log = anydbm.open(filename, 'c')
+ mixminion.Filestore.DBBase.__init__(self, filename, "SURB log")
try:
lastCleaned = int(self.log['LAST_CLEANED'])
except (KeyError, ValueError):
@@ -1278,8 +1197,9 @@
if lastCleaned < time.time()-24*60*60 or forceClean:
self.clean()
+ self.sync()
- def findUnusedSURB(self, surbList, verbose=0,now=None):
+ def findUnusedSURB(self, surbList, verbose=0, now=None):
"""Given a list of ReplyBlock objects, find the first that is neither
expired, about to expire, or used in the past. Return None if
no such reply block exists."""
@@ -1287,7 +1207,7 @@
now = time.time()
nUsed = nExpired = nShortlived = 0
result = None
- for surb in surbList:
+ for surb in surbList:
expiry = surb.timestamp
timeLeft = expiry - now
if self.isSURBUsed(surb):
@@ -1306,28 +1226,22 @@
if nExpired:
LOG.warn("Skipping %s expired reply blocks", nExpired)
if nShortlived:
- LOG.warn("Skipping %s sooon-to-expire reply blocks", nShortlived)
+ LOG.warn("Skipping %s soon-to-expire reply blocks", nShortlived)
return result
def close(self):
"""Release resources associated with the surblog."""
- self.log.close()
+ mixminion.Filestore.DBBase.close(self)
clientUnlock()
def isSURBUsed(self, surb):
"""Return true iff the ReplyBlock object 'surb' is marked as used."""
- hash = binascii.b2a_hex(sha1(surb.pack()))
- try:
- _ = self.log[hash]
- return 1
- except KeyError:
- return 0
+ return self.has_key[surb]
def markSURBUsed(self, surb):
"""Mark the ReplyBlock object 'surb' as used."""
- hash = binascii.b2a_hex(sha1(surb.pack()))
- self.log[hash] = str(surb.timestamp)
+ self[surb] = surb.timestamp
def clean(self, now=None):
"""Remove all entries from this SURBLog the correspond to expired
@@ -1338,12 +1252,23 @@
allHashes = self.log.keys()
removed = []
for hash in allHashes:
- if self.log[hash] < now:
+ if self._decodeVal(self.log[hash]) < now:
removed.append(hash)
del allHashes
for hash in removed:
del self.log[hash]
self.log['LAST_CLEANED'] = str(int(now))
+ self.sync()
+
+ def _encodeKey(self, surb):
+ return sha1(surb.pack())
+ def _encodeVal(self, timestamp):
+ return str(timestamp)
+ def _decodeVal(self, timestamp):
+ try:
+ return int(timestamp)
+ except ValueError:
+ return 0
class ClientQueue:
"""A ClientQueue holds packets that have been scheduled for delivery
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- Filestore.py 14 Aug 2003 19:37:24 -0000 1.5
+++ Filestore.py 17 Aug 2003 21:09:56 -0000 1.6
@@ -642,8 +642,8 @@
# to disk.
MAX_JOURNAL = 128
## Fields:
- # klen -- required length of encoded keys
- # vlen -- required length of encoded values
+ # klen -- required length of journal-encoded keys
+ # vlen -- required length of journal-encoded values
# vdflt -- If vlen is 0, default value used when reading journaled value
# from disk.
# journal -- map from journal-encoded key to journal-encoded value.
@@ -749,7 +749,12 @@
class BooleanJournaledDBBase(JournaledDBBase):
"""Specialization of JournaledDBBase that encodes a set of keys, mapping
- each key to the value '1'."""
+ each key to the value '1'.
+
+ (By default, constant-length string keys are accepted, and are
+ hex-encoded when stored in the database, in case the database
+ isn't 8-bit clean.)
+ """
def __init__(self, location, purpose, klen):
JournaledDBBase.__init__(self,location,purpose,klen,0,"1")
def _encodeKey(self, k):
Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- Fragments.py 14 Aug 2003 19:37:24 -0000 1.1
+++ Fragments.py 17 Aug 2003 21:09:56 -0000 1.2
@@ -182,6 +182,7 @@
class _FragmentDB(mixminion.Filestore.DBBase):
def __init__(self, location):
mixminion.Filestore.DBBase.__init__(self, location, "fragment")
+ self.sync()
def markStatus(self, msgid, status, today):
assert status in ("COMPLETED", "REJECTED")
if now is None:
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.143
retrieving revision 1.144
diff -u -d -r1.143 -r1.144
--- test.py 14 Aug 2003 19:37:25 -0000 1.143
+++ test.py 17 Aug 2003 21:09:56 -0000 1.144
@@ -2636,7 +2636,7 @@
self.failUnlessRaises(CryptoError, self.sp3.processMessage, m_x)
#----------------------------------------------------------------------
-# QUEUE
+# FILESTORE and QUEUE
class TestDeliveryQueue(DeliveryQueue):
def __init__(self,d,now=None):
@@ -2648,18 +2648,18 @@
def _deliverMessages(self, msgList):
self._msgs = msgList
-class QueueTests(TestCase):
+class FStoreTestBase(TestCase):
+ def unlink(self, fns):
+ for f in fns:
+ os.unlink(f)
+
+class FilestoreTests(FStoreTestBase):
def setUp(self):
mixminion.Common.installSIGCHLDHandler()
self.d1 = mix_mktemp("q1")
self.d2 = mix_mktemp("q2")
self.d3 = mix_mktemp("q3")
-
- def unlink(self, fns):
- for f in fns:
- os.unlink(f)
-
- def testCreateQueue(self):
+ def testCreateStore(self):
Store = mixminion.Filestore.MixedStore
# Nonexistent dir.
@@ -2693,7 +2693,7 @@
queue.removeAll(self.unlink)
- def testQueueOps(self):
+ def testStoreOps(self):
Store = mixminion.Filestore.MixedStore
queue1 = Store(self.d2, create=1)
@@ -2790,7 +2790,7 @@
queue1.cleanQueue(self.unlink)
queue2.cleanQueue(self.unlink)
- def testMetadataQueues(self):
+ def testMetadataStores(self):
d_d = mix_mktemp("q_md")
Store = mixminion.Filestore.StringMetadataStore
@@ -2823,6 +2823,153 @@
self.assert_(not os.path.exists(os.path.join(d_d, "rmvm_"+h2)))
self.assert_(not os.path.exists(os.path.join(d_d, "rmv_"+h2)))
+ def testDBWrappers(self):
+ d_parent = mix_mktemp("db")
+ loc = os.path.join(d_parent, "db0")
+ # Test unjournaled DB.
+ class NumDB(mixminion.Filestore.DBBase):
+ def __init__(self,loc):
+ mixminion.Filestore.DBBase.__init__(self, loc, "numbers")
+ def _encodeKey(self,k):
+ return str(k)
+ def _decodeKey(self,k):
+ return int(k)
+ def _encodeVal(self,v):
+ return v.upper()
+ def _decodeVal(self,v):
+ return v.lower()
+ db0 = NumDB(loc)
+ db0[3] = 'three'
+ db0[99] = 'Ninety-nine'
+ self.assertEquals(db0[3], 'three')
+ self.assertEquals(db0[99], 'ninety-nine')
+ self.assertRaises(KeyError, lambda db0=db0:db0[10])
+ self.assertRaises(KeyError, lambda db0=db0:db0["10"])
+ self.assertEquals(db0.log["3"], 'THREE')
+ db0[3] = 'iii'
+ self.assertEquals(db0.log["3"], 'III')
+ self.assertEquals(db0["3"], 'iii')
+ self.assertEquals(db0.get(10), None)
+ self.assertEquals(db0.get(10, "hi"), "hi")
+ self.assert_(db0.has_key(3))
+ self.assert_(not db0.has_key(" 3"))
+ self.assert_(not db0.has_key("10"))
+ db0[6]='six'
+ del db0[6]
+ self.assert_(not db0.has_key(6))
+ self.assert_(not db0.log.has_key("6"))
+ self.assertRaises(KeyError, lambda db0=db0:db0[6])
+ self.assertUnorderedEq([3, 99], db0.keys())
+ db0.close()
+ db0 = NumDB(loc)
+ self.assertUnorderedEq([3, 99], db0.keys())
+ self.assert_(not db0.has_key(6))
+ self.assertEquals(db0["3"], 'iii')
+ db0.sync()
+ db0[3] = 'three'
+ db0.close()
+
+ for fn in os.listdir(d_parent):
+ self.assertStartsWith(fn, "db0")
+ os.unlink(os.path.join(d_parent,fn))
+
+ # Test journaled DB.
+ loc = os.path.join(d_parent, "db1")
+ jloc = loc+"_jrnl"
+ class JNumDB(mixminion.Filestore.JournaledDBBase):
+ def __init__(self,loc):
+ mixminion.Filestore.JournaledDBBase.__init__(
+ self, loc, "numbers", 5, 20, None)
+ def _encodeKey(self,k): return str(k)
+ def _decodeKey(self,k): return int(k)
+ def _encodeVal(self,v): return v
+ def _decodeVal(self,v): return v
+ def _jEncodeKey(self,k): return "%05d"%k
+ def _jDecodeKey(self,k): return int(k)
+ def _jEncodeVal(self,v): return "%20s"%v
+ def _jDecodeVal(self,v): return v.strip()
+
+ db1 = JNumDB(loc)
+ db1[3] = "three"
+ db1[10] = "ten"
+ db1[8] = "eight"
+ self.assertEquals(readFile(jloc),
+ "00003 three"
+ "00010 ten"
+ "00008 eight")
+ self.assertUnorderedEq(db1.log.keys(), [])
+ self.assertUnorderedEq(db1.keys(), [10, 3, 8])
+ self.assertEquals(db1[3], "three")
+ self.assertEquals(db1[10], "ten")
+ self.assertEquals(db1[8], "eight")
+ self.assertRaises(KeyError, lambda db1=db1:db1[7])
+ self.assert_(db1.has_key(8))
+ self.assert_(not db1.has_key(5))
+ db1.close()
+ db1 = JNumDB(loc)
+ self.assertUnorderedEq(db1.log.keys(), ['10', '3', '8'])
+ self.assertUnorderedEq(db1.keys(), [10, 3, 8])
+ self.assertUnorderedEq(db1.journal.keys(), [])
+ self.assertEquals(readFile(jloc), "")
+ self.assertEquals(db1[3], "three")
+ self.assertEquals(db1[10], "ten")
+ self.assertEquals(db1[8], "eight")
+ self.assertRaises(KeyError, lambda db1=db1:db1[7])
+ self.assert_(db1.has_key(8))
+ self.assert_(not db1.has_key(5))
+ db1[9] = "nine"
+ self.assertEquals(readFile(jloc),
+ "00009 nine")
+ self.assertEquals(db1[9], "nine")
+ db1.sync()
+ self.assertEquals(db1[9], "nine")
+ self.assertEquals(db1[10], "ten")
+ db1.close()
+ for fn in os.listdir(d_parent):
+ self.assertStartsWith(fn, "db1")
+ os.unlink(os.path.join(d_parent,fn))
+
+ # Test journaled value-less DB
+ loc = os.path.join(d_parent, "db2")
+ jloc = loc+"_jrnl"
+ db2 = mixminion.Filestore.BooleanJournaledDBBase(loc, "numbers", 2)
+ db2["02"] = 1
+ db2["03"] = 1
+ db2["05"] = 1
+ db2["07"] = 1
+ db2["11"] = 1
+ db2["13"] = 1
+ self.assertEquals(1, db2["02"])
+ self.assertEquals(readFile(jloc), "020305071113")
+ db2.close()
+ db2 = mixminion.Filestore.BooleanJournaledDBBase(loc, "numbers", 2)
+ self.assertEquals(readFile(jloc), "")
+ self.assertEquals(1, db2["13"])
+ self.assert_(db2.has_key("07"))
+ self.assert_(not db2.has_key("08"))
+ db2.close()
+
+ # Test migration from unjournaled to journaled.
+ loc = os.path.join(d_parent, "db3")
+ jloc = loc+"_jrnl"
+ db3 = NumDB(loc)
+ db3[9] = "nine"
+ db3[10] = "ten"
+ db3[11] = "eleven"
+ db3.close()
+ db3 = JNumDB(loc)
+ self.assertEquals(readFile(jloc), "")
+ db3[12] = 'twelve'
+ self.assertEquals(readFile(jloc), "00012 twelve")
+ self.assertEquals(db3[10], "TEN")
+ self.assertEquals(db3[12], "twelve")
+ db3.close()
+ for fn in os.listdir(d_parent):
+ os.unlink(os.path.join(d_parent,fn))
+
+class QueueTests(FStoreTestBase):
+ def setUp(self):
+ mixminion.Common.installSIGCHLDHandler()
def testDeliveryQueues(self):
d_d = mix_mktemp("qd")
@@ -6185,7 +6332,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(HashLogTests))
+ suite.addTest(tc(FilestoreTests))
return suite
suite.addTest(tc(MiscTests))
@@ -6199,6 +6346,7 @@
suite.addTest(tc(HashLogTests))
suite.addTest(tc(BuildMessageTests))
suite.addTest(tc(PacketHandlerTests))
+ suite.addTest(tc(FilestoreTests))
suite.addTest(tc(QueueTests))
suite.addTest(tc(EventStatsTests))
suite.addTest(tc(ModuleTests))