[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [ooni-probe/master] Smear load generated by scheduler



commit ff8678565142978388e2019759d5717a90ba7982
Author: Leonid Evdokimov <leon@xxxxxxxxxxxx>
Date:   Mon Apr 3 19:37:22 2017 +0300

    Smear load generated by scheduler
    
    * Smear load generated by scheduler, see #630
    
    It runs ASAP after the installation. It survives reboot in predictable
    way: avoids double-run and does not skip scheduler cycle on restart.
    It supports non-24/7 operations as it targets running @daily task within
    2.4 hours window, so the probe that was down will likely run a task
    on boot and boot time is random enough for load smearing purposes.
    
    * Schedule decks right after initialisation, see #630
    
    That's required as refresh_deck_list() does not schedule anything if the
    `config` is not `is_initialized()`.
    
    Another possible way to implement this behavior is to postpone
    RefreshDeckList.task run checking config.is_initialized() in
    should_run().
---
 ooni/agent/scheduler.py      |  17 +++++--
 ooni/contrib/croniter.py     |   6 +--
 ooni/tests/test_scheduler.py | 104 ++++++++++++++++++++++++++++++++-----------
 ooni/ui/web/server.py        |   2 +-
 4 files changed, 96 insertions(+), 33 deletions(-)

diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 8de3b303..32edf0af 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,8 +1,9 @@
 import os
 import errno
+import random
 
 from hashlib import md5
-from datetime import datetime
+from datetime import datetime, timedelta
 
 from twisted.application import service
 from twisted.internet import defer, reactor
@@ -87,24 +88,32 @@ class ScheduledTask(object):
         assert self.identifier is not None, "self.identifier must be set"
         assert self.schedule is not None, "self.schedule must be set"
 
+        # XXX: both _last_run_lock and _smear_coef require that there is single
+        # instance of the ScheduledTask of each type identified by `identifier`.
         self._last_run = FilePath(scheduler_directory).child(self.identifier)
         self._last_run_lock = FileSystemlockAndMutex(
             FilePath(scheduler_directory).child(self.identifier + ".lock").path
         )
+        self._smear_coef = random.random()
 
     def cancel(self):
         """
         Cancel a currently running task.
         If it is locked, then release the lock.
         """
-        if self._last_run_lock.locked:
-            self._last_run_lock.release()
-
+        if not self._last_run_lock.locked:
+            # _last_run_lock.release() will throw if we try to release it
+            log.err('BUG: cancelling non-locked task {} without holding lock'.format(self.identifier))
+            return
+        # probably, cancelling the task TAKEN the lock is even worse :-)
+        self._last_run_lock.release()
 
     @property
     def should_run(self):
         current_time = datetime.utcnow().replace(tzinfo=tz.tzutc())
         next_cycle = croniter(self.schedule, self.last_run).get_next(datetime)
+        delta = (croniter(self.schedule, next_cycle).get_next(datetime) - next_cycle).total_seconds()
+        next_cycle = next_cycle + timedelta(seconds=delta * 0.1 * self._smear_coef)
         if next_cycle <= current_time:
             return True
         return False
diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py
index 653dbbf3..f98daa98 100644
--- a/ooni/contrib/croniter.py
+++ b/ooni/contrib/croniter.py
@@ -157,7 +157,7 @@ class croniter(object):
 
     def get_current(self, ret_type=None):
         ret_type = ret_type or self._ret_type
-        if ret_type == datetime.datetime:
+        if issubclass(ret_type,  datetime.datetime):
             return self._timestamp_to_datetime(self.cur)
         return self.cur
 
@@ -219,7 +219,7 @@ class croniter(object):
 
         ret_type = ret_type or self._ret_type
 
-        if ret_type not in (float, datetime.datetime):
+        if not issubclass(ret_type, (float, datetime.datetime)):
             raise TypeError("Invalid ret_type, only 'float' or 'datetime' "
                             "is acceptable.")
 
@@ -239,7 +239,7 @@ class croniter(object):
             result = self._calc(self.cur, expanded, is_prev)
         self.cur = result
 
-        if ret_type == datetime.datetime:
+        if issubclass(ret_type, datetime.datetime):
             result = self._timestamp_to_datetime(result)
 
         return result
diff --git a/ooni/tests/test_scheduler.py b/ooni/tests/test_scheduler.py
index 56a4bb40..bfd22a5d 100644
--- a/ooni/tests/test_scheduler.py
+++ b/ooni/tests/test_scheduler.py
@@ -274,43 +274,97 @@ class TestSchedulerService(ConfigTestCase):
 
         mock_director = mock.MagicMock()
         d = defer.Deferred()
-        with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store):
-
-            dummy_clock = task.Clock()
+        dummy_clock = task.Clock()
+        class FakeDatetime(datetime):
+            @staticmethod
+            def utcnow():
+                return datetime(2000,1,1, 7,0,0) + timedelta(seconds=dummy_clock.seconds())
+        with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store), \
+             mock.patch('ooni.agent.scheduler.datetime', FakeDatetime):
             scheduler_service = SchedulerService(
                 director=mock_director,
                 _reactor=dummy_clock
             )
             scheduler_service.startService()
-            dummy_clock.advance(30)
+            dummy_clock.advance(45)
 
-            now_time = datetime.utcnow()
-            DT_FRMT = "%Y-%m-%dT%H:%M:%SZ"
+            # these tasks were run before clock was pumped
+            for t in scheduler_service._scheduled_tasks:
+                self.assertIn(t.schedule, ('@daily', '@hourly'))
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    self.assertEqual(in_file.read(), '2000-01-01T07:00:00Z')
 
+            # that's leaping clock, it leads to immediate scheduling
+            dummy_clock.advance(24 * 60 * 60)
             for t in scheduler_service._scheduled_tasks:
-                with open(os.path.join(self.scheduler_directory,
-                                       t.identifier)) as in_file:
-                    dstr = datetime.strptime(in_file.read(),
-                                             DT_FRMT).strftime("%Y-%m-%dT%H")
-                    self.assertEqual(dstr, now_time.strftime("%Y-%m-%dT%H"))
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
 
-            dummy_clock.advance(30)
-            dummy_clock.advance(30)
-            dummy_clock.advance(30)
-            dummy_clock.advance(30)
-            dummy_clock.advance(30)
-            dummy_clock.advance(30)
-            # Here we pretend they ran yesterday so to re-trigger the daily
-            # tasks
+            # nothing happens during an hour
+            dummy_clock.advance(60 * 60 - 46)
+            self.assertEqual(FakeDatetime.utcnow(), datetime(2000,1,2, 7,59,59))
             for t in scheduler_service._scheduled_tasks:
-                with open(os.path.join(self.scheduler_directory,
-                                       t.identifier), 'w') as out_file:
-                    yesterday = (now_time - timedelta(days=1,
-                                                      hours=2)).strftime(DT_FRMT)
-                    out_file.write(yesterday)
-            dummy_clock.advance(30)
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+
+            # that's ticking clock, it smears the load a bit
+            dummy_clock.pump([1] * 1800)
+            zero, hourly, daily = 0, 0, 0
+            for t in scheduler_service._scheduled_tasks:
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    if t.schedule == '@daily':
+                        daily += 1
+                        self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+                    elif t.schedule == '@hourly':
+                        hourly += 1
+                        # `:[03]0Z` is caused by scheduler resolution & ticking one second a time
+                        last_run = in_file.read()
+                        self.assertRegexpMatches(last_run, '^2000-01-02T08:0.:[03]0Z$')
+                        if last_run == '2000-01-02T08:00:00Z':
+                            zero += 1
+            self.assertGreater(hourly, 0)
+            self.assertGreater(daily, 0)
+            self.assertLess(zero, hourly)
+            self.assertLessEqual(zero, 1) # should ALMOST never happen
+
+            # leaping to the end of the day
+            dummy_clock.advance((datetime(2000,1,2, 23,59,59) - FakeDatetime.utcnow()).total_seconds())
+            for t in scheduler_service._scheduled_tasks:
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    if t.schedule == '@daily':
+                        self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+                    elif t.schedule == '@hourly':
+                        self.assertEqual(in_file.read(), '2000-01-02T23:59:59Z')
+
+            # save ~30% of the testcase runtime while ticking through six hours
+            for t in scheduler_service._scheduled_tasks[:]:
+                if t.schedule == '@hourly':
+                    scheduler_service.unschedule(t)
+
+            # ticking through six hours
+            dummy_clock.pump([random.uniform(0, 120) for i in xrange(6*60)])
+            for t in scheduler_service._scheduled_tasks:
+                with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                    # randomized clock kills 30s resolution of the scheduler
+                    self.assertRegexpMatches(in_file.read(), '^2000-01-03T0[012]:..:..Z$')
+            self.assertGreater(FakeDatetime.utcnow(), datetime(2000,1,3, 5,0,0)) # should be ~6:00
+
+            # verify, that double-run does not happen even in case of reseeding (reboot/restart)
+            dummy_clock.advance((datetime(2000,1,3, 23,59,59) - FakeDatetime.utcnow()).total_seconds())
+            launches = {}
+            while FakeDatetime.utcnow() < datetime(2000,1,4, 6,0,0):
+                for t in scheduler_service._scheduled_tasks:
+                    with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+                        launches.setdefault(t.identifier, set())
+                        launches[t.identifier].add(in_file.read())
+                    self.assertLessEqual(t._smear_coef, 1.0)
+                    t._smear_coef = random.random()
+                dummy_clock.advance(random.uniform(0, 120))
+            self.assertEqual(len(launches), len(scheduler_service._scheduled_tasks))
+            self.assertEqual({k: len(v) for k, v in launches.iteritems()}, dict.fromkeys(launches.iterkeys(), 2))
 
             # We check that the run method of the deck was called twice
+            # NB: That does NOT check that @daily task was called exactly twice
             self.mock_deck.run.assert_has_calls([
                 mock.call(mock_director, from_schedule=True), mock.call(mock_director, from_schedule=True)
             ])
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index fc197a12..9111078b 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -340,8 +340,8 @@ class WebUIAPI(object):
             except DeckNotFound:
                 raise WebUIError(404, 'Deck not found')
 
-        self.scheduler.refresh_deck_list()
         config.set_initialized()
+        self.scheduler.refresh_deck_list()
 
         self._is_initialized = True
 



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits