X-Git-Url: https://git.distorted.org.uk/~mdw/tripe/blobdiff_plain/48b845698dcf3ec4b9f8b9f1848a157f0245d7cc..HEAD:/svc/connect.in diff --git a/svc/connect.in b/svc/connect.in index 37241bbc..25c80b0f 100644 --- a/svc/connect.in +++ b/svc/connect.in @@ -1,28 +1,27 @@ #! @PYTHON@ ### -*-python-*- ### -### Service for establishing dynamic connections +### Connect to remote peers, and keep track of them ### -### (c) 2006 Straylight/Edgeware +### (c) 2007 Straylight/Edgeware ### ###----- Licensing notice --------------------------------------------------- ### ### This file is part of Trivial IP Encryption (TrIPE). ### -### TrIPE is free software; you can redistribute it and/or modify -### it under the terms of the GNU General Public License as published by -### the Free Software Foundation; either version 2 of the License, or -### (at your option) any later version. +### TrIPE is free software: you can redistribute it and/or modify it under +### the terms of the GNU General Public License as published by the Free +### Software Foundation; either version 3 of the License, or (at your +### option) any later version. ### -### TrIPE is distributed in the hope that it will be useful, -### but WITHOUT ANY WARRANTY; without even the implied warranty of -### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -### GNU General Public License for more details. +### TrIPE is distributed in the hope that it will be useful, but WITHOUT +### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +### for more details. ### ### You should have received a copy of the GNU General Public License -### along with TrIPE; if not, write to the Free Software Foundation, -### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
VERSION = '@VERSION@' @@ -32,14 +31,245 @@ VERSION = '@VERSION@' from optparse import OptionParser import tripe as T import os as OS +import signal as SIG +import errno as E +from math import sqrt import cdb as CDB import mLib as M +import re as RX +import sys as SYS from time import time +import subprocess as PROC S = T.svcmgr ###-------------------------------------------------------------------------- -### Main service machinery. +### Running auxiliary commands. + +class SelLineQueue (M.SelLineBuffer): + """Glues the select-line-buffer into the coroutine queue system.""" + + def __new__(cls, file, queue, tag, kind): + """See __init__ for documentation.""" + return M.SelLineBuffer.__new__(cls, file.fileno()) + + def __init__(me, file, queue, tag, kind): + """ + Initialize a new line-reading adaptor. + + The adaptor reads lines from FILE. Each line is inserted as a message of + the stated KIND, bearing the TAG, into the QUEUE. End-of-file is + represented as None. + """ + me._q = queue + me._file = file + me._tag = tag + me._kind = kind + me.enable() + + @T._callback + def line(me, line): + me._q.put((me._tag, me._kind, line)) + + @T._callback + def eof(me): + me.disable() + me._q.put((me._tag, me._kind, None)) + +class ErrorWatch (T.Coroutine): + """ + An object which watches stderr streams for errors and converts them into + warnings of the form + + WARN connect INFO stderr LINE + + The INFO is a list of tokens associated with the file when it was + registered. + + Usually there is a single ErrorWatch object, called errorwatch. + """ + + def __init__(me): + """Initialization: there are no arguments.""" + T.Coroutine.__init__(me) + me._q = T.Queue() + me._map = {} + me._seq = 1 + + def watch(me, file, info): + """ + Adds FILE to the collection of files to watch. + + INFO will be written in the warning messages from this FILE. Returns a + sequence number which can be used to unregister the file again. 
+ """ + seq = me._seq + me._seq += 1 + me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr') + return seq + + def unwatch(me, seq): + """Stop watching the file with sequence number SEQ.""" + del me._map[seq] + return me + + def run(me): + """ + Coroutine function: read items from the queue and report them. + + Unregisters files automatically when they reach EOF. + """ + while True: + seq, _, line = me._q.get() + if line is None: + me.unwatch(seq) + else: + S.warn(*['connect'] + me._map[seq][0] + ['stderr', line]) + +def dbwatch(): + """ + Coroutine function: wake up every minute and notice changes to the + database. When a change happens, tell the Pinger (q.v.) to rescan its + peers. + """ + cr = T.Coroutine.getcurrent() + main = cr.parent + fw = M.FWatch(opts.cdb) + while True: + timer = M.SelTimer(time() + 60, lambda: cr.switch()) + main.switch() + if fw.update(): + pinger.rescan(False) + S.notify('connect', 'peerdb-update') + +class ChildWatch (M.SelSignal): + """ + An object which watches for specified processes exiting and reports + terminations by writing items of the form (TAG, 'exit', RESULT) to a queue. + + There is usually only one ChildWatch object, called childwatch. + """ + + def __new__(cls): + """Initialize the child-watcher.""" + return M.SelSignal.__new__(cls, SIG.SIGCHLD) + + def __init__(me): + """Initialize the child-watcher.""" + me._pid = {} + me.enable() + + def watch(me, pid, queue, tag): + """ + Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE) + to the QUEUE, where CODE is one of + + * None (successful termination) + * ['exit-nonzero', CODE] (CODE is a string!) 
+ * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string) + * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex) + """ + me._pid[pid] = queue, tag + return me + + def unwatch(me, pid): + """Unregister PID as a child to watch.""" + del me._pid[pid] + return me + + @T._callback + def signalled(me): + """ + Called when child processes exit: collect exit statuses and report + failures. + """ + while True: + try: + pid, status = OS.waitpid(-1, OS.WNOHANG) + except OSError, exc: + if exc.errno == E.ECHILD: + break + if pid == 0: + break + if pid not in me._pid: + continue + queue, tag = me._pid[pid] + if OS.WIFEXITED(status): + exit = OS.WEXITSTATUS(status) + if exit == 0: + code = None + else: + code = ['exit-nonzero', str(exit)] + elif OS.WIFSIGNALED(status): + code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))] + else: + code = ['exit-unknown', hex(status)] + queue.put((tag, 'exit', code)) + +class Command (object): + """ + Represents a running command. + + This class is the main interface to the machery provided by the ChildWatch + and ErrorWatch objects. See also potwatch. + """ + + def __init__(me, info, queue, tag, args, env): + """ + Start a new child process. + + The ARGS are a list of arguments to be given to the child process. The + ENV is either None or a dictionary of environment variable assignments to + override the extant environment. INFO is a list of tokens to be included + in warnings about the child's stderr output. If the child writes a line + to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the + child exits, write (TAG, 'exit', CODE) to the QUEUE. 
+ """ + me._info = info + me._q = queue + me._tag = tag + myenv = OS.environ.copy() + if env: myenv.update(env) + me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1, + stdout = PROC.PIPE, stderr = PROC.PIPE) + me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout') + errorwatch.watch(me._proc.stderr, info) + childwatch.watch(me._proc.pid, queue, tag) + + def __del__(me): + """ + If I've been forgotten then stop watching for termination. + """ + childwatch.unwatch(me._proc.pid) + +def potwatch(what, name, q): + """ + Watch the queue Q for activity as reported by a Command object. + + Information from the process's stdout is reported as + + NOTE WHAT NAME stdout LINE + + abnormal termination is reported as + + WARN WHAT NAME CODE + + where CODE is what the ChildWatch wrote. + """ + eofp = deadp = False + while not deadp or not eofp: + _, kind, more = q.get() + if kind == 'stdout': + if more is None: + eofp = True + else: + S.notify('connect', what, name, 'stdout', more) + elif kind == 'exit': + if more: S.warn('connect', what, name, *more) + deadp = True + +###-------------------------------------------------------------------------- +### Peer database utilities. _magic = ['_magic'] # An object distinct from all others @@ -54,23 +284,33 @@ class Peer (object): one given on the command-line. """ me.name = peer - try: - record = (cdb or CDB.init(opts.cdb))['P' + peer] - except KeyError: - raise T.TripeJobError('unknown-peer', peer) + record = (cdb or CDB.init(opts.cdb))['P' + peer] me.__dict__.update(M.URLDecode(record, semip = True)) - def get(me, key, default = _magic): + def get(me, key, default = _magic, filter = None): """ Get the information stashed under KEY from the peer's database record. If DEFAULT is given, then use it if the database doesn't contain the - necessary information. If no DEFAULT is given, then report an error. + necessary information. If no DEFAULT is given, then report an error. 
If + a FILTER function is given then apply it to the information from the + database before returning it. + """ + try: + attr = me.__dict__[key] + except KeyError: + if default is _magic: + raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) + return default + else: + if filter is not None: attr = filter(attr) + return attr + + def has(me, key): """ - attr = me.__dict__.get(key, default) - if attr is _magic: - raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) - return attr + Return whether the peer's database record has the KEY. + """ + return key in me.__dict__ def list(me): """ @@ -78,37 +318,507 @@ class Peer (object): """ return me.__dict__.iterkeys() -def addpeer(peer, addr): +def boolean(value): + """Parse VALUE as a boolean.""" + return value in ['t', 'true', 'y', 'yes', 'on'] + +###-------------------------------------------------------------------------- +### Waking up and watching peers. + +def run_connect(peer, cmd): + """ + Start the job of connecting to the passive PEER. + + The CMD string is a shell command which will connect to the peer (via some + back-channel, say ssh and userv), issue a command + + SVCSUBMIT connect passive [OPTIONS] USER + + and write the resulting challenge to standard error. + """ + q = T.Queue() + cmd = Command(['connect', peer.name], q, 'connect', + ['/bin/sh', '-c', cmd], None) + _, kind, more = q.peek() + if kind == 'stdout': + if more is None: + S.warn('connect', 'connect', peer.name, 'unexpected-eof') + else: + chal = more + S.greet(peer.name, chal) + q.get() + potwatch('connect', peer.name, q) + +def run_disconnect(peer, cmd): + """ + Start the job of disconnecting from a passive PEER. + + The CMD string is a shell command which will disconnect from the peer. 
+ """ + q = T.Queue() + cmd = Command(['disconnect', peer.name], q, 'disconnect', + ['/bin/sh', '-c', cmd], None) + potwatch('disconnect', peer.name, q) + +_pingseq = 0 +class PingPeer (object): + """ + Object representing a peer which we are pinging to ensure that it is still + present. + + PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an + event queue -- which saves us from having an enormous swarm of coroutines + -- but most of the actual work is done here. + + In order to avoid confusion between different PingPeer instances for the + same actual peer, each PingPeer has a sequence number (its `seq' + attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair. + (Using the PingPeer instance itself will prevent garbage collection of + otherwise defunct instances.) + """ + + def __init__(me, pinger, queue, peer, pingnow): + """ + Create a new PingPeer. + + The PINGER is the Pinger object we should send the results to. This is + used when we remove ourselves, if the peer has been explicitly removed. + + The QUEUE is the event queue on which timer and ping-command events + should be written. + + The PEER is a `Peer' object describing the peer. + + If PINGNOW is true, then immediately start pinging the peer. Otherwise + wait until the usual retry interval. + """ + global _pingseq + me._pinger = pinger + me._q = queue + me._peer = peer.name + me.update(peer) + me.seq = _pingseq + _pingseq += 1 + me._failures = 0 + me._sabotage = False + me._last = '-' + me._nping = 0 + me._nlost = 0 + me._sigma_t = 0 + me._sigma_t2 = 0 + me._min = me._max = '-' + if pingnow: + me._timer = None + me._ping() + else: + me._timer = M.SelTimer(time() + me._every, me._time) + + def update(me, peer): + """ + Refreshes the timer parameters for this peer. We don't, however, + immediately reschedule anything: that will happen next time anything + interesting happens. 
+ """ + if peer is None: peer = Peer(me._peer) + assert peer.name == me._peer + me._every = peer.get('every', filter = T.timespec, default = 120) + me._timeout = peer.get('timeout', filter = T.timespec, default = 10) + me._retries = peer.get('retries', filter = int, default = 5) + me._connectp = peer.has('connect') + me._knockp = peer.has('knock') + return me + + def _ping(me): + """ + Send a ping to the peer; the result is sent to the Pinger's event queue. + """ + S.rawcommand(T.TripeAsynchronousCommand( + me._q, (me._peer, me.seq), + ['EPING', + '-background', S.bgtag(), + '-timeout', str(me._timeout), + '--', + me._peer])) + + def _reconnect(me): + try: + peer = Peer(me._peer) + if me._connectp or me._knockp: + S.warn('connect', 'reconnecting', me._peer) + S.forcekx(me._peer) + if me._connectp: T.spawn(run_connect, peer, peer.get('connect')) + me._timer = M.SelTimer(time() + me._every, me._time) + me._sabotage = False + else: + S.kill(me._peer) + except TripeError, e: + if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer) + + def event(me, code, stuff): + """ + Respond to an event which happened to this peer. + + Timer events indicate that we should start a new ping. (The server has + its own timeout which detects lost packets.) + + We trap unknown-peer responses and detach from the Pinger. + + If the ping fails and we run out of retries, we attempt to restart the + connection. 
+ """ + if code == 'TIMER': + me._failures = 0 + me._ping() + elif code == 'FAIL': + S.notify('connect', 'ping-failed', me._peer, *stuff) + if not stuff: pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': me._reconnect() + elif code == 'INFO': + outcome = stuff[0] + if outcome == 'ping-ok' and me._sabotage: + outcome = 'ping-timeout' + if outcome == 'ping-ok': + if me._failures > 0: S.warn('connect', 'ping-ok', me._peer) + t = float(stuff[1]) + me._last = '%.1fms' % t + me._sigma_t += t + me._sigma_t2 += t*t + me._nping += 1 + if me._min == '-' or t < me._min: me._min = t + if me._max == '-' or t > me._max: me._max = t + me._timer = M.SelTimer(time() + me._every, me._time) + elif outcome == 'ping-timeout': + me._failures += 1 + me._nlost += 1 + S.warn('connect', 'ping-timeout', me._peer, + 'attempt', str(me._failures), 'of', str(me._retries)) + if me._failures < me._retries: + me._ping() + me._last = 'timeout' + else: + me._reconnect() + me._last = 'reconnect' + elif outcome == 'ping-peer-died': + me._pinger.kill(me._peer) + + def sabotage(me): + """Sabotage the peer, for testing purposes.""" + me._sabotage = True + if me._timer: me._timer.kill() + T.defer(me._time) + + def info(me): + if not me._nping: + mean = sd = '-' + else: + mean = me._sigma_t/me._nping + sd = sqrt(me._sigma_t2/me._nping - mean*mean) + n = me._nping + me._nlost + if not n: pclost = '-' + else: pclost = '%d' % ((100*me._nlost + n//2)//n) + return { 'last-ping': me._last, + 'mean-ping': '%.1fms' % mean, + 'sd-ping': '%.1fms' % sd, + 'n-ping': '%d' % me._nping, + 'n-lost': '%d' % me._nlost, + 'percent-lost': pclost, + 'min-ping': '%.1fms' % me._min, + 'max-ping': '%.1fms' % me._max, + 'state': me._timer and 'idle' or 'check', + 'failures': str(me._failures) } + + @T._callback + def _time(me): + """ + Handle timer callbacks by posting a timeout event on the queue. 
+ """ + me._timer = None + me._q.put(((me._peer, me.seq), 'TIMER', None)) + + def __str__(me): + return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures) + def __repr__(me): + return str(me) + +class Pinger (T.Coroutine): + """ + The Pinger keeps track of the peers which we expect to be connected and + takes action if they seem to stop responding. + + There is usually only one Pinger, called pinger. + + The Pinger maintains a collection of PingPeer objects, and an event queue. + The PingPeers direct the results of their pings, and timer events, to the + event queue. The Pinger's coroutine picks items off the queue and + dispatches them back to the PingPeers as appropriate. + """ + + def __init__(me): + """Initialize the Pinger.""" + T.Coroutine.__init__(me) + me._peers = {} + me._q = T.Queue() + + def run(me): + """ + Coroutine function: reads the pinger queue and sends events to the + PingPeer objects they correspond to. + """ + while True: + (peer, seq), code, stuff = me._q.get() + if peer in me._peers and seq == me._peers[peer].seq: + try: me._peers[peer].event(code, stuff) + except Exception, e: + SYS.excepthook(*SYS.exc_info()) + + def add(me, peer, pingnow): + """ + Add PEER to the collection of peers under the Pinger's watchful eye. + The arguments are as for PingPeer: see above. + """ + me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow) + return me + + def kill(me, peername): + """Remove PEER from the peers being watched by the Pinger.""" + try: del me._peers[peername] + except KeyError: pass + return me + + def rescan(me, startup): + """ + General resynchronization method. + + We scan the list of peers (with connect scripts) known at the server. + Any which are known to the Pinger but aren't known to the server are + removed from our list; newly arrived peers are added. 
(Note that a peer + can change state here either due to the server sneakily changing its list + without issuing notifications or, more likely, the database changing its + idea of whether a peer is interesting.) Finally, PingPeers which are + still present are prodded to update their timing parameters. + + This method is called once at startup to pick up the peers already + installed, and again by the dbwatcher coroutine when it detects a change + to the database. + """ + if T._debug: print '# rescan peers' + correct = {} + start = {} + for name in S.list(): + try: peer = Peer(name) + except KeyError: continue + if peer.get('watch', filter = boolean, default = False): + if T._debug: print '# interesting peer %s' % peer + correct[peer.name] = start[peer.name] = peer + elif startup: + if T._debug: print '# peer %s ready for adoption' % peer + start[peer.name] = peer + for name, obj in me._peers.items(): + try: + peer = correct[name] + except KeyError: + if T._debug: print '# peer %s vanished' % name + del me._peers[name] + else: + obj.update(peer) + for name, peer in start.iteritems(): + if name in me._peers: continue + if startup: + if T._debug: print '# setting up peer %s' % name + ifname = S.ifname(name) + addr = S.addr(name) + T.defer(adoptpeer, peer, ifname, *addr) + else: + if T._debug: print '# adopting new peer %s' % name + me.add(peer, True) + return me + + def adopted(me): + """ + Returns the list of peers being watched by the Pinger. + """ + return me._peers.keys() + + def find(me, name): + """Return the PingPeer with the given name.""" + return me._peers[name] + +###-------------------------------------------------------------------------- +### New connections. + +def encode_envvars(env, prefix, vars): + """ + Encode the variables in VARS suitably for including in a program + environment. Lowercase letters in variable names are forced to uppercase; + runs of non-alphanumeric characters are replaced by single underscores; and + the PREFIX is prepended. 
The resulting variables are written to ENV. + """ + for k, v in vars.iteritems(): + env[prefix + r_bad.sub('_', k.upper())] = v + +r_bad = RX.compile(r'[\W_]+') +def envvars(peer): + """ + Translate the database information for a PEER into a dictionary of + environment variables with plausible upper-case names and a P_ prefix. + Also collect the crypto information into A_ variables. + """ + env = {} + encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()])) + encode_envvars(env, 'A_', S.algs(peer.name)) + return env + +def run_ifupdown(what, peer, *args): + """ + Run the interface up/down script for a peer. + + WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a + list of arguments to pass to the script, in addition to the peer name. + + The command is run and watched in the background by potwatch. + """ + q = T.Queue() + c = Command([what, peer.name], q, what, + M.split(peer.get(what), quotep = True)[0] + + [peer.name] + list(args), + envvars(peer)) + potwatch(what, peer.name, q) + +def adoptpeer(peer, ifname, *addr): + """ + Add a new peer to our collection. + + PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and + ADDR is the list of tokens representing its address. + + We try to bring up the interface and provoke a connection to the peer if + it's passive. 
+ """ + if peer.has('ifup'): + T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \ + .switch('ifup', peer, ifname, *addr) + cmd = peer.get('connect', default = None) + if cmd is not None: + T.Coroutine(run_connect, name = 'connect %s' % peer.name) \ + .switch(peer, cmd) + if peer.get('watch', filter = boolean, default = False): + pinger.add(peer, False) + +def disownpeer(peer): + """Drop the PEER from the Pinger and put its interface to bed.""" + try: pinger.kill(peer) + except KeyError: pass + cmd = peer.get('disconnect', default = None) + if cmd is not None: + T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \ + .switch(peer, cmd) + if peer.has('ifdown'): + T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \ + .switch('ifdown', peer) + +def addpeer(peer, addr, ephemp): """ Process a connect request from a new peer PEER on address ADDR. - Any existing peer with this name is disconnected from the server. + Any existing peer with this name is disconnected from the server. EPHEMP + is the default ephemeral-ness state for the new peer. """ if peer.name in S.list(): S.kill(peer.name) try: S.add(peer.name, - tunnel = peer.get('tunnel', None), - keepalive = peer.get('keepalive', None), - key = peer.get('key', None), - cork = peer.get('cork', 'nil') in ['t', 'true', 'y', 'yes', 'on'], + tunnel = peer.get('tunnel', default = None), + keepalive = peer.get('keepalive', default = None), + key = peer.get('key', default = None), + priv = peer.get('priv', default = None), + mobile = peer.get('mobile', filter = boolean, default = False), + knock = peer.get('knock', default = None), + cork = peer.get('cork', filter = boolean, default = False), + ephemeral = peer.get('ephemeral', filter = boolean, + default = ephemp), *addr) except T.TripeError, exc: raise T.TripeJobError(*exc.args) +## Dictionary mapping challenges to waiting passive-connection coroutines. +chalmap = {} + +def notify(_, code, *rest): + """ + Watch for notifications. 
+ + We trap ADD and KILL notifications, and send them straight to adoptpeer and + disownpeer respectively; and dispatch GREET notifications to the + corresponding waiting coroutine. + """ + if code == 'ADD': + try: p = Peer(rest[0]) + except KeyError: return + adoptpeer(p, *rest[1:]) + elif code == 'KILL': + try: p = Peer(rest[0]) + except KeyError: return + disownpeer(p, *rest[1:]) + elif code == 'GREET': + chal = rest[0] + try: cr = chalmap[chal] + except KeyError: pass + else: cr.switch(rest[1:]) + elif code == 'KNOCK': + try: p = Peer(rest[0]) + except KeyError: + S.warn(['connect', 'knock-unknown-peer', rest[0]]) + return + if p.get('peer') != 'PASSIVE': + S.warn(['connect', 'knock-active-peer', p.name]) + return + dot = p.name.find('.') + if dot >= 0: kname = p.name[dot + 1:] + else: kname = p.name + ktag = p.get('key', p.name) + if kname != ktag: + S.warn(['connect', 'knock-tag-mismatch', + 'peer', pname, 'public-key-tag', ktag]) + return + T.spawn(addpeer, p, rest[1:], True) + +###-------------------------------------------------------------------------- +### Command implementation. + +def cmd_kick(name): + """ + kick NAME: Force a new connection attempt for the NAMEd peer. + """ + try: pp = pinger.find(name) + except KeyError: raise T.TripeJobError('peer-not-adopted', name) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + conn = peer.get('connect', None) + if conn: T.spawn(run_connect, peer, peer.get('connect')) + else: T.spawn(lambda p: S.forcekx(p.name), peer) + +def cmd_adopted(): + """ + adopted: Report a list of adopted peers. + """ + for name in pinger.adopted(): + T.svcinfo(name) + def cmd_active(name): """ active NAME: Handle an active connection request for the peer called NAME. The appropriate address is read from the database automatically. 
""" - peer = Peer(name) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) addr = peer.get('peer') if addr == 'PASSIVE': raise T.TripeJobError('passive-peer', name) - addpeer(peer, M.split(addr, quotep = True)[0]) + addpeer(peer, M.split(addr, quotep = True)[0], True) -def cmd_list(): +def cmd_listactive(): """ list: Report a list of the available active peers. """ @@ -121,14 +831,26 @@ def cmd_info(name): """ info NAME: Report the database entries for the named peer. """ - peer = Peer(name) - items = list(peer.list()) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + d = {} + try: pp = pinger.find(name) + except KeyError: pass + else: d.update(pp.info()) + items = list(peer.list()) + d.keys() items.sort() for i in items: - T.svcinfo('%s=%s' % (i, peer.get(i))) + try: v = d[i] + except KeyError: v = peer.get(i) + T.svcinfo('%s=%s' % (i, v.replace('\n', ' '))) -## Dictionary mapping challenges to waiting passive-connection coroutines. -chalmap = {} +def cmd_userpeer(user): + """ + userpeer USER: Report the peer name for the named user. 
+ """ + try: name = CDB.init(opts.cdb)['U' + user] + except KeyError: raise T.TripeJobError('unknown-user', user) + T.svcinfo(name) def cmd_passive(*args): """ @@ -143,10 +865,10 @@ def cmd_passive(*args): if opt == '-timeout': timeout = T.timespec(op.arg()) user, = op.rest(1, 1) - try: - peer = CDB.init(opts.cdb)['U' + user] - except KeyError: - raise T.TripeJobError('unknown-user', user) + try: name = CDB.init(opts.cdb)['U' + user] + except KeyError: raise T.TripeJobError('unknown-user', user) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) chal = S.getchal() cr = T.Coroutine.getcurrent() timer = M.SelTimer(time() + timeout, lambda: cr.switch(None)) @@ -156,23 +878,17 @@ def cmd_passive(*args): addr = cr.parent.switch() if addr is None: raise T.TripeJobError('connect-timeout') - addpeer(Peer(peer), addr) + addpeer(peer, addr, True) finally: del chalmap[chal] -def notify(_, code, *rest): +def cmd_sabotage(name): """ - Watch for notifications. - - In particular, if a GREETing appears quoting a challenge in the chalmap - then wake up the corresponding coroutine. + sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged. """ - if code != 'GREET': - return - chal = rest[0] - addr = rest[1:] - if chal in chalmap: - chalmap[chal].switch(addr) + try: pp = pinger.find(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + pp.sabotage() ###-------------------------------------------------------------------------- ### Start up. @@ -181,10 +897,14 @@ def setup(): """ Service setup. - Register the notification-watcher, and add the automatic active peers. + Register the notification watcher, rescan the peers, and add automatic + active peers. 
""" S.handler['NOTE'] = notify S.watch('+n') + + pinger.rescan(opts.startup) + if opts.startup: cdb = CDB.init(opts.cdb) try: @@ -194,10 +914,22 @@ def setup(): for name in M.split(autos)[0]: try: peer = Peer(name, cdb) - addpeer(peer, M.split(peer.get('peer'), quotep = True)[0]) + addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False) except T.TripeJobError, err: S.warn('connect', 'auto-add-failed', name, *err.args) +def init(): + """ + Initialization to be done before service startup. + """ + global errorwatch, childwatch, pinger + errorwatch = ErrorWatch() + childwatch = ChildWatch() + pinger = Pinger() + T.Coroutine(dbwatch, name = 'dbwatch').switch() + errorwatch.switch() + pinger.switch() + def parse_options(): """ Parse the command-line options. @@ -234,17 +966,22 @@ def parse_options(): return opts ## Service table, for running manually. -service_info = [('connect', VERSION, { +service_info = [('connect', T.VERSION, { + 'adopted': (0, 0, '', cmd_adopted), + 'kick': (1, 1, 'PEER', cmd_kick), 'passive': (1, None, '[OPTIONS] USER', cmd_passive), 'active': (1, 1, 'PEER', cmd_active), 'info': (1, 1, 'PEER', cmd_info), - 'list': (0, 0, '', cmd_list) + 'list-active': (0, 0, '', cmd_listactive), + 'userpeer': (1, 1, 'USER', cmd_userpeer), + 'sabotage': (1, 1, 'PEER', cmd_sabotage) })] if __name__ == '__main__': opts = parse_options() + OS.environ['TRIPESOCK'] = opts.tripesock T.runservices(opts.tripesock, service_info, - setup = setup, + init = init, setup = setup, daemon = opts.daemon) ###----- That's all, folks --------------------------------------------------