X-Git-Url: https://git.distorted.org.uk/~mdw/tripe/blobdiff_plain/b9dedfa6e0494bbeba9f954ed91fdd5c9fb2570e..HEAD:/svc/connect.in diff --git a/svc/connect.in b/svc/connect.in index e8981411..25c80b0f 100644 --- a/svc/connect.in +++ b/svc/connect.in @@ -37,6 +37,7 @@ from math import sqrt import cdb as CDB import mLib as M import re as RX +import sys as SYS from time import time import subprocess as PROC @@ -295,12 +296,15 @@ class Peer (object): a FILTER function is given then apply it to the information from the database before returning it. """ - attr = me.__dict__.get(key, default) - if attr is _magic: - raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) - elif filter is not None: - attr = filter(attr) - return attr + try: + attr = me.__dict__[key] + except KeyError: + if default is _magic: + raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) + return default + else: + if filter is not None: attr = filter(attr) + return attr def has(me, key): """ @@ -396,6 +400,7 @@ class PingPeer (object): me.seq = _pingseq _pingseq += 1 me._failures = 0 + me._sabotage = False me._last = '-' me._nping = 0 me._nlost = 0 @@ -420,6 +425,7 @@ class PingPeer (object): me._timeout = peer.get('timeout', filter = T.timespec, default = 10) me._retries = peer.get('retries', filter = int, default = 5) me._connectp = peer.has('connect') + me._knockp = peer.has('knock') return me def _ping(me): @@ -435,14 +441,18 @@ class PingPeer (object): me._peer])) def _reconnect(me): - peer = Peer(me._peer) - if me._connectp: - S.warn('connect', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(run_connect, peer, peer.get('connect')) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + try: + peer = Peer(me._peer) + if me._connectp or me._knockp: + S.warn('connect', 'reconnecting', me._peer) + S.forcekx(me._peer) + if me._connectp: T.spawn(run_connect, peer, peer.get('connect')) + me._timer = M.SelTimer(time() + me._every, me._time) + me._sabotage = False + else: + S.kill(me._peer) + except TripeError, e: + if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer) def event(me, code, stuff): """ @@ -461,16 +471,15 @@ class PingPeer (object): me._ping() elif code == 'FAIL': S.notify('connect', 'ping-failed', me._peer, *stuff) - if not stuff: - pass - elif stuff[0] == 'unknown-peer': - me._pinger.kill(me._peer) - elif stuff[0] == 'ping-send-failed': - me._reconnect() + if not stuff: pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': me._reconnect() elif code == 'INFO': - if stuff[0] == 'ping-ok': - if me._failures > 0: - S.warn('connect', 'ping-ok', me._peer) + outcome = stuff[0] + if outcome == 'ping-ok' and me._sabotage: + outcome = 'ping-timeout' + if outcome == 'ping-ok': + if me._failures > 0: S.warn('connect', 'ping-ok', me._peer) t = float(stuff[1]) me._last = '%.1fms' % t me._sigma_t += t @@ -479,7 +488,7 @@ class PingPeer (object): if me._min == '-' or t < me._min: me._min = t if me._max == '-' or t > me._max: me._max = t me._timer = M.SelTimer(time() + me._every, me._time) - elif stuff[0] == 'ping-timeout': + elif outcome == 'ping-timeout': me._failures += 1 me._nlost += 1 S.warn('connect', 'ping-timeout', me._peer, @@ -489,9 +498,16 @@ class PingPeer (object): me._last = 'timeout' else: me._reconnect() - elif stuff[0] == 'ping-peer-died': + me._last = 'reconnect' + elif outcome == 'ping-peer-died': me._pinger.kill(me._peer) + def sabotage(me): + """Sabotage the peer, for testing purposes.""" + me._sabotage = True + if me._timer: me._timer.kill() + T.defer(me._time) + def info(me): if not me._nping: mean = sd = '-' @@ -510,7 +526,7 @@ class PingPeer (object): 'min-ping': '%.1fms' % me._min, 'max-ping': '%.1fms' % me._max, 'state': me._timer and 'idle' or 'check', - 'failures': me._failures } + 'failures': str(me._failures) } @T._callback def _time(me): @@ -552,7 +568,9 @@ class Pinger (T.Coroutine): while True: (peer, seq), code, stuff = me._q.get() if peer in me._peers and seq == me._peers[peer].seq: - me._peers[peer].event(code, stuff) + try: me._peers[peer].event(code, stuff) + except Exception, e: + SYS.excepthook(*SYS.exc_info()) def add(me, peer, pingnow): """ @@ -699,23 +717,26 @@ def disownpeer(peer): T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \ .switch('ifdown', peer) -def addpeer(peer, addr): +def addpeer(peer, addr, ephemp): """ Process a connect request from a new peer PEER on address ADDR. - Any existing peer with this name is disconnected from the server. + Any existing peer with this name is disconnected from the server. EPHEMP + is the default ephemeral-ness state for the new peer. """ if peer.name in S.list(): S.kill(peer.name) try: - booltrue = ['t', 'true', 'y', 'yes', 'on'] S.add(peer.name, - tunnel = peer.get('tunnel', None), - keepalive = peer.get('keepalive', None), - key = peer.get('key', None), - priv = peer.get('priv', None), - mobile = peer.get('mobile', 'nil') in booltrue, - cork = peer.get('cork', 'nil') in booltrue, + tunnel = peer.get('tunnel', default = None), + keepalive = peer.get('keepalive', default = None), + key = peer.get('key', default = None), + priv = peer.get('priv', default = None), + mobile = peer.get('mobile', filter = boolean, default = False), + knock = peer.get('knock', default = None), + cork = peer.get('cork', filter = boolean, default = False), + ephemeral = peer.get('ephemeral', filter = boolean, + default = ephemp), *addr) except T.TripeError, exc: raise T.TripeJobError(*exc.args) @@ -744,6 +765,23 @@ def notify(_, code, *rest): try: cr = chalmap[chal] except KeyError: pass else: cr.switch(rest[1:]) + elif code == 'KNOCK': + try: p = Peer(rest[0]) + except KeyError: + S.warn(['connect', 'knock-unknown-peer', rest[0]]) + return + if p.get('peer') != 'PASSIVE': + S.warn(['connect', 'knock-active-peer', p.name]) + return + dot = p.name.find('.') + if dot >= 0: kname = p.name[dot + 1:] + else: kname = p.name + ktag = p.get('key', p.name) + if kname != ktag: + S.warn(['connect', 'knock-tag-mismatch', + 'peer', pname, 'public-key-tag', ktag]) + return + T.spawn(addpeer, p, rest[1:], True) ###-------------------------------------------------------------------------- ### Command implementation. @@ -778,7 +816,7 @@ def cmd_active(name): addr = peer.get('peer') if addr == 'PASSIVE': raise T.TripeJobError('passive-peer', name) - addpeer(peer, M.split(addr, quotep = True)[0]) + addpeer(peer, M.split(addr, quotep = True)[0], True) def cmd_listactive(): """ @@ -804,7 +842,7 @@ def cmd_info(name): for i in items: try: v = d[i] except KeyError: v = peer.get(i) - T.svcinfo('%s=%s' % (i, v)) + T.svcinfo('%s=%s' % (i, v.replace('\n', ' '))) def cmd_userpeer(user): """ @@ -840,10 +878,18 @@ def cmd_passive(*args): addr = cr.parent.switch() if addr is None: raise T.TripeJobError('connect-timeout') - addpeer(peer, addr) + addpeer(peer, addr, True) finally: del chalmap[chal] +def cmd_sabotage(name): + """ + sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged. + """ + try: pp = pinger.find(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + pp.sabotage() + ###-------------------------------------------------------------------------- ### Start up. @@ -868,7 +914,7 @@ def setup(): for name in M.split(autos)[0]: try: peer = Peer(name, cdb) - addpeer(peer, M.split(peer.get('peer'), quotep = True)[0]) + addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False) except T.TripeJobError, err: S.warn('connect', 'auto-add-failed', name, *err.args) @@ -927,11 +973,13 @@ service_info = [('connect', T.VERSION, { 'active': (1, 1, 'PEER', cmd_active), 'info': (1, 1, 'PEER', cmd_info), 'list-active': (0, 0, '', cmd_listactive), - 'userpeer': (1, 1, 'USER', cmd_userpeer) + 'userpeer': (1, 1, 'USER', cmd_userpeer), + 'sabotage': (1, 1, 'PEER', cmd_sabotage) })] if __name__ == '__main__': opts = parse_options() + OS.environ['TRIPESOCK'] = opts.tripesock T.runservices(opts.tripesock, service_info, init = init, setup = setup, daemon = opts.daemon)