svc/conntrack.in (kickpeers): Refactor and reformat the search loop.
[tripe] / svc / connect.in
index e898141..b6ec4b8 100644 (file)
@@ -37,6 +37,7 @@ from math import sqrt
 import cdb as CDB
 import mLib as M
 import re as RX
+import sys as SYS
 from time import time
 import subprocess as PROC
 
@@ -295,12 +296,15 @@ class Peer (object):
     a FILTER function is given then apply it to the information from the
     database before returning it.
     """
-    attr = me.__dict__.get(key, default)
-    if attr is _magic:
-      raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
-    elif filter is not None:
-      attr = filter(attr)
-    return attr
+    try:
+      attr = me.__dict__[key]
+    except KeyError:
+      if default is _magic:
+        raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+      return default
+    else:
+      if filter is not None: attr = filter(attr)
+      return attr
 
   def has(me, key):
     """
@@ -396,6 +400,7 @@ class PingPeer (object):
     me.seq = _pingseq
     _pingseq += 1
     me._failures = 0
+    me._sabotage = False
     me._last = '-'
     me._nping = 0
     me._nlost = 0
@@ -435,14 +440,18 @@ class PingPeer (object):
        me._peer]))
 
   def _reconnect(me):
-    peer = Peer(me._peer)
-    if me._connectp:
-      S.warn('connect', 'reconnecting', me._peer)
-      S.forcekx(me._peer)
-      T.spawn(run_connect, peer, peer.get('connect'))
-      me._timer = M.SelTimer(time() + me._every, me._time)
-    else:
-      S.kill(me._peer)
+    try:
+      peer = Peer(me._peer)
+      if me._connectp:
+        S.warn('connect', 'reconnecting', me._peer)
+        S.forcekx(me._peer)
+        T.spawn(run_connect, peer, peer.get('connect'))
+        me._timer = M.SelTimer(time() + me._every, me._time)
+        me._sabotage = False
+      else:
+        S.kill(me._peer)
+    except TripeError, e:
+      if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
 
   def event(me, code, stuff):
     """
@@ -461,16 +470,15 @@ class PingPeer (object):
       me._ping()
     elif code == 'FAIL':
       S.notify('connect', 'ping-failed', me._peer, *stuff)
-      if not stuff:
-        pass
-      elif stuff[0] == 'unknown-peer':
-        me._pinger.kill(me._peer)
-      elif stuff[0] == 'ping-send-failed':
-        me._reconnect()
+      if not stuff: pass
+      elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
+      elif stuff[0] == 'ping-send-failed': me._reconnect()
     elif code == 'INFO':
-      if stuff[0] == 'ping-ok':
-        if me._failures > 0:
-          S.warn('connect', 'ping-ok', me._peer)
+      outcome = stuff[0]
+      if outcome == 'ping-ok' and me._sabotage:
+        outcome = 'ping-timeout'
+      if outcome == 'ping-ok':
+        if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
         t = float(stuff[1])
         me._last = '%.1fms' % t
         me._sigma_t += t
@@ -479,7 +487,7 @@ class PingPeer (object):
         if me._min == '-' or t < me._min: me._min = t
         if me._max == '-' or t > me._max: me._max = t
         me._timer = M.SelTimer(time() + me._every, me._time)
-      elif stuff[0] == 'ping-timeout':
+      elif outcome == 'ping-timeout':
         me._failures += 1
         me._nlost += 1
         S.warn('connect', 'ping-timeout', me._peer,
@@ -489,9 +497,16 @@ class PingPeer (object):
           me._last = 'timeout'
         else:
           me._reconnect()
-      elif stuff[0] == 'ping-peer-died':
+          me._last = 'reconnect'
+      elif outcome == 'ping-peer-died':
         me._pinger.kill(me._peer)
 
+  def sabotage(me):
+    """Sabotage the peer, for testing purposes."""
+    me._sabotage = True
+    if me._timer: me._timer.kill()
+    T.defer(me._time)
+
   def info(me):
     if not me._nping:
       mean = sd = '-'
@@ -552,7 +567,9 @@ class Pinger (T.Coroutine):
     while True:
       (peer, seq), code, stuff = me._q.get()
       if peer in me._peers and seq == me._peers[peer].seq:
-        me._peers[peer].event(code, stuff)
+        try: me._peers[peer].event(code, stuff)
+        except Exception, e:
+          SYS.excepthook(*SYS.exc_info())
 
   def add(me, peer, pingnow):
     """
@@ -708,14 +725,13 @@ def addpeer(peer, addr):
   if peer.name in S.list():
     S.kill(peer.name)
   try:
-    booltrue = ['t', 'true', 'y', 'yes', 'on']
     S.add(peer.name,
-          tunnel = peer.get('tunnel', None),
-          keepalive = peer.get('keepalive', None),
-          key = peer.get('key', None),
-          priv = peer.get('priv', None),
-          mobile = peer.get('mobile', 'nil') in booltrue,
-          cork = peer.get('cork', 'nil') in booltrue,
+          tunnel = peer.get('tunnel', default = None),
+          keepalive = peer.get('keepalive', default = None),
+          key = peer.get('key', default = None),
+          priv = peer.get('priv', default = None),
+          mobile = peer.get('mobile', filter = boolean, default = False),
+          cork = peer.get('cork', filter = boolean, default = False),
           *addr)
   except T.TripeError, exc:
     raise T.TripeJobError(*exc.args)
@@ -844,6 +860,14 @@ def cmd_passive(*args):
   finally:
     del chalmap[chal]
 
+def cmd_sabotage(name):
+  """
+  sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
+  """
+  try: pp = pinger.find(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
+  pp.sabotage()
+
 ###--------------------------------------------------------------------------
 ### Start up.
 
@@ -927,11 +951,13 @@ service_info = [('connect', T.VERSION, {
   'active': (1, 1, 'PEER', cmd_active),
   'info': (1, 1, 'PEER', cmd_info),
   'list-active': (0, 0, '', cmd_listactive),
-  'userpeer': (1, 1, 'USER', cmd_userpeer)
+  'userpeer': (1, 1, 'USER', cmd_userpeer),
+  'sabotage': (1, 1, 'PEER', cmd_sabotage)
 })]
 
 if __name__ == '__main__':
   opts = parse_options()
+  OS.environ['TRIPESOCK'] = opts.tripesock
   T.runservices(opts.tripesock, service_info,
                 init = init, setup = setup,
                 daemon = opts.daemon)