[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Implement logging to PCAP file support
commit 6a3ee55b574adaa8740ccafe2e4a01719dc0e86e
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Fri Nov 9 13:20:38 2012 +0100
Implement logging to PCAP file support
---
docs/source/index.rst | 2 +-
ooni/config.py | 17 ++-
ooni/lib/txscapy.py | 381 ------------------------------------------------
ooni/nettest.py | 2 +-
ooni/oonicli.py | 43 +++++-
ooni/runner.py | 24 +--
ooni/utils/__init__.py | 4 +
ooni/utils/net.py | 38 ++++--
ooniprobe.conf | 5 +-
9 files changed, 94 insertions(+), 422 deletions(-)
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2497a09..132381f 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -28,7 +28,7 @@ On debian based systems these can be installed with:
The python dependencies required for running ooniprobe are:
* Twisted
- * Scapy
+ * Scapy >= 2.2.0
* txtorcon
They can be installed from the requirements.txt with:
diff --git a/ooni/config.py b/ooni/config.py
index de5f45b..f3d1a80 100644
--- a/ooni/config.py
+++ b/ooni/config.py
@@ -6,8 +6,9 @@
import os
import yaml
-from twisted.internet import reactor
+from twisted.internet import reactor, threads
+from ooni.utils import date
from ooni.utils import Storage
def get_root_path():
@@ -16,6 +17,18 @@ def get_root_path():
root = os.path.abspath(root)
return root
+def oreport_filenames():
+ """
+ returns the filenames for the pcap file and the yamloo report
+
+ returns
+ yamloo_filename, pcap_filename
+ """
+ base_filename = "%s_"+date.timestamp()+".%s"
+ yamloo_filename = base_filename % ("report", "yamloo")
+ pcap_filename = base_filename % ("packets", "pcap")
+ return yamloo_filename, pcap_filename
+
config_file = os.path.join(get_root_path(), 'ooniprobe.conf')
try:
f = open(config_file)
@@ -41,7 +54,5 @@ advanced = Storage()
for k, v in configuration['advanced'].items():
advanced[k] = v
-threadpool = ThreadPool(0, advanced.threadpool_size)
-threadpool.start()
# This is used to keep track of the state of the sniffer
sniffer_running = None
diff --git a/ooni/lib/txscapy.py b/ooni/lib/txscapy.py
deleted file mode 100644
index 00224d6..0000000
--- a/ooni/lib/txscapy.py
+++ /dev/null
@@ -1,381 +0,0 @@
-# -*- coding:utf8 -*-
-"""
-txscapy
-*******
-(c) 2012 Arturo Filastò
-a twisted wrapper for scapys send and receive functions.
-
-This software has been written to be part of OONI, the Open Observatory of
-Network Interference. More information on that here: http://ooni.nu/
-
-"""
-
-import struct
-import socket
-import os
-import sys
-import time
-
-from twisted.internet import protocol, base, fdesc, error, defer
-from twisted.internet import reactor, threads
-from zope.interface import implements
-
-from scapy.all import Gen
-from scapy.all import SetGen
-
-from ooni.utils import log
-
-LINUX=sys.platform.startswith("linux")
-OPENBSD=sys.platform.startswith("openbsd")
-FREEBSD=sys.platform.startswith("freebsd")
-NETBSD=sys.platform.startswith("netbsd")
-DARWIN=sys.platform.startswith("darwin")
-SOLARIS=sys.platform.startswith("sunos")
-WINDOWS=sys.platform.startswith("win32")
-
-from scapy.all import RawPcapWriter, MTU, BasePacketList, conf
-class PcapWriter(RawPcapWriter):
- def __init__(self, filename, linktype=None, gz=False, endianness="",
- append=False, sync=False):
- RawPcapWriter.__init__(self, filename, linktype=linktype, gz=gz,
- endianness=endianness, append=append, sync=sync)
- fdesc.setNonBlocking(self.f)
-
- def _write_header(self, pkt):
- if self.linktype == None:
- if type(pkt) is list or type(pkt) is tuple or isinstance(pkt, BasePacketList):
- pkt = pkt[0]
- try:
- self.linktype = conf.l2types[pkt.__class__]
- except KeyError:
- self.linktype = 1
- RawPcapWriter._write_header(self, pkt)
-
- def _write_packet(self, packet):
- sec = int(packet.time)
- usec = int(round((packet.time-sec)*1000000))
- s = str(packet)
- caplen = len(s)
- RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen)
-
-class ScapySocket(object):
- MTU = 1500
- def __init__(self, filter=None, iface=None, nofilter=None):
- from scapy.all import conf
- self.ssocket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
-
- def fileno(self):
- return self.ssocket.ins.fileno()
-
- def send(self, data):
- return self.ssocket.send(data)
-
- def recv(self):
- if FREEBSD or DARWIN:
- return self.ssocket.nonblock_recv()
- else:
- return self.ssocket.recv(self.MTU)
-
-class TXScapy(object):
- """
- A twisted based wrapper for scapy send and receive functionality.
-
- It sends packets inside of a threadpool and receives packets using the
- libdnet receive non blocking file descriptor.
- """
- min = 2
- max = 6
- debug = False
- write_only_answers = False
- pcapwriter = None
- recv = False
- timeout_call = None
- answers = []
- questions = []
-
- def __init__(self, pkts=None, maxPacketSize=8192, reactor=None, filter=None,
- iface=None, nofilter=None, pcapfile=None, timeout=None, *arg, **kw):
- self.maxPacketSize = maxPacketSize
- if not reactor:
- from twisted.internet import reactor
-
- self._reactor = reactor
-
- if pkts:
- self._buildPacketQueues(pkts)
- try:
- self._buildSocket()
- except Exception, e:
- log.err("Unable to build socket. Are you root?")
- sys.exit()
-
- self.cthreads = 0
- self.mthreads = 80
-
- self.running = False
- self.done = False
- self.finished = False
-
- import thread
- from twisted.python import threadpool
- self.threadID = thread.get_ident
- self.threadpool = threadpool.ThreadPool(self.min, self.max)
- self.startID = self._reactor.callWhenRunning(self._start)
-
- self.deferred = defer.Deferred()
-
- if pcapfile:
- self.pcapwriter = PcapWriter(pcapfile)
-
- if timeout and self.recv:
- pass
-
- def _buildSocket(self, filter=None, iface=None, nofilter=None):
- self.socket = ScapySocket(filter, iface, nofilter)
- if self.recv:
- self._reactor.addReader(self)
-
- def _buildPacketQueues(self, pkts):
- """
- Converts the list of packets to a Scapy generator and sets up all the
- necessary attributes for understanding if all the needed responses have
- been received.
- """
- if not isinstance(pkts, Gen):
- self.pkts = SetGen(pkts)
-
- self.outqueue = [p for p in pkts]
-
- self.total_count = len(self.outqueue)
- self.answer_count = 0
- self.out_count = 0
-
- self.hsent = {}
- for p in self.outqueue:
- h = p.hashret()
- if h in self.hsent:
- self.hsent[h].append(p)
- else:
- self.hsent[h] = [p]
-
-
- def gotAnswer(self, answer, question):
- """
- Got a packet that has been identified as an answer to one of the sent
- out packets.
-
- If the answer count matches the sent count the finish callback is
- fired.
-
- @param answer: the packet received on the wire.
-
- @param question: the sent packet that matches that response.
-
- """
- if self.pcapwriter and self.write_only_answers:
- self.pcapwriter.write(question)
- self.pcapwriter.write(answer)
- self.answers.append(answers)
- self.questions.append(question)
- self.answer_count += 1
- if self.answer_count >= self.total_count and self.running:
- log.debug("Got all the answers I need")
- self.finalClose()
- self.deferred.callback(None)
-
- def processAnswer(self, pkt, hlst):
- """
- Checks if the potential answer is in fact an answer to one of the
- matched sent packets. Uses the scapy .answers() function to verify
- this.
-
- @param pkt: The packet to be tested if is the answer to a sent packet.
-
- @param hlst: a list of packets that match the hash for an answer to
- pkt.
- """
- for i in range(len(hlst)):
- if pkt.answers(hlst[i]):
- self.gotAnswer(pkt, hlst[i])
-
- def fileno(self):
- """
- Returns a fileno for use by twisteds Reader.
- """
- return self.socket.fileno()
-
- def processPacket(self, pkt):
- """
- Override this method to process your packets.
-
- @param pkt: the packet that has been received.
- """
- pkt.show()
-
- def doRead(self):
- """
- There is something to be read on the wire. Do all the processing on the
- received packet.
- """
- pkt = self.socket.recv()
- if not pkt:
- return
- if self.pcapwriter and not self.write_only_answers:
- self.pcapwriter.write(pkt)
-
- self.processPacket(pkt)
-
- h = pkt.hashret()
- if h in self.hsent:
- hlst = self.hsent[h]
- self.processAnswer(pkt, hlst)
-
- def logPrefix(self):
- """
- The prefix to be prepended in logging.
- """
- return "txScapy"
-
- def _start(self):
- """
- Start the twisted thread pool.
- """
- self.startID = None
- return self.start()
-
- def start(self):
- """
- Actually start the thread pool.
- """
- if not self.running:
- self.threadpool.start()
- self.shutdownID = self._reactor.addSystemEventTrigger(
- 'during', 'shutdown', self.finalClose)
- self.running = True
-
- def sendPkt(self, pkt):
- """
- Send a packet to the wire.
-
- @param pkt: The packet to be sent.
- """
- self.socket.send(pkt)
-
- def timeout(self, *arg, **kw):
- if not self.done:
- log.debug("I have not finished. Setting to call in %s" %
- self.timeoutSeconds)
- self.timeout_call = self._reactor.callLater(self.timeoutSeconds, self.timeout, None)
- elif self.running:
- log.debug("Cancelling timeout call")
- self.finalClose()
- self.deferred.callback(None)
-
- def sr(self, pkts, filter=None, iface=None, nofilter=0, timeout=None, *args, **kw):
- """
- Wraps the scapy sr function.
-
- @param nofilter: put 1 to avoid use of bpf filters
-
- @param retry: if positive, how many times to resend unanswered packets
- if negative, how many times to retry when no more packets are
- answered (XXX to be implemented)
-
- @param timeout: how much time to wait after the last packet has
- been sent (XXX to be implemented)
-
- @param multi: whether to accept multiple answers for the same
- stimulus (XXX to be implemented)
-
- @param filter: provide a BPF filter
- @param iface: listen answers only on the given interface
- """
- log.debug("TXScapy sending and receiving packets")
- self.recv = True
- if timeout:
- self.timeoutSeconds = timeout
- self.timeout_call = self._reactor.callLater(timeout, self.timeout, None)
- self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
-
- def send(self, pkts, filter=None, iface=None, nofilter=0, *args, **kw):
- """
- Wraps the scapy send function. Its the same as send and receive, except
- it does not receive. Who would have ever guessed? ;)
-
- @param nofilter: put 1 to avoid use of bpf filters
-
- @param retry: if positive, how many times to resend unanswered packets
- if negative, how many times to retry when no more packets are
- answered (XXX to be implemented)
-
- @param timeout: how much time to wait after the last packet has
- been sent (XXX to be implemented)
-
- @param multi: whether to accept multiple answers for the same
- stimulus (XXX to be implemented)
-
- @param filter: provide a BPF filter
- @param iface: listen answers only on the given interface
- """
- self.recv = False
- self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
-
- def _sendrcv(self, pkts, filter=None, iface=None, nofilter=0):
- self._buildSocket(filter, iface, nofilter)
- self._buildPacketQueues(pkts)
- def sent(cb):
- if self.cthreads < self.mthreads and not self.done:
- pkt = None
- try:
- pkt = self.outqueue.pop()
- except:
- self.done = True
- if not self.recv and self.running:
- log.debug("I am not in receiving state running callback")
- self.deferred.callback(None)
- return
- d = threads.deferToThreadPool(reactor, self.threadpool,
- self.sendPkt, pkt)
- d.addCallback(sent)
- return d
-
- for x in range(self.mthreads):
- try:
- pkt = self.outqueue.pop()
- except:
- self.done = True
- return
- if self.cthreads >= self.mthreads and self.done:
- return
- d = threads.deferToThreadPool(reactor, self.threadpool,
- self.sendPkt, pkt)
- d.addCallback(sent)
- return d
-
- def connectionLost(self, why):
- pass
-
- def finalClose(self):
- """
- Clean all the shutdown related functions.
- """
- self.shutdownID = None
- self.threadpool.stop()
- if self.timeout_call:
- self.timeout_call.cancel()
- self.timeout_call = None
- self.running = False
-
-@xxxxxxxxxxxxxxxxxxxxx
-def txsr(*args, **kw):
- tr = TXScapy(*args, **kw)
- tr.sr(*args, **kw)
- yield tr.deferred
- tr.finalClose()
-
-@xxxxxxxxxxxxxxxxxxxxx
-def txsend(*arg, **kw):
- tr = TXScapy(*arg, **kw)
- tr.send(*arg, **kw)
- yield tr.deferred
- tr.finalClose()
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 289cd23..6221a3f 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -136,7 +136,7 @@ class NetTestCase(object):
for x in fp.xreadlines():
yield x.strip()
fp.close()
-
+
def _checkRequiredOptions(self):
for required_option in self.requiredOptions:
log.debug("Checking if %s is present" % required_option)
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 2a6f0cd..4b63a61 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -27,7 +27,7 @@ from ooni import nettest, runner, reporter, config
from ooni.inputunit import InputUnitFactory
-from ooni.utils import net
+from ooni.utils import net, checkForRoot
from ooni.utils import log
@@ -44,7 +44,8 @@ class Options(usage.Options, app.ReactorSelectionMixin):
'Report deferred creation and callback stack traces'],]
optParameters = [["reportfile", "o", None, "report file name"],
- ["logfile", "l", None, "log file name"],]
+ ["logfile", "l", None, "log file name"],
+ ["pcapfile", "p", None, "pcap file name"]]
compData = usage.Completions(
extraActions=[usage.CompleteFiles(
@@ -75,6 +76,12 @@ class Options(usage.Options, app.ReactorSelectionMixin):
except:
raise usage.UsageError("No test filename specified!")
+def testsEnded(*arg, **kw):
+ """
+ You can place here all the post shutdown tasks.
+ """
+ log.debug("Finished running all tests")
+
def run():
"""
Call me to begin testing from a file.
@@ -90,11 +97,37 @@ def run():
if cmd_line_options['debug-stacktraces']:
defer.setDebugging(True)
+ yamloo_filename, pcap_filename = config.oreport_filenames()
+
+ if cmd_line_options['reportfile']:
+ yamloo_filename = cmd_line_options['reportfile']
+ pcap_filename = yamloo_filename+".pcap"
+
+ if os.path.exists(yamloo_filename):
+ log.msg("Report already exists with filename %s" % yamloo_filename)
+ log.msg("Renaming it to %s" % yamloo_filename+'.old')
+ os.rename(yamloo_filename, yamloo_filename+'.old')
+ if os.path.exists(pcap_filename):
+ log.msg("Report already exists with filename %s" % pcap_filename)
+ log.msg("Renaming it to %s" % pcap_filename+'.old')
+ os.rename(pcap_filename, pcap_filename+'.old')
+
log.start(cmd_line_options['logfile'])
classes = runner.findTestClassesFromConfig(cmd_line_options)
test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
- d =
- runner.runTestCases(test_cases, options, cmd_line_options)
- reactor.run()
+ if config.privacy.includepcap:
+ try:
+ checkForRoot()
+ except:
+ log.err("includepcap options requires root priviledges to run")
+ log.err("disable it in your ooniprobe.conf file")
+ sys.exit(1)
+ log.debug("Starting sniffer")
+ sniffer_d = net.capturePackets(pcap_filename)
+ tests_d = runner.runTestCases(test_cases, options,
+ cmd_line_options, yamloo_filename)
+ tests_d.addBoth(testsEnded)
+
+ reactor.run()
diff --git a/ooni/runner.py b/ooni/runner.py
index 083de35..d8b7df8 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -24,7 +24,8 @@ from ooni.inputunit import InputUnitFactory
from ooni.nettest import NetTestCase
from ooni import reporter
-from ooni.utils import log, date
+
+from ooni.utils import log, date, checkForRoot
def processTest(obj, cmd_line_options):
"""
@@ -41,8 +42,7 @@ def processTest(obj, cmd_line_options):
input_file = obj.inputFile
if obj.requiresRoot:
- if os.getuid() != 0:
- raise Exception("This test requires root to run")
+ checkForRoot("test")
if obj.optParameters or input_file \
or obj.usageOptions or obj.optFlags:
@@ -184,7 +184,8 @@ def runTestWithInputUnit(test_class,
return defer.DeferredList(dl)
@defer.inlineCallbacks
-def runTestCases(test_cases, options, cmd_line_options):
+def runTestCases(test_cases, options,
+ cmd_line_options, yamloo_filename):
try:
assert len(options) != 0, "Length of options is zero!"
except AssertionError, ae:
@@ -203,17 +204,7 @@ def runTestCases(test_cases, options, cmd_line_options):
log.msg("options[0] = %s" % first)
test_inputs = [None]
- if cmd_line_options['reportfile']:
- report_filename = cmd_line_options['reportfile']
- else:
- report_filename = 'report_'+date.timestamp()+'.yamloo'
-
- if os.path.exists(report_filename):
- print "Report already exists with filename %s" % report_filename
- print "Renaming it to %s" % report_filename+'.old'
- os.rename(report_filename, report_filename+'.old')
-
- reportFile = open(report_filename, 'w+')
+ reportFile = open(yamloo_filename, 'w+')
oreporter = reporter.OReporter(reportFile)
input_unit_factory = InputUnitFactory(test_inputs)
@@ -226,6 +217,7 @@ def runTestCases(test_cases, options, cmd_line_options):
test_class = test_case[0]
test_method = test_case[1]
yield runTestWithInputUnit(test_class,
- test_method, input_unit, oreporter)
+ test_method, input_unit,
+ oreporter)
oreporter.allDone()
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index cd82ab4..9961e03 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -3,6 +3,7 @@
"""
import imp
+import os
import logging
import string
import random
@@ -55,6 +56,9 @@ class Storage(dict):
for (k, v) in value.items():
self[k] = v
+def checkForRoot(what):
+ if os.getuid() != 0:
+ raise Exception("This %s requires root to run" % what)
def get_logger(config):
loglevel = getattr(logging, config.loglevel.upper())
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index a5a512d..3fd4b41 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -4,25 +4,41 @@
# --------
# OONI utilities for networking related operations
+import sys
+from twisted.internet import threads, reactor
+
from scapy.all import utils
-from twisted.internet import defer
-from ooni.utils import log
-from ooni.config import threadpool
+
+from ooni.utils import log, txscapy
def getClientAddress():
address = {'asn': 'REPLACE_ME',
'ip': 'REPLACE_ME'}
return address
-def writePacketToPcap(pkt):
- from scapy.all import utils
- log.debug("Writing to pcap file %s" % pkt)
- utils.wrpcap('/tmp/foo.pcap', pkt)
-
-def capturePackets():
+def capturePackets(pcap_filename):
from scapy.all import sniff
- return defer.deferToThread(sniff, writePacketToPcap,
- lfilter=writePacketToPcap)
+ global stop_packet_capture
+ stop_packet_capture = False
+
+ def stopCapture():
+ # XXX this is a bit of a hack to stop capturing packets when we close
+ # the reactor. Ideally we would want to be able to do this
+ # programmatically, but this requires some work on implementing
+ # properly the sniff function with deferreds.
+ global stop_packet_capture
+ stop_packet_capture = True
+
+ def writePacketToPcap(pkt):
+ from scapy.all import utils
+ pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True)
+ pcapwriter.write(pkt)
+ if stop_packet_capture:
+ sys.exit(1)
+
+ d = threads.deferToThread(sniff, lfilter=writePacketToPcap)
+ reactor.addSystemEventTrigger('before', 'shutdown', stopCapture)
+ return d
class PermissionsError(SystemExit):
def __init__(self, *args, **kwargs):
diff --git a/ooniprobe.conf b/ooniprobe.conf
index b7ea1f3..1e76ad7 100644
--- a/ooniprobe.conf
+++ b/ooniprobe.conf
@@ -15,15 +15,12 @@ privacy:
# Should we include the ASN of the probe in the report?
includecity: false
# Should we collect a full packet capture on the client?
- includepcap: true
+ includepcap: false
advanced:
# XXX change this to point to the directory where you have stored the GeoIP
# database file. This should be the directory in which OONI is installed
# /path/to/ooni-probe/data/
geoip_data_dir: /home/x/code/networking/ooni-probe/data/
debug: true
-<<<<<<< HEAD
threadpool_size: 10
-=======
->>>>>>> master
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits