[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Implement disk quota management
commit def4c929d162852ace8a016bb3352677eec5bcde
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date: Tue Aug 30 02:09:55 2016 +0200
Implement disk quota management
This is related to the feature described here: https://github.com/TheTorProject/lepidopter/issues/53
---
ooni/agent/scheduler.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++
ooni/measurements.py | 20 +++++++++++----
ooni/settings.py | 6 ++++-
ooni/tests/test_utils.py | 34 ++++++++++++++++++++++++
ooni/ui/web/server.py | 12 ++++++++-
ooni/utils/files.py | 34 ++++++++++++++++++++++++
6 files changed, 166 insertions(+), 7 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 3389db1..54551f6 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,3 +1,6 @@
+import os
+import errno
+
from datetime import datetime
from twisted.application import service
@@ -8,6 +11,7 @@ from twisted.python.filepath import FilePath
from ooni.scripts import oonireport
from ooni import resources
from ooni.utils import log, SHORT_DATE
+from ooni.utils.files import human_size_to_bytes, directory_usage
from ooni.deck.store import input_store, deck_store
from ooni.settings import config
from ooni.contrib import croniter
@@ -146,6 +150,66 @@ class DeleteOldReports(ScheduledTask):
measurement_path.child(measurement['id']).remove()
+class CheckMeasurementQuota(ScheduledTask):
+    """
+    Keep measurements within basic.measurement_quota: warn at 80% usage and,
+    when over quota, delete old reports (stale first, "keep" ones last).
+    """
+    identifier = 'check-measurement-quota'
+    schedule = '@hourly'
+    _warn_when = 0.8  # fraction of the quota at which we start warning
+
+    def task(self):
+        if config.basic.measurement_quota is None:
+            return
+        maximum_bytes = human_size_to_bytes(config.basic.measurement_quota)
+        used_bytes = directory_usage(config.measurements_directory)
+        warning_path = os.path.join(config.running_path, 'quota_warning')
+
+        if (float(used_bytes) / float(maximum_bytes)) >= self._warn_when:
+            log.warn("You are about to reach the maximum allowed quota. Be careful")
+            with open(warning_path, "w") as out_file:
+                out_file.write("{0} {1}".format(used_bytes, maximum_bytes))
+        else:
+            try:
+                os.remove(warning_path)
+            except OSError as ose:
+                if ose.errno != errno.ENOENT:
+                    raise
+
+        if float(used_bytes) < float(maximum_bytes):
+            # We are within the allowed quota: nothing to delete.
+            return
+
+        # Over quota: delete reports until at least the excess has been freed.
+        amount_to_delete = float(used_bytes) - float(maximum_bytes)
+        amount_deleted = 0
+        measurement_path = FilePath(config.measurements_directory)
+
+        kept_measurements, stale_measurements, remaining_measurements = [], [], []
+        measurements_by_date = sorted(list_measurements(compute_size=True),
+                                      key=lambda k: k['test_start_time'])
+        for measurement in measurements_by_date:
+            if measurement['running'] is True:
+                continue  # skip measurements whose process is still running
+            elif measurement['keep'] is True:
+                kept_measurements.append(measurement)
+            elif measurement['stale'] is True:
+                stale_measurements.append(measurement)
+            else:
+                remaining_measurements.append(measurement)
+
+        # Stale reports go first, then ordinary ones, and "keep" ones last.
+        ordered_measurements = (stale_measurements + remaining_measurements +
+                                kept_measurements)
+        # Also stop once there is nothing left to delete (pop(0) would raise).
+        while amount_deleted < amount_to_delete and ordered_measurements:
+            measurement = ordered_measurements.pop(0)
+            log.warn("Deleting report {0}".format(measurement["id"]))
+            measurement_path.child(measurement['id']).remove()
+            amount_deleted += measurement['size']
+
+
class RunDeck(ScheduledTask):
"""
This will run the decks that have been configured on the system as the
@@ -196,6 +260,7 @@ SYSTEM_TASKS = [
UpdateInputsAndResources
]
+
@defer.inlineCallbacks
def run_system_tasks(no_input_store=False):
task_classes = SYSTEM_TASKS[:]
@@ -215,6 +280,7 @@ def run_system_tasks(no_input_store=False):
log.err("Failed to run task {0}".format(task.identifier))
log.exception(exc)
+
class SchedulerService(service.MultiService):
"""
This service is responsible for running the periodic tasks.
@@ -271,6 +337,7 @@ class SchedulerService(service.MultiService):
self.schedule(UpdateInputsAndResources())
self.schedule(UploadReports())
self.schedule(DeleteOldReports())
+ self.schedule(CheckMeasurementQuota())
self.schedule(RefreshDeckList(self))
self._looping_call.start(self.interval)
diff --git a/ooni/measurements.py b/ooni/measurements.py
index 6d90c1b..fd722ee 100644
--- a/ooni/measurements.py
+++ b/ooni/measurements.py
@@ -4,6 +4,7 @@ import signal
from twisted.python.filepath import FilePath
from ooni.utils import log
+from ooni.utils.files import directory_usage
from ooni.settings import config
class MeasurementInProgress(Exception):
@@ -61,7 +62,8 @@ def generate_summary(input_file, output_file):
class MeasurementNotFound(Exception):
pass
-def get_measurement(measurement_id):
+def get_measurement(measurement_id, compute_size=False):
+ size = -1
measurement_path = FilePath(config.measurements_directory)
measurement = measurement_path.child(measurement_id)
if not measurement.exists():
@@ -70,6 +72,7 @@ def get_measurement(measurement_id):
running = False
completed = True
keep = False
+ stale = False
if measurement.child("measurements.njson.progress").exists():
completed = False
# XXX this is done quite often around the code, probably should
@@ -80,10 +83,14 @@ def get_measurement(measurement_id):
os.kill(pid, signal.SIG_DFL)
running = True
except OSError:
- pass
+ stale = True
if measurement.child("keep").exists():
keep = True
+
+ if compute_size is True:
+ size = directory_usage(measurement.path)
+
test_start_time, country_code, asn, test_name = \
measurement_id.split("-")[:4]
return {
@@ -94,7 +101,9 @@ def get_measurement(measurement_id):
"id": measurement_id,
"completed": completed,
"keep": keep,
- "running": running
+ "running": running,
+ "stale": stale,
+ "size": size
}
@@ -115,12 +124,13 @@ def get_summary(measurement_id):
with summary.open("r") as f:
return json.load(f)
-def list_measurements():
+
+def list_measurements(compute_size=False):
+    """
+    Return the get_measurement() metadata dict of every entry found in
+    config.measurements_directory. Entries whose metadata cannot be read are
+    logged and skipped. compute_size=True also fills in each "size" in bytes.
+    """
     measurements = []
     measurement_path = FilePath(config.measurements_directory)
     for measurement_id in measurement_path.listdir():
         try:
-            measurements.append(get_measurement(measurement_id))
+            measurements.append(get_measurement(measurement_id, compute_size))
-        except:
+        except Exception as exc:
             log.err("Failed to get metadata for measurement {0}".format(measurement_id))
+            log.exception(exc)
     return measurements
diff --git a/ooni/settings.py b/ooni/settings.py
index 2bacd57..c36f3f7 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -20,6 +20,9 @@ basic:
# Where OONIProbe should be writing it's log file
logfile: {logfile}
loglevel: WARNING
+ # The maximum amount of data to store on disk. Once the quota is reached,
+ # we will start deleting older reports.
+ # measurement_quota: 1G
privacy:
# Should we include the IP address of the probe in the report?
includeip: {include_ip}
@@ -99,7 +102,8 @@ tor:
defaults = {
"basic": {
"loglevel": "WARNING",
- "logfile": "ooniprobe.log"
+ "logfile": "ooniprobe.log",
+ "measurement_quota": "1G"
},
"privacy": {
"includeip": False,
diff --git a/ooni/tests/test_utils.py b/ooni/tests/test_utils.py
index ea1d858..4ef2926 100644
--- a/ooni/tests/test_utils.py
+++ b/ooni/tests/test_utils.py
@@ -1,7 +1,10 @@
import os
+import tempfile
+
from twisted.trial import unittest
from ooni.utils import log, generate_filename, net
+from ooni.utils.files import human_size_to_bytes, directory_usage
class TestUtils(unittest.TestCase):
@@ -43,3 +46,34 @@ class TestUtils(unittest.TestCase):
def test_get_addresses(self):
addresses = net.getAddresses()
assert isinstance(addresses, list)
+
+    def test_human_size(self):
+        # Each pair is (human readable size, expected number of bytes).
+        cases = [
+            ("1G", 1024 ** 3),
+            ("1.3M", 1.3 * 1024 ** 2),
+            ("1.2K", 1.2 * 1024),
+            ("1", 1.0),
+            ("100.2", 100.2),
+        ]
+        for human_size, expected in cases:
+            self.assertEqual(human_size_to_bytes(human_size), expected)
+
+    def test_directory_usage(self):
+        tmp_dir = tempfile.mkdtemp()
+        os.mkdir(os.path.join(tmp_dir, "subdir"))
+        for relative_path in ("something.txt", os.path.join("subdir", "something.txt")):
+            with open(os.path.join(tmp_dir, relative_path), "w") as out_file:
+                out_file.write("A" * 1000)
+        # Both 1000-byte files, including the one in the subdirectory, count.
+        self.assertEqual(directory_usage(tmp_dir), 1000 * 2)
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index ee33fa2..6f9ef69 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -1,6 +1,8 @@
from __future__ import print_function
+import os
import json
+import errno
import string
from functools import wraps
from random import SystemRandom
@@ -186,13 +188,21 @@ class WebUIAPI(object):
     @property
     def status(self):
+        """
+        Return the probe status dict. "quota_warning" holds the raw contents of
+        the quota_warning file in config.running_path, or None if it is absent.
+        """
+        quota_warning = None
+        try:
+            # Same location the CheckMeasurementQuota scheduler task writes to.
+            with open(os.path.join(config.running_path, "quota_warning")) as in_file:
+                quota_warning = in_file.read()
+        except IOError as ioe:
+            if ioe.errno != errno.ENOENT:
+                raise
         return {
             "software_version": ooniprobe_version,
             "software_name": "ooniprobe",
             "asn": probe_ip.geodata['asn'],
             "country_code": probe_ip.geodata['countrycode'],
             "director_started": self._director_started,
-            "initialized": self._is_initialized
+            "initialized": self._is_initialized,
+            "quota_warning": quota_warning
         }
def handle_director_event(self, event):
diff --git a/ooni/utils/files.py b/ooni/utils/files.py
new file mode 100644
index 0000000..aefb831
--- /dev/null
+++ b/ooni/utils/files.py
@@ -0,0 +1,34 @@
+import os
+import re
+
+HUMAN_SIZE = re.compile(r"(\d+\.?\d*G)|(\d+\.?\d*M)|(\d+\.?\d*K)|(\d+\.?\d*)")
+
+class InvalidFormat(Exception):
+    """Raised when a size string cannot be parsed by human_size_to_bytes."""
+
+def human_size_to_bytes(human_size):
+    """
+    Convert a human friendly size such as 1G, 10M, 30K or 512 into bytes.
+    Raises InvalidFormat if human_size does not start with a number.
+    """
+    match = HUMAN_SIZE.match(human_size)  # anchored at the start only
+    if match is None:
+        raise InvalidFormat(human_size)
+    gb, mb, kb, b = match.groups()
+    if gb is not None:
+        return float(gb[:-1]) * (1024 ** 3)
+    elif mb is not None:
+        return float(mb[:-1]) * (1024 ** 2)
+    elif kb is not None:
+        return float(kb[:-1]) * 1024
+    # Once the pattern matched, only the plain-number group can be left.
+    return float(b)
+
+
+def directory_usage(path):
+    """Return the total size in bytes of all files found under ``path``."""
+    return sum(
+        os.path.getsize(os.path.join(root, name))
+        for root, _, names in os.walk(path)
+        for name in names
+    )
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits