[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Implement basic collector for ooniprobe reports
commit 5544ee71c001d64aecc5a729724af53c8ec2f246
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Sun Nov 11 18:15:54 2012 +0100
Implement basic collector for ooniprobe reports
* Reports can be submitted over the network via http to a remote collector
* Implement the backend component of the collector that writes submitted reports
to flat files, following the report_id naming convention.
* XXX add support for connecting to the collector via Tor Hidden Services
---
nettests/core/http_host.py | 2 +-
ooni/oonicli.py | 2 +
ooni/reporter.py | 176 ++++++++++++++++++++------------------------
ooni/runner.py | 19 +++--
ooni/utils/net.py | 2 +-
oonib/config.py | 6 +-
oonib/report/api.py | 44 ++++++++++-
7 files changed, 138 insertions(+), 113 deletions(-)
diff --git a/nettests/core/http_host.py b/nettests/core/http_host.py
index 0e73f82..662cc40 100644
--- a/nettests/core/http_host.py
+++ b/nettests/core/http_host.py
@@ -16,7 +16,7 @@ from ooni.utils import log
from ooni.templates import httpt
class UsageOptions(usage.Options):
- optParameters = [['backend', 'b', 'http://127.0.0.1:1234',
+ optParameters = [['backend', 'b', 'http://127.0.0.1:57001',
'URL of the test backend to use'],
['content', 'c', None,
'The file to read from containing the content of a block page']]
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 9287174..9473ad8 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -40,6 +40,8 @@ class Options(usage.Options, app.ReactorSelectionMixin):
'Report deferred creation and callback stack traces'],]
optParameters = [["reportfile", "o", None, "report file name"],
+ ["collector", "c", None,
+ "Address of the collector of test results. (example: http://127.0.0.1:8888)"],
["logfile", "l", None, "log file name"],
["pcapfile", "p", None, "pcap file name"]]
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 7147b35..05ea94a 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -106,6 +106,8 @@ class OReporter(object):
pass
def testDone(self, test, test_name):
+ log.debug("Finished running %s" % test_name)
+ log.debug("Writing report")
test_report = dict(test.report)
if isinstance(test.input, packet.Packet):
@@ -120,14 +122,15 @@ class OReporter(object):
'test_name': test_name,
'test_started': test_started,
'report': test_report}
- self.writeReportEntry(report)
+ return self.writeReportEntry(report)
def allDone(self):
log.debug("allDone: Finished running all tests")
- self.finish()
try:
+ log.debug("Stopping the reactor")
reactor.stop()
except:
+ log.debug("Unable to stop the reactor")
pass
return None
@@ -151,6 +154,7 @@ class YAMLReporter(OReporter):
untilConcludes(self._stream.flush)
def writeReportEntry(self, entry):
+ log.debug("Writing report with YAML reporter")
self._write('---\n')
self._write(safe_dump(entry))
self._write('...\n')
@@ -169,88 +173,60 @@ class YAMLReporter(OReporter):
def finish(self):
self._stream.close()
-class OONIBReporter(object):
+
+class OONIBReportUpdateFailed(Exception):
+ pass
+
+class OONIBReportCreationFailed(Exception):
+ pass
+
+class OONIBTestDetailsLookupFailed(Exception):
+ pass
+
+class OONIBReporter(OReporter):
def __init__(self, backend_url):
from twisted.web.client import Agent
from twisted.internet import reactor
self.agent = Agent(reactor)
self.backend_url = backend_url
- def _newReportCreated(self, data):
- log.debug("newReportCreated %s" % data)
- return data
-
- def _processResponseBody(self, response, body_cb):
- log.debug("processResponseBody %s" % response)
- done = defer.Deferred()
- response.deliverBody(BodyReceiver(done))
- done.addCallback(body_cb)
- return done
-
- def createReport(self, test_name,
- test_version, report_header):
- url = self.backend_url + '/new'
- software_version = '0.0.1'
-
- request = {'software_name': 'ooni-probe',
- 'software_version': software_version,
- 'test_name': test_name,
- 'test_version': test_version,
- 'progress': 0,
- 'content': report_header
- }
- def gotDetails(test_details):
- log.debug("Creating report via url %s" % url)
-
- bodyProducer = StringProducer(json.dumps(request))
- d = self.agent.request("POST", url,
- bodyProducer=bodyProducer)
- d.addCallback(self._processResponseBody,
- self._newReportCreated)
- return d
-
- d = getTestDetails(options)
- d.addCallback(gotDetails)
- return d
-
- def writeReportEntry(self, entry, test_id=None):
- if not test_id:
- log.err("Write report entry on OONIB requires test id")
- raise NoTestIDSpecified
+ @defer.inlineCallbacks
+ def writeReportEntry(self, entry):
+ log.debug("Writing report with OONIB reporter")
+ content = '---\n'
+ content += safe_dump(entry)
+ content += '...\n'
- report = '---\n'
- report += safe_dump(entry)
- report += '...\n'
+ url = self.backend_url + '/report/new'
- url = self.backend_url + '/new'
+ request = {'report_id': self.report_id,
+ 'content': content}
- request = {'test_id': test_id,
- 'content': report}
+ log.debug("Updating report with id %s" % self.report_id)
+ request_json = json.dumps(request)
+ log.debug("Sending %s" % request_json)
bodyProducer = StringProducer(json.dumps(request))
- d = self.agent.request("PUT", url,
- bodyProducer=bodyProducer)
-
- d.addCallback(self._processResponseBody,
- self._newReportCreated)
- return d
-
+ log.debug("Creating report via url %s" % url)
+ try:
+ response = yield self.agent.request("PUT", url,
+ bodyProducer=bodyProducer)
+ except:
+ # XXX we must trap this in the runner and make sure to report the data later.
+ raise OONIBReportUpdateFailed
-class OONIBReporter(OReporter):
- def __init__(self, backend_url):
- from twisted.web.client import Agent
- from twisted.internet import reactor
- self.agent = Agent(reactor)
- self.backend_url = backend_url
+ #parsed_response = json.loads(backend_response)
+ #self.report_id = parsed_response['report_id']
+ #self.backend_version = parsed_response['backend_version']
+ #log.debug("Created report with id %s" % parsed_response['report_id'])
- def _processResponseBody(self, *arg, **kw):
- #done = defer.Deferred()
- #response.deliverBody(BodyReceiver(done))
- #done.addCallback(self._newReportCreated)
- #return done
+ @defer.inlineCallbacks
def createReport(self, options):
+ """
+ Creates a report on the oonib collector.
+ """
test_name = options['name']
test_version = options['version']
@@ -258,33 +234,41 @@ class OONIBReporter(OReporter):
url = self.backend_url + '/report/new'
software_version = '0.0.1'
- def gotDetails(test_details):
- content = '---\n'
- content += safe_dump(test_details)
- content += '...\n'
+ test_details = yield getTestDetails(options)
+
+ content = '---\n'
+ content += safe_dump(test_details)
+ content += '...\n'
- request = {'software_name': 'ooniprobe',
- 'software_version': software_version,
- 'test_name': test_name,
- 'test_version': test_version,
- 'progress': 0,
- 'content': content
- }
- log.debug("Creating report via url %s" % url)
- request_json = json.dumps(request)
- log.debug("Sending %s" % request_json)
-
- def bothCalls(*arg, **kw):
- print arg, kw
-
- body_producer = StringProducer(request_json)
- d = self.agent.request("POST", url, None,
- body_producer)
- d.addBoth(self._processResponseBody)
- return d
-
- d = getTestDetails(options)
- d.addCallback(gotDetails)
- # XXX handle errors
- return d
+ request = {'software_name': 'ooniprobe',
+ 'software_version': software_version,
+ 'test_name': test_name,
+ 'test_version': test_version,
+ 'progress': 0,
+ 'content': content
+ }
+ log.debug("Creating report via url %s" % url)
+ request_json = json.dumps(request)
+ log.debug("Sending %s" % request_json)
+
+ bodyProducer = StringProducer(json.dumps(request))
+ log.debug("Creating report via url %s" % url)
+
+ try:
+ response = yield self.agent.request("POST", url,
+ bodyProducer=bodyProducer)
+ except:
+ raise OONIBReportCreationFailed
+
+ # This is a little trix to allow us to unspool the response. We create
+ # a deferred and call yield on it.
+ response_body = defer.Deferred()
+ response.deliverBody(BodyReceiver(response_body))
+
+ backend_response = yield response_body
+
+ parsed_response = json.loads(backend_response)
+ self.report_id = parsed_response['report_id']
+ self.backend_version = parsed_response['backend_version']
+ log.debug("Created report with id %s" % parsed_response['report_id'])
diff --git a/ooni/runner.py b/ooni/runner.py
index b92ad7e..1aac145 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -153,8 +153,10 @@ def loadTestsAndOptions(classes, cmd_line_options):
def runTestWithInput(test_class, test_method, test_input, oreporter):
log.debug("Running %s with %s" % (test_method, test_input))
+
def test_done(result, test_instance, test_name):
- oreporter.testDone(test_instance, test_name)
+ log.debug("runTestWithInput: concluded %s" % test_name)
+ return oreporter.testDone(test_instance, test_name)
def test_error(error, test_instance, test_name):
log.err("%s\n" % error)
@@ -221,22 +223,21 @@ def runTestCases(test_cases, options,
log.msg("Could not find inputs!")
log.msg("options[0] = %s" % first)
test_inputs = [None]
-
+
reportFile = open(yamloo_filename, 'w+')
- #oreporter = reporter.YAMLReporter(reportFile)
- oreporter = reporter.OONIBReporter('http://127.0.0.1:8888')
+
+ if cmd_line_options['collector']:
+ oreporter = reporter.OONIBReporter(cmd_line_options['collector'])
+ else:
+ oreporter = reporter.YAMLReporter(reportFile)
input_unit_factory = InputUnitFactory(test_inputs)
log.debug("Creating report")
- yield oreporter.createReport(options)
-
- oreporter = reporter.YAMLReporter(reportFile)
-
- input_unit_factory = InputUnitFactory(test_inputs)
yield oreporter.createReport(options)
+
# This deferred list is a deferred list of deferred lists
# it is used to store all the deferreds of the tests that
# are run
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index d43261a..3ddba61 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -7,7 +7,7 @@
import sys
from zope.interface import implements
-from twisted.internet import protocol
+from twisted.internet import protocol, defer
from twisted.internet import threads, reactor
from twisted.web.iweb import IBodyProducer
diff --git a/oonib/config.py b/oonib/config.py
index dc2be2f..b34a8ae 100644
--- a/oonib/config.py
+++ b/oonib/config.py
@@ -18,11 +18,13 @@ main.db_threadpool_size = 10
helpers = Storage()
helpers.http_return_request = Storage()
-helpers.http_return_request.port = 57001
+helpers.http_return_request.port = 57001
+# XXX this actually needs to be the advertised Server HTTP header of our web
+# server
helpers.http_return_request.server_version = "Apache"
helpers.tcp_echo = Storage()
-helpers.tcp_echo.port = 57002
+helpers.tcp_echo.port = 57002
helpers.daphn3 = Storage()
helpers.daphn3.yaml_file = "/path/to/data/oonib/daphn3.yaml"
diff --git a/oonib/report/api.py b/oonib/report/api.py
index 489cc25..bd409b0 100644
--- a/oonib/report/api.py
+++ b/oonib/report/api.py
@@ -59,6 +59,28 @@ def parseNewReportRequest(request):
raise InvalidRequestField(k)
return parsed_request
+def parseUpdateReportRequest(request):
+ # XXX this and the function above can probably be refactored into something
+ # more compact. There is quite a bit of code duplication going on here.
+
+ report_id_regexp = re.compile("[a-zA-Z0-9]+$")
+
+ # XXX here we are actually parsing a json object that could be quite big.
+ # If we want this to scale properly we only want to look at the test_id
+ # field.
+ # We are also keeping in memory multiple copies of the same object. A lot
+ # of optimization can be done.
+ parsed_request = json.loads(request)
+ try:
+ report_id = parsed_request['report_id']
+ except KeyError:
+ raise MissingField('report_id')
+
+ if not re.match(report_id_regexp, report_id):
+ raise InvalidRequestField('report_id')
+
+ return parsed_request
+
class NewReportHandlerFile(web.RequestHandler):
"""
Responsible for creating and updating reports by writing to flat file.
@@ -129,7 +151,20 @@ class NewReportHandlerFile(web.RequestHandler):
'content': 'XXX'
}
"""
- pass
+ parsed_request = parseUpdateReportRequest(self.request.body)
+ report_id = parsed_request['report_id']
+ print "Got this request %s" % parsed_request
+
+ report_filename = report_id
+ report_filename += '.yamloo'
+ try:
+ with open(report_filename, 'a+') as f:
+ # XXX this could be quite big. We should probably use the
+ # twisted.internet.fdesc module
+ print parsed_request['content']
+ f.write(parsed_request['content'])
+ except IOError as e:
+ web.HTTPError(404, "Report not found")
class NewReportHandlerDB(web.RequestHandler):
"""
@@ -188,7 +223,8 @@ class PCAPReportHandler(web.RequestHandler):
def post(self):
pass
-spec = [(r"/report/new", NewReportHandlerFile),
- (r"/report/pcap", PCAPReportHandler)]
+reportingBackendAPI = [(r"/report/new", NewReportHandlerFile),
+ (r"/report/pcap", PCAPReportHandler)
+]
-reportingBackend = web.Application(spec, debug=True)
+reportingBackend = web.Application(reportingBackendAPI, debug=True)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits