X-Git-Url: https://git.distorted.org.uk/~mdw/tripe/blobdiff_plain/d64ce4ae05ad8250e08296af18d7d6ddcc5bbc9a..fa59a04b4daaa8d75e1fb0e875ed6ed5fb6d1304:/svc/connect.in diff --git a/svc/connect.in b/svc/connect.in index 000ca50e..b6ec4b8c 100644 --- a/svc/connect.in +++ b/svc/connect.in @@ -10,19 +10,18 @@ ### ### This file is part of Trivial IP Encryption (TrIPE). ### -### TrIPE is free software; you can redistribute it and/or modify -### it under the terms of the GNU General Public License as published by -### the Free Software Foundation; either version 2 of the License, or -### (at your option) any later version. +### TrIPE is free software: you can redistribute it and/or modify it under +### the terms of the GNU General Public License as published by the Free +### Software Foundation; either version 3 of the License, or (at your +### option) any later version. ### -### TrIPE is distributed in the hope that it will be useful, -### but WITHOUT ANY WARRANTY; without even the implied warranty of -### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -### GNU General Public License for more details. +### TrIPE is distributed in the hope that it will be useful, but WITHOUT +### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +### for more details. ### ### You should have received a copy of the GNU General Public License -### along with TrIPE; if not, write to the Free Software Foundation, -### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
VERSION = '@VERSION@' @@ -34,9 +33,11 @@ import tripe as T import os as OS import signal as SIG import errno as E +from math import sqrt import cdb as CDB import mLib as M import re as RX +import sys as SYS from time import time import subprocess as PROC @@ -127,7 +128,7 @@ class ErrorWatch (T.Coroutine): def dbwatch(): """ - Coroutine function: wake up every second and notice changes to the + Coroutine function: wake up every minute and notice changes to the database. When a change happens, tell the Pinger (q.v.) to rescan its peers. """ @@ -135,7 +136,7 @@ def dbwatch(): main = cr.parent fw = M.FWatch(opts.cdb) while True: - timer = M.SelTimer(time() + 1, lambda: cr.switch()) + timer = M.SelTimer(time() + 60, lambda: cr.switch()) main.switch() if fw.update(): pinger.rescan(False) @@ -295,12 +296,15 @@ class Peer (object): a FILTER function is given then apply it to the information from the database before returning it. """ - attr = me.__dict__.get(key, default) - if attr is _magic: - raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) - elif filter is not None: - attr = filter(attr) - return attr + try: + attr = me.__dict__[key] + except KeyError: + if default is _magic: + raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) + return default + else: + if filter is not None: attr = filter(attr) + return attr def has(me, key): """ @@ -396,6 +400,13 @@ class PingPeer (object): me.seq = _pingseq _pingseq += 1 me._failures = 0 + me._sabotage = False + me._last = '-' + me._nping = 0 + me._nlost = 0 + me._sigma_t = 0 + me._sigma_t2 = 0 + me._min = me._max = '-' if pingnow: me._timer = None me._ping() @@ -429,14 +440,18 @@ class PingPeer (object): me._peer])) def _reconnect(me): - peer = Peer(me._peer) - if me._connectp: - S.warn('connect', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(run_connect, peer, peer.get('connect')) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + try: + peer = 
Peer(me._peer) + if me._connectp: + S.warn('connect', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(run_connect, peer, peer.get('connect')) + me._timer = M.SelTimer(time() + me._every, me._time) + me._sabotage = False + else: + S.kill(me._peer) + except TripeError, e: + if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer) def event(me, code, stuff): """ @@ -455,28 +470,63 @@ class PingPeer (object): me._ping() elif code == 'FAIL': S.notify('connect', 'ping-failed', me._peer, *stuff) - if not stuff: - pass - elif stuff[0] == 'unknown-peer': - me._pinger.kill(me._peer) - elif stuff[0] == 'ping-send-failed': - me._reconnect() + if not stuff: pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': me._reconnect() elif code == 'INFO': - if stuff[0] == 'ping-ok': - if me._failures > 0: - S.warn('connect', 'ping-ok', me._peer) + outcome = stuff[0] + if outcome == 'ping-ok' and me._sabotage: + outcome = 'ping-timeout' + if outcome == 'ping-ok': + if me._failures > 0: S.warn('connect', 'ping-ok', me._peer) + t = float(stuff[1]) + me._last = '%.1fms' % t + me._sigma_t += t + me._sigma_t2 += t*t + me._nping += 1 + if me._min == '-' or t < me._min: me._min = t + if me._max == '-' or t > me._max: me._max = t me._timer = M.SelTimer(time() + me._every, me._time) - elif stuff[0] == 'ping-timeout': + elif outcome == 'ping-timeout': me._failures += 1 + me._nlost += 1 S.warn('connect', 'ping-timeout', me._peer, 'attempt', str(me._failures), 'of', str(me._retries)) if me._failures < me._retries: me._ping() + me._last = 'timeout' else: me._reconnect() - elif stuff[0] == 'ping-peer-died': + me._last = 'reconnect' + elif outcome == 'ping-peer-died': me._pinger.kill(me._peer) + def sabotage(me): + """Sabotage the peer, for testing purposes.""" + me._sabotage = True + if me._timer: me._timer.kill() + T.defer(me._time) + + def info(me): + if not me._nping: + mean = sd = '-' + else: + mean = me._sigma_t/me._nping + sd = 
sqrt(me._sigma_t2/me._nping - mean*mean) + n = me._nping + me._nlost + if not n: pclost = '-' + else: pclost = '%d' % ((100*me._nlost + n//2)//n) + return { 'last-ping': me._last, + 'mean-ping': '%.1fms' % mean, + 'sd-ping': '%.1fms' % sd, + 'n-ping': '%d' % me._nping, + 'n-lost': '%d' % me._nlost, + 'percent-lost': pclost, + 'min-ping': '%.1fms' % me._min, + 'max-ping': '%.1fms' % me._max, + 'state': me._timer and 'idle' or 'check', + 'failures': me._failures } + @T._callback def _time(me): """ @@ -517,7 +567,9 @@ class Pinger (T.Coroutine): while True: (peer, seq), code, stuff = me._q.get() if peer in me._peers and seq == me._peers[peer].seq: - me._peers[peer].event(code, stuff) + try: me._peers[peer].event(code, stuff) + except Exception, e: + SYS.excepthook(*SYS.exc_info()) def add(me, peer, pingnow): """ @@ -529,7 +581,8 @@ class Pinger (T.Coroutine): def kill(me, peername): """Remove PEER from the peers being watched by the Pinger.""" - del me._peers[peername] + try: del me._peers[peername] + except KeyError: pass return me def rescan(me, startup): @@ -586,6 +639,10 @@ class Pinger (T.Coroutine): """ return me._peers.keys() + def find(me, name): + """Return the PingPeer with the given name.""" + return me._peers[name] + ###-------------------------------------------------------------------------- ### New connections. 
@@ -668,14 +725,13 @@ def addpeer(peer, addr): if peer.name in S.list(): S.kill(peer.name) try: - booltrue = ['t', 'true', 'y', 'yes', 'on'] S.add(peer.name, - tunnel = peer.get('tunnel', None), - keepalive = peer.get('keepalive', None), - key = peer.get('key', None), - priv = peer.get('priv', None), - mobile = peer.get('mobile', 'nil') in booltrue, - cork = peer.get('cork', 'nil') in booltrue, + tunnel = peer.get('tunnel', default = None), + keepalive = peer.get('keepalive', default = None), + key = peer.get('key', default = None), + priv = peer.get('priv', default = None), + mobile = peer.get('mobile', filter = boolean, default = False), + cork = peer.get('cork', filter = boolean, default = False), *addr) except T.TripeError, exc: raise T.TripeJobError(*exc.args) @@ -712,11 +768,13 @@ def cmd_kick(name): """ kick NAME: Force a new connection attempt for the NAMEd peer. """ - if name not in pinger.adopted(): - raise T.TripeJobError('peer-not-adopted', name) + try: pp = pinger.find(name) + except KeyError: raise T.TripeJobError('peer-not-adopted', name) try: peer = Peer(name) except KeyError: raise T.TripeJobError('unknown-peer', name) - T.spawn(connect, peer) + conn = peer.get('connect', None) + if conn: T.spawn(run_connect, peer, peer.get('connect')) + else: T.spawn(lambda p: S.forcekx(p.name), peer) def cmd_adopted(): """ @@ -753,10 +811,16 @@ def cmd_info(name): """ try: peer = Peer(name) except KeyError: raise T.TripeJobError('unknown-peer', name) - items = list(peer.list()) + d = {} + try: pp = pinger.find(name) + except KeyError: pass + else: d.update(pp.info()) + items = list(peer.list()) + d.keys() items.sort() for i in items: - T.svcinfo('%s=%s' % (i, peer.get(i))) + try: v = d[i] + except KeyError: v = peer.get(i) + T.svcinfo('%s=%s' % (i, v)) def cmd_userpeer(user): """ @@ -796,6 +860,14 @@ def cmd_passive(*args): finally: del chalmap[chal] +def cmd_sabotage(name): + """ + sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged. 
+ """ + try: pp = pinger.find(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + pp.sabotage() + ###-------------------------------------------------------------------------- ### Start up. @@ -879,11 +951,13 @@ service_info = [('connect', T.VERSION, { 'active': (1, 1, 'PEER', cmd_active), 'info': (1, 1, 'PEER', cmd_info), 'list-active': (0, 0, '', cmd_listactive), - 'userpeer': (1, 1, 'USER', cmd_userpeer) + 'userpeer': (1, 1, 'USER', cmd_userpeer), + 'sabotage': (1, 1, 'PEER', cmd_sabotage) })] if __name__ == '__main__': opts = parse_options() + OS.environ['TRIPESOCK'] = opts.tripesock T.runservices(opts.tripesock, service_info, init = init, setup = setup, daemon = opts.daemon)