[minion-cvs] Refactor database wrappers--anything we need in 3 places needs to be defined in one place



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv3121/lib/mixminion

Modified Files:
	test.py ClientMain.py Filestore.py 
Log Message:
Refactor database wrappers--anything we need in 3 places needs to be defined in one place

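The shared wrapper is the new mixminion.Filestore.DBBase below: it owns
the RLock, the anydbm open/sync/close dance, and the dumbdbm workaround,
so callers override only the key/value codec hooks.  A minimal sketch of
the intended pattern (the class name, path, and values here are
hypothetical, not part of this commit):

    import mixminion.Filestore

    class TimestampDB(mixminion.Filestore.DBBase):
        # Hypothetical subclass mapping string keys to integer timestamps.
        # Locking, opening, sync(), and close() are inherited; only the
        # codec hooks change.
        def _encodeVal(self, v):
            return str(v)
        def _decodeVal(self, v):
            try:
                return int(v)
            except ValueError:
                return 0

    db = TimestampDB("/home/user/.mixminion/stamps", "timestamp")
    db["somekey"] = 1060381240
    assert db["somekey"] == 1060381240
    db.sync()
    db.close()
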
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.141
retrieving revision 1.142
diff -u -d -r1.141 -r1.142
--- test.py	24 Jul 2003 17:37:16 -0000	1.141
+++ test.py	8 Aug 2003 21:40:40 -0000	1.142
@@ -1049,6 +1049,15 @@
         self.assertEquals(left+right, lioness_encrypt(plain,key))
         self.assertEquals(key, Keyset("ABCDE"*4).getLionessKeys("foo"))
 
+        u = "Hello world"*2
+        w = whiten(u)
+        self.assertNotEquals(w, u)
+        self.assertEquals(unwhiten(w), u)
+        u = "xyzprdlty"*100
+        w = whiten(u)
+        self.assertNotEquals(w, u)
+        self.assertEquals(unwhiten(w), u)
+
     def test_bear(self):
         enc = bear_encrypt
         dec = bear_decrypt
@@ -1341,8 +1350,8 @@
         ## Now, for the fragment payloads.
         msgID = "This is a message123"
         assert len(msgID) == 20
-        contents = contents[:28*1024 - 46]
-        frag_payload_1 = "\x80\x02"+hash+msgID+"\x00\x01\x00\x00"+contents
+        contents = contents[:28*1024 - 47]
+        frag_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x01\x00\x00"+contents
         frag_payload_2 = frag_payload_1[:-38] # efwd overhead
         p1 = parsePayload(frag_payload_1)
         p2 = parsePayload(frag_payload_2)
@@ -1371,11 +1380,11 @@
         self.failUnlessRaises(ParseError,parsePayload,frag_payload_2[:-1])
 
         # Impossible message sizes
-        min_payload_1 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD3"+contents
-        bad_payload_1 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents
-        min_payload_2 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xAD"+contents[:-38]
-        bad_payload_2 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xAC"+contents[:-38]
-        min_payload_3 = "\x80\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents[:-38]
+        min_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD2"+contents
+        bad_payload_1 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD1"+contents
+        min_payload_2 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xAC"+contents[:-38]
+        bad_payload_2 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xAB"+contents[:-38]
+        min_payload_3 = "\x80\x00\x02"+hash+msgID+"\x00\x00\x6F\xD1"+contents[:-38]
         parsePayload(min_payload_1)
         parsePayload(min_payload_2)
         parsePayload(min_payload_3)
@@ -1490,16 +1499,13 @@
 
         # Make sure that an empty hash contains nothing, including NUL strings
         # and high-ascii strings.
-        notseen("a")
-        notseen("a*20")
-        notseen("\000"*10)
-        notseen("\000")
-        notseen("\277"*10)
+        notseen("a"*20)
+        notseen("\000"*20)
+        notseen("\277"*20)
         # Log a value, and make sure that only that value is now in the log
         log("a"*20)
-        notseen("a*10")
-        notseen("\000"*10)
-        notseen("b")
+        notseen("\000"*20)
+        notseen("b"*20)
         seen("a"*20)
 
         # Try a second value; make sure both values are now there.
@@ -1510,7 +1516,9 @@
         # Try logging a string of NULs
         log("\000"*20)
         seen("\000"*20)
-        notseen("\000"*10)
+        notseen("\001"*20)
+        notseen(("\000"*19)+"\001")
+        notseen("\001"+("\000"*19))
 
         # Try double-logging.
         log("\000"*20)
@@ -1523,6 +1531,9 @@
         # And a nice plain ascii string
         log("abcde"*4)
         seen("abcde"*4)
+        seen("a"*20)
+        h[0].sync()
+        seen("a"*20)
 
         # Now reopen the log, and make sure it has all its original contents.
         h[0].close()
@@ -1533,8 +1544,7 @@
         seen("abcde"*4)
         seen("\000"*20)
         # (and no other contents)
-        notseen(" ")
-        notseen("\000"*5)
+        notseen(" "*20)
         notseen("\001"*20)
 
         # Now add more, close again, and see if our latest addition went in.
@@ -6173,7 +6183,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(QueueTests))
+        suite.addTest(tc(HashLogTests))
         return suite
 
     suite.addTest(tc(MiscTests))

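The new whiten/unwhiten assertions above only pin down the round-trip
property.  Stated standalone (assuming the pair is exported by
mixminion.Crypto, like the other primitives this test exercises):

    from mixminion.Crypto import whiten, unwhiten

    msg = "attack at dawn" * 10
    blob = whiten(msg)
    assert blob != msg              # whitening must change the bytes...
    assert unwhiten(blob) == msg    # ...and must invert exactly.
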
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.104
retrieving revision 1.105
diff -u -d -r1.104 -r1.105
--- ClientMain.py	24 Jul 2003 18:01:29 -0000	1.104
+++ ClientMain.py	8 Aug 2003 21:40:41 -0000	1.105
@@ -1176,8 +1176,81 @@
 
 """)
 
+class SURBLog(mixminion.Filestore.DBBase):
+    def __init__(self, filename, forceClean=0):
+        clientLock()
+        mixminion.Filestore.DBBase.__init__(self, filename, "SURB log")
+        try:
+            lastCleaned = int(self.log['LAST_CLEANED'])
+        except (KeyError, ValueError):
+            lastCleaned = 0
 
-class SURBLog:
+        if lastCleaned < time.time()-24*60*60 or forceClean:
+            self.clean()
+
+    def findUnusedSURB(self, surbList, verbose=0, now=None):
+        if now is None:
+            now = time.time()
+        nUsed = nExpired = nShortlived = 0
+        result = None
+        for surb in surbList: 
+            expiry = surb.timestamp
+            timeLeft = expiry - now
+            if self.isSURBUsed(surb):
+                nUsed += 1
+            elif timeLeft < 60:
+                nExpired += 1
+            elif timeLeft < 3*60*60:
+                nShortlived += 1
+            else:
+                result = surb
+                break
+
+        if verbose:
+            if nUsed:
+                LOG.warn("Skipping %s used reply blocks", nUsed)
+            if nExpired:
+                LOG.warn("Skipping %s expired reply blocks", nExpired)
+            if nShortlived:
+                LOG.warn("Skipping %s soon-to-expire reply blocks", nShortlived)
+
+        return result
+
+    def close(self):
+        mixminion.Filestore.DBBase.close(self)
+        clientUnlock()
+
+    def isSURBUsed(self, surb):
+        return self.has_key(surb)
+
+    def markSURBUsed(self, surb):
+        self[surb] = surb.timestamp
+
+    def clean(self, now=None):
+        if now is None:
+            now = time.time() + 60*60
+        allHashes = self.log.keys()
+        removed = []
+        for hash in allHashes:
+            if self._decodeVal(self.log[hash]) < now:
+                removed.append(hash)
+        del allHashes
+        for hash in removed:
+            del self.log[hash]
+        self.log['LAST_CLEANED'] = str(int(now))
+        self.sync()
+
+    def _encodeKey(self, surb):
+        return sha1(surb.pack())
+    def _encodeVal(self, timestamp):
+        return str(timestamp)
+    def _decodeVal(self, timestamp):
+        try:
+            return int(timestamp)
+        except ValueError:
+            return 0
+
+class XSURBLog:
     """A SURBLog manipulates a database on disk to remember which SURBs we've
        used, so we don't reuse them accidentally.
        """
@@ -1203,7 +1276,6 @@
         except (KeyError, ValueError):
             lastCleaned = 0
 
-        forceClean = 1
         if lastCleaned < time.time()-24*60*60 or forceClean:
             self.clean()
 

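The new SURBLog above is a DBBase subclass keyed on sha1(surb.pack()),
with timestamps as values.  A usage sketch (the path and the empty surbs
list are placeholders; real callers pass parsed reply blocks that have
.timestamp and .pack()):

    import mixminion.ClientMain

    log = mixminion.ClientMain.SURBLog("/home/user/.mixminion/surblog")
    try:
        surbs = []   # hypothetically: parsed reply blocks
        # findUnusedSURB skips used, expired, and soon-to-expire blocks.
        surb = log.findUnusedSURB(surbs, verbose=1)
        if surb is not None:
            # ... build and send the reply here ...
            log.markSURBUsed(surb)
    finally:
        log.close()  # also releases the client lock taken in __init__
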
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Filestore.py	24 Jul 2003 18:01:29 -0000	1.3
+++ Filestore.py	8 Aug 2003 21:40:42 -0000	1.4
@@ -4,7 +4,8 @@
 """mixminion.Filestore
 
    Common code for directory-based, security conscious, threadsafe
-   unordered file stores.
+   unordered file stores.  Also contains common code for journalable
+   DB-backed threadsafe stores.
    """
 
 # Formerly, this was all in mixminion.server.ServerQueue.  But
@@ -12,14 +13,17 @@
 # queues, and we needed another for fragment stores anyway.  So it was
 # time to refactor the common code.
 
+import anydbm
+import dumbdbm
 import os
+import errno
 import time
 import stat
 import cPickle
 import threading
 
 from mixminion.Common import MixFatalError, secureDelete, LOG, \
-     createPrivateDir, readFile
+     createPrivateDir, readFile, tryUnlink
 from mixminion.Crypto import getCommonPRNG
 
 __all__ = [ "StringStore", "StringMetadataStore",
@@ -416,5 +420,187 @@
         BaseMetadataStore.__init__(self, location, create, scrub)
         StringStoreMixin.__init__(self)
         ObjectStoreMixin.__init__(self)
+
+# ======================================================================
+# Database wrappers
+
+class DBBase:
+    # _lock: an RLock protecting all access to self.log.
+    def __init__(self, filename, purpose=""):
+        self._lock = threading.RLock()
+        self.filename = filename
+        parent = os.path.split(filename)[0]
+        createPrivateDir(parent)
+
+        try:
+            st = os.stat(filename)
+        except OSError, e:
+            if e.errno != errno.ENOENT:
+                raise
+            st = None
+
+        if st and st[stat.ST_SIZE] == 0:
+            LOG.warn("Half-created database %s found; cleaning up.", filename)
+            tryUnlink(filename)
+
+        LOG.debug("Opening %s database at %s", purpose, filename)
+        self.log = anydbm.open(filename, 'c')
+        if not hasattr(self.log, 'sync'):
+            if hasattr(self.log, '_commit'):
+                # Workaround for dumbdbm to allow syncing. (Standard in 
+                # Python 2.3.)
+                self.log.sync = self.log._commit
+            else:
+                # Otherwise, force a no-op sync method.
+                self.log.sync = lambda : None
+
+        if isinstance(self.log, dumbdbm._Database):
+            LOG.warn("Warning: using a flat file for %s database", purpose)
+
+        # Subclasses may want to check whether this is the right database,
+        # flush the journal, and so on.
+
+    def _encodeKey(self, k):
+        return k
+    def _encodeVal(self, v):
+        return v
+    def _decodeVal(self, v):
+        return v
+
+    def has_key(self, k):
+        try:
+            _ = self[k]
+            return 1
+        except KeyError:
+            return 0
+
+    def __getitem__(self, k):
+        return self.getItem(k)
+
+    def get(self, k, default=None):
+        try:
+            return self[k]
+        except KeyError:
+            return default
+
+    def __setitem__(self, k, v):
+        self.setItem(k, v)
+
+    def getItem(self, k):
+        try:
+            self._lock.acquire()
+            return self._decodeVal(self.log[self._encodeKey(k)])
+        finally:
+            self._lock.release()
+
+    def setItem(self, k, v):
+        self._lock.acquire()
+        try:
+            self.log[self._encodeKey(k)] = self._encodeVal(v)
+        finally:
+            self._lock.release()
         
+    def sync(self):
+        self._lock.acquire()
+        try:
+            self.log.sync()
+        finally:
+            self._lock.release()
+
+    def close(self):
+        self._lock.acquire()
+        try:
+            self.log.close()
+            self.log = None
+        finally:
+            self._lock.release()
+
+_JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)|getattr(os,'O_BINARY',0)
+
+class JournaledDBBase(DBBase):
+    MAX_JOURNAL = 128
+    def __init__(self, location, purpose, klen, vlen, vdflt):
+        DBBase.__init__(self, location, purpose)
+
+        self.klen = klen
+        self.vlen = vlen
+        self.vdefault = vdflt
+
+        self.journalFileName = location+"_jrnl"
+        self.journal = {}
+        if os.path.exists(self.journalFileName):
+            j = readFile(self.journalFileName, 1)
+            for i in xrange(0, len(j), klen+vlen):
+                if vlen:
+                    self.journal[j[i:i+klen]] = j[i+klen:i+klen+vlen]
+                else:
+                    self.journal[j[i:i+klen]] = self.vdefault
+
+        self.journalFile = os.open(self.journalFileName,
+                                   _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+
+        self.sync()
+
+    getItemNoJournal = DBBase.getItem
+    setItemNoJournal = DBBase.setItem
+
+    def _jEncodeKey(self, k):
+        return k
+    def _jDecodeKey(self, k):
+        return k
+    def _jEncodeVal(self, v):
+        return v
+    def _jDecodeVal(self, v):
+        return v
+
+    def getItem(self, k):
+        jk = self._jEncodeKey(k)
+        assert len(jk) == self.klen
+        self._lock.acquire()
+        try:
+            if self.journal.has_key(jk):
+                return self._jDecodeVal(self.journal[jk])
+            return self.getItemNoJournal(k)
+        finally:
+            self._lock.release()
+
+    def setItem(self, k, v):
+        jk = self._jEncodeKey(k)
+        jv = self._jEncodeVal(v)
+        assert len(jk) == self.klen
+        if self.vlen: assert len(jv) == self.vlen
+        self._lock.acquire()
+        try:
+            self.journal[jk] = jv
+            os.write(self.journalFile, jk)
+            if self.vlen:
+                os.write(self.journalFile, jv)
+            if len(self.journal) > self.MAX_JOURNAL:
+                self.sync()
+        finally:
+            self._lock.release()
+
+    def sync(self):
+        self._lock.acquire()
+        try:
+            for jk in self.journal.keys():
+                ek = self._encodeKey(self._jDecodeKey(jk))
+                ev = self._encodeVal(self._jDecodeVal(self.journal[jk]))
+                self.log[ek] = ev
+            self.log.sync()
+            os.close(self.journalFile)
+            self.journalFile = os.open(self.journalFileName,
+                                       _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
+            self.journal = {}
+        finally:
+            self._lock.release()
     
+    def close(self):
+        try:
+            self._lock.acquire()
+            self.sync()
+            self.log.close()
+            self.log = None
+            os.close(self.journalFile)
+        finally:
+            self._lock.release()
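
JournaledDBBase above keeps writes durable between syncs: each setItem
appends a fixed-width record to <location>_jrnl with O_SYNC, the journal
is replayed into an in-memory dict on open, and sync() folds it into the
dbm and truncates the file.  A hypothetical presence-only subclass in
the style of a hash log (fixed 20-byte keys, no value bytes on disk):

    from mixminion.Filestore import JournaledDBBase

    class SeenSet(JournaledDBBase):
        # Hypothetical: records which 20-byte digests have been seen.
        # vlen=0 means the journal holds bare keys; vdflt ("1") is the
        # value a replayed key decodes to.
        def __init__(self, location):
            JournaledDBBase.__init__(self, location, "seen-set", 20, 0, "1")

    s = SeenSet("/home/user/.mixminion/seen")
    digest = "\x00" * 20
    s[digest] = "1"     # appended to the journal immediately
    assert s.has_key(digest)
    s.sync()            # fold into the dbm, truncate the journal
    s.close()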