[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Rewrite and testing for system utilities



commit f6560b1210a20bdee7ccb68567b44b7f877a3e38
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Fri Nov 18 23:23:28 2011 -0800

    Rewrite and testing for system utilities
    
    Spent most of this week improving the implementation, api, documentation, and
    most importantly testing for the system functions. They now have almost
    complete code coverage by both unit and integ tests. Besides the obvious, this
    will help cross-platform compatability in the future since I'll have a sampling
    of input for platforms I don't have.
    
    Generated real output for all commands except sockstat (I only have access to
    linux and mac, not free/openbsd). I'll probably contact Fabian for help with
    this one.
---
 run_tests.py                |    2 +
 stem/util/system.py         |  536 +++++++++++++++++++++++++++----------------
 test/integ/util/__init__.py |    2 +-
 test/integ/util/conf.py     |    4 +
 test/integ/util/system.py   |  193 +++++++++++++++-
 test/unit/util/__init__.py  |    2 +-
 test/unit/util/system.py    |  290 +++++++++++++++++++++++
 7 files changed, 824 insertions(+), 205 deletions(-)

diff --git a/run_tests.py b/run_tests.py
index 774c130..41df4bf 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -17,6 +17,7 @@ import test.unit.types.control_line
 import test.unit.types.version
 import test.unit.connection.protocolinfo_response
 import test.unit.util.enum
+import test.unit.util.system
 import test.integ.types.control_message
 import test.integ.util.conf
 import test.integ.util.system
@@ -34,6 +35,7 @@ UNIT_TESTS = (("stem.types.ControlMessage", test.unit.types.control_message.Test
               ("stem.types.Version", test.unit.types.version.TestVerion),
               ("stem.connection.ProtocolInfoResponse", test.unit.connection.protocolinfo_response.TestProtocolInfoResponse),
               ("stem.util.enum", test.unit.util.enum.TestEnum),
+              ("stem.util.system", test.unit.util.system.TestSystem),
              )
 
 INTEG_TESTS = (("stem.types.ControlMessage", test.integ.types.control_message.TestControlMessage),
diff --git a/stem/util/system.py b/stem/util/system.py
index e5ba367..659d3ca 100644
--- a/stem/util/system.py
+++ b/stem/util/system.py
@@ -1,10 +1,14 @@
 """
 Helper functions for working with the underlying system. These are mostly os
-dependent, only working on linux, osx, and bsd.
+dependent, only working on linux, osx, and bsd. In almost all cases they're
+best-effort, providing None if the lookup fails.
 
+is_bsd - checks if we're running on the bsd family of operating systems
 is_available - determines if a command is availabe on this system
 is_running - determines if a given process is running
-get_pid - provides the process id a given process is running under
+get_pid_by_name - gets the pid for a process by the given name
+get_pid_by_port - gets the pid for a process listening to a given port
+get_pid_by_open_file - gets the pid for the process with an open file
 get_cwd - provides the current working directory for a given process
 get_bsd_jail_id - provides the BSD jail id a given process is running within
 is_relative_path - checks if a given path can be expanded by expand_path
@@ -12,7 +16,6 @@ expand_path - expands relative paths and ~ entries
 call - runs the given system command and provides back the results
 """
 
-import re
 import os
 import time
 import logging
@@ -22,11 +25,48 @@ import stem.util.proc
 
 LOGGER = logging.getLogger("stem")
 
+# Functor for mocking the call function, which should either return a string
+# (or None) to emulate responses, or a boolean to indicate if we should filter
+# the call. Also, when set...
+# - we don't check that commands exist before calling them
+# - the use of proc functions is disabled
+#
+# This is intended for testing purposes only.
+
+CALL_MOCKING = None
+
 # Mapping of commands to if they're available or not. This isn't always
 # reliable, failing for some special commands. For these the cache is
 # prepopulated to skip lookups.
+
 CMD_AVAILABLE_CACHE = {"ulimit": True}
 
+IS_RUNNING_PS_LINUX      = "ps -A co command"
+IS_RUNNING_PS_BSD        = "ps -ao ucomm="
+GET_PID_BY_NAME_PGREP    = "pgrep -x %s"
+GET_PID_BY_NAME_PIDOF    = "pidof %s"
+GET_PID_BY_NAME_PS_LINUX = "ps -o pid -C %s"
+GET_PID_BY_NAME_PS_BSD   = "ps axc"
+GET_PID_BY_NAME_LSOF     = "lsof -tc %s"
+GET_PID_BY_PORT_NETSTAT  = "netstat -npltu"
+GET_PID_BY_PORT_SOCKSTAT = "sockstat -4l -P tcp -p %s"
+GET_PID_BY_PORT_LSOF     = "lsof -wnP -iTCP -sTCP:LISTEN"
+GET_PID_BY_FILE_LSOF     = "lsof -tw %s"
+GET_CWD_PWDX             = "pwdx %s"
+GET_CWD_LSOF             = "lsof -a -p %s -d cwd -Fn"
+GET_BSD_JAIL_ID_PS       = "ps -p %s -o jid"
+
+def is_bsd():
+  """
+  Checks if we are within the BSD family of operating systems. This presently
+  recognizes Macs, FreeBSD, and OpenBSD but may be expanded later.
+  
+  Returns:
+    bool to indicate if we're a BSD OS
+  """
+  
+  return os.uname()[0] in ("Darwin", "FreeBSD", "OpenBSD")
+
 def is_available(command, cached=True):
   """
   Checks the current PATH to see if a command is available or not. If more
@@ -58,288 +98,365 @@ def is_available(command, cached=True):
     CMD_AVAILABLE_CACHE[command] = cmd_exists
     return cmd_exists
 
-def is_running(command, suppress_exc = True):
+def is_running(command):
   """
   Checks for if a process with a given name is running or not.
   
   Arguments:
-    command (str)       - process name to be checked
-    suppress_exc (bool) - if True then None is returned on failure, otherwise
-                          this raises the exception
+    command (str) - process name to be checked
   
   Returns:
-    True if the process is running, False otherwise
-  
-  Raises:
-    OSError if this can't be determined and suppress_exc is False
+    True if the process is running, False if it's not among ps results, and
+    None if ps can't be queried
   """
   
   # Linux and the BSD families have different variants of ps. Guess based on
-  # os.uname() results which to try first, then fall back to the other.
+  # the is_bsd() check which to try first, then fall back to the other.
   #
   # Linux
-  #   -A          - Select all processes. Identical to -e.
+  #   -A          - Select all processes.
   #   -co command - Shows just the base command.
   #
   # Mac / BSD
   #   -a        - Display information about other users' processes as well as
-  #               your own.
+  #               our own.
   #   -o ucomm= - Shows just the ucomm attribute ("name to be used for
   #               accounting")
   
-  primary_resolver, secondary_resolver = "ps -A co command", "ps -ao ucomm="
-  
-  if os.uname()[0] in ("Darwin", "FreeBSD", "OpenBSD"):
-    primary_resolver, secondary_resolver = secondary_resolver, primary_resolver
-  
-  command_listing = call(primary_resolver)
-  if not command_listing:
-    command_listing = call(secondary_resolver)
+  if CALL_MOCKING or is_available("ps"):
+    if is_bsd():
+      primary_resolver = IS_RUNNING_PS_BSD
+      secondary_resolver = IS_RUNNING_PS_LINUX
+    else:
+      primary_resolver = IS_RUNNING_PS_LINUX
+      secondary_resolver = IS_RUNNING_PS_BSD
+    
+    command_listing = call(primary_resolver)
+    if not command_listing:
+      command_listing = call(secondary_resolver)
+    
+    if command_listing:
+      return command in command_listing
   
-  if command_listing:
-    return command in command_listing
-  else:
-    if suppress_exc: return None
-    else: raise OSError("Unable to check via 'ps -A co command'")
+  return None
 
-def get_pid(process_name, process_port = None, suppress_exc = True):
+def get_pid_by_name(process_name):
   """
-  Attempts to determine the process id for a running process, using the
-  following:
+  Attempts to determine the process id for a running process, using...
   
-  1. "pgrep -x <name>"
-  2. "pidof <name>"
-  3. "netstat -npl | grep 127.0.0.1:<port>"
-  4. "ps -o pid -C <name>"
-  5. "sockstat -4l -P tcp -p <port> | grep <name>"
-  6. "ps axc | egrep \" <name>$\""
-  7. "lsof -wnPi | egrep \"^<name>.*:<port>\""
+  1. pgrep -x <name>
+  2. pidof <name>
+  3. ps -o pid -C <name> (linux)
+     ps axc | egrep " <name>$" (bsd)
+  4. lsof -tc <name>
   
-  If pidof or ps provide multiple instance of the process then their results
-  are discarded (since only netstat can differentiate using a bound port).
+  Results with multiple instances of the process are discarded.
   
   Arguments:
-    process_name (str)  - process name for which to fetch the pid
-    process_port (int)  - port that the process we're interested in is bound
-                          to, this is used to disambiguate if there's multiple
-                          instances running
-    suppress_exc (bool) - if True then None is returned on failure, otherwise
-                          this raises the exception
+    process_name (str) - process name for which to fetch the pid
   
   Returns:
-    int with the process id, None if it can't be determined and suppress_exc is
-    True
-  
-  Raises:
-    IOError if either no running process exists or it can't be determined and
-    suppress_exc is True
+    int with the process id, None if it can't be determined
   """
   
   # attempts to resolve using pgrep, failing if:
-  # - the process is running under a different name
-  # - there are multiple instances
+  # - we're running on bsd (command unavailable)
+  #
+  # example output:
+  #   atagar@morrigan:~$ pgrep -x vim
+  #   3283
+  #   3392
   
-  try:
-    results = call("pgrep -x %s" % process_name)
+  if CALL_MOCKING or is_available("pgrep"):
+    results = call(GET_PID_BY_NAME_PGREP % process_name)
     
-    if results and len(results) == 1 and len(results[0].split()) == 1:
+    if results and len(results) == 1:
       pid = results[0].strip()
       if pid.isdigit(): return int(pid)
-  except IOError: pass
   
   # attempts to resolve using pidof, failing if:
-  # - the process is running under a different name
-  # - there are multiple instances
+  # - we're running on bsd (command unavailable)
+  #
+  # example output:
+  #   atagar@morrigan:~$ pidof vim
+  #   3392 3283
   
-  try:
-    results = call("pidof %s" % process_name)
+  if CALL_MOCKING or is_available("pidof"):
+    results = call(GET_PID_BY_NAME_PIDOF % process_name)
     
     if results and len(results) == 1 and len(results[0].split()) == 1:
       pid = results[0].strip()
       if pid.isdigit(): return int(pid)
-  except IOError: pass
   
-  # attempts to resolve using netstat, failing if:
-  # - the process being run as a different user due to permissions
-  
-  if process_port:
-    try:
-      results = call("netstat -npl")
+  # attempts to resolve using ps, failing if:
+  # - system's ps variant doesn't handle these flags (none known at the moment)
+  #
+  # example output:
+  #   atagar@morrigan:~/Desktop/stem$ ps -o pid -C vim
+  #     PID
+  #    3283
+  #    3392
+  #   
+  #   atagar$ ps axc
+  #     PID   TT  STAT      TIME COMMAND
+  #       1   ??  Ss     9:00.22 launchd
+  #      10   ??  Ss     0:09.97 kextd
+  #      11   ??  Ss     5:47.36 DirectoryService
+  #      12   ??  Ss     3:01.44 notifyd
+  
+  if CALL_MOCKING or is_available("ps"):
+    if CALL_MOCKING or not is_bsd():
+      # linux variant of ps
+      results = call(GET_PID_BY_NAME_PS_LINUX % process_name)
+      
+      if results and len(results) == 2:
+        pid = results[1].strip()
+        if pid.isdigit(): return int(pid)
+    
+    if CALL_MOCKING or is_bsd():
+      # bsd variant of ps
+      results = call(GET_PID_BY_NAME_PS_BSD)
       
-      # filters to results with our port (same as "grep 127.0.0.1:<port>")
       if results:
-        results = [r for r in results if "127.0.0.1:%i" % process_port in r]
+        # filters results to those with our process name
+        results = [r for r in results if r.endswith(" %s" % process_name)]
         
-        if len(results) == 1:
-          results = results[0].split()[6] # process field (ex. "7184/tor")
-          pid = results[:results.find("/")]
+        if len(results) == 1 and len(results[0].split()) > 0:
+          pid = results[0].split()[0]
           if pid.isdigit(): return int(pid)
-    except IOError: pass
   
-  # attempts to resolve using ps, failing if:
-  # - the process is running under a different name
-  # - there are multiple instances
+  # resolves using lsof which works on both Linux and BSD, only failing if:
+  # - lsof is unavailable (not included by default on OpenBSD)
+  # - the process being run as a different user due to permissions
+  # - the process doesn't have any open files to be reported by lsof?
+  #
+  # flags:
+  #   t - only show pids
+  #   c - restrict results to that command
+  #
+  # example output:
+  #   atagar@morrigan:~$ lsof -t -c vim
+  #   2470
+  #   2561
   
-  try:
-    results = call("ps -o pid -C %s" % process_name)
+  if CALL_MOCKING or is_available("lsof"):
+    results = call(GET_PID_BY_NAME_LSOF % process_name)
     
-    if results and len(results) == 2:
-      pid = results[1].strip()
+    if results and len(results) == 1:
+      pid = results[0].strip()
       if pid.isdigit(): return int(pid)
-  except IOError: pass
   
-  # attempts to resolve using sockstat, failing if:
-  # - sockstat doesn't accept the -4 flag (BSD only)
-  # - the process is running under a different name
-  # - there are multiple instances using the same port on different addresses
-  # 
-  # TODO: The later two issues could be solved by filtering for an expected IP
-  # address instead of the process name.
+  LOGGER.debug("failed to resolve a pid for %s" % process_name)
+  return None
+
+def get_pid_by_port(port):
+  """
+  Attempts to determine the process id for a process with the given port,
+  using...
   
-  if process_port:
-    try:
-      results = call("sockstat -4l -P tcp -p %i" % process_port)
-      
-      # filters to results with our port (same as "grep <name>")
-      if results:
-        results = [r for r in results if process_name in r]
-        
-        if len(results) == 1 and len(results[0].split()) == 7:
-          pid = results[0].split()[2]
-          if pid.isdigit(): return int(pid)
-    except IOError: pass
+  1. netstat -npltu | grep 127.0.0.1:<port>
+  2. sockstat -4l -P tcp -p <port>
+  3. lsof -wnP -iTCP -sTCP:LISTEN | grep ":<port>"
   
-  # attempts to resolve via a ps command that works on mac/bsd (this and lsof
-  # are the only resolvers to work on that platform). This fails if:
-  # - the process is running under a different name
-  # - there are multiple instances
+  Most queries limit results to listening TCP connections.
   
-  try:
-    results = call("ps axc")
+  Arguments:
+    port (int) - port where the process we're looking for is listening
+  
+  Returns:
+    int with the process id, None if it can't be determined
+  """
+  
+  # attempts to resolve using netstat, failing if:
+  # - netstat doesn't accept these flags (Linux only)
+  # - the process being run as a different user due to permissions
+  #
+  # flags:
+  #   n - numeric (disables hostname lookups)
+  #   p - program (include pids)
+  #   l - listening (include listening sockets)
+  #   tu - show tcp and udp sockets, and nothing else
+  #
+  # example output:
+  #   atagar@morrigan:~$ netstat -npltu
+  #   Active Internet connections (only servers)
+  #   Proto Recv-Q Send-Q Local Address           Foreign Address   State    PID/Program name
+  #   tcp        0      0 127.0.0.1:631           0.0.0.0:*         LISTEN   -
+  #   tcp        0      0 127.0.0.1:9051          0.0.0.0:*         LISTEN   1641/tor
+  #   tcp6       0      0 ::1:631                 :::*              LISTEN   -
+  #   udp        0      0 0.0.0.0:5353            0.0.0.0:*                  -
+  #   udp6       0      0 fe80::7ae4:ff:fe2f::123 :::*                       -
+  
+  if CALL_MOCKING or is_available("netstat"):
+    results = call(GET_PID_BY_PORT_NETSTAT)
     
-    # filters to results with our port (same as "egrep ' <name>$'")
     if results:
-      results = [r for r in results if r.endswith(" %s" % process_name)]
+      # filters to results with our port
+      results = [r for r in results if "127.0.0.1:%s" % port in r]
       
-      if len(results) == 1 and len(results[0].split()) > 0:
-        pid = results[0].split()[0]
+      if len(results) == 1 and len(results[0].split()) == 7:
+        results = results[0].split()[6] # process field (ex. "7184/tor")
+        pid = results[:results.find("/")]
         if pid.isdigit(): return int(pid)
-  except IOError: pass
   
-  # attempts to resolve via lsof, this should work on linux, mac, and bsd
-  # and only fail if:
-  # - the process is running under a different name
-  # - the process being run as a different user due to permissions
+  # attempts to resolve using sockstat, failing if:
+  # - sockstat doesn't accept the -4 flag (BSD only)
   # - there are multiple instances using the same port on different addresses
-  
-  try:
-    results = call("lsof -wnPi")
+  #
+  # flags:
+  #   4 - only show IPv4 sockets
+  #   l - listening sockets
+  #   P tcp - only show tcp connections
+  #   p - only includes results if the local or foreign port match this
+  #
+  # example output:
+  #   # TODO: We need an example for the actual command we're using. I'm
+  #   # suspecting that replacing the grep with checking the local port works,
+  #   # but should double check.
+  #   
+  #   # sockstat -4 | grep tor
+  #   _tor     tor        4397  7  tcp4   51.64.7.84:9050    *:*
+  #   _tor     tor        4397  8  udp4   51.64.7.84:53      *:*
+  #   _tor     tor        4397  12 tcp4   51.64.7.84:54011   80.3.121.7:9001
+  #   _tor     tor        4397  15 tcp4   51.64.7.84:59374   7.42.1.102:9001
+  #   _tor     tor        4397  20 tcp4   51.64.7.84:51946   32.83.7.104:443
+  
+  if CALL_MOCKING or is_available("sockstat"):
+    results = call(GET_PID_BY_PORT_SOCKSTAT % port)
     
-    # filters to results with our port (same as "egrep '^<name>.*:<port>'")
     if results:
-      port_comp = str(process_port) if process_port else ""
-      results = [r for r in results if re.match("^%s.*:%s" % (process_name, port_comp), r)]
-    
-    # This can result in multiple entries with the same pid (from the query
-    # itself). Checking all lines to see if they're in agreement about the pid.
+      # filters to results where this is the local port
+      results = [r for r in results if (len(r.split()) == 7 and (":%s" % port) in r.split()[5])]
+      
+      if len(results) == 1:
+        pid = results[0].split()[2]
+        if pid.isdigit(): return int(pid)
+  
+  # resolves using lsof which works on both Linux and BSD, only failing if:
+  # - lsof is unavailable (not included by default on OpenBSD)
+  # - the process being run as a different user due to permissions
+  # - there are multiple instances using the same port on different addresses
+  #
+  # flags:
+  #   w - disables warning messages
+  #   n - numeric addresses (disables hostname lookups)
+  #   P - numeric ports (disables replacement of ports with their protocol)
+  #   iTCP - only show tcp connections
+  #   sTCP:LISTEN - listening sockets
+  #
+  # example output:
+  #   atagar@morrigan:~$ lsof -wnP -iTCP -sTCP:LISTEN
+  #   COMMAND  PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
+  #   tor     1745 atagar    6u  IPv4  14229      0t0  TCP 127.0.0.1:9051 (LISTEN)
+  
+  if CALL_MOCKING or is_available("lsof"):
+    results = call(GET_PID_BY_PORT_LSOF)
     
     if results:
-      pid = ""
-      
-      for line in results:
-        line_comp = line.split()
-        
-        if len(line_comp) >= 2 and (not pid or line_comp[1] == pid):
-          pid = line_comp[1]
-        else: raise IOError
+      # filters to results with our port
+      results = [r for r in results if (len(r.split()) == 10 and (":%s" % port) in r.split()[8])]
       
-      if pid.isdigit(): return int(pid)
-  except IOError: pass
+      if len(results) == 1:
+        pid = results[0].split()[1]
+        if pid.isdigit(): return int(pid)
   
-  exc_msg = "failed to resolve a pid for %s" % process_name
+  return None # all queries failed
+
+def get_pid_by_open_file(path):
+  """
+  Attempts to determine the process id for a process with the given open file,
+  using...
   
-  if suppress_exc:
-    LOGGER.debug(exc_msg)
-    return None
-  else:
-    raise IOError(exc_msg)
+  lsof -w <path>
+  
+  Arguments:
+    path (str) - location of the socket file to query against
+  
+  Returns:
+    int with the process id, None if it can't be determined
+  """
+  
+  # resolves using lsof which works on both Linux and BSD, only failing if:
+  # - lsof is unavailable (not included by default on OpenBSD)
+  # - the file can't be read due to permissions
+  #
+  # flags:
+  #   t - only show pids
+  #   w - disables warning messages
+  #
+  # example output:
+  #   atagar@morrigan:~$ lsof -tw /tmp/foo
+  #   4762
+  
+  if CALL_MOCKING or is_available("lsof"):
+    results = call(GET_PID_BY_FILE_LSOF % path)
+    
+    if results and len(results) == 1:
+      pid = results[0].strip()
+      if pid.isdigit(): return int(pid)
+  
+  return None # all queries failed
 
-def get_cwd(pid, suppress_exc = True):
+def get_cwd(pid):
   """
   Provices the working directory of the given process.
   
   Arguments:
-    pid (int)           - process id of the process to be queried
-    suppress_exc (bool) - if True then None is returned on failure, otherwise
-                          this raises the exception
+    pid (int) - process id of the process to be queried
   
   Returns:
     str with the absolute path for the process' present working directory, None
-    if it can't be determined and suppress_exc is True
-  
-  Raises:
-    IOError if this fails and suppress_exc is False
+    if it can't be determined
   """
   
   # try fetching via the proc contents if it's available
-  if stem.util.proc.is_available():
+  if stem.util.proc.is_available() and not CALL_MOCKING:
     try: return stem.util.proc.get_cwd(pid)
     except IOError: pass
   
-  # Fall back to a pwdx query. This isn't available on BSD. If we attempt this
-  # lookup then it trumps lsof when logging isssues at the end since it's the
-  # better lookup method for this information.
-  
-  exc_msg = None
+  # Fall back to a pwdx query. This isn't available on BSD.
+  logging_prefix = "get_cwd(%s):" % pid
   
-  if is_available("pwdx"):
+  if CALL_MOCKING or is_available("pwdx"):
     # pwdx results are of the form:
     # 3799: /home/atagar
     # 5839: No such process
     
-    try:
-      results = call("pwdx %s" % pid)
-      
-      if not results:
-        exc_msg = "pwdx didn't return any results"
-      elif results[0].endswith("No such process"):
-        exc_msg = "pwdx reported no process for pid %s" % pid
-      elif len(results) != 1 or results[0].count(" ") != 1:
-        exc_msg = "we got unexpected output from pwdx: %s" % results
-      else:
-        return results[0].split(" ", 1)[1].strip()
-    except OSError, exc:
-      exc_msg = "pwdx query for %s failed: %s" % (pid, exc)
+    results = call(GET_CWD_PWDX % pid)
+    
+    if not results:
+      LOGGER.debug("%s pwdx didn't return any results" % logging_prefix)
+    elif results[0].endswith("No such process"):
+      LOGGER.debug("%s pwdx processes reported for this pid" % logging_prefix)
+    elif len(results) != 1 or results[0].count(" ") != 1 or not results[0].startswith("%s: " % pid):
+      LOGGER.debug("%s we got unexpected output from pwdx: %s" % (logging_prefix, results))
+    else:
+      return results[0].split(" ", 1)[1].strip()
   
   # Use lsof as the final fallback. This is available on both Linux and is the
   # only lookup method here that works for BSD...
   # https://trac.torproject.org/projects/tor/ticket/4236
   #
-  # ~$ lsof -a -p 75717 -d cwd -Fn
-  # p75717
-  # n/Users/atagar/tor/src/or
+  # flags:
+  #   a - presents the intersection of the following arguments
+  #   p - limits results to this pid
+  #   d cwd - limits results to just the cwd rather than all open files
+  #   Fn - short listing in a single column, with just the pid and cwd
+  #
+  # example output:
+  #   ~$ lsof -a -p 75717 -d cwd -Fn
+  #   p75717
+  #   n/Users/atagar/tor/src/or
   
-  try:
-    results = call("lsof -a -p %s -d cwd -Fn" % pid)
+  if CALL_MOCKING or is_available("lsof"):
+    results = call(GET_CWD_LSOF % pid)
     
     if results and len(results) == 2 and results[1].startswith("n/"):
       return results[1][1:].strip()
-    elif not exc_msg:
-      exc_msg = "we got unexpected output from lsof: %s" % results
-  except OSError, exc:
-    if not exc_msg:
-      exc_msg = "lsof query for the cwd of %s failed: %s" % (pid, exc)
+    else:
+      LOGGER.debug("%s we got unexpected output from lsof: %s" % (logging_prefix, results))
   
-  if not exc_msg:
-    # shouldn't happen, somehow we never registered a failure...
-    exc_msg = "unable to query pwdx or lsof for the cwd of %s" % pid
-  
-  # we failed all lookups, either raise or log the issue and return None
-  if suppress_exc:
-    LOGGER.debug(exc_msg)
-    return None
-  else:
-    raise IOError(exc_msg)
+  return None # all queries failed
 
 def get_bsd_jail_id(pid):
   """
@@ -360,9 +477,9 @@ def get_bsd_jail_id(pid):
   #   JID
   #    1
   
-  ps_output = call("ps -p %s -o jid" % pid)
+  ps_output = call(GET_BSD_JAIL_ID_PS % pid)
   
-  if len(ps_output) == 2 and len(ps_output[1].split()) == 1:
+  if ps_output and len(ps_output) == 2 and len(ps_output[1].split()) == 1:
     jid = ps_output[1].strip()
     if jid.isdigit(): return int(jid)
   
@@ -383,7 +500,8 @@ def is_relative_path(path):
 def expand_path(path, cwd = None):
   """
   Provides an absolute path, expanding tildas with the user's home and
-  appending a current working directory if the path was relative.
+  appending a current working directory if the path was relative. This is
+  unix-specific and paths never have an ending slash.
   
   Arguments:
     path (str) - path to be expanded
@@ -394,12 +512,14 @@ def expand_path(path, cwd = None):
     str of the path expanded to be an absolute path
   """
   
+  relative_path = path
+  
   if not path or path[0] == "/":
     # empty or already absolute - nothing to do
-    return path
+    pass
   elif path.startswith("~"):
     # prefixed with a ~ or ~user entry
-    return os.path.expanduser(path)
+    relative_path = os.path.expanduser(path)
   else:
     # relative path, expand with the cwd
     if not cwd: cwd = os.getcwd()
@@ -407,8 +527,11 @@ def expand_path(path, cwd = None):
     # we'll be dealing with both "my/path/" and "./my/path" entries, so
     # cropping the later
     if path.startswith("./"): path = path[2:]
+    elif path == ".": path = ""
     
-    return os.path.join(cwd, path)
+    relative_path = os.path.join(cwd, path)
+  
+  return relative_path.rstrip("/")
 
 def call(command, suppress_exc = True):
   """
@@ -429,6 +552,29 @@ def call(command, suppress_exc = True):
     OSError if this fails and suppress_exc is False
   """
   
+  # Most of our system functions rely on system calls through this function, so
+  # testing may need to mock responses to exercise our various functionality.
+  
+  if CALL_MOCKING:
+    mock_response = CALL_MOCKING(command)
+    
+    # Mocking either returns...
+    # - str/None for unit tests to emulate system calls
+    # - bool for integ tests to indicate if we should make this system call or
+    #   suppress it, to selectively trigger logic
+    
+    if isinstance(mock_response, bool):
+      if mock_response:
+        pass # mocking indicates that we should go on to make the real call
+      else:
+        # call should be suppressed
+        if suppress_exc: return None
+        else: raise OSError("System call filtered by mocking")
+    else:
+      if mock_response == None and not suppress_exc:
+        raise OSError("Mocking provided a None response")
+      else: return mock_response
+  
   try:
     start_time = time.time()
     stdout, stderr = subprocess.Popen(command.split(), stdout = subprocess.PIPE, stderr = subprocess.PIPE).communicate()
diff --git a/test/integ/util/__init__.py b/test/integ/util/__init__.py
index a0b3303..95e5870 100644
--- a/test/integ/util/__init__.py
+++ b/test/integ/util/__init__.py
@@ -2,5 +2,5 @@
 Integration tests for stem.util.* contents.
 """
 
-__all__ = ["system"]
+__all__ = ["conf", "system"]
 
diff --git a/test/integ/util/conf.py b/test/integ/util/conf.py
index 03ac4a1..a74406f 100644
--- a/test/integ/util/conf.py
+++ b/test/integ/util/conf.py
@@ -22,6 +22,10 @@ startup.run alias l=ls
 """ % CONF_HEADER
 
 class TestConf(unittest.TestCase):
+  """
+  Tests the stem.util.conf contents.
+  """
+  
   def tearDown(self):
     # cleans up test configurations we made
     if os.path.exists(CONF_PATH):
diff --git a/test/integ/util/system.py b/test/integ/util/system.py
index fc7c384..5d4a924 100644
--- a/test/integ/util/system.py
+++ b/test/integ/util/system.py
@@ -4,6 +4,7 @@ process.
 """
 
 import os
+import getpass
 import unittest
 
 import test.runner
@@ -15,6 +16,10 @@ class TestSystem(unittest.TestCase):
   running.
   """
   
+  def tearDown(self):
+    # resets call mocking back to being disabled
+    stem.util.system.CALL_MOCKING = None
+  
   def test_is_available(self):
     """
     Checks the stem.util.system.is_available function.
@@ -33,26 +38,172 @@ class TestSystem(unittest.TestCase):
     
     self.assertTrue(stem.util.system.is_running("tor"))
     self.assertFalse(stem.util.system.is_running("blarg_and_stuff"))
-
-  def test_get_pid(self):
+  
+  def test_get_pid_by_name(self):
+    """
+    Checks general usage of the stem.util.system.get_pid_by_name function. This
+    will fail if there's other tor instances running.
+    """
+    
+    runner = test.runner.get_runner()
+    self.assertEquals(runner.get_pid(), stem.util.system.get_pid_by_name("tor"))
+    self.assertEquals(None, stem.util.system.get_pid_by_name("blarg_and_stuff"))
+  
+  def test_get_pid_by_name_pgrep(self):
+    """
+    Tests the get_pid_by_name function with a pgrep response.
+    """
+    
+    if not stem.util.system.is_available("pgrep"):
+      self.skipTest("(pgrep unavailable)")
+    
+    pgrep_prefix = stem.util.system.GET_PID_BY_NAME_PGREP % ""
+    self._run_pid_test(pgrep_prefix, stem.util.system.get_pid_by_name, "tor")
+  
+  def test_get_pid_by_name_pidof(self):
+    """
+    Tests the get_pid_by_name function with a pidof response.
+    """
+    
+    if not stem.util.system.is_available("pidof"):
+      self.skipTest("(pidof unavailable)")
+    
+    pidof_prefix = stem.util.system.GET_PID_BY_NAME_PIDOF % ""
+    self._run_pid_test(pidof_prefix, stem.util.system.get_pid_by_name, "tor")
+  
+  def test_get_pid_by_name_ps_linux(self):
+    """
+    Tests the get_pid_by_name function with the linux variant of ps.
     """
-    Checks the stem.util.system.get_pid function.
+    
+    if not stem.util.system.is_available("ps"):
+      self.skipTest("(ps unavailable)")
+    elif stem.util.system.is_bsd(): self.skipTest("(linux only)")
+    
+    ps_prefix = stem.util.system.GET_PID_BY_NAME_PS_LINUX % ""
+    self._run_pid_test(ps_prefix, stem.util.system.get_pid_by_name, "tor")
+  
+  def test_get_pid_by_name_ps_bsd(self):
+    """
+    Tests the get_pid_by_name function with the bsd variant of ps.
+    """
+    
+    if not stem.util.system.is_available("ps"):
+      self.skipTest("(ps unavailable)")
+    elif not stem.util.system.is_bsd(): self.skipTest("(bsd only)")
+    
+    ps_cmd = stem.util.system.GET_PID_BY_NAME_PS_BSD
+    self._run_pid_test(ps_cmd, stem.util.system.get_pid_by_name, "tor")
+  
+  def test_get_pid_by_name_lsof(self):
+    """
+    Tests the get_pid_by_name function with a lsof response.
+    """
+    
+    if not stem.util.system.is_available("lsof"):
+      self.skipTest("(lsof unavailable)")
+    
+    lsof_prefix = stem.util.system.GET_PID_BY_NAME_LSOF % ""
+    self._run_pid_test(lsof_prefix, stem.util.system.get_pid_by_name, "tor")
+  
+  def test_get_pid_by_port(self):
+    """
+    Checks general usage of the stem.util.system.get_pid_by_port function.
     """
     
     runner = test.runner.get_runner()
-    self.assertEquals(runner.get_pid(), stem.util.system.get_pid("tor", runner.get_control_port()))
-    self.assertEquals(None, stem.util.system.get_pid("blarg_and_stuff"))
+    tor_pid, tor_port = runner.get_pid(), runner.get_control_port()
+    self.assertEquals(tor_pid, stem.util.system.get_pid_by_port(tor_port))
+    self.assertEquals(None, stem.util.system.get_pid_by_port(99999))
+  
+  def test_get_pid_by_port_netstat(self):
+    """
+    Tests the get_pid_by_port function with a netstat response.
+    """
+    
+    if not stem.util.system.is_available("netstat"):
+      self.skipTest("(netstat unavailable)")
+    elif stem.util.system.is_bsd(): self.skipTest("(linux only)")
+    
+    netstat_cmd = stem.util.system.GET_PID_BY_PORT_NETSTAT
+    runner_port = test.runner.get_runner().get_control_port()
+    self._run_pid_test(netstat_cmd, stem.util.system.get_pid_by_port, runner_port)
+  
+  def test_get_pid_by_port_sockstat(self):
+    """
+    Tests the get_pid_by_port function with a sockstat response.
+    """
+    
+    if not stem.util.system.is_available("sockstat"):
+      self.skipTest("(sockstat unavailable)")
+    elif not stem.util.system.is_bsd(): self.skipTest("(bsd only)")
+    
+    sockstat_prefix = stem.util.system.GET_PID_BY_PORT_SOCKSTAT % ""
+    runner_port = test.runner.get_runner().get_control_port()
+    self._run_pid_test(sockstat_prefix, stem.util.system.get_pid_by_port, runner_port)
+  
+  def test_get_pid_by_port_lsof(self):
+    """
+    Tests the get_pid_by_port function with a lsof response.
+    """
+    
+    if not stem.util.system.is_available("lsof"):
+      self.skipTest("(lsof unavailable)")
+    
+    lsof_cmd = stem.util.system.GET_PID_BY_PORT_LSOF
+    runner_port = test.runner.get_runner().get_control_port()
+    self._run_pid_test(lsof_cmd, stem.util.system.get_pid_by_port, runner_port)
+  
+  def test_get_pid_by_open_file(self):
+    """
+    Checks the stem.util.system.get_pid_by_open_file function.
+    """
+    
+    # we're not running with a control socket so this just exercises the
+    # failure case
+    
+    self.assertEquals(None, stem.util.system.get_pid_by_open_file("/tmp"))
+    self.assertEquals(None, stem.util.system.get_pid_by_open_file("/non-existnt-path"))
   
   def test_get_cwd(self):
     """
-    Checks the stem.util.system.get_cwd function.
+    Checks general usage of the stem.util.system.get_cwd function.
     """
     
     # tor's pwd will match our process since we started it
     runner = test.runner.get_runner()
     self.assertEquals(os.getcwd(), stem.util.system.get_cwd(runner.get_pid()))
-    self.assertEquals(None, stem.util.system.get_cwd(99999, True))
-    self.assertRaises(IOError, stem.util.system.get_cwd, 99999, False)
+    self.assertEquals(None, stem.util.system.get_cwd(99999))
+  
+  def test_get_cwd_pwdx(self):
+    """
+    Tests the get_pid_by_cwd function with a pwdx response.
+    """
+    
+    if not stem.util.system.is_available("pwdx"):
+      self.skipTest("(pwdx unavailable)")
+    
+    # filter the call function to only allow this command
+    pwdx_prefix = GET_CWD_PWDX % ""
+    stem.util.system.CALL_MOCKING = lambda cmd: cmd.startswith(pwdx_prefix)
+    
+    runner_pid = test.runner.get_runner().get_pid()
+    self.assertEquals(os.getcwd(), stem.util.system.get_cwd(runner_pid))
+  
+  def test_get_cwd_pwdx(self):
+    """
+    Tests the get_pid_by_cwd function with a lsof response.
+    """
+    
+    if not stem.util.system.is_available("lsof"):
+      self.skipTest("(lsof unavailable)")
+    
+    # filter the call function to only allow this command
+    lsof_prefix = "lsof -a -p "
+    stem.util.system.CALL_MOCKING = lambda cmd: cmd.startswith(lsof_prefix)
+    
+    runner_pid = test.runner.get_runner().get_pid()
+    self.assertEquals(os.getcwd(), stem.util.system.get_cwd(runner_pid))
   
   def test_get_bsd_jail_id(self):
     """
@@ -62,4 +213,30 @@ class TestSystem(unittest.TestCase):
     """
     
     self.assertEquals(0, stem.util.system.get_bsd_jail_id(99999))
+  
+  def test_expand_path(self):
+    """
+    Exercises the stem.expand_path method with actual runtime data.
+    """
+    
+    self.assertEquals(os.getcwd(), stem.util.system.expand_path("."))
+    self.assertEquals(os.getcwd(), stem.util.system.expand_path("./"))
+    self.assertEquals(os.path.join(os.getcwd(), "foo"), stem.util.system.expand_path("./foo"))
+    
+    home_dir, username = os.getenv("HOME"), getpass.getuser()
+    self.assertEquals(home_dir, stem.util.system.expand_path("~"))
+    self.assertEquals(home_dir, stem.util.system.expand_path("~/"))
+    self.assertEquals(home_dir, stem.util.system.expand_path("~%s" % username))
+    self.assertEquals(os.path.join(home_dir, "foo"), stem.util.system.expand_path("~%s/foo" % username))
+  
+  def _run_pid_test(self, cmd_prefix, test_function, arg):
+    """
+    Runs a get_pid_by_* test with the given inputs.
+    """
+    
+    # filter the call function to only allow this command
+    stem.util.system.CALL_MOCKING = lambda cmd: cmd.startswith(cmd_prefix)
+    
+    runner_pid = test.runner.get_runner().get_pid()
+    self.assertEquals(runner_pid, test_function(arg))
 
diff --git a/test/unit/util/__init__.py b/test/unit/util/__init__.py
index 84f12bf..57015c1 100644
--- a/test/unit/util/__init__.py
+++ b/test/unit/util/__init__.py
@@ -2,5 +2,5 @@
 Unit tests for stem.util.* contents.
 """
 
-__all__ = ["enum"]
+__all__ = ["enum", "system"]
 
diff --git a/test/unit/util/system.py b/test/unit/util/system.py
new file mode 100644
index 0000000..1941c46
--- /dev/null
+++ b/test/unit/util/system.py
@@ -0,0 +1,290 @@
+"""
+Unit tests for the stem.util.system functions. This works by mocking the
+stem.util.system.call function to selectively exercise other functions. None of
+these tests actually make system calls, use proc, or otherwise deal with the
+system running the tests.
+"""
+
+import functools
+import unittest
+import stem.util.system as system
+
+# Base responses for the get_pid_by_name tests. The 'success' and
+# 'multiple_results' entries are filled in by tests.
+
+GET_PID_BY_NAME_BASE_RESULTS = {
+  "success": [],
+  "multiple_results": [],
+  "malformed_data": ["bad data"],
+  "no_results": [],
+  "command_fails": None,
+}
+
+# testing output for system calls
+
+GET_PID_BY_NAME_PS_BSD = [
+  "  PID   TT  STAT      TIME COMMAND",
+  "    1   ??  Ss     9:00.22 launchd",
+  "   10   ??  Ss     0:09.97 kextd",
+  "   11   ??  Ss     5:47.36 DirectoryService",
+  "   12   ??  Ss     3:01.44 notifyd"]
+
+GET_PID_BY_PORT_NETSTAT_RESULTS = [
+  "Active Internet connections (only servers)",
+  "Proto Recv-Q Send-Q Local Address           Foreign Address   State    PID/Program name",
+  "tcp        0      0 127.0.0.1:631           0.0.0.0:*         LISTEN   -     ",
+  "tcp        0      0 127.0.0.1:9051          0.0.0.0:*         LISTEN   1641/tor  ",
+  "tcp6       0      0 ::1:631                 :::*              LISTEN   -     ",
+  "udp        0      0 0.0.0.0:5353            0.0.0.0:*                  -     ",
+  "udp6       0      0 fe80::7ae4:ff:fe2f::123 :::*                       -     "]
+
+GET_PID_BY_PORT_SOCKSTAT_RESULTS = [
+  "_tor     tor        4397  7  tcp4   51.64.7.84:9051    *:*",
+  "_tor     tor        4397  12 tcp4   51.64.7.84:54011   80.3.121.7:9051",
+  "_tor     tor        4397  15 tcp4   51.64.7.84:59374   7.42.1.102:9051"]
+
+GET_PID_BY_PORT_LSOF_RESULTS = [
+  "COMMAND  PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME",
+  "tor     1745 atagar    6u  IPv4  14229      0t0  TCP 127.0.0.1:9051 (LISTEN)",
+  "apache   329 atagar    6u  IPv4  14229      0t0  TCP 127.0.0.1:80 (LISTEN)"]
+
+def mock_call(base_cmd, responses, command):
+  """
+  Template mock function for testing a series of test inputs. Test functions
+  make a partial with the first two values filled out, then the system's call
+  method provides the 'command' argument.
+  
+  Arguments:
+    base_cmd (str)         - command to match against
+    responses (list, dict) - either list with the response, or mapping of
+                             base_cmd formatted string completions to responses
+    command (str)          - system call being mocked
+  """
+  
+  if isinstance(responses, list):
+    if base_cmd == command: return responses
+    else: return None
+  else:
+    for cmd_completion in responses:
+      if command == base_cmd % cmd_completion:
+        return responses[cmd_completion]
+
+class TestSystem(unittest.TestCase):
+  """
+  Tests the stem.util.system contents.
+  """
+  
+  def tearDown(self):
+    # resets call mocking back to being disabled
+    system.CALL_MOCKING = None
+  
+  def test_is_running(self):
+    """
+    Exercises multiple use cases for the is_running function.
+    """
+    
+    # mock response with a linux and bsd resolver
+    running_commands = ["irssi", "moc", "tor", "ps"]
+    
+    for ps_cmd in (system.IS_RUNNING_PS_LINUX, system.IS_RUNNING_PS_BSD):
+      system.CALL_MOCKING = functools.partial(mock_call, ps_cmd, running_commands)
+      
+      self.assertTrue(system.is_running("irssi"))
+      self.assertTrue(system.is_running("moc"))
+      self.assertTrue(system.is_running("tor"))
+      self.assertTrue(system.is_running("ps"))
+      self.assertFalse(system.is_running("something_else"))
+    
+    # mock both calls failing
+    system.CALL_MOCKING = lambda cmd: None
+    self.assertFalse(system.is_running("irssi"))
+    self.assertEquals(None, system.is_running("irssi"))
+  
+  def test_get_pid_by_name_pgrep(self):
+    """
+    Tests the get_pid_by_name function with pgrep responses.
+    """
+    
+    responses = dict(GET_PID_BY_NAME_BASE_RESULTS)
+    responses["success"] = ["1111"]
+    responses["multiple_results"] = ["123", "456", "789"]
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_NAME_PGREP, responses)
+    
+    for test_input in responses:
+      expected_response = 1111 if test_input == "success" else None
+      self.assertEquals(expected_response, system.get_pid_by_name(test_input))
+  
+  def test_get_pid_by_name_pidof(self):
+    """
+    Tests the get_pid_by_name function with pidof responses.
+    """
+    
+    responses = dict(GET_PID_BY_NAME_BASE_RESULTS)
+    responses["success"] = ["1111"]
+    responses["multiple_results"] = ["123 456 789"]
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_NAME_PIDOF, responses)
+    
+    for test_input in responses:
+      expected_response = 1111 if test_input == "success" else None
+      self.assertEquals(expected_response, system.get_pid_by_name(test_input))
+  
+  def test_get_pid_by_name_ps_linux(self):
+    """
+    Tests the get_pid_by_name function with the linux variant of ps.
+    """
+    
+    responses = dict(GET_PID_BY_NAME_BASE_RESULTS)
+    responses["success"] = ["PID", " 1111"]
+    responses["multiple_results"] = ["PID", " 123", " 456", " 789"]
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_NAME_PS_LINUX, responses)
+    
+    for test_input in responses:
+      expected_response = 1111 if test_input == "success" else None
+      self.assertEquals(expected_response, system.get_pid_by_name(test_input))
+  
+  def test_get_pid_by_name_ps_bsd(self):
+    """
+    Tests the get_pid_by_name function with the bsd variant of ps.
+    """
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_NAME_PS_BSD, GET_PID_BY_NAME_PS_BSD)
+    self.assertEquals(1, system.get_pid_by_name("launchd"))
+    self.assertEquals(11, system.get_pid_by_name("DirectoryService"))
+    self.assertEquals(None, system.get_pid_by_name("blarg"))
+  
+  def test_get_pid_by_name_lsof(self):
+    """
+    Tests the get_pid_by_name function with lsof responses.
+    """
+    
+    responses = dict(GET_PID_BY_NAME_BASE_RESULTS)
+    responses["success"] = ["1111"]
+    responses["multiple_results"] = ["123", "456", "789"]
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_NAME_LSOF, responses)
+    
+    for test_input in responses:
+      expected_response = 1111 if test_input == "success" else None
+      self.assertEquals(expected_response, system.get_pid_by_name(test_input))
+  
+  def test_get_pid_by_port_netstat(self):
+    """
+    Tests the get_pid_by_port function with a netstat response.
+    """
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_PORT_NETSTAT, GET_PID_BY_PORT_NETSTAT_RESULTS)
+    self.assertEquals(1641, system.get_pid_by_port(9051))
+    self.assertEquals(1641, system.get_pid_by_port("9051"))
+    self.assertEquals(None, system.get_pid_by_port(631))
+    self.assertEquals(None, system.get_pid_by_port(123))
+  
+  def test_get_pid_by_port_sockstat(self):
+    """
+    Tests the get_pid_by_port function with a sockstat response.
+    """
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_PORT_SOCKSTAT % 9051, GET_PID_BY_PORT_SOCKSTAT_RESULTS)
+    self.assertEquals(4397, system.get_pid_by_port(9051))
+    self.assertEquals(4397, system.get_pid_by_port("9051"))
+    self.assertEquals(None, system.get_pid_by_port(123))
+  
+  def test_get_pid_by_port_lsof(self):
+    """
+    Tests the get_pid_by_port function with a lsof response.
+    """
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_PID_BY_PORT_LSOF, GET_PID_BY_PORT_LSOF_RESULTS)
+    self.assertEquals(1745, system.get_pid_by_port(9051))
+    self.assertEquals(1745, system.get_pid_by_port("9051"))
+    self.assertEquals(329, system.get_pid_by_port(80))
+    self.assertEquals(None, system.get_pid_by_port(123))
+  
+  def test_get_pid_by_open_file_lsof(self):
+    """
+    Tests the get_pid_by_open_file function with a lsof response.
+    """
+    
+    lsof_query = system.GET_PID_BY_FILE_LSOF % "/tmp/foo"
+    system.CALL_MOCKING = functools.partial(mock_call, lsof_query, ["4762"])
+    self.assertEquals(4762, system.get_pid_by_open_file("/tmp/foo"))
+    self.assertEquals(None, system.get_pid_by_open_file("/tmp/somewhere_else"))
+  
+  def test_get_cwd_pwdx(self):
+    """
+    Tests the get_cwd function with a pwdx response.
+    """
+    
+    responses = {
+      "3799": ["3799: /home/atagar"],
+      "5839": ["5839: No such process"],
+      "1234": ["malformed output"],
+      "7878": None,
+    }
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_CWD_PWDX, responses)
+    
+    for test_input in responses:
+      expected_response = "/home/atagar" if test_input == "3799" else None
+      self.assertEquals(expected_response, system.get_cwd(test_input))
+  
+  def test_get_cwd_lsof(self):
+    """
+    Tests the get_cwd function with a lsof response.
+    """
+    
+    responses = {
+      "75717": ["p75717", "n/Users/atagar/tor/src/or"],
+      "1234": ["malformed output"],
+      "7878": None,
+    }
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_CWD_LSOF, responses)
+    
+    for test_input in responses:
+      expected_response = "/Users/atagar/tor/src/or" if test_input == "75717" else None
+      self.assertEquals(expected_response, system.get_cwd(test_input))
+  
+  def test_get_bsd_jail_id(self):
+    """
+    Tests the get_bsd_jail_id function.
+    """
+    
+    responses = {
+      "1111": ["JID", " 1"],
+      "2222": ["JID", " 0"],
+      "3333": ["JID", "bad data"],
+      "4444": ["bad data"],
+      "5555": [],
+      "6666": None
+    }
+    
+    system.CALL_MOCKING = functools.partial(mock_call, system.GET_BSD_JAIL_ID_PS, responses)
+    
+    for test_input in responses:
+      expected_response = 1 if test_input == "1111" else 0
+      self.assertEquals(expected_response, system.get_bsd_jail_id(test_input))
+  
+  def test_is_relative_path(self):
+    """
+    Tests the is_relative_path function.
+    """
+    
+    self.assertTrue(system.is_relative_path("hello/world"))
+    self.assertTrue(system.is_relative_path("~/hello/world"))
+    self.assertTrue(system.is_relative_path("~user/hello/world"))
+    self.assertFalse(system.is_relative_path("/tmp/hello/world"))
+  
+  def test_expand_path(self):
+    """
+    Tests the expand_path function. This does not exercise home directory
+    expansions since that deals with our environment (that's left to integ
+    tests).
+    """
+    
+    self.assertEquals("", system.expand_path(""))
+    self.assertEquals("/tmp", system.expand_path("/tmp"))
+    self.assertEquals("/tmp", system.expand_path("/tmp/"))
+    self.assertEquals("/tmp", system.expand_path(".", "/tmp"))
+    self.assertEquals("/tmp", system.expand_path("./", "/tmp"))
+    self.assertEquals("/tmp/foo", system.expand_path("foo", "/tmp"))
+    self.assertEquals("/tmp/foo", system.expand_path("./foo", "/tmp"))
+

_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits