--- /dev/null
+### -*-python-*-
+###
+### Administration connection with tripe server
+###
+### (c) 2006 Straylight/Edgeware
+###
+
+###----- Licensing notice ---------------------------------------------------
+###
+### This file is part of Trivial IP Encryption (TrIPE).
+###
+### TrIPE is free software; you can redistribute it and/or modify
+### it under the terms of the GNU General Public License as published by
+### the Free Software Foundation; either version 2 of the License, or
+### (at your option) any later version.
+###
+### TrIPE is distributed in the hope that it will be useful,
+### but WITHOUT ANY WARRANTY; without even the implied warranty of
+### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+### GNU General Public License for more details.
+###
+### You should have received a copy of the GNU General Public License
+### along with TrIPE; if not, write to the Free Software Foundation,
+### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""
+This module provides classes and functions for connecting to a running tripe
+server, sending it commands, receiving and processing replies, and
+implementing services.
+
+Rather than end up in lost in a storm of little event-driven classes, or a
+morass of concurrent threads, the module uses coroutines to present a fairly
+simple function call/return interface to potentially long-running commands
+which must run without blocking the main process. It sassumes a coroutine
+module presenting a subset of the `greenlet' interface: if actual greenlets
+are available, they are used; otherwise there's an implementation in terms of
+threads (with lots of locking) which will do instead.
+
+The simple rule governing the coroutines used here is this:
+
+ * The root coroutine never cares what values are passed to it when it
+ resumes: it just discards them.
+
+ * Other, non-root, coroutines are presumed to be waiting for some specific
+ thing.
+
+Configuration variables:
+ configdir
+ socketdir
+ PACKAGE
+ VERSION
+ tripesock
+ peerdb
+
+Other useful variables:
+ rootcr
+ svcmgr
+
+Other tweakables:
+ _debug
+
+Exceptions:
+ Exception
+ StandardError
+ TripeConnectionError
+ TripeError
+ TripeInternalError
+ TripeJobCancelled
+ TripeJobError
+ TripeSyntaxError
+
+Classes:
+ _Coroutine
+ Coroutine
+ TripeServiceJob
+ OptParse
+ Queue
+ TripeCommand
+ TripeSynchronousCommand
+ TripeAsynchronousCommand
+ TripeCommandIterator
+ TripeConnection
+ TripeCommandDispatcher
+ SelCommandDispatcher
+ TripeServiceManager
+ TripeService
+ TripeServiceCommand
+
+Utility functions:
+ quotify
+ runservices
+ spawn
+ svcinfo
+ timespec
+"""
+
+__pychecker__ = 'self=me no-constCond no-argsused'
+
+_debug = False
+
+###--------------------------------------------------------------------------
+### External dependencies.
+
+import socket as S
+import errno as E
+import mLib as M
+import re as RX
+import sys as SYS
+import os as OS
+
+try:
+ if OS.getenv('TRIPE_FORCE_RMCR') is not None:
+ raise ImportError
+ from py.magic import greenlet as _Coroutine
+except ImportError:
+ from rmcr import Coroutine as _Coroutine
+
+###--------------------------------------------------------------------------
+### Coroutine hacking.
+
+rootcr = _Coroutine.getcurrent()
+
+class Coroutine (_Coroutine):
+ """
+ A coroutine class which can only be invoked by the root coroutine.
+
+ The root, by construction, cannot be an instance of this class.
+ """
+ def switch(me, *args, **kw):
+ assert _Coroutine.getcurrent() is rootcr
+ _Coroutine.switch(me, *args, **kw)
+
+###--------------------------------------------------------------------------
+### Default places for things.
+
+configdir = OS.environ.get('TRIPEDIR', "@configdir@")
+socketdir = "@socketdir@"
+PACKAGE = "@PACKAGE@"
+VERSION = "@VERSION@"
+
+tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
+
+###--------------------------------------------------------------------------
+### Connection to the server.
+
+def readnonblockingly(sock, len):
+ """
+ Nonblocking read from SOCK.
+
+ Try to return LEN bytes. If couldn't read anything, return None. EOF is
+ returned as an empty string.
+ """
+ try:
+ sock.setblocking(0)
+ return sock.recv(len)
+ except S.error, exc:
+ if exc[0] == E.EWOULDBLOCK:
+ return None
+ raise
+
+class TripeConnectionError (StandardError):
+ """Something happened to the connection with the server."""
+ pass
+class TripeInternalError (StandardError):
+ """This program is very confused."""
+ pass
+
+class TripeConnection (object):
+ """
+ A logical connection to the tripe administration socket.
+
+ There may or may not be a physical connection. (This is needed for the
+ monitor, for example.)
+
+ This class isn't very useful on its own, but it has useful subclasses. At
+ this level, the class is agnostic about I/O multiplexing schemes; that gets
+ added later.
+ """
+
+ def __init__(me, socket):
+ """
+ Make a connection to the named SOCKET.
+
+ No physical connection is made initially.
+ """
+ me.socket = socket
+ me.sock = None
+ me.lbuf = None
+
+ def connect(me):
+ """
+ Ensure that there's a physical connection.
+
+ Do nothing if we're already connected. Invoke the `connected' method if
+ successful.
+ """
+ if me.sock: return
+ sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
+ sock.connect(me.socket)
+ me.sock = sock
+ me.lbuf = M.LineBuffer(me.line, me._eof)
+ me.lbuf.size = 1024
+ me.connected()
+ return me
+
+ def disconnect(me, reason):
+ """
+ Disconnect the physical connection.
+
+ Invoke the `disconnected' method, giving the provided REASON, which
+ should be either None or an exception.
+ """
+ if not me.sock: return
+ me.disconnected(reason)
+ me.sock.close()
+ me.sock = None
+ me.lbuf.disable()
+ me.lbuf = None
+ return me
+
+ def connectedp(me):
+ """
+ Return true if there's a current, believed-good physical connection.
+ """
+ return me.sock is not None
+
+ __nonzero__ = connectedp
+
+ def send(me, line):
+ """
+ Send the LINE to the connection's socket.
+
+ All output is done through this method; it can be overridden to provide
+ proper nonblocking writing, though this seems generally unnecessary.
+ """
+ try:
+ me.sock.setblocking(1)
+ me.sock.send(line + '\n')
+ except Exception, exc:
+ me.disconnect(exc)
+ raise
+ return me
+
+ def receive(me):
+ """
+ Receive whatever's ready from the connection's socket.
+
+ Call `line' on each complete line, and `eof' if the connection closed.
+ Subclasses which attach this class to an I/O-event system should call
+ this method when the socket (CONN.sock) is ready for reading.
+ """
+ while me.sock is not None:
+ try:
+ buf = readnonblockingly(me.sock, 16384)
+ except Exception, exc:
+ me.disconnect(exc)
+ raise
+ if buf is None:
+ return me
+ if buf == '':
+ me._eof()
+ return me
+ me.lbuf.flush(buf)
+ return me
+
+ def _eof(me):
+ """Internal end-of-file handler."""
+ me.disconnect(TripeConnectionError('connection lost'))
+ me.eof()
+
+ def connected(me):
+ """
+ To be overridden by subclasses to react to a connection being
+ established.
+ """
+ pass
+
+ def disconnected(me, reason):
+ """
+ To be overridden by subclasses to react to a connection being severed.
+ """
+ pass
+
+ def eof(me):
+ """To be overridden by subclasses to handle end-of-file."""
+ pass
+
+ def line(me, line):
+ """To be overridden by subclasses to handle incoming lines."""
+ pass
+
+###--------------------------------------------------------------------------
+### Dispatching coroutine.
+
+## Match a string if it can stand on its own as a bareword: i.e., it doesn't
+## contain backslashes, quotes or whitespace.
+rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
+
+## Match characters which need to be escaped, even in quoted text.
+rx_weird = RX.compile(r'([\\\'])')
+
+def quotify(s):
+ """Quote S according to the tripe-admin(5) rules."""
+ m = rx_ordinary.match(s)
+ if m and m.end() == len(s):
+ return s
+ else:
+ return "'" + rx_weird.sub(r'\\\1', s) + "'"
+
+def _callback(func):
+ """
+ Return a wrapper for FUNC which reports exceptions thrown by it.
+
+ Useful in the case of callbacks invoked by C functions which ignore
+ exceptions.
+ """
+ def _(*a, **kw):
+ try:
+ return func(*a, **kw)
+ except:
+ SYS.excepthook(*SYS.exc_info())
+ raise
+ return _
+
+class TripeCommand (object):
+ """
+ This abstract class represents a command in progress.
+
+ The `words' attribute contains the list of tokens which make up the
+ command.
+
+ Subclasses must implement a method to handle server responses:
+
+ * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
+ 'FAIL'; ARGS are the remaining tokens from the server's response.
+ """
+
+ def __init__(me, words):
+ """Make a new command consisting of the given list of WORDS."""
+ me.words = words
+
+class TripeSynchronousCommand (TripeCommand):
+ """
+ A simple command, processed apparently synchronously.
+
+ Must be invoked from a coroutine other than the root (or whichever one is
+ running the dispatcher); in reality, other coroutines carry on running
+ while we wait for a response from the server.
+
+ Each server response causes the calling coroutine to be resumed with the
+ pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
+ or `FAIL') and REST is a list of the server's other response tokens. The
+ calling coroutine must continue switching back to the dispatcher until a
+ terminating response (`OK' or `FAIL') is received or become very
+ confused.
+
+ Mostly it's better to use the TripeCommandIterator to do this
+ automatically.
+ """
+
+ def __init__(me, words):
+ """Initialize the command, specifying the WORDS to send to the server."""
+ TripeCommand.__init__(me, words)
+ me.owner = Coroutine.getcurrent()
+
+ def response(me, code, *rest):
+ """Handle a server response by forwarding it to the calling coroutine."""
+ me.owner.switch((code, rest))
+
+class TripeError (StandardError):
+ """
+ A tripe command failed with an error (a FAIL code). The args attribute
+ contains a list of the server's message tokens.
+ """
+ pass
+
+class TripeCommandIterator (object):
+ """
+ Iterator interface to a tripe command.
+
+ The values returned by the iterator are lists of tokens from the server's
+ INFO lines, as processed by the given filter function, if any. The
+ iterator completes normally (by raising StopIteration) if the server
+ reported OK, and raises an exception if the command failed for some reason.
+
+ A TripeError is raised if the server issues a FAIL code. If the connection
+ failed, some other exception is raised.
+ """
+
+ def __init__(me, dispatcher, words, bg = False, filter = None):
+ """
+ Create a new command iterator.
+
+ The command is submitted to the DISPATCHER; it consists of the given
+ WORDS. If BG is true, then an option is inserted to request that the
+ server run the command in the background. The FILTER is applied to the
+ token lists which the server responds, and the filter's output are the
+ items returned by the iterator.
+ """
+ me.dcr = Coroutine.getcurrent().parent
+ if me.dcr is None:
+ raise ValueError, 'must invoke from coroutine'
+ me.filter = filter or (lambda x: x)
+ if bg:
+ words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
+ dispatcher.rawcommand(TripeSynchronousCommand(words))
+
+ def __iter__(me):
+ """Iterator protocol: I am my own iterator."""
+ return me
+
+ def next(me):
+ """
+ Iterator protocol: return the next piece of information from the server.
+
+ INFO responses are filtered and returned as the values of the iteration.
+ FAIL and CONNERR responses are turned into exceptions and raised.
+ Finally, OK is turned into StopIteration, which should cause a normal end
+ to the iteration process.
+ """
+ thing = me.dcr.switch()
+ code, rest = thing
+ if code == 'INFO':
+ return me.filter(rest)
+ elif code == 'OK':
+ raise StopIteration
+ elif code == 'CONNERR':
+ if rest is None:
+ raise TripeConnectionError, 'connection terminated by user'
+ else:
+ raise rest
+ elif code == 'FAIL':
+ raise TripeError(*rest)
+ else:
+ raise TripeInternalError \
+ ('unexpected tripe response %r' % ([code] + rest))
+
+### Simple utility functions for the TripeCommandIterator convenience
+### methods.
+
+def _tokenjoin(words):
+ """Filter function: simply join the given tokens with spaces between."""
+ return ' '.join(words)
+
+def _keyvals(iter):
+ """Return a dictionary formed from the KEY=VALUE pairs returned by the
+ iterator ITER."""
+ kv = {}
+ for ww in iter:
+ for w in ww:
+ q = w.index('=')
+ kv[w[:q]] = w[q + 1:]
+ return kv
+
+def _simple(iter):
+ """Raise an error if ITER contains any item."""
+ stuff = list(iter)
+ if len(stuff) != 0:
+ raise TripeInternalError('expected no response')
+ return None
+
+def _oneline(iter):
+ """If ITER contains a single item, return it; otherwise raise an error."""
+ stuff = list(iter)
+ if len(stuff) != 1:
+ raise TripeInternalError('expected only one line of response')
+ return stuff[0]
+
+def _tracelike(iter):
+ """Handle a TRACE-like command. The result is a list of tuples (CHAR,
+ STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
+ disabled, `+' if enabled, maybe something else later), and DESC is the
+ human-readable description."""
+ stuff = []
+ for ww in iter:
+ ch = ww[0][0]
+ st = ww[0][1:]
+ desc = ' '.join(ww[1:])
+ stuff.append((ch, st, desc))
+ return stuff
+
+def _kwopts(kw, allowed):
+ """Parse keyword arguments into options. ALLOWED is a list of allowable
+ keywords; raise errors if other keywords are present. KEY = VALUE becomes
+ an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
+ VALUE is a true non-string, or nothing if VALUE is false.. Insert a `--'
+ at the end to stop the parser getting confused."""
+ opts = []
+ amap = {}
+ for a in allowed: amap[a] = True
+ for k, v in kw.iteritems():
+ if k not in amap:
+ raise ValueError('option %s not allowed here' % k)
+ if isinstance(v, str):
+ opts += ['-' + k, v]
+ elif v:
+ opts += ['-' + k]
+ opts.append('--')
+ return opts
+
+class TripeCommandDispatcher (TripeConnection):
+ """
+ Command dispatcher.
+
+ The command dispatcher is a connection which knows how to handle commands.
+ This is probably the most important class in this module to understand.
+
+ Lines from the server are parsed into tokens. The first token is a code
+ (OK or NOTE or something) explaining what kind of line this is. The
+ `handler' attribute is a dictionary mapping server line codes to handler
+ functions, which are applied to the words of the line as individual
+ arguments. *Exception*: the content of TRACE lines is not tokenized.
+
+ There are default handlers for server codes which respond to commands.
+ Commands arrive as TripeCommand instances through the `rawcommand'
+ interface. The dispatcher keeps track of which command objects represent
+ which jobs, and sends responses on to the appropriate command objects by
+ invoking their `response' methods. Command objects don't see the
+ BG... codes, because the dispatcher has already transformed them into
+ regular codes when it was looking up job code.
+
+ The dispatcher also has a special response code of its own: CONNERR
+ indicates that the connection failed and the command has therefore been
+ lost; the
+ """
+
+ ## --- Infrastructure ---
+ ##
+ ## We will get confused if we pipeline commands. Send them one at a time.
+ ## Only send a command when the previous one detaches or completes.
+ ##
+ ## The following attributes are interesting:
+ ##
+ ## tagseq Sequence number for next background job (for bgtag)
+ ##
+ ## queue Commands awaiting submission.
+ ##
+ ## cmd Mapping from job tags to commands: cmd[None] is the
+ ## foreground command.
+ ##
+ ## handler Mapping from server codes to handler functions.
+
+ def __init__(me, socket):
+ """
+ Initialize the dispatcher.
+
+ The SOCKET is the filename of the administration socket to connect to,
+ for TripeConnection.__init__.
+ """
+ TripeConnection.__init__(me, socket)
+ me.tagseq = 0
+ me.handler = {}
+ me.handler['BGDETACH'] = me._detach
+ for i in 'BGOK', 'BGINFO', 'BGFAIL':
+ me.handler[i] = me._response
+ for i in 'OK', 'INFO', 'FAIL':
+ me.handler[i] = me._fgresponse
+
+ def connected(me):
+ """
+ Connection hook.
+
+ If a subclass overrides this method, it must call us; clears out the
+ command queue and job map.
+ """
+ me.queue = M.Array()
+ me.cmd = {}
+
+ def disconnected(me, reason):
+ """
+ Disconnection hook.
+
+ If a subclass hooks overrides this method, it must call us; sends a
+ special CONNERR code to all incomplete commands.
+ """
+ for cmd in me.cmd.itervalues():
+ cmd.response('CONNERR', reason)
+ for cmd in me.queue:
+ cmd.response('CONNERR', reason)
+
+ @_callback
+ def line(me, line):
+ """Handle an incoming line, sending it to the right place."""
+ if _debug: print '<', line
+ code, rest = M.word(line, quotep = True)
+ func = me.handler.get(code)
+ if func is not None:
+ if code == 'TRACE':
+ func(code, rest)
+ else:
+ func(code, *M.split(rest, quotep = True)[0])
+ me.dequeue()
+
+ def dequeue(me):
+ """
+ Pull the oldest command off the queue and try to send it to the server.
+ """
+ if not me.queue or None in me.cmd: return
+ cmd = me.queue.shift()
+ if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
+ me.send(' '.join([quotify(w) for w in cmd.words]))
+ me.cmd[None] = cmd
+
+ def bgtag(me):
+ """
+ Return an unused job tag.
+
+ May be of use when composing commands by hand.
+ """
+ tag = 'J%05d' % me.tagseq
+ me.tagseq += 1
+ return tag
+
+ ## --- Built-in handler functions for server responses ---
+
+ def _detach(me, _, tag):
+ """
+ Respond to a BGDETACH TAG message.
+
+ Move the current foreground command to the background.
+ """
+ assert tag not in me.cmd
+ me.cmd[tag] = me.cmd[None]
+ del me.cmd[None]
+
+ def _response(me, code, tag, *w):
+ """
+ Respond to an OK, INFO or FAIL message.
+
+ If this is a message for a background job, find the tag; then dispatch
+ the result to the command object.
+ """
+ if code.startswith('BG'):
+ code = code[2:]
+ cmd = me.cmd[tag]
+ if code != 'INFO':
+ del me.cmd[tag]
+ cmd.response(code, *w)
+
+ def _fgresponse(me, code, *w):
+ """Process responses to the foreground command."""
+ me._response(code, None, *w)
+
+ ## --- Interface methods ---
+
+ def rawcommand(me, cmd):
+ """
+ Submit the TripeCommand CMD to the server, and look after it until it
+ completes.
+ """
+ if not me.connectedp():
+ raise TripeConnectionError('connection closed')
+ me.queue.push(cmd)
+ me.dequeue()
+
+ def command(me, *cmd, **kw):
+ """Convenience wrapper for creating a TripeCommandIterator object."""
+ return TripeCommandIterator(me, cmd, **kw)
+
+ ## --- Convenience methods for server commands ---
+
+ def add(me, peer, *addr, **kw):
+ return _simple(me.command(bg = True,
+ *['ADD'] +
+ _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
+ [peer] +
+ list(addr)))
+ def addr(me, peer):
+ return _oneline(me.command('ADDR', peer))
+ def algs(me):
+ return _keyvals(me.command('ALGS'))
+ def checkchal(me, chal):
+ return _simple(me.command('CHECKCHAL', chal))
+ def daemon(me):
+ return _simple(me.command('DAEMON'))
+ def eping(me, peer, **kw):
+ return _oneline(me.command(bg = True,
+ *['PING'] +
+ _kwopts(kw, ['timeout']) +
+ [peer]))
+ def forcekx(me, peer):
+ return _simple(me.command('FORCEKX', peer))
+ def getchal(me):
+ return _oneline(me.command('GETCHAL', filter = _tokenjoin))
+ def greet(me, peer, chal):
+ return _simple(me.command('GREET', peer, chal))
+ def help(me):
+ return list(me.command('HELP', filter = _tokenjoin))
+ def ifname(me, peer):
+ return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
+ def kill(me, peer):
+ return _simple(me.command('KILL', peer))
+ def list(me):
+ return list(me.command('LIST', filter = _tokenjoin))
+ def notify(me, *msg):
+ return _simple(me.command('NOTIFY', *msg))
+ def peerinfo(me, peer):
+ return _keyvals(me.command('PEERINFO', peer))
+ def ping(me, peer, **kw):
+ return _oneline(me.command(bg = True,
+ *['PING'] +
+ _kwopts(kw, ['timeout']) +
+ [peer]))
+ def port(me):
+ return _oneline(me.command('PORT', filter = _tokenjoin))
+ def quit(me):
+ return _simple(me.command('QUIT'))
+ def reload(me):
+ return _simple(me.command('RELOAD'))
+ def servinfo(me):
+ return _keyvals(me.command('SERVINFO'))
+ def setifname(me, new):
+ return _simple(me.command('SETIFNAME', new))
+ def svcclaim(me, service, version):
+ return _simple(me.command('SVCCLAIM', service, version))
+ def svcensure(me, service, version = None):
+ return _simple(me.command('SVCENSURE', service,
+ *((version is not None and [version]) or [])))
+ def svcfail(me, job, *msg):
+ return _simple(me.command('SVCFAIL', job, *msg))
+ def svcinfo(me, job, *msg):
+ return _simple(me.command('SVCINFO', job, *msg))
+ def svclist(me):
+ return list(me.command('SVCLIST'))
+ def svcok(me, job):
+ return _simple(me.command('SVCOK', job))
+ def svcquery(me, service):
+ return _keyvals(me.command('SVCQUERY', service))
+ def svcrelease(me, service):
+ return _simple(me.command('SVCRELEASE', service))
+ def svcsubmit(me, service, *args, **kw):
+ return me.command(bg = True,
+ *['SVCSUBMIT'] +
+ _kwopts(kw, ['version']) +
+ [service] +
+ list(args))
+ def stats(me, peer):
+ return _keyvals(me.command('STATS', peer))
+ def trace(me, *args):
+ return _tracelike(me.command('TRACE', *args))
+ def tunnels(me):
+ return list(me.command('TUNNELS', filter = _tokenjoin))
+ def version(me):
+ return _oneline(me.command('VERSION', filter = _tokenjoin))
+ def warn(me, *msg):
+ return _simple(me.command('WARN', *msg))
+ def watch(me, *args):
+ return _tracelike(me.command('WATCH', *args))
+
+###--------------------------------------------------------------------------
+### Asynchronous commands.
+
+class Queue (object):
+ """
+ A queue of things arriving asynchronously.
+
+ This is a very simple single-reader multiple-writer queue. It's useful for
+ more complex coroutines which need to cope with a variety of possible
+ incoming events.
+ """
+
+ def __init__(me):
+ """Create a new empty queue."""
+ me.contents = M.Array()
+ me.waiter = None
+
+ def _wait(me):
+ """
+ Internal: wait for an item to arrive in the queue.
+
+ Complain if someone is already waiting, because this is just a
+ single-reader queue.
+ """
+ if me.waiter:
+ raise ValueError('queue already being waited on')
+ try:
+ me.waiter = Coroutine.getcurrent()
+ while not me.contents:
+ me.waiter.parent.switch()
+ finally:
+ me.waiter = None
+
+ def get(me):
+ """
+ Remove and return the item at the head of the queue.
+
+ If the queue is empty, wait until an item arrives.
+ """
+ me._wait()
+ return me.contents.shift()
+
+ def peek(me):
+ """
+ Return the item at the head of the queue without removing it.
+
+ If the queue is empty, wait until an item arrives.
+ """
+ me._wait()
+ return me.contents[0]
+
+ def put(me, thing):
+ """
+ Write THING to the queue.
+
+ If someone is waiting on the queue, wake him up immediately; otherwise
+ just leave the item there for later.
+ """
+ me.contents.push(thing)
+ if me.waiter:
+ me.waiter.switch()
+
+class TripeAsynchronousCommand (TripeCommand):
+ """
+ Asynchronous commands.
+
+ This is the complicated way of issuing commands. You must set up a queue,
+ and associate the command with the queue. Responses arriving for the
+ command will be put on the queue as an triple of the form (TAG, CODE, REST)
+ -- where TAG is an object of your choice, not interpreted by this class,
+ CODE is the server's response code (OK, INFO, FAIL), and REST is the list
+ of the rest of the server's tokens.
+
+ Using this, you can write coroutines which process many commands (and
+ possibly other events) simultaneously.
+ """
+
+ def __init__(me, queue, tag, words):
+ """Make an asynchronous command consisting of the given WORDS, which
+ sends responses to QUEUE, labelled with TAG."""
+ TripeCommand.__init__(me, words)
+ me.queue = queue
+ me.tag = tag
+
+ def response(me, code, *stuff):
+ """Handle a server response by writing it to the caller's queue."""
+ me.queue.put((me.tag, code, list(stuff)))
+
+###--------------------------------------------------------------------------
+### Selecting command dispatcher.
+
+class SelCommandDispatcher (TripeCommandDispatcher):
+ """
+ A command dispatcher which integrates with mLib's I/O-event system.
+
+ To use, simply create an instance and run mLib.select in a loop in your
+ main coroutine.
+ """
+
+ def __init__(me, socket):
+ """
+ Create an instance; SOCKET is the admin socket to connect to.
+
+ Note that no connection is made initially.
+ """
+ TripeCommandDispatcher.__init__(me, socket)
+ me.selfile = None
+
+ def connected(me):
+ """Connection hook: wires itself into the mLib select machinery."""
+ TripeCommandDispatcher.connected(me)
+ me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
+ me.selfile.enable()
+
+ def disconnected(me, reason):
+ """Disconnection hook: removes itself from the mLib select machinery."""
+ TripeCommandDispatcher.disconnected(me, reason)
+ me.selfile = None
+
+###--------------------------------------------------------------------------
+### Services.
+
+class TripeJobCancelled (Exception):
+ """
+ Exception sent to job handler if the client kills the job.
+
+ Not propagated further.
+ """
+ pass
+
+class TripeJobError (Exception):
+ """
+ Exception to cause failure report for running job.
+
+ Sends an SVCFAIL code back.
+ """
+ pass
+
+class TripeSyntaxError (Exception):
+ """
+ Exception to report a syntax error for a job.
+
+ Sends an SVCFAIL bad-svc-syntax message back.
+ """
+ pass
+
+class TripeServiceManager (SelCommandDispatcher):
+ """
+ A command dispatcher with added handling for incoming service requests.
+
+ There is usually only one instance of this class, called svcmgr. Some of
+ the support functions in this module assume that this is the case.
+
+ To use, run mLib.select in a loop until the quitp method returns true;
+ then, in a non-root coroutine, register your services by calling `add', and
+ then call `running' when you've finished setting up.
+
+ The instance handles server service messages SVCJOB, SVCCANCEL and
+ SVCCLAIM. It maintains a table of running services. Incoming jobs cause
+ the service's `job' method to be invoked; SVCCANCEL sends a
+ TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
+ the relevant service to be deregistered.
+
+ There is no base class for jobs, but a job must implement two methods:
+
+ start() Begin processing; might be a no-op.
+
+ cancel() Stop processing; the original client has killed the
+ job.
+
+ The life of a service manager is divided into two parts: setup and running;
+ you tell the manager that you've finished setting up by calling the
+ `running' method. If, at any point after setup is finished, there are no
+ remaining services or jobs, `quitp' will return true, ending the process.
+ """
+
+ ## --- Attributes ---
+ ##
+ ## svc Mapping name -> service object
+ ##
+ ## job Mapping jobid -> job handler coroutine
+ ##
+ ## runningp True when setup is finished
+ ##
+ ## _quitp True if explicit quit has been requested
+
+ def __init__(me, socket):
+ """
+ Initialize the service manager.
+
+ SOCKET is the administration socket to connect to.
+ """
+ SelCommandDispatcher.__init__(me, socket)
+ me.svc = {}
+ me.job = {}
+ me.runningp = False
+ me.handler['SVCCANCEL'] = me._cancel
+ me.handler['SVCJOB'] = me._job
+ me.handler['SVCCLAIM'] = me._claim
+ me._quitp = 0
+
+ def addsvc(me, svc):
+ """Register a new service; SVC is a TripeService instance."""
+ assert svc.name not in me.svc
+ me.svcclaim(svc.name, svc.version)
+ me.svc[svc.name] = svc
+
+ def _cancel(me, _, jid):
+ """
+ Called when the server cancels a job; invokes the job's `cancel' method.
+ """
+ job = me.job[jid]
+ del me.job[jid]
+ job.cancel()
+
+ def _claim(me, _, svc, __):
+ """Called when another program claims our service at a higher version."""
+ del me.svc[svc]
+
+ def _job(me, _, jid, svc, cmd, *args):
+ """
+ Called when the server sends us a job to do.
+
+ Calls the service to collect a job, and begins processing it.
+ """
+ assert jid not in me.job
+ svc = me.svc[svc.lower()]
+ job = svc.job(jid, cmd, args)
+ me.job[jid] = job
+ job.start()
+
+ def running(me):
+ """Answer true if setup is finished."""
+ me.runningp = True
+
+ def jobdone(me, jid):
+ """Informs the service manager that the job with id JID has finished."""
+ try:
+ del me.job[jid]
+ except KeyError:
+ pass
+
+ def quitp(me):
+ """
+ Return true if no services or jobs are active (and, therefore, if this
+ process can quit without anyone caring).
+ """
+ return me._quitp or (me.runningp and ((not me.svc and not me.job) or
+ not me.selfile))
+
+ def quit(me):
+ """Forces the quit flag (returned by quitp) on."""
+ me._quitp = True
+
+class TripeService (object):
+ """
+ A standard service.
+
+ The NAME and VERSION are passed on to the server. The CMDTAB is a
+ dictionary mapping command names (in lowercase) to command objects.
+
+ If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
+ are provided.
+
+ TripeService itself is mostly agnostic about the nature of command objects,
+ but the TripeServiceJob class (below) has some requirements. The built-in
+ HELP command requires command objects to have `usage' attributes.
+ """
+
+ def __init__(me, name, version, cmdtab):
+ """
+ Create and register a new service with the given NAME and VERSION.
+
+ CMDTAB maps command names (in lower-case) to command objects.
+ """
+ me.name = name
+ me.version = version
+ me.cmd = cmdtab
+ me.activep = True
+ me.cmd.setdefault('help',
+ TripeServiceCommand('help', 0, 0, '', me._help))
+ me.cmd.setdefault('quit',
+ TripeServiceCommand('quit', 0, 0, '', me._quit))
+
+ def job(me, jid, cmd, args):
+ """
+ Called by the service manager: a job arrived with id JID.
+
+ It asks for comamnd CMD with argument list ARGS. Creates a new job,
+ passing it the information needed.
+ """
+ return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
+
+ ## Simple default command handlers, complying with the spec in
+ ## tripe-service(7).
+
+ def _help(me):
+ """Send a help summary to the user."""
+ cmds = me.cmd.items()
+ cmds.sort()
+ for name, cmd in cmds:
+ svcinfo(name, *cmd.usage)
+
+ def _quit(me):
+ """Terminate the service manager."""
+ svcmgr.notify('svc-quit', me.name, 'admin-request')
+ svcmgr.quit()
+
+class TripeServiceCommand (object):
+ """A simple service command."""
+
+ def __init__(me, name, min, max, usage, func):
+ """
+ Creates a new command.
+
+ NAME is the command's name (in lowercase).
+
+ MIN and MAX are the minimum and maximum number of allowed arguments (used
+ for checking); either may be None to indicate no minimum or maximum.
+
+ USAGE is a usage string, used for generating help and error messages.
+
+ FUNC is the function to invoke.
+ """
+ me.name = name
+ me.min = min
+ me.max = max
+ me.usage = usage.split()
+ me.func = func
+
+ def run(me, *args):
+ """
+ Called when the command is invoked.
+
+ Does minimal checking of the arguments and calls the supplied function.
+ """
+ if (me.min is not None and len(args) < me.min) or \
+ (me.max is not None and len(args) > me.max):
+ raise TripeSyntaxError
+ me.func(*args)
+
+class TripeServiceJob (Coroutine):
+ """
+ Job handler coroutine.
+
+ A standard TripeService invokes a TripeServiceJob for each incoming job
+ request, passing it the jobid, command and arguments, and a command
+ object. The command object needs the following attributes.
+
+ usage A usage list (excluding the command name) showing
+ arguments and options.
+
+ run(*ARGS) Function to react to the command with ARGS split into
+ separate arguments. Invoked in a coroutine. The
+ svcinfo function (not the TripeCommandDispatcher
+ method) may be used to send INFO lines. The function
+ may raise TripeJobError to send a FAIL response back,
+ or TripeSyntaxError to send a generic usage error.
+ TripeJobCancelled exceptions are trapped silently.
+ Other exceptions are translated into a generic
+ internal-error message.
+
+ This class automatically takes care of sending some closing response to the
+ job, and for informing the service manager that the job is completed.
+
+ The `jid' attribute stores the job's id.
+ """
+
+ def __init__(me, jid, svc, cmd, command, args):
+ """
+ Start a new job.
+
+ The job is created with id JID, for service SVC, processing command name
+ CMD (which the service resolved into the command object COMMAND, or
+ None), and with the arguments ARGS.
+ """
+ Coroutine.__init__(me)
+ me.jid = jid
+ me.svc = svc
+ me.cmd = cmd
+ me.command = command
+ me.args = args
+
+ def run(me):
+ """
+ Main body of the coroutine.
+
+ Does the tedious exception handling boilerplate and invokes the command's
+ run method.
+ """
+ try:
+ try:
+ if me.command is None:
+ svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
+ else:
+ me.command.run(*me.args)
+ svcmgr.svcok(me.jid)
+ except TripeJobError, exc:
+ svcmgr.svcfail(me.jid, *exc.args)
+ except TripeSyntaxError:
+ svcmgr.svcfail(me.jid, 'bad-svc-syntax',
+ me.svc.name, me.command.name,
+ *me.command.usage)
+ except TripeJobCancelled:
+ pass
+ except Exception, exc:
+ svcmgr.svcfail(me.jid, 'svc-internal-error',
+ exc.__class__.__name__, str(exc))
+ finally:
+ svcmgr.jobdone(me.jid)
+
+ def start(me):
+ """Invoked by the service manager to start running the coroutine."""
+ me.switch()
+
+ def cancel(me):
+ """Invoked by the service manager to cancel the job."""
+ me.throw(TripeJobCancelled())
+
+def svcinfo(*args):
+ """
+ If invoked from a TripeServiceJob coroutine, sends an INFO line to the
+ job's sender, automatically using the correct job id.
+ """
+ svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
+
+def _setupsvc(tab, func):
+ """
+ Setup coroutine for setting up service programs.
+
+ Register the given services.
+ """
+ try:
+ for service in tab:
+ svcmgr.addsvc(service)
+ if func:
+ func()
+ finally:
+ svcmgr.running()
+
+svcmgr = TripeServiceManager(None)
+_spawnq = []
+def runservices(socket, tab, init = None, setup = None, daemon = False):
+ """
+ Function to start a service provider.
+
+ SOCKET is the socket to connect to, usually tripesock.
+
+ TAB is a list of entries. An entry may be either a tuple
+
+ (NAME, VERSION, COMMANDS)
+
+ or a service object (e.g., a TripeService instance).
+
+ COMMANDS is a dictionary mapping command names to tuples
+
+ (MIN, MAX, USAGE, FUNC)
+
+ of arguments for a TripeServiceCommand object.
+
+ If DAEMON is true, then the process is forked into the background before we
+ start. If INIT is given, it is called in the main coroutine, immediately
+ after forking. If SETUP is given, it is called in a coroutine, after
+ calling INIT and setting up the services but before marking the service
+ manager as running.
+
+ It is a really bad idea to do any initialization, particularly setting up
+ coroutines, outside of the INIT or SETUP functions. In particular, if
+ we're using rmcr for fake coroutines, the daemonizing fork will kill off
+ the currently established coroutines in a most surprising way.
+
+ The function runs a main select loop until the service manager decides to
+ quit.
+ """
+
+ global _spawnq
+ svcmgr.socket = socket
+ svcmgr.connect()
+ svcs = []
+ for service in tab:
+ if not isinstance(service, tuple):
+ svcs.append(service)
+ else:
+ name, version, commands = service
+ cmdmap = {}
+ for cmd, stuff in commands.iteritems():
+ cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
+ svcs.append(TripeService(name, version, cmdmap))
+ if daemon:
+ M.daemonize()
+ if init is not None:
+ init()
+ Coroutine(_setupsvc).switch(svcs, setup)
+ while not svcmgr.quitp():
+ for cr, args, kw in _spawnq:
+ cr.switch(*args, **kw)
+ _spawnq = []
+ M.select()
+
+def spawn(cr, *args, **kw):
+ """
+ Utility for spawning coroutines.
+
+ The coroutine CR is made to be a direct child of the root coroutine, and
+ invoked by it with the given arguments.
+ """
+ cr.parent = rootcr
+ _spawnq.append((cr, args, kw))
+
+###--------------------------------------------------------------------------
+### Utilities for services.
+
+_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
+def timespec(spec):
+ """Parse the timespec SPEC, returning a number of seconds."""
+ mul = 1
+ if len(spec) > 1 and spec[-1] in _timeunits:
+ mul = _timeunits[spec[-1]]
+ spec = spec[:-1]
+ try:
+ t = int(spec)
+ except:
+ raise TripeJobError('bad-time-spec', spec)
+ if t < 0:
+ raise TripeJobError('bad-time-spec', spec)
+ return mul * int(spec)
+
+class OptParse (object):
+ """
+ Parse options from a command list in the conventional fashion.
+
+ ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
+ options. The returned values are the option tags. During parsing, the
+ `arg' method may be used to retrieve the argument for the most recent
+ option. Afterwards, `rest' may be used to retrieve the remaining
+ non-option arguments, and do a simple check on how many there are.
+
+ The parser correctly handles `--' option terminators.
+ """
+
+ def __init__(me, args, allowed):
+ """
+ Create a new option parser.
+
+ The parser will scan the ARGS for options given in the sequence ALLOWED
+ (which are expected to include the `-' prefix).
+ """
+ me.allowed = {}
+ for a in allowed:
+ me.allowed[a] = True
+ me.args = list(args)
+
+ def __iter__(me):
+ """Iterator protocol: I am my own iterator."""
+ return me
+
+ def next(me):
+ """
+ Iterator protocol: return the next option.
+
+ If we've run out, raise StopIteration.
+ """
+ if len(me.args) == 0 or \
+ len(me.args[0]) < 2 or \
+ not me.args[0].startswith('-'):
+ raise StopIteration
+ opt = me.args.pop(0)
+ if opt == '--':
+ raise StopIteration
+ if opt not in me.allowed:
+ raise TripeSyntaxError
+ return opt
+
+ def arg(me):
+ """
+ Return the argument for the most recent option.
+
+ If none is available, raise TripeSyntaxError.
+ """
+ if len(me.args) == 0:
+ raise TripeSyntaxError
+ return me.args.pop(0)
+
+ def rest(me, min = None, max = None):
+ """
+ After option parsing is done, return the remaining arguments.
+
+ Check that there are at least MIN and at most MAX arguments remaining --
+ either may be None to suppress the check.
+ """
+ if (min is not None and len(me.args) < min) or \
+ (max is not None and len(me.args) > max):
+ raise TripeSyntaxError
+ return me.args
+
+###----- That's all, folks --------------------------------------------------