3 ### Administration connection with tripe server
5 ### (c) 2006 Straylight/Edgeware
8 ###----- Licensing notice ---------------------------------------------------
10 ### This file is part of Trivial IP Encryption (TrIPE).
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 ### GNU General Public License for more details.
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
31 Rather than end up lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process. It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
39 The simple rule governing the coroutines used here is this:
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
44 * Other, non-root, coroutines are presumed to be waiting for some specific
47 Configuration variables:
55 Other useful variables:
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
84 TripeCommandDispatcher
97 __pychecker__
= 'self=me no-constCond no-argsused'
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
112 if OS
.getenv('TRIPE_FORCE_RMCR') is not None:
114 from py
.magic
import greenlet
as _Coroutine
116 from rmcr
import Coroutine
as _Coroutine
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
121 rootcr
= _Coroutine
.getcurrent()
123 class Coroutine (_Coroutine
):
125 A coroutine class which can only be invoked by the root coroutine.
127 The root, by construction, cannot be an instance of this class.
129 def switch(me
, *args
, **kw
):
130 assert _Coroutine
.getcurrent() is rootcr
131 if _debug
: print '* %s' % me
132 _Coroutine
.switch(me
, *args
, **kw
)
133 if _debug
: print '* %s' % rootcr
135 ###--------------------------------------------------------------------------
136 ### Default places for things.
## Configuration directory; `TRIPEDIR' in the environment overrides the
## built-in default.
configdir = OS.environ.get('TRIPEDIR', "@configdir@")
## Directory used to build the default administration socket path below.
socketdir = "@socketdir@"
## Package name and version.  NOTE(review): the `@...@' strings look like
## build-time substitution placeholders -- confirm against the build system.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"

## Administration socket path; `TRIPESOCK' in the environment overrides the
## default of `tripesock' inside SOCKETDIR.
tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
## Peer database file name; `TRIPEPEERDB' in the environment overrides it.
peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
146 ###--------------------------------------------------------------------------
147 ### Connection to the server.
149 def readnonblockingly(sock
, len):
151 Nonblocking read from SOCK.
153 Try to return LEN bytes. If couldn't read anything, return `None'. EOF is
154 returned as an empty string.
158 return sock
.recv(len)
160 if exc
[0] == E
.EWOULDBLOCK
:
class TripeConnectionError (StandardError):
    """
    Something happened to the connection with the server.

    Raised when a command is submitted while there is no connection, and
    used as the disconnection reason when the connection is lost.
    """
class TripeInternalError (StandardError):
    """
    This program is very confused.

    Raised when the server's responses don't have the shape this module was
    expecting (e.g., an unknown response code, or too many reply lines).
    """
171 class TripeConnection (object):
173 A logical connection to the tripe administration socket.
175 There may or may not be a physical connection. (This is needed for the
176 monitor, for example.)
178 This class isn't very useful on its own, but it has useful subclasses. At
179 this level, the class is agnostic about I/O multiplexing schemes; that gets
183 def __init__(me
, socket
):
185 Make a connection to the named SOCKET.
187 No physical connection is made initially.
192 me
.iowatch
= SelIOWatcher(me
)
196 Ensure that there's a physical connection.
198 Do nothing if we're already connected. Invoke the `connected' method if
202 sock
= S
.socket(S
.AF_UNIX
, S
.SOCK_STREAM
)
203 sock
.connect(me
.socket
)
205 me
.lbuf
= M
.LineBuffer(me
.line
, me
._eof
)
210 def disconnect(me
, reason
):
212 Disconnect the physical connection.
214 Invoke the `disconnected' method, giving the provided REASON, which
215 should be either `None' or an exception.
217 if not me
.sock
: return
218 me
.disconnected(reason
)
227 Return true if there's a current, believed-good physical connection.
229 return me
.sock
is not None
231 __nonzero__
= connectedp
235 Send the LINE to the connection's socket.
237 All output is done through this method; it can be overridden to provide
238 proper nonblocking writing, though this seems generally unnecessary.
241 me
.sock
.setblocking(1)
242 me
.sock
.send(line
+ '\n')
243 except Exception, exc
:
250 Receive whatever's ready from the connection's socket.
252 Call `line' on each complete line, and `eof' if the connection closed.
253 Subclasses which attach this class to an I/O-event system should call
254 this method when the socket (the `sock' attribute) is ready for reading.
256 while me
.sock
is not None:
258 buf
= readnonblockingly(me
.sock
, 16384)
259 except Exception, exc
:
271 """Internal end-of-file handler."""
272 me
.disconnect(TripeConnectionError('connection lost'))
277 To be overridden by subclasses to react to a connection being
280 me
.iowatch
.connected(me
.sock
)
282 def disconnected(me
, reason
):
284 To be overridden by subclasses to react to a connection being severed.
286 me
.iowatch
.disconnected()
289 """To be overridden by subclasses to handle end-of-file."""
293 """To be overridden by subclasses to handle incoming lines."""
296 ###--------------------------------------------------------------------------
297 ### I/O loop integration.
299 class SelIOWatcher (object):
301 Integration with mLib's I/O event system.
303 You can replace this object with a different one for integration with,
304 e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
305 while the CONN is disconnected.
308 def __init__(me
, conn
):
312 def connected(me
, sock
):
314 Called when a connection is made.
316 SOCK is the socket. The watcher must arrange to call `CONN.receive' when
319 me
._selfile
= M
.SelFile(sock
.fileno(), M
.SEL_READ
, me
._conn
.receive
)
322 def disconnected(me
):
324 Called when the connection is lost.
330 Wait for something interesting to happen, and issue events.
332 That is, basically, do one iteration of a main select loop, processing
333 all of the events, and then return. This is used in the method
334 `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
335 `runservices'; if your I/O watcher has a different main loop, you can
340 ###--------------------------------------------------------------------------
341 ### Inter-coroutine communication.
343 class Queue (object):
345 A queue of things arriving asynchronously.
347 This is a very simple single-reader multiple-writer queue. It's useful for
348 more complex coroutines which need to cope with a variety of possible
353 """Create a new empty queue."""
354 me
.contents
= M
.Array()
359 Internal: wait for an item to arrive in the queue.
361 Complain if someone is already waiting, because this is just a
365 raise ValueError('queue already being waited on')
367 me
.waiter
= Coroutine
.getcurrent()
368 while not me
.contents
:
369 me
.waiter
.parent
.switch()
375 Remove and return the item at the head of the queue.
377 If the queue is empty, wait until an item arrives.
380 return me
.contents
.shift()
384 Return the item at the head of the queue without removing it.
386 If the queue is empty, wait until an item arrives.
389 return me
.contents
[0]
393 Write THING to the queue.
395 If someone is waiting on the queue, wake him up immediately; otherwise
396 just leave the item there for later.
398 me
.contents
.push(thing
)
402 ###--------------------------------------------------------------------------
403 ### Dispatching coroutine.
405 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
406 ## contain backslashes, quotes or whitespace.
407 rx_ordinary
= RX
.compile(r
'^[^\\\'\"\s
]+$
')
409 ## Match characters which need to be escaped, even in quoted text.
410 rx_weird = RX.compile(r'([\\\'])')
413 """Quote S according to the tripe-admin(5) rules."""
414 m = rx_ordinary.match(s)
415 if m and m.end() == len(s):
418 return "'" + rx_weird.sub(r'\\\1', s) + "'"
422 Return a wrapper for FUNC which reports exceptions thrown by it.
424 Useful in the case of callbacks invoked by C functions which ignore
429 return func(*a, **kw)
431 SYS.excepthook(*SYS.exc_info())
435 class TripeCommand (object):
437 This abstract class represents a command in progress.
439 The `words' attribute contains the
list of tokens which make up the
442 Subclasses must implement a method to handle server responses
:
444 * response(CODE
, *ARGS
): CODE
is one of the strings `OK
', `INFO' or
445 `FAIL
'; ARGS are the remaining tokens from the server's response
.
448 def __init__(me, words):
449 """Make a new command consisting of the given
list of WORDS
."""
452 class TripeSynchronousCommand (TripeCommand):
454 A simple command
, processed apparently synchronously
.
456 Must be invoked
from a coroutine other than the
root (or whichever one
is
457 running the dispatcher
); in reality
, other coroutines carry on running
458 while we wait
for a response
from the server
.
460 Each server response causes the calling coroutine to be resumed with the
461 pair (CODE
, REST
) -- where CODE
is the server
's response code (`OK', `INFO
'
462 or `FAIL') and REST
is a
list of the server
's other response tokens. The
463 calling coroutine must continue switching back to the dispatcher until a
464 terminating response (`OK' or `FAIL
') is received or become very
467 Mostly it's better to use the `TripeCommandIterator
' to do this
def __init__(me, words):
    """
    Set up the command from WORDS and remember who is asking.

    WORDS is the list of tokens to send to the server.  The coroutine which
    is current at construction time is recorded in the `owner' attribute.
    """
    TripeCommand.__init__(me, words)
    caller = Coroutine.getcurrent()
    me.owner = caller
def response(me, code, *rest):
    """
    Forward a server response to the coroutine which issued the command.

    The owner is resumed with the pair (CODE, REST), where REST is the tuple
    of remaining response tokens.
    """
    reply = (code, rest)
    me.owner.switch(reply)
480 class TripeError (StandardError):
482 A tripe command failed with an error (a `FAIL' code
). The args attribute
483 contains a
list of the server
's message tokens.
487 class TripeCommandIterator (object):
489 Iterator interface to a tripe command.
491 The values returned by the iterator are lists of tokens from the server's
492 `INFO
' lines, as processed by the given filter function, if any. The
493 iterator completes normally (by raising `StopIteration') if the server
494 reported `OK
', and raises an exception if the command failed for some reason.
496 A `TripeError' is raised
if the server issues a `FAIL
' code. If the
497 connection failed, some other exception is raised.
500 def __init__(me, dispatcher, words, bg = False, filter = None):
502 Create a new command iterator.
504 The command is submitted to the DISPATCHER; it consists of the given
505 WORDS. If BG is true, then an option is inserted to request that the
506 server run the command in the background. The FILTER is applied to the
507 token lists which the server responds, and the filter's output are the
508 items returned by the iterator
.
510 me.dcr = Coroutine.getcurrent().parent
512 raise ValueError, 'must invoke from coroutine'
513 me.filter = filter or (lambda x: x)
515 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
516 dispatcher.rawcommand(TripeSynchronousCommand(words))
519 """Iterator protocol
: I am my own iterator
."""
524 Iterator protocol
: return the next piece of information
from the server
.
526 `INFO
' responses are filtered and returned as the values of the
527 iteration. `FAIL' and `CONNERR
' responses are turned into exceptions and
528 raised. Finally, `OK' is turned into `
StopIteration', which should cause
529 a normal end to the iteration process.
531 thing = me.dcr.switch()
534 return me.filter(rest)
537 elif code == 'CONNERR
':
539 raise TripeConnectionError, 'connection terminated by user
'
543 raise TripeError(*rest)
545 raise TripeInternalError \
546 ('unexpected tripe response %r
' % ([code] + rest))
548 ### Simple utility functions for the TripeCommandIterator convenience
551 def _tokenjoin(words):
552 """Filter function: simply join the given tokens with spaces between."""
553 return ' '.join(words)
556 """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
562 kv[w[:q]] = w[q + 1:]
566 """Raise an error
if ITER contains any item
."""
569 raise TripeInternalError('expected no response')
573 """If ITER contains a single item
, return it
; otherwise
raise an error
."""
576 raise TripeInternalError('expected only one line of response')
579 def _tracelike(iter):
580 """Handle a TRACE
-like command
. The result
is a
list of
tuples (CHAR
,
581 STATUS
, DESC
): CHAR
is a selector character
, STATUS
is the
status (empty
if
582 disabled
, `
+' if enabled, maybe something else later), and DESC is the
583 human-readable description."""
588 desc = ' '.join(ww[1:])
589 stuff.append((ch, st, desc))
592 def _kwopts(kw, allowed):
593 """Parse keyword arguments into options. ALLOWED is a list of allowable
594 keywords; raise errors if other keywords are present. `KEY = VALUE'
595 becomes an option pair `
-KEY VALUE
' if VALUE is a string, just the option
596 `-KEY' if VALUE
is a true non
-string
, or nothing
if VALUE
is false
. Insert
597 a `
--' at the end to stop the parser getting confused."""
600 for a in allowed: amap[a] = True
601 for k, v in kw.iteritems():
603 raise ValueError('option %s
not allowed here
' % k)
604 if isinstance(v, str):
def defer(func, *args, **kw):
    """
    Arrange for FUNC(*ARGS, **KW) to be called later, in the root coroutine.

    The call is only recorded here, as a (FUNC, ARGS, KW) triple appended to
    the deferred-call list; nothing is invoked immediately.
    """
    pending = (func, args, kw)
    _deferq.append(pending)
def funargstr(func, args, kw):
    """
    Describe a call of FUNC with ARGS and KW as a string.

    Positional arguments are shown using `repr'; keyword arguments appear as
    `NAME = VALUE' with the value `repr'ed.  The pieces are joined with
    commas and wrapped in parentheses following FUNC's `__name__'.
    """
    pieces = []
    for arg in args:
        pieces.append(repr(arg))
    for key, value in kw.iteritems():
        pieces.append('%s = %r' % (key, value))
    arglist = ', '.join(pieces)
    return '%s(%s)' % (func.__name__, arglist)
def spawn(func, *args, **kw):
    """
    Arrange for FUNC(*ARGS, **KW) to run in a fresh coroutine.

    Nothing happens immediately: the work is handed to `defer'.  Only when
    the deferred call is made is the coroutine created (named after the
    call, using `funargstr') and switched to with ARGS and KW.
    """
    def launch():
        label = funargstr(func, args, kw)
        return Coroutine(func, name = label).switch(*args, **kw)
    defer(launch)
632 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
635 func, args, kw = _asideq.get()
639 SYS.excepthook(*SYS.exc_info())
def aside(func, *args, **kw):
    """
    Arrange for FUNC(*ARGS, **KW) to be called later, in a non-root coroutine.

    The call is packaged as a (FUNC, ARGS, KW) triple, and a deferred call is
    scheduled which puts that triple on the asides queue.
    """
    job = (func, args, kw)
    defer(_asideq.put, job)
645 class TripeCommandDispatcher (TripeConnection):
649 The command dispatcher is a connection which knows how to handle commands.
650 This is probably the most important class in this module to understand.
652 Lines from the server are parsed into tokens. The first token is a code
653 (`OK' or `NOTE
' or something) explaining what kind of line this is. The
654 `handler' attribute
is a dictionary mapping server line codes to handler
655 functions
, which are applied to the words of the line
as individual
656 arguments
. *Exception*: the content of `TRACE
' lines is not tokenized.
658 There are default handlers for server codes which respond to commands.
659 Commands arrive as `TripeCommand' instances through the `rawcommand
'
660 interface. The dispatcher keeps track of which command objects represent
661 which jobs, and sends responses on to the appropriate command objects by
662 invoking their `response' methods
. Command objects don
't see the `BG...'
663 codes
, because the dispatcher has already transformed them into regular
664 codes when it was looking up the job tag
.
666 The dispatcher also has a special response code of its own
: `CONNERR
'
667 indicates that the connection failed and the command has therefore been
668 lost. This is sent to all outstanding commands when a connection error is
669 encountered: rather than a token list, it is accompanied by an exception
670 object which is the cause of the disconnection, which may be `None' if the
671 disconnection
is expected (e
.g
., the direct result of a user request
).
674 ## --- Infrastructure ---
676 ## We will get confused if we pipeline commands. Send them one at a time.
677 ## Only send a command when the previous one detaches or completes.
679 ## The following attributes are interesting:
681 ## tagseq Sequence number for next background job (for bgtag)
683 ## queue Commands awaiting submission.
685 ## cmd Mapping from job tags to commands: cmd[None] is the
686 ## foreground command.
688 ## handler Mapping from server codes to handler functions.
690 def __init__(me, socket):
692 Initialize the dispatcher
.
694 The SOCKET
is the filename of the administration socket to connect to
,
695 for TripeConnection
.__init__
.
697 TripeConnection.__init__(me, socket)
700 me.handler['BGDETACH'] = me._detach
701 for i in 'BGOK', 'BGINFO', 'BGFAIL':
702 me.handler[i] = me._response
703 for i in 'OK', 'INFO', 'FAIL':
704 me.handler[i] = me._fgresponse
707 """Should we quit the main loop? Subclasses should override
."""
710 def mainloop(me, quitp = None):
712 Iterate the I
/O watcher until QUITP returns true
.
714 Arranges
for asides
and deferred calls to be made at the right times
.
718 assert _Coroutine.getcurrent() is rootcr
719 Coroutine(_runasides, name = '_runasides').switch()
726 for func, args, kw in q:
734 If a subclass overrides this method
, it must call us
; clears out the
735 command queue
and job
map.
739 TripeConnection.connected(me)
741 def disconnected(me, reason):
745 If a subclass overrides this method
, it must call us
; sends a
746 special `CONNERR
' code to all incomplete commands.
748 TripeConnection.disconnected(me, reason)
749 for cmd in me.cmd.itervalues():
750 cmd.response('CONNERR
', reason)
752 cmd.response('CONNERR
', reason)
756 """Handle an incoming line, sending it to the right place."""
757 if _debug: print '<', line
758 code, rest = M.word(line, quotep = True)
759 func = me.handler.get(code)
764 func(code, *M.split(rest, quotep = True)[0])
769 Pull the oldest command off the queue and try to send it to the server.
771 if not me.queue or None in me.cmd: return
772 cmd = me.queue.shift()
773 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
774 me.send(' '.join([quotify(w) for w in cmd.words]))
779 Return an unused job tag.
781 May be of use when composing commands by hand.
783 tag = 'J%05d
' % me.tagseq
787 ## --- Built-in handler functions for server responses ---
789 def _detach(me, _, tag):
791 Respond to a `BGDETACH' TAG message
.
793 Move the current foreground command to the background
.
795 assert tag not in me.cmd
796 me.cmd[tag] = me.cmd[None]
799 def _response(me, code, tag, *w):
801 Respond to an `OK
', `INFO' or `FAIL
' message.
803 If this is a message for a background job, find the tag; then dispatch
804 the result to the command object. This is also called by `_fgresponse'
805 (with TAG
set to `
None') to handle responses for foreground commands, and
806 is therefore a useful method to extend or override in subclasses.
808 if code.startswith('BG
'):
813 cmd.response(code, *w)
815 def _fgresponse(me, code, *w):
816 """Process responses to the foreground command."""
817 me._response(code, None, *w)
819 ## --- Interface methods ---
821 def rawcommand(me, cmd):
823 Submit the `TripeCommand' CMD to the server
, and look after it until it
826 if not me.connectedp():
827 raise TripeConnectionError('connection closed')
def command(me, *cmd, **kw):
    """
    Convenience wrapper: build a `TripeCommandIterator' on this dispatcher.

    CMD is the tuple of command words; the keyword arguments KW are passed
    through to the iterator's constructor untouched.
    """
    iterator = TripeCommandIterator(me, cmd, **kw)
    return iterator
835 ## --- Convenience methods for server commands ---
837 def add(me, peer, *addr, **kw):
838 return _simple(me.command(bg = True,
840 _kwopts(kw, ['tunnel', 'keepalive',
841 'key', 'priv', 'cork',
846 return _oneline(me.command('ADDR', peer))
def algs(me, peer = None):
    """
    Issue `ALGS', naming PEER only if one is given.

    The response iterator is handed to `_keyvals', whose result is returned.
    """
    words = ['ALGS']
    if peer is not None:
        words.append(peer)
    return _keyvals(me.command(*words))
def checkchal(me, chal):
    """Issue `CHECKCHAL' with CHAL; the response is handed to `_simple'."""
    replies = me.command('CHECKCHAL', chal)
    return _simple(replies)
853 return _simple(me.command('DAEMON'))
854 def eping(me, peer, **kw):
855 return _oneline(me.command(bg = True,
857 _kwopts(kw, ['timeout']) +
def forcekx(me, peer):
    """Issue `FORCEKX' for PEER; the response is handed to `_simple'."""
    replies = me.command('FORCEKX', peer)
    return _simple(replies)
862 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
def greet(me, peer, chal):
    """Issue `GREET' with PEER and CHAL; the response goes to `_simple'."""
    replies = me.command('GREET', peer, chal)
    return _simple(replies)
866 return list(me.command('HELP', filter = _tokenjoin))
def ifname(me, peer):
    """
    Issue `IFNAME' for PEER, joining each response's tokens with spaces.

    The response iterator is handed to `_oneline', whose result is returned.
    """
    replies = me.command('IFNAME', peer, filter = _tokenjoin)
    return _oneline(replies)
870 return _simple(me.command('KILL', peer))
872 return list(me.command('LIST', filter = _tokenjoin))
def notify(me, *msg):
    """Issue `NOTIFY' with the words MSG; the response goes to `_simple'."""
    replies = me.command('NOTIFY', *msg)
    return _simple(replies)
def peerinfo(me, peer):
    """Issue `PEERINFO' for PEER; the response is handed to `_keyvals'."""
    replies = me.command('PEERINFO', peer)
    return _keyvals(replies)
877 def ping(me, peer, **kw):
878 return _oneline(me.command(bg = True,
880 _kwopts(kw, ['timeout']) +
883 return _oneline(me.command('PORT', filter = _tokenjoin))
885 return _simple(me.command('QUIT'))
887 return _simple(me.command('RELOAD'))
889 return _keyvals(me.command('SERVINFO'))
def setifname(me, new):
    """Issue `SETIFNAME' with NEW; the response is handed to `_simple'."""
    replies = me.command('SETIFNAME', new)
    return _simple(replies)
def svcclaim(me, service, version):
    """
    Issue `SVCCLAIM' with SERVICE and VERSION.

    The response iterator is handed to `_simple', whose result is returned.
    """
    replies = me.command('SVCCLAIM', service, version)
    return _simple(replies)
def svcensure(me, service, version = None):
    """
    Issue `SVCENSURE' for SERVICE, adding VERSION only if one is given.

    The response iterator is handed to `_simple', whose result is returned.
    """
    words = ['SVCENSURE', service]
    if version is not None:
        words.append(version)
    return _simple(me.command(*words))
def svcfail(me, job, *msg):
    """Issue `SVCFAIL' for JOB with the words MSG; response to `_simple'."""
    replies = me.command('SVCFAIL', job, *msg)
    return _simple(replies)
def svcinfo(me, job, *msg):
    """Issue `SVCINFO' for JOB with the words MSG; response to `_simple'."""
    replies = me.command('SVCINFO', job, *msg)
    return _simple(replies)
902 return list(me.command('SVCLIST'))
904 return _simple(me.command('SVCOK', job))
def svcquery(me, service):
    """Issue `SVCQUERY' for SERVICE; the response is handed to `_keyvals'."""
    replies = me.command('SVCQUERY', service)
    return _keyvals(replies)
def svcrelease(me, service):
    """Issue `SVCRELEASE' for SERVICE; the response is handed to `_simple'."""
    replies = me.command('SVCRELEASE', service)
    return _simple(replies)
909 def svcsubmit(me, service, *args, **kw):
910 return me.command(bg = True,
912 _kwopts(kw, ['version']) +
916 return _keyvals(me.command('STATS', peer))
def trace(me, *args):
    """Issue `TRACE' with ARGS; the response is handed to `_tracelike'."""
    replies = me.command('TRACE', *args)
    return _tracelike(replies)
920 return list(me.command('TUNNELS', filter = _tokenjoin))
922 return _oneline(me.command('VERSION', filter = _tokenjoin))
924 return _simple(me.command('WARN', *msg))
def watch(me, *args):
    """Issue `WATCH' with ARGS; the response is handed to `_tracelike'."""
    replies = me.command('WATCH', *args)
    return _tracelike(replies)
928 ###--------------------------------------------------------------------------
929 ### Asynchronous commands.
931 class TripeAsynchronousCommand (TripeCommand):
933 Asynchronous commands
.
935 This
is the complicated way of issuing commands
. You must
set up a queue
,
936 and associate the command with the queue
. Responses arriving
for the
937 command will be put on the queue
as a triple of the
form (TAG
, CODE
, REST
)
938 -- where TAG
is an
object of your choice
, not interpreted by this
class,
939 CODE
is the server
's response code (`OK', `INFO
', `FAIL', or `CONNERR
'),
940 and REST is the list of the rest of the server's tokens
.
942 Using this
, you can write coroutines which process many
commands (and
943 possibly other events
) simultaneously
.
946 def __init__(me, queue, tag, words):
947 """Make an asynchronous command consisting of the given WORDS
, which
948 sends responses to QUEUE
, labelled with TAG
."""
949 TripeCommand.__init__(me, words)
def response(me, code, *stuff):
    """
    Pass a server response on to the caller's queue.

    The item queued is the triple (TAG, CODE, REST): our `tag' attribute, the
    response CODE, and the remaining arguments STUFF gathered into a list.
    """
    rest = list(stuff)
    me.queue.put((me.tag, code, rest))
957 ###--------------------------------------------------------------------------
960 class TripeJobCancelled (Exception):
962 Exception sent to job handler if the client kills the job.
964 Not propagated further.
968 class TripeJobError (Exception):
970 Exception to cause failure report for running job.
972 Sends an SVCFAIL code back.
976 class TripeSyntaxError (Exception):
978 Exception to report a syntax error for a job.
980 Sends an SVCFAIL bad-svc-syntax message back.
984 class TripeServiceManager (TripeCommandDispatcher):
986 A command dispatcher with added handling for incoming service requests.
988 There is usually only one instance of this class, called svcmgr. Some of
989 the support functions in this module assume that this is the case.
991 To use, run `mLib.select' in a loop until the quitp method returns true
;
992 then
, in a non
-root coroutine
, register your services by calling `add
', and
993 then call `running' when you
've finished setting up.
995 The instance handles server service messages `SVCJOB', `SVCCANCEL
' and
996 `SVCCLAIM'. It maintains a table of running services
. Incoming jobs cause
997 the service
's `job' method to be invoked
; `SVCCANCEL
' sends a
998 `TripeJobCancelled' exception to the handler coroutine
, and `SVCCLAIM
'
999 causes the relevant service to be deregistered.
1001 There is no base class for jobs, but a job must implement two methods:
1003 start() Begin processing; might be a no-op.
1005 cancel() Stop processing; the original client has killed the
1008 The life of a service manager is divided into two parts: setup and running;
1009 you tell the manager that you've finished setting up by calling the
1010 `running
' method. If, at any point after setup is finished, there are no
1011 remaining services or jobs, `quitp' will
return true
, ending the process
.
1014 ## --- Attributes ---
1016 ## svc Mapping name -> service object
1018 ## job Mapping jobid -> job handler coroutine
1020 ## runningp True when setup is finished
1022 ## _quitp True if explicit quit has been requested
1024 def __init__(me, socket):
1026 Initialize the service manager
.
1028 SOCKET
is the administration socket to connect to
.
1030 TripeCommandDispatcher.__init__(me, socket)
1034 me.handler['SVCCANCEL'] = me._cancel
1035 me.handler['SVCJOB'] = me._job
1036 me.handler['SVCCLAIM'] = me._claim
def addsvc(me, svc):
    """
    Register the new service SVC, a `TripeService' instance.

    The service's name must not already be in our table.  Its name and
    version are first claimed from the server using `svcclaim'; only then is
    SVC entered in the `svc' mapping under its name.
    """
    name = svc.name
    assert name not in me.svc
    me.svcclaim(name, svc.version)
    me.svc[name] = svc
1045 def _cancel(me, _, jid):
1047 Called when the server cancels a job; invokes the job's `cancel
' method.
1053 def _claim(me, _, svc, __):
1054 """Called when another program claims our service at a higher version."""
1057 def _job(me, _, jid, svc, cmd, *args):
1059 Called when the server sends us a job to do.
1061 Calls the service to collect a job, and begins processing it.
1063 assert jid not in me.job
1064 svc = me.svc[svc.lower()]
1065 job = svc.job(jid, cmd, args)
1070 """Answer true if setup is finished."""
1073 def jobdone(me, jid):
1074 """Informs the service manager that the job with id JID has finished."""
1082 Return true if no services or jobs are active (and, therefore, if this
1083 process can quit without anyone caring).
1085 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1089 """Forces the quit flag (returned by quitp) on."""
1092 class TripeService (object):
1096 The NAME and VERSION are passed on to the server. The CMDTAB is a
1097 dictionary mapping command names (in lowercase) to command objects.
1099 If the CMDTAB doesn't have entries
for commands `HELP
' and `QUIT' then
1100 defaults are provided
.
1102 TripeService itself
is mostly agnostic about the nature of command objects
,
1103 but the TripeServiceJob
class (below
) has some requirements
. The built
-in
1104 HELP command requires command objects to have `usage
' attributes.
1107 def __init__(me, name, version, cmdtab):
1109 Create and register a new service with the given NAME and VERSION.
1111 CMDTAB maps command names (in lower-case) to command objects.
1114 me.version = version
1117 me.cmd.setdefault('help',
1118 TripeServiceCommand('help', 0, 0, '', me._help))
1119 me.cmd.setdefault('quit
',
1120 TripeServiceCommand('quit
', 0, 0, '', me._quit))
def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    The job asks for command CMD with argument list ARGS.  The command
    object is looked up in our command table under CMD folded to lower case
    (giving `None' if there is no such entry), and a new `TripeServiceJob'
    is built from all of this and returned.
    """
    handler = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, handler, args)
1131 ## Simple default command handlers, complying with the spec in
1132 ## tripe-service(7).
1135 """Send a help summary to the user."""
1136 cmds = me.cmd.items()
1138 for name, cmd in cmds:
1139 svcinfo(name, *cmd.usage)
1142 """Terminate the service manager."""
1143 svcmgr.notify('svc
-quit
', me.name, 'admin
-request
')
1146 class TripeServiceCommand (object):
1147 """A simple service command."""
1149 def __init__(me, name, min, max, usage, func):
1151 Creates a new command.
1153 NAME is the command's
name (in lowercase
).
1155 MIN
and MAX are the minimum
and maximum number of allowed
arguments (used
1156 for checking
); either may be
None to indicate no minimum
or maximum
.
1158 USAGE
is a usage string
, used
for generating
help and error messages
.
1160 FUNC
is the function to invoke
.
1165 me.usage = usage.split()
1170 Called when the command
is invoked
.
1172 Does minimal checking of the arguments
and calls the supplied function
.
1174 if (me.min is not None and len(args) < me.min) or \
1175 (me.max is not None and len(args) > me.max):
1176 raise TripeSyntaxError
1179 class TripeServiceJob (Coroutine):
1181 Job handler coroutine
.
1183 A standard `TripeService
' invokes a `TripeServiceJob' for each incoming job
1184 request
, passing it the jobid
, command
and arguments
, and a command
object.
1185 The command
object needs the following attributes
.
1187 usage A usage
list (excluding the command name
) showing
1188 arguments
and options
.
1190 run(*ARGS
) Function to react to the command with ARGS split into
1191 separate arguments
. Invoked
in a coroutine
. The
1192 `svcinfo
function (not the `TripeCommandDispatcher
'
1193 method) may be used to send `INFO' lines
. The
1194 function may
raise `TripeJobError
' to send a `FAIL'
1195 response back
, or `TripeSyntaxError
' to send a
1196 generic usage error. `TripeJobCancelled' exceptions
1197 are trapped silently
. Other exceptions are
1198 translated into a generic internal
-error message
.
1200 This
class automatically takes care of sending some closing response to the
1201 job
, and for informing the service manager that the job
is completed
.
1203 The `jid
' attribute stores the job's
id.
1206 def __init__(me, jid, svc, cmd, command, args):
1210 The job
is created with
id JID
, for service SVC
, processing command name
1211 CMD (which the service resolved into the command
object COMMAND
, or
1212 `
None'), and with the arguments ARGS.
1214 Coroutine.__init__(me)
1218 me.command = command
1223 Main body of the coroutine.
1225 Does the tedious exception handling boilerplate and invokes the command's
1230 if me.command is None:
1231 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1233 me.command.run(*me.args)
1234 svcmgr.svcok(me.jid)
1235 except TripeJobError, exc:
1236 svcmgr.svcfail(me.jid, *exc.args)
1237 except TripeSyntaxError:
1238 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1239 me.svc.name, me.command.name,
1241 except TripeJobCancelled:
1243 except Exception, exc:
1244 svcmgr.svcfail(me.jid, 'svc-internal-error',
1245 exc.__class__.__name__, str(exc))
1247 svcmgr.jobdone(me.jid)
1250 """Invoked by the service manager to start running the coroutine
."""
1254 """Invoked by the service manager to cancel the job
."""
1255 me.throw(TripeJobCancelled())
1259 If invoked
from a TripeServiceJob coroutine
, sends an `INFO
' line to the
1260 job's sender
, automatically using the correct job
id.
1262 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1264 def _setupsvc(tab, func):
1266 Setup coroutine
for setting up service programs
.
1268 Register the given services
.
1272 svcmgr.addsvc(service)
## The module-wide service manager.  It is constructed with `None' here;
## `runservices' assigns its `socket' attribute before starting the services.
svcmgr = TripeServiceManager(None)
1279 def runservices(socket, tab, init = None, setup = None, daemon = False):
1281 Function to start a service provider
.
1283 SOCKET
is the socket to connect to
, usually tripesock
.
1285 TAB
is a
list of entries
. An entry may be either a
tuple
1287 (NAME
, VERSION
, COMMANDS
)
1289 or a service
object (e
.g
., a `TripeService
' instance).
1291 COMMANDS is a dictionary mapping command names to tuples
1293 (MIN, MAX, USAGE, FUNC)
1295 of arguments for a `TripeServiceCommand' object.
1297 If DAEMON
is true
, then the process
is forked into the background before we
1298 start
. If INIT
is given
, it
is called
in the main coroutine
, immediately
1299 after forking
. If SETUP
is given
, it
is called
in a coroutine
, after
1300 calling INIT
and setting up the services but before marking the service
1303 It
is a really bad idea to do any initialization
, particularly setting up
1304 coroutines
, outside of the INIT
or SETUP functions
. In particular
, if
1305 we
're using rmcr for fake coroutines, the daemonizing fork will kill off
1306 the currently established coroutines in a most surprising way.
1308 The function runs a main select loop until the service manager decides to
1312 svcmgr.socket = socket
1316 if not isinstance(service, tuple):
1317 svcs.append(service)
1319 name, version, commands = service
1321 for cmd, stuff in commands.iteritems():
1322 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1323 svcs.append(TripeService(name, version, cmdmap))
1326 if init is not None:
1328 spawn(_setupsvc, svcs, setup)
1331 ###--------------------------------------------------------------------------
1332 ### Utilities for services.
1334 _timeunits = {'s
': 1, 'm
': 60, 'h
': 3600, 'd
': 86400}
1336 """Parse the timespec SPEC, returning a number of seconds."""
1338 if len(spec) > 1 and spec[-1] in _timeunits:
1339 mul = _timeunits[spec[-1]]
1344 raise TripeJobError('bad
-time
-spec
', spec)
1346 raise TripeJobError('bad
-time
-spec
', spec)
1347 return mul * int(spec)
1349 class OptParse (object):
1351 Parse options from a command list in the conventional fashion.
1353 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1354 options. The returned values are the option tags. During parsing, the
1355 `arg' method may be used to retrieve the argument
for the most recent
1356 option
. Afterwards
, `rest
' may be used to retrieve the remaining
1357 non-option arguments, and do a simple check on how many there are.
1359 The parser correctly handles `--' option terminators
.
1362 def __init__(me, args, allowed):
1364 Create a new option parser
.
1366 The parser will scan the ARGS
for options given
in the sequence ALLOWED
1367 (which are expected to include the `
-' prefix).
1371 me.allowed[a] = True
1372 me.args = list(args)
1375 """Iterator protocol: I am my own iterator."""
1380 Iterator protocol: return the next option.
1382 If we've run out
, raise `
StopIteration'.
1384 if len(me.args) == 0 or \
1385 len(me.args[0]) < 2 or \
1386 not me.args[0].startswith('-'):
1388 opt = me.args.pop(0)
1391 if opt not in me.allowed:
1392 raise TripeSyntaxError
1397 Return the argument for the most recent option.
1399 If none is available, raise `TripeSyntaxError'.
1401 if len(me.args) == 0:
1402 raise TripeSyntaxError
1403 return me.args.pop(0)
1405 def rest(me, min = None, max = None):
1407 After option parsing
is done
, return the remaining arguments
.
1409 Check that there are at least MIN
and at most MAX arguments remaining
--
1410 either may be
None to suppress the check
.
1412 if (min is not None and len(me.args) < min) or \
1413 (max is not None and len(me.args) > max):
1414 raise TripeSyntaxError
1417 ###----- That's all, folks --------------------------------------------------