| 1 | ### -*-python-*- |
| 2 | ### |
| 3 | ### Administration connection with tripe server |
| 4 | ### |
| 5 | ### (c) 2006 Straylight/Edgeware |
| 6 | ### |
| 7 | |
| 8 | ###----- Licensing notice --------------------------------------------------- |
| 9 | ### |
| 10 | ### This file is part of Trivial IP Encryption (TrIPE). |
| 11 | ### |
| 12 | ### TrIPE is free software: you can redistribute it and/or modify it under |
| 13 | ### the terms of the GNU General Public License as published by the Free |
| 14 | ### Software Foundation; either version 3 of the License, or (at your |
| 15 | ### option) any later version. |
| 16 | ### |
| 17 | ### TrIPE is distributed in the hope that it will be useful, but WITHOUT |
| 18 | ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 19 | ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| 20 | ### for more details. |
| 21 | ### |
| 22 | ### You should have received a copy of the GNU General Public License |
| 23 | ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>. |
| 24 | |
| 25 | """ |
| 26 | This module provides classes and functions for connecting to a running tripe |
| 27 | server, sending it commands, receiving and processing replies, and |
| 28 | implementing services. |
| 29 | |
| 30 | Rather than end up lost in a storm of little event-driven classes, or a |
| 31 | morass of concurrent threads, the module uses coroutines to present a fairly |
| 32 | simple function call/return interface to potentially long-running commands |
| 33 | which must run without blocking the main process. It assumes a coroutine |
| 34 | module presenting a subset of the `greenlet' interface: if actual greenlets |
| 35 | are available, they are used; otherwise there's an implementation in terms of |
| 36 | threads (with lots of locking) which will do instead. |
| 37 | |
| 38 | The simple rule governing the coroutines used here is this: |
| 39 | |
| 40 | * The root coroutine never cares what values are passed to it when it |
| 41 | resumes: it just discards them. |
| 42 | |
| 43 | * Other, non-root, coroutines are presumed to be waiting for some specific |
| 44 | thing. |
| 45 | |
| 46 | Configuration variables: |
| 47 | configdir |
| 48 | socketdir |
| 49 | PACKAGE |
| 50 | VERSION |
| 51 | tripesock |
| 52 | peerdb |
| 53 | |
| 54 | Other useful variables: |
| 55 | rootcr |
| 56 | svcmgr |
| 57 | |
| 58 | Other tweakables: |
| 59 | _debug |
| 60 | |
| 61 | Exceptions: |
| 62 | Exception |
| 63 | StandardError |
| 64 | TripeConnectionError |
| 65 | TripeError |
| 66 | TripeInternalError |
| 67 | TripeJobCancelled |
| 68 | TripeJobError |
| 69 | TripeSyntaxError |
| 70 | |
| 71 | Classes: |
| 72 | _Coroutine |
| 73 | Coroutine |
| 74 | TripeServiceJob |
| 75 | OptParse |
| 76 | Queue |
| 77 | SelIOWatcher |
| 78 | TripeCommand |
| 79 | TripeSynchronousCommand |
| 80 | TripeAsynchronousCommand |
| 81 | TripeCommandIterator |
| 82 | TripeConnection |
| 83 | TripeCommandDispatcher |
| 84 | TripeServiceManager |
| 85 | TripeService |
| 86 | TripeServiceCommand |
| 87 | |
| 88 | Utility functions: |
| 89 | quotify |
| 90 | runservices |
| 91 | spawn |
| 92 | svcinfo |
| 93 | timespec |
| 94 | """ |
| 95 | |
## Options for the `pychecker' lint tool, which looks for a module variable
## of this name.  (Presumably `self=me' is there because this module calls
## the instance argument `me' rather than `self' -- confirm against the
## pychecker documentation.)
__pychecker__ = 'self=me no-constCond no-argsused'

## Debugging tweakable: while this is true, coroutine switches and the lines
## exchanged with the server are printed to standard output.
_debug = False
| 99 | |
| 100 | ###-------------------------------------------------------------------------- |
| 101 | ### External dependencies. |
| 102 | |
| 103 | import socket as S |
| 104 | import errno as E |
| 105 | import mLib as M |
| 106 | import re as RX |
| 107 | import sys as SYS |
| 108 | import os as OS |
| 109 | |
| 110 | try: |
| 111 | if OS.getenv('TRIPE_FORCE_RMCR') is not None: |
| 112 | raise ImportError() |
| 113 | from py.magic import greenlet as _Coroutine |
| 114 | except ImportError: |
| 115 | from rmcr import Coroutine as _Coroutine |
| 116 | |
| 117 | ###-------------------------------------------------------------------------- |
| 118 | ### Coroutine hacking. |
| 119 | |
## The coroutine which was running when this module was loaded.  All of the
## other coroutines in this module are resumed from here.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
    """
    A coroutine which may only be resumed by the root coroutine.

    The root was made before this class existed, so it can't be an instance
    of it.
    """

    def switch(me, *args, **kw):
        """
        Transfer control to this coroutine, passing it ARGS and KW.

        This must be called while the root coroutine is running.  Whatever
        is handed back when control returns to the root is thrown away, so
        the result is always `None'.  If `_debug' is set, the transfers of
        control in both directions are printed.
        """
        assert _Coroutine.getcurrent() is rootcr
        if _debug:
            print('* %s' % (me,))
        _Coroutine.switch(me, *args, **kw)
        if _debug:
            print('* %s' % (rootcr,))
| 133 | |
| 134 | ###-------------------------------------------------------------------------- |
| 135 | ### Default places for things. |
| 136 | |
## Package details and default directories.  The `@...@' markers are
## placeholders, presumably substituted when the package is configured.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"
socketdir = "@socketdir@"
configdir = OS.getenv('TRIPEDIR', "@configdir@")

## Where to find the administration socket and the peer database.  Either
## may be overridden from the environment.
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
| 144 | |
| 145 | ###-------------------------------------------------------------------------- |
| 146 | ### Connection to the server. |
| 147 | |
def readnonblockingly(sock, len):
    """
    Nonblocking read from SOCK.

    Put SOCK into nonblocking mode and try to read up to LEN bytes from it.
    Return the data read; end-of-file is returned as an empty string.  If
    nothing could be read just now (the read failed with `EWOULDBLOCK' or
    `EAGAIN'), return `None'.  Any other socket error is propagated to the
    caller.

    The socket is left in nonblocking mode.
    """
    try:
        sock.setblocking(0)
        return sock.recv(len)
    except S.error as exc:
        ## Fetch the error code from `args' rather than indexing the
        ## exception object, which doesn't work everywhere.  POSIX allows
        ## `EAGAIN' and `EWOULDBLOCK' to be distinct, and either means that
        ## there's simply nothing to read yet.
        if exc.args and exc.args[0] in (E.EWOULDBLOCK, E.EAGAIN):
            return None
        raise
| 162 | |
class TripeConnectionError (StandardError):
    """
    Something happened to the connection with the server.

    For example, the connection was lost, or a command was submitted while
    there was no connection.
    """
class TripeInternalError (StandardError):
    """
    This program is very confused.

    Raised when a response from the server isn't of the kind that the
    caller was expecting.
    """
| 169 | |
class TripeConnection (object):
    """
    A logical connection to the tripe administration socket.

    There may or may not be a physical connection.  (This is needed for the
    monitor, for example.)

    This class isn't very useful on its own, but it has useful subclasses.
    At this level, the class is agnostic about I/O multiplexing schemes;
    that gets added later.

    Attributes:

      socket    name of the administration socket, as given to `__init__'
      sock      the connected socket object, or `None' while disconnected
      lbuf      mLib line buffer feeding `line' and `_eof', or `None'
      iowatch   I/O watcher, told when the connection comes and goes
    """

    def __init__(me, socket):
        """
        Make a connection to the named SOCKET.

        No physical connection is made initially.
        """
        me.socket = socket
        me.sock = None
        me.lbuf = None
        me.iowatch = SelIOWatcher(me)

    def connect(me):
        """
        Ensure that there's a physical connection.

        Do nothing if we're already connected.  Otherwise connect to the
        socket, set up a fresh line buffer, and invoke the `connected'
        method.  If the connection attempt fails, the half-made socket is
        closed again and the error is propagated.

        Returns the connection object itself in either case.
        """
        if me.sock is not None:
            return me
        sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
        try:
            sock.connect(me.socket)
        except:
            ## Don't leave the descriptor lying about until the collector
            ## gets round to it.
            sock.close()
            raise
        me.sock = sock
        me.lbuf = M.LineBuffer(me.line, me._eof)
        me.lbuf.size = 1024
        me.connected()
        return me

    def disconnect(me, reason):
        """
        Disconnect the physical connection.

        Do nothing if there's no connection.  Otherwise invoke the
        `disconnected' method, giving the provided REASON, which should be
        either `None' or an exception; then close the socket and discard
        the line buffer.

        Returns the connection object itself in either case.
        """
        if me.sock is None:
            return me
        ## The hook runs first, while `sock' is still set.
        me.disconnected(reason)
        me.sock.close()
        me.sock = None
        me.lbuf.disable()
        me.lbuf = None
        return me

    def connectedp(me):
        """
        Return true if there's a current, believed-good physical connection.
        """
        return me.sock is not None

    ## A connection object is true exactly when it's connected.  The two
    ## names are the Python 2 and Python 3 spellings of the truth hook.
    __nonzero__ = connectedp
    __bool__ = connectedp

    def send(me, line):
        """
        Send the LINE to the connection's socket.

        A newline is added to the end.  The socket is put into blocking
        mode and the whole line is written before returning.  If anything
        goes wrong, the connection is dropped (with the exception as the
        reason) and the exception is propagated.

        All output is done through this method; it can be overridden to
        provide proper nonblocking writing, though this seems generally
        unnecessary.

        Returns the connection object itself.
        """
        try:
            me.sock.setblocking(1)
            ## Not `send', which is allowed to write only some of the data.
            me.sock.sendall(line + '\n')
        except Exception as exc:
            me.disconnect(exc)
            raise
        return me

    def receive(me):
        """
        Receive whatever's ready from the connection's socket.

        Call `line' on each complete line, and `eof' if the connection
        closed.  If reading fails, the connection is dropped (with the
        exception as the reason) and the exception is propagated.

        Subclasses which attach this class to an I/O-event system should
        call this method when the socket (the `sock' attribute) is ready
        for reading.

        Returns the connection object itself.
        """
        ## Handling a line may disconnect us, so check each time round.
        while me.sock is not None:
            try:
                buf = readnonblockingly(me.sock, 16384)
            except Exception as exc:
                me.disconnect(exc)
                raise
            if buf is None:
                ## Nothing more to read for now.
                return me
            if buf == '':
                me._eof()
                return me
            me.lbuf.flush(buf)
        return me

    def _eof(me):
        """Internal end-of-file handler: disconnect, and then call `eof'."""
        me.disconnect(TripeConnectionError('connection lost'))
        me.eof()

    def connected(me):
        """
        To be overridden by subclasses to react to a connection being
        established.

        This implementation tells the I/O watcher about the new socket.
        """
        me.iowatch.connected(me.sock)

    def disconnected(me, reason):
        """
        To be overridden by subclasses to react to a connection being
        severed.

        This implementation tells the I/O watcher that the connection has
        gone; the REASON is ignored.
        """
        me.iowatch.disconnected()

    def eof(me):
        """To be overridden by subclasses to handle end-of-file."""
        pass

    def line(me, line):
        """To be overridden by subclasses to handle incoming lines."""
        pass
| 294 | |
| 295 | ###-------------------------------------------------------------------------- |
| 296 | ### I/O loop integration. |
| 297 | |
class SelIOWatcher (object):
    """
    Glue between a connection and mLib's I/O event system.

    To integrate with some other event loop (glib's main loop, say),
    replace a connection's watcher -- its `iowatch' attribute -- with a
    different object providing the same methods, while the connection is
    disconnected.
    """

    def __init__(me, conn):
        """Prepare to watch the connection CONN.  Nothing is watched yet."""
        me._selfile = None
        me._conn = conn

    def connected(me, sock):
        """
        Called when a connection is made.

        SOCK is the newly connected socket.  The watcher's job is to see
        that the connection's `receive' method is called when there's
        something to read; this one sets up an mLib selector on the
        socket's file descriptor to do that.
        """
        selfile = me._selfile = M.SelFile(sock.fileno(), M.SEL_READ,
                                          me._conn.receive)
        selfile.enable()

    def disconnected(me):
        """
        Called when the connection is lost.

        Lets go of the selector.
        """
        me._selfile = None

    def iterate(me):
        """
        Wait for something interesting to happen, and issue events.

        That is, do one iteration of a main select loop, processing all of
        the events, and then return.  `TripeCommandDispatcher.mainloop'
        calls this, but that's mostly for the benefit of `runservices'; a
        watcher for a different main loop can be driven directly instead.
        """
        M.select()
| 338 | |
| 339 | ###-------------------------------------------------------------------------- |
| 340 | ### Inter-coroutine communication. |
| 341 | |
class Queue (object):
    """
    A queue of things arriving asynchronously.

    Any number of writers may `put' things on the queue, but only one
    coroutine at a time may be waiting to read from it.  This is handy for
    more complex coroutines which have to cope with a variety of possible
    incoming events.

    Attributes:

      contents  the items waiting to be read, oldest first
      waiter    the coroutine currently waiting for an item, or `None'
    """

    def __init__(me):
        """Create a new empty queue, with nobody waiting on it."""
        me.waiter = None
        me.contents = M.Array()

    def _wait(me):
        """
        Internal: wait for an item to arrive in the queue.

        Return at once if there's something there already.  Otherwise keep
        handing control to the calling coroutine's parent until there is.
        Raise `ValueError' if some other coroutine is already waiting,
        because this is only a single-reader queue.
        """
        if me.waiter:
            raise ValueError('queue already being waited on')
        try:
            reader = me.waiter = Coroutine.getcurrent()
            while True:
                if me.contents:
                    break
                reader.parent.switch()
        finally:
            ## However we leave, nobody is waiting any more.
            me.waiter = None

    def get(me):
        """
        Remove and return the item at the head of the queue.

        If the queue is empty, wait until an item arrives.
        """
        me._wait()
        head = me.contents.shift()
        return head

    def peek(me):
        """
        Return the item at the head of the queue, leaving it in place.

        If the queue is empty, wait until an item arrives.
        """
        me._wait()
        head = me.contents[0]
        return head

    def put(me, thing):
        """
        Write THING to the queue.

        If a coroutine is waiting on the queue, resume it immediately;
        otherwise just leave the item there for later.
        """
        me.contents.push(thing)
        reader = me.waiter
        if reader:
            reader.switch()
| 400 | |
| 401 | ###-------------------------------------------------------------------------- |
| 402 | ### Dispatching coroutine. |
| 403 | |
## A string matches this if it can stand on its own as a bareword: i.e., it
## contains no backslashes, quotes or whitespace.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## Matches the characters which must be escaped, even inside quoted text.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
    """
    Quote S according to the tripe-admin(5) rules.

    A bareword is returned unchanged.  Anything else -- the empty string
    included -- is wrapped in single quotes, with a backslash put in front
    of each backslash or single quote inside.
    """
    m = rx_ordinary.match(s)
    ## `$' is happy to match just before a trailing newline, so check that
    ## the match really did swallow the whole string.
    if m is not None and m.end() == len(s):
        return s
    escaped = rx_weird.sub(r'\\\1', s)
    return "'" + escaped + "'"
| 418 | |
def _callback(func):
    """
    Return a wrapper for FUNC which reports exceptions thrown by it.

    The wrapper passes its arguments on to FUNC and returns FUNC's result.
    If FUNC raises anything, the exception is first shown to
    `sys.excepthook' and then raised again.  This is useful for callbacks
    invoked by C functions which ignore exceptions.
    """
    def _reporter(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except:
            ## Report it ourselves, in case our caller won't.
            SYS.excepthook(*SYS.exc_info())
            raise
        return result
    return _reporter
| 433 | |
class TripeCommand (object):
    """
    Abstract base class for a command in progress.

    The tokens which make up the command are kept in the `words'
    attribute.

    A subclass must provide a method to deal with the server's responses:

      * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
        `FAIL'; ARGS are the remaining tokens from the server's response.
    """

    def __init__(me, words):
        """Make a new command consisting of the given list of WORDS."""
        me.words = words
| 450 | |
class TripeSynchronousCommand (TripeCommand):
    """
    A simple command, processed apparently synchronously.

    The command must be created in a coroutine other than the root (or
    whichever one is running the dispatcher): that coroutine becomes the
    command's owner.  In reality, other coroutines carry on running while
    the owner waits for the server to respond.

    Each server response causes the owner to be resumed with the pair
    (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
    or `FAIL') and REST is a tuple of the server's other response tokens.
    The owner must keep switching back to the dispatcher until a
    terminating response (`OK' or `FAIL') arrives, or things will become
    very confused.

    Mostly it's better to let a `TripeCommandIterator' do this
    automatically.
    """

    def __init__(me, words):
        """
        Initialize the command, specifying the WORDS to send to the server.

        The calling coroutine is recorded as the command's owner.
        """
        me.owner = Coroutine.getcurrent()
        TripeCommand.__init__(me, words)

    def response(me, code, *rest):
        """Handle a server response by handing it on to the owner."""
        reply = (code, rest)
        me.owner.switch(reply)
| 478 | |
class TripeError (StandardError):
    """
    A tripe command failed with an error (a `FAIL' code).

    The server's message tokens are to be found in the `args' attribute.
    """
| 485 | |
class TripeCommandIterator (object):
    """
    Iterator interface to a tripe command.

    The values returned by the iterator are the token sequences from the
    server's `INFO' lines, as processed by the given filter function, if
    any.  The iterator completes normally (by raising `StopIteration') if
    the server reported `OK', and raises an exception if the command
    failed for some reason.

    A `TripeError' is raised if the server issues a `FAIL' code.  If the
    connection failed, some other exception is raised.

    Attributes:

      dcr       the coroutine to switch to while waiting for responses
      filter    function applied to each `INFO' line's tokens
    """

    def __init__(me, dispatcher, words, bg = False, filter = None):
        """
        Create a new command iterator.

        The command is submitted to the DISPATCHER; it consists of the
        given WORDS.  If BG is true, then `-background' and a fresh job tag
        are inserted after the first word, to request that the server run
        the command in the background.  The FILTER is applied to the token
        sequences with which the server responds, and the filter's outputs
        are the items returned by the iterator; by default, they're
        returned as they are.

        Raises `ValueError' if the calling coroutine has no parent to
        switch back to while waiting.
        """
        me.dcr = Coroutine.getcurrent().parent
        if me.dcr is None:
            raise ValueError('must invoke from coroutine')
        me.filter = filter or (lambda x: x)
        if bg:
            words = ([words[0], '-background', dispatcher.bgtag()] +
                     list(words[1:]))
        dispatcher.rawcommand(TripeSynchronousCommand(words))

    def __iter__(me):
        """Iterator protocol: I am my own iterator."""
        return me

    def next(me):
        """
        Iterator protocol: return the next piece of information from the
        server.

        `INFO' responses are filtered and returned as the values of the
        iteration.  `FAIL' and `CONNERR' responses are turned into
        exceptions and raised: `FAIL' becomes a `TripeError' carrying the
        server's tokens; `CONNERR' raises the exception which caused the
        disconnection, or a `TripeConnectionError' if there wasn't one.
        Finally, `OK' is turned into `StopIteration', which should cause a
        normal end to the iteration process.  Anything else is reported as
        a `TripeInternalError'.
        """
        thing = me.dcr.switch()
        code, rest = thing
        if code == 'INFO':
            return me.filter(rest)
        elif code == 'OK':
            raise StopIteration()
        elif code == 'CONNERR':
            ## `TripeSynchronousCommand.response' hands over everything
            ## after the code as a tuple, so the reason for the
            ## disconnection arrives wrapped up.  Unwrap it before looking
            ## at it; but cope if we're given it bare.
            if isinstance(rest, (tuple, list)):
                if rest:
                    reason = rest[0]
                else:
                    reason = None
            else:
                reason = rest
            if reason is None:
                raise TripeConnectionError('connection terminated by user')
            raise reason
        elif code == 'FAIL':
            raise TripeError(*rest)
        else:
            ## REST is a tuple, so convert it before joining it to the
            ## code; and wrap the lot so that `%' sees just one argument.
            raise TripeInternalError('unexpected tripe response %r' %
                                     ([code] + list(rest),))

    ## The Python 3 spelling of the iterator protocol.
    __next__ = next
| 546 | |
| 547 | ### Simple utility functions for the TripeCommandIterator convenience |
| 548 | ### methods. |
| 549 | |
def _tokenjoin(words):
    """Filter function: glue the tokens WORDS together with single spaces."""
    gap = ' '
    return gap.join(words)
| 553 | |
def _keyvals(iter):
    """
    Return a dictionary formed from the `KEY=VALUE' pairs returned by the
    iterator ITER.

    Each item from ITER is a sequence of tokens, and each token is split at
    its first `='.  If a key turns up more than once, the last value wins.
    A token with no `=' in it causes a `ValueError'.
    """
    def _pairs():
        for tokens in iter:
            for tok in tokens:
                eq = tok.index('=')
                yield tok[:eq], tok[eq + 1:]
    return dict(_pairs())
| 563 | |
def _simple(iter):
    """
    Run ITER to completion, expecting it to produce nothing.

    ITER is always drained completely.  If it produced any items at all,
    raise `TripeInternalError'; otherwise return `None'.
    """
    if list(iter):
        raise TripeInternalError('expected no response')
    return None
| 570 | |
def _oneline(iter):
    """
    Run ITER to completion, and return the one item it produced.

    Raise `TripeInternalError' if it produced no items, or more than one.
    """
    lines = list(iter)
    if len(lines) == 1:
        return lines[0]
    raise TripeInternalError('expected only one line of response')
| 577 | |
def _tracelike(iter):
    """
    Handle the output of a TRACE-like command.

    Each item from ITER is a sequence of tokens.  The result is a list of
    tuples (CHAR, STATUS, DESC), one for each item: CHAR is a selector
    character (the first character of the first token); STATUS is the
    status (the rest of the first token: empty if disabled, `+' if enabled,
    maybe something else later); and DESC is the human-readable description
    (the other tokens, joined with spaces).
    """
    return [(ww[0][0], ww[0][1:], ' '.join(ww[1:])) for ww in iter]
| 590 | |
def _kwopts(kw, allowed):
    """
    Parse keyword arguments into options.

    KW is a dictionary of keyword arguments, and ALLOWED is a collection of
    the allowable keywords; raise `ValueError' if KW contains any other
    keyword.  `KEY = VALUE' becomes an option pair `-KEY VALUE' if VALUE
    is a string, just the option `-KEY' if VALUE is a true non-string, or
    nothing if VALUE is false.

    Returns the list of option words, with a `--' at the end to stop the
    parser getting confused.
    """
    ## Collect the permitted keywords once, for quick membership tests.
    allowed = frozenset(allowed)
    opts = []
    for k, v in kw.items():
        if k not in allowed:
            raise ValueError('option %s not allowed here' % k)
        if isinstance(v, str):
            opts.extend(['-' + k, v])
        elif v:
            opts.append('-' + k)
    opts.append('--')
    return opts
| 609 | |
## Deferral.  The calls waiting to be made are kept here as (FUNC, ARGS, KW)
## triples, in the order in which they were requested.
_deferq = []

def defer(func, *args, **kw):
    """
    Call FUNC(*ARGS, **KW) later, in the root coroutine.

    The call is only noted down here.  The pending calls are actually made
    by `TripeCommandDispatcher.mainloop'.
    """
    job = (func, args, kw)
    _deferq.append(job)
| 615 | |
def funargstr(func, args, kw):
    """
    Return a string showing the call FUNC(*ARGS, **KW), for people to read.

    ARGS is a sequence of positional arguments, and KW is a dictionary of
    keyword arguments; each argument is shown using its `repr'.  The
    function is shown by its name if it has one, or by its `repr' if not
    (which is the way of some callable objects, e.g., the ones made by
    `functools.partial').
    """
    items = [repr(a) for a in args]
    items.extend(['%s = %r' % (k, v) for k, v in kw.items()])
    name = getattr(func, '__name__', None)
    if name is None:
        name = repr(func)
    return '%s(%s)' % (name, ', '.join(items))
| 621 | |
def spawn(func, *args, **kw):
    """
    Call FUNC, passing ARGS and KW, in a fresh coroutine.

    The coroutine isn't started straight away: that's deferred, so that it
    can be done from the root coroutine.  The new coroutine is named after
    the call it's making.
    """
    def _start():
        label = funargstr(func, args, kw)
        cr = Coroutine(func, name = label)
        return cr.switch(*args, **kw)
    defer(_start)
| 626 | |
## Asides.  This queue carries (FUNC, ARGS, KW) triples describing calls to
## be made in a non-root coroutine.
_asideq = Queue()

def _runasides():
    """
    Serve the aside queue, forever.

    Repeatedly take a (FUNC, ARGS, KW) triple from the queue and invoke
    FUNC(*ARGS, **KW).  If the call raises anything, it's shown to
    `sys.excepthook' and then forgotten, so that one broken aside doesn't
    stop the rest from being run.
    """
    while True:
        job = _asideq.get()
        func, args, kw = job
        try:
            func(*args, **kw)
        except:
            SYS.excepthook(*SYS.exc_info())
| 639 | |
def aside(func, *args, **kw):
    """
    Call FUNC(*ARGS, **KW) later, in a non-root coroutine.

    The call is put on the aside queue; but that's done by way of `defer',
    so that the queue is written from the root coroutine.
    """
    job = (func, args, kw)
    defer(_asideq.put, job)
| 643 | |
| 644 | class TripeCommandDispatcher (TripeConnection): |
| 645 | """ |
| 646 | Command dispatcher. |
| 647 | |
| 648 | The command dispatcher is a connection which knows how to handle commands. |
| 649 | This is probably the most important class in this module to understand. |
| 650 | |
| 651 | Lines from the server are parsed into tokens. The first token is a code |
| 652 | (`OK' or `NOTE' or something) explaining what kind of line this is. The |
| 653 | `handler' attribute is a dictionary mapping server line codes to handler |
| 654 | functions, which are applied to the words of the line as individual |
| 655 | arguments. *Exception*: the content of `TRACE' lines is not tokenized. |
| 656 | |
| 657 | There are default handlers for server codes which respond to commands. |
| 658 | Commands arrive as `TripeCommand' instances through the `rawcommand' |
| 659 | interface. The dispatcher keeps track of which command objects represent |
| 660 | which jobs, and sends responses on to the appropriate command objects by |
| 661 | invoking their `response' methods. Command objects don't see the `BG...' |
| 662 | codes, because the dispatcher has already transformed them into regular |
| 663 | codes when it was looking up the job tag. |
| 664 | |
| 665 | The dispatcher also has a special response code of its own: `CONNERR' |
| 666 | indicates that the connection failed and the command has therefore been |
| 667 | lost. This is sent to all outstanding commands when a connection error is |
| 668 | encountered: rather than a token list, it is accompanied by an exception |
| 669 | object which is the cause of the disconnection, which may be `None' if the |
| 670 | disconnection is expected (e.g., the direct result of a user request). |
| 671 | """ |
| 672 | |
| 673 | ## --- Infrastructure --- |
| 674 | ## |
| 675 | ## We will get confused if we pipeline commands. Send them one at a time. |
| 676 | ## Only send a command when the previous one detaches or completes. |
| 677 | ## |
| 678 | ## The following attributes are interesting: |
| 679 | ## |
| 680 | ## tagseq Sequence number for next background job (for bgtag) |
| 681 | ## |
| 682 | ## queue Commands awaiting submission. |
| 683 | ## |
| 684 | ## cmd Mapping from job tags to commands: cmd[None] is the |
| 685 | ## foreground command. |
| 686 | ## |
| 687 | ## handler Mapping from server codes to handler functions. |
| 688 | |
| 689 | def __init__(me, socket): |
| 690 | """ |
| 691 | Initialize the dispatcher. |
| 692 | |
| 693 | The SOCKET is the filename of the administration socket to connect to, |
| 694 | for TripeConnection.__init__. |
| 695 | """ |
| 696 | TripeConnection.__init__(me, socket) |
| 697 | me.tagseq = 0 |
| 698 | me.handler = {} |
| 699 | me.handler['BGDETACH'] = me._detach |
| 700 | for i in 'BGOK', 'BGINFO', 'BGFAIL': |
| 701 | me.handler[i] = me._response |
| 702 | for i in 'OK', 'INFO', 'FAIL': |
| 703 | me.handler[i] = me._fgresponse |
| 704 | |
| 705 | def quitp(me): |
| 706 | """Should we quit the main loop? Subclasses should override.""" |
| 707 | return False |
| 708 | |
| 709 | def mainloop(me, quitp = None): |
| 710 | """ |
| 711 | Iterate the I/O watcher until QUITP returns true. |
| 712 | |
| 713 | Arranges for asides and deferred calls to be made at the right times. |
| 714 | """ |
| 715 | |
| 716 | global _deferq |
| 717 | assert _Coroutine.getcurrent() is rootcr |
| 718 | Coroutine(_runasides, name = '_runasides').switch() |
| 719 | if quitp is None: |
| 720 | quitp = me.quitp |
| 721 | while not quitp(): |
| 722 | while _deferq: |
| 723 | q = _deferq |
| 724 | _deferq = [] |
| 725 | for func, args, kw in q: |
| 726 | func(*args, **kw) |
| 727 | me.iowatch.iterate() |
| 728 | |
| 729 | def connected(me): |
| 730 | """ |
| 731 | Connection hook. |
| 732 | |
| 733 | If a subclass overrides this method, it must call us; clears out the |
| 734 | command queue and job map. |
| 735 | """ |
| 736 | me.queue = M.Array() |
| 737 | me.cmd = {} |
| 738 | TripeConnection.connected(me) |
| 739 | |
| 740 | def disconnected(me, reason): |
| 741 | """ |
| 742 | Disconnection hook. |
| 743 | |
| 744 | If a subclass hooks overrides this method, it must call us; sends a |
| 745 | special `CONNERR' code to all incomplete commands. |
| 746 | """ |
| 747 | TripeConnection.disconnected(me, reason) |
| 748 | for cmd in me.cmd.itervalues(): |
| 749 | cmd.response('CONNERR', reason) |
| 750 | for cmd in me.queue: |
| 751 | cmd.response('CONNERR', reason) |
| 752 | |
| 753 | @_callback |
| 754 | def line(me, line): |
| 755 | """Handle an incoming line, sending it to the right place.""" |
| 756 | if _debug: print '<', line |
| 757 | code, rest = M.word(line, quotep = True) |
| 758 | func = me.handler.get(code) |
| 759 | if func is not None: |
| 760 | if code == 'TRACE': |
| 761 | func(code, rest) |
| 762 | else: |
| 763 | func(code, *M.split(rest, quotep = True)[0]) |
| 764 | me.dequeue() |
| 765 | |
| 766 | def dequeue(me): |
| 767 | """ |
| 768 | Pull the oldest command off the queue and try to send it to the server. |
| 769 | """ |
| 770 | if not me.queue or None in me.cmd: return |
| 771 | cmd = me.queue.shift() |
| 772 | if _debug: print '>', ' '.join([quotify(w) for w in cmd.words]) |
| 773 | me.send(' '.join([quotify(w) for w in cmd.words])) |
| 774 | me.cmd[None] = cmd |
| 775 | |
| 776 | def bgtag(me): |
| 777 | """ |
| 778 | Return an unused job tag. |
| 779 | |
| 780 | May be of use when composing commands by hand. |
| 781 | """ |
| 782 | tag = 'J%05d' % me.tagseq |
| 783 | me.tagseq += 1 |
| 784 | return tag |
| 785 | |
| 786 | ## --- Built-in handler functions for server responses --- |
| 787 | |
| 788 | def _detach(me, _, tag): |
| 789 | """ |
| 790 | Respond to a `BGDETACH' TAG message. |
| 791 | |
| 792 | Move the current foreground command to the background. |
| 793 | """ |
| 794 | assert tag not in me.cmd |
| 795 | me.cmd[tag] = me.cmd[None] |
| 796 | del me.cmd[None] |
| 797 | |
| 798 | def _response(me, code, tag, *w): |
| 799 | """ |
| 800 | Respond to an `OK', `INFO' or `FAIL' message. |
| 801 | |
| 802 | If this is a message for a background job, find the tag; then dispatch |
| 803 | the result to the command object. This is also called by `_fgresponse' |
| 804 | (wth TAG set to `None') to handle responses for foreground commands, and |
| 805 | is therefore a useful method to extend or override in subclasses. |
| 806 | """ |
| 807 | if code.startswith('BG'): |
| 808 | code = code[2:] |
| 809 | cmd = me.cmd[tag] |
| 810 | if code != 'INFO': |
| 811 | del me.cmd[tag] |
| 812 | cmd.response(code, *w) |
| 813 | |
| 814 | def _fgresponse(me, code, *w): |
| 815 | """Process responses to the foreground command.""" |
| 816 | me._response(code, None, *w) |
| 817 | |
| 818 | ## --- Interface methods --- |
| 819 | |
| 820 | def rawcommand(me, cmd): |
| 821 | """ |
| 822 | Submit the `TripeCommand' CMD to the server, and look after it until it |
| 823 | completes. |
| 824 | """ |
| 825 | if not me.connectedp(): |
| 826 | raise TripeConnectionError('connection closed') |
| 827 | me.queue.push(cmd) |
| 828 | me.dequeue() |
| 829 | |
| 830 | def command(me, *cmd, **kw): |
| 831 | """Convenience wrapper for creating a TripeCommandIterator object.""" |
| 832 | return TripeCommandIterator(me, cmd, **kw) |
| 833 | |
| 834 | ## --- Convenience methods for server commands --- |
| 835 | |
| 836 | def add(me, peer, *addr, **kw): |
| 837 | return _simple(me.command(bg = True, |
| 838 | *['ADD'] + |
| 839 | _kwopts(kw, ['tunnel', 'keepalive', |
| 840 | 'key', 'priv', 'cork', |
| 841 | 'mobile']) + |
| 842 | [peer] + |
| 843 | list(addr))) |
| 844 | def addr(me, peer): |
| 845 | return _oneline(me.command('ADDR', peer)) |
| 846 | def algs(me, peer = None): |
| 847 | return _keyvals(me.command('ALGS', |
| 848 | *((peer is not None and [peer]) or []))) |
| 849 | def checkchal(me, chal): |
| 850 | return _simple(me.command('CHECKCHAL', chal)) |
| 851 | def daemon(me): |
| 852 | return _simple(me.command('DAEMON')) |
| 853 | def eping(me, peer, **kw): |
| 854 | return _oneline(me.command(bg = True, |
| 855 | *['EPING'] + |
| 856 | _kwopts(kw, ['timeout']) + |
| 857 | [peer])) |
| 858 | def forcekx(me, peer): |
| 859 | return _simple(me.command('FORCEKX', peer)) |
| 860 | def getchal(me): |
| 861 | return _oneline(me.command('GETCHAL', filter = _tokenjoin)) |
| 862 | def greet(me, peer, chal): |
| 863 | return _simple(me.command('GREET', peer, chal)) |
| 864 | def help(me): |
| 865 | return list(me.command('HELP', filter = _tokenjoin)) |
| 866 | def ifname(me, peer): |
| 867 | return _oneline(me.command('IFNAME', peer, filter = _tokenjoin)) |
| 868 | def kill(me, peer): |
| 869 | return _simple(me.command('KILL', peer)) |
| 870 | def list(me): |
| 871 | return list(me.command('LIST', filter = _tokenjoin)) |
| 872 | def notify(me, *msg): |
| 873 | return _simple(me.command('NOTIFY', *msg)) |
| 874 | def peerinfo(me, peer): |
| 875 | return _keyvals(me.command('PEERINFO', peer)) |
def ping(me, peer, **kw):
  """
  Send `PING' for PEER with `bg' set; return `_oneline''s digest.

  KW goes to `_kwopts' along with the option name `timeout'; its result is
  placed between the command name and PEER.
  """
  words = ['PING'] + _kwopts(kw, ['timeout'])
  words.append(peer)
  return _oneline(me.command(bg = True, *words))
def port(me):
  """Send `PORT' with the `_tokenjoin' filter; return `_oneline''s digest."""
  reply = me.command('PORT', filter = _tokenjoin)
  return _oneline(reply)
def quit(me):
  """Send `QUIT'; return what `_simple' makes of the reply."""
  reply = me.command('QUIT')
  return _simple(reply)
def reload(me):
  """Send `RELOAD'; return what `_simple' makes of the reply."""
  reply = me.command('RELOAD')
  return _simple(reply)
def servinfo(me):
  """Send `SERVINFO'; return what `_keyvals' makes of the reply."""
  reply = me.command('SERVINFO')
  return _keyvals(reply)
def setifname(me, new):
  """Send `SETIFNAME' with NEW; return what `_simple' makes of the reply."""
  ## NOTE(review): only one word follows the command name here.  The
  ## server's `SETIFNAME' command is believed to want a peer name as well as
  ## the new interface name -- confirm against the server's command syntax.
  reply = me.command('SETIFNAME', new)
  return _simple(reply)
def svcclaim(me, service, version):
  """Send `SVCCLAIM' with SERVICE and VERSION; return `_simple''s digest."""
  reply = me.command('SVCCLAIM', service, version)
  return _simple(reply)
def svcensure(me, service, version = None):
  """
  Send `SVCENSURE' for SERVICE; return what `_simple' makes of the reply.

  VERSION is appended to the command only if it is not None.
  """
  words = ['SVCENSURE', service]
  if version is not None:
    words.append(version)
  return _simple(me.command(*words))
def svcfail(me, job, *msg):
  """Send `SVCFAIL' for JOB with the words MSG; return `_simple''s digest."""
  words = ('SVCFAIL', job) + msg
  return _simple(me.command(*words))
def svcinfo(me, job, *msg):
  """Send `SVCINFO' for JOB with the words MSG; return `_simple''s digest."""
  words = ('SVCINFO', job) + msg
  return _simple(me.command(*words))
def svclist(me):
  """Send `SVCLIST'; return a list of what the command yields."""
  entries = me.command('SVCLIST')
  return list(entries)
def svcok(me, job):
  """Send `SVCOK' for JOB; return what `_simple' makes of the reply."""
  reply = me.command('SVCOK', job)
  return _simple(reply)
def svcquery(me, service):
  """Send `SVCQUERY' for SERVICE; return `_keyvals''s digest of the reply."""
  reply = me.command('SVCQUERY', service)
  return _keyvals(reply)
def svcrelease(me, service):
  """Send `SVCRELEASE' for SERVICE; return `_simple''s digest of the reply."""
  reply = me.command('SVCRELEASE', service)
  return _simple(reply)
def svcsubmit(me, service, *args, **kw):
  """
  Send `SVCSUBMIT' for SERVICE with `bg' set; return the command object.

  KW goes to `_kwopts' along with the option name `version'; its result is
  placed ahead of SERVICE, and the words ARGS follow it.  The result of
  `command' is returned as it is, without further processing.
  """
  words = ['SVCSUBMIT'] + _kwopts(kw, ['version'])
  words.append(service)
  words.extend(args)
  return me.command(bg = True, *words)
def stats(me, peer):
  """Send `STATS' for PEER; return what `_keyvals' makes of the reply."""
  reply = me.command('STATS', peer)
  return _keyvals(reply)
def trace(me, *args):
  """Send `TRACE' followed by ARGS; return `_tracelike''s digest."""
  words = ('TRACE',) + args
  return _tracelike(me.command(*words))
def tunnels(me):
  """
  Send `TUNNELS' with the `_tokenjoin' filter; return a list of what the
  command yields.
  """
  names = me.command('TUNNELS', filter = _tokenjoin)
  return list(names)
def version(me):
  """Send `VERSION' with the `_tokenjoin' filter; return `_oneline''s digest."""
  reply = me.command('VERSION', filter = _tokenjoin)
  return _oneline(reply)
def warn(me, *msg):
  """Send `WARN' followed by the words MSG; return `_simple''s digest."""
  words = ('WARN',) + msg
  return _simple(me.command(*words))
def watch(me, *args):
  """Send `WATCH' followed by ARGS; return `_tracelike''s digest."""
  words = ('WATCH',) + args
  return _tracelike(me.command(*words))
| 926 | |
| 927 | ###-------------------------------------------------------------------------- |
| 928 | ### Asynchronous commands. |
| 929 | |
class TripeAsynchronousCommand (TripeCommand):
  """
  A command whose responses are delivered to a queue.

  This is the complicated way of issuing commands.  You must set up a queue,
  and associate the command with it.  Each response arriving for the command
  is put on the queue as a triple of the form (TAG, CODE, REST) -- where TAG
  is an object of your choice, not interpreted by this class, CODE is the
  server's response code (`OK', `INFO', `FAIL', or `CONNERR'), and REST is
  the list of the rest of the server's tokens.

  Using this, you can write coroutines which process many commands (and
  possibly other events) simultaneously.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command consisting of the given WORDS.

    Responses are sent to QUEUE, each one labelled with TAG.
    """
    TripeCommand.__init__(me, words)
    me.tag = tag
    me.queue = queue

  def response(me, code, *stuff):
    """Handle a server response by posting a triple on the caller's queue."""
    rest = list(stuff)
    me.queue.put((me.tag, code, rest))
| 955 | |
| 956 | ###-------------------------------------------------------------------------- |
| 957 | ### Services. |
| 958 | |
class TripeJobCancelled (Exception):
  """
  Raised in a job handler when the client kills the job.

  It is not propagated any further.
  """
| 966 | |
class TripeJobError (Exception):
  """
  Raised by a running job to report failure.

  An `SVCFAIL' response is sent back, carrying the exception's arguments.
  """
| 974 | |
class TripeSyntaxError (Exception):
  """
  Raised to report a syntax error in a job's command.

  An `SVCFAIL bad-svc-syntax' message is sent back.
  """
| 982 | |
class TripeServiceManager (TripeCommandDispatcher):
  """
  A command dispatcher which also handles incoming service requests.

  There is usually only one instance of this class, called `svcmgr'.  Some of
  the support functions in this module assume that this is the case.

  To use, run `mLib.select' in a loop until the `quitp' method returns true;
  then, in a non-root coroutine, register your services by calling `addsvc',
  and then call `running' when you've finished setting up.

  The instance handles the server's service messages `SVCJOB', `SVCCANCEL'
  and `SVCCLAIM', and maintains tables of registered services and of running
  jobs.  An incoming `SVCJOB' causes the service's `job' method to be
  invoked, and the job it returns to be started; `SVCCANCEL' invokes the
  job's `cancel' method; and `SVCCLAIM' causes the relevant service to be
  deregistered.

  There is no base class for jobs, but a job must implement two methods:

  start()               Begin processing; might be a no-op.

  cancel()              Stop processing; the original client has killed the
                        job.

  The life of a service manager is divided into two parts: setup and running;
  you tell the manager that you've finished setting up by calling the
  `running' method.  If, at any point after setup is finished, there are no
  remaining services or jobs, `quitp' will return true, ending the process.
  """

  ## --- Attributes ---
  ##
  ## svc                Mapping name -> service object
  ##
  ## job                Mapping jobid -> job object
  ##
  ## runningp           True when setup is finished
  ##
  ## _quitp             True if explicit quit has been requested

  def __init__(me, socket):
    """
    Initialize the service manager.

    SOCKET is the administration socket to connect to.
    """
    TripeCommandDispatcher.__init__(me, socket)
    me.svc = {}
    me.job = {}
    me.runningp = False
    for code, func in [('SVCCANCEL', me._cancel),
                       ('SVCJOB', me._job),
                       ('SVCCLAIM', me._claim)]:
      me.handler[code] = func
    me._quitp = 0

  def addsvc(me, svc):
    """
    Register a new service; SVC is a `TripeService' instance.

    The service is claimed from the server (by name and version) before it
    is entered in the table.
    """
    name = svc.name
    assert name not in me.svc
    me.svcclaim(name, svc.version)
    me.svc[name] = svc

  def _cancel(me, _, jid):
    """
    Called when the server cancels a job.

    Forgets the job with id JID and invokes its `cancel' method.
    """
    me.job.pop(jid).cancel()

  def _claim(me, _, svc, __):
    """Called when another program claims our service at a higher version."""
    del me.svc[svc]

  def _job(me, _, jid, svc, cmd, *args):
    """
    Called when the server sends us a job to do.

    Asks the service named SVC (looked up in lowercase) for a job object,
    records it under JID, and starts it.
    """
    assert jid not in me.job
    job = me.svc[svc.lower()].job(jid, cmd, args)
    me.job[jid] = job
    job.start()

  def running(me):
    """Declare that setup is finished."""
    me.runningp = True

  def jobdone(me, jid):
    """
    Informs the service manager that the job with id JID has finished.

    It is not an error if the job has been forgotten already.
    """
    me.job.pop(jid, None)

  def quitp(me):
    """
    Return true if no services or jobs are active (and, therefore, if this
    process can quit without anyone caring).

    An explicit `quit' request always wins.  Otherwise nothing happens until
    setup is finished; after that, we can quit if there is nothing left to
    do, or if the `sock' attribute is false.
    """
    if me._quitp:
      return me._quitp
    if not me.runningp:
      return me.runningp
    if not me.svc and not me.job:
      return True
    return not me.sock

  def quit(me):
    """Forces the quit flag (returned by `quitp') on."""
    me._quitp = True
| 1090 | |
class TripeService (object):
  """
  A standard service.

  The NAME and VERSION are passed on to the server.  The CMDTAB is a
  dictionary mapping command names (in lowercase) to command objects.

  If the CMDTAB doesn't have entries for commands `help' and `quit' then
  defaults are provided.

  TripeService itself is mostly agnostic about the nature of command objects,
  but the TripeServiceJob class (below) has some requirements.  The built-in
  `help' command requires command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects.  It is
    kept by reference, and default `help' and `quit' entries are added to it
    if it lacks them.  The service is not registered with the service
    manager here.
    """
    me.name = name
    me.version = version
    me.cmd = cmdtab
    me.activep = True
    for cmdname, func in [('help', me._help), ('quit', me._quit)]:
      me.cmd.setdefault(cmdname,
                        TripeServiceCommand(cmdname, 0, 0, '', func))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  Creates a new job,
    passing it the information needed.  The command object is looked up by
    the lowercase form of CMD, and is `None' if there's no such command.
    """
    command = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, command, args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """Send a help summary to the user: one `INFO' line for each command."""
    ## Use `sorted' rather than sorting the result of `items' in place: the
    ## latter only works where `items' returns a list.
    for name, cmd in sorted(me.cmd.items()):
      svcinfo(name, *cmd.usage)

  def _quit(me):
    """Terminate the service manager, after issuing a notification."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
| 1144 | |
class TripeServiceCommand (object):
  """A simple service command."""

  def __init__(me, name, min, max, usage, func):
    """
    Creates a new command.

    NAME is the command's name (in lowercase).

    MIN and MAX are the minimum and maximum number of allowed arguments (used
    for checking); either may be None to indicate no minimum or maximum.

    USAGE is a usage string, used for generating help and error messages; it
    is stored as a list of its whitespace-separated words.

    FUNC is the function to invoke.
    """
    me.name, me.func = name, func
    me.min, me.max = min, max
    me.usage = usage.split()

  def run(me, *args):
    """
    Called when the command is invoked.

    Checks the number of arguments against the limits, raising
    `TripeSyntaxError' if it's out of range, and then calls the supplied
    function.  The function's result is discarded.
    """
    nargs = len(args)
    if me.min is not None and nargs < me.min:
      raise TripeSyntaxError()
    if me.max is not None and nargs > me.max:
      raise TripeSyntaxError()
    me.func(*args)
| 1177 | |
class TripeServiceJob (Coroutine):
  """
  Job handler coroutine.

  A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
  request, passing it the jobid, command and arguments, and a command object.
  The command object needs the following attributes.

  usage                 A usage list (excluding the command name) showing
                        arguments and options.

  name                  The command's name, used when reporting a syntax
                        error.

  run(*ARGS)            Function to react to the command with ARGS split into
                        separate arguments.  Invoked in a coroutine.  The
                        `svcinfo' function (not the `TripeCommandDispatcher'
                        method) may be used to send `INFO' lines.  The
                        function may raise `TripeJobError' to send a `FAIL'
                        response back, or `TripeSyntaxError' to send a
                        generic usage error.  `TripeJobCancelled' exceptions
                        are trapped silently.  Other exceptions are
                        translated into a generic internal-error message.

  This class automatically takes care of sending some closing response to the
  job, and for informing the service manager that the job is completed.

  The `jid' attribute stores the job's id.
  """

  def __init__(me, jid, svc, cmd, command, args):
    """
    Start a new job.

    The job is created with id JID, for service SVC, processing command name
    CMD (which the service resolved into the command object COMMAND, or
    `None'), and with the arguments ARGS.
    """
    Coroutine.__init__(me)
    me.jid = jid
    me.svc = svc
    me.cmd = cmd
    me.command = command
    me.args = args

  def run(me):
    """
    Main body of the coroutine.

    Does the tedious exception handling boilerplate and invokes the command's
    run method.  Whatever happens, the service manager is told that the job
    is done.
    """
    ## The `except ... as ...' spelling is accepted from Python 2.6 onwards,
    ## and is the only one accepted by Python 3.
    try:
      if me.command is None:
        svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
      else:
        me.command.run(*me.args)
        svcmgr.svcok(me.jid)
    except TripeJobError as exc:
      svcmgr.svcfail(me.jid, *exc.args)
    except TripeSyntaxError:
      svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                     me.svc.name, me.command.name,
                     *me.command.usage)
    except TripeJobCancelled:
      ## The client has gone away: there's nobody to report to.
      pass
    except Exception as exc:
      svcmgr.svcfail(me.jid, 'svc-internal-error',
                     exc.__class__.__name__, str(exc))
    finally:
      svcmgr.jobdone(me.jid)

  def start(me):
    """Invoked by the service manager to start running the coroutine."""
    me.switch()

  def cancel(me):
    """
    Invoked by the service manager to cancel the job.

    Throws a `TripeJobCancelled' exception into the coroutine.
    """
    me.throw(TripeJobCancelled())
| 1255 | |
def svcinfo(*args):
  """
  Send an `INFO' line to the sender of the current job.

  This must be invoked from a `TripeServiceJob' coroutine: the job id is
  taken from the current coroutine's `jid' attribute.
  """
  jid = Coroutine.getcurrent().jid
  svcmgr.svcinfo(jid, *args)
| 1262 | |
def _setupsvc(tab, func):
  """
  Body of the setup coroutine for service programs.

  Registers each of the services in TAB with the service manager and then,
  if FUNC is true, calls it with no arguments.  The manager is marked as
  running afterwards, even if something went wrong along the way.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if func:
      func()
  finally:
    svcmgr.running()
| 1276 | |
## The global service manager.  It is created here without a socket:
## `runservices' assigns one and then connects.
svcmgr = TripeServiceManager(None)
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries.  An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a `TripeService' instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a `TripeServiceCommand' object.

  If DAEMON is true, then the process is forked into the background before we
  start.  If INIT is given, it is called in the main coroutine, immediately
  after forking.  If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions.  In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  svcmgr.socket = socket
  svcmgr.connect()

  ## Turn the table entries into service objects.  Tuples are expanded;
  ## anything else is assumed to be a service object already.
  svcs = []
  for service in tab:
    if isinstance(service, tuple):
      name, version, commands = service
      cmdmap = {}
      ## `items' rather than `iteritems': the latter exists only in
      ## Python 2.
      for cmd, stuff in commands.items():
        cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
      service = TripeService(name, version, cmdmap)
    svcs.append(service)

  if daemon:
    M.daemonize()
  if init is not None:
    init()
  spawn(_setupsvc, svcs, setup)
  svcmgr.mainloop()
| 1329 | |
| 1330 | ###-------------------------------------------------------------------------- |
| 1331 | ### Utilities for services. |
| 1332 | |
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a string which `int' can convert to a nonnegative integer,
  optionally followed by a single unit letter: `s' (seconds), `m' (minutes),
  `h' (hours) or `d' (days).  With no unit, the number is in seconds.

  Raises `TripeJobError' with arguments `bad-time-spec' and the SPEC as it
  was given if the number can't be parsed or is negative.
  """
  mul, digits = 1, spec

  ## A unit letter only counts if there's something in front of it.
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul = _timeunits[spec[-1]]
    digits = spec[:-1]

  ## Only a conversion failure is a bad timespec: don't swallow anything
  ## else.
  try:
    t = int(digits)
  except ValueError:
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
| 1347 | |
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
  options.  The returned values are the option tags.  During parsing, the
  `arg' method may be used to retrieve the argument for the most recent
  option.  Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix).  ARGS is copied, so the
    caller's sequence is left alone.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    If we've run out, raise `StopIteration'.  Options run out at the end of
    the arguments, at the first argument which is shorter than two
    characters or doesn't begin with `-' (which is left in place), or at a
    `--' terminator (which is consumed).  An option which isn't in the
    allowed set causes `TripeSyntaxError' to be raised.
    """
    args = me.args
    if not args or len(args[0]) < 2 or not args[0].startswith('-'):
      raise StopIteration()
    opt = args.pop(0)
    if opt == '--':
      raise StopIteration()
    if opt not in me.allowed:
      raise TripeSyntaxError()
    return opt

  ## Python 3 spells the iterator protocol method differently.
  __next__ = next

  def arg(me):
    """
    Return the argument for the most recent option.

    The argument is removed from the list.  If none is available, raise
    `TripeSyntaxError'.
    """
    if not me.args:
      raise TripeSyntaxError()
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check -- and raise `TripeSyntaxError'
    if not.  The list returned is the parser's own, not a copy.
    """
    nargs = len(me.args)
    if (min is not None and nargs < min) or \
       (max is not None and nargs > max):
      raise TripeSyntaxError()
    return me.args
| 1415 | |
| 1416 | ###----- That's all, folks -------------------------------------------------- |