[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [flashproxy/rtmfp] Rewrite ProxyPair to be more generic.
commit 9cef884c60fa5c4cea7eb1fa092c9fb53a8a1550
Author: David Fifield <david@xxxxxxxxxxxxxxx>
Date: Sat Jun 11 21:41:41 2011 -0700
Rewrite ProxyPair to be more generic.
---
ProxyPair.as | 204 +++++++++++++++++++++++++++-------------------------------
1 files changed, 95 insertions(+), 109 deletions(-)
diff --git a/ProxyPair.as b/ProxyPair.as
index fa51933..ca9f092 100644
--- a/ProxyPair.as
+++ b/ProxyPair.as
@@ -1,6 +1,5 @@
package
{
- import flash.errors.IllegalOperationError;
import flash.events.Event;
import flash.events.EventDispatcher;
import flash.events.IOErrorEvent;
@@ -11,119 +10,122 @@ package
import flash.utils.ByteArray;
import flash.utils.clearTimeout;
import flash.utils.setTimeout;
-
- import swfcat;
-
+
+ /* An instance of a client-relay connection. */
public class ProxyPair extends EventDispatcher
{
+ // Socket to client.
+ private var s_c:*;
+ private var connect_c:Function;
+
+ // Socket to relay.
+ private var s_r:*;
+ private var connect_r:Function;
+
+ // Parent swfcat, for UI updates and rate meter.
private var ui:swfcat;
-
- protected var client_addr:Object;
-
- /* Not defined here: subclasses should define their own
- * protected var client_socket:Object;
- */
-
- private var c2r_schedule:Array;
-
- private var relay_addr:Object;
- private var relay_socket:Socket;
+
+ // Pending byte read counts for relay and client sockets.
private var r2c_schedule:Array;
-
+ private var c2r_schedule:Array;
// Callback id.
private var flush_id:uint;
-
- public function ProxyPair(self:ProxyPair, ui:swfcat)
- {
- if (self != this) {
- //only a subclass can pass a valid reference to self
- throw new IllegalOperationError("ProxyPair cannot be instantiated directly.");
- }
-
- this.ui = ui;
- this.c2r_schedule = new Array();
- this.r2c_schedule = new Array();
-
- setup_relay_socket();
-
- /* client_socket setup should be taken */
- /* care of in the subclass constructor */
- }
-
- public function close():void
- {
- if (relay_socket != null && relay_socket.connected) {
- relay_socket.close();
- }
-
- /* subclasses should override to close */
- /* their client_socket according to impl. */
- }
-
- public function get connected():Boolean
- {
- return (relay_socket != null && relay_socket.connected);
-
- /* subclasses should override to check */
- /* connectivity of their client_socket. */
- }
-
- public function set client(client_addr:Object):void
+
+ public function log(msg:String):void
{
- /* subclasses should override to */
- /* connect the client_socket here */
+ ui.puts(id() + ": " + msg)
}
-
- public function set relay(relay_addr:Object):void
+
+ // String describing this pair for output.
+ public function id():String
{
- this.relay_addr = relay_addr;
- log("Relay: connecting to " + relay_addr.host + ":" + relay_addr.port + ".");
- relay_socket.connect(relay_addr.host, relay_addr.port);
+ return "<>";
}
-
- protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void
+
+ public function ProxyPair(ui:swfcat, s_c:*, connect_c:Function, s_r:*, connect_r:Function)
{
- /* No-op: must be overridden by subclasses */
+ this.ui = ui;
+ /* s_c is a socket for connecting to the client. connect_c is a
+ function that, when called, connects s_c. Likewise for s_r and
+ connect_r. */
+ this.s_c = s_c;
+ this.connect_c = connect_c;
+ this.s_r = s_r;
+ this.connect_r = connect_r;
+
+ this.c2r_schedule = [];
+ this.r2c_schedule = [];
}
- protected function socket_error(message:String):Function
+ /* Return a function that shows an error message and closes the other half
+ of a communication pair. */
+ private function socket_error(message:String, other:*):Function
{
return function(e:Event):void {
if (e is TextEvent)
log(message + ": " + (e as TextEvent).text + ".");
else
log(message + ".");
- close();
+ if (other && other.connected)
+ other.close();
+ dispatchEvent(new Event(Event.COMPLETE));
};
}
-
- private function setup_relay_socket():void
+
+ public function connect():void
+ {
+ s_r.addEventListener(Event.CONNECT, relay_connected);
+ s_r.addEventListener(Event.CLOSE, socket_error("Relay: closed", s_c));
+ s_r.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Relay: I/O error", s_c));
+ s_r.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Relay: security error", s_c));
+ s_r.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client);
+
+ s_c.addEventListener(Event.CONNECT, client_connected);
+ s_c.addEventListener(Event.CLOSE, socket_error("Client: closed", s_r));
+ s_c.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Client: I/O error", s_r));
+ s_c.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Client: security error", s_r));
+ s_c.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay);
+
+ log("Relay: connecting.");
+ connect_r();
+ log("Client: connecting.");
+ connect_c();
+ }
+
+ private function relay_connected(e:Event):void
{
- relay_socket = new Socket();
- relay_socket.addEventListener(Event.CONNECT, function (e:Event):void {
- log("Relay: connected to " + relay_addr.host + ":" + relay_addr.port + ".");
- if (connected) {
- dispatchEvent(new Event(Event.CONNECT));
- }
- });
- relay_socket.addEventListener(Event.CLOSE, socket_error("Relay: closed"));
- relay_socket.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Relay: I/O error"))
- relay_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Relay: security error"))
- relay_socket.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client);
+ log("Relay: connected.");
}
-
- protected function client_to_relay(e:ProgressEvent):void
+
+ private function client_connected(e:Event):void
{
- c2r_schedule.push(e.bytesLoaded);
- flush();
+ log("Client: connected.");
}
-
+
private function relay_to_client(e:ProgressEvent):void
{
r2c_schedule.push(e.bytesLoaded);
flush();
}
-
+
+ private function client_to_relay(e:ProgressEvent):void
+ {
+ c2r_schedule.push(e.bytesLoaded);
+ flush();
+ }
+
+ private function transfer_chunk(s_from:*, s_to:*, n:uint,
+ label:String):void
+ {
+ var bytes:ByteArray;
+
+ bytes = new ByteArray();
+ s_from.readBytes(bytes, 0, n);
+ s_to.writeBytes(bytes);
+ ui.rate_limit.update(n);
+ log(label + ": read " + bytes.length + ".");
+ }
+
/* Send as much data as the rate limit currently allows. */
private function flush():void
{
@@ -131,37 +133,21 @@ package
clearTimeout(flush_id);
flush_id = undefined;
- if (!connected)
- /* Can't do anything until connected. */
+ if (!(s_r.connected && s_c.connected))
+ /* Can't do anything until both sockets are connected. */
return;
- while (!ui.rate_limit.is_limited() && (c2r_schedule.length > 0 || r2c_schedule.length > 0)) {
- var num_bytes:uint;
-
- if (c2r_schedule.length > 0) {
- num_bytes = c2r_schedule.shift();
- transfer_bytes(null, relay_socket, num_bytes);
- ui.rate_limit.update(num_bytes);
- }
-
- if (r2c_schedule.length > 0) {
- num_bytes = r2c_schedule.shift();
- transfer_bytes(relay_socket, null, num_bytes);
- ui.rate_limit.update(num_bytes);
- }
+ while (!ui.rate_limit.is_limited() &&
+ (r2c_schedule.length > 0 || c2r_schedule.length > 0)) {
+ if (r2c_schedule.length > 0)
+ transfer_chunk(s_r, s_c, r2c_schedule.shift(), "Relay");
+ if (c2r_schedule.length > 0)
+ transfer_chunk(s_c, s_r, c2r_schedule.shift(), "Client");
}
/* Call again when safe, if necessary. */
- if (c2r_schedule.length > 0 || r2c_schedule.length > 0)
+ if (r2c_schedule.length > 0 || c2r_schedule.length > 0)
flush_id = setTimeout(flush, ui.rate_limit.when() * 1000);
}
-
- /* Helper function to write output to the
- * swfcat console. Set as protected for
- * subclasses */
- protected function log(s:String):void
- {
- ui.puts(s);
- }
}
}
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits