(Python): Use more modern `raise' syntax.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50 """Glues the select-line-buffer into the coroutine queue system."""
51
52 def __new__(cls, file, queue, tag, kind):
53 """See __init__ for documentation."""
54 return M.SelLineBuffer.__new__(cls, file.fileno())
55
56 def __init__(me, file, queue, tag, kind):
57 """
58 Initialize a new line-reading adaptor.
59
60 The adaptor reads lines from FILE. Each line is inserted as a message of
61 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
62 represented as None.
63 """
64 me._q = queue
65 me._file = file
66 me._tag = tag
67 me._kind = kind
68 me.enable()
69
70 @T._callback
71 def line(me, line):
72 me._q.put((me._tag, me._kind, line))
73
74 @T._callback
75 def eof(me):
76 me.disable()
77 me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80 """
81 An object which watches stderr streams for errors and converts them into
82 warnings of the form
83
84 WARN connect INFO stderr LINE
85
86 The INFO is a list of tokens associated with the file when it was
87 registered.
88
89 Usually there is a single ErrorWatch object, called errorwatch.
90 """
91
92 def __init__(me):
93 """Initialization: there are no arguments."""
94 T.Coroutine.__init__(me)
95 me._q = T.Queue()
96 me._map = {}
97 me._seq = 1
98
99 def watch(me, file, info):
100 """
101 Adds FILE to the collection of files to watch.
102
103 INFO will be written in the warning messages from this FILE. Returns a
104 sequence number which can be used to unregister the file again.
105 """
106 seq = me._seq
107 me._seq += 1
108 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109 return seq
110
111 def unwatch(me, seq):
112 """Stop watching the file with sequence number SEQ."""
113 del me._map[seq]
114 return me
115
116 def run(me):
117 """
118 Coroutine function: read items from the queue and report them.
119
120 Unregisters files automatically when they reach EOF.
121 """
122 while True:
123 seq, _, line = me._q.get()
124 if line is None:
125 me.unwatch(seq)
126 else:
127 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130 """
131 Coroutine function: wake up every minute and notice changes to the
132 database. When a change happens, tell the Pinger (q.v.) to rescan its
133 peers.
134 """
135 cr = T.Coroutine.getcurrent()
136 main = cr.parent
137 fw = M.FWatch(opts.cdb)
138 while True:
139 timer = M.SelTimer(time() + 60, lambda: cr.switch())
140 main.switch()
141 if fw.update():
142 pinger.rescan(False)
143 S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146 """
147 An object which watches for specified processes exiting and reports
148 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150 There is usually only one ChildWatch object, called childwatch.
151 """
152
153 def __new__(cls):
154 """Initialize the child-watcher."""
155 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157 def __init__(me):
158 """Initialize the child-watcher."""
159 me._pid = {}
160 me.enable()
161
162 def watch(me, pid, queue, tag):
163 """
164 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
165 to the QUEUE, where CODE is one of
166
167 * None (successful termination)
168 * ['exit-nonzero', CODE] (CODE is a string!)
169 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171 """
172 me._pid[pid] = queue, tag
173 return me
174
175 def unwatch(me, pid):
176 """Unregister PID as a child to watch."""
177 del me._pid[pid]
178 return me
179
180 @T._callback
181 def signalled(me):
182 """
183 Called when child processes exit: collect exit statuses and report
184 failures.
185 """
186 while True:
187 try:
188 pid, status = OS.waitpid(-1, OS.WNOHANG)
189 except OSError, exc:
190 if exc.errno == E.ECHILD:
191 break
192 if pid == 0:
193 break
194 if pid not in me._pid:
195 continue
196 queue, tag = me._pid[pid]
197 if OS.WIFEXITED(status):
198 exit = OS.WEXITSTATUS(status)
199 if exit == 0:
200 code = None
201 else:
202 code = ['exit-nonzero', str(exit)]
203 elif OS.WIFSIGNALED(status):
204 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205 else:
206 code = ['exit-unknown', hex(status)]
207 queue.put((tag, 'exit', code))
208
209 class Command (object):
210 """
211 Represents a running command.
212
213 This class is the main interface to the machery provided by the ChildWatch
214 and ErrorWatch objects. See also potwatch.
215 """
216
217 def __init__(me, info, queue, tag, args, env):
218 """
219 Start a new child process.
220
221 The ARGS are a list of arguments to be given to the child process. The
222 ENV is either None or a dictionary of environment variable assignments to
223 override the extant environment. INFO is a list of tokens to be included
224 in warnings about the child's stderr output. If the child writes a line
225 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
226 child exits, write (TAG, 'exit', CODE) to the QUEUE.
227 """
228 me._info = info
229 me._q = queue
230 me._tag = tag
231 myenv = OS.environ.copy()
232 if env: myenv.update(env)
233 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234 stdout = PROC.PIPE, stderr = PROC.PIPE)
235 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236 errorwatch.watch(me._proc.stderr, info)
237 childwatch.watch(me._proc.pid, queue, tag)
238
239 def __del__(me):
240 """
241 If I've been forgotten then stop watching for termination.
242 """
243 childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246 """
247 Watch the queue Q for activity as reported by a Command object.
248
249 Information from the process's stdout is reported as
250
251 NOTE WHAT NAME stdout LINE
252
253 abnormal termination is reported as
254
255 WARN WHAT NAME CODE
256
257 where CODE is what the ChildWatch wrote.
258 """
259 eofp = deadp = False
260 while not deadp or not eofp:
261 _, kind, more = q.get()
262 if kind == 'stdout':
263 if more is None:
264 eofp = True
265 else:
266 S.notify('connect', what, name, 'stdout', more)
267 elif kind == 'exit':
268 if more: S.warn('connect', what, name, *more)
269 deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic'] # An object distinct from all others
275
276 class Peer (object):
277 """Representation of a peer in the database."""
278
279 def __init__(me, peer, cdb = None):
280 """
281 Create a new peer, named PEER.
282
283 Information about the peer is read from the database CDB, or the default
284 one given on the command-line.
285 """
286 me.name = peer
287 record = (cdb or CDB.init(opts.cdb))['P' + peer]
288 me.__dict__.update(M.URLDecode(record, semip = True))
289
290 def get(me, key, default = _magic, filter = None):
291 """
292 Get the information stashed under KEY from the peer's database record.
293
294 If DEFAULT is given, then use it if the database doesn't contain the
295 necessary information. If no DEFAULT is given, then report an error. If
296 a FILTER function is given then apply it to the information from the
297 database before returning it.
298 """
299 attr = me.__dict__.get(key, default)
300 if attr is _magic:
301 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
302 elif filter is not None:
303 attr = filter(attr)
304 return attr
305
306 def has(me, key):
307 """
308 Return whether the peer's database record has the KEY.
309 """
310 return key in me.__dict__
311
312 def list(me):
313 """
314 Iterate over the available keys in the peer's database record.
315 """
316 return me.__dict__.iterkeys()
317
318 def boolean(value):
319 """Parse VALUE as a boolean."""
320 return value in ['t', 'true', 'y', 'yes', 'on']
321
322 ###--------------------------------------------------------------------------
323 ### Waking up and watching peers.
324
325 def run_connect(peer, cmd):
326 """
327 Start the job of connecting to the passive PEER.
328
329 The CMD string is a shell command which will connect to the peer (via some
330 back-channel, say ssh and userv), issue a command
331
332 SVCSUBMIT connect passive [OPTIONS] USER
333
334 and write the resulting challenge to standard error.
335 """
336 q = T.Queue()
337 cmd = Command(['connect', peer.name], q, 'connect',
338 ['/bin/sh', '-c', cmd], None)
339 _, kind, more = q.peek()
340 if kind == 'stdout':
341 if more is None:
342 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
343 else:
344 chal = more
345 S.greet(peer.name, chal)
346 q.get()
347 potwatch('connect', peer.name, q)
348
349 def run_disconnect(peer, cmd):
350 """
351 Start the job of disconnecting from a passive PEER.
352
353 The CMD string is a shell command which will disconnect from the peer.
354 """
355 q = T.Queue()
356 cmd = Command(['disconnect', peer.name], q, 'disconnect',
357 ['/bin/sh', '-c', cmd], None)
358 potwatch('disconnect', peer.name, q)
359
360 _pingseq = 0
361 class PingPeer (object):
362 """
363 Object representing a peer which we are pinging to ensure that it is still
364 present.
365
366 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
367 event queue -- which saves us from having an enormous swarm of coroutines
368 -- but most of the actual work is done here.
369
370 In order to avoid confusion between different PingPeer instances for the
371 same actual peer, each PingPeer has a sequence number (its `seq'
372 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
373 (Using the PingPeer instance itself will prevent garbage collection of
374 otherwise defunct instances.)
375 """
376
377 def __init__(me, pinger, queue, peer, pingnow):
378 """
379 Create a new PingPeer.
380
381 The PINGER is the Pinger object we should send the results to. This is
382 used when we remove ourselves, if the peer has been explicitly removed.
383
384 The QUEUE is the event queue on which timer and ping-command events
385 should be written.
386
387 The PEER is a `Peer' object describing the peer.
388
389 If PINGNOW is true, then immediately start pinging the peer. Otherwise
390 wait until the usual retry interval.
391 """
392 global _pingseq
393 me._pinger = pinger
394 me._q = queue
395 me._peer = peer.name
396 me.update(peer)
397 me.seq = _pingseq
398 _pingseq += 1
399 me._failures = 0
400 me._sabotage = False
401 me._last = '-'
402 me._nping = 0
403 me._nlost = 0
404 me._sigma_t = 0
405 me._sigma_t2 = 0
406 me._min = me._max = '-'
407 if pingnow:
408 me._timer = None
409 me._ping()
410 else:
411 me._timer = M.SelTimer(time() + me._every, me._time)
412
413 def update(me, peer):
414 """
415 Refreshes the timer parameters for this peer. We don't, however,
416 immediately reschedule anything: that will happen next time anything
417 interesting happens.
418 """
419 if peer is None: peer = Peer(me._peer)
420 assert peer.name == me._peer
421 me._every = peer.get('every', filter = T.timespec, default = 120)
422 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
423 me._retries = peer.get('retries', filter = int, default = 5)
424 me._connectp = peer.has('connect')
425 return me
426
427 def _ping(me):
428 """
429 Send a ping to the peer; the result is sent to the Pinger's event queue.
430 """
431 S.rawcommand(T.TripeAsynchronousCommand(
432 me._q, (me._peer, me.seq),
433 ['EPING',
434 '-background', S.bgtag(),
435 '-timeout', str(me._timeout),
436 '--',
437 me._peer]))
438
439 def _reconnect(me):
440 try:
441 peer = Peer(me._peer)
442 if me._connectp:
443 S.warn('connect', 'reconnecting', me._peer)
444 S.forcekx(me._peer)
445 T.spawn(run_connect, peer, peer.get('connect'))
446 me._timer = M.SelTimer(time() + me._every, me._time)
447 me._sabotage = False
448 else:
449 S.kill(me._peer)
450 except TripeError, e:
451 if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
452
453 def event(me, code, stuff):
454 """
455 Respond to an event which happened to this peer.
456
457 Timer events indicate that we should start a new ping. (The server has
458 its own timeout which detects lost packets.)
459
460 We trap unknown-peer responses and detach from the Pinger.
461
462 If the ping fails and we run out of retries, we attempt to restart the
463 connection.
464 """
465 if code == 'TIMER':
466 me._failures = 0
467 me._ping()
468 elif code == 'FAIL':
469 S.notify('connect', 'ping-failed', me._peer, *stuff)
470 if not stuff: pass
471 elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
472 elif stuff[0] == 'ping-send-failed': me._reconnect()
473 elif code == 'INFO':
474 outcome = stuff[0]
475 if outcome == 'ping-ok' and me._sabotage:
476 outcome = 'ping-timeout'
477 if outcome == 'ping-ok':
478 if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
479 t = float(stuff[1])
480 me._last = '%.1fms' % t
481 me._sigma_t += t
482 me._sigma_t2 += t*t
483 me._nping += 1
484 if me._min == '-' or t < me._min: me._min = t
485 if me._max == '-' or t > me._max: me._max = t
486 me._timer = M.SelTimer(time() + me._every, me._time)
487 elif outcome == 'ping-timeout':
488 me._failures += 1
489 me._nlost += 1
490 S.warn('connect', 'ping-timeout', me._peer,
491 'attempt', str(me._failures), 'of', str(me._retries))
492 if me._failures < me._retries:
493 me._ping()
494 me._last = 'timeout'
495 else:
496 me._reconnect()
497 me._last = 'reconnect'
498 elif outcome == 'ping-peer-died':
499 me._pinger.kill(me._peer)
500
501 def sabotage(me):
502 """Sabotage the peer, for testing purposes."""
503 me._sabotage = True
504 if me._timer: me._timer.kill()
505 T.defer(me._time)
506
507 def info(me):
508 if not me._nping:
509 mean = sd = '-'
510 else:
511 mean = me._sigma_t/me._nping
512 sd = sqrt(me._sigma_t2/me._nping - mean*mean)
513 n = me._nping + me._nlost
514 if not n: pclost = '-'
515 else: pclost = '%d' % ((100*me._nlost + n//2)//n)
516 return { 'last-ping': me._last,
517 'mean-ping': '%.1fms' % mean,
518 'sd-ping': '%.1fms' % sd,
519 'n-ping': '%d' % me._nping,
520 'n-lost': '%d' % me._nlost,
521 'percent-lost': pclost,
522 'min-ping': '%.1fms' % me._min,
523 'max-ping': '%.1fms' % me._max,
524 'state': me._timer and 'idle' or 'check',
525 'failures': me._failures }
526
527 @T._callback
528 def _time(me):
529 """
530 Handle timer callbacks by posting a timeout event on the queue.
531 """
532 me._timer = None
533 me._q.put(((me._peer, me.seq), 'TIMER', None))
534
535 def __str__(me):
536 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
537 def __repr__(me):
538 return str(me)
539
540 class Pinger (T.Coroutine):
541 """
542 The Pinger keeps track of the peers which we expect to be connected and
543 takes action if they seem to stop responding.
544
545 There is usually only one Pinger, called pinger.
546
547 The Pinger maintains a collection of PingPeer objects, and an event queue.
548 The PingPeers direct the results of their pings, and timer events, to the
549 event queue. The Pinger's coroutine picks items off the queue and
550 dispatches them back to the PingPeers as appropriate.
551 """
552
553 def __init__(me):
554 """Initialize the Pinger."""
555 T.Coroutine.__init__(me)
556 me._peers = {}
557 me._q = T.Queue()
558
559 def run(me):
560 """
561 Coroutine function: reads the pinger queue and sends events to the
562 PingPeer objects they correspond to.
563 """
564 while True:
565 (peer, seq), code, stuff = me._q.get()
566 if peer in me._peers and seq == me._peers[peer].seq:
567 try: me._peers[peer].event(code, stuff)
568 except Exception, e:
569 SYS.excepthook(*SYS.exc_info())
570
571 def add(me, peer, pingnow):
572 """
573 Add PEER to the collection of peers under the Pinger's watchful eye.
574 The arguments are as for PingPeer: see above.
575 """
576 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
577 return me
578
579 def kill(me, peername):
580 """Remove PEER from the peers being watched by the Pinger."""
581 try: del me._peers[peername]
582 except KeyError: pass
583 return me
584
585 def rescan(me, startup):
586 """
587 General resynchronization method.
588
589 We scan the list of peers (with connect scripts) known at the server.
590 Any which are known to the Pinger but aren't known to the server are
591 removed from our list; newly arrived peers are added. (Note that a peer
592 can change state here either due to the server sneakily changing its list
593 without issuing notifications or, more likely, the database changing its
594 idea of whether a peer is interesting.) Finally, PingPeers which are
595 still present are prodded to update their timing parameters.
596
597 This method is called once at startup to pick up the peers already
598 installed, and again by the dbwatcher coroutine when it detects a change
599 to the database.
600 """
601 if T._debug: print '# rescan peers'
602 correct = {}
603 start = {}
604 for name in S.list():
605 try: peer = Peer(name)
606 except KeyError: continue
607 if peer.get('watch', filter = boolean, default = False):
608 if T._debug: print '# interesting peer %s' % peer
609 correct[peer.name] = start[peer.name] = peer
610 elif startup:
611 if T._debug: print '# peer %s ready for adoption' % peer
612 start[peer.name] = peer
613 for name, obj in me._peers.items():
614 try:
615 peer = correct[name]
616 except KeyError:
617 if T._debug: print '# peer %s vanished' % name
618 del me._peers[name]
619 else:
620 obj.update(peer)
621 for name, peer in start.iteritems():
622 if name in me._peers: continue
623 if startup:
624 if T._debug: print '# setting up peer %s' % name
625 ifname = S.ifname(name)
626 addr = S.addr(name)
627 T.defer(adoptpeer, peer, ifname, *addr)
628 else:
629 if T._debug: print '# adopting new peer %s' % name
630 me.add(peer, True)
631 return me
632
633 def adopted(me):
634 """
635 Returns the list of peers being watched by the Pinger.
636 """
637 return me._peers.keys()
638
639 def find(me, name):
640 """Return the PingPeer with the given name."""
641 return me._peers[name]
642
643 ###--------------------------------------------------------------------------
644 ### New connections.
645
646 def encode_envvars(env, prefix, vars):
647 """
648 Encode the variables in VARS suitably for including in a program
649 environment. Lowercase letters in variable names are forced to uppercase;
650 runs of non-alphanumeric characters are replaced by single underscores; and
651 the PREFIX is prepended. The resulting variables are written to ENV.
652 """
653 for k, v in vars.iteritems():
654 env[prefix + r_bad.sub('_', k.upper())] = v
655
656 r_bad = RX.compile(r'[\W_]+')
657 def envvars(peer):
658 """
659 Translate the database information for a PEER into a dictionary of
660 environment variables with plausible upper-case names and a P_ prefix.
661 Also collect the crypto information into A_ variables.
662 """
663 env = {}
664 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
665 encode_envvars(env, 'A_', S.algs(peer.name))
666 return env
667
668 def run_ifupdown(what, peer, *args):
669 """
670 Run the interface up/down script for a peer.
671
672 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
673 list of arguments to pass to the script, in addition to the peer name.
674
675 The command is run and watched in the background by potwatch.
676 """
677 q = T.Queue()
678 c = Command([what, peer.name], q, what,
679 M.split(peer.get(what), quotep = True)[0] +
680 [peer.name] + list(args),
681 envvars(peer))
682 potwatch(what, peer.name, q)
683
684 def adoptpeer(peer, ifname, *addr):
685 """
686 Add a new peer to our collection.
687
688 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
689 ADDR is the list of tokens representing its address.
690
691 We try to bring up the interface and provoke a connection to the peer if
692 it's passive.
693 """
694 if peer.has('ifup'):
695 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
696 .switch('ifup', peer, ifname, *addr)
697 cmd = peer.get('connect', default = None)
698 if cmd is not None:
699 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
700 .switch(peer, cmd)
701 if peer.get('watch', filter = boolean, default = False):
702 pinger.add(peer, False)
703
704 def disownpeer(peer):
705 """Drop the PEER from the Pinger and put its interface to bed."""
706 try: pinger.kill(peer)
707 except KeyError: pass
708 cmd = peer.get('disconnect', default = None)
709 if cmd is not None:
710 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
711 .switch(peer, cmd)
712 if peer.has('ifdown'):
713 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
714 .switch('ifdown', peer)
715
716 def addpeer(peer, addr):
717 """
718 Process a connect request from a new peer PEER on address ADDR.
719
720 Any existing peer with this name is disconnected from the server.
721 """
722 if peer.name in S.list():
723 S.kill(peer.name)
724 try:
725 booltrue = ['t', 'true', 'y', 'yes', 'on']
726 S.add(peer.name,
727 tunnel = peer.get('tunnel', None),
728 keepalive = peer.get('keepalive', None),
729 key = peer.get('key', None),
730 priv = peer.get('priv', None),
731 mobile = peer.get('mobile', 'nil') in booltrue,
732 cork = peer.get('cork', 'nil') in booltrue,
733 *addr)
734 except T.TripeError, exc:
735 raise T.TripeJobError(*exc.args)
736
737 ## Dictionary mapping challenges to waiting passive-connection coroutines.
738 chalmap = {}
739
740 def notify(_, code, *rest):
741 """
742 Watch for notifications.
743
744 We trap ADD and KILL notifications, and send them straight to adoptpeer and
745 disownpeer respectively; and dispatch GREET notifications to the
746 corresponding waiting coroutine.
747 """
748 if code == 'ADD':
749 try: p = Peer(rest[0])
750 except KeyError: return
751 adoptpeer(p, *rest[1:])
752 elif code == 'KILL':
753 try: p = Peer(rest[0])
754 except KeyError: return
755 disownpeer(p, *rest[1:])
756 elif code == 'GREET':
757 chal = rest[0]
758 try: cr = chalmap[chal]
759 except KeyError: pass
760 else: cr.switch(rest[1:])
761
762 ###--------------------------------------------------------------------------
763 ### Command implementation.
764
765 def cmd_kick(name):
766 """
767 kick NAME: Force a new connection attempt for the NAMEd peer.
768 """
769 try: pp = pinger.find(name)
770 except KeyError: raise T.TripeJobError('peer-not-adopted', name)
771 try: peer = Peer(name)
772 except KeyError: raise T.TripeJobError('unknown-peer', name)
773 conn = peer.get('connect', None)
774 if conn: T.spawn(run_connect, peer, peer.get('connect'))
775 else: T.spawn(lambda p: S.forcekx(p.name), peer)
776
777 def cmd_adopted():
778 """
779 adopted: Report a list of adopted peers.
780 """
781 for name in pinger.adopted():
782 T.svcinfo(name)
783
784 def cmd_active(name):
785 """
786 active NAME: Handle an active connection request for the peer called NAME.
787
788 The appropriate address is read from the database automatically.
789 """
790 try: peer = Peer(name)
791 except KeyError: raise T.TripeJobError('unknown-peer', name)
792 addr = peer.get('peer')
793 if addr == 'PASSIVE':
794 raise T.TripeJobError('passive-peer', name)
795 addpeer(peer, M.split(addr, quotep = True)[0])
796
797 def cmd_listactive():
798 """
799 list: Report a list of the available active peers.
800 """
801 cdb = CDB.init(opts.cdb)
802 for key in cdb.keys():
803 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
804 T.svcinfo(key[1:])
805
806 def cmd_info(name):
807 """
808 info NAME: Report the database entries for the named peer.
809 """
810 try: peer = Peer(name)
811 except KeyError: raise T.TripeJobError('unknown-peer', name)
812 d = {}
813 try: pp = pinger.find(name)
814 except KeyError: pass
815 else: d.update(pp.info())
816 items = list(peer.list()) + d.keys()
817 items.sort()
818 for i in items:
819 try: v = d[i]
820 except KeyError: v = peer.get(i)
821 T.svcinfo('%s=%s' % (i, v))
822
823 def cmd_userpeer(user):
824 """
825 userpeer USER: Report the peer name for the named user.
826 """
827 try: name = CDB.init(opts.cdb)['U' + user]
828 except KeyError: raise T.TripeJobError('unknown-user', user)
829 T.svcinfo(name)
830
831 def cmd_passive(*args):
832 """
833 passive [OPTIONS] USER: Await the arrival of the named USER.
834
835 Report a challenge; when (and if!) the server receives a greeting quoting
836 this challenge, add the corresponding peer to the server.
837 """
838 timeout = 30
839 op = T.OptParse(args, ['-timeout'])
840 for opt in op:
841 if opt == '-timeout':
842 timeout = T.timespec(op.arg())
843 user, = op.rest(1, 1)
844 try: name = CDB.init(opts.cdb)['U' + user]
845 except KeyError: raise T.TripeJobError('unknown-user', user)
846 try: peer = Peer(name)
847 except KeyError: raise T.TripeJobError('unknown-peer', name)
848 chal = S.getchal()
849 cr = T.Coroutine.getcurrent()
850 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
851 try:
852 T.svcinfo(chal)
853 chalmap[chal] = cr
854 addr = cr.parent.switch()
855 if addr is None:
856 raise T.TripeJobError('connect-timeout')
857 addpeer(peer, addr)
858 finally:
859 del chalmap[chal]
860
861 def cmd_sabotage(name):
862 """
863 sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
864 """
865 try: pp = pinger.find(name)
866 except KeyError: raise T.TripeJobError('unknown-peer', name)
867 pp.sabotage()
868
869 ###--------------------------------------------------------------------------
870 ### Start up.
871
872 def setup():
873 """
874 Service setup.
875
876 Register the notification watcher, rescan the peers, and add automatic
877 active peers.
878 """
879 S.handler['NOTE'] = notify
880 S.watch('+n')
881
882 pinger.rescan(opts.startup)
883
884 if opts.startup:
885 cdb = CDB.init(opts.cdb)
886 try:
887 autos = cdb['%AUTO']
888 except KeyError:
889 autos = ''
890 for name in M.split(autos)[0]:
891 try:
892 peer = Peer(name, cdb)
893 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
894 except T.TripeJobError, err:
895 S.warn('connect', 'auto-add-failed', name, *err.args)
896
897 def init():
898 """
899 Initialization to be done before service startup.
900 """
901 global errorwatch, childwatch, pinger
902 errorwatch = ErrorWatch()
903 childwatch = ChildWatch()
904 pinger = Pinger()
905 T.Coroutine(dbwatch, name = 'dbwatch').switch()
906 errorwatch.switch()
907 pinger.switch()
908
909 def parse_options():
910 """
911 Parse the command-line options.
912
913 Automatically changes directory to the requested configdir, and turns on
914 debugging. Returns the options object.
915 """
916 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
917 version = '%%prog %s' % VERSION)
918
919 op.add_option('-a', '--admin-socket',
920 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
921 help = 'Select socket to connect to [default %default]')
922 op.add_option('-d', '--directory',
923 metavar = 'DIR', dest = 'dir', default = T.configdir,
924 help = 'Select current diretory [default %default]')
925 op.add_option('-p', '--peerdb',
926 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
927 help = 'Select peers database [default %default]')
928 op.add_option('--daemon', dest = 'daemon',
929 default = False, action = 'store_true',
930 help = 'Become a daemon after successful initialization')
931 op.add_option('--debug', dest = 'debug',
932 default = False, action = 'store_true',
933 help = 'Emit debugging trace information')
934 op.add_option('--startup', dest = 'startup',
935 default = False, action = 'store_true',
936 help = 'Being called as part of the server startup')
937
938 opts, args = op.parse_args()
939 if args: op.error('no arguments permitted')
940 OS.chdir(opts.dir)
941 T._debug = opts.debug
942 return opts
943
944 ## Service table, for running manually.
945 service_info = [('connect', T.VERSION, {
946 'adopted': (0, 0, '', cmd_adopted),
947 'kick': (1, 1, 'PEER', cmd_kick),
948 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
949 'active': (1, 1, 'PEER', cmd_active),
950 'info': (1, 1, 'PEER', cmd_info),
951 'list-active': (0, 0, '', cmd_listactive),
952 'userpeer': (1, 1, 'USER', cmd_userpeer),
953 'sabotage': (1, 1, 'PEER', cmd_sabotage)
954 })]
955
956 if __name__ == '__main__':
957 opts = parse_options()
958 OS.environ['TRIPESOCK'] = opts.tripesock
959 T.runservices(opts.tripesock, service_info,
960 init = init, setup = setup,
961 daemon = opts.daemon)
962
963 ###----- That's all, folks --------------------------------------------------