[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Code to manage publish/subscribe setup via subsystem interface.



commit bdeaf7d4b2929609c4d3f2ce9adfd973361ef578
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Tue Jan 15 10:27:39 2019 -0500

    Code to manage publish/subscribe setup via subsystem interface.
    
    This commit has the necessary logic to run the publish/subscribe
    system from the mainloop, and to initialize it on startup and tear
    it down later.
---
 src/app/main/subsysmgr.c            |  52 ++++++++++++-
 src/app/main/subsysmgr.h            |   5 ++
 src/core/include.am                 |   2 +
 src/core/mainloop/mainloop_pubsub.c | 149 ++++++++++++++++++++++++++++++++++++
 src/core/mainloop/mainloop_pubsub.h |  23 ++++++
 src/lib/subsys/subsys.h             |   4 +-
 6 files changed, 232 insertions(+), 3 deletions(-)

diff --git a/src/app/main/subsysmgr.c b/src/app/main/subsysmgr.c
index abd2edd10..91a567ce0 100644
--- a/src/app/main/subsysmgr.c
+++ b/src/app/main/subsysmgr.c
@@ -5,9 +5,14 @@
 
 #include "orconfig.h"
 #include "app/main/subsysmgr.h"
-#include "lib/err/torerr.h"
 
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/err/torerr.h"
 #include "lib/log/log.h"
+#include "lib/malloc/malloc.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_connect.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -106,6 +111,51 @@ subsystems_init_upto(int target_level)
 }
 
 /**
+ * Add publish/subscribe relationships to <b>builder</b> for all
+ * initialized subsystems of level no more than <b>target_level</b>.
+ **/
+int
+subsystems_add_pubsub_upto(pubsub_builder_t *builder,
+                           int target_level)
+{
+  for (unsigned i = 0; i < n_tor_subsystems; ++i) {
+    const subsys_fns_t *sys = tor_subsystems[i];
+    if (!sys->supported)
+      continue;
+    if (sys->level > target_level)
+      break;
+    if (! sys_initialized[i])
+      continue;
+    int r = 0;
+    if (sys->add_pubsub) {
+      subsys_id_t sysid = get_subsys_id(sys->name);
+      raw_assert(sysid != ERROR_ID);
+      pubsub_connector_t *connector;
+      connector = pubsub_connector_for_subsystem(builder, sysid);
+      r = sys->add_pubsub(connector);
+      pubsub_connector_free(connector);
+    }
+    if (r < 0) {
+      fprintf(stderr, "BUG: subsystem %s (at %u) could not connect to "
+              "publish/subscribe system.", sys->name, sys->level);
+      raw_assert_unreached_msg("A subsystem couldn't be connected.");
+    }
+  }
+
+  return 0;
+}
+
+/**
+ * Add publish/subscribe relationships to <b>builder</b> for all
+ * initialized subsystems.
+ **/
+int
+subsystems_add_pubsub(pubsub_builder_t *builder)
+{
+  return subsystems_add_pubsub_upto(builder, MAX_SUBSYS_LEVEL);
+}
+
+/**
  * Shut down all the subsystems.
  **/
 void
diff --git a/src/app/main/subsysmgr.h b/src/app/main/subsysmgr.h
index 4b3cad62a..4878cf8c3 100644
--- a/src/app/main/subsysmgr.h
+++ b/src/app/main/subsysmgr.h
@@ -14,6 +14,11 @@ extern const unsigned n_tor_subsystems;
 int subsystems_init(void);
 int subsystems_init_upto(int level);
 
+struct pubsub_builder_t;
+int subsystems_add_pubsub_upto(struct pubsub_builder_t *builder,
+                               int target_level);
+int subsystems_add_pubsub(struct pubsub_builder_t *builder);
+
 void subsystems_shutdown(void);
 void subsystems_shutdown_downto(int level);
 
diff --git a/src/core/include.am b/src/core/include.am
index ae47c75e0..3a0e907ed 100644
--- a/src/core/include.am
+++ b/src/core/include.am
@@ -22,6 +22,7 @@ LIBTOR_APP_A_SOURCES = 				\
 	src/core/mainloop/connection.c		\
 	src/core/mainloop/cpuworker.c		\
 	src/core/mainloop/mainloop.c		\
+	src/core/mainloop/mainloop_pubsub.c	\
 	src/core/mainloop/netstatus.c		\
 	src/core/mainloop/periodic.c		\
 	src/core/or/address_set.c		\
@@ -213,6 +214,7 @@ noinst_HEADERS +=					\
 	src/core/mainloop/connection.h			\
 	src/core/mainloop/cpuworker.h			\
 	src/core/mainloop/mainloop.h			\
+	src/core/mainloop/mainloop_pubsub.h		\
 	src/core/mainloop/netstatus.h			\
 	src/core/mainloop/periodic.h			\
 	src/core/or/addr_policy_st.h			\
diff --git a/src/core/mainloop/mainloop_pubsub.c b/src/core/mainloop/mainloop_pubsub.c
new file mode 100644
index 000000000..ab3614ae0
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.c
@@ -0,0 +1,149 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+
+#include "src/core/or/or.h"
+#include "src/core/mainloop/mainloop.h"
+#include "src/core/mainloop/mainloop_pubsub.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/evloop/compat_libevent.h"
+#include "lib/pubsub/pubsub.h"
+#include "lib/pubsub/pubsub_build.h"
+
+/**
+ * Dispatcher to use for delivering messages.
+ **/
+static dispatch_t *the_dispatcher = NULL;
+static pubsub_items_t *the_pubsub_items = NULL;
+/**
+ * A list of mainloop_event_t, indexed by channel ID, to flush the messages
+ * on a channel.
+ **/
+static smartlist_t *alert_events = NULL;
+
+/**
+ * Mainloop event callback: flush all the messages in a channel.
+ *
+ * The channel is encoded as a pointer, and passed via arg.
+ **/
+static void
+flush_channel_event(mainloop_event_t *ev, void *arg)
+{
+  (void)ev;
+  if (!the_dispatcher)
+    return;
+
+  channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
+  dispatch_flush(the_dispatcher, chan, INT_MAX);
+}
+
+int
+tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
+{
+  int rv = -1;
+  tor_mainloop_disconnect_pubsub();
+
+  the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
+  if (! the_dispatcher)
+    goto err;
+
+  const size_t num_channels = get_num_channel_ids();
+  alert_events = smartlist_new();
+  for (size_t i = 0; i < num_channels; ++i) {
+    smartlist_add(alert_events,
+                  mainloop_event_postloop_new(flush_channel_event,
+                                              (void*)(uintptr_t)(i)));
+  }
+
+  rv = 0;
+ err:
+  tor_mainloop_disconnect_pubsub();
+  return rv;
+}
+
+/**
+ * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
+ **/
+static void
+alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
+{
+  (void)d;
+  (void)chan;
+  (void)arg;
+}
+
+/**
+ * Dispatch alertfn callback: activate a mainloop event. Implements
+ * DELIV_PROMPT.
+ **/
+static void
+alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
+{
+  (void)d;
+  (void)chan;
+  mainloop_event_t *event = arg;
+  mainloop_event_activate(event);
+}
+
+/**
+ * Dispatch alertfn callback: flush all messages right now. Implements
+ * DELIV_IMMEDIATE.
+ **/
+static void
+alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
+{
+  (void) arg;
+  dispatch_flush(d, chan, INT_MAX);
+}
+
+/**
+ * Set the strategy to be used for delivering messages on the named channel.
+ **/
+int
+tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+                                   deliv_strategy_t strategy)
+{
+  channel_id_t chan = get_channel_id(msg_channel_name);
+  if (BUG(chan == ERROR_ID) ||
+      BUG(chan >= smartlist_len(alert_events)))
+    return -1;
+
+  switch (strategy) {
+    case DELIV_NEVER:
+      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
+      break;
+    case DELIV_PROMPT:
+      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
+                            smartlist_get(alert_events, chan));
+      break;
+    case DELIV_IMMEDIATE:
+      dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
+      break;
+  }
+  return 0;
+}
+
+/**
+ * Remove all pubsub dispatchers and events from the mainloop.
+ **/
+void
+tor_mainloop_disconnect_pubsub(void)
+{
+  if (the_pubsub_items) {
+    pubsub_items_clear_bindings(the_pubsub_items);
+    pubsub_items_free(the_pubsub_items);
+  }
+  if (alert_events) {
+    SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
+                      mainloop_event_free(ev));
+    smartlist_free(alert_events);
+  }
+  dispatch_free(the_dispatcher);
+}
diff --git a/src/core/mainloop/mainloop_pubsub.h b/src/core/mainloop/mainloop_pubsub.h
new file mode 100644
index 000000000..6eff77842
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.h
@@ -0,0 +1,23 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_MAINLOOP_PUBSUB_H
+#define TOR_MAINLOOP_PUBSUB_H
+
+struct pubsub_builder_t;
+
+typedef enum {
+   DELIV_NEVER=0,
+   DELIV_PROMPT,
+   DELIV_IMMEDIATE,
+} deliv_strategy_t;
+
+int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder);
+int tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+                                        deliv_strategy_t strategy);
+void tor_mainloop_disconnect_pubsub(void);
+
+#endif
diff --git a/src/lib/subsys/subsys.h b/src/lib/subsys/subsys.h
index 2452ec6e2..6f1710c71 100644
--- a/src/lib/subsys/subsys.h
+++ b/src/lib/subsys/subsys.h
@@ -8,7 +8,7 @@
 
 #include <stdbool.h>
 
-struct dispatch_connector_t;
+struct pubsub_connector_t;
 
 /**
  * A subsystem is a part of Tor that is initialized, shut down, configured,
@@ -58,7 +58,7 @@ typedef struct subsys_fns_t {
   /**
    * Connect a subsystem to the message dispatch system.
    **/
-  int (*add_pubsub)(struct dispatch_connector_t *);
+  int (*add_pubsub)(struct pubsub_connector_t *);
 
   /**
    * Perform any necessary pre-fork cleanup.  This function may not fail.



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits