svc/connect.in: Report statistics about adopted peers.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
11ad66c2
MW
13### TrIPE is free software: you can redistribute it and/or modify it under
14### the terms of the GNU General Public License as published by the Free
15### Software Foundation; either version 3 of the License, or (at your
16### option) any later version.
a62f8e8a 17###
11ad66c2
MW
18### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21### for more details.
a62f8e8a
MW
22###
23### You should have received a copy of the GNU General Public License
11ad66c2 24### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
a62f8e8a
MW
25
## Version string for this service.  The `@VERSION@' token is presumably
## substituted when the file is configured -- TODO confirm.
VERSION = '@VERSION@'
27
28###--------------------------------------------------------------------------
29### External dependencies.
30
31from optparse import OptionParser
32import tripe as T
33import os as OS
d64ce4ae
MW
34import signal as SIG
35import errno as E
b9dedfa6 36from math import sqrt
a62f8e8a
MW
37import cdb as CDB
38import mLib as M
d64ce4ae 39import re as RX
a62f8e8a 40from time import time
d64ce4ae 41import subprocess as PROC
a62f8e8a
MW
42
## Short name for the `tripe' module's service manager; used throughout.
S = T.svcmgr
44
45###--------------------------------------------------------------------------
d64ce4ae
MW
46### Running auxiliary commands.
47
class SelLineQueue (M.SelLineBuffer):
  """
  Line-buffer adaptor which posts each line it reads onto a coroutine queue.
  """

  def __new__(cls, file, queue, tag, kind):
    """Allocate the underlying line buffer on FILE's descriptor."""
    fd = file.fileno()
    return M.SelLineBuffer.__new__(cls, fd)

  def __init__(me, file, queue, tag, kind):
    """
    Set up the adaptor and start reading.

    Every line read from FILE is put on QUEUE as the triple (TAG, KIND,
    LINE).  When FILE reaches end-of-file, (TAG, KIND, None) is put
    instead.
    """
    me._file = file                     # keep a reference to the file object
    me._q, me._tag, me._kind = queue, tag, kind
    me.enable()

  @T._callback
  def line(me, line):
    """A complete LINE has arrived: forward it to the queue."""
    item = me._tag, me._kind, line
    me._q.put(item)

  @T._callback
  def eof(me):
    """End of file: stop listening and post None as the end marker."""
    me.disable()
    item = me._tag, me._kind, None
    me._q.put(item)
77
class ErrorWatch (T.Coroutine):
  """
  Coroutine which collects lines written to registered stderr streams and
  reissues each one as a warning

        WARN connect INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  A single instance, called errorwatch, is created by init.
  """

  def __init__(me):
    """Create a watcher with nothing registered; takes no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}                        # SEQ -> (INFO, SelLineQueue)
    me._seq = 1                         # next sequence number to hand out

  def watch(me, file, info):
    """
    Begin watching FILE, labelling its warnings with the token list INFO.

    Returns the sequence number identifying this registration, which may be
    passed to unwatch.
    """
    seq, me._seq = me._seq, me._seq + 1
    reader = SelLineQueue(file, me._q, seq, 'stderr')
    me._map[seq] = info, reader
    return seq

  def unwatch(me, seq):
    """
    Forget the registration numbered SEQ.

    Raises KeyError if SEQ is not registered; returns the watcher itself.
    """
    me._map.pop(seq)
    return me

  def run(me):
    """
    Coroutine body: take items off the queue and issue the warnings.

    An end-of-file item (LINE is None) unregisters the stream concerned.
    """
    while True:
      seq, _, line = me._q.get()
      if line is not None:
        info = me._map[seq][0]
        S.warn(*(['connect'] + info + ['stderr', line]))
        continue
      me.unwatch(seq)
127
def dbwatch():
  """
  Coroutine function: poll the peer database once a minute.

  Each time the file watcher reports that the database has changed, ask
  the Pinger (q.v.) to rescan its peers and issue a `peerdb-update'
  notification.
  """
  me = T.Coroutine.getcurrent()
  parent = me.parent
  watcher = M.FWatch(opts.cdb)

  def wake():
    ## Timer callback: resume this coroutine.
    me.switch()

  while True:
    ## Hold the timer in a local so that it stays referenced while we wait.
    timer = M.SelTimer(time() + 60, lambda: wake())
    parent.switch()
    if not watcher.update():
      continue
    pinger.rescan(False)
    S.notify('connect', 'peerdb-update')
143
class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Allocate the signal watcher, attached to SIGCHLD."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher with no registered children."""
    me._pid = {}                        # PID -> (QUEUE, TAG)
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)

    Returns the watcher itself.
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """
    Unregister PID as a child to watch.

    Raises KeyError if PID is not registered; returns the watcher itself.
    """
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: collect exit statuses and report
    failures.

    Reaps every child which is ready, and reports those which were
    registered with watch.  Registrations are left in place: they are
    removed only by unwatch.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        if exc.errno == E.ECHILD:
          break                         # no children left at all
        elif exc.errno == E.EINTR:
          continue                      # interrupted: just try again
        else:
          ## Anything else is unexpected.  Don't carry on: `pid' would be
          ## unbound, or left over from the previous iteration.
          raise
      if pid == 0:
        break                           # children exist, but none is ready
      if pid not in me._pid:
        continue                        # not one of ours: reap and ignore
      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        exit = OS.WEXITSTATUS(status)
        if exit == 0:
          code = None
        else:
          code = ['exit-nonzero', str(exit)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))
207
class Command (object):
  """
  A child process whose output and termination are being monitored.

  Creating a Command launches the process and hooks it up to the global
  childwatch and errorwatch objects.  See also potwatch, which consumes the
  events that result.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Launch a child process.

    ARGS is the argument list for the child.  ENV is None, or a dictionary
    of environment variables laid over a copy of our own environment.  INFO
    is the list of tokens used to label warnings about the child's stderr
    output.  Lines from the child's stdout arrive on QUEUE as (TAG,
    'stdout', LINE), with LINE None at end-of-file; its termination arrives
    as (TAG, 'exit', CODE).
    """
    me._info, me._q, me._tag = info, queue, tag

    ## Build the child's environment without disturbing our own.
    childenv = OS.environ.copy()
    if env:
      childenv.update(env)

    me._proc = PROC.Popen(args = args, env = childenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    proc = me._proc
    me._lq = SelLineQueue(proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(proc.stderr, info)
    childwatch.watch(proc.pid, queue, tag)

  def __del__(me):
    """
    Once nobody refers to the command any more, stop watching for the child's
    termination.
    """
    pid = me._proc.pid
    childwatch.unwatch(pid)
243
def potwatch(what, name, q):
  """
  Drain the queue Q of the events produced by a Command object.

  Each line of the process's stdout is passed on as

        NOTE WHAT NAME stdout LINE

  and an unsuccessful termination as

        WARN WHAT NAME CODE

  where CODE is the list written by the ChildWatch.  Returns once both the
  end of stdout and the termination have been seen.
  """
  ## The two things we must see before we can stop.
  pending = set(['eof', 'exit'])
  while pending:
    _, kind, more = q.get()
    if kind == 'stdout':
      if more is not None:
        S.notify('connect', what, name, 'stdout', more)
      else:
        pending.discard('eof')
    elif kind == 'exit':
      if more:
        S.warn('connect', what, name, *more)
      pending.discard('exit')
269
270###--------------------------------------------------------------------------
271### Peer database utilities.
a62f8e8a
MW
272
## Sentinel meaning `no default supplied': distinct from every other object.
_magic = ['_magic']

class Peer (object):
  """A peer's record from the peer database."""

  def __init__(me, peer, cdb = None):
    """
    Look up the peer named PEER.

    The record is read from the database CDB if one is given, and otherwise
    from the database named on the command line.  Raises KeyError if there
    is no record for PEER.  The decoded fields are stored directly in the
    instance dictionary, alongside `name'.
    """
    me.name = peer
    db = cdb or CDB.init(opts.cdb)
    record = db['P' + peer]
    fields = M.URLDecode(record, semip = True)
    me.__dict__.update(fields)

  def get(me, key, default = _magic, filter = None):
    """
    Return the value stored under KEY in the peer's record.

    If the record has no such KEY then DEFAULT is used instead; if no
    DEFAULT was given either, raise TripeJobError.  If a FILTER function is
    given, the result is passed through it before being returned -- note
    that this applies to a DEFAULT too.
    """
    value = me.__dict__.get(key, default)
    if value is _magic:
      raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
    if filter is None:
      return value
    return filter(value)

  def has(me, key):
    """Return true if the peer's record contains KEY."""
    return key in me.__dict__

  def list(me):
    """Return an iterator over the keys in the peer's record."""
    return iter(me.__dict__)
316
d64ce4ae
MW
def boolean(value):
  """Return whether VALUE is one of the strings accepted as meaning `true'."""
  truthy = ('t', 'true', 'y', 'yes', 'on')
  return value in truthy
320
321###--------------------------------------------------------------------------
322### Waking up and watching peers.
323
def run_connect(peer, cmd):
  """
  Carry out the job of connecting to the passive PEER.

  CMD is a shell command, run via `/bin/sh -c', which is expected to reach
  the peer through some back-channel (ssh and userv, say) and issue

        SVCSUBMIT connect passive [OPTIONS] USER

  The first line the command writes to its standard output is taken to be
  the resulting challenge, and we greet the peer with it.  Everything after
  that is handled by potwatch.
  """
  q = T.Queue()
  ## Keep the Command referenced until we return: it stops watching the
  ## child when it is collected.
  proc = Command(['connect', peer.name], q, 'connect',
                 ['/bin/sh', '-c', cmd], None)

  ## Look at the first event without removing it, so that potwatch still
  ## sees an end-of-file or exit event.
  _, kind, more = q.peek()
  if kind == 'stdout' and more is None:
    S.warn('connect', 'connect', peer.name, 'unexpected-eof')
  elif kind == 'stdout':
    S.greet(peer.name, more)
    q.get()                             # the challenge line is dealt with
  potwatch('connect', peer.name, q)
347
def run_disconnect(peer, cmd):
  """
  Carry out the job of disconnecting from the passive PEER.

  CMD is a shell command, run via `/bin/sh -c', which disconnects from the
  peer.  Its output and exit status are reported by potwatch.
  """
  q = T.Queue()
  argv = ['/bin/sh', '-c', cmd]
  ## Keep the Command referenced until potwatch has finished with it.
  proc = Command(['disconnect', peer.name], q, 'disconnect', argv, None)
  potwatch('disconnect', peer.name, q)
358
_pingseq = 0                            # sequence number for next PingPeer

class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, pingnow):
    """
    Create a new PingPeer.

    The PINGER is the Pinger object we should send the results to.  This is
    used when we remove ourselves, if the peer has been explicitly removed.

    The QUEUE is the event queue on which timer and ping-command events
    should be written.

    The PEER is a `Peer' object describing the peer.

    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
    wait until the usual retry interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer.name
    me.update(peer)
    me.seq = _pingseq
    _pingseq += 1
    me._failures = 0

    ## Ping statistics, reported by `info'.
    me._last = '-'                      # description of most recent ping
    me._nping = 0                       # number of successful pings
    me._nlost = 0                       # number of pings which timed out
    me._sigma_t = 0                     # sum of round-trip times
    me._sigma_t2 = 0                    # sum of squared round-trip times
    me._min = me._max = '-'             # extremes; `-' until first success

    if pingnow:
      me._timer = None
      me._ping()
    else:
      me._timer = M.SelTimer(time() + me._every, me._time)

  def update(me, peer):
    """
    Refreshes the timer parameters for this peer.  We don't, however,
    immediately reschedule anything: that will happen next time anything
    interesting happens.

    If PEER is None then the record is fetched from the database afresh.
    Returns the PingPeer itself.
    """
    if peer is None: peer = Peer(me._peer)
    assert peer.name == me._peer
    me._every = peer.get('every', filter = T.timespec, default = 120)
    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
    me._retries = peer.get('retries', filter = int, default = 5)
    me._connectp = peer.has('connect')
    return me

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue.
    """
    S.rawcommand(T.TripeAsynchronousCommand(
      me._q, (me._peer, me.seq),
      ['EPING',
       '-background', S.bgtag(),
       '-timeout', str(me._timeout),
       '--',
       me._peer]))

  def _reconnect(me):
    """
    The peer has stopped answering: try to get the connection going again.

    If the peer has a connect command, force a new key exchange, run the
    command in the background, and schedule the next ping; otherwise kill
    the peer at the server.
    """
    peer = Peer(me._peer)
    if me._connectp:
      S.warn('connect', 'reconnecting', me._peer)
      S.forcekx(me._peer)
      T.spawn(run_connect, peer, peer.get('connect'))
      me._timer = M.SelTimer(time() + me._every, me._time)
    else:
      S.kill(me._peer)

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping.  (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping fails and we run out of retries, we attempt to restart the
    connection.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()
    elif code == 'FAIL':
      S.notify('connect', 'ping-failed', me._peer, *stuff)
      if not stuff:
        pass
      elif stuff[0] == 'unknown-peer':
        me._pinger.kill(me._peer)
      elif stuff[0] == 'ping-send-failed':
        me._reconnect()
    elif code == 'INFO':
      if stuff[0] == 'ping-ok':
        if me._failures > 0:
          S.warn('connect', 'ping-ok', me._peer)
        t = float(stuff[1])
        me._last = '%.1fms' % t
        me._sigma_t += t
        me._sigma_t2 += t*t
        me._nping += 1
        if me._min == '-' or t < me._min: me._min = t
        if me._max == '-' or t > me._max: me._max = t
        me._timer = M.SelTimer(time() + me._every, me._time)
      elif stuff[0] == 'ping-timeout':
        me._failures += 1
        me._nlost += 1
        S.warn('connect', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures < me._retries:
          me._ping()
          me._last = 'timeout'
        else:
          me._reconnect()
      elif stuff[0] == 'ping-peer-died':
        me._pinger.kill(me._peer)

  def info(me):
    """
    Return a dictionary of statistics about the pings sent to this peer.

    Timings are formatted as `N.Nms'.  Statistics which can't be computed
    yet, because no ping has succeeded (or none has been sent), are
    reported as `-'.
    """
    if not me._nping:
      ## Nothing measured yet.  The placeholders mustn't go anywhere near a
      ## numeric format.
      mean = sd = lo = hi = '-'
    else:
      meanval = me._sigma_t/me._nping
      ## Rounding error can push the variance slightly below zero when the
      ## samples are (nearly) identical, so clamp it.
      var = max(me._sigma_t2/me._nping - meanval*meanval, 0.0)
      mean = '%.1fms' % meanval
      sd = '%.1fms' % sqrt(var)
      lo = '%.1fms' % me._min
      hi = '%.1fms' % me._max
    n = me._nping + me._nlost
    if not n: pclost = '-'
    else: pclost = '%d' % ((100*me._nlost + n//2)//n)   # rounded percentage
    return { 'last-ping': me._last,
             'mean-ping': mean,
             'sd-ping': sd,
             'n-ping': '%d' % me._nping,
             'n-lost': '%d' % me._nlost,
             'percent-lost': pclost,
             'min-ping': lo,
             'max-ping': hi,
             'state': me._timer and 'idle' or 'check',
             'failures': me._failures }

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a timeout event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
  def __repr__(me):
    return str(me)
527
class Pinger (T.Coroutine):
  """
  Keeps track of the peers which ought to be connected, and acts when one
  of them seems to have stopped responding.

  A single instance, called pinger, is created by init.

  The Pinger owns a collection of PingPeer objects and an event queue.  The
  PingPeers send their ping results and timer events to the queue; the
  Pinger's coroutine takes the events off again and hands each one to the
  PingPeer it belongs to.
  """

  def __init__(me):
    """Create a Pinger with no peers."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._peers = {}                      # peer name -> PingPeer

  def run(me):
    """
    Coroutine body: dispatch queued events to their PingPeer objects.

    Events whose peer is no longer known, or whose sequence number belongs
    to an earlier PingPeer for the same peer, are dropped.
    """
    while True:
      (name, seq), code, stuff = me._q.get()
      pp = me._peers.get(name)
      if pp is not None and seq == pp.seq:
        pp.event(code, stuff)

  def add(me, peer, pingnow):
    """
    Start watching PEER, replacing any existing PingPeer of the same name.

    PEER and PINGNOW are as for PingPeer.  Returns the Pinger itself.
    """
    pp = PingPeer(me, me._q, peer, pingnow)
    me._peers[peer.name] = pp
    return me

  def kill(me, peername):
    """
    Stop watching the peer named PEERNAME; it is not an error if we weren't
    watching it.  Returns the Pinger itself.
    """
    me._peers.pop(peername, None)
    return me

  def rescan(me, startup):
    """
    Bring our idea of the peers back into step with the server and database.

    The server is asked for its list of peers, and each is looked up in the
    database; those with no record are ignored.  PingPeers for peers which
    are no longer marked to be watched are dropped; those which remain are
    told to refresh their timing parameters; and peers which have newly
    become interesting are added.  (A peer can change state here because
    the server altered its list without telling us or, more likely,
    because the database changed its mind about the peer.)

    If STARTUP is true then every peer with a database record which we
    aren't yet watching is scheduled for full adoption via adoptpeer;
    otherwise new peers are simply added and pinged at once.

    Called once at startup, and again by dbwatch when the database changes.
    Returns the Pinger itself.
    """
    if T._debug: print('# rescan peers')

    ## Work out which peers we should be watching (`wanted') and which are
    ## candidates for being started (`fresh').
    wanted, fresh = {}, {}
    for name in S.list():
      try:
        peer = Peer(name)
      except KeyError:
        continue
      if peer.get('watch', filter = boolean, default = False):
        if T._debug: print('# interesting peer %s' % peer)
        wanted[peer.name] = peer
        fresh[peer.name] = peer
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer)
        fresh[peer.name] = peer

    ## Drop the peers which have gone away, and refresh the others.  Work
    ## from a snapshot, because we delete as we go.
    for name, pp in list(me._peers.items()):
      peer = wanted.get(name)
      if peer is not None:
        pp.update(peer)
      else:
        if T._debug: print('# peer %s vanished' % name)
        del me._peers[name]

    ## Start up the peers we aren't watching yet.
    for name, peer in fresh.items():
      if name in me._peers:
        continue
      if not startup:
        if T._debug: print('# adopting new peer %s' % name)
        me.add(peer, True)
      else:
        if T._debug: print('# setting up peer %s' % name)
        ifname = S.ifname(name)
        addr = S.addr(name)
        T.defer(adoptpeer, peer, ifname, *addr)
    return me

  def adopted(me):
    """Return a list of the names of the peers being watched."""
    return list(me._peers)

  def find(me, name):
    """
    Return the PingPeer for the peer called NAME; raises KeyError if we
    aren't watching it.
    """
    pp = me._peers[name]
    return pp
628
d64ce4ae
MW
629###--------------------------------------------------------------------------
630### New connections.
631
## Matches a run of characters which can't appear in a variable name (the
## underscore is included so that runs of them collapse to one).
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
  """
  Write the variables in VARS to the dictionary ENV in a form suitable for
  a program environment.

  Each name is converted to uppercase, has every run of non-alphanumeric
  characters replaced by a single underscore, and has PREFIX put on the
  front.  The values are stored unchanged.
  """
  for name, value in vars.items():
    cooked = r_bad.sub('_', name.upper())
    env[prefix + cooked] = value
def envvars(peer):
  """
  Build the dictionary of environment variables describing a PEER.

  Every field of the peer's database record becomes a variable with a P_
  prefix, and the crypto algorithm information reported by the server for
  the peer becomes variables with an A_ prefix; see encode_envvars for how
  the names are formed.
  """
  fields = {}
  for key in peer.list():
    fields[key] = peer.get(key)
  env = {}
  encode_envvars(env, 'P_', fields)
  encode_envvars(env, 'A_', S.algs(peer.name))
  return env
653
def run_ifupdown(what, peer, *args):
  """
  Run a peer's interface up or down script.

  WHAT is `ifup' or `ifdown', and names the field of the PEER's record
  which holds the command.  The command is split into words (quotes are
  honoured), and given the peer's name followed by ARGS as further
  arguments.  Its environment is extended as described for envvars.

  The command's output and exit status are reported by potwatch.
  """
  q = T.Queue()
  script = M.split(peer.get(what), quotep = True)[0]
  argv = script + [peer.name] + list(args)
  ## Keep the Command referenced until potwatch has finished with it.
  c = Command([what, peer.name], q, what, argv, envvars(peer))
  potwatch(what, peer.name, q)
669
def adoptpeer(peer, ifname, *addr):
  """
  Take charge of a newly added peer.

  PEER is the `Peer' object, IFNAME is the name of its tunnel interface,
  and ADDR is the list of tokens making up its address.

  If the peer has an `ifup' script then run it; if it has a `connect'
  command then run that too; and if it is marked to be watched then hand it
  to the Pinger.
  """
  name = peer.name
  if peer.has('ifup'):
    cr = T.Coroutine(run_ifupdown, name = 'ifup %s' % name)
    cr.switch('ifup', peer, ifname, *addr)
  cmd = peer.get('connect', default = None)
  if cmd is not None:
    cr = T.Coroutine(run_connect, name = 'connect %s' % name)
    cr.switch(peer, cmd)
  watchp = peer.get('watch', filter = boolean, default = False)
  if watchp:
    pinger.add(peer, False)
689
def disownpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  PEER is the `Peer' object.  If it has a `disconnect' command or an
  `ifdown' script then these are run in the background.
  """
  ## The Pinger's table is keyed by peer name, so that is what must be
  ## passed: handing over the `Peer' object itself never matches anything.
  ## It doesn't matter if the peer wasn't being watched.
  pinger.kill(peer.name)
  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
        .switch(peer, cmd)
  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
        .switch('ifdown', peer)
701
a62f8e8a
MW
def addpeer(peer, addr):
  """
  Add the peer PEER to the server, at the address given by the token list
  ADDR.

  If the server already has a peer of this name, it is killed first.  The
  tunnel, keepalive, key, priv, mobile and cork settings are taken from the
  peer's database record.  A TripeError from the server is reraised as a
  TripeJobError with the same arguments.
  """
  name = peer.name
  if name in S.list():
    S.kill(name)

  ## Collect the options from the database record.
  kw = {}
  for key in ['tunnel', 'keepalive', 'key', 'priv']:
    kw[key] = peer.get(key, None)
  for key in ['mobile', 'cork']:
    kw[key] = boolean(peer.get(key, 'nil'))

  try:
    S.add(name, *addr, **kw)
  except T.TripeError as exc:
    raise T.TripeJobError(*exc.args)
722
d64ce4ae
MW
## Maps each outstanding challenge to the coroutine (running cmd_passive)
## which is waiting for a greeting that quotes it.
chalmap = {}

def notify(_, code, *rest):
  """
  Handle a notification from the server.

  ADD and KILL notifications are passed to adoptpeer and disownpeer
  respectively, provided that the peer has a database record; a GREET
  notification wakes the coroutine waiting on its challenge, if there is
  one, passing it the remaining tokens.  Anything else is ignored.
  """
  if code == 'GREET':
    waiter = chalmap.get(rest[0])
    if waiter is not None:
      waiter.switch(rest[1:])
    return

  if code == 'ADD': handler = adoptpeer
  elif code == 'KILL': handler = disownpeer
  else: return

  try:
    p = Peer(rest[0])
  except KeyError:
    return                              # not in the database: not ours
  handler(p, *rest[1:])
747
748###--------------------------------------------------------------------------
749### Command implementation.
750
def cmd_kick(name):
  """
  kick NAME: Force a new connection attempt for the NAMEd peer.

  The peer must be one which the Pinger is watching, and must have a
  database record.  If it has a connect command then run it; otherwise just
  force a new key exchange.
  """
  try:
    pinger.find(name)
  except KeyError:
    raise T.TripeJobError('peer-not-adopted', name)
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)

  conn = peer.get('connect', None)
  if not conn:
    T.spawn(lambda p: S.forcekx(p.name), peer)
  else:
    T.spawn(run_connect, peer, conn)
d64ce4ae
MW
762
def cmd_adopted():
  """
  adopted: Report the names of the peers being watched by the Pinger.
  """
  names = pinger.adopted()
  for peername in names:
    T.svcinfo(peername)
769
a62f8e8a
MW
def cmd_active(name):
  """
  active NAME: Handle an active connection request for the peer called NAME.

  The address is taken from the `peer' field of the peer's database record.
  Fails with `unknown-peer' if there is no record, and with `passive-peer'
  if the record's address is PASSIVE.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  where = peer.get('peer')
  if where == 'PASSIVE':
    raise T.TripeJobError('passive-peer', name)
  tokens = M.split(where, quotep = True)[0]
  addpeer(peer, tokens)
782
def cmd_listactive():
  """
  list-active: Report a list of the available active peers.

  A peer is listed if it has a database record whose `peer' field is
  absent or is anything other than PASSIVE.
  """
  cdb = CDB.init(opts.cdb)
  for key in cdb.keys():
    if not key.startswith('P'):
      continue                          # not a peer record
    name = key[1:]
    ## Reuse the database we already have open, rather than opening it
    ## afresh for every single peer.
    if Peer(name, cdb).get('peer', '') != 'PASSIVE':
      T.svcinfo(name)
791
def cmd_info(name):
  """
  info NAME: Report information about the named peer.

  The output is a sorted list of KEY=VALUE lines, made from the peer's
  database record together with the Pinger's statistics for it, if it is
  being watched.  Where both supply a key, the Pinger's value is shown.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)

  stats = {}
  try:
    pp = pinger.find(name)
  except KeyError:
    pass                                # not being watched: no statistics
  else:
    stats.update(pp.info())

  keys = sorted(list(peer.list()) + list(stats))
  for key in keys:
    if key in stats: value = stats[key]
    else: value = peer.get(key)
    T.svcinfo('%s=%s' % (key, value))
a62f8e8a 808
d3731285
MW
def cmd_userpeer(user):
  """
  userpeer USER: Report the name of the peer belonging to the named USER.

  Fails with `unknown-user' if the database has no entry for USER.
  """
  try:
    db = CDB.init(opts.cdb)
    peername = db['U' + user]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  T.svcinfo(peername)
a62f8e8a
MW
816
def cmd_passive(*args):
  """
  passive [OPTIONS] USER: Await the arrival of the named USER.

  Report a challenge; when (and if!) the server receives a greeting quoting
  this challenge, add the corresponding peer to the server.

  The only option is `-timeout TIME', which sets how long to wait for the
  greeting; the default is 30.  Fails with `unknown-user' or `unknown-peer'
  if the database lacks the necessary records, and with `connect-timeout'
  if no greeting arrives in time.
  """
  timeout = 30
  op = T.OptParse(args, ['-timeout'])
  for opt in op:
    if opt == '-timeout':
      timeout = T.timespec(op.arg())
  user, = op.rest(1, 1)
  try: name = CDB.init(opts.cdb)['U' + user]
  except KeyError: raise T.TripeJobError('unknown-user', user)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  chal = S.getchal()
  cr = T.Coroutine.getcurrent()
  ## The timer wakes us with None if the greeting doesn't turn up; `notify'
  ## wakes us with the peer's address if it does.
  timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
  try:
    T.svcinfo(chal)
    chalmap[chal] = cr
    addr = cr.parent.switch()
    if addr is None:
      raise T.TripeJobError('connect-timeout')
    addpeer(peer, addr)
  finally:
    ## The challenge may not have been registered, if reporting it failed;
    ## don't let a KeyError here hide the real problem.
    chalmap.pop(chal, None)
846
a62f8e8a
MW
847###--------------------------------------------------------------------------
848### Start up.
849
def setup():
  """
  Service setup.

  Register the notification watcher, rescan the peers, and add automatic
  active peers.

  The automatic peers are those named in the database's `%AUTO' record;
  they are only added if the `--startup' option was given.  A peer which
  can't be added is reported in an `auto-add-failed' warning, and the others
  are still tried.
  """
  S.handler['NOTE'] = notify
  S.watch('+n')

  pinger.rescan(opts.startup)

  if opts.startup:
    cdb = CDB.init(opts.cdb)
    try:
      autos = cdb['%AUTO']
    except KeyError:
      autos = ''
    for name in M.split(autos)[0]:
      ## A name with no peer record is reported like any other failure,
      ## rather than being allowed to abort startup.
      try:
        peer = Peer(name, cdb)
      except KeyError:
        S.warn('connect', 'auto-add-failed', name, 'unknown-peer', name)
        continue
      try:
        addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
      except T.TripeJobError as err:
        S.warn('connect', 'auto-add-failed', name, *err.args)
874
d64ce4ae
MW
def init():
  """
  Initialization to be done before service startup.

  Creates the global errorwatch, childwatch and pinger objects, and starts
  the dbwatch, errorwatch and pinger coroutines, in that order.
  """
  global errorwatch, childwatch, pinger
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  dbwatcher = T.Coroutine(dbwatch, name = 'dbwatch')
  for cr in [dbwatcher, errorwatch, pinger]:
    cr.switch()
886
a62f8e8a
MW
def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and sets the
  `tripe' module's debugging flag according to the `--debug' option.
  Reports an error and exits if any non-option arguments are given.
  Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
921
## Command table for the `connect' service.  Each entry is
##
##      NAME: (MIN-ARGS, MAX-ARGS, USAGE, FUNCTION)
##
## where a MAX-ARGS of None presumably means `no upper limit' -- TODO
## confirm against the `tripe' module.
_commands = {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick),
  'passive': (1, None, '[OPTIONS] USER', cmd_passive),
  'active': (1, 1, 'PEER', cmd_active),
  'info': (1, 1, 'PEER', cmd_info),
  'list-active': (0, 0, '', cmd_listactive),
  'userpeer': (1, 1, 'USER', cmd_userpeer)
}

## Service table, for running manually.
service_info = [('connect', T.VERSION, _commands)]
932
if __name__ == '__main__':
  ## `opts' is deliberately a module global: the rest of the file reads it.
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                daemon = opts.daemon,
                init = init, setup = setup)
938
939###----- That's all, folks --------------------------------------------------