[minion-cvs] Refactor database wrappers--anything we need in 3 place...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv3121/lib/mixminion
Modified Files:
test.py ClientMain.py Filestore.py
Log Message:
Refactor database wrappers--anything we need in 3 places needs to be defined in one place
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.141
retrieving revision 1.142
diff -u -d -r1.141 -r1.142
--- test.py 24 Jul 2003 17:37:16 -0000 1.141
+++ test.py 8 Aug 2003 21:40:40 -0000 1.142
@@ -1049,6 +1049,15 @@
self.assertEquals(left+right, lioness_encrypt(plain,key))
self.assertEquals(key, Keyset("ABCDE"*4).getLionessKeys("foo"))
+ u = "Hello world"*2
+ w = whiten(u)
+ self.assertNotEquals(w, u)
+ self.assertEquals(unwhiten(w), u)
+ u = "xyzprdlty"*100
+ w = whiten(u)
+ self.assertNotEquals(w, u)
+ self.assertEquals(unwhiten(w), u)
+
def test_bear(self):
enc = bear_encrypt
dec = bear_decrypt
@@ -1341,8 +1350,8 @@
## Now, for the fragment payloads.
msgID = "This is a message123"
assert len(msgID) == 20
- contents = contents[:28*1024 - 46]
- frag_payload_1 = "\x80\x02"+hash+msgID+"\x00\x01\x00\x00"+contents
+ contents = contents[:28*1024 - 47]
+ frag_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x01\x00\x00"+contents
frag_payload_2 = frag_payload_1[:-38] # efwd overhead
p1 = parsePayload(frag_payload_1)
p2 = parsePayload(frag_payload_2)
@@ -1371,11 +1380,11 @@
self.failUnlessRaises(ParseError,parsePayload,frag_payload_2[:-1])
# Impossible message sizes
- min_payload_1 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD3"+contents
- bad_payload_1 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents
- min_payload_2 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xAD"+contents[:-38]
- bad_payload_2 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xAC"+contents[:-38]
- min_payload_3 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents[:-38]
+ min_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents
+ bad_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD1"+contents
+ min_payload_2 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xAC"+contents[:-38]
+ bad_payload_2 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xAB"+contents[:-38]
+ min_payload_3 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD1"+contents[:-38]
parsePayload(min_payload_1)
parsePayload(min_payload_2)
parsePayload(min_payload_3)
@@ -1490,16 +1499,13 @@
# Make sure that an empty hash contains nothing, including NUL strings
# and high-ascii strings.
- notseen("a")
- notseen("a*20")
- notseen("\000"*10)
- notseen("\000")
- notseen("\277"*10)
+ notseen("a"*20)
+ notseen("\000"*20)
+ notseen("\277"*20)
# Log a value, and make sure that only that value is now in the log
log("a"*20)
- notseen("a*10")
- notseen("\000"*10)
- notseen("b")
+ notseen("\000"*20)
+ notseen("b"*20)
seen("a"*20)
# Try a second value; make sure both values are now there.
@@ -1510,7 +1516,9 @@
# Try logging a string of NULs
log("\000"*20)
seen("\000"*20)
- notseen("\000"*10)
+ notseen("\001"*20)
+ notseen(("\000"*19)+"\001")
+ notseen("\001"+("\000"*19))
# Try double-logging.
log("\000"*20)
@@ -1523,6 +1531,9 @@
# And a nice plain ascii string
log("abcde"*4)
seen("abcde"*4)
+ seen("a"*20)
+ h[0].sync()
+ seen("a"*20)
# Now reopen the log, and make sure it has all its original contents.
h[0].close()
@@ -1533,8 +1544,7 @@
seen("abcde"*4)
seen("\000"*20)
# (and no other contents)
- notseen(" ")
- notseen("\000"*5)
+ notseen(" "*20)
notseen("\001"*20)
# Now add more, close again, and see if our latest addition went in.
@@ -6173,7 +6183,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(QueueTests))
+ suite.addTest(tc(HashLogTests))
return suite
suite.addTest(tc(MiscTests))
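The fragment-payload constants above change because the header grew by one
byte: the two-byte prefix "\x80\x02" became the three-byte "\x80\x00\x02",
which moves the truncation point from 28*1024 - 46 to 28*1024 - 47. A quick
sanity check, with the field widths read off the test strings themselves
(an assumption about the layout, not a spec):

    FLAGS = 3      # was 2: "\x80\x02" -> "\x80\x00\x02"
    HASH  = 20     # whole-message hash
    MSGID = 20     # the test asserts len(msgID) == 20
    INDEX = 4      # e.g. "\x00\x01\x00\x00"

    header = FLAGS + HASH + MSGID + INDEX
    assert header == 47                      # the new truncation point
    assert 28*1024 - header == 28*1024 - 47
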
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.104
retrieving revision 1.105
diff -u -d -r1.104 -r1.105
--- ClientMain.py 24 Jul 2003 18:01:29 -0000 1.104
+++ ClientMain.py 8 Aug 2003 21:40:41 -0000 1.105
@@ -1176,8 +1176,81 @@
""")
+class SURBLog(mixminion.Filestore.DBBase):
+ def __init__(self, filename, forceClean=0):
+ clientLock()
+ mixminion.Filestore.DBBase.__init__(self, filename, "SURB log")
+ try:
+ lastCleaned = int(self.log['LAST_CLEANED'])
+ except (KeyError, ValueError):
+ lastCleaned = 0
-class SURBLog:
+ if lastCleaned < time.time()-24*60*60 or forceClean:
+ self.clean()
+
+ def findUnusedSURB(self, surbList, verbose=0, now=None):
+ if now is None:
+ now = time.time()
+ nUsed = nExpired = nShortlived = 0
+ result = None
+ for surb in surbList:
+ expiry = surb.timestamp
+ timeLeft = expiry - now
+ if self.isSURBUsed(surb):
+ nUsed += 1
+ elif timeLeft < 60:
+ nExpired += 1
+ elif timeLeft < 3*60*60:
+ nShortlived += 1
+ else:
+ result = surb
+ break
+
+ if verbose:
+ if nUsed:
+ LOG.warn("Skipping %s used reply blocks", nUsed)
+ if nExpired:
+ LOG.warn("Skipping %s expired reply blocks", nExpired)
+ if nShortlived:
+ LOG.warn("Skipping %s soon-to-expire reply blocks", nShortlived)
+
+ return result
+
+ def close(self):
+ mixminion.Filestore.DBBase.close(self)
+ clientUnlock()
+
+ def isSURBUsed(self, surb):
+ return self.has_key(surb)
+
+ def markSURBUsed(self, surb):
+ self[surb] = surb.timestamp
+
+ def clean(self, now=None):
+ if now is None:
+ now = time.time() + 60*60
+ allHashes = self.log.keys()
+ removed = []
+ for hash in allHashes:
+ if self._decodeVal(self.log[hash]) < now:
+ removed.append(hash)
+ del allHashes
+ for hash in removed:
+ del self.log[hash]
+ self.log['LAST_CLEANED'] = str(int(now))
+ self.sync()
+
+ def _encodeKey(self, surb):
+ return sha1(surb.pack())
+ def _encodeVal(self, timestamp):
+ return str(timestamp)
+ def _decodeVal(self, timestamp):
+ try:
+ return int(timestamp)
+ except ValueError:
+ return 0
+
+class XSURBLog:
"""A SURBLog manipulates a database on disk to remember which SURBs we've
used, so we don't reuse them accidentally.
"""
@@ -1203,7 +1276,6 @@
except (KeyError, ValueError):
lastCleaned = 0
- forceClean = 1
if lastCleaned < time.time()-24*60*60 or forceClean:
self.clean()
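The new SURBLog is now a thin subclass of mixminion.Filestore.DBBase: the
_encodeKey/_encodeVal/_decodeVal hooks map a reply block to
sha1(surb.pack()) and its expiry timestamp to a decimal string, and the
locking and dbm handling all come from the base class. A sketch of the
intended calling pattern; the path and the surbs list are hypothetical
stand-ins:

    log = SURBLog("/home/user/.mixminion/surblog")   # hypothetical path
    try:
        surb = log.findUnusedSURB(surbs, verbose=1)  # skips used/expired
        if surb is not None:
            # ... build and send the reply ...
            log.markSURBUsed(surb)   # records sha1(surb.pack()) -> expiry
            log.sync()
    finally:
        log.close()                  # also drops the client lock
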
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Filestore.py 24 Jul 2003 18:01:29 -0000 1.3
+++ Filestore.py 8 Aug 2003 21:40:42 -0000 1.4
@@ -4,7 +4,8 @@
"""mixminion.Filestore
Common code for directory-based, security-conscious, threadsafe
- unordered file stores.
+ unordered file stores. Also contains common code for journalable
+ DB-backed threadsafe stores.
"""
# Formerly, this was all in mixminion.server.ServerQueue. But
@@ -12,14 +13,17 @@
# queues, and we needed another for fragment stores anyway. So it was
# time to refactor the common code.
+import anydbm
+import dumbdbm
import os
+import errno
import time
import stat
import cPickle
import threading
from mixminion.Common import MixFatalError, secureDelete, LOG, \
- createPrivateDir, readFile
+ createPrivateDir, readFile, tryUnlink
from mixminion.Crypto import getCommonPRNG
__all__ = [ "StringStore", "StringMetadataStore",
@@ -416,5 +420,187 @@
BaseMetadataStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
ObjectStoreMixin.__init__(self)
+
+# ======================================================================
+# Database wrappers
+
+class DBBase:
+ # _lock
+ def __init__(self, filename, purpose=""):
+ self._lock = threading.RLock()
+ self.filename = filename
+ parent = os.path.split(filename)[0]
+ createPrivateDir(parent)
+
+ try:
+ st = os.stat(filename)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ st = None
+
+ if st and st[stat.ST_SIZE] == 0:
+ LOG.warn("Half-created database %s found; cleaning up.", filename)
+ tryUnlink(filename)
+
+ LOG.debug("Opening %s database at %s", purpose, filename)
+ self.log = anydbm.open(filename, 'c')
+ if not hasattr(self.log, 'sync'):
+ if hasattr(self.log, '_commit'):
+ # Workaround for dumbdbm to allow syncing. (Standard in
+ # Python 2.3.)
+ self.log.sync = self.log._commit
+ else:
+ # Otherwise, force a no-op sync method.
+ self.log.sync = lambda : None
+
+ if isinstance(self.log, dumbdbm._Database):
+ LOG.warn("Warning: using a flat file for %s database", purpose)
+
+ # Subclasses may want to check whether this is the right database,
+ # flush the journal, and so on.
+
+ def _encodeKey(self, k):
+ return k
+ def _encodeVal(self, v):
+ return v
+ def _decodeVal(self, v):
+ return v
+
+ def has_key(self, k):
+ try:
+ _ = self[k]
+ return 1
+ except KeyError:
+ return 0
+
+ def __getitem__(self, k):
+ return self.getItem(k)
+
+ def get(self, k, default=None):
+ try:
+ return self[k]
+ except KeyError:
+ return default
+
+ def __setitem__(self, k, v):
+ self.setItem(k, v)
+
+ def getItem(self, k):
+ self._lock.acquire()
+ try:
+ return self._decodeVal(self.log[self._encodeKey(k)])
+ finally:
+ self._lock.release()
+
+ def setItem(self, k, v):
+ self._lock.acquire()
+ try:
+ self.log[self._encodeKey(k)] = self._encodeVal(v)
+ finally:
+ self._lock.release()
+ def sync(self):
+ self._lock.acquire()
+ try:
+ self.log.sync()
+ finally:
+ self._lock.release()
+
+ def close(self):
+ self._lock.acquire()
+ try:
+ self.log.close()
+ self.log = None
+ finally:
+ self._lock.release()
+
+_JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)|getattr(os,'O_BINARY',0)
+
+class JournaledDBBase(DBBase):
+ MAX_JOURNAL = 128
+ def __init__(self, location, purpose, klen, vlen, vdflt):
+ DBBase.__init__(self, location, purpose)
+
+ self.klen = klen
+ self.vlen = vlen
+ self.vdefault = vdflt
+
+ self.journalFileName = location+"_jrnl"
+ self.journal = {}
+ if os.path.exists(self.journalFileName):
+ j = readFile(self.journalFileName, 1)
+ for i in xrange(0, len(j), klen+vlen):
+ if vlen:
+ self.journal[j[i:i+klen]] = j[i+klen:i+klen+vlen]
+ else:
+ self.journal[j[i:i+klen]] = self.vdefault
+
+ self.journalFile = os.open(self.journalFileName,
+ _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+
+ self.sync()
+
+ getItemNoJournal = DBBase.getItem
+ setItemNoJournal = DBBase.setItem
+
+ def _jEncodeKey(self, k):
+ return k
+ def _jDecodeKey(self, k):
+ return k
+ def _jEncodeVal(self, v):
+ return v
+ def _jDecodeVal(self, v):
+ return v
+
+ def getItem(self, k):
+ jk = self._jEncodeKey(k)
+ assert len(jk) == self.klen
+ self._lock.acquire()
+ try:
+ if self.journal.has_key(jk):
+ return self._jDecodeVal(self.journal[jk])
+ return self.getItemNoJournal(k)
+ finally:
+ self._lock.release()
+
+ def setItem(self, k, v):
+ jk = self._jEncodeKey(k)
+ jv = self._jEncodeVal(v)
+ assert len(jk) == self.klen
+ if self.vlen: assert len(jv) == self.vlen
+ self._lock.acquire()
+ try:
+ self.journal[jk] = jv
+ os.write(self.journalFile, jk)
+ if self.vlen:
+ os.write(self.journalFile, jv)
+ if len(self.journal) > self.MAX_JOURNAL:
+ self.sync()
+ finally:
+ self._lock.release()
+
+ def sync(self):
+ self._lock.acquire()
+ try:
+ for jk in self.journal.keys():
+ ek = self._encodeKey(self._jDecodeKey(jk))
+ ev = self._encodeVal(self._jDecodeVal(self.journal[jk]))
+ self.log[ek] = ev
+ self.log.sync()
+ os.close(self.journalFile)
+ self.journalFile = os.open(self.journalFileName,
+ _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
+ self.journal = {}
+ finally:
+ self._lock.release()
+ def close(self):
+ self._lock.acquire()
+ try:
+ self.sync()
+ self.log.close()
+ self.log = None
+ os.close(self.journalFile)
+ finally:
+ self._lock.release()
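
JournaledDBBase layers crash-safety on DBBase: each setItem appends a
fixed-width key/value record to a journal file opened with O_SYNC, so the
write is durable before the call returns; on open, any leftover journal is
replayed into memory; and sync() folds the journaled entries into the dbm,
syncs it, and reopens the journal with O_TRUNC. A minimal standalone sketch
of the same pattern (modern Python; TinyJournal and its names are invented
for illustration, not mixminion API):

    import os

    KLEN, VLEN = 20, 4     # fixed record widths, as in JournaledDBBase
    FLAGS = os.O_WRONLY | os.O_CREAT | getattr(os, 'O_SYNC', 0)

    class TinyJournal:
        def __init__(self, path):
            self.path = path
            self.pending = {}
            if os.path.exists(path):         # replay records after a crash
                data = open(path, 'rb').read()
                for i in range(0, len(data), KLEN + VLEN):
                    rec = data[i:i + KLEN + VLEN]
                    self.pending[rec[:KLEN]] = rec[KLEN:]
            self.fd = os.open(path, FLAGS | os.O_APPEND, 0o600)

        def set(self, key, val):             # key, val: bytes
            assert len(key) == KLEN and len(val) == VLEN
            self.pending[key] = val
            os.write(self.fd, key + val)     # durable before we return

        def get(self, key, db):              # journal shadows the store
            if key in self.pending:
                return self.pending[key]
            return db.get(key)

        def compact(self, db):               # db: the real key/value store
            for k, v in self.pending.items():
                db[k] = v
            os.close(self.fd)                # start a fresh, empty journal
            self.fd = os.open(self.path, FLAGS | os.O_TRUNC, 0o600)
            self.pending = {}

The point of the synchronous append is that the much slower dbm only has
to be synced once every MAX_JOURNAL writes, while no acknowledged write
can be lost to a crash.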