ac616ae82acb748efba11046f3c0a82c55bbec17
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process. It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47 Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55 Other useful variables:
56 rootcr
57 svcmgr
58
59 Other tweakables:
60 _debug
61
62 Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72 Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
78 SelIOWatcher
79 TripeCommand
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
82 TripeCommandIterator
83 TripeConnection
84 TripeCommandDispatcher
85 TripeServiceManager
86 TripeService
87 TripeServiceCommand
88
89 Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115 except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124 """
125 A coroutine class which can only be invoked by the root coroutine.
126
127 The root, by construction, cannot be an instance of this class.
128 """
129 def switch(me, *args, **kw):
130 assert _Coroutine.getcurrent() is rootcr
131 if _debug: print '* %s' % me
132 _Coroutine.switch(me, *args, **kw)
133 if _debug: print '* %s' % rootcr
134
135 ###--------------------------------------------------------------------------
136 ### Default places for things.
137
138 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
139 socketdir = "@socketdir@"
140 PACKAGE = "@PACKAGE@"
141 VERSION = "@VERSION@"
142
143 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
144 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
145
146 ###--------------------------------------------------------------------------
147 ### Connection to the server.
148
149 def readnonblockingly(sock, len):
150 """
151 Nonblocking read from SOCK.
152
153 Try to return LEN bytes. If couldn't read anything, return `None'. EOF is
154 returned as an empty string.
155 """
156 try:
157 sock.setblocking(0)
158 return sock.recv(len)
159 except S.error, exc:
160 if exc[0] == E.EWOULDBLOCK:
161 return None
162 raise
163
164 class TripeConnectionError (StandardError):
165 """Something happened to the connection with the server."""
166 pass
167 class TripeInternalError (StandardError):
168 """This program is very confused."""
169 pass
170
171 class TripeConnection (object):
172 """
173 A logical connection to the tripe administration socket.
174
175 There may or may not be a physical connection. (This is needed for the
176 monitor, for example.)
177
178 This class isn't very useful on its own, but it has useful subclasses. At
179 this level, the class is agnostic about I/O multiplexing schemes; that gets
180 added later.
181 """
182
183 def __init__(me, socket):
184 """
185 Make a connection to the named SOCKET.
186
187 No physical connection is made initially.
188 """
189 me.socket = socket
190 me.sock = None
191 me.lbuf = None
192 me.iowatch = SelIOWatcher(me)
193
194 def connect(me):
195 """
196 Ensure that there's a physical connection.
197
198 Do nothing if we're already connected. Invoke the `connected' method if
199 successful.
200 """
201 if me.sock: return
202 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
203 sock.connect(me.socket)
204 me.sock = sock
205 me.lbuf = M.LineBuffer(me.line, me._eof)
206 me.lbuf.size = 1024
207 me.connected()
208 return me
209
210 def disconnect(me, reason):
211 """
212 Disconnect the physical connection.
213
214 Invoke the `disconnected' method, giving the provided REASON, which
215 should be either `None' or an exception.
216 """
217 if not me.sock: return
218 me.disconnected(reason)
219 me.sock.close()
220 me.sock = None
221 me.lbuf.disable()
222 me.lbuf = None
223 return me
224
225 def connectedp(me):
226 """
227 Return true if there's a current, believed-good physical connection.
228 """
229 return me.sock is not None
230
231 __nonzero__ = connectedp
232
233 def send(me, line):
234 """
235 Send the LINE to the connection's socket.
236
237 All output is done through this method; it can be overridden to provide
238 proper nonblocking writing, though this seems generally unnecessary.
239 """
240 try:
241 me.sock.setblocking(1)
242 me.sock.send(line + '\n')
243 except Exception, exc:
244 me.disconnect(exc)
245 raise
246 return me
247
248 def receive(me):
249 """
250 Receive whatever's ready from the connection's socket.
251
252 Call `line' on each complete line, and `eof' if the connection closed.
253 Subclasses which attach this class to an I/O-event system should call
254 this method when the socket (the `sock' attribute) is ready for reading.
255 """
256 while me.sock is not None:
257 try:
258 buf = readnonblockingly(me.sock, 16384)
259 except Exception, exc:
260 me.disconnect(exc)
261 raise
262 if buf is None:
263 return me
264 if buf == '':
265 me._eof()
266 return me
267 me.lbuf.flush(buf)
268 return me
269
270 def _eof(me):
271 """Internal end-of-file handler."""
272 me.disconnect(TripeConnectionError('connection lost'))
273 me.eof()
274
275 def connected(me):
276 """
277 To be overridden by subclasses to react to a connection being
278 established.
279 """
280 me.iowatch.connected(me.sock)
281
282 def disconnected(me, reason):
283 """
284 To be overridden by subclasses to react to a connection being severed.
285 """
286 me.iowatch.disconnected()
287
288 def eof(me):
289 """To be overridden by subclasses to handle end-of-file."""
290 pass
291
292 def line(me, line):
293 """To be overridden by subclasses to handle incoming lines."""
294 pass
295
296 ###--------------------------------------------------------------------------
297 ### I/O loop integration.
298
299 class SelIOWatcher (object):
300 """
301 Integration with mLib's I/O event system.
302
303 You can replace this object with a different one for integration with,
304 e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
305 while the CONN is disconnected.
306 """
307
308 def __init__(me, conn):
309 me._conn = conn
310 me._selfile = None
311
312 def connected(me, sock):
313 """
314 Called when a connection is made.
315
316 SOCK is the socket. The watcher must arrange to call `CONN.receive' when
317 data is available.
318 """
319 me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
320 me._selfile.enable()
321
322 def disconnected(me):
323 """
324 Called when the connection is lost.
325 """
326 me._selfile = None
327
328 def iterate(me):
329 """
330 Wait for something interesting to happen, and issue events.
331
332 That is, basically, do one iteration of a main select loop, processing
333 all of the events, and then return. This isn't needed for
334 `TripeCommandDispatcher', but `runservices' wants it.
335 """
336 M.select()
337
338 ###--------------------------------------------------------------------------
339 ### Inter-coroutine communication.
340
341 class Queue (object):
342 """
343 A queue of things arriving asynchronously.
344
345 This is a very simple single-reader multiple-writer queue. It's useful for
346 more complex coroutines which need to cope with a variety of possible
347 incoming events.
348 """
349
350 def __init__(me):
351 """Create a new empty queue."""
352 me.contents = M.Array()
353 me.waiter = None
354
355 def _wait(me):
356 """
357 Internal: wait for an item to arrive in the queue.
358
359 Complain if someone is already waiting, because this is just a
360 single-reader queue.
361 """
362 if me.waiter:
363 raise ValueError('queue already being waited on')
364 try:
365 me.waiter = Coroutine.getcurrent()
366 while not me.contents:
367 me.waiter.parent.switch()
368 finally:
369 me.waiter = None
370
371 def get(me):
372 """
373 Remove and return the item at the head of the queue.
374
375 If the queue is empty, wait until an item arrives.
376 """
377 me._wait()
378 return me.contents.shift()
379
380 def peek(me):
381 """
382 Return the item at the head of the queue without removing it.
383
384 If the queue is empty, wait until an item arrives.
385 """
386 me._wait()
387 return me.contents[0]
388
389 def put(me, thing):
390 """
391 Write THING to the queue.
392
393 If someone is waiting on the queue, wake him up immediately; otherwise
394 just leave the item there for later.
395 """
396 me.contents.push(thing)
397 if me.waiter:
398 me.waiter.switch()
399
400 ###--------------------------------------------------------------------------
401 ### Dispatching coroutine.
402
403 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
404 ## contain backslashes, quotes or whitespace.
405 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
406
407 ## Match characters which need to be escaped, even in quoted text.
408 rx_weird = RX.compile(r'([\\\'])')
409
410 def quotify(s):
411 """Quote S according to the tripe-admin(5) rules."""
412 m = rx_ordinary.match(s)
413 if m and m.end() == len(s):
414 return s
415 else:
416 return "'" + rx_weird.sub(r'\\\1', s) + "'"
417
418 def _callback(func):
419 """
420 Return a wrapper for FUNC which reports exceptions thrown by it.
421
422 Useful in the case of callbacks invoked by C functions which ignore
423 exceptions.
424 """
425 def _(*a, **kw):
426 try:
427 return func(*a, **kw)
428 except:
429 SYS.excepthook(*SYS.exc_info())
430 raise
431 return _
432
433 class TripeCommand (object):
434 """
435 This abstract class represents a command in progress.
436
437 The `words' attribute contains the list of tokens which make up the
438 command.
439
440 Subclasses must implement a method to handle server responses:
441
442 * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
443 `FAIL'; ARGS are the remaining tokens from the server's response.
444 """
445
446 def __init__(me, words):
447 """Make a new command consisting of the given list of WORDS."""
448 me.words = words
449
450 class TripeSynchronousCommand (TripeCommand):
451 """
452 A simple command, processed apparently synchronously.
453
454 Must be invoked from a coroutine other than the root (or whichever one is
455 running the dispatcher); in reality, other coroutines carry on running
456 while we wait for a response from the server.
457
458 Each server response causes the calling coroutine to be resumed with the
459 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
460 or `FAIL') and REST is a list of the server's other response tokens. The
461 calling coroutine must continue switching back to the dispatcher until a
462 terminating response (`OK' or `FAIL') is received or become very
463 confused.
464
465 Mostly it's better to use the `TripeCommandIterator' to do this
466 automatically.
467 """
468
469 def __init__(me, words):
470 """Initialize the command, specifying the WORDS to send to the server."""
471 TripeCommand.__init__(me, words)
472 me.owner = Coroutine.getcurrent()
473
474 def response(me, code, *rest):
475 """Handle a server response by forwarding it to the calling coroutine."""
476 me.owner.switch((code, rest))
477
478 class TripeError (StandardError):
479 """
480 A tripe command failed with an error (a `FAIL' code). The args attribute
481 contains a list of the server's message tokens.
482 """
483 pass
484
485 class TripeCommandIterator (object):
486 """
487 Iterator interface to a tripe command.
488
489 The values returned by the iterator are lists of tokens from the server's
490 `INFO' lines, as processed by the given filter function, if any. The
491 iterator completes normally (by raising `StopIteration') if the server
492 reported `OK', and raises an exception if the command failed for some reason.
493
494 A `TripeError' is raised if the server issues a `FAIL' code. If the
495 connection failed, some other exception is raised.
496 """
497
498 def __init__(me, dispatcher, words, bg = False, filter = None):
499 """
500 Create a new command iterator.
501
502 The command is submitted to the DISPATCHER; it consists of the given
503 WORDS. If BG is true, then an option is inserted to request that the
504 server run the command in the background. The FILTER is applied to the
505 token lists which the server responds, and the filter's output are the
506 items returned by the iterator.
507 """
508 me.dcr = Coroutine.getcurrent().parent
509 if me.dcr is None:
510 raise ValueError, 'must invoke from coroutine'
511 me.filter = filter or (lambda x: x)
512 if bg:
513 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
514 dispatcher.rawcommand(TripeSynchronousCommand(words))
515
516 def __iter__(me):
517 """Iterator protocol: I am my own iterator."""
518 return me
519
520 def next(me):
521 """
522 Iterator protocol: return the next piece of information from the server.
523
524 `INFO' responses are filtered and returned as the values of the
525 iteration. `FAIL' and `CONNERR' responses are turned into exceptions and
526 raised. Finally, `OK' is turned into `StopIteration', which should cause
527 a normal end to the iteration process.
528 """
529 thing = me.dcr.switch()
530 code, rest = thing
531 if code == 'INFO':
532 return me.filter(rest)
533 elif code == 'OK':
534 raise StopIteration
535 elif code == 'CONNERR':
536 if rest is None:
537 raise TripeConnectionError, 'connection terminated by user'
538 else:
539 raise rest
540 elif code == 'FAIL':
541 raise TripeError(*rest)
542 else:
543 raise TripeInternalError \
544 ('unexpected tripe response %r' % ([code] + rest))
545
546 ### Simple utility functions for the TripeCommandIterator convenience
547 ### methods.
548
549 def _tokenjoin(words):
550 """Filter function: simply join the given tokens with spaces between."""
551 return ' '.join(words)
552
553 def _keyvals(iter):
554 """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
555 iterator ITER."""
556 kv = {}
557 for ww in iter:
558 for w in ww:
559 q = w.index('=')
560 kv[w[:q]] = w[q + 1:]
561 return kv
562
563 def _simple(iter):
564 """Raise an error if ITER contains any item."""
565 stuff = list(iter)
566 if len(stuff) != 0:
567 raise TripeInternalError('expected no response')
568 return None
569
570 def _oneline(iter):
571 """If ITER contains a single item, return it; otherwise raise an error."""
572 stuff = list(iter)
573 if len(stuff) != 1:
574 raise TripeInternalError('expected only one line of response')
575 return stuff[0]
576
577 def _tracelike(iter):
578 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
579 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
580 disabled, `+' if enabled, maybe something else later), and DESC is the
581 human-readable description."""
582 stuff = []
583 for ww in iter:
584 ch = ww[0][0]
585 st = ww[0][1:]
586 desc = ' '.join(ww[1:])
587 stuff.append((ch, st, desc))
588 return stuff
589
590 def _kwopts(kw, allowed):
591 """Parse keyword arguments into options. ALLOWED is a list of allowable
592 keywords; raise errors if other keywords are present. `KEY = VALUE'
593 becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
594 `-KEY' if VALUE is a true non-string, or nothing if VALUE is false. Insert
595 a `--' at the end to stop the parser getting confused."""
596 opts = []
597 amap = {}
598 for a in allowed: amap[a] = True
599 for k, v in kw.iteritems():
600 if k not in amap:
601 raise ValueError('option %s not allowed here' % k)
602 if isinstance(v, str):
603 opts += ['-' + k, v]
604 elif v:
605 opts += ['-' + k]
606 opts.append('--')
607 return opts
608
609 ## Deferral.
610 _deferq = []
611 def defer(func, *args, **kw):
612 """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
613 _deferq.append((func, args, kw))
614
615 def funargstr(func, args, kw):
616 items = [repr(a) for a in args]
617 for k, v in kw.iteritems():
618 items.append('%s = %r' % (k, v))
619 return '%s(%s)' % (func.__name__, ', '.join(items))
620
621 def spawn(func, *args, **kw):
622 """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
623 defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
624 .switch(*args, **kw)))
625
626 ## Asides.
627 _asideq = Queue()
628 def _runasides():
629 """
630 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
631 """
632 while True:
633 func, args, kw = _asideq.get()
634 try:
635 func(*args, **kw)
636 except:
637 SYS.excepthook(*SYS.exc_info())
638
639 def aside(func, *args, **kw):
640 """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
641 defer(_asideq.put, (func, args, kw))
642
643 class TripeCommandDispatcher (TripeConnection):
644 """
645 Command dispatcher.
646
647 The command dispatcher is a connection which knows how to handle commands.
648 This is probably the most important class in this module to understand.
649
650 Lines from the server are parsed into tokens. The first token is a code
651 (`OK' or `NOTE' or something) explaining what kind of line this is. The
652 `handler' attribute is a dictionary mapping server line codes to handler
653 functions, which are applied to the words of the line as individual
654 arguments. *Exception*: the content of `TRACE' lines is not tokenized.
655
656 There are default handlers for server codes which respond to commands.
657 Commands arrive as `TripeCommand' instances through the `rawcommand'
658 interface. The dispatcher keeps track of which command objects represent
659 which jobs, and sends responses on to the appropriate command objects by
660 invoking their `response' methods. Command objects don't see the `BG...'
661 codes, because the dispatcher has already transformed them into regular
662 codes when it was looking up the job tag.
663
664 The dispatcher also has a special response code of its own: `CONNERR'
665 indicates that the connection failed and the command has therefore been
666 lost. This is sent to all outstanding commands when a connection error is
667 encountered: rather than a token list, it is accompanied by an exception
668 object which is the cause of the disconnection, which may be `None' if the
669 disconnection is expected (e.g., the direct result of a user request).
670 """
671
672 ## --- Infrastructure ---
673 ##
674 ## We will get confused if we pipeline commands. Send them one at a time.
675 ## Only send a command when the previous one detaches or completes.
676 ##
677 ## The following attributes are interesting:
678 ##
679 ## tagseq Sequence number for next background job (for bgtag)
680 ##
681 ## queue Commands awaiting submission.
682 ##
683 ## cmd Mapping from job tags to commands: cmd[None] is the
684 ## foreground command.
685 ##
686 ## handler Mapping from server codes to handler functions.
687
688 def __init__(me, socket):
689 """
690 Initialize the dispatcher.
691
692 The SOCKET is the filename of the administration socket to connect to,
693 for TripeConnection.__init__.
694 """
695 TripeConnection.__init__(me, socket)
696 me.tagseq = 0
697 me.handler = {}
698 me.handler['BGDETACH'] = me._detach
699 for i in 'BGOK', 'BGINFO', 'BGFAIL':
700 me.handler[i] = me._response
701 for i in 'OK', 'INFO', 'FAIL':
702 me.handler[i] = me._fgresponse
703
704 def quitp(me):
705 """Should we quit the main loop? Subclasses should override."""
706 return False
707
708 def mainloop(me, quitp = None):
709 """
710 Iterate the I/O watcher until QUITP returns true.
711
712 Arranges for asides and deferred calls to be made at the right times.
713 """
714
715 global _deferq
716 assert _Coroutine.getcurrent() is rootcr
717 Coroutine(_runasides, name = '_runasides').switch()
718 if quitp is None:
719 quitp = me.quitp
720 while not quitp():
721 while _deferq:
722 q = _deferq
723 _deferq = []
724 for func, args, kw in q:
725 func(*args, **kw)
726 me.iowatch.iterate()
727
728 def connected(me):
729 """
730 Connection hook.
731
732 If a subclass overrides this method, it must call us; clears out the
733 command queue and job map.
734 """
735 me.queue = M.Array()
736 me.cmd = {}
737 TripeConnection.connected(me)
738
739 def disconnected(me, reason):
740 """
741 Disconnection hook.
742
743 If a subclass hooks overrides this method, it must call us; sends a
744 special `CONNERR' code to all incomplete commands.
745 """
746 TripeConnection.disconnected(me, reason)
747 for cmd in me.cmd.itervalues():
748 cmd.response('CONNERR', reason)
749 for cmd in me.queue:
750 cmd.response('CONNERR', reason)
751
752 @_callback
753 def line(me, line):
754 """Handle an incoming line, sending it to the right place."""
755 if _debug: print '<', line
756 code, rest = M.word(line, quotep = True)
757 func = me.handler.get(code)
758 if func is not None:
759 if code == 'TRACE':
760 func(code, rest)
761 else:
762 func(code, *M.split(rest, quotep = True)[0])
763 me.dequeue()
764
765 def dequeue(me):
766 """
767 Pull the oldest command off the queue and try to send it to the server.
768 """
769 if not me.queue or None in me.cmd: return
770 cmd = me.queue.shift()
771 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
772 me.send(' '.join([quotify(w) for w in cmd.words]))
773 me.cmd[None] = cmd
774
775 def bgtag(me):
776 """
777 Return an unused job tag.
778
779 May be of use when composing commands by hand.
780 """
781 tag = 'J%05d' % me.tagseq
782 me.tagseq += 1
783 return tag
784
785 ## --- Built-in handler functions for server responses ---
786
787 def _detach(me, _, tag):
788 """
789 Respond to a `BGDETACH' TAG message.
790
791 Move the current foreground command to the background.
792 """
793 assert tag not in me.cmd
794 me.cmd[tag] = me.cmd[None]
795 del me.cmd[None]
796
797 def _response(me, code, tag, *w):
798 """
799 Respond to an `OK', `INFO' or `FAIL' message.
800
801 If this is a message for a background job, find the tag; then dispatch
802 the result to the command object. This is also called by `_fgresponse'
803 (wth TAG set to `None') to handle responses for foreground commands, and
804 is therefore a useful method to extend or override in subclasses.
805 """
806 if code.startswith('BG'):
807 code = code[2:]
808 cmd = me.cmd[tag]
809 if code != 'INFO':
810 del me.cmd[tag]
811 cmd.response(code, *w)
812
813 def _fgresponse(me, code, *w):
814 """Process responses to the foreground command."""
815 me._response(code, None, *w)
816
817 ## --- Interface methods ---
818
819 def rawcommand(me, cmd):
820 """
821 Submit the `TripeCommand' CMD to the server, and look after it until it
822 completes.
823 """
824 if not me.connectedp():
825 raise TripeConnectionError('connection closed')
826 me.queue.push(cmd)
827 me.dequeue()
828
829 def command(me, *cmd, **kw):
830 """Convenience wrapper for creating a TripeCommandIterator object."""
831 return TripeCommandIterator(me, cmd, **kw)
832
833 ## --- Convenience methods for server commands ---
834
835 def add(me, peer, *addr, **kw):
836 return _simple(me.command(bg = True,
837 *['ADD'] +
838 _kwopts(kw, ['tunnel', 'keepalive',
839 'key', 'priv', 'cork',
840 'mobile']) +
841 [peer] +
842 list(addr)))
843 def addr(me, peer):
844 return _oneline(me.command('ADDR', peer))
845 def algs(me, peer = None):
846 return _keyvals(me.command('ALGS',
847 *((peer is not None and [peer]) or [])))
848 def checkchal(me, chal):
849 return _simple(me.command('CHECKCHAL', chal))
850 def daemon(me):
851 return _simple(me.command('DAEMON'))
852 def eping(me, peer, **kw):
853 return _oneline(me.command(bg = True,
854 *['PING'] +
855 _kwopts(kw, ['timeout']) +
856 [peer]))
857 def forcekx(me, peer):
858 return _simple(me.command('FORCEKX', peer))
859 def getchal(me):
860 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
861 def greet(me, peer, chal):
862 return _simple(me.command('GREET', peer, chal))
863 def help(me):
864 return list(me.command('HELP', filter = _tokenjoin))
865 def ifname(me, peer):
866 return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
867 def kill(me, peer):
868 return _simple(me.command('KILL', peer))
869 def list(me):
870 return list(me.command('LIST', filter = _tokenjoin))
871 def notify(me, *msg):
872 return _simple(me.command('NOTIFY', *msg))
873 def peerinfo(me, peer):
874 return _keyvals(me.command('PEERINFO', peer))
875 def ping(me, peer, **kw):
876 return _oneline(me.command(bg = True,
877 *['PING'] +
878 _kwopts(kw, ['timeout']) +
879 [peer]))
880 def port(me):
881 return _oneline(me.command('PORT', filter = _tokenjoin))
882 def quit(me):
883 return _simple(me.command('QUIT'))
884 def reload(me):
885 return _simple(me.command('RELOAD'))
886 def servinfo(me):
887 return _keyvals(me.command('SERVINFO'))
888 def setifname(me, new):
889 return _simple(me.command('SETIFNAME', new))
890 def svcclaim(me, service, version):
891 return _simple(me.command('SVCCLAIM', service, version))
892 def svcensure(me, service, version = None):
893 return _simple(me.command('SVCENSURE', service,
894 *((version is not None and [version]) or [])))
895 def svcfail(me, job, *msg):
896 return _simple(me.command('SVCFAIL', job, *msg))
897 def svcinfo(me, job, *msg):
898 return _simple(me.command('SVCINFO', job, *msg))
899 def svclist(me):
900 return list(me.command('SVCLIST'))
901 def svcok(me, job):
902 return _simple(me.command('SVCOK', job))
903 def svcquery(me, service):
904 return _keyvals(me.command('SVCQUERY', service))
905 def svcrelease(me, service):
906 return _simple(me.command('SVCRELEASE', service))
907 def svcsubmit(me, service, *args, **kw):
908 return me.command(bg = True,
909 *['SVCSUBMIT'] +
910 _kwopts(kw, ['version']) +
911 [service] +
912 list(args))
913 def stats(me, peer):
914 return _keyvals(me.command('STATS', peer))
915 def trace(me, *args):
916 return _tracelike(me.command('TRACE', *args))
917 def tunnels(me):
918 return list(me.command('TUNNELS', filter = _tokenjoin))
919 def version(me):
920 return _oneline(me.command('VERSION', filter = _tokenjoin))
921 def warn(me, *msg):
922 return _simple(me.command('WARN', *msg))
923 def watch(me, *args):
924 return _tracelike(me.command('WATCH', *args))
925
926 ###--------------------------------------------------------------------------
927 ### Asynchronous commands.
928
929 class TripeAsynchronousCommand (TripeCommand):
930 """
931 Asynchronous commands.
932
933 This is the complicated way of issuing commands. You must set up a queue,
934 and associate the command with the queue. Responses arriving for the
935 command will be put on the queue as an triple of the form (TAG, CODE, REST)
936 -- where TAG is an object of your choice, not interpreted by this class,
937 CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
938 and REST is the list of the rest of the server's tokens.
939
940 Using this, you can write coroutines which process many commands (and
941 possibly other events) simultaneously.
942 """
943
944 def __init__(me, queue, tag, words):
945 """Make an asynchronous command consisting of the given WORDS, which
946 sends responses to QUEUE, labelled with TAG."""
947 TripeCommand.__init__(me, words)
948 me.queue = queue
949 me.tag = tag
950
951 def response(me, code, *stuff):
952 """Handle a server response by writing it to the caller's queue."""
953 me.queue.put((me.tag, code, list(stuff)))
954
955 ###--------------------------------------------------------------------------
956 ### Services.
957
958 class TripeJobCancelled (Exception):
959 """
960 Exception sent to job handler if the client kills the job.
961
962 Not propagated further.
963 """
964 pass
965
966 class TripeJobError (Exception):
967 """
968 Exception to cause failure report for running job.
969
970 Sends an SVCFAIL code back.
971 """
972 pass
973
974 class TripeSyntaxError (Exception):
975 """
976 Exception to report a syntax error for a job.
977
978 Sends an SVCFAIL bad-svc-syntax message back.
979 """
980 pass
981
982 class TripeServiceManager (TripeCommandDispatcher):
983 """
984 A command dispatcher with added handling for incoming service requests.
985
986 There is usually only one instance of this class, called svcmgr. Some of
987 the support functions in this module assume that this is the case.
988
989 To use, run `mLib.select' in a loop until the quitp method returns true;
990 then, in a non-root coroutine, register your services by calling `add', and
991 then call `running' when you've finished setting up.
992
993 The instance handles server service messages `SVCJOB', `SVCCANCEL' and
994 `SVCCLAIM'. It maintains a table of running services. Incoming jobs cause
995 the service's `job' method to be invoked; `SVCCANCEL' sends a
996 `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
997 causes the relevant service to be deregistered.
998
999 There is no base class for jobs, but a job must implement two methods:
1000
1001 start() Begin processing; might be a no-op.
1002
1003 cancel() Stop processing; the original client has killed the
1004 job.
1005
1006 The life of a service manager is divided into two parts: setup and running;
1007 you tell the manager that you've finished setting up by calling the
1008 `running' method. If, at any point after setup is finished, there are no
1009 remaining services or jobs, `quitp' will return true, ending the process.
1010 """
1011
1012 ## --- Attributes ---
1013 ##
1014 ## svc Mapping name -> service object
1015 ##
1016 ## job Mapping jobid -> job handler coroutine
1017 ##
1018 ## runningp True when setup is finished
1019 ##
1020 ## _quitp True if explicit quit has been requested
1021
1022 def __init__(me, socket):
1023 """
1024 Initialize the service manager.
1025
1026 SOCKET is the administration socket to connect to.
1027 """
1028 TripeCommandDispatcher.__init__(me, socket)
1029 me.svc = {}
1030 me.job = {}
1031 me.runningp = False
1032 me.handler['SVCCANCEL'] = me._cancel
1033 me.handler['SVCJOB'] = me._job
1034 me.handler['SVCCLAIM'] = me._claim
1035 me._quitp = 0
1036
1037 def addsvc(me, svc):
1038 """Register a new service; SVC is a `TripeService' instance."""
1039 assert svc.name not in me.svc
1040 me.svcclaim(svc.name, svc.version)
1041 me.svc[svc.name] = svc
1042
1043 def _cancel(me, _, jid):
1044 """
1045 Called when the server cancels a job; invokes the job's `cancel' method.
1046 """
1047 job = me.job[jid]
1048 del me.job[jid]
1049 job.cancel()
1050
1051 def _claim(me, _, svc, __):
1052 """Called when another program claims our service at a higher version."""
1053 del me.svc[svc]
1054
1055 def _job(me, _, jid, svc, cmd, *args):
1056 """
1057 Called when the server sends us a job to do.
1058
1059 Calls the service to collect a job, and begins processing it.
1060 """
1061 assert jid not in me.job
1062 svc = me.svc[svc.lower()]
1063 job = svc.job(jid, cmd, args)
1064 me.job[jid] = job
1065 job.start()
1066
1067 def running(me):
1068 """Answer true if setup is finished."""
1069 me.runningp = True
1070
1071 def jobdone(me, jid):
1072 """Informs the service manager that the job with id JID has finished."""
1073 try:
1074 del me.job[jid]
1075 except KeyError:
1076 pass
1077
1078 def quitp(me):
1079 """
1080 Return true if no services or jobs are active (and, therefore, if this
1081 process can quit without anyone caring).
1082 """
1083 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1084 not me.sock))
1085
1086 def quit(me):
1087 """Forces the quit flag (returned by quitp) on."""
1088 me._quitp = True
1089
1090 class TripeService (object):
1091 """
1092 A standard service.
1093
1094 The NAME and VERSION are passed on to the server. The CMDTAB is a
1095 dictionary mapping command names (in lowercase) to command objects.
1096
1097 If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1098 defaults are provided.
1099
1100 TripeService itself is mostly agnostic about the nature of command objects,
1101 but the TripeServiceJob class (below) has some requirements. The built-in
1102 HELP command requires command objects to have `usage' attributes.
1103 """
1104
1105 def __init__(me, name, version, cmdtab):
1106 """
1107 Create and register a new service with the given NAME and VERSION.
1108
1109 CMDTAB maps command names (in lower-case) to command objects.
1110 """
1111 me.name = name
1112 me.version = version
1113 me.cmd = cmdtab
1114 me.activep = True
1115 me.cmd.setdefault('help',
1116 TripeServiceCommand('help', 0, 0, '', me._help))
1117 me.cmd.setdefault('quit',
1118 TripeServiceCommand('quit', 0, 0, '', me._quit))
1119
1120 def job(me, jid, cmd, args):
1121 """
1122 Called by the service manager: a job arrived with id JID.
1123
1124 It asks for comamnd CMD with argument list ARGS. Creates a new job,
1125 passing it the information needed.
1126 """
1127 return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1128
1129 ## Simple default command handlers, complying with the spec in
1130 ## tripe-service(7).
1131
1132 def _help(me):
1133 """Send a help summary to the user."""
1134 cmds = me.cmd.items()
1135 cmds.sort()
1136 for name, cmd in cmds:
1137 svcinfo(name, *cmd.usage)
1138
1139 def _quit(me):
1140 """Terminate the service manager."""
1141 svcmgr.notify('svc-quit', me.name, 'admin-request')
1142 svcmgr.quit()
1143
1144 class TripeServiceCommand (object):
1145 """A simple service command."""
1146
1147 def __init__(me, name, min, max, usage, func):
1148 """
1149 Creates a new command.
1150
1151 NAME is the command's name (in lowercase).
1152
1153 MIN and MAX are the minimum and maximum number of allowed arguments (used
1154 for checking); either may be None to indicate no minimum or maximum.
1155
1156 USAGE is a usage string, used for generating help and error messages.
1157
1158 FUNC is the function to invoke.
1159 """
1160 me.name = name
1161 me.min = min
1162 me.max = max
1163 me.usage = usage.split()
1164 me.func = func
1165
1166 def run(me, *args):
1167 """
1168 Called when the command is invoked.
1169
1170 Does minimal checking of the arguments and calls the supplied function.
1171 """
1172 if (me.min is not None and len(args) < me.min) or \
1173 (me.max is not None and len(args) > me.max):
1174 raise TripeSyntaxError
1175 me.func(*args)
1176
1177 class TripeServiceJob (Coroutine):
1178 """
1179 Job handler coroutine.
1180
1181 A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1182 request, passing it the jobid, command and arguments, and a command object.
1183 The command object needs the following attributes.
1184
1185 usage A usage list (excluding the command name) showing
1186 arguments and options.
1187
1188 run(*ARGS) Function to react to the command with ARGS split into
1189 separate arguments. Invoked in a coroutine. The
1190 `svcinfo function (not the `TripeCommandDispatcher'
1191 method) may be used to send `INFO' lines. The
1192 function may raise `TripeJobError' to send a `FAIL'
1193 response back, or `TripeSyntaxError' to send a
1194 generic usage error. `TripeJobCancelled' exceptions
1195 are trapped silently. Other exceptions are
1196 translated into a generic internal-error message.
1197
1198 This class automatically takes care of sending some closing response to the
1199 job, and for informing the service manager that the job is completed.
1200
1201 The `jid' attribute stores the job's id.
1202 """
1203
1204 def __init__(me, jid, svc, cmd, command, args):
1205 """
1206 Start a new job.
1207
1208 The job is created with id JID, for service SVC, processing command name
1209 CMD (which the service resolved into the command object COMMAND, or
1210 `None'), and with the arguments ARGS.
1211 """
1212 Coroutine.__init__(me)
1213 me.jid = jid
1214 me.svc = svc
1215 me.cmd = cmd
1216 me.command = command
1217 me.args = args
1218
1219 def run(me):
1220 """
1221 Main body of the coroutine.
1222
1223 Does the tedious exception handling boilerplate and invokes the command's
1224 run method.
1225 """
1226 try:
1227 try:
1228 if me.command is None:
1229 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1230 else:
1231 me.command.run(*me.args)
1232 svcmgr.svcok(me.jid)
1233 except TripeJobError, exc:
1234 svcmgr.svcfail(me.jid, *exc.args)
1235 except TripeSyntaxError:
1236 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1237 me.svc.name, me.command.name,
1238 *me.command.usage)
1239 except TripeJobCancelled:
1240 pass
1241 except Exception, exc:
1242 svcmgr.svcfail(me.jid, 'svc-internal-error',
1243 exc.__class__.__name__, str(exc))
1244 finally:
1245 svcmgr.jobdone(me.jid)
1246
1247 def start(me):
1248 """Invoked by the service manager to start running the coroutine."""
1249 me.switch()
1250
1251 def cancel(me):
1252 """Invoked by the service manager to cancel the job."""
1253 me.throw(TripeJobCancelled())
1254
1255 def svcinfo(*args):
1256 """
1257 If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1258 job's sender, automatically using the correct job id.
1259 """
1260 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1261
1262 def _setupsvc(tab, func):
1263 """
1264 Setup coroutine for setting up service programs.
1265
1266 Register the given services.
1267 """
1268 try:
1269 for service in tab:
1270 svcmgr.addsvc(service)
1271 if func:
1272 func()
1273 finally:
1274 svcmgr.running()
1275
1276 svcmgr = TripeServiceManager(None)
1277 def runservices(socket, tab, init = None, setup = None, daemon = False):
1278 """
1279 Function to start a service provider.
1280
1281 SOCKET is the socket to connect to, usually tripesock.
1282
1283 TAB is a list of entries. An entry may be either a tuple
1284
1285 (NAME, VERSION, COMMANDS)
1286
1287 or a service object (e.g., a `TripeService' instance).
1288
1289 COMMANDS is a dictionary mapping command names to tuples
1290
1291 (MIN, MAX, USAGE, FUNC)
1292
1293 of arguments for a `TripeServiceCommand' object.
1294
1295 If DAEMON is true, then the process is forked into the background before we
1296 start. If INIT is given, it is called in the main coroutine, immediately
1297 after forking. If SETUP is given, it is called in a coroutine, after
1298 calling INIT and setting up the services but before marking the service
1299 manager as running.
1300
1301 It is a really bad idea to do any initialization, particularly setting up
1302 coroutines, outside of the INIT or SETUP functions. In particular, if
1303 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1304 the currently established coroutines in a most surprising way.
1305
1306 The function runs a main select loop until the service manager decides to
1307 quit.
1308 """
1309
1310 svcmgr.socket = socket
1311 svcmgr.connect()
1312 svcs = []
1313 for service in tab:
1314 if not isinstance(service, tuple):
1315 svcs.append(service)
1316 else:
1317 name, version, commands = service
1318 cmdmap = {}
1319 for cmd, stuff in commands.iteritems():
1320 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1321 svcs.append(TripeService(name, version, cmdmap))
1322 if daemon:
1323 M.daemonize()
1324 if init is not None:
1325 init()
1326 spawn(_setupsvc, svcs, setup)
1327 svcmgr.mainloop()
1328
1329 ###--------------------------------------------------------------------------
1330 ### Utilities for services.
1331
1332 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1333 def timespec(spec):
1334 """Parse the timespec SPEC, returning a number of seconds."""
1335 mul = 1
1336 if len(spec) > 1 and spec[-1] in _timeunits:
1337 mul = _timeunits[spec[-1]]
1338 spec = spec[:-1]
1339 try:
1340 t = int(spec)
1341 except:
1342 raise TripeJobError('bad-time-spec', spec)
1343 if t < 0:
1344 raise TripeJobError('bad-time-spec', spec)
1345 return mul * int(spec)
1346
1347 class OptParse (object):
1348 """
1349 Parse options from a command list in the conventional fashion.
1350
1351 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1352 options. The returned values are the option tags. During parsing, the
1353 `arg' method may be used to retrieve the argument for the most recent
1354 option. Afterwards, `rest' may be used to retrieve the remaining
1355 non-option arguments, and do a simple check on how many there are.
1356
1357 The parser correctly handles `--' option terminators.
1358 """
1359
1360 def __init__(me, args, allowed):
1361 """
1362 Create a new option parser.
1363
1364 The parser will scan the ARGS for options given in the sequence ALLOWED
1365 (which are expected to include the `-' prefix).
1366 """
1367 me.allowed = {}
1368 for a in allowed:
1369 me.allowed[a] = True
1370 me.args = list(args)
1371
1372 def __iter__(me):
1373 """Iterator protocol: I am my own iterator."""
1374 return me
1375
1376 def next(me):
1377 """
1378 Iterator protocol: return the next option.
1379
1380 If we've run out, raise `StopIteration'.
1381 """
1382 if len(me.args) == 0 or \
1383 len(me.args[0]) < 2 or \
1384 not me.args[0].startswith('-'):
1385 raise StopIteration
1386 opt = me.args.pop(0)
1387 if opt == '--':
1388 raise StopIteration
1389 if opt not in me.allowed:
1390 raise TripeSyntaxError
1391 return opt
1392
1393 def arg(me):
1394 """
1395 Return the argument for the most recent option.
1396
1397 If none is available, raise `TripeSyntaxError'.
1398 """
1399 if len(me.args) == 0:
1400 raise TripeSyntaxError
1401 return me.args.pop(0)
1402
1403 def rest(me, min = None, max = None):
1404 """
1405 After option parsing is done, return the remaining arguments.
1406
1407 Check that there are at least MIN and at most MAX arguments remaining --
1408 either may be None to suppress the check.
1409 """
1410 if (min is not None and len(me.args) < min) or \
1411 (max is not None and len(me.args) > max):
1412 raise TripeSyntaxError
1413 return me.args
1414
1415 ###----- That's all, folks --------------------------------------------------