svc/conntrack.in (kickpeers): Rename `map' variable.
[tripe] / svc / conntrack.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Service for automatically tracking network connection status
5 ###
6 ### (c) 2010 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
25
## Version string reported by the service and by `--version'.  The
## `@VERSION@' token is a placeholder -- presumably substituted when this
## `.in' template is processed by the build system (TODO confirm).
VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from ConfigParser import RawConfigParser
32 from optparse import OptionParser
33 import os as OS
34 import sys as SYS
35 import socket as S
36 import mLib as M
37 import tripe as T
38 import dbus as D
39 import re as RX
40 for i in ['mainloop', 'mainloop.glib']:
41 __import__('dbus.%s' % i)
42 try: from gi.repository import GLib as G
43 except ImportError: import gobject as G
44 from struct import pack, unpack
45
## Short alias for the `svcmgr' object exported by the `tripe' module; it is
## used throughout this file to talk to the server (list, kill, notify, ...).
SM = T.svcmgr
##__import__('rmcr').__debug = True
48
49 ###--------------------------------------------------------------------------
50 ### Utilities.
51
class struct (object):
  """
  A bare attribute bag.

  Each keyword argument given to the constructor becomes an attribute of
  the new object with the corresponding value.
  """
  def __init__(me, **kw):
    slots = me.__dict__
    for name in kw:
      slots[name] = kw[name]
56
57 ###--------------------------------------------------------------------------
58 ### Address manipulation.
59
class InetAddress (object):
  """
  An IPv4 address, optionally qualified by a network mask.

  The `addr' and `mask' slots hold plain integers.  A `mask' of -1 marks a
  simple host address, i.e., one constructed without any mask.
  """

  def __init__(me, addrstr, maskstr = None):
    """
    Construct an address from the dotted-quad ADDRSTR.

    MASKSTR may be None (a simple address), a string of decimal digits (a
    prefix length), or a dotted-quad netmask.  Raise ValueError if the
    address has bits set outside of the mask.
    """
    me.addr = me._addrstr_to_int(addrstr)
    if maskstr is None:
      mask = -1
    elif maskstr.isdigit():
      ## A prefix length: set the top so-many bits of a 32-bit word.
      hostbits = 32 - int(maskstr)
      mask = (1 << 32) - (1 << hostbits)
    else:
      mask = me._addrstr_to_int(maskstr)
    me.mask = mask
    if me.addr&~mask:
      raise ValueError('network contains bits set beyond mask')

  def _addrstr_to_int(me, addrstr):
    """Convert the dotted-quad ADDRSTR into a big-endian 32-bit integer."""
    packed = S.inet_aton(addrstr)
    n, = unpack('>L', packed)
    return n

  def _int_to_addrstr(me, n):
    """Convert the 32-bit integer N into a dotted-quad string."""
    packed = pack('>L', n)
    return S.inet_ntoa(packed)

  def sockaddr(me, port = 0):
    """
    Return a (HOST, PORT) pair for this address.

    Raise ValueError if the address carries a network mask.
    """
    if me.mask != -1: raise ValueError('not a simple address')
    return me._int_to_addrstr(me.addr), port

  def __str__(me):
    """
    Return `ADDR' for a simple address, `ADDR/LEN' if the mask is a
    contiguous prefix, or `ADDR/MASK' otherwise.
    """
    text = me._int_to_addrstr(me.addr)
    if me.mask == -1: return text

    ## The complement of a contiguous prefix mask is one less than a power
    ## of two, so it shares no bits with its own successor.
    hostmask = me.mask ^ ((1 << 32) - 1)
    if hostmask&(hostmask + 1):
      return '%s/%s' % (text, me._int_to_addrstr(me.mask))
    return '%s/%d' % (text, 32 - hostmask.bit_length())

  def withinp(me, net):
    """
    Return whether this address (or network) lies entirely within NET.
    """
    covered = (me.mask&net.mask) == net.mask
    return covered and not ((me.addr ^ net.addr)&net.mask)

  def eq(me, other):
    """Return whether OTHER has the same address and the same mask."""
    return me.mask == other.mask and me.addr == other.addr

  @classmethod
  def from_sockaddr(cls, sa):
    """
    Split the two-element socket address SA; return an (ADDRESS, PORT) pair
    in which ADDRESS is a new instance of this class.
    """
    host, port = (lambda a, p: (a, p))(*sa)
    return cls(host), port
100
def parse_address(addrstr, maskstr = None):
  """
  Return an InetAddress built from ADDRSTR and the optional MASKSTR.
  """
  addr = InetAddress(addrstr, maskstr)
  return addr
103
def parse_net(netstr):
  """
  Parse NETSTR, of the form `ADDR/MASK', and return the network address.

  The string is split at its first `/'; raise ValueError if there isn't
  one.
  """
  addrstr, slash, maskstr = netstr.partition('/')
  if not slash: raise ValueError('missing mask')
  return parse_address(addrstr, maskstr)
108
def straddr(a):
  """Return A as a string, or the marker `#<none>' if A is None."""
  return '#<none>' if a is None else str(a)
110
111 ###--------------------------------------------------------------------------
112 ### Parse the configuration file.
113
## Hmm.  Should I try to integrate this with the peers database?  It's not a
## good fit; it'd need special hacks in tripe-newpeers.  And the use cases
## for this service are largely going to be satellite nodes, so I don't
## think scalability's going to be a problem.
118
## The default test address: `Config' falls back to this when the
## configuration file doesn't set `test-addr'.
TESTADDR = InetAddress('1.2.3.4')

## The line syntax of the configuration file, as (TAG, REGEX) pairs.  The
## parser tries each pattern in order against a line and takes the first
## which matches.
##
##   * COMMENT: a blank line, or one whose first nonblank character is `;'
##     or `#'.
##   * GRPHDR: `[NAME]'; group 1 is the text between the brackets.
##   * ASSGN: `KEY = VALUE' or `KEY: VALUE'; group 1 is the key and group 2
##     the value with surrounding whitespace trimmed (possibly empty).
CONFSYNTAX = [
  ('COMMENT', RX.compile(r'^\s*($|[;#])')),
  ('GRPHDR', RX.compile(r'^\s*\[(.*)\]\s*$')),
  ('ASSGN', RX.compile(r'\s*([\w.-]+)\s*[:=]\s*(|\S|\S.*\S)\s*$'))]
125
class ConfigError (Exception):
  """
  Reports a problem found while reading a configuration file.

  The slots `file', `lno' and `msg' record the file name, the line number,
  and a description of what went wrong; the string form is
  `FILE:LNO: MSG'.
  """
  def __init__(me, file, lno, msg):
    me.file, me.lno, me.msg = file, lno, msg
  def __str__(me):
    where = '%s:%d' % (me.file, me.lno)
    return '%s: %s' % (where, me.msg)
133
class Config (object):
  """
  The parsed contents of a configuration file.

  The `testaddr' slot holds the global test address.  The `groups' slot is
  a dictionary mapping each group NAME to its PATTERNS, a list of (TAG,
  PEER, NET) triples in file order.  The idea is that exactly one peer from
  each group should be active, namely the one named TAG, where (TAG, PEER,
  NET) is the first triple such that the host's primary IP address (if PEER
  is None -- or otherwise the address it would use for communicating with
  PEER) lies within NET.
  """

  def __init__(me, file):
    """
    Construct a new Config object, and read FILE straight away.
    """
    me._file = file
    me._fwatch = M.FWatch(file)
    me._update()

  def check(me):
    """
    Reread the configuration file if the file watcher says it has changed.
    """
    if not me._fwatch.update(): return
    me._update()

  def _update(me):
    """
    Internal function to reparse the underlying file.

    Raise ConfigError if the file is malformed; the `testaddr' and `groups'
    slots are only assigned once the whole file has parsed successfully.
    """

    if T._debug: print('# reread config')

    ## Parser state: the global test address, the completed groups, and the
    ## name and pattern list of the group under construction.
    testaddr = None
    groups = {}
    grpname = None
    grplist = []

    with open(me._file) as f:
      for lno, line in enumerate(f, 1):

        ## Classify the line: the first pattern which matches wins.
        tag = m = None
        for kind, rx in CONFSYNTAX:
          m = rx.match(line)
          if m is not None:
            tag = kind
            break
        if tag is None:
          raise ConfigError(me._file, lno, 'failed to parse line: %r' % line)

        if tag == 'COMMENT':
          ## Nothing to do here.
          continue

        if tag == 'GRPHDR':
          ## A group header: file away the group we were building, and
          ## start a fresh one.
          newname = m.group(1)
          if grpname is not None: groups[grpname] = grplist
          if newname in groups:
            raise ConfigError(me._file, lno,
                              "duplicate group name `%s'" % newname)
          grpname, grplist = newname, []
          continue

        if tag != 'ASSGN': continue
        name, value = m.group(1), m.group(2)

        if grpname is None:
          ## Before the first group header: this is a global option, and
          ## `test-addr' is the only one we know about.
          if name != 'test-addr':
            raise ConfigError(me._file, lno,
                              "unknown global option `%s'" % name)
          for astr in value.split():
            try:
              a = parse_address(astr)
            except Exception as e:
              raise ConfigError(me._file, lno,
                                "invalid IP address `%s': %s" %
                                (astr, e))
            if testaddr is not None:
              raise ConfigError(me._file, lno, 'duplicate test-address')
            testaddr = a
          continue

        ## Inside a group: the value is `[PEER] NET'.  The leading word is
        ## an explicit target address unless it contains a `/'.
        words = value.split()
        peer, nused = None, 0
        if words and '/' not in words[0]:
          try:
            peer = parse_address(words[0])
          except Exception as e:
            raise ConfigError(me._file, lno,
                              "invalid IP address `%s': %s" %
                              (words[0], e))
          nused = 1

        ## Exactly one word, the local network, must remain.
        if len(words) != nused + 1:
          raise ConfigError(me._file, lno, 'no network defined')
        netstr = words[nused]
        try:
          net = parse_net(netstr)
        except Exception as e:
          raise ConfigError(me._file, lno,
                            "invalid IP network `%s': %s" %
                            (netstr, e))

        grplist.append((name, peer, net))

    ## Apply the default test address, file away the final group, and
    ## publish the results.
    if testaddr is None: testaddr = TESTADDR
    if grpname is not None: groups[grpname] = grplist
    me.testaddr = testaddr
    me.groups = groups
262
### This will be a configuration file: the main program replaces it with a
### `Config' object before the service starts.
CF = None
265
def cmd_showconfig():
  """Service command: report the configured test address."""
  line = 'test-addr=%s' % CF.testaddr
  T.svcinfo(line)
def cmd_showgroups():
  """Service command: report the group names, one per line, sorted."""
  names = list(CF.groups)
  names.sort()
  for name in names:
    T.svcinfo(name)
def cmd_showgroup(g):
  """
  Service command: report the patterns of the group named G.

  Each pattern produces one line giving its peer tag, target address (or
  `(default)') and network.  Raise T.TripeJobError if there is no such
  group.
  """
  if g not in CF.groups: raise T.TripeJobError('unknown-group', g)
  for tag, peer, net in CF.groups[g]:
    target = str(peer) if peer else ''
    T.svcinfo('peer', tag,
              'target', target or '(default)',
              'net', str(net))
278
279 ###--------------------------------------------------------------------------
280 ### Responding to a network up/down event.
281
def localaddr(peer):
  """
  Return the local IP address used for talking to PEER.

  This connects a throwaway datagram socket to PEER (port 1) and reports
  the address the socket ended up bound to.  Return None if a socket error
  occurs.  The socket is always closed.
  """
  sk = S.socket(S.AF_INET, S.SOCK_DGRAM)
  try:
    sk.connect(peer.sockaddr(1))
    me, _ = InetAddress.from_sockaddr(sk.getsockname())
    return me
  except S.error:
    return None
  finally:
    sk.close()
296
## Queue of pending (UPNESS, REASON) events: `netupdown' adds them, and the
## `kickpeers' loop takes them off again.
_kick = T.Queue()

## The glib source id of the pending delayed kick, as returned by
## `G.timeout_add', or None if there isn't one.
_delay = None
299
def cancel_delay():
  """
  Cancel the pending delayed kick, if there is one.
  """
  global _delay
  if _delay is None: return
  if T._debug: print('# cancel delayed kick')
  G.source_remove(_delay)
  _delay = None
306
def netupdown(upness, reason):
  """
  Request that peers be added or killed to suit the network state.

  UPNESS is true if the network is up, or false if it's down; REASON is a
  list of tokens saying why.  The pair is merely queued here: the
  `kickpeers' loop does the actual work.
  """
  event = upness, reason
  _kick.put(event)
315
def delay_netupdown(upness, reason):
  """
  Arrange to call `netupdown' with UPNESS and REASON in 2000 ms.

  Any delayed kick already pending is cancelled first, so only the most
  recent request survives.
  """
  global _delay

  def _fire():
    ## The timer has gone off: forget about it, and pass the event on.
    ## Returning false tells glib not to call us again.
    global _delay
    if T._debug: print('# delayed %s: %s' % (upness, reason))
    _delay = None
    netupdown(upness, reason)
    return False

  cancel_delay()
  if T._debug: print('# delaying %s: %s' % (upness, reason))
  _delay = G.timeout_add(2000, _fire)
327
def kickpeers():
  """
  Main loop for bringing peers up and down; never returns.

  Repeatedly take an (UPNESS, REASON) event from the `_kick' queue, work
  out which peer in each configured group ought to be active, and then
  kill the peers which are wrongly up and connect the wanted peer if it
  isn't up already.  A notification is sent before any changes are made.
  """

  def _killer(peer):
    ## Return an action which brings PEER down.
    def _():
      try:
        SM.kill(peer)
      except T.TripeError as exc:
        ## Inherently racy; don't worry if the peer has already gone.
        if exc.args[0] != 'unknown-peer': raise
    return _

  def _starter(peer):
    ## Return an action which brings PEER up, warning if that fails.
    def _():
      try:
        list(SM.svcsubmit('connect', 'active', peer))
      except T.TripeError as exc:
        SM.warn('conntrack', 'connect-failed', peer, *exc.args)
    return _

  while True:
    upness, reason = _kick.get()
    if T._debug: print('# kickpeers %s: %s' % (upness, reason))
    select = []
    cancel_delay()

    ## Refresh the configuration.  If that fails, issue a warning and
    ## carry on regardless.
    try:
      CF.check()
    except Exception as exc:
      SM.warn('conntrack', 'config-file-error',
              exc.__class__.__name__, str(exc))

    ## Fetch the current list of peers.
    peers = SM.list()

    ## Determine the primary IP address.  If we can't find one then the
    ## network is considered to be down after all.
    addr = None
    if upness:
      addr = localaddr(CF.testaddr)
      if addr is None: upness = False
    if T._debug:
      if addr: print('#   local address = %s' % straddr(addr))
      else: print('#   offline')

    ## Decide what needs doing, one group at a time.
    changes = []
    for grp, pats in CF.groups.items():
      if T._debug: print('#   check group %s' % grp)

      ## Pick the first pattern whose network contains the relevant local
      ## address; every other pattern's peer is marked down.
      chosen = None
      statemap = {}
      want = None
      for tag, target, net in pats:
        here = addr if (target is None or not upness) \
               else localaddr(target)
        if T._debug:
          info = 'peer=%s; target=%s; net=%s; local=%s' % (
            tag, target or '(default)', net, straddr(here))
        if upness and chosen is None and \
           here is not None and here.withinp(net):
          if T._debug: print('#     %s: SELECTED' % info)
          statemap[tag] = 'up'
          select.append('%s=%s' % (grp, tag))
          ## The tags `down' and `down/...' select no peer at all.
          nopeer = tag == 'down' or tag.startswith('down/')
          want = None if nopeer else tag
          chosen = here
        else:
          statemap[tag] = 'down'
          if T._debug: print('#     %s: skipped' % info)

      ## Queue up kills for existing peers which ought to be down, and
      ## notice whether the selected one is there already.
      found = False
      if T._debug: print('#   peer-map = %r' % statemap)
      for peer in peers:
        what = statemap.get(peer, 'leave')
        if what == 'up':
          found = True
          if T._debug: print('#   peer %s: already up' % peer)
        elif what == 'down':
          if T._debug: print('#   peer %s: bring down' % peer)
          changes.append(_killer(peer))

      ## Queue up a connection for the wanted peer if it's missing.
      if want is not None and not found:
        if T._debug: print('#   peer %s: bring up' % want)
        changes.append(_starter(want))

    ## Announce and then carry out the changes, in order.
    if changes:
      state = 'up' if upness else 'down'
      SM.notify('conntrack', state, *(select + reason))
      for act in changes: act()
423
424 ###--------------------------------------------------------------------------
425 ### NetworkManager monitor.
426
## The standard D-Bus properties interface, used for reading NetworkManager's
## `State' property.
DBPROPS_IFACE = 'org.freedesktop.DBus.Properties'

## NetworkManager's bus name, object path and interface names.
NM_NAME = 'org.freedesktop.NetworkManager'
NM_PATH = '/org/freedesktop/NetworkManager'
NM_IFACE = NM_NAME
NMCA_IFACE = NM_NAME + '.Connection.Active'

## NetworkManager `State' values which we treat as meaning that the network
## is up.  NOTE(review): the numeric values presumably follow the NMState
## enumeration in the NetworkManager D-Bus API -- confirm against its
## documentation.
NM_STATE_CONNECTED = 3 #obsolete
NM_STATE_CONNECTED_LOCAL = 50
NM_STATE_CONNECTED_SITE = 60
NM_STATE_CONNECTED_GLOBAL = 70
NM_CONNSTATES = set([NM_STATE_CONNECTED,
                     NM_STATE_CONNECTED_LOCAL,
                     NM_STATE_CONNECTED_SITE,
                     NM_STATE_CONNECTED_GLOBAL])
442
class NetworkManagerMonitor (object):
  """
  Watch NetworkManager signals for changes in network state.
  """

  ## Strategy.  There are two kinds of interesting state transitions for
  ## us.  The first is the global are-we-connected state, which we use to
  ## toggle network upness on a global level.  The second is which
  ## connection has the default route, which we use to tweak which peer in
  ## the peer group is active.  The former is most easily tracked using the
  ## signal org.freedesktop.NetworkManager.StateChanged; for the latter, we
  ## track org.freedesktop.NetworkManager.Connection.Active.PropertiesChanged
  ## and look for when a new connection gains the default route.

  def attach(me, bus):
    """
    Report NetworkManager's current state and subscribe to its signals.

    A D-Bus failure while reading the initial state is noted in the debug
    trace and otherwise ignored; the signal receivers are added regardless.
    """
    try:
      nm = bus.get_object(NM_NAME, NM_PATH)
      state = nm.Get(NM_IFACE, 'State', dbus_interface = DBPROPS_IFACE)
      if state in NM_CONNSTATES: up, why = True, 'initially-connected'
      else: up, why = False, 'initially-disconnected'
      netupdown(up, ['nm', why])
    except D.DBusException as e:
      if T._debug: print('# exception attaching to network-manager: %s' % e)
    bus.add_signal_receiver(me._nm_state, 'StateChanged',
                            NM_IFACE, NM_NAME, NM_PATH)
    bus.add_signal_receiver(me._nm_connchange, 'PropertiesChanged',
                            NMCA_IFACE, NM_NAME, None)

  def _nm_state(me, state):
    """Handle `StateChanged': schedule a delayed kick to suit STATE."""
    if state in NM_CONNSTATES: up, why = True, 'connected'
    else: up, why = False, 'disconnected'
    delay_netupdown(up, ['nm', why])

  def _nm_connchange(me, props):
    """
    Handle `PropertiesChanged' on an active connection: schedule a delayed
    kick if PROPS has a true `Default' or `Default6' entry.
    """
    gained = props.get('Default', False) or props.get('Default6', False)
    if gained:
      delay_netupdown(True, ['nm', 'default-connection-change'])
481
###--------------------------------------------------------------------------
483 ### Connman monitor.
484
## ConnMan's bus name, manager object path, and manager interface name.
CM_NAME = 'net.connman'
CM_PATH = '/'
CM_IFACE = 'net.connman.Manager'
488
class ConnManMonitor (object):
  """
  Watch ConnMan signals for changes in network state.
  """

  ## Strategy.  Everything seems to be usefully encoded in the `State'
  ## property.  If it's `offline', `idle' or `ready' then we don't expect a
  ## network connection.  During handover from one network to another, the
  ## property passes through `ready' to `online'.

  def attach(me, bus):
    """
    Report ConnMan's current state and subscribe to its property changes.

    A D-Bus failure while reading the initial state is noted in the debug
    trace and otherwise ignored; the signal receiver is added regardless.
    """
    try:
      cm = bus.get_object(CM_NAME, CM_PATH)
      state = cm.GetProperties(dbus_interface = CM_IFACE)['State']
      why = 'initially-%s' % state
      netupdown(state == 'online', ['connman', why])
    except D.DBusException as e:
      if T._debug: print('# exception attaching to connman: %s' % e)
    bus.add_signal_receiver(me._cm_state, 'PropertyChanged',
                            CM_IFACE, CM_NAME, CM_PATH)

  def _cm_state(me, prop, value):
    """
    Handle `PropertyChanged': if PROP is `State' then schedule a delayed
    kick, treating the network as up only if VALUE is `online'.
    """
    if prop == 'State':
      delay_netupdown(value == 'online', ['connman', value])
513
514 ###--------------------------------------------------------------------------
515 ### Maemo monitor.
516
## ICd's bus name and object path; its interface has the same name as the
## bus name.
ICD_NAME = 'com.nokia.icd'
ICD_PATH = '/com/nokia/icd'
ICD_IFACE = ICD_NAME
520
class MaemoICdMonitor (object):
  """
  Watch ICd signals for changes in network state.

  The `_iap' slot remembers which connection we currently believe to be
  active, or None if there isn't one.
  """

  ## Strategy.  ICd only handles one connection at a time in steady state,
  ## though when switching between connections, it tries to bring the new
  ## one up before shutting down the old one.  This makes life a bit easier
  ## than it is with NetworkManager.  On the other hand, the notifications
  ## are relative to particular connections only, and the indicator that the
  ## old connection is down (`IDLE') comes /after/ the new one comes up
  ## (`CONNECTED'), so we have to remember which one is active.

  def attach(me, bus):
    """
    Report ICd's current state and subscribe to its status changes.

    If the `get_ipinfo' query fails then we report the network as being
    initially disconnected.  If ICd can't be reached at all, that's noted
    in the debug trace and nothing is reported.  The signal receiver is
    added regardless.
    """
    try:
      icd = bus.get_object(ICD_NAME, ICD_PATH)
      try:
        current = icd.get_ipinfo(dbus_interface = ICD_IFACE)[0]
        me._iap = current
        netupdown(True, ['icd', 'initially-connected', current])
      except D.DBusException:
        me._iap = None
        netupdown(False, ['icd', 'initially-disconnected'])
    except D.DBusException as e:
      if T._debug: print('# exception attaching to icd: %s' % e)
      me._iap = None
    bus.add_signal_receiver(me._icd_state, 'status_changed', ICD_IFACE,
                            ICD_NAME, ICD_PATH)

  def _icd_state(me, iap, ty, state, hunoz):
    """
    Handle `status_changed' for the connection IAP.

    `CONNECTED' makes IAP the active connection; `IDLE' only counts if it
    concerns the connection we think is active.  Other states are ignored.
    """
    if state == 'CONNECTED':
      me._iap = iap
      delay_netupdown(True, ['icd', 'connected', iap])
      return
    if state == 'IDLE' and iap == me._iap:
      me._iap = None
      delay_netupdown(False, ['icd', 'idle'])
557
558 ###--------------------------------------------------------------------------
559 ### D-Bus connection tracking.
560
class DBusMonitor (object):
  """
  Maintains a connection to a D-Bus bus, and watches for signals.

  The bus is the system bus by default; see `_try_connect' for how to
  choose a different one.  If the connection is initially down, or drops
  for some reason, we retry periodically (every five seconds at the
  moment).  If the connection resurfaces, we reattach the monitors.

  The `_state' slot is one of `startup', `reconnecting' or `connected';
  the `_bus' slot holds the current bus connection, or None if we don't
  have one.
  """

  def __init__(me):
    """
    Initialise the object and try to establish a connection to the bus.
    """
    me._mons = []
    me._loop = D.mainloop.glib.DBusGMainLoop()
    me._state = 'startup'
    me._reconnect()

  def addmon(me, mon):
    """
    Add a monitor object to watch for signals.

    MON.attach(BUS) is called, with BUS being the connection to the bus,
    immediately if we're connected, and again each time a connection is
    established.  MON should query its service's current status and watch
    for relevant signals.
    """
    me._mons.append(mon)
    if me._bus is not None:
      mon.attach(me._bus)

  def _reconnect(me, hunoz = None):
    """
    Start connecting to the bus.

    If we fail the first time, retry periodically.  This is called from the
    constructor and as the bus's disconnection callback; the HUNOZ argument
    is ignored.
    """

    ## Tell everyone what happened, according to the state we were in.
    if me._state == 'startup':
      T.aside(SM.notify, 'conntrack', 'dbus-connection', 'startup')
    elif me._state == 'connected':
      T.aside(SM.notify, 'conntrack', 'dbus-connection', 'lost')
    else:
      T.aside(SM.notify, 'conntrack', 'dbus-connection',
              'state=%s' % me._state)

    ## Record the new state.  (This used to be written as a comparison,
    ## `==', which quietly did nothing, so the state was left stale.)
    me._state = 'reconnecting'
    me._bus = None

    ## `_try_connect' returns true if it wants to be called again, which is
    ## just what a glib timeout callback needs to do to stay registered.
    if me._try_connect():
      G.timeout_add_seconds(5, me._try_connect)

  def _try_connect(me):
    """
    Actually make a connection attempt.

    The environment variable TRIPE_CONNTRACK_BUS selects the bus: `SESSION'
    means the session bus; any other value is used as a bus address; if
    it's unset then we use the system bus.

    If we succeed, attach the monitors and return false.  If a D-Bus error
    occurs, return true to ask for another attempt.
    """
    try:
      addr = OS.getenv('TRIPE_CONNTRACK_BUS')
      if addr == 'SESSION':
        bus = D.SessionBus(mainloop = me._loop, private = True)
      elif addr is not None:
        bus = D.bus.BusConnection(addr, mainloop = me._loop)
      else:
        bus = D.SystemBus(mainloop = me._loop, private = True)
      for m in me._mons:
        m.attach(bus)
    except D.DBusException:
      return True
    me._bus = bus
    me._state = 'connected'
    bus.call_on_disconnection(me._reconnect)
    T.aside(SM.notify, 'conntrack', 'dbus-connection', 'connected')
    return False
632
633 ###--------------------------------------------------------------------------
634 ### TrIPE service.
635
class GIOWatcher (object):
  """
  Monitor I/O events on a connection using glib.

  While a socket is connected, a glib watch calls the connection's
  `receive' method whenever there's input to read.
  """

  def __init__(me, conn, mc = G.main_context_default()):
    """
    Initialise a watcher for CONN, driving the glib main context MC.
    """
    me._conn, me._mc = conn, mc
    me._watch = None

  def connected(me, sock):
    """Start watching SOCK for incoming data."""
    def _ready(*hunoz):
      return me._conn.receive()
    me._watch = G.io_add_watch(sock, G.IO_IN, _ready)

  def disconnected(me):
    """Remove the watch installed by `connected'."""
    watch, me._watch = me._watch, None
    G.source_remove(watch)

  def iterate(me):
    """Run one blocking iteration of the main context."""
    me._mc.iteration(True)
652
## Install a glib-based I/O watcher on the service manager, so that its
## `receive' method is called when input arrives on its socket.
SM.iowatch = GIOWatcher(SM)
654
def init():
  """
  Service initialization.

  Start the `kickpeers' coroutine, and then set up the D-Bus monitor and
  its three backends.  The D-Bus monitor is added here because we might
  send commands off immediately, and we want to make sure the server
  connection is up.  Finally, arrange a recheck every 30 seconds.
  """
  global DBM
  T.Coroutine(kickpeers, name = 'kickpeers').switch()

  DBM = DBusMonitor()
  for moncls in [NetworkManagerMonitor, ConnManMonitor, MaemoICdMonitor]:
    DBM.addmon(moncls())

  def _tick():
    ## Kick the peers, unless there's a delayed kick pending anyway.
    ## Always return true so that the timer keeps running.
    if _delay is None: netupdown(True, ['interval-timer'])
    return True
  G.timeout_add_seconds(30, _tick)
671
def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and sets the
  `tripe' module's debugging flag according to `--debug'.  Positional
  arguments are rejected.  Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  ## The help text here used to say `diretory'.
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-c', '--config',
                metavar = 'FILE', dest = 'conf', default = 'conntrack.conf',
                help = 'Select configuration [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
706
707 ## Service table, for running manually.
def cmd_updown(upness):
  """
  Return a handler for the manual `up' or `down' service command.

  When called, the handler defers a call to `netupdown' with UPNESS, giving
  as the reason `manual' followed by the command's arguments.
  """
  def _cmd(*args):
    reason = ['manual'] + list(args)
    return T.defer(netupdown, upness, reason)
  return _cmd
## The table of services we provide, handed to `T.runservices' below: a
## single service `conntrack', with a dictionary mapping command names to
## their descriptions.  NOTE(review): each description is presumably
## (MIN-ARGS, MAX-ARGS, USAGE, HANDLER), with a MAX-ARGS of None meaning no
## limit -- confirm against the `tripe' module.
service_info = [('conntrack', VERSION, {
  'up': (0, None, '', cmd_updown(True)),
  'down': (0, None, '', cmd_updown(False)),
  'show-config': (0, 0, '', cmd_showconfig),
  'show-groups': (0, 0, '', cmd_showgroups),
  'show-group': (1, 1, 'GROUP', cmd_showgroup)
})]
717
## Main program.  Parse the command line (which also changes directory),
## read the configuration file -- so a relative name is found in the
## configuration directory -- and then run the service, with `init' as the
## initialization hook.
if __name__ == '__main__':
  opts = parse_options()
  CF = Config(opts.conf)
  T.runservices(opts.tripesock, service_info,
                init = init, daemon = opts.daemon)
721 T.runservices(opts.tripesock, service_info,
722 init = init, daemon = opts.daemon)
723
724 ###----- That's all, folks --------------------------------------------------