svc/connect.in: Maintain time of last reconnect attempt and add rate limit.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
25
## Version string.  `@VERSION@' is a placeholder, presumably substituted when
## this `.in' template is processed at build time -- TODO confirm.
VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
## Shorthand for `T.svcmgr', through which this service issues its server
## commands (S.warn, S.notify, S.add, S.kill, ...).
S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
class SelLineQueue (M.SelLineBuffer):
  """
  Adaptor which feeds the lines read by a `SelLineBuffer' into a coroutine
  queue.
  """

  def __new__(cls, file, queue, tag, kind):
    """Allocate the line buffer on FILE's descriptor; see __init__."""
    fd = file.fileno()
    return M.SelLineBuffer.__new__(cls, fd)

  def __init__(me, file, queue, tag, kind):
    """
    Initialize a new line-reading adaptor.

    Every line read from FILE is posted to QUEUE as a (TAG, KIND, LINE)
    triple; end-of-file is posted as (TAG, KIND, None).  Reading starts
    straight away.
    """
    ## Keep a reference to FILE -- presumably so that the file object (and
    ## its descriptor) stays alive while we're reading from it.
    me._file = file
    me._q, me._tag, me._kind = queue, tag, kind
    me.enable()

  @T._callback
  def line(me, line):
    """Post a complete LINE to the queue."""
    item = me._tag, me._kind, line
    me._q.put(item)

  @T._callback
  def eof(me):
    """Stop reading, and post the end-of-file marker (None) to the queue."""
    me.disable()
    item = me._tag, me._kind, None
    me._q.put(item)
78
class ErrorWatch (T.Coroutine):
  """
  Watch child processes' stderr streams, turning each line into a warning

    WARN connect INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  Usually there is a single ErrorWatch object, called errorwatch.
  """

  def __init__(me):
    """Initialize the watcher; no arguments are needed."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}
    me._seq = 1

  def watch(me, file, info):
    """
    Start watching FILE, tagging its warnings with the token list INFO.

    Returns a sequence number which can be passed to unwatch.
    """
    seq, me._seq = me._seq, me._seq + 1
    reader = SelLineQueue(file, me._q, seq, 'stderr')
    me._map[seq] = (info, reader)
    return seq

  def unwatch(me, seq):
    """Forget the file registered under sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine body: report each queued line as a warning.  A file is
    forgotten as soon as its end-of-file marker arrives.
    """
    while True:
      seq, _, line = me._q.get()
      if line is None:
        me.unwatch(seq)
        continue
      info = me._map[seq][0]
      S.warn(*(['connect'] + info + ['stderr', line]))
128
def dbwatch():
  """
  Coroutine body: check the peer database for changes once a minute.

  Whenever the database file has changed, ask the Pinger (q.v.) to rescan
  its peers and announce a `peerdb-update' notification.
  """
  self_cr = T.Coroutine.getcurrent()
  parent = self_cr.parent
  watcher = M.FWatch(opts.cdb)
  while True:
    ## Arrange to be switched back to in a minute's time, then yield.  The
    ## timer is held in a local while we wait.
    wakeup = M.SelTimer(time() + 60, lambda: self_cr.switch())
    parent.switch()
    if not watcher.update():
      continue
    pinger.rescan(False)
    S.notify('connect', 'peerdb-update')
144
class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Create the child-watcher, attached to SIGCHLD."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher and start listening for SIGCHLD."""
    me._pid = {}
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """Unregister PID as a child to watch.  Raises KeyError if it isn't."""
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: reap every child which has exited,
    and report the statuses of the watched ones to their queues.

    Children which aren't being watched are reaped silently.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        ## Interrupted: just try again.
        if exc.errno == E.EINTR: continue
        ## No children left at all.
        if exc.errno == E.ECHILD: break
        ## Anything else is unexpected.  Don't carry on with `pid' unset, or
        ## left over from the previous iteration.
        raise
      ## Children exist, but none has exited yet.
      if pid == 0:
        break
      if pid not in me._pid:
        continue
      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        exit = OS.WEXITSTATUS(status)
        if exit == 0:
          code = None
        else:
          code = ['exit-nonzero', str(exit)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))
208
class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env: myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.

    Copes with a half-built object: if starting the process failed, or the
    process was never registered with the childwatch, there's nothing to
    undo.
    """
    proc = getattr(me, '_proc', None)
    if proc is None: return
    try: childwatch.unwatch(proc.pid)
    except KeyError: pass
244
def potwatch(what, name, q):
  """
  Report the activity of a Command (q.v.) which posts to the queue Q.

  Each line the process writes to its standard output is reported as

    NOTE connect WHAT NAME stdout LINE

  and abnormal termination is reported as

    WARN connect WHAT NAME CODE...

  where CODE is what the ChildWatch posted.  Returns once both end-of-file
  on standard output and the exit status have been seen.
  """
  seen_eof = seen_exit = False
  while not (seen_eof and seen_exit):
    _, kind, more = q.get()
    if kind == 'exit':
      if more: S.warn('connect', what, name, *more)
      seen_exit = True
    elif kind == 'stdout':
      if more is not None:
        S.notify('connect', what, name, 'stdout', more)
      else:
        seen_eof = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
## Sentinel meaning `no default was given'.  It's a freshly made list, so no
## value a caller passes can be the same object.
_magic = ['_magic']

class Peer (object):
  """A peer's record from the peer database, with fields as attributes."""

  def __init__(me, peer, cdb = None):
    """
    Create a new peer, named PEER.

    The record stored under `P' + PEER is read from the database CDB or, if
    CDB is omitted (or false), from the database named on the command line.
    It is decoded with M.URLDecode, and its fields become attributes of the
    new object.  Callers in this file catch KeyError for a peer that is
    missing from the database.
    """
    me.name = peer
    db = cdb or CDB.init(opts.cdb)
    fields = M.URLDecode(db['P' + peer], semip = True)
    me.__dict__.update(fields)

  def get(me, key, default = _magic, filter = None):
    """
    Return the value stored under KEY in the peer's record.

    If KEY is absent, return DEFAULT; when no DEFAULT was given, raise a
    `malformed-peer' job error instead.  FILTER, if given, is applied to a
    value found in the record (but never to DEFAULT).
    """
    if key not in me.__dict__:
      if default is _magic:
        raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
      return default
    value = me.__dict__[key]
    return value if filter is None else filter(value)

  def has(me, key):
    """Say whether the peer's record has an entry for KEY."""
    return key in me.__dict__

  def list(me):
    """Return an iterator over the keys in the peer's record."""
    return iter(me.__dict__)
320
def boolean(value):
  """
  Parse VALUE, a string from the peer database, as a boolean.

  Only the exact lower-case strings `t', `true', `y', `yes' and `on' count as
  true; anything else is false.
  """
  truthy = ('t', 'true', 'y', 'yes', 'on')
  return value in truthy
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
def run_connect(peer, cmd):
  """
  Start the job of connecting to the passive PEER.

  CMD is a shell command which connects to the peer over some back-channel
  (say, ssh and userv), issues the command

    SVCSUBMIT connect passive [OPTIONS] USER

  and writes the resulting challenge to its standard output.  The first line
  of output is sent to the peer as a greeting.  Everything else the command
  does is reported by potwatch.
  """
  queue = T.Queue()
  ## Hold the Command until we're done: its destructor stops the childwatch
  ## from tracking the process.
  proc = Command(['connect', peer.name], queue, 'connect',
                 ['/bin/sh', '-c', cmd], None)
  _, kind, first = queue.peek()
  if kind == 'stdout' and first is None:
    S.warn('connect', 'connect', peer.name, 'unexpected-eof')
  elif kind == 'stdout':
    S.greet(peer.name, first)
    ## Consume the challenge line so that potwatch doesn't report it.
    queue.get()
  potwatch('connect', peer.name, queue)
351
def run_disconnect(peer, cmd):
  """
  Start the job of disconnecting from the passive PEER.

  CMD is a shell command which disconnects from the peer.  Its output and
  exit status are reported by potwatch.
  """
  queue = T.Queue()
  argv = ['/bin/sh', '-c', cmd]
  ## Hold the Command until potwatch has finished with it.
  proc = Command(['disconnect', peer.name], queue, 'disconnect', argv, None)
  potwatch('disconnect', peer.name, queue)
362
_pingseq = 0
class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, pingnow):
    """
    Create a new PingPeer.

    The PINGER is the Pinger object we should send the results to.  This is
    used when we remove ourselves, if the peer has been explicitly removed.

    The QUEUE is the event queue on which timer and ping-command events
    should be written.

    The PEER is a `Peer' object describing the peer.

    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
    wait until the usual retry interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer.name
    me.update(peer)
    me.seq = _pingseq
    _pingseq += 1
    me._failures = 0
    me._sabotage = False
    me._last = '-'
    me._nping = 0
    me._nlost = 0
    me._sigma_t = 0
    me._sigma_t2 = 0
    me._min = me._max = '-'
    now = time()
    ## Creation counts as a reconnection for `reconnect's rate limit.  Set it
    ## before anything else can happen, so it's always defined.
    me._last_reconn = now
    if pingnow:
      me._timer = None
      me._ping()
    else:
      me._timer = M.SelTimer(now + me._every, me._time)

  def update(me, peer):
    """
    Refreshes the timer parameters for this peer.  We don't, however,
    immediately reschedule anything: that will happen next time anything
    interesting happens.

    PEER is a `Peer' object for this peer, or None to reread the database.
    """
    if peer is None: peer = Peer(me._peer)
    assert peer.name == me._peer
    me._every = peer.get('every', filter = T.timespec, default = 120)
    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
    me._retries = peer.get('retries', filter = int, default = 5)
    me._connectp = peer.has('connect')
    me._knockp = peer.has('knock')
    return me

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue.
    """
    S.rawcommand(T.TripeAsynchronousCommand(
      me._q, (me._peer, me.seq),
      ['EPING',
       '-background', S.bgtag(),
       '-timeout', str(me._timeout),
       '--',
       me._peer]))

  def _reconnect(me, now = None):
    """
    Try to re-establish the connection to the peer, without rate-limiting.

    If the peer has a `connect' or `knock' entry, warn that we're
    reconnecting, force a key exchange (quietly unless the peer knocks),
    rerun its connect command if it has one, and schedule the next ping.
    Otherwise, just kill the peer.  If the server no longer knows the peer,
    it is dropped from the Pinger.

    NOW is the current time; if omitted, the clock is read.
    """
    if now is None: now = time()
    try:
      peer = Peer(me._peer)
      if me._connectp or me._knockp:
        S.warn('connect', 'reconnecting', me._peer)
        S.forcekx(me._peer, quiet = not me._knockp)
        if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
        me._timer = M.SelTimer(now + me._every, me._time)
        me._sabotage = False
        me._last_reconn = now
      else:
        S.kill(me._peer)
    except T.TripeError as e:
      if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)

  def reconnect(me):
    """
    Attempt reconnection to the peer.

    Applies rate-limiting so that we don't hammer a remote peer just because
    we notice several problems in a short time interval: the attempt is
    skipped if the last one (or the creation of this PingPeer) was less than
    five seconds ago.
    """
    now = time()
    if now >= me._last_reconn + 5: me._reconnect(now)

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping.  (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping fails and we run out of retries, we attempt to restart the
    connection.
    """
    now = time()
    if code == 'TIMER':
      me._failures = 0
      me._ping()
    elif code == 'FAIL':
      S.notify('connect', 'ping-failed', me._peer, *stuff)
      if not stuff: pass
      elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
      elif stuff[0] == 'ping-send-failed': me._reconnect(now)
    elif code == 'INFO':
      outcome = stuff[0]
      ## A sabotaged peer's successful pings are treated as timeouts.
      if outcome == 'ping-ok' and me._sabotage:
        outcome = 'ping-timeout'
      if outcome == 'ping-ok':
        if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
        t = float(stuff[1])
        me._last = '%.1fms' % t
        me._sigma_t += t
        me._sigma_t2 += t*t
        me._nping += 1
        if me._min == '-' or t < me._min: me._min = t
        if me._max == '-' or t > me._max: me._max = t
        me._timer = M.SelTimer(now + me._every, me._time)
      elif outcome == 'ping-timeout':
        me._failures += 1
        me._nlost += 1
        S.warn('connect', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures < me._retries:
          me._ping()
          me._last = 'timeout'
        else:
          me._reconnect(now)
          me._last = 'reconnect'
      elif outcome == 'ping-peer-died':
        me._pinger.kill(me._peer)

  def sabotage(me):
    """Sabotage the peer, for testing purposes."""
    me._sabotage = True
    if me._timer: me._timer.kill()
    T.defer(me._time)

  def info(me):
    """
    Return a dictionary of ping statistics for the `info' command.

    Times are formatted with an `ms' suffix, and the loss rate is a rounded
    percentage.  Statistics with no data are reported as `-'.
    """
    if not me._nping:
      mean_s = sd_s = min_s = max_s = '-'
    else:
      meanval = me._sigma_t/me._nping
      mean_s = '%.1fms' % meanval
      ## With (nearly) identical samples, rounding can make the computed
      ## variance very slightly negative; clamp it so that sqrt doesn't raise
      ## a math domain error.
      var = max(0.0, me._sigma_t2/me._nping - meanval*meanval)
      sd_s = '%.1fms' % sqrt(var)
      min_s = '%.1fms' % me._min
      max_s = '%.1fms' % me._max
    n = me._nping + me._nlost
    if not n: pclost = '-'
    else: pclost = '%d' % ((100*me._nlost + n//2)//n)
    return { 'last-ping': me._last,
             'mean-ping': mean_s,
             'sd-ping': sd_s,
             'n-ping': '%d' % me._nping,
             'n-lost': '%d' % me._nlost,
             'percent-lost': pclost,
             'min-ping': min_s,
             'max-ping': max_s,
             'state': 'idle' if me._timer else 'check',
             'failures': str(me._failures) }

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a timeout event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
  def __repr__(me):
    return str(me)
561
class Pinger (T.Coroutine):
  """
  The Pinger keeps track of the peers which we expect to be connected and
  takes action if they seem to stop responding.

  There is usually only one Pinger, called pinger.

  The Pinger maintains a collection of PingPeer objects, and an event queue.
  The PingPeers direct the results of their pings, and timer events, to the
  event queue.  The Pinger's coroutine picks items off the queue and
  dispatches them back to the PingPeers as appropriate.
  """

  def __init__(me):
    """Initialize the Pinger."""
    T.Coroutine.__init__(me)
    me._peers = {}
    me._q = T.Queue()

  def run(me):
    """
    Coroutine function: reads the pinger queue and sends events to the
    PingPeer objects they correspond to.

    Events for peers no longer being watched, or for superseded PingPeer
    instances (whose sequence numbers don't match), are discarded.  An
    exception while handling an event is reported via the exception hook,
    and the loop carries on.
    """
    while True:
      (peer, seq), code, stuff = me._q.get()
      if peer in me._peers and seq == me._peers[peer].seq:
        try: me._peers[peer].event(code, stuff)
        except Exception:
          SYS.excepthook(*SYS.exc_info())

  def add(me, peer, pingnow):
    """
    Add PEER to the collection of peers under the Pinger's watchful eye.
    The arguments are as for PingPeer: see above.  Any existing PingPeer for
    the same name is replaced.
    """
    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
    return me

  def kill(me, peername):
    """
    Stop watching the peer called PEERNAME.  Does nothing if it isn't being
    watched.
    """
    try: del me._peers[peername]
    except KeyError: pass
    return me

  def rescan(me, startup):
    """
    General resynchronization method.

    We scan the list of peers (with connect scripts) known at the server.
    Any which are known to the Pinger but aren't known to the server are
    removed from our list; newly arrived peers are added.  (Note that a peer
    can change state here either due to the server sneakily changing its list
    without issuing notifications or, more likely, the database changing its
    idea of whether a peer is interesting.)  Finally, PingPeers which are
    still present are prodded to update their timing parameters.

    This method is called once at startup to pick up the peers already
    installed, and again by the dbwatcher coroutine when it detects a change
    to the database.
    """
    if T._debug: print('# rescan peers')
    correct = {}
    start = {}
    for name in S.list():
      try: peer = Peer(name)
      except KeyError: continue
      if peer.get('watch', filter = boolean, default = False):
        if T._debug: print('# interesting peer %s' % peer.name)
        correct[peer.name] = start[peer.name] = peer
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer.name)
        start[peer.name] = peer
    ## Iterate over a copy, since entries may be deleted as we go.
    for name, obj in list(me._peers.items()):
      try:
        peer = correct[name]
      except KeyError:
        if T._debug: print('# peer %s vanished' % name)
        del me._peers[name]
      else:
        obj.update(peer)
    for name, peer in start.items():
      if name in me._peers: continue
      if startup:
        if T._debug: print('# setting up peer %s' % name)
        ifname = S.ifname(name)
        addr = S.addr(name)
        T.defer(adoptpeer, peer, ifname, *addr)
      else:
        if T._debug: print('# adopting new peer %s' % name)
        me.add(peer, True)
    return me

  def adopted(me):
    """
    Returns the list of peers being watched by the Pinger.
    """
    return me._peers.keys()

  def find(me, name):
    """
    Return the PingPeer with the given NAME.  Raises KeyError if it isn't
    being watched.
    """
    return me._peers[name]
664
665 ###--------------------------------------------------------------------------
666 ### New connections.
667
def encode_envvars(env, prefix, vars):
  """
  Write the variables in VARS into the environment dictionary ENV.

  Each name is forced to upper case, every run of non-alphanumeric
  characters in it is collapsed to a single underscore, and PREFIX is put on
  the front.
  """
  renamed = ((prefix + r_bad.sub('_', name.upper()), value)
             for name, value in vars.items())
  env.update(renamed)

## Matches a run of characters which aren't letters or digits (underscores
## included, so that runs of them are collapsed too).
r_bad = RX.compile(r'[\W_]+')
def envvars(peer):
  """
  Build the environment for PEER's helper scripts, and return it.

  Every field of the peer's database record becomes a `P_'-prefixed
  variable, and the server's algorithm information for the peer (S.algs)
  becomes `A_'-prefixed variables.  Names are mangled as by encode_envvars.
  """
  result = {}
  fields = dict((key, peer.get(key)) for key in peer.list())
  encode_envvars(result, 'P_', fields)
  encode_envvars(result, 'A_', S.algs(peer.name))
  return result
689
def run_ifupdown(what, peer, *args):
  """
  Run the interface up/down script for a peer.

  WHAT is 'ifup' or 'ifdown', and names the database entry holding the
  script.  PEER is the `Peer' object in question.  The script is run with
  the peer's name followed by ARGS, in an environment built by envvars, and
  is watched by potwatch.
  """
  queue = T.Queue()
  script = M.split(peer.get(what), quotep = True)[0]
  argv = script + [peer.name] + list(args)
  ## Hold the Command until potwatch has finished with it.
  proc = Command([what, peer.name], queue, what, argv, envvars(peer))
  potwatch(what, peer.name, queue)
705
def adoptpeer(peer, ifname, *addr):
  """
  Add a new peer to our collection.

  PEER is the `Peer' object, IFNAME is the interface name for its tunnel,
  and ADDR is the list of tokens representing its address.

  Runs the peer's `ifup' script and its `connect' command (each if present)
  in new coroutines, and starts pinging it if it's marked `watch'.
  """
  name = peer.name
  if peer.has('ifup'):
    ifup = T.Coroutine(run_ifupdown, name = 'ifup %s' % name)
    ifup.switch('ifup', peer, ifname, *addr)
  connect = peer.get('connect', default = None)
  if connect is not None:
    job = T.Coroutine(run_connect, name = 'connect %s' % name)
    job.switch(peer, connect)
  if peer.get('watch', filter = boolean, default = False):
    pinger.add(peer, False)
725
def disownpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  PEER is a `Peer' object.  Its `disconnect' command and `ifdown' script,
  where present, are each run in a new coroutine.
  """
  ## The Pinger is keyed by peer name, not by `Peer' object; its kill method
  ## already ignores peers that it isn't watching.
  pinger.kill(peer.name)
  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
      .switch(peer, cmd)
  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
      .switch('ifdown', peer)
737
def addpeer(peer, addr, ephemp):
  """
  Process a connect request from a new peer PEER (a `Peer' object) on
  address ADDR (a list of address tokens).

  Any peer of the same name already known to the server is killed first.
  The peer is then added with options taken from its database record;
  EPHEMP is the default for its `ephemeral' setting.  Server errors are
  re-raised as job errors.
  """
  name = peer.name
  if name in S.list():
    S.kill(name)
  try:
    kw = {}
    for key in ('tunnel', 'keepalive', 'key', 'priv'):
      kw[key] = peer.get(key, default = None)
    kw['mobile'] = peer.get('mobile', filter = boolean, default = False)
    kw['knock'] = peer.get('knock', default = None)
    kw['cork'] = peer.get('cork', filter = boolean, default = False)
    kw['ephemeral'] = peer.get('ephemeral', filter = boolean,
                               default = ephemp)
    S.add(name, *addr, **kw)
  except T.TripeError as exc:
    raise T.TripeJobError(*exc.args)
761
## Dictionary mapping challenges to waiting passive-connection coroutines.
## cmd_passive adds and removes the entries.  notify's GREET handler switches
## to the matching coroutine, passing the rest of the notification's tokens.
chalmap = {}
764
def notify(_, code, *rest):
  """
  Watch for notifications.

  We trap ADD and KILL notifications, and send them straight to adoptpeer and
  disownpeer respectively.  We dispatch GREET notifications to the
  corresponding waiting coroutine.  We answer KNOCK notifications from known
  passive peers by adding them (ephemerally), after checking that the name
  matches the peer's public-key tag.
  """
  if code == 'ADD':
    try: p = Peer(rest[0])
    except KeyError: pass
    else: adoptpeer(p, *rest[1:])
  elif code == 'KILL':
    try: p = Peer(rest[0])
    except KeyError: pass
    else: disownpeer(p, *rest[1:])
  elif code == 'GREET':
    chal = rest[0]
    try: cr = chalmap[chal]
    except KeyError: pass
    else: cr.switch(rest[1:])
  elif code == 'KNOCK':
    try: p = Peer(rest[0])
    except KeyError:
      S.warn('connect', 'knock-unknown-peer', rest[0])
      return
    if p.get('peer') != 'PASSIVE':
      S.warn('connect', 'knock-active-peer', p.name)
      return
    ## Whatever follows the first `.' in the peer's name (or the whole name,
    ## if there's no dot) must match its public-key tag.
    dot = p.name.find('.')
    if dot >= 0: kname = p.name[dot + 1:]
    else: kname = p.name
    ktag = p.get('key', p.name)
    if kname != ktag:
      S.warn('connect', 'knock-tag-mismatch',
             'peer', p.name, 'public-key-tag', ktag)
      return
    T.spawn(addpeer, p, rest[1:], True)
803
804 ###--------------------------------------------------------------------------
805 ### Command implementation.
806
def cmd_kick(name):
  """
  kick NAME: Force a new connection attempt for the NAMEd peer.

  The peer must have been adopted by the Pinger.  If its database record has
  a `connect' command, that command is rerun; otherwise a key exchange is
  forced.  Either way, the work is done in a new coroutine.
  """
  ## We only need to know that the peer is adopted.
  try: pinger.find(name)
  except KeyError: raise T.TripeJobError('peer-not-adopted', name)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  conn = peer.get('connect', None)
  if conn: T.spawn(run_connect, peer, conn)
  else: T.spawn(lambda p: S.forcekx(p.name), peer)
818
def cmd_adopted():
  """
  adopted: Report a list of adopted peers.

  Each peer currently watched by the Pinger is reported on an INFO line of
  its own.
  """
  adopted = pinger.adopted()
  for peername in adopted:
    T.svcinfo(peername)
825
def cmd_active(name):
  """
  active NAME: Handle an active connection request for the peer called NAME.

  The peer's address is taken from its `peer' entry in the database.  It's
  an error if the peer is unknown or marked PASSIVE.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  spec = peer.get('peer')
  if spec == 'PASSIVE': raise T.TripeJobError('passive-peer', name)
  words = M.split(spec, quotep = True)[0]
  addpeer(peer, words, True)
838
def cmd_listactive():
  """
  list-active: Report a list of the available active peers.

  These are the peers in the database whose `peer' entry isn't `PASSIVE'.
  """
  cdb = CDB.init(opts.cdb)
  for key in cdb.keys():
    if not key.startswith('P'): continue
    name = key[1:]
    ## Reuse the database we already have open rather than reopening it for
    ## every peer (as setup does for its automatic peers).
    if Peer(name, cdb).get('peer', '') != 'PASSIVE':
      T.svcinfo(name)
847
def cmd_info(name):
  """
  info NAME: Report the database entries for the named peer.

  If the peer is adopted by the Pinger, its ping statistics are reported
  too, and take precedence over database entries with the same key.  Each
  entry is reported as KEY=VALUE, in key order, with newlines replaced by
  spaces.
  """
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  stats = {}
  try: pp = pinger.find(name)
  except KeyError: pass
  else: stats.update(pp.info())
  for key in sorted(list(peer.list()) + list(stats.keys())):
    if key in stats: value = stats[key]
    else: value = peer.get(key)
    T.svcinfo('%s=%s' % (key, value.replace('\n', ' ')))
864
def cmd_userpeer(user):
  """
  userpeer USER: Report the peer name for the named user.

  The name is read from the entry `U' + USER in the peer database.
  """
  key = 'U' + user
  try:
    peername = CDB.init(opts.cdb)[key]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  T.svcinfo(peername)
872
def cmd_passive(*args):
  """
  passive [OPTIONS] USER: Await the arrival of the named USER.

  Report a challenge; when (and if!) the server receives a greeting quoting
  this challenge, add the corresponding peer to the server.

  Options:
    -timeout TIME   how long to wait for the greeting (default 30 seconds)
  """
  now = time()
  timeout = 30
  op = T.OptParse(args, ['-timeout'])
  for opt in op:
    if opt == '-timeout':
      timeout = T.timespec(op.arg())
  user, = op.rest(1, 1)
  try: name = CDB.init(opts.cdb)['U' + user]
  except KeyError: raise T.TripeJobError('unknown-user', user)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  chal = S.getchal()
  cr = T.Coroutine.getcurrent()
  ## If no greeting turns up in time, the timer wakes us with None.
  timer = M.SelTimer(now + timeout, lambda: cr.switch(None))
  ## Register before entering the `try', so that the `finally' clause always
  ## has an entry to remove.  Otherwise a failure in svcinfo would be hidden
  ## behind a KeyError.
  chalmap[chal] = cr
  try:
    T.svcinfo(chal)
    ## Wait for notify's GREET handler (or the timer) to switch back to us.
    addr = cr.parent.switch()
    if addr is None:
      raise T.TripeJobError('connect-timeout')
    addpeer(peer, addr, True)
  finally:
    del chalmap[chal]
903
def cmd_sabotage(name):
  """
  sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.

  For testing.  The peer must have been adopted by the Pinger.
  """
  try:
    victim = pinger.find(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  victim.sabotage()
911
912 ###--------------------------------------------------------------------------
913 ### Start up.
914
def setup():
  """
  Service setup.

  Register the notification watcher and rescan the peers.  When called as
  part of server startup, also add the automatic active peers named under
  `%AUTO' in the peer database.  A failure to add any one of them is
  reported as an `auto-add-failed' warning and doesn't stop the rest.
  """
  S.handler['NOTE'] = notify
  S.watch('+n')

  pinger.rescan(opts.startup)

  if opts.startup:
    cdb = CDB.init(opts.cdb)
    try:
      autos = cdb['%AUTO']
    except KeyError:
      autos = ''
    for name in M.split(autos)[0]:
      try:
        peer = Peer(name, cdb)
      except KeyError:
        ## A name with no database record used to abort setup entirely.
        S.warn('connect', 'auto-add-failed', name, 'unknown-peer', name)
        continue
      try:
        addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
      except T.TripeJobError as err:
        S.warn('connect', 'auto-add-failed', name, *err.args)
939
def init():
  """
  Initialization to be done before service startup.

  Creates the module-level errorwatch, childwatch and pinger objects.  Then
  starts the dbwatch, errorwatch and pinger coroutines, in that order.
  """
  global errorwatch, childwatch, pinger
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  ## dbwatch is a plain function, so it is wrapped in a named Coroutine.
  ## ErrorWatch and Pinger are Coroutine subclasses, so they are switched to
  ## directly.
  T.Coroutine(dbwatch, name = 'dbwatch').switch()
  errorwatch.switch()
  pinger.switch()
951
def parse_options():
  """
  Parse the command-line options.

  Changes directory to the requested configuration directory, and sets the
  `tripe' module's debugging flag from `--debug'.  Non-option arguments are
  an error.  Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR] [-p FILE]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
986
## Service table, for running manually.
##
## A list of (SERVICE, VERSION, COMMANDS) entries, handed to T.runservices.
## COMMANDS maps each command name to a tuple that looks like
## (MIN-ARGS, MAX-ARGS or None for no limit, USAGE, HANDLER).  This reading
## is inferred from the handlers' signatures -- TODO confirm against
## T.runservices.
service_info = [('connect', VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick),
  'passive': (1, None, '[OPTIONS] USER', cmd_passive),
  'active': (1, 1, 'PEER', cmd_active),
  'info': (1, 1, 'PEER', cmd_info),
  'list-active': (0, 0, '', cmd_listactive),
  'userpeer': (1, 1, 'USER', cmd_userpeer),
  'sabotage': (1, 1, 'PEER', cmd_sabotage)
})]
998
if __name__ == '__main__':
  ## Parse options into the module-level `opts', which the rest of the
  ## service reads.
  opts = parse_options()
  ## Export the admin socket name.  Commands started via Command inherit a
  ## copy of this environment.
  OS.environ['TRIPESOCK'] = opts.tripesock
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)
1005
1006 ###----- That's all, folks --------------------------------------------------