py/tripe.py.in: Raise an error if a command token contains a newline.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
11ad66c2
MW
12### TrIPE is free software: you can redistribute it and/or modify it under
13### the terms of the GNU General Public License as published by the Free
14### Software Foundation; either version 3 of the License, or (at your
15### option) any later version.
2fa80010 16###
11ad66c2
MW
17### TrIPE is distributed in the hope that it will be useful, but WITHOUT
18### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20### for more details.
2fa80010
MW
21###
22### You should have received a copy of the GNU General Public License
11ad66c2 23### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
2fa80010
MW
24
25"""
26This module provides classes and functions for connecting to a running tripe
27server, sending it commands, receiving and processing replies, and
28implementing services.
29
30Rather than end up in lost in a storm of little event-driven classes, or a
31morass of concurrent threads, the module uses coroutines to present a fairly
32simple function call/return interface to potentially long-running commands
a62f8e8a 33which must run without blocking the main process. It assumes a coroutine
2fa80010
MW
34module presenting a subset of the `greenlet' interface: if actual greenlets
35are available, they are used; otherwise there's an implementation in terms of
36threads (with lots of locking) which will do instead.
37
38The simple rule governing the coroutines used here is this:
39
40 * The root coroutine never cares what values are passed to it when it
41 resumes: it just discards them.
42
43 * Other, non-root, coroutines are presumed to be waiting for some specific
44 thing.
45
46Configuration variables:
47 configdir
48 socketdir
49 PACKAGE
50 VERSION
51 tripesock
52 peerdb
53
54Other useful variables:
55 rootcr
56 svcmgr
57
58Other tweakables:
59 _debug
60
61Exceptions:
62 Exception
63 StandardError
64 TripeConnectionError
65 TripeError
66 TripeInternalError
67 TripeJobCancelled
68 TripeJobError
69 TripeSyntaxError
70
71Classes:
72 _Coroutine
73 Coroutine
74 TripeServiceJob
75 OptParse
76 Queue
690a6ec1 77 SelIOWatcher
2fa80010
MW
78 TripeCommand
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
81 TripeCommandIterator
82 TripeConnection
83 TripeCommandDispatcher
690a6ec1 84 TripeServiceManager
2fa80010
MW
85 TripeService
86 TripeServiceCommand
87
88Utility functions:
89 quotify
90 runservices
91 spawn
92 svcinfo
93 timespec
94"""
95
96__pychecker__ = 'self=me no-constCond no-argsused'
97
98_debug = False
99
100###--------------------------------------------------------------------------
101### External dependencies.
102
103import socket as S
104import errno as E
105import mLib as M
106import re as RX
107import sys as SYS
108import os as OS
109
110try:
111 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
171206b5 112 raise ImportError()
2fa80010
MW
113 from py.magic import greenlet as _Coroutine
114except ImportError:
115 from rmcr import Coroutine as _Coroutine
116
117###--------------------------------------------------------------------------
118### Coroutine hacking.
119
120rootcr = _Coroutine.getcurrent()
121
122class Coroutine (_Coroutine):
123 """
124 A coroutine class which can only be invoked by the root coroutine.
125
126 The root, by construction, cannot be an instance of this class.
127 """
128 def switch(me, *args, **kw):
129 assert _Coroutine.getcurrent() is rootcr
22b47552 130 if _debug: print '* %s' % me
2fa80010 131 _Coroutine.switch(me, *args, **kw)
22b47552 132 if _debug: print '* %s' % rootcr
2fa80010
MW
133
134###--------------------------------------------------------------------------
135### Default places for things.
136
137configdir = OS.environ.get('TRIPEDIR', "@configdir@")
138socketdir = "@socketdir@"
139PACKAGE = "@PACKAGE@"
140VERSION = "@VERSION@"
141
142tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
6005ef9b 143peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
144
145###--------------------------------------------------------------------------
146### Connection to the server.
147
148def readnonblockingly(sock, len):
149 """
150 Nonblocking read from SOCK.
151
53e9ae9e 152 Try to return LEN bytes. If couldn't read anything, return `None'. EOF is
2fa80010
MW
153 returned as an empty string.
154 """
155 try:
156 sock.setblocking(0)
157 return sock.recv(len)
158 except S.error, exc:
159 if exc[0] == E.EWOULDBLOCK:
160 return None
161 raise
162
163class TripeConnectionError (StandardError):
164 """Something happened to the connection with the server."""
165 pass
166class TripeInternalError (StandardError):
167 """This program is very confused."""
168 pass
169
170class TripeConnection (object):
171 """
172 A logical connection to the tripe administration socket.
173
174 There may or may not be a physical connection. (This is needed for the
175 monitor, for example.)
176
177 This class isn't very useful on its own, but it has useful subclasses. At
178 this level, the class is agnostic about I/O multiplexing schemes; that gets
179 added later.
180 """
181
182 def __init__(me, socket):
183 """
184 Make a connection to the named SOCKET.
185
186 No physical connection is made initially.
187 """
188 me.socket = socket
189 me.sock = None
190 me.lbuf = None
690a6ec1 191 me.iowatch = SelIOWatcher(me)
2fa80010
MW
192
193 def connect(me):
194 """
195 Ensure that there's a physical connection.
196
197 Do nothing if we're already connected. Invoke the `connected' method if
198 successful.
199 """
200 if me.sock: return
201 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
202 sock.connect(me.socket)
203 me.sock = sock
204 me.lbuf = M.LineBuffer(me.line, me._eof)
205 me.lbuf.size = 1024
206 me.connected()
207 return me
208
209 def disconnect(me, reason):
210 """
211 Disconnect the physical connection.
212
213 Invoke the `disconnected' method, giving the provided REASON, which
53e9ae9e 214 should be either `None' or an exception.
2fa80010
MW
215 """
216 if not me.sock: return
217 me.disconnected(reason)
218 me.sock.close()
219 me.sock = None
220 me.lbuf.disable()
221 me.lbuf = None
222 return me
223
224 def connectedp(me):
225 """
226 Return true if there's a current, believed-good physical connection.
227 """
228 return me.sock is not None
229
230 __nonzero__ = connectedp
231
232 def send(me, line):
233 """
234 Send the LINE to the connection's socket.
235
236 All output is done through this method; it can be overridden to provide
237 proper nonblocking writing, though this seems generally unnecessary.
238 """
239 try:
240 me.sock.setblocking(1)
241 me.sock.send(line + '\n')
242 except Exception, exc:
243 me.disconnect(exc)
244 raise
245 return me
246
247 def receive(me):
248 """
249 Receive whatever's ready from the connection's socket.
250
251 Call `line' on each complete line, and `eof' if the connection closed.
252 Subclasses which attach this class to an I/O-event system should call
53e9ae9e 253 this method when the socket (the `sock' attribute) is ready for reading.
2fa80010
MW
254 """
255 while me.sock is not None:
256 try:
257 buf = readnonblockingly(me.sock, 16384)
258 except Exception, exc:
259 me.disconnect(exc)
260 raise
261 if buf is None:
262 return me
263 if buf == '':
264 me._eof()
265 return me
266 me.lbuf.flush(buf)
267 return me
268
269 def _eof(me):
270 """Internal end-of-file handler."""
271 me.disconnect(TripeConnectionError('connection lost'))
272 me.eof()
273
274 def connected(me):
275 """
276 To be overridden by subclasses to react to a connection being
277 established.
278 """
690a6ec1 279 me.iowatch.connected(me.sock)
2fa80010
MW
280
281 def disconnected(me, reason):
282 """
283 To be overridden by subclasses to react to a connection being severed.
284 """
690a6ec1 285 me.iowatch.disconnected()
2fa80010
MW
286
287 def eof(me):
288 """To be overridden by subclasses to handle end-of-file."""
289 pass
290
291 def line(me, line):
292 """To be overridden by subclasses to handle incoming lines."""
293 pass
294
295###--------------------------------------------------------------------------
690a6ec1
MW
296### I/O loop integration.
297
298class SelIOWatcher (object):
299 """
300 Integration with mLib's I/O event system.
301
302 You can replace this object with a different one for integration with,
53e9ae9e 303 e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
690a6ec1
MW
304 while the CONN is disconnected.
305 """
306
307 def __init__(me, conn):
308 me._conn = conn
309 me._selfile = None
310
311 def connected(me, sock):
312 """
313 Called when a connection is made.
314
53e9ae9e 315 SOCK is the socket. The watcher must arrange to call `CONN.receive' when
690a6ec1
MW
316 data is available.
317 """
318 me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
319 me._selfile.enable()
320
321 def disconnected(me):
322 """
323 Called when the connection is lost.
324 """
325 me._selfile = None
326
327 def iterate(me):
328 """
329 Wait for something interesting to happen, and issue events.
330
331 That is, basically, do one iteration of a main select loop, processing
d8fedf21
MW
332 all of the events, and then return. This is used in the method
333 `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
334 `runservices'; if your I/O watcher has a different main loop, you can
335 drive it yourself.
690a6ec1
MW
336 """
337 M.select()
338
339###--------------------------------------------------------------------------
340### Inter-coroutine communication.
341
342class Queue (object):
343 """
344 A queue of things arriving asynchronously.
345
346 This is a very simple single-reader multiple-writer queue. It's useful for
347 more complex coroutines which need to cope with a variety of possible
348 incoming events.
349 """
350
351 def __init__(me):
352 """Create a new empty queue."""
353 me.contents = M.Array()
354 me.waiter = None
355
356 def _wait(me):
357 """
358 Internal: wait for an item to arrive in the queue.
359
360 Complain if someone is already waiting, because this is just a
361 single-reader queue.
362 """
363 if me.waiter:
364 raise ValueError('queue already being waited on')
365 try:
366 me.waiter = Coroutine.getcurrent()
367 while not me.contents:
368 me.waiter.parent.switch()
369 finally:
370 me.waiter = None
371
372 def get(me):
373 """
374 Remove and return the item at the head of the queue.
375
376 If the queue is empty, wait until an item arrives.
377 """
378 me._wait()
379 return me.contents.shift()
380
381 def peek(me):
382 """
383 Return the item at the head of the queue without removing it.
384
385 If the queue is empty, wait until an item arrives.
386 """
387 me._wait()
388 return me.contents[0]
389
390 def put(me, thing):
391 """
392 Write THING to the queue.
393
394 If someone is waiting on the queue, wake him up immediately; otherwise
395 just leave the item there for later.
396 """
397 me.contents.push(thing)
398 if me.waiter:
399 me.waiter.switch()
400
401###--------------------------------------------------------------------------
2fa80010
MW
402### Dispatching coroutine.
403
404## Match a string if it can stand on its own as a bareword: i.e., it doesn't
405## contain backslashes, quotes or whitespace.
406rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
407
408## Match characters which need to be escaped, even in quoted text.
409rx_weird = RX.compile(r'([\\\'])')
410
411def quotify(s):
412 """Quote S according to the tripe-admin(5) rules."""
413 m = rx_ordinary.match(s)
414 if m and m.end() == len(s):
415 return s
416 else:
417 return "'" + rx_weird.sub(r'\\\1', s) + "'"
418
419def _callback(func):
420 """
421 Return a wrapper for FUNC which reports exceptions thrown by it.
422
423 Useful in the case of callbacks invoked by C functions which ignore
424 exceptions.
425 """
426 def _(*a, **kw):
427 try:
428 return func(*a, **kw)
429 except:
430 SYS.excepthook(*SYS.exc_info())
431 raise
432 return _
433
434class TripeCommand (object):
435 """
436 This abstract class represents a command in progress.
437
438 The `words' attribute contains the list of tokens which make up the
439 command.
440
441 Subclasses must implement a method to handle server responses:
442
53e9ae9e
MW
443 * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
444 `FAIL'; ARGS are the remaining tokens from the server's response.
2fa80010
MW
445 """
446
447 def __init__(me, words):
448 """Make a new command consisting of the given list of WORDS."""
5b97ed7d
MW
449 for word in words:
450 if '\n' in word:
451 raise TripeInternalError("command word contains newline")
2fa80010
MW
452 me.words = words
453
454class TripeSynchronousCommand (TripeCommand):
455 """
456 A simple command, processed apparently synchronously.
457
458 Must be invoked from a coroutine other than the root (or whichever one is
459 running the dispatcher); in reality, other coroutines carry on running
460 while we wait for a response from the server.
461
462 Each server response causes the calling coroutine to be resumed with the
463 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
464 or `FAIL') and REST is a list of the server's other response tokens. The
465 calling coroutine must continue switching back to the dispatcher until a
466 terminating response (`OK' or `FAIL') is received or become very
467 confused.
468
53e9ae9e 469 Mostly it's better to use the `TripeCommandIterator' to do this
2fa80010
MW
470 automatically.
471 """
472
473 def __init__(me, words):
474 """Initialize the command, specifying the WORDS to send to the server."""
475 TripeCommand.__init__(me, words)
476 me.owner = Coroutine.getcurrent()
477
478 def response(me, code, *rest):
479 """Handle a server response by forwarding it to the calling coroutine."""
480 me.owner.switch((code, rest))
481
482class TripeError (StandardError):
483 """
53e9ae9e 484 A tripe command failed with an error (a `FAIL' code). The args attribute
2fa80010
MW
485 contains a list of the server's message tokens.
486 """
487 pass
488
489class TripeCommandIterator (object):
490 """
491 Iterator interface to a tripe command.
492
493 The values returned by the iterator are lists of tokens from the server's
53e9ae9e
MW
494 `INFO' lines, as processed by the given filter function, if any. The
495 iterator completes normally (by raising `StopIteration') if the server
496 reported `OK', and raises an exception if the command failed for some reason.
2fa80010 497
53e9ae9e
MW
498 A `TripeError' is raised if the server issues a `FAIL' code. If the
499 connection failed, some other exception is raised.
2fa80010
MW
500 """
501
502 def __init__(me, dispatcher, words, bg = False, filter = None):
503 """
504 Create a new command iterator.
505
506 The command is submitted to the DISPATCHER; it consists of the given
507 WORDS. If BG is true, then an option is inserted to request that the
508 server run the command in the background. The FILTER is applied to the
509 token lists which the server responds, and the filter's output are the
510 items returned by the iterator.
511 """
512 me.dcr = Coroutine.getcurrent().parent
513 if me.dcr is None:
171206b5 514 raise ValueError('must invoke from coroutine')
2fa80010
MW
515 me.filter = filter or (lambda x: x)
516 if bg:
517 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
518 dispatcher.rawcommand(TripeSynchronousCommand(words))
519
520 def __iter__(me):
521 """Iterator protocol: I am my own iterator."""
522 return me
523
524 def next(me):
525 """
526 Iterator protocol: return the next piece of information from the server.
527
53e9ae9e
MW
528 `INFO' responses are filtered and returned as the values of the
529 iteration. `FAIL' and `CONNERR' responses are turned into exceptions and
530 raised. Finally, `OK' is turned into `StopIteration', which should cause
531 a normal end to the iteration process.
2fa80010
MW
532 """
533 thing = me.dcr.switch()
534 code, rest = thing
535 if code == 'INFO':
536 return me.filter(rest)
537 elif code == 'OK':
171206b5 538 raise StopIteration()
2fa80010
MW
539 elif code == 'CONNERR':
540 if rest is None:
171206b5 541 raise TripeConnectionError('connection terminated by user')
2fa80010
MW
542 else:
543 raise rest
544 elif code == 'FAIL':
545 raise TripeError(*rest)
546 else:
171206b5
MW
547 raise TripeInternalError('unexpected tripe response %r' %
548 ([code] + rest))
2fa80010
MW
549
550### Simple utility functions for the TripeCommandIterator convenience
551### methods.
552
553def _tokenjoin(words):
554 """Filter function: simply join the given tokens with spaces between."""
555 return ' '.join(words)
556
557def _keyvals(iter):
53e9ae9e 558 """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
2fa80010
MW
559 iterator ITER."""
560 kv = {}
561 for ww in iter:
562 for w in ww:
563 q = w.index('=')
564 kv[w[:q]] = w[q + 1:]
565 return kv
566
567def _simple(iter):
568 """Raise an error if ITER contains any item."""
569 stuff = list(iter)
570 if len(stuff) != 0:
571 raise TripeInternalError('expected no response')
572 return None
573
574def _oneline(iter):
575 """If ITER contains a single item, return it; otherwise raise an error."""
576 stuff = list(iter)
577 if len(stuff) != 1:
578 raise TripeInternalError('expected only one line of response')
579 return stuff[0]
580
581def _tracelike(iter):
582 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
583 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
584 disabled, `+' if enabled, maybe something else later), and DESC is the
585 human-readable description."""
586 stuff = []
587 for ww in iter:
588 ch = ww[0][0]
589 st = ww[0][1:]
590 desc = ' '.join(ww[1:])
591 stuff.append((ch, st, desc))
592 return stuff
593
594def _kwopts(kw, allowed):
595 """Parse keyword arguments into options. ALLOWED is a list of allowable
53e9ae9e
MW
596 keywords; raise errors if other keywords are present. `KEY = VALUE'
597 becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
598 `-KEY' if VALUE is a true non-string, or nothing if VALUE is false. Insert
599 a `--' at the end to stop the parser getting confused."""
2fa80010
MW
600 opts = []
601 amap = {}
602 for a in allowed: amap[a] = True
603 for k, v in kw.iteritems():
604 if k not in amap:
605 raise ValueError('option %s not allowed here' % k)
606 if isinstance(v, str):
607 opts += ['-' + k, v]
608 elif v:
609 opts += ['-' + k]
610 opts.append('--')
611 return opts
612
690a6ec1
MW
613## Deferral.
614_deferq = []
615def defer(func, *args, **kw):
616 """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
617 _deferq.append((func, args, kw))
618
22b47552
MW
619def funargstr(func, args, kw):
620 items = [repr(a) for a in args]
621 for k, v in kw.iteritems():
622 items.append('%s = %r' % (k, v))
623 return '%s(%s)' % (func.__name__, ', '.join(items))
624
690a6ec1
MW
625def spawn(func, *args, **kw):
626 """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
22b47552
MW
627 defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
628 .switch(*args, **kw)))
690a6ec1
MW
629
630## Asides.
631_asideq = Queue()
632def _runasides():
633 """
634 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
635 """
636 while True:
637 func, args, kw = _asideq.get()
638 try:
639 func(*args, **kw)
640 except:
641 SYS.excepthook(*SYS.exc_info())
642
643def aside(func, *args, **kw):
644 """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
645 defer(_asideq.put, (func, args, kw))
646
2fa80010
MW
647class TripeCommandDispatcher (TripeConnection):
648 """
649 Command dispatcher.
650
651 The command dispatcher is a connection which knows how to handle commands.
652 This is probably the most important class in this module to understand.
653
654 Lines from the server are parsed into tokens. The first token is a code
53e9ae9e 655 (`OK' or `NOTE' or something) explaining what kind of line this is. The
2fa80010
MW
656 `handler' attribute is a dictionary mapping server line codes to handler
657 functions, which are applied to the words of the line as individual
53e9ae9e 658 arguments. *Exception*: the content of `TRACE' lines is not tokenized.
2fa80010
MW
659
660 There are default handlers for server codes which respond to commands.
53e9ae9e 661 Commands arrive as `TripeCommand' instances through the `rawcommand'
2fa80010
MW
662 interface. The dispatcher keeps track of which command objects represent
663 which jobs, and sends responses on to the appropriate command objects by
53e9ae9e
MW
664 invoking their `response' methods. Command objects don't see the `BG...'
665 codes, because the dispatcher has already transformed them into regular
666 codes when it was looking up the job tag.
2fa80010 667
53e9ae9e 668 The dispatcher also has a special response code of its own: `CONNERR'
2fa80010 669 indicates that the connection failed and the command has therefore been
c1d15fda
MW
670 lost. This is sent to all outstanding commands when a connection error is
671 encountered: rather than a token list, it is accompanied by an exception
672 object which is the cause of the disconnection, which may be `None' if the
673 disconnection is expected (e.g., the direct result of a user request).
2fa80010
MW
674 """
675
676 ## --- Infrastructure ---
677 ##
678 ## We will get confused if we pipeline commands. Send them one at a time.
679 ## Only send a command when the previous one detaches or completes.
680 ##
681 ## The following attributes are interesting:
682 ##
683 ## tagseq Sequence number for next background job (for bgtag)
684 ##
685 ## queue Commands awaiting submission.
686 ##
687 ## cmd Mapping from job tags to commands: cmd[None] is the
688 ## foreground command.
689 ##
690 ## handler Mapping from server codes to handler functions.
691
692 def __init__(me, socket):
693 """
694 Initialize the dispatcher.
695
696 The SOCKET is the filename of the administration socket to connect to,
697 for TripeConnection.__init__.
698 """
699 TripeConnection.__init__(me, socket)
700 me.tagseq = 0
701 me.handler = {}
702 me.handler['BGDETACH'] = me._detach
703 for i in 'BGOK', 'BGINFO', 'BGFAIL':
704 me.handler[i] = me._response
705 for i in 'OK', 'INFO', 'FAIL':
706 me.handler[i] = me._fgresponse
707
690a6ec1
MW
708 def quitp(me):
709 """Should we quit the main loop? Subclasses should override."""
710 return False
711
712 def mainloop(me, quitp = None):
713 """
714 Iterate the I/O watcher until QUITP returns true.
715
716 Arranges for asides and deferred calls to be made at the right times.
717 """
718
719 global _deferq
720 assert _Coroutine.getcurrent() is rootcr
22b47552
MW
721 Coroutine(_runasides, name = '_runasides').switch()
722 if quitp is None:
723 quitp = me.quitp
690a6ec1
MW
724 while not quitp():
725 while _deferq:
726 q = _deferq
727 _deferq = []
728 for func, args, kw in q:
729 func(*args, **kw)
730 me.iowatch.iterate()
731
2fa80010
MW
732 def connected(me):
733 """
734 Connection hook.
735
736 If a subclass overrides this method, it must call us; clears out the
737 command queue and job map.
738 """
739 me.queue = M.Array()
740 me.cmd = {}
690a6ec1 741 TripeConnection.connected(me)
2fa80010
MW
742
743 def disconnected(me, reason):
744 """
745 Disconnection hook.
746
747 If a subclass hooks overrides this method, it must call us; sends a
53e9ae9e 748 special `CONNERR' code to all incomplete commands.
2fa80010 749 """
690a6ec1 750 TripeConnection.disconnected(me, reason)
2fa80010
MW
751 for cmd in me.cmd.itervalues():
752 cmd.response('CONNERR', reason)
753 for cmd in me.queue:
754 cmd.response('CONNERR', reason)
755
756 @_callback
757 def line(me, line):
758 """Handle an incoming line, sending it to the right place."""
759 if _debug: print '<', line
760 code, rest = M.word(line, quotep = True)
761 func = me.handler.get(code)
762 if func is not None:
763 if code == 'TRACE':
764 func(code, rest)
765 else:
766 func(code, *M.split(rest, quotep = True)[0])
767 me.dequeue()
768
769 def dequeue(me):
770 """
771 Pull the oldest command off the queue and try to send it to the server.
772 """
773 if not me.queue or None in me.cmd: return
774 cmd = me.queue.shift()
775 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
776 me.send(' '.join([quotify(w) for w in cmd.words]))
777 me.cmd[None] = cmd
778
779 def bgtag(me):
780 """
781 Return an unused job tag.
782
783 May be of use when composing commands by hand.
784 """
785 tag = 'J%05d' % me.tagseq
786 me.tagseq += 1
787 return tag
788
789 ## --- Built-in handler functions for server responses ---
790
791 def _detach(me, _, tag):
792 """
53e9ae9e 793 Respond to a `BGDETACH' TAG message.
2fa80010
MW
794
795 Move the current foreground command to the background.
796 """
797 assert tag not in me.cmd
798 me.cmd[tag] = me.cmd[None]
799 del me.cmd[None]
800
801 def _response(me, code, tag, *w):
802 """
53e9ae9e 803 Respond to an `OK', `INFO' or `FAIL' message.
2fa80010
MW
804
805 If this is a message for a background job, find the tag; then dispatch
53e9ae9e
MW
806 the result to the command object. This is also called by `_fgresponse'
807 (wth TAG set to `None') to handle responses for foreground commands, and
808 is therefore a useful method to extend or override in subclasses.
2fa80010
MW
809 """
810 if code.startswith('BG'):
811 code = code[2:]
812 cmd = me.cmd[tag]
813 if code != 'INFO':
814 del me.cmd[tag]
815 cmd.response(code, *w)
816
817 def _fgresponse(me, code, *w):
818 """Process responses to the foreground command."""
819 me._response(code, None, *w)
820
821 ## --- Interface methods ---
822
823 def rawcommand(me, cmd):
824 """
53e9ae9e 825 Submit the `TripeCommand' CMD to the server, and look after it until it
2fa80010
MW
826 completes.
827 """
828 if not me.connectedp():
829 raise TripeConnectionError('connection closed')
830 me.queue.push(cmd)
831 me.dequeue()
832
833 def command(me, *cmd, **kw):
834 """Convenience wrapper for creating a TripeCommandIterator object."""
835 return TripeCommandIterator(me, cmd, **kw)
836
837 ## --- Convenience methods for server commands ---
838
839 def add(me, peer, *addr, **kw):
840 return _simple(me.command(bg = True,
841 *['ADD'] +
48b84569 842 _kwopts(kw, ['tunnel', 'keepalive',
fe2a5dcf 843 'key', 'priv', 'cork',
067aa5f0
MW
844 'mobile', 'knock',
845 'ephemeral']) +
2fa80010
MW
846 [peer] +
847 list(addr)))
848 def addr(me, peer):
849 return _oneline(me.command('ADDR', peer))
35c8b547
MW
850 def algs(me, peer = None):
851 return _keyvals(me.command('ALGS',
852 *((peer is not None and [peer]) or [])))
2fa80010
MW
853 def checkchal(me, chal):
854 return _simple(me.command('CHECKCHAL', chal))
855 def daemon(me):
856 return _simple(me.command('DAEMON'))
857 def eping(me, peer, **kw):
858 return _oneline(me.command(bg = True,
db11df93 859 *['EPING'] +
2fa80010
MW
860 _kwopts(kw, ['timeout']) +
861 [peer]))
862 def forcekx(me, peer):
863 return _simple(me.command('FORCEKX', peer))
864 def getchal(me):
865 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
866 def greet(me, peer, chal):
867 return _simple(me.command('GREET', peer, chal))
868 def help(me):
869 return list(me.command('HELP', filter = _tokenjoin))
870 def ifname(me, peer):
871 return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
872 def kill(me, peer):
873 return _simple(me.command('KILL', peer))
874 def list(me):
875 return list(me.command('LIST', filter = _tokenjoin))
876 def notify(me, *msg):
877 return _simple(me.command('NOTIFY', *msg))
878 def peerinfo(me, peer):
879 return _keyvals(me.command('PEERINFO', peer))
880 def ping(me, peer, **kw):
881 return _oneline(me.command(bg = True,
882 *['PING'] +
883 _kwopts(kw, ['timeout']) +
884 [peer]))
5d06f63e
MW
885 def port(me, af = None):
886 return _oneline(me.command('PORT',
887 *((af is not None) and [af] or []),
888 filter = _tokenjoin))
2fa80010
MW
889 def quit(me):
890 return _simple(me.command('QUIT'))
891 def reload(me):
892 return _simple(me.command('RELOAD'))
893 def servinfo(me):
894 return _keyvals(me.command('SERVINFO'))
895 def setifname(me, new):
896 return _simple(me.command('SETIFNAME', new))
897 def svcclaim(me, service, version):
898 return _simple(me.command('SVCCLAIM', service, version))
899 def svcensure(me, service, version = None):
900 return _simple(me.command('SVCENSURE', service,
901 *((version is not None and [version]) or [])))
902 def svcfail(me, job, *msg):
903 return _simple(me.command('SVCFAIL', job, *msg))
904 def svcinfo(me, job, *msg):
905 return _simple(me.command('SVCINFO', job, *msg))
906 def svclist(me):
907 return list(me.command('SVCLIST'))
908 def svcok(me, job):
909 return _simple(me.command('SVCOK', job))
910 def svcquery(me, service):
911 return _keyvals(me.command('SVCQUERY', service))
912 def svcrelease(me, service):
913 return _simple(me.command('SVCRELEASE', service))
914 def svcsubmit(me, service, *args, **kw):
915 return me.command(bg = True,
916 *['SVCSUBMIT'] +
917 _kwopts(kw, ['version']) +
918 [service] +
919 list(args))
920 def stats(me, peer):
921 return _keyvals(me.command('STATS', peer))
922 def trace(me, *args):
923 return _tracelike(me.command('TRACE', *args))
924 def tunnels(me):
925 return list(me.command('TUNNELS', filter = _tokenjoin))
926 def version(me):
927 return _oneline(me.command('VERSION', filter = _tokenjoin))
928 def warn(me, *msg):
929 return _simple(me.command('WARN', *msg))
930 def watch(me, *args):
931 return _tracelike(me.command('WATCH', *args))
932
933###--------------------------------------------------------------------------
934### Asynchronous commands.
935
2fa80010
MW
936class TripeAsynchronousCommand (TripeCommand):
937 """
938 Asynchronous commands.
939
940 This is the complicated way of issuing commands. You must set up a queue,
941 and associate the command with the queue. Responses arriving for the
942 command will be put on the queue as an triple of the form (TAG, CODE, REST)
943 -- where TAG is an object of your choice, not interpreted by this class,
53e9ae9e
MW
944 CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
945 and REST is the list of the rest of the server's tokens.
2fa80010
MW
946
947 Using this, you can write coroutines which process many commands (and
948 possibly other events) simultaneously.
949 """
950
951 def __init__(me, queue, tag, words):
952 """Make an asynchronous command consisting of the given WORDS, which
953 sends responses to QUEUE, labelled with TAG."""
954 TripeCommand.__init__(me, words)
955 me.queue = queue
956 me.tag = tag
957
958 def response(me, code, *stuff):
959 """Handle a server response by writing it to the caller's queue."""
960 me.queue.put((me.tag, code, list(stuff)))
961
962###--------------------------------------------------------------------------
2fa80010
MW
963### Services.
964
965class TripeJobCancelled (Exception):
966 """
967 Exception sent to job handler if the client kills the job.
968
969 Not propagated further.
970 """
971 pass
972
973class TripeJobError (Exception):
974 """
975 Exception to cause failure report for running job.
976
977 Sends an SVCFAIL code back.
978 """
979 pass
980
981class TripeSyntaxError (Exception):
982 """
983 Exception to report a syntax error for a job.
984
985 Sends an SVCFAIL bad-svc-syntax message back.
986 """
987 pass
988
690a6ec1 989class TripeServiceManager (TripeCommandDispatcher):
2fa80010
MW
990 """
991 A command dispatcher with added handling for incoming service requests.
992
993 There is usually only one instance of this class, called svcmgr. Some of
994 the support functions in this module assume that this is the case.
995
53e9ae9e 996 To use, run `mLib.select' in a loop until the quitp method returns true;
2fa80010
MW
997 then, in a non-root coroutine, register your services by calling `add', and
998 then call `running' when you've finished setting up.
999
53e9ae9e
MW
1000 The instance handles server service messages `SVCJOB', `SVCCANCEL' and
1001 `SVCCLAIM'. It maintains a table of running services. Incoming jobs cause
1002 the service's `job' method to be invoked; `SVCCANCEL' sends a
1003 `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
1004 causes the relevant service to be deregistered.
2fa80010
MW
1005
1006 There is no base class for jobs, but a job must implement two methods:
1007
1008 start() Begin processing; might be a no-op.
1009
1010 cancel() Stop processing; the original client has killed the
1011 job.
1012
1013 The life of a service manager is divided into two parts: setup and running;
1014 you tell the manager that you've finished setting up by calling the
1015 `running' method. If, at any point after setup is finished, there are no
1016 remaining services or jobs, `quitp' will return true, ending the process.
1017 """
1018
1019 ## --- Attributes ---
1020 ##
1021 ## svc Mapping name -> service object
1022 ##
1023 ## job Mapping jobid -> job handler coroutine
1024 ##
1025 ## runningp True when setup is finished
1026 ##
1027 ## _quitp True if explicit quit has been requested
1028
1029 def __init__(me, socket):
1030 """
1031 Initialize the service manager.
1032
1033 SOCKET is the administration socket to connect to.
1034 """
690a6ec1 1035 TripeCommandDispatcher.__init__(me, socket)
2fa80010
MW
1036 me.svc = {}
1037 me.job = {}
1038 me.runningp = False
1039 me.handler['SVCCANCEL'] = me._cancel
1040 me.handler['SVCJOB'] = me._job
1041 me.handler['SVCCLAIM'] = me._claim
1042 me._quitp = 0
1043
1044 def addsvc(me, svc):
53e9ae9e 1045 """Register a new service; SVC is a `TripeService' instance."""
2fa80010
MW
1046 assert svc.name not in me.svc
1047 me.svcclaim(svc.name, svc.version)
1048 me.svc[svc.name] = svc
1049
1050 def _cancel(me, _, jid):
1051 """
1052 Called when the server cancels a job; invokes the job's `cancel' method.
1053 """
1054 job = me.job[jid]
1055 del me.job[jid]
1056 job.cancel()
1057
1058 def _claim(me, _, svc, __):
1059 """Called when another program claims our service at a higher version."""
1060 del me.svc[svc]
1061
1062 def _job(me, _, jid, svc, cmd, *args):
1063 """
1064 Called when the server sends us a job to do.
1065
1066 Calls the service to collect a job, and begins processing it.
1067 """
1068 assert jid not in me.job
1069 svc = me.svc[svc.lower()]
1070 job = svc.job(jid, cmd, args)
1071 me.job[jid] = job
1072 job.start()
1073
1074 def running(me):
1075 """Answer true if setup is finished."""
1076 me.runningp = True
1077
1078 def jobdone(me, jid):
1079 """Informs the service manager that the job with id JID has finished."""
1080 try:
1081 del me.job[jid]
1082 except KeyError:
1083 pass
1084
1085 def quitp(me):
1086 """
1087 Return true if no services or jobs are active (and, therefore, if this
1088 process can quit without anyone caring).
1089 """
1090 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
690a6ec1 1091 not me.sock))
2fa80010
MW
1092
1093 def quit(me):
1094 """Forces the quit flag (returned by quitp) on."""
1095 me._quitp = True
1096
1097class TripeService (object):
1098 """
1099 A standard service.
1100
1101 The NAME and VERSION are passed on to the server. The CMDTAB is a
1102 dictionary mapping command names (in lowercase) to command objects.
1103
53e9ae9e
MW
1104 If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1105 defaults are provided.
2fa80010
MW
1106
1107 TripeService itself is mostly agnostic about the nature of command objects,
1108 but the TripeServiceJob class (below) has some requirements. The built-in
1109 HELP command requires command objects to have `usage' attributes.
1110 """
1111
1112 def __init__(me, name, version, cmdtab):
1113 """
1114 Create and register a new service with the given NAME and VERSION.
1115
1116 CMDTAB maps command names (in lower-case) to command objects.
1117 """
1118 me.name = name
1119 me.version = version
1120 me.cmd = cmdtab
1121 me.activep = True
1122 me.cmd.setdefault('help',
1123 TripeServiceCommand('help', 0, 0, '', me._help))
1124 me.cmd.setdefault('quit',
1125 TripeServiceCommand('quit', 0, 0, '', me._quit))
1126
1127 def job(me, jid, cmd, args):
1128 """
1129 Called by the service manager: a job arrived with id JID.
1130
1131 It asks for comamnd CMD with argument list ARGS. Creates a new job,
1132 passing it the information needed.
1133 """
1134 return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1135
1136 ## Simple default command handlers, complying with the spec in
1137 ## tripe-service(7).
1138
1139 def _help(me):
1140 """Send a help summary to the user."""
1141 cmds = me.cmd.items()
1142 cmds.sort()
1143 for name, cmd in cmds:
1144 svcinfo(name, *cmd.usage)
1145
1146 def _quit(me):
1147 """Terminate the service manager."""
1148 svcmgr.notify('svc-quit', me.name, 'admin-request')
1149 svcmgr.quit()
1150
1151class TripeServiceCommand (object):
1152 """A simple service command."""
1153
1154 def __init__(me, name, min, max, usage, func):
1155 """
1156 Creates a new command.
1157
1158 NAME is the command's name (in lowercase).
1159
1160 MIN and MAX are the minimum and maximum number of allowed arguments (used
1161 for checking); either may be None to indicate no minimum or maximum.
1162
1163 USAGE is a usage string, used for generating help and error messages.
1164
1165 FUNC is the function to invoke.
1166 """
1167 me.name = name
1168 me.min = min
1169 me.max = max
1170 me.usage = usage.split()
1171 me.func = func
1172
1173 def run(me, *args):
1174 """
1175 Called when the command is invoked.
1176
1177 Does minimal checking of the arguments and calls the supplied function.
1178 """
1179 if (me.min is not None and len(args) < me.min) or \
1180 (me.max is not None and len(args) > me.max):
171206b5 1181 raise TripeSyntaxError()
2fa80010
MW
1182 me.func(*args)
1183
1184class TripeServiceJob (Coroutine):
1185 """
1186 Job handler coroutine.
1187
53e9ae9e
MW
1188 A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1189 request, passing it the jobid, command and arguments, and a command object.
1190 The command object needs the following attributes.
2fa80010
MW
1191
1192 usage A usage list (excluding the command name) showing
1193 arguments and options.
1194
1195 run(*ARGS) Function to react to the command with ARGS split into
1196 separate arguments. Invoked in a coroutine. The
53e9ae9e
MW
1197 `svcinfo function (not the `TripeCommandDispatcher'
1198 method) may be used to send `INFO' lines. The
1199 function may raise `TripeJobError' to send a `FAIL'
1200 response back, or `TripeSyntaxError' to send a
1201 generic usage error. `TripeJobCancelled' exceptions
1202 are trapped silently. Other exceptions are
1203 translated into a generic internal-error message.
2fa80010
MW
1204
1205 This class automatically takes care of sending some closing response to the
1206 job, and for informing the service manager that the job is completed.
1207
1208 The `jid' attribute stores the job's id.
1209 """
1210
1211 def __init__(me, jid, svc, cmd, command, args):
1212 """
1213 Start a new job.
1214
1215 The job is created with id JID, for service SVC, processing command name
1216 CMD (which the service resolved into the command object COMMAND, or
53e9ae9e 1217 `None'), and with the arguments ARGS.
2fa80010
MW
1218 """
1219 Coroutine.__init__(me)
1220 me.jid = jid
1221 me.svc = svc
1222 me.cmd = cmd
1223 me.command = command
1224 me.args = args
1225
1226 def run(me):
1227 """
1228 Main body of the coroutine.
1229
1230 Does the tedious exception handling boilerplate and invokes the command's
1231 run method.
1232 """
1233 try:
1234 try:
1235 if me.command is None:
1236 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1237 else:
1238 me.command.run(*me.args)
1239 svcmgr.svcok(me.jid)
1240 except TripeJobError, exc:
1241 svcmgr.svcfail(me.jid, *exc.args)
1242 except TripeSyntaxError:
1243 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1244 me.svc.name, me.command.name,
1245 *me.command.usage)
1246 except TripeJobCancelled:
1247 pass
1248 except Exception, exc:
1249 svcmgr.svcfail(me.jid, 'svc-internal-error',
1250 exc.__class__.__name__, str(exc))
1251 finally:
1252 svcmgr.jobdone(me.jid)
1253
1254 def start(me):
1255 """Invoked by the service manager to start running the coroutine."""
1256 me.switch()
1257
1258 def cancel(me):
1259 """Invoked by the service manager to cancel the job."""
1260 me.throw(TripeJobCancelled())
1261
1262def svcinfo(*args):
1263 """
53e9ae9e 1264 If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
2fa80010
MW
1265 job's sender, automatically using the correct job id.
1266 """
1267 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1268
1269def _setupsvc(tab, func):
1270 """
1271 Setup coroutine for setting up service programs.
1272
1273 Register the given services.
1274 """
1275 try:
1276 for service in tab:
1277 svcmgr.addsvc(service)
1278 if func:
1279 func()
1280 finally:
1281 svcmgr.running()
1282
1283svcmgr = TripeServiceManager(None)
2fa80010
MW
1284def runservices(socket, tab, init = None, setup = None, daemon = False):
1285 """
1286 Function to start a service provider.
1287
1288 SOCKET is the socket to connect to, usually tripesock.
1289
1290 TAB is a list of entries. An entry may be either a tuple
1291
1292 (NAME, VERSION, COMMANDS)
1293
53e9ae9e 1294 or a service object (e.g., a `TripeService' instance).
2fa80010
MW
1295
1296 COMMANDS is a dictionary mapping command names to tuples
1297
1298 (MIN, MAX, USAGE, FUNC)
1299
53e9ae9e 1300 of arguments for a `TripeServiceCommand' object.
2fa80010
MW
1301
1302 If DAEMON is true, then the process is forked into the background before we
1303 start. If INIT is given, it is called in the main coroutine, immediately
1304 after forking. If SETUP is given, it is called in a coroutine, after
1305 calling INIT and setting up the services but before marking the service
1306 manager as running.
1307
1308 It is a really bad idea to do any initialization, particularly setting up
1309 coroutines, outside of the INIT or SETUP functions. In particular, if
1310 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1311 the currently established coroutines in a most surprising way.
1312
1313 The function runs a main select loop until the service manager decides to
1314 quit.
1315 """
1316
2fa80010
MW
1317 svcmgr.socket = socket
1318 svcmgr.connect()
1319 svcs = []
1320 for service in tab:
1321 if not isinstance(service, tuple):
1322 svcs.append(service)
1323 else:
1324 name, version, commands = service
1325 cmdmap = {}
1326 for cmd, stuff in commands.iteritems():
1327 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1328 svcs.append(TripeService(name, version, cmdmap))
1329 if daemon:
1330 M.daemonize()
1331 if init is not None:
1332 init()
690a6ec1
MW
1333 spawn(_setupsvc, svcs, setup)
1334 svcmgr.mainloop()
2fa80010
MW
1335
1336###--------------------------------------------------------------------------
1337### Utilities for services.
1338
1339_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1340def timespec(spec):
1341 """Parse the timespec SPEC, returning a number of seconds."""
1342 mul = 1
1343 if len(spec) > 1 and spec[-1] in _timeunits:
1344 mul = _timeunits[spec[-1]]
1345 spec = spec[:-1]
1346 try:
1347 t = int(spec)
1348 except:
1349 raise TripeJobError('bad-time-spec', spec)
1350 if t < 0:
1351 raise TripeJobError('bad-time-spec', spec)
1352 return mul * int(spec)
1353
1354class OptParse (object):
1355 """
1356 Parse options from a command list in the conventional fashion.
1357
1358 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1359 options. The returned values are the option tags. During parsing, the
1360 `arg' method may be used to retrieve the argument for the most recent
1361 option. Afterwards, `rest' may be used to retrieve the remaining
1362 non-option arguments, and do a simple check on how many there are.
1363
1364 The parser correctly handles `--' option terminators.
1365 """
1366
1367 def __init__(me, args, allowed):
1368 """
1369 Create a new option parser.
1370
1371 The parser will scan the ARGS for options given in the sequence ALLOWED
1372 (which are expected to include the `-' prefix).
1373 """
1374 me.allowed = {}
1375 for a in allowed:
1376 me.allowed[a] = True
1377 me.args = list(args)
1378
1379 def __iter__(me):
1380 """Iterator protocol: I am my own iterator."""
1381 return me
1382
1383 def next(me):
1384 """
1385 Iterator protocol: return the next option.
1386
53e9ae9e 1387 If we've run out, raise `StopIteration'.
2fa80010
MW
1388 """
1389 if len(me.args) == 0 or \
1390 len(me.args[0]) < 2 or \
1391 not me.args[0].startswith('-'):
171206b5 1392 raise StopIteration()
2fa80010
MW
1393 opt = me.args.pop(0)
1394 if opt == '--':
171206b5 1395 raise StopIteration()
2fa80010 1396 if opt not in me.allowed:
171206b5 1397 raise TripeSyntaxError()
2fa80010
MW
1398 return opt
1399
1400 def arg(me):
1401 """
1402 Return the argument for the most recent option.
1403
53e9ae9e 1404 If none is available, raise `TripeSyntaxError'.
2fa80010
MW
1405 """
1406 if len(me.args) == 0:
171206b5 1407 raise TripeSyntaxError()
2fa80010
MW
1408 return me.args.pop(0)
1409
1410 def rest(me, min = None, max = None):
1411 """
1412 After option parsing is done, return the remaining arguments.
1413
1414 Check that there are at least MIN and at most MAX arguments remaining --
1415 either may be None to suppress the check.
1416 """
1417 if (min is not None and len(me.args) < min) or \
1418 (max is not None and len(me.args) > max):
171206b5 1419 raise TripeSyntaxError()
2fa80010
MW
1420 return me.args
1421
1422###----- That's all, folks --------------------------------------------------