[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Completely rewrite the txscapy.
commit 12726ca463d9e68e93d49fcb418421648d054744
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Fri Nov 9 22:19:44 2012 +0100
Completely rewrite the txscapy.
* It is now much cleaner and does not start the packet capture in a separate thread.
It subclasses the twisted filedescriptor protocol and returns a deferred that will
fire the callback with the packets it received and the ones it considers answers
to the request.
---
nettests/bridge_reachability/echo.py | 2 +-
nettests/core/chinatrigger.py | 9 ++-
ooni/templates/scapyt.py | 105 +++------------------------
ooni/utils/txscapy.py | 133 ++++++++++++++++++++++++++++++----
4 files changed, 137 insertions(+), 112 deletions(-)
diff --git a/nettests/bridge_reachability/echo.py b/nettests/bridge_reachability/echo.py
index 542e017..5060ffd 100644
--- a/nettests/bridge_reachability/echo.py
+++ b/nettests/bridge_reachability/echo.py
@@ -164,5 +164,5 @@ class EchoTest(BaseScapyTest):
raise IfaceError("Could not find a working network interface.")
def test_icmp(self):
- self.sr(IP(dst=self.input)/ICMP())
+ return self.sr(IP(dst=self.input)/ICMP())
diff --git a/nettests/core/chinatrigger.py b/nettests/core/chinatrigger.py
index 53fadb9..de1f64d 100644
--- a/nettests/core/chinatrigger.py
+++ b/nettests/core/chinatrigger.py
@@ -23,6 +23,7 @@ class ChinaTriggerTest(BaseScapyTest):
name = "chinatrigger"
usageOptions = UsageOptions
requiredOptions = ['dst', 'port']
+ timeout = 2
def setUp(self):
self.dst = self.localOptions['dst']
@@ -47,7 +48,7 @@ class ChinaTriggerTest(BaseScapyTest):
def set_random_field(pkt):
ret = pkt[:15]
for i in range(28):
- ret += chr(random.randint(0, 256))
+ ret += chr(random.randint(0, 255))
ret += pkt[15+28:]
return ret
@@ -57,9 +58,9 @@ class ChinaTriggerTest(BaseScapyTest):
Slightly changed mutate function.
"""
ret = pkt[:idx-1]
- mutation = chr(random.randint(0, 256))
+ mutation = chr(random.randint(0, 255))
while mutation == pkt[idx]:
- mutation = chr(random.randint(0, 256))
+ mutation = chr(random.randint(0, 255))
ret += mutation
ret += pkt[idx:]
return ret
@@ -103,5 +104,5 @@ class ChinaTriggerTest(BaseScapyTest):
for x in range(len(pkt)):
mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
pkts.append(mutation)
- self.send(pkts)
+ return self.sr(pkts, timeout=2)
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index a1d2969..4c18f0a 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -14,7 +14,7 @@ from scapy.all import send, sr, IP, TCP
from ooni.nettest import NetTestCase
from ooni.utils import log
-from ooni.lib.txscapy import TXScapy
+from ooni.utils.txscapy import ScapyProtocol
def createPacketReport(packet_list):
"""
@@ -42,106 +42,25 @@ class BaseScapyTest(NetTestCase):
requiresRoot = True
- sentPackets = []
- answeredPackets = []
-
- def sr(self, pkts, *arg, **kw):
+ def sr(self, packets, *arg, **kw):
"""
Wrapper around scapy.sendrecv.sr for sending and receiving of packets
at layer 3.
"""
- answered_packets, unanswered = sr(pkts, *arg, **kw)
- self.report['answered_packets'] = createPacketReport(answered_packets)
- self.report['sent_packets'] = createPacketReport(pkts)
- return (answered_packets, sent_packets)
+ def finished(result):
+ answered, unanswered = result
+ sent_packets, received_packets = answered
+ self.report['answered_packets'] = createPacketReport(received_packets)
+ self.report['sent_packets'] = createPacketReport(sent_packets)
+
+ scapyProtocol = ScapyProtocol(*arg, **kw)
+ d = scapyProtocol.startSending(packets)
+ return d
def send(self, pkts, *arg, **kw):
"""
Wrapper around scapy.sendrecv.send for sending of packets at layer 3
"""
- sent_packets = send(pkts, *arg, **kw)
- self.report['sent_packets'] = createPacketReport(pkts)
- return sent_packets
-
-class TXScapyTest(BaseScapyTest):
- """
- A utility class for writing scapy driven OONI tests.
-
- * pcapfile: specify where to store the logged pcapfile
-
- * timeout: timeout in ms of when we should stop waiting to receive packets
-
- * receive: if we should also receive packets and not just send
-
- XXX This is currently not working
- """
- name = "TX Scapy Test"
- version = 0.1
-
- receive = True
- timeout = 1
- pcapfile = 'packet_capture.pcap'
- packet = IP()/TCP()
- reactor = None
-
- answered = None
- unanswered = None
-
- def processInputs(self):
- """
- Place here the logic for validating and processing of inputs and
- command line arguments.
- """
- pass
-
- def tearDown(self):
- log.debug("Tearing down reactor")
-
- def finished(self, *arg):
- log.debug("Calling final close")
-
- self.questions = self.txscapy.questions
- self.answers = self.txscapy.answers
+ raise Exception("Not implemented")
- log.debug("These are the questions: %s" % self.questions)
- log.debug("These are the answers: %s" % self.answers)
-
- self.txscapy.finalClose()
-
- def sendReceivePackets(self):
- packets = self.buildPackets()
-
- log.debug("Sending and receiving %s" % packets)
-
- self.txscapy = TXScapy(packets, pcapfile=self.pcapfile,
- timeout=self.timeout, reactor=self.reactor)
-
- self.txscapy.sr(packets, pcapfile=self.pcapfile,
- timeout=self.timeout, reactor=self.reactor)
-
- d = self.txscapy.deferred
- d.addCallback(self.finished)
-
- return d
-
- def sendPackets(self):
- log.debug("Sending and receiving of packets %s" % packets)
-
- packets = self.buildPackets()
-
- self.txscapy = TXScapy(packets, pcapfile=self.pcapfile,
- timeout=self.timeout, reactor=self.reactor)
-
- self.txscapy.send(packets, reactor=self.reactor).deferred
-
- d = self.txscapy.deferred
- d.addCallback(self.finished)
-
- return d
-
- def buildPackets(self):
- """
- Override this method to build scapy packets.
- """
- pass
diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py
index a3a5610..2559d19 100644
--- a/ooni/utils/txscapy.py
+++ b/ooni/utils/txscapy.py
@@ -12,29 +12,134 @@ import os
import sys
import time
-from twisted.internet import protocol, base, fdesc, error, defer
-from twisted.internet import reactor, threads
+from twisted.internet import protocol, base, fdesc
+from twisted.internet import reactor, threads, error
+from twisted.internet import defer, abstract
from zope.interface import implements
-from scapy.all import Gen
-from scapy.all import SetGen
-
-from ooni.utils import log
from scapy.all import PcapWriter, MTU
from scapy.all import BasePacketList, conf, PcapReader
+from scapy.all import conf, Gen, SetGen
+
+from ooni.utils import log
+
class TXPcapWriter(PcapWriter):
def __init__(self, *arg, **kw):
PcapWriter.__init__(self, *arg, **kw)
fdesc.setNonBlocking(self.f)
-def txSniff(count=0, store=1, offline=None,
- prn = None, lfilter=None,
- L2socket=None, timeout=None,
- opened_socket=None, stop_filter=None,
- *arg, **karg):
- """
- XXX we probably want to rewrite the scapy sniff function to better suite our needs.
- """
+class ScapyProtocol(abstract.FileDescriptor):
+ def __init__(self, super_socket=None,
+ reactor=None, timeout=None, receive=True):
+ abstract.FileDescriptor.__init__(self, reactor)
+ # By default we use the conf.L3socket
+ if not super_socket:
+ super_socket = conf.L3socket()
+ self.super_socket = super_socket
+
+ self.timeout = timeout
+
+ # This dict is used to store the unique hashes that allow scapy to
+ # match up request with answer
+ self.hr_sent_packets = {}
+
+ # These are the packets we have received as answer to the ones we sent
+ self.answered_packets = []
+
+ # These are the packets we send
+ self.sent_packets = []
+
+ # This deferred will fire when we have finished sending a receiving packets.
+ self.d = defer.Deferred()
+ self.debug = False
+ self.multi = False
+ # XXX this needs to be implemented. It would involve keeping track of
+ # the state of the sending via the super socket file descriptor and
+ # firing the callback when we have concluded sending. Check out
+ # twisted.internet.udp to see how this is done.
+ self.receive = receive
+
+ def fileno(self):
+ return self.super_socket.ins.fileno()
+
+ def processPacket(self, packet):
+ """
+ Hook useful for processing packets as they come in.
+ """
+
+ def processAnswer(self, packet, answer_hr):
+ log.debug("Got an answer processing it")
+ for i in range(len(answer_hr)):
+ if packet.answers(answer_hr[i]):
+ self.answered_packets.append((answer_hr[i], packet))
+ if self.debug:
+ print packet.src, packet.ttl
+ #answer.show()
+
+ if not self.multi:
+ del(answer_hr[i])
+ break
+ if len(self.answered_packets) == len(self.sent_packets):
+ # All of our questions have been answered.
+ self.stopSending()
+
+ def doRead(self):
+ timeout = time.time() - self._start_time
+ log.debug("Checking for timeout %s > %s" % (timeout, self.timeout))
+ if self.timeout and time.time() - self._start_time > self.timeout:
+ self.stopSending()
+ packet = self.super_socket.recv()
+ if packet:
+ self.processPacket(packet)
+ # A string that has the same value for the request than for the
+ # response.
+ hr = packet.hashret()
+ if hr in self.hr_sent_packets:
+ answer_hr = self.hr_sent_packets[hr]
+ self.processAnswer(packet, answer_hr)
+
+ def stopSending(self):
+ self.stopReading()
+ self.super_socket.close()
+ if hasattr(self, "d"):
+ result = (self.answered_packets, self.sent_packets)
+ self.d.callback(result)
+ del self.d
+
+ def write(self, packet):
+ """
+ Write a scapy packet to the wire
+ """
+ hashret = packet.hashret()
+ if hashret in self.hr_sent_packets:
+ self.hr_sent_packets[hashret].append(packet)
+ else:
+ self.hr_sent_packets[hashret] = [packet]
+ self.sent_packets.append(packet)
+ return self.super_socket.send(packet)
+
+ def sendPackets(self, packets):
+ if not isinstance(packets, Gen):
+ packets = SetGen(packets)
+ for packet in packets:
+ self.write(packet)
+
+ def startSending(self, packets):
+ self._start_time = time.time()
+ self.startReading()
+ self.sendPackets(packets)
+ return self.d
+
+def sr(x, filter=None, iface=None, nofilter=0, timeout=None):
+ super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
+ sp = ScapyProtocol(super_socket=super_socket, timeout=timeout)
+ return sp.startSending(x)
+
+def send(x, filter=None, iface=None, nofilter=0, timeout=None):
+ super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
+ sp = ScapyProtocol(super_socket=super_socket, timeout=timeout)
+ return sp.startSending(x)
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits