contrib/README: Add missing descriptions of things added over the years.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 import cdb as CDB
37 import mLib as M
38 import re as RX
39 from time import time
40 import subprocess as PROC
41
42 S = T.svcmgr
43
44 ###--------------------------------------------------------------------------
45 ### Running auxiliary commands.
46
47 class SelLineQueue (M.SelLineBuffer):
48 """Glues the select-line-buffer into the coroutine queue system."""
49
50 def __new__(cls, file, queue, tag, kind):
51 """See __init__ for documentation."""
52 return M.SelLineBuffer.__new__(cls, file.fileno())
53
54 def __init__(me, file, queue, tag, kind):
55 """
56 Initialize a new line-reading adaptor.
57
58 The adaptor reads lines from FILE. Each line is inserted as a message of
59 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
60 represented as None.
61 """
62 me._q = queue
63 me._file = file
64 me._tag = tag
65 me._kind = kind
66 me.enable()
67
68 @T._callback
69 def line(me, line):
70 me._q.put((me._tag, me._kind, line))
71
72 @T._callback
73 def eof(me):
74 me.disable()
75 me._q.put((me._tag, me._kind, None))
76
77 class ErrorWatch (T.Coroutine):
78 """
79 An object which watches stderr streams for errors and converts them into
80 warnings of the form
81
82 WARN connect INFO stderr LINE
83
84 The INFO is a list of tokens associated with the file when it was
85 registered.
86
87 Usually there is a single ErrorWatch object, called errorwatch.
88 """
89
90 def __init__(me):
91 """Initialization: there are no arguments."""
92 T.Coroutine.__init__(me)
93 me._q = T.Queue()
94 me._map = {}
95 me._seq = 1
96
97 def watch(me, file, info):
98 """
99 Adds FILE to the collection of files to watch.
100
101 INFO will be written in the warning messages from this FILE. Returns a
102 sequence number which can be used to unregister the file again.
103 """
104 seq = me._seq
105 me._seq += 1
106 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
107 return seq
108
109 def unwatch(me, seq):
110 """Stop watching the file with sequence number SEQ."""
111 del me._map[seq]
112 return me
113
114 def run(me):
115 """
116 Coroutine function: read items from the queue and report them.
117
118 Unregisters files automatically when they reach EOF.
119 """
120 while True:
121 seq, _, line = me._q.get()
122 if line is None:
123 me.unwatch(seq)
124 else:
125 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
126
127 def dbwatch():
128 """
129 Coroutine function: wake up every minute and notice changes to the
130 database. When a change happens, tell the Pinger (q.v.) to rescan its
131 peers.
132 """
133 cr = T.Coroutine.getcurrent()
134 main = cr.parent
135 fw = M.FWatch(opts.cdb)
136 while True:
137 timer = M.SelTimer(time() + 60, lambda: cr.switch())
138 main.switch()
139 if fw.update():
140 pinger.rescan(False)
141 S.notify('connect', 'peerdb-update')
142
143 class ChildWatch (M.SelSignal):
144 """
145 An object which watches for specified processes exiting and reports
146 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
147
148 There is usually only one ChildWatch object, called childwatch.
149 """
150
151 def __new__(cls):
152 """Initialize the child-watcher."""
153 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
154
155 def __init__(me):
156 """Initialize the child-watcher."""
157 me._pid = {}
158 me.enable()
159
160 def watch(me, pid, queue, tag):
161 """
162 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
163 to the QUEUE, where CODE is one of
164
165 * None (successful termination)
166 * ['exit-nonzero', CODE] (CODE is a string!)
167 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
168 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
169 """
170 me._pid[pid] = queue, tag
171 return me
172
173 def unwatch(me, pid):
174 """Unregister PID as a child to watch."""
175 del me._pid[pid]
176 return me
177
178 @T._callback
179 def signalled(me):
180 """
181 Called when child processes exit: collect exit statuses and report
182 failures.
183 """
184 while True:
185 try:
186 pid, status = OS.waitpid(-1, OS.WNOHANG)
187 except OSError, exc:
188 if exc.errno == E.ECHILD:
189 break
190 if pid == 0:
191 break
192 if pid not in me._pid:
193 continue
194 queue, tag = me._pid[pid]
195 if OS.WIFEXITED(status):
196 exit = OS.WEXITSTATUS(status)
197 if exit == 0:
198 code = None
199 else:
200 code = ['exit-nonzero', str(exit)]
201 elif OS.WIFSIGNALED(status):
202 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
203 else:
204 code = ['exit-unknown', hex(status)]
205 queue.put((tag, 'exit', code))
206
207 class Command (object):
208 """
209 Represents a running command.
210
211 This class is the main interface to the machery provided by the ChildWatch
212 and ErrorWatch objects. See also potwatch.
213 """
214
215 def __init__(me, info, queue, tag, args, env):
216 """
217 Start a new child process.
218
219 The ARGS are a list of arguments to be given to the child process. The
220 ENV is either None or a dictionary of environment variable assignments to
221 override the extant environment. INFO is a list of tokens to be included
222 in warnings about the child's stderr output. If the child writes a line
223 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
224 child exits, write (TAG, 'exit', CODE) to the QUEUE.
225 """
226 me._info = info
227 me._q = queue
228 me._tag = tag
229 myenv = OS.environ.copy()
230 if env: myenv.update(env)
231 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
232 stdout = PROC.PIPE, stderr = PROC.PIPE)
233 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
234 errorwatch.watch(me._proc.stderr, info)
235 childwatch.watch(me._proc.pid, queue, tag)
236
237 def __del__(me):
238 """
239 If I've been forgotten then stop watching for termination.
240 """
241 childwatch.unwatch(me._proc.pid)
242
243 def potwatch(what, name, q):
244 """
245 Watch the queue Q for activity as reported by a Command object.
246
247 Information from the process's stdout is reported as
248
249 NOTE WHAT NAME stdout LINE
250
251 abnormal termination is reported as
252
253 WARN WHAT NAME CODE
254
255 where CODE is what the ChildWatch wrote.
256 """
257 eofp = deadp = False
258 while not deadp or not eofp:
259 _, kind, more = q.get()
260 if kind == 'stdout':
261 if more is None:
262 eofp = True
263 else:
264 S.notify('connect', what, name, 'stdout', more)
265 elif kind == 'exit':
266 if more: S.warn('connect', what, name, *more)
267 deadp = True
268
269 ###--------------------------------------------------------------------------
270 ### Peer database utilities.
271
272 _magic = ['_magic'] # An object distinct from all others
273
274 class Peer (object):
275 """Representation of a peer in the database."""
276
277 def __init__(me, peer, cdb = None):
278 """
279 Create a new peer, named PEER.
280
281 Information about the peer is read from the database CDB, or the default
282 one given on the command-line.
283 """
284 me.name = peer
285 record = (cdb or CDB.init(opts.cdb))['P' + peer]
286 me.__dict__.update(M.URLDecode(record, semip = True))
287
288 def get(me, key, default = _magic, filter = None):
289 """
290 Get the information stashed under KEY from the peer's database record.
291
292 If DEFAULT is given, then use it if the database doesn't contain the
293 necessary information. If no DEFAULT is given, then report an error. If
294 a FILTER function is given then apply it to the information from the
295 database before returning it.
296 """
297 attr = me.__dict__.get(key, default)
298 if attr is _magic:
299 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
300 elif filter is not None:
301 attr = filter(attr)
302 return attr
303
304 def has(me, key):
305 """
306 Return whether the peer's database record has the KEY.
307 """
308 return key in me.__dict__
309
310 def list(me):
311 """
312 Iterate over the available keys in the peer's database record.
313 """
314 return me.__dict__.iterkeys()
315
316 def boolean(value):
317 """Parse VALUE as a boolean."""
318 return value in ['t', 'true', 'y', 'yes', 'on']
319
320 ###--------------------------------------------------------------------------
321 ### Waking up and watching peers.
322
323 def run_connect(peer, cmd):
324 """
325 Start the job of connecting to the passive PEER.
326
327 The CMD string is a shell command which will connect to the peer (via some
328 back-channel, say ssh and userv), issue a command
329
330 SVCSUBMIT connect passive [OPTIONS] USER
331
332 and write the resulting challenge to standard error.
333 """
334 q = T.Queue()
335 cmd = Command(['connect', peer.name], q, 'connect',
336 ['/bin/sh', '-c', cmd], None)
337 _, kind, more = q.peek()
338 if kind == 'stdout':
339 if more is None:
340 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
341 else:
342 chal = more
343 S.greet(peer.name, chal)
344 q.get()
345 potwatch('connect', peer.name, q)
346
347 def run_disconnect(peer, cmd):
348 """
349 Start the job of disconnecting from a passive PEER.
350
351 The CMD string is a shell command which will disconnect from the peer.
352 """
353 q = T.Queue()
354 cmd = Command(['disconnect', peer.name], q, 'disconnect',
355 ['/bin/sh', '-c', cmd], None)
356 potwatch('disconnect', peer.name, q)
357
358 _pingseq = 0
359 class PingPeer (object):
360 """
361 Object representing a peer which we are pinging to ensure that it is still
362 present.
363
364 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
365 event queue -- which saves us from having an enormous swarm of coroutines
366 -- but most of the actual work is done here.
367
368 In order to avoid confusion between different PingPeer instances for the
369 same actual peer, each PingPeer has a sequence number (its `seq'
370 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
371 (Using the PingPeer instance itself will prevent garbage collection of
372 otherwise defunct instances.)
373 """
374
375 def __init__(me, pinger, queue, peer, pingnow):
376 """
377 Create a new PingPeer.
378
379 The PINGER is the Pinger object we should send the results to. This is
380 used when we remove ourselves, if the peer has been explicitly removed.
381
382 The QUEUE is the event queue on which timer and ping-command events
383 should be written.
384
385 The PEER is a `Peer' object describing the peer.
386
387 If PINGNOW is true, then immediately start pinging the peer. Otherwise
388 wait until the usual retry interval.
389 """
390 global _pingseq
391 me._pinger = pinger
392 me._q = queue
393 me._peer = peer.name
394 me.update(peer)
395 me.seq = _pingseq
396 _pingseq += 1
397 me._failures = 0
398 if pingnow:
399 me._timer = None
400 me._ping()
401 else:
402 me._timer = M.SelTimer(time() + me._every, me._time)
403
404 def update(me, peer):
405 """
406 Refreshes the timer parameters for this peer. We don't, however,
407 immediately reschedule anything: that will happen next time anything
408 interesting happens.
409 """
410 if peer is None: peer = Peer(me._peer)
411 assert peer.name == me._peer
412 me._every = peer.get('every', filter = T.timespec, default = 120)
413 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
414 me._retries = peer.get('retries', filter = int, default = 5)
415 me._connectp = peer.has('connect')
416 return me
417
418 def _ping(me):
419 """
420 Send a ping to the peer; the result is sent to the Pinger's event queue.
421 """
422 S.rawcommand(T.TripeAsynchronousCommand(
423 me._q, (me._peer, me.seq),
424 ['EPING',
425 '-background', S.bgtag(),
426 '-timeout', str(me._timeout),
427 '--',
428 me._peer]))
429
430 def _reconnect(me):
431 peer = Peer(me._peer)
432 if me._connectp:
433 S.warn('connect', 'reconnecting', me._peer)
434 S.forcekx(me._peer)
435 T.spawn(run_connect, peer, peer.get('connect'))
436 me._timer = M.SelTimer(time() + me._every, me._time)
437 else:
438 S.kill(me._peer)
439
440 def event(me, code, stuff):
441 """
442 Respond to an event which happened to this peer.
443
444 Timer events indicate that we should start a new ping. (The server has
445 its own timeout which detects lost packets.)
446
447 We trap unknown-peer responses and detach from the Pinger.
448
449 If the ping fails and we run out of retries, we attempt to restart the
450 connection.
451 """
452 if code == 'TIMER':
453 me._failures = 0
454 me._ping()
455 elif code == 'FAIL':
456 S.notify('connect', 'ping-failed', me._peer, *stuff)
457 if not stuff:
458 pass
459 elif stuff[0] == 'unknown-peer':
460 me._pinger.kill(me._peer)
461 elif stuff[0] == 'ping-send-failed':
462 me._reconnect()
463 elif code == 'INFO':
464 if stuff[0] == 'ping-ok':
465 if me._failures > 0:
466 S.warn('connect', 'ping-ok', me._peer)
467 me._timer = M.SelTimer(time() + me._every, me._time)
468 elif stuff[0] == 'ping-timeout':
469 me._failures += 1
470 S.warn('connect', 'ping-timeout', me._peer,
471 'attempt', str(me._failures), 'of', str(me._retries))
472 if me._failures < me._retries:
473 me._ping()
474 else:
475 me._reconnect()
476 elif stuff[0] == 'ping-peer-died':
477 me._pinger.kill(me._peer)
478
479 @T._callback
480 def _time(me):
481 """
482 Handle timer callbacks by posting a timeout event on the queue.
483 """
484 me._timer = None
485 me._q.put(((me._peer, me.seq), 'TIMER', None))
486
487 def __str__(me):
488 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
489 def __repr__(me):
490 return str(me)
491
492 class Pinger (T.Coroutine):
493 """
494 The Pinger keeps track of the peers which we expect to be connected and
495 takes action if they seem to stop responding.
496
497 There is usually only one Pinger, called pinger.
498
499 The Pinger maintains a collection of PingPeer objects, and an event queue.
500 The PingPeers direct the results of their pings, and timer events, to the
501 event queue. The Pinger's coroutine picks items off the queue and
502 dispatches them back to the PingPeers as appropriate.
503 """
504
505 def __init__(me):
506 """Initialize the Pinger."""
507 T.Coroutine.__init__(me)
508 me._peers = {}
509 me._q = T.Queue()
510
511 def run(me):
512 """
513 Coroutine function: reads the pinger queue and sends events to the
514 PingPeer objects they correspond to.
515 """
516 while True:
517 (peer, seq), code, stuff = me._q.get()
518 if peer in me._peers and seq == me._peers[peer].seq:
519 me._peers[peer].event(code, stuff)
520
521 def add(me, peer, pingnow):
522 """
523 Add PEER to the collection of peers under the Pinger's watchful eye.
524 The arguments are as for PingPeer: see above.
525 """
526 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
527 return me
528
529 def kill(me, peername):
530 """Remove PEER from the peers being watched by the Pinger."""
531 del me._peers[peername]
532 return me
533
534 def rescan(me, startup):
535 """
536 General resynchronization method.
537
538 We scan the list of peers (with connect scripts) known at the server.
539 Any which are known to the Pinger but aren't known to the server are
540 removed from our list; newly arrived peers are added. (Note that a peer
541 can change state here either due to the server sneakily changing its list
542 without issuing notifications or, more likely, the database changing its
543 idea of whether a peer is interesting.) Finally, PingPeers which are
544 still present are prodded to update their timing parameters.
545
546 This method is called once at startup to pick up the peers already
547 installed, and again by the dbwatcher coroutine when it detects a change
548 to the database.
549 """
550 if T._debug: print '# rescan peers'
551 correct = {}
552 start = {}
553 for name in S.list():
554 try: peer = Peer(name)
555 except KeyError: continue
556 if peer.get('watch', filter = boolean, default = False):
557 if T._debug: print '# interesting peer %s' % peer
558 correct[peer.name] = start[peer.name] = peer
559 elif startup:
560 if T._debug: print '# peer %s ready for adoption' % peer
561 start[peer.name] = peer
562 for name, obj in me._peers.items():
563 try:
564 peer = correct[name]
565 except KeyError:
566 if T._debug: print '# peer %s vanished' % name
567 del me._peers[name]
568 else:
569 obj.update(peer)
570 for name, peer in start.iteritems():
571 if name in me._peers: continue
572 if startup:
573 if T._debug: print '# setting up peer %s' % name
574 ifname = S.ifname(name)
575 addr = S.addr(name)
576 T.defer(adoptpeer, peer, ifname, *addr)
577 else:
578 if T._debug: print '# adopting new peer %s' % name
579 me.add(peer, True)
580 return me
581
582 def adopted(me):
583 """
584 Returns the list of peers being watched by the Pinger.
585 """
586 return me._peers.keys()
587
588 ###--------------------------------------------------------------------------
589 ### New connections.
590
591 def encode_envvars(env, prefix, vars):
592 """
593 Encode the variables in VARS suitably for including in a program
594 environment. Lowercase letters in variable names are forced to uppercase;
595 runs of non-alphanumeric characters are replaced by single underscores; and
596 the PREFIX is prepended. The resulting variables are written to ENV.
597 """
598 for k, v in vars.iteritems():
599 env[prefix + r_bad.sub('_', k.upper())] = v
600
601 r_bad = RX.compile(r'[\W_]+')
602 def envvars(peer):
603 """
604 Translate the database information for a PEER into a dictionary of
605 environment variables with plausible upper-case names and a P_ prefix.
606 Also collect the crypto information into A_ variables.
607 """
608 env = {}
609 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
610 encode_envvars(env, 'A_', S.algs(peer.name))
611 return env
612
613 def run_ifupdown(what, peer, *args):
614 """
615 Run the interface up/down script for a peer.
616
617 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
618 list of arguments to pass to the script, in addition to the peer name.
619
620 The command is run and watched in the background by potwatch.
621 """
622 q = T.Queue()
623 c = Command([what, peer.name], q, what,
624 M.split(peer.get(what), quotep = True)[0] +
625 [peer.name] + list(args),
626 envvars(peer))
627 potwatch(what, peer.name, q)
628
629 def adoptpeer(peer, ifname, *addr):
630 """
631 Add a new peer to our collection.
632
633 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
634 ADDR is the list of tokens representing its address.
635
636 We try to bring up the interface and provoke a connection to the peer if
637 it's passive.
638 """
639 if peer.has('ifup'):
640 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
641 .switch('ifup', peer, ifname, *addr)
642 cmd = peer.get('connect', default = None)
643 if cmd is not None:
644 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
645 .switch(peer, cmd)
646 if peer.get('watch', filter = boolean, default = False):
647 pinger.add(peer, False)
648
649 def disownpeer(peer):
650 """Drop the PEER from the Pinger and put its interface to bed."""
651 try: pinger.kill(peer)
652 except KeyError: pass
653 cmd = peer.get('disconnect', default = None)
654 if cmd is not None:
655 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
656 .switch(peer, cmd)
657 if peer.has('ifdown'):
658 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
659 .switch('ifdown', peer)
660
661 def addpeer(peer, addr):
662 """
663 Process a connect request from a new peer PEER on address ADDR.
664
665 Any existing peer with this name is disconnected from the server.
666 """
667 if peer.name in S.list():
668 S.kill(peer.name)
669 try:
670 booltrue = ['t', 'true', 'y', 'yes', 'on']
671 S.add(peer.name,
672 tunnel = peer.get('tunnel', None),
673 keepalive = peer.get('keepalive', None),
674 key = peer.get('key', None),
675 priv = peer.get('priv', None),
676 mobile = peer.get('mobile', 'nil') in booltrue,
677 cork = peer.get('cork', 'nil') in booltrue,
678 *addr)
679 except T.TripeError, exc:
680 raise T.TripeJobError(*exc.args)
681
682 ## Dictionary mapping challenges to waiting passive-connection coroutines.
683 chalmap = {}
684
685 def notify(_, code, *rest):
686 """
687 Watch for notifications.
688
689 We trap ADD and KILL notifications, and send them straight to adoptpeer and
690 disownpeer respectively; and dispatch GREET notifications to the
691 corresponding waiting coroutine.
692 """
693 if code == 'ADD':
694 try: p = Peer(rest[0])
695 except KeyError: return
696 adoptpeer(p, *rest[1:])
697 elif code == 'KILL':
698 try: p = Peer(rest[0])
699 except KeyError: return
700 disownpeer(p, *rest[1:])
701 elif code == 'GREET':
702 chal = rest[0]
703 try: cr = chalmap[chal]
704 except KeyError: pass
705 else: cr.switch(rest[1:])
706
707 ###--------------------------------------------------------------------------
708 ### Command implementation.
709
710 def cmd_kick(name):
711 """
712 kick NAME: Force a new connection attempt for the NAMEd peer.
713 """
714 if name not in pinger.adopted():
715 raise T.TripeJobError('peer-not-adopted', name)
716 try: peer = Peer(name)
717 except KeyError: raise T.TripeJobError('unknown-peer', name)
718 T.spawn(run_connect, peer, peer.get('connect'))
719
720 def cmd_adopted():
721 """
722 adopted: Report a list of adopted peers.
723 """
724 for name in pinger.adopted():
725 T.svcinfo(name)
726
727 def cmd_active(name):
728 """
729 active NAME: Handle an active connection request for the peer called NAME.
730
731 The appropriate address is read from the database automatically.
732 """
733 try: peer = Peer(name)
734 except KeyError: raise T.TripeJobError('unknown-peer', name)
735 addr = peer.get('peer')
736 if addr == 'PASSIVE':
737 raise T.TripeJobError('passive-peer', name)
738 addpeer(peer, M.split(addr, quotep = True)[0])
739
740 def cmd_listactive():
741 """
742 list: Report a list of the available active peers.
743 """
744 cdb = CDB.init(opts.cdb)
745 for key in cdb.keys():
746 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
747 T.svcinfo(key[1:])
748
749 def cmd_info(name):
750 """
751 info NAME: Report the database entries for the named peer.
752 """
753 try: peer = Peer(name)
754 except KeyError: raise T.TripeJobError('unknown-peer', name)
755 items = list(peer.list())
756 items.sort()
757 for i in items:
758 T.svcinfo('%s=%s' % (i, peer.get(i)))
759
760 def cmd_userpeer(user):
761 """
762 userpeer USER: Report the peer name for the named user.
763 """
764 try: name = CDB.init(opts.cdb)['U' + user]
765 except KeyError: raise T.TripeJobError('unknown-user', user)
766 T.svcinfo(name)
767
768 def cmd_passive(*args):
769 """
770 passive [OPTIONS] USER: Await the arrival of the named USER.
771
772 Report a challenge; when (and if!) the server receives a greeting quoting
773 this challenge, add the corresponding peer to the server.
774 """
775 timeout = 30
776 op = T.OptParse(args, ['-timeout'])
777 for opt in op:
778 if opt == '-timeout':
779 timeout = T.timespec(op.arg())
780 user, = op.rest(1, 1)
781 try: name = CDB.init(opts.cdb)['U' + user]
782 except KeyError: raise T.TripeJobError('unknown-user', user)
783 try: peer = Peer(name)
784 except KeyError: raise T.TripeJobError('unknown-peer', name)
785 chal = S.getchal()
786 cr = T.Coroutine.getcurrent()
787 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
788 try:
789 T.svcinfo(chal)
790 chalmap[chal] = cr
791 addr = cr.parent.switch()
792 if addr is None:
793 raise T.TripeJobError('connect-timeout')
794 addpeer(peer, addr)
795 finally:
796 del chalmap[chal]
797
798 ###--------------------------------------------------------------------------
799 ### Start up.
800
801 def setup():
802 """
803 Service setup.
804
805 Register the notification watcher, rescan the peers, and add automatic
806 active peers.
807 """
808 S.handler['NOTE'] = notify
809 S.watch('+n')
810
811 pinger.rescan(opts.startup)
812
813 if opts.startup:
814 cdb = CDB.init(opts.cdb)
815 try:
816 autos = cdb['%AUTO']
817 except KeyError:
818 autos = ''
819 for name in M.split(autos)[0]:
820 try:
821 peer = Peer(name, cdb)
822 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
823 except T.TripeJobError, err:
824 S.warn('connect', 'auto-add-failed', name, *err.args)
825
826 def init():
827 """
828 Initialization to be done before service startup.
829 """
830 global errorwatch, childwatch, pinger
831 errorwatch = ErrorWatch()
832 childwatch = ChildWatch()
833 pinger = Pinger()
834 T.Coroutine(dbwatch, name = 'dbwatch').switch()
835 errorwatch.switch()
836 pinger.switch()
837
838 def parse_options():
839 """
840 Parse the command-line options.
841
842 Automatically changes directory to the requested configdir, and turns on
843 debugging. Returns the options object.
844 """
845 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
846 version = '%%prog %s' % VERSION)
847
848 op.add_option('-a', '--admin-socket',
849 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
850 help = 'Select socket to connect to [default %default]')
851 op.add_option('-d', '--directory',
852 metavar = 'DIR', dest = 'dir', default = T.configdir,
853 help = 'Select current diretory [default %default]')
854 op.add_option('-p', '--peerdb',
855 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
856 help = 'Select peers database [default %default]')
857 op.add_option('--daemon', dest = 'daemon',
858 default = False, action = 'store_true',
859 help = 'Become a daemon after successful initialization')
860 op.add_option('--debug', dest = 'debug',
861 default = False, action = 'store_true',
862 help = 'Emit debugging trace information')
863 op.add_option('--startup', dest = 'startup',
864 default = False, action = 'store_true',
865 help = 'Being called as part of the server startup')
866
867 opts, args = op.parse_args()
868 if args: op.error('no arguments permitted')
869 OS.chdir(opts.dir)
870 T._debug = opts.debug
871 return opts
872
873 ## Service table, for running manually.
874 service_info = [('connect', T.VERSION, {
875 'adopted': (0, 0, '', cmd_adopted),
876 'kick': (1, 1, 'PEER', cmd_kick),
877 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
878 'active': (1, 1, 'PEER', cmd_active),
879 'info': (1, 1, 'PEER', cmd_info),
880 'list-active': (0, 0, '', cmd_listactive),
881 'userpeer': (1, 1, 'USER', cmd_userpeer)
882 })]
883
884 if __name__ == '__main__':
885 opts = parse_options()
886 T.runservices(opts.tripesock, service_info,
887 init = init, setup = setup,
888 daemon = opts.daemon)
889
890 ###----- That's all, folks --------------------------------------------------