[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Port china trigger to new API
commit 9bf7fc8d86901bca4061dd82f6ca6af311cabee4
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Thu Nov 8 18:13:12 2012 +0100
Port china trigger to new API
---
nettests/core/chinatrigger.py | 107 ++++++++++++++++++++++++++
ooni/templates/scapyt.py | 12 ++--
to-be-ported/old-api/chinatrigger.py | 140 ----------------------------------
3 files changed, 113 insertions(+), 146 deletions(-)
diff --git a/nettests/core/chinatrigger.py b/nettests/core/chinatrigger.py
new file mode 100644
index 0000000..53fadb9
--- /dev/null
+++ b/nettests/core/chinatrigger.py
@@ -0,0 +1,107 @@
+import random
+import string
+import struct
+import time
+
+from twisted.python import usage
+from ooni.templates.scapyt import BaseScapyTest
+
+class UsageOptions(usage.Options):
+ optParameters = [['dst', 'd', None, 'Specify the target address'],
+ ['port', 'p', None, 'Specify the target port']
+ ]
+
+class ChinaTriggerTest(BaseScapyTest):
+ """
+ This test is a OONI based implementation of the C tool written
+ by Philipp Winter to engage chinese probes in active scanning.
+
+ Example of running it:
+ ./bin/ooniprobe chinatrigger -d 127.0.0.1 -p 8080
+ """
+
+ name = "chinatrigger"
+ usageOptions = UsageOptions
+ requiredOptions = ['dst', 'port']
+
+ def setUp(self):
+ self.dst = self.localOptions['dst']
+ self.port = int(self.localOptions['port'])
+
+ @staticmethod
+ def set_random_servername(pkt):
+ ret = pkt[:121]
+ for i in range(16):
+ ret += random.choice(string.ascii_lowercase)
+ ret += pkt[121+16:]
+ return ret
+
+ @staticmethod
+ def set_random_time(pkt):
+ ret = pkt[:11]
+ ret += struct.pack('!I', int(time.time()))
+ ret += pkt[11+4:]
+ return ret
+
+ @staticmethod
+ def set_random_field(pkt):
+ ret = pkt[:15]
+ for i in range(28):
+ ret += chr(random.randint(0, 256))
+ ret += pkt[15+28:]
+ return ret
+
+ @staticmethod
+ def mutate(pkt, idx):
+ """
+ Slightly changed mutate function.
+ """
+ ret = pkt[:idx-1]
+ mutation = chr(random.randint(0, 256))
+ while mutation == pkt[idx]:
+ mutation = chr(random.randint(0, 256))
+ ret += mutation
+ ret += pkt[idx:]
+ return ret
+
+ @staticmethod
+ def set_all_random_fields(pkt):
+ pkt = ChinaTriggerTest.set_random_servername(pkt)
+ pkt = ChinaTriggerTest.set_random_time(pkt)
+ pkt = ChinaTriggerTest.set_random_field(pkt)
+ return pkt
+
+ def test_send_mutations(self):
+ from scapy.all import IP, TCP
+ pkt = "\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"\
+ "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"\
+ "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"\
+ "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"\
+ "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"\
+ "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"\
+ "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"\
+ "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"\
+ "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"\
+ "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"\
+ "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"\
+ "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"\
+ "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"\
+ "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"\
+ "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"\
+ "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"\
+ "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"\
+ "\x00\x32\x00\x01\x00\x02\x00\x03\x00"\
+ "\x04\x00\x05\x00\x06\x00\x07\x00\x08"\
+ "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"\
+ "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"\
+ "\x00\x12\x00\x13\x00\x14\x00\x15\x00"\
+ "\x16\x00\x17\x00\x18\x00\x19\x00\x23"\
+ "\x00\x00"
+
+ pkt = ChinaTriggerTest.set_all_random_fields(pkt)
+ pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt]
+ for x in range(len(pkt)):
+ mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
+ pkts.append(mutation)
+ self.send(pkts)
+
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index 8683a8e..a1d2969 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -45,22 +45,22 @@ class BaseScapyTest(NetTestCase):
sentPackets = []
answeredPackets = []
- def sr(self, *arg, **kw):
+ def sr(self, pkts, *arg, **kw):
"""
Wrapper around scapy.sendrecv.sr for sending and receiving of packets
at layer 3.
"""
- answered_packets, sent_packets = sr(*arg, **kw)
+ answered_packets, unanswered = sr(pkts, *arg, **kw)
self.report['answered_packets'] = createPacketReport(answered_packets)
- self.report['sent_packets'] = createPacketReport(sent_packets)
+ self.report['sent_packets'] = createPacketReport(pkts)
return (answered_packets, sent_packets)
- def send(self, *arg, **kw):
+ def send(self, pkts, *arg, **kw):
"""
Wrapper around scapy.sendrecv.send for sending of packets at layer 3
"""
- sent_packets = send(*arg, **kw)
- self.report['sent_packets'] = createPacketReport(sent_packets)
+ sent_packets = send(pkts, *arg, **kw)
+ self.report['sent_packets'] = createPacketReport(pkts)
return sent_packets
class TXScapyTest(BaseScapyTest):
diff --git a/to-be-ported/old-api/chinatrigger.py b/to-be-ported/old-api/chinatrigger.py
deleted file mode 100644
index cf4bcb3..0000000
--- a/to-be-ported/old-api/chinatrigger.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import random
-import string
-import struct
-import time
-
-from zope.interface import implements
-from twisted.python import usage
-from twisted.plugin import IPlugin
-from twisted.internet import protocol, defer
-from ooni.plugoo.tests import ITest, OONITest
-from ooni.plugoo.assets import Asset
-from ooni.utils import log
-from ooni.protocols.scapyproto import ScapyTest
-
-from ooni.lib.txscapy import txsr, txsend
-
-class scapyArgs(usage.Options):
- optParameters = [['dst', 'd', None, 'Specify the target address'],
- ['port', 'p', None, 'Specify the target port'],
- ['pcap', 'f', None, 'The pcap file to write with the sent and received packets'],
- ]
-
-class ChinaTriggerTest(ScapyTest):
- """
- This test is a OONI based implementation of the C tool written
- by Philipp Winter to engage chinese probes in active scanning.
-
- Example of running it:
- ./ooni/ooniprobe.py chinatrigger -d 127.0.0.1 -p 8080 -f bla.pcap
- """
- implements(IPlugin, ITest)
-
- shortName = "chinatrigger"
- description = "Triggers the chinese probes into scanning"
- requirements = ['root']
- options = scapyArgs
- blocking = False
-
- receive = True
- pcapfile = 'example_scapy.pcap'
- timeout = 5
-
- def initialize(self, reactor=None):
- if not self.reactor:
- from twisted.internet import reactor
- self.reactor = reactor
-
- @staticmethod
- def set_random_servername(pkt):
- ret = pkt[:121]
- for i in range(16):
- ret += random.choice(string.ascii_lowercase)
- ret += pkt[121+16:]
- return ret
-
- @staticmethod
- def set_random_time(pkt):
- ret = pkt[:11]
- ret += struct.pack('!I', int(time.time()))
- ret += pkt[11+4:]
- return ret
-
- @staticmethod
- def set_random_field(pkt):
- ret = pkt[:15]
- for i in range(28):
- ret += chr(random.randint(0, 256))
- ret += pkt[15+28:]
- return ret
-
- @staticmethod
- def mutate(pkt, idx):
- """
- Slightly changed mutate function.
- """
- ret = pkt[:idx-1]
- mutation = chr(random.randint(0, 256))
- while mutation == pkt[idx]:
- mutation = chr(random.randint(0, 256))
- ret += mutation
- ret += pkt[idx:]
- return ret
-
- @staticmethod
- def set_all_random_fields(pkt):
- pkt = ChinaTriggerTest.set_random_servername(pkt)
- pkt = ChinaTriggerTest.set_random_time(pkt)
- pkt = ChinaTriggerTest.set_random_field(pkt)
- return pkt
-
- def build_packets(self, *args, **kw):
- """
- Override this method to build scapy packets.
- """
- from scapy.all import IP, TCP
- pkt = "\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"\
- "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"\
- "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"\
- "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"\
- "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"\
- "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"\
- "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"\
- "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"\
- "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"\
- "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"\
- "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"\
- "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"\
- "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"\
- "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"\
- "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"\
- "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"\
- "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"\
- "\x00\x32\x00\x01\x00\x02\x00\x03\x00"\
- "\x04\x00\x05\x00\x06\x00\x07\x00\x08"\
- "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"\
- "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"\
- "\x00\x12\x00\x13\x00\x14\x00\x15\x00"\
- "\x16\x00\x17\x00\x18\x00\x19\x00\x23"\
- "\x00\x00"
-
- pkt = ChinaTriggerTest.set_all_random_fields(pkt)
- pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt]
- for x in range(len(pkt)):
- mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
- pkts.append(mutation)
- return pkts
-
- def load_assets(self):
- if self.local_options:
- self.dst = self.local_options['dst']
- self.port = int(self.local_options['port'])
- if self.local_options['pcap']:
- self.pcapfile = self.local_options['pcap']
- if not self.port or not self.dst:
- pass
-
- return {}
-
-#chinatrigger = ChinaTriggerTest(None, None, None)
-
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits