[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Implement test that detects DNS spoofing
commit 4208c6e963ae59380999fae45930656c287a627d
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Sat Nov 24 14:28:10 2012 +0100
Implement test that detects DNS spoofing
* To be run with a known good resolver and the default resolver of the country
being tested, from inside the country with a hostname of a site that is known
to be censored.
---
nettests/core/dnsspoof.py | 64 +++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 64 insertions(+), 0 deletions(-)
diff --git a/nettests/core/dnsspoof.py b/nettests/core/dnsspoof.py
new file mode 100644
index 0000000..8f3e6a2
--- /dev/null
+++ b/nettests/core/dnsspoof.py
@@ -0,0 +1,64 @@
+from twisted.internet import defer
+from twisted.python import usage
+
+from scapy.all import IP, UDP, DNS, DNSQR
+
+from ooni.templates import scapyt
+from ooni.utils import log
+
+class UsageOptions(usage.Options):
+ optParameters = [['resolver', 'r', None,
+ 'Specify the resolver that should be used for DNS queries (ip:port)'],
+ ['hostname', 'h', None,
+ 'Specify the hostname of a censored site'],
+ ['backend', 'b', '8.8.8.8:53',
+ 'Specify the IP address of a good DNS resolver (ip:port)']
+ ]
+
+
+class DNSSpoof(scapyt.ScapyTest):
+ name = "DNS Spoof"
+ timeout = 2
+
+ usageOptions = UsageOptions
+
+ requiredOptions = ['hostname', 'resolver']
+
+ def setUp(self):
+ self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':')
+ self.resolverPort = int(self.resolverPort)
+
+ self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':')
+ self.controlResolverPort = int(self.controlResolverPort)
+
+ self.hostname = self.localOptions['hostname']
+
+ def postProcessor(self, report):
+ """
+ This is not tested, but the concept is that if the two responses
+ match up then spoofing is occuring.
+ """
+ test_answer = report['test_a_lookup']['answered_packets'][0][1]
+ control_answer = report['test_control_a_lookup']['answered_packets'][0][1]
+ if test_answer[UDP] == control_answer[UDP]:
+ self.report['spoofing'] = True
+ else:
+ self.report['spoofing'] = False
+ return
+
+ @defer.inlineCallbacks
+ def test_a_lookup(self):
+ question = IP(dst=self.resolverAddr)/UDP()/DNS(rd=1,
+ qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" % (self.hostname, self.resolverAddr, self.resolverPort))
+ answered, unanswered = yield self.sr1(question)
+
+ @defer.inlineCallbacks
+ def test_control_a_lookup(self):
+ question = IP(dst=self.controlResolverAddr)/UDP()/DNS(rd=1,
+ qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" % (self.hostname,
+ self.controlResolverAddr, self.controlResolverPort))
+ answered, unanswered = yield self.sr1(question)
+
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits