c68e520d03e6f1d65363ad26a915731e920b26c5
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 from time import time
41 import subprocess as PROC
42
43 S = T.svcmgr
44
45 ###--------------------------------------------------------------------------
46 ### Running auxiliary commands.
47
48 class SelLineQueue (M.SelLineBuffer):
49 """Glues the select-line-buffer into the coroutine queue system."""
50
51 def __new__(cls, file, queue, tag, kind):
52 """See __init__ for documentation."""
53 return M.SelLineBuffer.__new__(cls, file.fileno())
54
55 def __init__(me, file, queue, tag, kind):
56 """
57 Initialize a new line-reading adaptor.
58
59 The adaptor reads lines from FILE. Each line is inserted as a message of
60 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
61 represented as None.
62 """
63 me._q = queue
64 me._file = file
65 me._tag = tag
66 me._kind = kind
67 me.enable()
68
69 @T._callback
70 def line(me, line):
71 me._q.put((me._tag, me._kind, line))
72
73 @T._callback
74 def eof(me):
75 me.disable()
76 me._q.put((me._tag, me._kind, None))
77
78 class ErrorWatch (T.Coroutine):
79 """
80 An object which watches stderr streams for errors and converts them into
81 warnings of the form
82
83 WARN connect INFO stderr LINE
84
85 The INFO is a list of tokens associated with the file when it was
86 registered.
87
88 Usually there is a single ErrorWatch object, called errorwatch.
89 """
90
91 def __init__(me):
92 """Initialization: there are no arguments."""
93 T.Coroutine.__init__(me)
94 me._q = T.Queue()
95 me._map = {}
96 me._seq = 1
97
98 def watch(me, file, info):
99 """
100 Adds FILE to the collection of files to watch.
101
102 INFO will be written in the warning messages from this FILE. Returns a
103 sequence number which can be used to unregister the file again.
104 """
105 seq = me._seq
106 me._seq += 1
107 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
108 return seq
109
110 def unwatch(me, seq):
111 """Stop watching the file with sequence number SEQ."""
112 del me._map[seq]
113 return me
114
115 def run(me):
116 """
117 Coroutine function: read items from the queue and report them.
118
119 Unregisters files automatically when they reach EOF.
120 """
121 while True:
122 seq, _, line = me._q.get()
123 if line is None:
124 me.unwatch(seq)
125 else:
126 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
127
128 def dbwatch():
129 """
130 Coroutine function: wake up every minute and notice changes to the
131 database. When a change happens, tell the Pinger (q.v.) to rescan its
132 peers.
133 """
134 cr = T.Coroutine.getcurrent()
135 main = cr.parent
136 fw = M.FWatch(opts.cdb)
137 while True:
138 timer = M.SelTimer(time() + 60, lambda: cr.switch())
139 main.switch()
140 if fw.update():
141 pinger.rescan(False)
142 S.notify('connect', 'peerdb-update')
143
144 class ChildWatch (M.SelSignal):
145 """
146 An object which watches for specified processes exiting and reports
147 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
148
149 There is usually only one ChildWatch object, called childwatch.
150 """
151
152 def __new__(cls):
153 """Initialize the child-watcher."""
154 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
155
156 def __init__(me):
157 """Initialize the child-watcher."""
158 me._pid = {}
159 me.enable()
160
161 def watch(me, pid, queue, tag):
162 """
163 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
164 to the QUEUE, where CODE is one of
165
166 * None (successful termination)
167 * ['exit-nonzero', CODE] (CODE is a string!)
168 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
169 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
170 """
171 me._pid[pid] = queue, tag
172 return me
173
174 def unwatch(me, pid):
175 """Unregister PID as a child to watch."""
176 del me._pid[pid]
177 return me
178
179 @T._callback
180 def signalled(me):
181 """
182 Called when child processes exit: collect exit statuses and report
183 failures.
184 """
185 while True:
186 try:
187 pid, status = OS.waitpid(-1, OS.WNOHANG)
188 except OSError, exc:
189 if exc.errno == E.ECHILD:
190 break
191 if pid == 0:
192 break
193 if pid not in me._pid:
194 continue
195 queue, tag = me._pid[pid]
196 if OS.WIFEXITED(status):
197 exit = OS.WEXITSTATUS(status)
198 if exit == 0:
199 code = None
200 else:
201 code = ['exit-nonzero', str(exit)]
202 elif OS.WIFSIGNALED(status):
203 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
204 else:
205 code = ['exit-unknown', hex(status)]
206 queue.put((tag, 'exit', code))
207
208 class Command (object):
209 """
210 Represents a running command.
211
212 This class is the main interface to the machery provided by the ChildWatch
213 and ErrorWatch objects. See also potwatch.
214 """
215
216 def __init__(me, info, queue, tag, args, env):
217 """
218 Start a new child process.
219
220 The ARGS are a list of arguments to be given to the child process. The
221 ENV is either None or a dictionary of environment variable assignments to
222 override the extant environment. INFO is a list of tokens to be included
223 in warnings about the child's stderr output. If the child writes a line
224 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
225 child exits, write (TAG, 'exit', CODE) to the QUEUE.
226 """
227 me._info = info
228 me._q = queue
229 me._tag = tag
230 myenv = OS.environ.copy()
231 if env: myenv.update(env)
232 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
233 stdout = PROC.PIPE, stderr = PROC.PIPE)
234 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
235 errorwatch.watch(me._proc.stderr, info)
236 childwatch.watch(me._proc.pid, queue, tag)
237
238 def __del__(me):
239 """
240 If I've been forgotten then stop watching for termination.
241 """
242 childwatch.unwatch(me._proc.pid)
243
244 def potwatch(what, name, q):
245 """
246 Watch the queue Q for activity as reported by a Command object.
247
248 Information from the process's stdout is reported as
249
250 NOTE WHAT NAME stdout LINE
251
252 abnormal termination is reported as
253
254 WARN WHAT NAME CODE
255
256 where CODE is what the ChildWatch wrote.
257 """
258 eofp = deadp = False
259 while not deadp or not eofp:
260 _, kind, more = q.get()
261 if kind == 'stdout':
262 if more is None:
263 eofp = True
264 else:
265 S.notify('connect', what, name, 'stdout', more)
266 elif kind == 'exit':
267 if more: S.warn('connect', what, name, *more)
268 deadp = True
269
270 ###--------------------------------------------------------------------------
271 ### Peer database utilities.
272
273 _magic = ['_magic'] # An object distinct from all others
274
275 class Peer (object):
276 """Representation of a peer in the database."""
277
278 def __init__(me, peer, cdb = None):
279 """
280 Create a new peer, named PEER.
281
282 Information about the peer is read from the database CDB, or the default
283 one given on the command-line.
284 """
285 me.name = peer
286 record = (cdb or CDB.init(opts.cdb))['P' + peer]
287 me.__dict__.update(M.URLDecode(record, semip = True))
288
289 def get(me, key, default = _magic, filter = None):
290 """
291 Get the information stashed under KEY from the peer's database record.
292
293 If DEFAULT is given, then use it if the database doesn't contain the
294 necessary information. If no DEFAULT is given, then report an error. If
295 a FILTER function is given then apply it to the information from the
296 database before returning it.
297 """
298 attr = me.__dict__.get(key, default)
299 if attr is _magic:
300 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
301 elif filter is not None:
302 attr = filter(attr)
303 return attr
304
305 def has(me, key):
306 """
307 Return whether the peer's database record has the KEY.
308 """
309 return key in me.__dict__
310
311 def list(me):
312 """
313 Iterate over the available keys in the peer's database record.
314 """
315 return me.__dict__.iterkeys()
316
317 def boolean(value):
318 """Parse VALUE as a boolean."""
319 return value in ['t', 'true', 'y', 'yes', 'on']
320
321 ###--------------------------------------------------------------------------
322 ### Waking up and watching peers.
323
324 def run_connect(peer, cmd):
325 """
326 Start the job of connecting to the passive PEER.
327
328 The CMD string is a shell command which will connect to the peer (via some
329 back-channel, say ssh and userv), issue a command
330
331 SVCSUBMIT connect passive [OPTIONS] USER
332
333 and write the resulting challenge to standard error.
334 """
335 q = T.Queue()
336 cmd = Command(['connect', peer.name], q, 'connect',
337 ['/bin/sh', '-c', cmd], None)
338 _, kind, more = q.peek()
339 if kind == 'stdout':
340 if more is None:
341 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
342 else:
343 chal = more
344 S.greet(peer.name, chal)
345 q.get()
346 potwatch('connect', peer.name, q)
347
348 def run_disconnect(peer, cmd):
349 """
350 Start the job of disconnecting from a passive PEER.
351
352 The CMD string is a shell command which will disconnect from the peer.
353 """
354 q = T.Queue()
355 cmd = Command(['disconnect', peer.name], q, 'disconnect',
356 ['/bin/sh', '-c', cmd], None)
357 potwatch('disconnect', peer.name, q)
358
359 _pingseq = 0
360 class PingPeer (object):
361 """
362 Object representing a peer which we are pinging to ensure that it is still
363 present.
364
365 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
366 event queue -- which saves us from having an enormous swarm of coroutines
367 -- but most of the actual work is done here.
368
369 In order to avoid confusion between different PingPeer instances for the
370 same actual peer, each PingPeer has a sequence number (its `seq'
371 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
372 (Using the PingPeer instance itself will prevent garbage collection of
373 otherwise defunct instances.)
374 """
375
376 def __init__(me, pinger, queue, peer, pingnow):
377 """
378 Create a new PingPeer.
379
380 The PINGER is the Pinger object we should send the results to. This is
381 used when we remove ourselves, if the peer has been explicitly removed.
382
383 The QUEUE is the event queue on which timer and ping-command events
384 should be written.
385
386 The PEER is a `Peer' object describing the peer.
387
388 If PINGNOW is true, then immediately start pinging the peer. Otherwise
389 wait until the usual retry interval.
390 """
391 global _pingseq
392 me._pinger = pinger
393 me._q = queue
394 me._peer = peer.name
395 me.update(peer)
396 me.seq = _pingseq
397 _pingseq += 1
398 me._failures = 0
399 me._sabotage = False
400 me._last = '-'
401 me._nping = 0
402 me._nlost = 0
403 me._sigma_t = 0
404 me._sigma_t2 = 0
405 me._min = me._max = '-'
406 if pingnow:
407 me._timer = None
408 me._ping()
409 else:
410 me._timer = M.SelTimer(time() + me._every, me._time)
411
412 def update(me, peer):
413 """
414 Refreshes the timer parameters for this peer. We don't, however,
415 immediately reschedule anything: that will happen next time anything
416 interesting happens.
417 """
418 if peer is None: peer = Peer(me._peer)
419 assert peer.name == me._peer
420 me._every = peer.get('every', filter = T.timespec, default = 120)
421 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
422 me._retries = peer.get('retries', filter = int, default = 5)
423 me._connectp = peer.has('connect')
424 return me
425
426 def _ping(me):
427 """
428 Send a ping to the peer; the result is sent to the Pinger's event queue.
429 """
430 S.rawcommand(T.TripeAsynchronousCommand(
431 me._q, (me._peer, me.seq),
432 ['EPING',
433 '-background', S.bgtag(),
434 '-timeout', str(me._timeout),
435 '--',
436 me._peer]))
437
438 def _reconnect(me):
439 try:
440 peer = Peer(me._peer)
441 if me._connectp:
442 S.warn('connect', 'reconnecting', me._peer)
443 S.forcekx(me._peer)
444 T.spawn(run_connect, peer, peer.get('connect'))
445 me._timer = M.SelTimer(time() + me._every, me._time)
446 me._sabotage = False
447 else:
448 S.kill(me._peer)
449 except TripeError, e:
450 if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
451
452 def event(me, code, stuff):
453 """
454 Respond to an event which happened to this peer.
455
456 Timer events indicate that we should start a new ping. (The server has
457 its own timeout which detects lost packets.)
458
459 We trap unknown-peer responses and detach from the Pinger.
460
461 If the ping fails and we run out of retries, we attempt to restart the
462 connection.
463 """
464 if code == 'TIMER':
465 me._failures = 0
466 me._ping()
467 elif code == 'FAIL':
468 S.notify('connect', 'ping-failed', me._peer, *stuff)
469 if not stuff:
470 pass
471 elif stuff[0] == 'unknown-peer':
472 me._pinger.kill(me._peer)
473 elif stuff[0] == 'ping-send-failed':
474 me._reconnect()
475 elif code == 'INFO':
476 outcome = stuff[0]
477 if outcome == 'ping-ok' and me._sabotage:
478 outcome = 'ping-timeout'
479 if outcome == 'ping-ok':
480 if me._failures > 0:
481 S.warn('connect', 'ping-ok', me._peer)
482 t = float(stuff[1])
483 me._last = '%.1fms' % t
484 me._sigma_t += t
485 me._sigma_t2 += t*t
486 me._nping += 1
487 if me._min == '-' or t < me._min: me._min = t
488 if me._max == '-' or t > me._max: me._max = t
489 me._timer = M.SelTimer(time() + me._every, me._time)
490 elif outcome == 'ping-timeout':
491 me._failures += 1
492 me._nlost += 1
493 S.warn('connect', 'ping-timeout', me._peer,
494 'attempt', str(me._failures), 'of', str(me._retries))
495 if me._failures < me._retries:
496 me._ping()
497 me._last = 'timeout'
498 else:
499 me._reconnect()
500 me._last = 'reconnect'
501 elif outcome == 'ping-peer-died':
502 me._pinger.kill(me._peer)
503
504 def sabotage(me):
505 """Sabotage the peer, for testing purposes."""
506 me._sabotage = True
507 if me._timer: me._timer.kill()
508 T.defer(me._time)
509
510 def info(me):
511 if not me._nping:
512 mean = sd = '-'
513 else:
514 mean = me._sigma_t/me._nping
515 sd = sqrt(me._sigma_t2/me._nping - mean*mean)
516 n = me._nping + me._nlost
517 if not n: pclost = '-'
518 else: pclost = '%d' % ((100*me._nlost + n//2)//n)
519 return { 'last-ping': me._last,
520 'mean-ping': '%.1fms' % mean,
521 'sd-ping': '%.1fms' % sd,
522 'n-ping': '%d' % me._nping,
523 'n-lost': '%d' % me._nlost,
524 'percent-lost': pclost,
525 'min-ping': '%.1fms' % me._min,
526 'max-ping': '%.1fms' % me._max,
527 'state': me._timer and 'idle' or 'check',
528 'failures': me._failures }
529
530 @T._callback
531 def _time(me):
532 """
533 Handle timer callbacks by posting a timeout event on the queue.
534 """
535 me._timer = None
536 me._q.put(((me._peer, me.seq), 'TIMER', None))
537
538 def __str__(me):
539 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
540 def __repr__(me):
541 return str(me)
542
543 class Pinger (T.Coroutine):
544 """
545 The Pinger keeps track of the peers which we expect to be connected and
546 takes action if they seem to stop responding.
547
548 There is usually only one Pinger, called pinger.
549
550 The Pinger maintains a collection of PingPeer objects, and an event queue.
551 The PingPeers direct the results of their pings, and timer events, to the
552 event queue. The Pinger's coroutine picks items off the queue and
553 dispatches them back to the PingPeers as appropriate.
554 """
555
556 def __init__(me):
557 """Initialize the Pinger."""
558 T.Coroutine.__init__(me)
559 me._peers = {}
560 me._q = T.Queue()
561
562 def run(me):
563 """
564 Coroutine function: reads the pinger queue and sends events to the
565 PingPeer objects they correspond to.
566 """
567 while True:
568 (peer, seq), code, stuff = me._q.get()
569 if peer in me._peers and seq == me._peers[peer].seq:
570 me._peers[peer].event(code, stuff)
571
572 def add(me, peer, pingnow):
573 """
574 Add PEER to the collection of peers under the Pinger's watchful eye.
575 The arguments are as for PingPeer: see above.
576 """
577 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
578 return me
579
580 def kill(me, peername):
581 """Remove PEER from the peers being watched by the Pinger."""
582 try: del me._peers[peername]
583 except KeyError: pass
584 return me
585
586 def rescan(me, startup):
587 """
588 General resynchronization method.
589
590 We scan the list of peers (with connect scripts) known at the server.
591 Any which are known to the Pinger but aren't known to the server are
592 removed from our list; newly arrived peers are added. (Note that a peer
593 can change state here either due to the server sneakily changing its list
594 without issuing notifications or, more likely, the database changing its
595 idea of whether a peer is interesting.) Finally, PingPeers which are
596 still present are prodded to update their timing parameters.
597
598 This method is called once at startup to pick up the peers already
599 installed, and again by the dbwatcher coroutine when it detects a change
600 to the database.
601 """
602 if T._debug: print '# rescan peers'
603 correct = {}
604 start = {}
605 for name in S.list():
606 try: peer = Peer(name)
607 except KeyError: continue
608 if peer.get('watch', filter = boolean, default = False):
609 if T._debug: print '# interesting peer %s' % peer
610 correct[peer.name] = start[peer.name] = peer
611 elif startup:
612 if T._debug: print '# peer %s ready for adoption' % peer
613 start[peer.name] = peer
614 for name, obj in me._peers.items():
615 try:
616 peer = correct[name]
617 except KeyError:
618 if T._debug: print '# peer %s vanished' % name
619 del me._peers[name]
620 else:
621 obj.update(peer)
622 for name, peer in start.iteritems():
623 if name in me._peers: continue
624 if startup:
625 if T._debug: print '# setting up peer %s' % name
626 ifname = S.ifname(name)
627 addr = S.addr(name)
628 T.defer(adoptpeer, peer, ifname, *addr)
629 else:
630 if T._debug: print '# adopting new peer %s' % name
631 me.add(peer, True)
632 return me
633
634 def adopted(me):
635 """
636 Returns the list of peers being watched by the Pinger.
637 """
638 return me._peers.keys()
639
640 def find(me, name):
641 """Return the PingPeer with the given name."""
642 return me._peers[name]
643
644 ###--------------------------------------------------------------------------
645 ### New connections.
646
647 def encode_envvars(env, prefix, vars):
648 """
649 Encode the variables in VARS suitably for including in a program
650 environment. Lowercase letters in variable names are forced to uppercase;
651 runs of non-alphanumeric characters are replaced by single underscores; and
652 the PREFIX is prepended. The resulting variables are written to ENV.
653 """
654 for k, v in vars.iteritems():
655 env[prefix + r_bad.sub('_', k.upper())] = v
656
657 r_bad = RX.compile(r'[\W_]+')
658 def envvars(peer):
659 """
660 Translate the database information for a PEER into a dictionary of
661 environment variables with plausible upper-case names and a P_ prefix.
662 Also collect the crypto information into A_ variables.
663 """
664 env = {}
665 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
666 encode_envvars(env, 'A_', S.algs(peer.name))
667 return env
668
669 def run_ifupdown(what, peer, *args):
670 """
671 Run the interface up/down script for a peer.
672
673 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
674 list of arguments to pass to the script, in addition to the peer name.
675
676 The command is run and watched in the background by potwatch.
677 """
678 q = T.Queue()
679 c = Command([what, peer.name], q, what,
680 M.split(peer.get(what), quotep = True)[0] +
681 [peer.name] + list(args),
682 envvars(peer))
683 potwatch(what, peer.name, q)
684
685 def adoptpeer(peer, ifname, *addr):
686 """
687 Add a new peer to our collection.
688
689 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
690 ADDR is the list of tokens representing its address.
691
692 We try to bring up the interface and provoke a connection to the peer if
693 it's passive.
694 """
695 if peer.has('ifup'):
696 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
697 .switch('ifup', peer, ifname, *addr)
698 cmd = peer.get('connect', default = None)
699 if cmd is not None:
700 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
701 .switch(peer, cmd)
702 if peer.get('watch', filter = boolean, default = False):
703 pinger.add(peer, False)
704
705 def disownpeer(peer):
706 """Drop the PEER from the Pinger and put its interface to bed."""
707 try: pinger.kill(peer)
708 except KeyError: pass
709 cmd = peer.get('disconnect', default = None)
710 if cmd is not None:
711 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
712 .switch(peer, cmd)
713 if peer.has('ifdown'):
714 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
715 .switch('ifdown', peer)
716
717 def addpeer(peer, addr):
718 """
719 Process a connect request from a new peer PEER on address ADDR.
720
721 Any existing peer with this name is disconnected from the server.
722 """
723 if peer.name in S.list():
724 S.kill(peer.name)
725 try:
726 booltrue = ['t', 'true', 'y', 'yes', 'on']
727 S.add(peer.name,
728 tunnel = peer.get('tunnel', None),
729 keepalive = peer.get('keepalive', None),
730 key = peer.get('key', None),
731 priv = peer.get('priv', None),
732 mobile = peer.get('mobile', 'nil') in booltrue,
733 cork = peer.get('cork', 'nil') in booltrue,
734 *addr)
735 except T.TripeError, exc:
736 raise T.TripeJobError(*exc.args)
737
738 ## Dictionary mapping challenges to waiting passive-connection coroutines.
739 chalmap = {}
740
741 def notify(_, code, *rest):
742 """
743 Watch for notifications.
744
745 We trap ADD and KILL notifications, and send them straight to adoptpeer and
746 disownpeer respectively; and dispatch GREET notifications to the
747 corresponding waiting coroutine.
748 """
749 if code == 'ADD':
750 try: p = Peer(rest[0])
751 except KeyError: return
752 adoptpeer(p, *rest[1:])
753 elif code == 'KILL':
754 try: p = Peer(rest[0])
755 except KeyError: return
756 disownpeer(p, *rest[1:])
757 elif code == 'GREET':
758 chal = rest[0]
759 try: cr = chalmap[chal]
760 except KeyError: pass
761 else: cr.switch(rest[1:])
762
763 ###--------------------------------------------------------------------------
764 ### Command implementation.
765
766 def cmd_kick(name):
767 """
768 kick NAME: Force a new connection attempt for the NAMEd peer.
769 """
770 try: pp = pinger.find(name)
771 except KeyError: raise T.TripeJobError('peer-not-adopted', name)
772 try: peer = Peer(name)
773 except KeyError: raise T.TripeJobError('unknown-peer', name)
774 conn = peer.get('connect', None)
775 if conn: T.spawn(run_connect, peer, peer.get('connect'))
776 else: T.spawn(lambda p: S.forcekx(p.name), peer)
777
778 def cmd_adopted():
779 """
780 adopted: Report a list of adopted peers.
781 """
782 for name in pinger.adopted():
783 T.svcinfo(name)
784
785 def cmd_active(name):
786 """
787 active NAME: Handle an active connection request for the peer called NAME.
788
789 The appropriate address is read from the database automatically.
790 """
791 try: peer = Peer(name)
792 except KeyError: raise T.TripeJobError('unknown-peer', name)
793 addr = peer.get('peer')
794 if addr == 'PASSIVE':
795 raise T.TripeJobError('passive-peer', name)
796 addpeer(peer, M.split(addr, quotep = True)[0])
797
798 def cmd_listactive():
799 """
800 list: Report a list of the available active peers.
801 """
802 cdb = CDB.init(opts.cdb)
803 for key in cdb.keys():
804 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
805 T.svcinfo(key[1:])
806
807 def cmd_info(name):
808 """
809 info NAME: Report the database entries for the named peer.
810 """
811 try: peer = Peer(name)
812 except KeyError: raise T.TripeJobError('unknown-peer', name)
813 d = {}
814 try: pp = pinger.find(name)
815 except KeyError: pass
816 else: d.update(pp.info())
817 items = list(peer.list()) + d.keys()
818 items.sort()
819 for i in items:
820 try: v = d[i]
821 except KeyError: v = peer.get(i)
822 T.svcinfo('%s=%s' % (i, v))
823
824 def cmd_userpeer(user):
825 """
826 userpeer USER: Report the peer name for the named user.
827 """
828 try: name = CDB.init(opts.cdb)['U' + user]
829 except KeyError: raise T.TripeJobError('unknown-user', user)
830 T.svcinfo(name)
831
832 def cmd_passive(*args):
833 """
834 passive [OPTIONS] USER: Await the arrival of the named USER.
835
836 Report a challenge; when (and if!) the server receives a greeting quoting
837 this challenge, add the corresponding peer to the server.
838 """
839 timeout = 30
840 op = T.OptParse(args, ['-timeout'])
841 for opt in op:
842 if opt == '-timeout':
843 timeout = T.timespec(op.arg())
844 user, = op.rest(1, 1)
845 try: name = CDB.init(opts.cdb)['U' + user]
846 except KeyError: raise T.TripeJobError('unknown-user', user)
847 try: peer = Peer(name)
848 except KeyError: raise T.TripeJobError('unknown-peer', name)
849 chal = S.getchal()
850 cr = T.Coroutine.getcurrent()
851 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
852 try:
853 T.svcinfo(chal)
854 chalmap[chal] = cr
855 addr = cr.parent.switch()
856 if addr is None:
857 raise T.TripeJobError('connect-timeout')
858 addpeer(peer, addr)
859 finally:
860 del chalmap[chal]
861
862 def cmd_sabotage(name):
863 """
864 sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
865 """
866 try: pp = pinger.find(name)
867 except KeyError: raise T.TripeJobError('unknown-peer', name)
868 pp.sabotage()
869
870 ###--------------------------------------------------------------------------
871 ### Start up.
872
873 def setup():
874 """
875 Service setup.
876
877 Register the notification watcher, rescan the peers, and add automatic
878 active peers.
879 """
880 S.handler['NOTE'] = notify
881 S.watch('+n')
882
883 pinger.rescan(opts.startup)
884
885 if opts.startup:
886 cdb = CDB.init(opts.cdb)
887 try:
888 autos = cdb['%AUTO']
889 except KeyError:
890 autos = ''
891 for name in M.split(autos)[0]:
892 try:
893 peer = Peer(name, cdb)
894 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
895 except T.TripeJobError, err:
896 S.warn('connect', 'auto-add-failed', name, *err.args)
897
898 def init():
899 """
900 Initialization to be done before service startup.
901 """
902 global errorwatch, childwatch, pinger
903 errorwatch = ErrorWatch()
904 childwatch = ChildWatch()
905 pinger = Pinger()
906 T.Coroutine(dbwatch, name = 'dbwatch').switch()
907 errorwatch.switch()
908 pinger.switch()
909
910 def parse_options():
911 """
912 Parse the command-line options.
913
914 Automatically changes directory to the requested configdir, and turns on
915 debugging. Returns the options object.
916 """
917 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
918 version = '%%prog %s' % VERSION)
919
920 op.add_option('-a', '--admin-socket',
921 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
922 help = 'Select socket to connect to [default %default]')
923 op.add_option('-d', '--directory',
924 metavar = 'DIR', dest = 'dir', default = T.configdir,
925 help = 'Select current diretory [default %default]')
926 op.add_option('-p', '--peerdb',
927 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
928 help = 'Select peers database [default %default]')
929 op.add_option('--daemon', dest = 'daemon',
930 default = False, action = 'store_true',
931 help = 'Become a daemon after successful initialization')
932 op.add_option('--debug', dest = 'debug',
933 default = False, action = 'store_true',
934 help = 'Emit debugging trace information')
935 op.add_option('--startup', dest = 'startup',
936 default = False, action = 'store_true',
937 help = 'Being called as part of the server startup')
938
939 opts, args = op.parse_args()
940 if args: op.error('no arguments permitted')
941 OS.chdir(opts.dir)
942 T._debug = opts.debug
943 return opts
944
945 ## Service table, for running manually.
946 service_info = [('connect', T.VERSION, {
947 'adopted': (0, 0, '', cmd_adopted),
948 'kick': (1, 1, 'PEER', cmd_kick),
949 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
950 'active': (1, 1, 'PEER', cmd_active),
951 'info': (1, 1, 'PEER', cmd_info),
952 'list-active': (0, 0, '', cmd_listactive),
953 'userpeer': (1, 1, 'USER', cmd_userpeer),
954 'sabotage': (1, 1, 'PEER', cmd_sabotage)
955 })]
956
957 if __name__ == '__main__':
958 opts = parse_options()
959 OS.environ['TRIPESOCK'] = opts.tripesock
960 T.runservices(opts.tripesock, service_info,
961 init = init, setup = setup,
962 daemon = opts.daemon)
963
964 ###----- That's all, folks --------------------------------------------------