svc/connect.in: Stash the time in a variable rather than fetching inline.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
11ad66c2
MW
13### TrIPE is free software: you can redistribute it and/or modify it under
14### the terms of the GNU General Public License as published by the Free
15### Software Foundation; either version 3 of the License, or (at your
16### option) any later version.
a62f8e8a 17###
11ad66c2
MW
18### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21### for more details.
a62f8e8a
MW
22###
23### You should have received a copy of the GNU General Public License
11ad66c2 24### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
a62f8e8a
MW
25
26VERSION = '@VERSION@'
27
28###--------------------------------------------------------------------------
29### External dependencies.
30
31from optparse import OptionParser
32import tripe as T
33import os as OS
d64ce4ae
MW
34import signal as SIG
35import errno as E
b9dedfa6 36from math import sqrt
a62f8e8a
MW
37import cdb as CDB
38import mLib as M
d64ce4ae 39import re as RX
ebe48771 40import sys as SYS
a62f8e8a 41from time import time
d64ce4ae 42import subprocess as PROC
a62f8e8a
MW
43
44S = T.svcmgr
45
46###--------------------------------------------------------------------------
d64ce4ae
MW
47### Running auxiliary commands.
48
49class SelLineQueue (M.SelLineBuffer):
50 """Glues the select-line-buffer into the coroutine queue system."""
51
52 def __new__(cls, file, queue, tag, kind):
53 """See __init__ for documentation."""
54 return M.SelLineBuffer.__new__(cls, file.fileno())
55
56 def __init__(me, file, queue, tag, kind):
57 """
58 Initialize a new line-reading adaptor.
59
60 The adaptor reads lines from FILE. Each line is inserted as a message of
61 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
62 represented as None.
63 """
64 me._q = queue
65 me._file = file
66 me._tag = tag
67 me._kind = kind
68 me.enable()
69
70 @T._callback
71 def line(me, line):
72 me._q.put((me._tag, me._kind, line))
73
74 @T._callback
75 def eof(me):
76 me.disable()
77 me._q.put((me._tag, me._kind, None))
78
79class ErrorWatch (T.Coroutine):
80 """
81 An object which watches stderr streams for errors and converts them into
82 warnings of the form
83
84 WARN connect INFO stderr LINE
85
86 The INFO is a list of tokens associated with the file when it was
87 registered.
88
89 Usually there is a single ErrorWatch object, called errorwatch.
90 """
91
92 def __init__(me):
93 """Initialization: there are no arguments."""
94 T.Coroutine.__init__(me)
95 me._q = T.Queue()
96 me._map = {}
97 me._seq = 1
98
99 def watch(me, file, info):
100 """
101 Adds FILE to the collection of files to watch.
102
103 INFO will be written in the warning messages from this FILE. Returns a
104 sequence number which can be used to unregister the file again.
105 """
106 seq = me._seq
107 me._seq += 1
108 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109 return seq
110
111 def unwatch(me, seq):
112 """Stop watching the file with sequence number SEQ."""
113 del me._map[seq]
114 return me
115
116 def run(me):
117 """
118 Coroutine function: read items from the queue and report them.
119
120 Unregisters files automatically when they reach EOF.
121 """
122 while True:
123 seq, _, line = me._q.get()
124 if line is None:
125 me.unwatch(seq)
126 else:
127 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129def dbwatch():
130 """
14b77b60 131 Coroutine function: wake up every minute and notice changes to the
d64ce4ae
MW
132 database. When a change happens, tell the Pinger (q.v.) to rescan its
133 peers.
134 """
135 cr = T.Coroutine.getcurrent()
136 main = cr.parent
137 fw = M.FWatch(opts.cdb)
138 while True:
14b77b60 139 timer = M.SelTimer(time() + 60, lambda: cr.switch())
d64ce4ae
MW
140 main.switch()
141 if fw.update():
142 pinger.rescan(False)
143 S.notify('connect', 'peerdb-update')
144
145class ChildWatch (M.SelSignal):
146 """
147 An object which watches for specified processes exiting and reports
148 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150 There is usually only one ChildWatch object, called childwatch.
151 """
152
153 def __new__(cls):
154 """Initialize the child-watcher."""
155 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157 def __init__(me):
158 """Initialize the child-watcher."""
159 me._pid = {}
160 me.enable()
161
162 def watch(me, pid, queue, tag):
163 """
164 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
165 to the QUEUE, where CODE is one of
166
167 * None (successful termination)
168 * ['exit-nonzero', CODE] (CODE is a string!)
169 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171 """
172 me._pid[pid] = queue, tag
173 return me
174
175 def unwatch(me, pid):
176 """Unregister PID as a child to watch."""
177 del me._pid[pid]
178 return me
179
180 @T._callback
181 def signalled(me):
182 """
183 Called when child processes exit: collect exit statuses and report
184 failures.
185 """
186 while True:
187 try:
188 pid, status = OS.waitpid(-1, OS.WNOHANG)
189 except OSError, exc:
190 if exc.errno == E.ECHILD:
191 break
192 if pid == 0:
193 break
194 if pid not in me._pid:
195 continue
196 queue, tag = me._pid[pid]
197 if OS.WIFEXITED(status):
198 exit = OS.WEXITSTATUS(status)
199 if exit == 0:
200 code = None
201 else:
202 code = ['exit-nonzero', str(exit)]
203 elif OS.WIFSIGNALED(status):
204 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205 else:
206 code = ['exit-unknown', hex(status)]
207 queue.put((tag, 'exit', code))
208
209class Command (object):
210 """
211 Represents a running command.
212
213 This class is the main interface to the machery provided by the ChildWatch
214 and ErrorWatch objects. See also potwatch.
215 """
216
217 def __init__(me, info, queue, tag, args, env):
218 """
219 Start a new child process.
220
221 The ARGS are a list of arguments to be given to the child process. The
222 ENV is either None or a dictionary of environment variable assignments to
223 override the extant environment. INFO is a list of tokens to be included
224 in warnings about the child's stderr output. If the child writes a line
225 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
226 child exits, write (TAG, 'exit', CODE) to the QUEUE.
227 """
228 me._info = info
229 me._q = queue
230 me._tag = tag
231 myenv = OS.environ.copy()
232 if env: myenv.update(env)
233 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234 stdout = PROC.PIPE, stderr = PROC.PIPE)
235 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236 errorwatch.watch(me._proc.stderr, info)
237 childwatch.watch(me._proc.pid, queue, tag)
238
239 def __del__(me):
240 """
241 If I've been forgotten then stop watching for termination.
242 """
243 childwatch.unwatch(me._proc.pid)
244
245def potwatch(what, name, q):
246 """
247 Watch the queue Q for activity as reported by a Command object.
248
249 Information from the process's stdout is reported as
250
251 NOTE WHAT NAME stdout LINE
252
253 abnormal termination is reported as
254
255 WARN WHAT NAME CODE
256
257 where CODE is what the ChildWatch wrote.
258 """
259 eofp = deadp = False
260 while not deadp or not eofp:
261 _, kind, more = q.get()
262 if kind == 'stdout':
263 if more is None:
264 eofp = True
265 else:
266 S.notify('connect', what, name, 'stdout', more)
267 elif kind == 'exit':
268 if more: S.warn('connect', what, name, *more)
269 deadp = True
270
271###--------------------------------------------------------------------------
272### Peer database utilities.
a62f8e8a
MW
273
274_magic = ['_magic'] # An object distinct from all others
275
276class Peer (object):
277 """Representation of a peer in the database."""
278
279 def __init__(me, peer, cdb = None):
280 """
281 Create a new peer, named PEER.
282
283 Information about the peer is read from the database CDB, or the default
284 one given on the command-line.
285 """
286 me.name = peer
d64ce4ae 287 record = (cdb or CDB.init(opts.cdb))['P' + peer]
a62f8e8a
MW
288 me.__dict__.update(M.URLDecode(record, semip = True))
289
d64ce4ae 290 def get(me, key, default = _magic, filter = None):
a62f8e8a
MW
291 """
292 Get the information stashed under KEY from the peer's database record.
293
294 If DEFAULT is given, then use it if the database doesn't contain the
d64ce4ae
MW
295 necessary information. If no DEFAULT is given, then report an error. If
296 a FILTER function is given then apply it to the information from the
297 database before returning it.
a62f8e8a 298 """
c7737d90
MW
299 try:
300 attr = me.__dict__[key]
301 except KeyError:
302 if default is _magic:
303 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304 return default
305 else:
306 if filter is not None: attr = filter(attr)
307 return attr
a62f8e8a 308
d64ce4ae
MW
309 def has(me, key):
310 """
311 Return whether the peer's database record has the KEY.
312 """
313 return key in me.__dict__
314
a62f8e8a
MW
315 def list(me):
316 """
317 Iterate over the available keys in the peer's database record.
318 """
319 return me.__dict__.iterkeys()
320
d64ce4ae
MW
321def boolean(value):
322 """Parse VALUE as a boolean."""
323 return value in ['t', 'true', 'y', 'yes', 'on']
324
325###--------------------------------------------------------------------------
326### Waking up and watching peers.
327
328def run_connect(peer, cmd):
329 """
330 Start the job of connecting to the passive PEER.
331
332 The CMD string is a shell command which will connect to the peer (via some
333 back-channel, say ssh and userv), issue a command
334
335 SVCSUBMIT connect passive [OPTIONS] USER
336
337 and write the resulting challenge to standard error.
338 """
339 q = T.Queue()
340 cmd = Command(['connect', peer.name], q, 'connect',
341 ['/bin/sh', '-c', cmd], None)
342 _, kind, more = q.peek()
343 if kind == 'stdout':
344 if more is None:
345 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346 else:
347 chal = more
348 S.greet(peer.name, chal)
349 q.get()
350 potwatch('connect', peer.name, q)
351
352def run_disconnect(peer, cmd):
353 """
354 Start the job of disconnecting from a passive PEER.
355
356 The CMD string is a shell command which will disconnect from the peer.
357 """
358 q = T.Queue()
359 cmd = Command(['disconnect', peer.name], q, 'disconnect',
360 ['/bin/sh', '-c', cmd], None)
361 potwatch('disconnect', peer.name, q)
362
363_pingseq = 0
364class PingPeer (object):
365 """
366 Object representing a peer which we are pinging to ensure that it is still
367 present.
368
369 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
370 event queue -- which saves us from having an enormous swarm of coroutines
371 -- but most of the actual work is done here.
372
373 In order to avoid confusion between different PingPeer instances for the
374 same actual peer, each PingPeer has a sequence number (its `seq'
375 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
376 (Using the PingPeer instance itself will prevent garbage collection of
377 otherwise defunct instances.)
378 """
379
380 def __init__(me, pinger, queue, peer, pingnow):
381 """
382 Create a new PingPeer.
383
384 The PINGER is the Pinger object we should send the results to. This is
385 used when we remove ourselves, if the peer has been explicitly removed.
386
387 The QUEUE is the event queue on which timer and ping-command events
388 should be written.
389
390 The PEER is a `Peer' object describing the peer.
391
392 If PINGNOW is true, then immediately start pinging the peer. Otherwise
393 wait until the usual retry interval.
394 """
395 global _pingseq
396 me._pinger = pinger
397 me._q = queue
398 me._peer = peer.name
399 me.update(peer)
400 me.seq = _pingseq
401 _pingseq += 1
402 me._failures = 0
965eb987 403 me._sabotage = False
b9dedfa6
MW
404 me._last = '-'
405 me._nping = 0
406 me._nlost = 0
407 me._sigma_t = 0
408 me._sigma_t2 = 0
409 me._min = me._max = '-'
e514a3e8 410 now = time()
d64ce4ae
MW
411 if pingnow:
412 me._timer = None
413 me._ping()
414 else:
e514a3e8 415 me._timer = M.SelTimer(now + me._every, me._time)
d64ce4ae
MW
416
417 def update(me, peer):
418 """
419 Refreshes the timer parameters for this peer. We don't, however,
420 immediately reschedule anything: that will happen next time anything
421 interesting happens.
422 """
423 if peer is None: peer = Peer(me._peer)
424 assert peer.name == me._peer
425 me._every = peer.get('every', filter = T.timespec, default = 120)
426 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
427 me._retries = peer.get('retries', filter = int, default = 5)
428 me._connectp = peer.has('connect')
8362ac1c 429 me._knockp = peer.has('knock')
d64ce4ae
MW
430 return me
431
432 def _ping(me):
433 """
434 Send a ping to the peer; the result is sent to the Pinger's event queue.
435 """
436 S.rawcommand(T.TripeAsynchronousCommand(
437 me._q, (me._peer, me.seq),
438 ['EPING',
439 '-background', S.bgtag(),
440 '-timeout', str(me._timeout),
441 '--',
442 me._peer]))
443
444 def _reconnect(me):
e514a3e8 445 now = time()
8ad21c29
MW
446 try:
447 peer = Peer(me._peer)
8362ac1c 448 if me._connectp or me._knockp:
8ad21c29 449 S.warn('connect', 'reconnecting', me._peer)
7f4482e9 450 S.forcekx(me._peer, quiet = not me._knockp)
8362ac1c 451 if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
e514a3e8 452 me._timer = M.SelTimer(now + me._every, me._time)
8ad21c29
MW
453 me._sabotage = False
454 else:
455 S.kill(me._peer)
11bb4c32 456 except T.TripeError, e:
8ad21c29 457 if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
d64ce4ae
MW
458
459 def event(me, code, stuff):
460 """
461 Respond to an event which happened to this peer.
462
463 Timer events indicate that we should start a new ping. (The server has
464 its own timeout which detects lost packets.)
465
466 We trap unknown-peer responses and detach from the Pinger.
467
468 If the ping fails and we run out of retries, we attempt to restart the
469 connection.
470 """
e514a3e8 471 now = time()
d64ce4ae
MW
472 if code == 'TIMER':
473 me._failures = 0
474 me._ping()
475 elif code == 'FAIL':
476 S.notify('connect', 'ping-failed', me._peer, *stuff)
f688828b
MW
477 if not stuff: pass
478 elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
479 elif stuff[0] == 'ping-send-failed': me._reconnect()
d64ce4ae 480 elif code == 'INFO':
965eb987
MW
481 outcome = stuff[0]
482 if outcome == 'ping-ok' and me._sabotage:
483 outcome = 'ping-timeout'
484 if outcome == 'ping-ok':
f688828b 485 if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
b9dedfa6
MW
486 t = float(stuff[1])
487 me._last = '%.1fms' % t
488 me._sigma_t += t
489 me._sigma_t2 += t*t
490 me._nping += 1
491 if me._min == '-' or t < me._min: me._min = t
492 if me._max == '-' or t > me._max: me._max = t
e514a3e8 493 me._timer = M.SelTimer(now + me._every, me._time)
965eb987 494 elif outcome == 'ping-timeout':
d64ce4ae 495 me._failures += 1
b9dedfa6 496 me._nlost += 1
d64ce4ae
MW
497 S.warn('connect', 'ping-timeout', me._peer,
498 'attempt', str(me._failures), 'of', str(me._retries))
499 if me._failures < me._retries:
500 me._ping()
b9dedfa6 501 me._last = 'timeout'
d64ce4ae
MW
502 else:
503 me._reconnect()
965eb987
MW
504 me._last = 'reconnect'
505 elif outcome == 'ping-peer-died':
d64ce4ae
MW
506 me._pinger.kill(me._peer)
507
965eb987
MW
508 def sabotage(me):
509 """Sabotage the peer, for testing purposes."""
510 me._sabotage = True
511 if me._timer: me._timer.kill()
512 T.defer(me._time)
513
b9dedfa6
MW
514 def info(me):
515 if not me._nping:
8849990e 516 mean = sd = min = max = '-'
b9dedfa6 517 else:
8849990e
MW
518 meanval = me._sigma_t/me._nping
519 mean = '%.1fms' % meanval
520 sd = '%.1fms' % sqrt(me._sigma_t2/me._nping - meanval*meanval)
521 min = '%.1fms' % me._min
522 max = '%.1fms' % me._max
b9dedfa6
MW
523 n = me._nping + me._nlost
524 if not n: pclost = '-'
525 else: pclost = '%d' % ((100*me._nlost + n//2)//n)
526 return { 'last-ping': me._last,
8849990e
MW
527 'mean-ping': mean,
528 'sd-ping': sd,
b9dedfa6
MW
529 'n-ping': '%d' % me._nping,
530 'n-lost': '%d' % me._nlost,
531 'percent-lost': pclost,
8849990e
MW
532 'min-ping': min,
533 'max-ping': max,
b9dedfa6 534 'state': me._timer and 'idle' or 'check',
d3a17033 535 'failures': str(me._failures) }
b9dedfa6 536
d64ce4ae
MW
537 @T._callback
538 def _time(me):
539 """
540 Handle timer callbacks by posting a timeout event on the queue.
541 """
542 me._timer = None
543 me._q.put(((me._peer, me.seq), 'TIMER', None))
544
545 def __str__(me):
546 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
547 def __repr__(me):
548 return str(me)
549
550class Pinger (T.Coroutine):
551 """
552 The Pinger keeps track of the peers which we expect to be connected and
553 takes action if they seem to stop responding.
554
555 There is usually only one Pinger, called pinger.
556
557 The Pinger maintains a collection of PingPeer objects, and an event queue.
558 The PingPeers direct the results of their pings, and timer events, to the
559 event queue. The Pinger's coroutine picks items off the queue and
560 dispatches them back to the PingPeers as appropriate.
561 """
562
563 def __init__(me):
564 """Initialize the Pinger."""
565 T.Coroutine.__init__(me)
566 me._peers = {}
567 me._q = T.Queue()
568
569 def run(me):
570 """
571 Coroutine function: reads the pinger queue and sends events to the
572 PingPeer objects they correspond to.
573 """
574 while True:
575 (peer, seq), code, stuff = me._q.get()
576 if peer in me._peers and seq == me._peers[peer].seq:
ebe48771
MW
577 try: me._peers[peer].event(code, stuff)
578 except Exception, e:
579 SYS.excepthook(*SYS.exc_info())
d64ce4ae
MW
580
581 def add(me, peer, pingnow):
582 """
583 Add PEER to the collection of peers under the Pinger's watchful eye.
584 The arguments are as for PingPeer: see above.
585 """
586 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
587 return me
588
589 def kill(me, peername):
590 """Remove PEER from the peers being watched by the Pinger."""
47912108
MW
591 try: del me._peers[peername]
592 except KeyError: pass
d64ce4ae
MW
593 return me
594
595 def rescan(me, startup):
596 """
597 General resynchronization method.
598
599 We scan the list of peers (with connect scripts) known at the server.
600 Any which are known to the Pinger but aren't known to the server are
601 removed from our list; newly arrived peers are added. (Note that a peer
602 can change state here either due to the server sneakily changing its list
603 without issuing notifications or, more likely, the database changing its
604 idea of whether a peer is interesting.) Finally, PingPeers which are
605 still present are prodded to update their timing parameters.
606
607 This method is called once at startup to pick up the peers already
608 installed, and again by the dbwatcher coroutine when it detects a change
609 to the database.
610 """
611 if T._debug: print '# rescan peers'
612 correct = {}
613 start = {}
614 for name in S.list():
615 try: peer = Peer(name)
616 except KeyError: continue
617 if peer.get('watch', filter = boolean, default = False):
618 if T._debug: print '# interesting peer %s' % peer
619 correct[peer.name] = start[peer.name] = peer
620 elif startup:
621 if T._debug: print '# peer %s ready for adoption' % peer
622 start[peer.name] = peer
623 for name, obj in me._peers.items():
624 try:
625 peer = correct[name]
626 except KeyError:
627 if T._debug: print '# peer %s vanished' % name
628 del me._peers[name]
629 else:
630 obj.update(peer)
631 for name, peer in start.iteritems():
632 if name in me._peers: continue
633 if startup:
634 if T._debug: print '# setting up peer %s' % name
635 ifname = S.ifname(name)
636 addr = S.addr(name)
637 T.defer(adoptpeer, peer, ifname, *addr)
638 else:
639 if T._debug: print '# adopting new peer %s' % name
640 me.add(peer, True)
641 return me
642
643 def adopted(me):
644 """
645 Returns the list of peers being watched by the Pinger.
646 """
647 return me._peers.keys()
648
fb52c291
MW
649 def find(me, name):
650 """Return the PingPeer with the given name."""
651 return me._peers[name]
652
d64ce4ae
MW
653###--------------------------------------------------------------------------
654### New connections.
655
656def encode_envvars(env, prefix, vars):
657 """
658 Encode the variables in VARS suitably for including in a program
659 environment. Lowercase letters in variable names are forced to uppercase;
660 runs of non-alphanumeric characters are replaced by single underscores; and
661 the PREFIX is prepended. The resulting variables are written to ENV.
662 """
663 for k, v in vars.iteritems():
664 env[prefix + r_bad.sub('_', k.upper())] = v
665
666r_bad = RX.compile(r'[\W_]+')
667def envvars(peer):
668 """
669 Translate the database information for a PEER into a dictionary of
670 environment variables with plausible upper-case names and a P_ prefix.
671 Also collect the crypto information into A_ variables.
672 """
673 env = {}
674 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
675 encode_envvars(env, 'A_', S.algs(peer.name))
676 return env
677
678def run_ifupdown(what, peer, *args):
679 """
680 Run the interface up/down script for a peer.
681
682 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
683 list of arguments to pass to the script, in addition to the peer name.
684
685 The command is run and watched in the background by potwatch.
686 """
687 q = T.Queue()
688 c = Command([what, peer.name], q, what,
689 M.split(peer.get(what), quotep = True)[0] +
690 [peer.name] + list(args),
691 envvars(peer))
692 potwatch(what, peer.name, q)
693
694def adoptpeer(peer, ifname, *addr):
695 """
696 Add a new peer to our collection.
697
698 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
699 ADDR is the list of tokens representing its address.
700
701 We try to bring up the interface and provoke a connection to the peer if
702 it's passive.
703 """
704 if peer.has('ifup'):
705 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
706 .switch('ifup', peer, ifname, *addr)
707 cmd = peer.get('connect', default = None)
708 if cmd is not None:
709 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
710 .switch(peer, cmd)
711 if peer.get('watch', filter = boolean, default = False):
712 pinger.add(peer, False)
713
714def disownpeer(peer):
715 """Drop the PEER from the Pinger and put its interface to bed."""
716 try: pinger.kill(peer)
717 except KeyError: pass
718 cmd = peer.get('disconnect', default = None)
719 if cmd is not None:
720 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
721 .switch(peer, cmd)
722 if peer.has('ifdown'):
723 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
724 .switch('ifdown', peer)
725
067aa5f0 726def addpeer(peer, addr, ephemp):
a62f8e8a
MW
727 """
728 Process a connect request from a new peer PEER on address ADDR.
729
067aa5f0
MW
730 Any existing peer with this name is disconnected from the server. EPHEMP
731 is the default ephemeral-ness state for the new peer.
a62f8e8a
MW
732 """
733 if peer.name in S.list():
734 S.kill(peer.name)
735 try:
736 S.add(peer.name,
a5dbeffa
MW
737 tunnel = peer.get('tunnel', default = None),
738 keepalive = peer.get('keepalive', default = None),
739 key = peer.get('key', default = None),
740 priv = peer.get('priv', default = None),
741 mobile = peer.get('mobile', filter = boolean, default = False),
8362ac1c 742 knock = peer.get('knock', default = None),
a5dbeffa 743 cork = peer.get('cork', filter = boolean, default = False),
067aa5f0
MW
744 ephemeral = peer.get('ephemeral', filter = boolean,
745 default = ephemp),
a62f8e8a
MW
746 *addr)
747 except T.TripeError, exc:
748 raise T.TripeJobError(*exc.args)
749
d64ce4ae
MW
750## Dictionary mapping challenges to waiting passive-connection coroutines.
751chalmap = {}
752
753def notify(_, code, *rest):
754 """
755 Watch for notifications.
756
757 We trap ADD and KILL notifications, and send them straight to adoptpeer and
758 disownpeer respectively; and dispatch GREET notifications to the
759 corresponding waiting coroutine.
760 """
761 if code == 'ADD':
762 try: p = Peer(rest[0])
e9ec9391
MW
763 except KeyError: pass
764 else: adoptpeer(p, *rest[1:])
d64ce4ae
MW
765 elif code == 'KILL':
766 try: p = Peer(rest[0])
e9ec9391
MW
767 except KeyError: pass
768 else: disownpeer(p, *rest[1:])
d64ce4ae
MW
769 elif code == 'GREET':
770 chal = rest[0]
771 try: cr = chalmap[chal]
772 except KeyError: pass
773 else: cr.switch(rest[1:])
8362ac1c
MW
774 elif code == 'KNOCK':
775 try: p = Peer(rest[0])
776 except KeyError:
777 S.warn(['connect', 'knock-unknown-peer', rest[0]])
778 return
779 if p.get('peer') != 'PASSIVE':
780 S.warn(['connect', 'knock-active-peer', p.name])
781 return
782 dot = p.name.find('.')
783 if dot >= 0: kname = p.name[dot + 1:]
784 else: kname = p.name
785 ktag = p.get('key', p.name)
786 if kname != ktag:
787 S.warn(['connect', 'knock-tag-mismatch',
788 'peer', pname, 'public-key-tag', ktag])
789 return
067aa5f0 790 T.spawn(addpeer, p, rest[1:], True)
d64ce4ae
MW
791
792###--------------------------------------------------------------------------
793### Command implementation.
794
795def cmd_kick(name):
796 """
797 kick NAME: Force a new connection attempt for the NAMEd peer.
798 """
fb52c291
MW
799 try: pp = pinger.find(name)
800 except KeyError: raise T.TripeJobError('peer-not-adopted', name)
d64ce4ae
MW
801 try: peer = Peer(name)
802 except KeyError: raise T.TripeJobError('unknown-peer', name)
8bd108e4
MW
803 conn = peer.get('connect', None)
804 if conn: T.spawn(run_connect, peer, peer.get('connect'))
805 else: T.spawn(lambda p: S.forcekx(p.name), peer)
d64ce4ae
MW
806
807def cmd_adopted():
808 """
809 adopted: Report a list of adopted peers.
810 """
811 for name in pinger.adopted():
812 T.svcinfo(name)
813
a62f8e8a
MW
814def cmd_active(name):
815 """
816 active NAME: Handle an active connection request for the peer called NAME.
817
818 The appropriate address is read from the database automatically.
819 """
d64ce4ae
MW
820 try: peer = Peer(name)
821 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
822 addr = peer.get('peer')
823 if addr == 'PASSIVE':
824 raise T.TripeJobError('passive-peer', name)
067aa5f0 825 addpeer(peer, M.split(addr, quotep = True)[0], True)
a62f8e8a 826
d64ce4ae 827def cmd_listactive():
a62f8e8a
MW
828 """
829 list: Report a list of the available active peers.
830 """
831 cdb = CDB.init(opts.cdb)
832 for key in cdb.keys():
833 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
834 T.svcinfo(key[1:])
835
836def cmd_info(name):
837 """
838 info NAME: Report the database entries for the named peer.
839 """
d64ce4ae
MW
840 try: peer = Peer(name)
841 except KeyError: raise T.TripeJobError('unknown-peer', name)
b9dedfa6
MW
842 d = {}
843 try: pp = pinger.find(name)
844 except KeyError: pass
845 else: d.update(pp.info())
846 items = list(peer.list()) + d.keys()
a62f8e8a
MW
847 items.sort()
848 for i in items:
b9dedfa6
MW
849 try: v = d[i]
850 except KeyError: v = peer.get(i)
ab6f1b0d 851 T.svcinfo('%s=%s' % (i, v.replace('\n', ' ')))
a62f8e8a 852
d3731285
MW
853def cmd_userpeer(user):
854 """
855 userpeer USER: Report the peer name for the named user.
856 """
d64ce4ae
MW
857 try: name = CDB.init(opts.cdb)['U' + user]
858 except KeyError: raise T.TripeJobError('unknown-user', user)
859 T.svcinfo(name)
a62f8e8a
MW
860
861def cmd_passive(*args):
862 """
863 passive [OPTIONS] USER: Await the arrival of the named USER.
864
865 Report a challenge; when (and if!) the server receives a greeting quoting
866 this challenge, add the corresponding peer to the server.
867 """
e514a3e8 868 now = time()
a62f8e8a
MW
869 timeout = 30
870 op = T.OptParse(args, ['-timeout'])
871 for opt in op:
872 if opt == '-timeout':
873 timeout = T.timespec(op.arg())
874 user, = op.rest(1, 1)
d64ce4ae
MW
875 try: name = CDB.init(opts.cdb)['U' + user]
876 except KeyError: raise T.TripeJobError('unknown-user', user)
877 try: peer = Peer(name)
878 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
879 chal = S.getchal()
880 cr = T.Coroutine.getcurrent()
e514a3e8 881 timer = M.SelTimer(now + timeout, lambda: cr.switch(None))
a62f8e8a
MW
882 try:
883 T.svcinfo(chal)
884 chalmap[chal] = cr
885 addr = cr.parent.switch()
886 if addr is None:
887 raise T.TripeJobError('connect-timeout')
067aa5f0 888 addpeer(peer, addr, True)
a62f8e8a
MW
889 finally:
890 del chalmap[chal]
891
965eb987
MW
892def cmd_sabotage(name):
893 """
894 sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
895 """
896 try: pp = pinger.find(name)
897 except KeyError: raise T.TripeJobError('unknown-peer', name)
898 pp.sabotage()
899
a62f8e8a
MW
900###--------------------------------------------------------------------------
901### Start up.
902
903def setup():
904 """
905 Service setup.
906
d64ce4ae
MW
907 Register the notification watcher, rescan the peers, and add automatic
908 active peers.
a62f8e8a
MW
909 """
910 S.handler['NOTE'] = notify
911 S.watch('+n')
d64ce4ae
MW
912
913 pinger.rescan(opts.startup)
914
a62f8e8a
MW
915 if opts.startup:
916 cdb = CDB.init(opts.cdb)
917 try:
918 autos = cdb['%AUTO']
919 except KeyError:
920 autos = ''
921 for name in M.split(autos)[0]:
922 try:
923 peer = Peer(name, cdb)
067aa5f0 924 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
a62f8e8a
MW
925 except T.TripeJobError, err:
926 S.warn('connect', 'auto-add-failed', name, *err.args)
927
d64ce4ae
MW
928def init():
929 """
930 Initialization to be done before service startup.
931 """
932 global errorwatch, childwatch, pinger
933 errorwatch = ErrorWatch()
934 childwatch = ChildWatch()
935 pinger = Pinger()
936 T.Coroutine(dbwatch, name = 'dbwatch').switch()
937 errorwatch.switch()
938 pinger.switch()
939
a62f8e8a
MW
940def parse_options():
941 """
942 Parse the command-line options.
943
944 Automatically changes directory to the requested configdir, and turns on
945 debugging. Returns the options object.
946 """
947 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
948 version = '%%prog %s' % VERSION)
949
950 op.add_option('-a', '--admin-socket',
951 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
952 help = 'Select socket to connect to [default %default]')
953 op.add_option('-d', '--directory',
954 metavar = 'DIR', dest = 'dir', default = T.configdir,
955 help = 'Select current diretory [default %default]')
956 op.add_option('-p', '--peerdb',
957 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
958 help = 'Select peers database [default %default]')
959 op.add_option('--daemon', dest = 'daemon',
960 default = False, action = 'store_true',
961 help = 'Become a daemon after successful initialization')
962 op.add_option('--debug', dest = 'debug',
963 default = False, action = 'store_true',
964 help = 'Emit debugging trace information')
965 op.add_option('--startup', dest = 'startup',
966 default = False, action = 'store_true',
967 help = 'Being called as part of the server startup')
968
969 opts, args = op.parse_args()
970 if args: op.error('no arguments permitted')
971 OS.chdir(opts.dir)
972 T._debug = opts.debug
973 return opts
974
975## Service table, for running manually.
65f2f474 976service_info = [('connect', VERSION, {
d64ce4ae
MW
977 'adopted': (0, 0, '', cmd_adopted),
978 'kick': (1, 1, 'PEER', cmd_kick),
a62f8e8a
MW
979 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
980 'active': (1, 1, 'PEER', cmd_active),
981 'info': (1, 1, 'PEER', cmd_info),
d64ce4ae 982 'list-active': (0, 0, '', cmd_listactive),
965eb987
MW
983 'userpeer': (1, 1, 'USER', cmd_userpeer),
984 'sabotage': (1, 1, 'PEER', cmd_sabotage)
a62f8e8a
MW
985})]
986
987if __name__ == '__main__':
988 opts = parse_options()
04b3c57c 989 OS.environ['TRIPESOCK'] = opts.tripesock
a62f8e8a 990 T.runservices(opts.tripesock, service_info,
d64ce4ae 991 init = init, setup = setup,
a62f8e8a
MW
992 daemon = opts.daemon)
993
994###----- That's all, folks --------------------------------------------------