better config errors
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784 22from optparse import OptionParser
5510890e 23import configparser
ae7c7784
IJ
24from configparser import ConfigParser
25from configparser import NoOptionError
26
c13ee6e6
IJ
27from functools import partial
28
ae7c7784 29import collections
84e763c7 30import time
8c3b6620 31import codecs
eedc8b30 32import traceback
ae7c7784 33
1321ad5f
IJ
34import re as regexp
35
36import hippotat.slip as slip
37
d579a048 38class DBG(twisted.python.constants.Names):
380ed56c 39 INIT = NamedConstant()
cae50358 40 CONFIG = NamedConstant()
d579a048 41 ROUTE = NamedConstant()
b68c0739 42 DROP = NamedConstant()
d579a048
IJ
43 FLOW = NamedConstant()
44 HTTP = NamedConstant()
380ed56c 45 TWISTED = NamedConstant()
d579a048 46 QUEUE = NamedConstant()
380ed56c 47 HTTP_CTRL = NamedConstant()
d579a048 48 QUEUE_CTRL = NamedConstant()
297b3ebf 49 HTTP_FULL = NamedConstant()
0accf0d3 50 CTRL_DUMP = NamedConstant()
380ed56c 51 SLIP_FULL = NamedConstant()
9acb0eca 52 DATA_COMPLETE = NamedConstant()
d579a048 53
b68c0739 54_hex_codec = codecs.getencoder('hex_codec')
8c3b6620 55
b83d422a
IJ
56#---------- logging ----------
57
58org_stderr = sys.stderr
59
8c3b6620
IJ
60log = twisted.logger.Logger()
61
2e68eb10
IJ
62debug_set = set()
63debug_def_detail = DBG.HTTP
3e35fc99 64
8c3b6620 65def log_debug(dflag, msg, idof=None, d=None):
3e35fc99 66 if dflag not in debug_set: return
e8fcf3b7 67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
8c3b6620 68 if idof is not None:
e8ed0029 69 msg = '[%#x] %s' % (id(idof), msg)
8c3b6620 70 if d is not None:
9acb0eca
IJ
71 trunc = ''
72 if not DBG.DATA_COMPLETE in debug_set:
73 if len(d) > 64:
74 d = d[0:64]
75 trunc = '...'
b68c0739 76 d = _hex_codec(d)[0].decode('ascii')
9acb0eca 77 msg += ' ' + d + trunc
8c3b6620
IJ
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
79
b83d422a
IJ
80@implementer(twisted.logger.ILogFilterPredicate)
81class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
85 try:
86 if event.get('log_level') != LogLevel.info:
87 return yes
9acb0eca
IJ
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
91 return no
b83d422a
IJ
92 except Exception:
93 print(traceback.format_exc(), file=org_stderr)
94 return yes
95
96#---------- default config ----------
97
ca732796
IJ
98defcfg = '''
99[DEFAULT]
100#[<client>] overrides
101max_batch_down = 65536 # used by server, subject to [limits]
102max_queue_time = 10 # used by server, subject to [limits]
ca732796 103target_requests_outstanding = 3 # must match; subject to [limits] on server
ba5630fd
IJ
104http_timeout = 30 # used by both } must be
105http_timeout_grace = 5 # used by both } compatible
ca732796
IJ
106max_requests_outstanding = 4 # used by client
107max_batch_up = 4000 # used by client
4edf77a3 108http_retry = 5 # used by client
ca732796
IJ
109
110#[server] or [<client>] overrides
111ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
112# extra interpolations: %(local)s %(peer)s %(rnet)s
113# obtained on server [virtual]server [virtual]relay [virtual]network
114# from on client <client> [virtual]server [virtual]routes
115
116[virtual]
117mtu = 1500
118routes = ''
119# network = <prefix>/<len> # mandatory for server
120# server = <ipaddr> # used by both, default is computed from `network'
121# relay = <ipaddr> # used by server, default from `network' and `server'
122# default server is first host in network
123# default relay is first host which is not server
124
125[server]
126# addrs = 127.0.0.1 ::1 # mandatory for server
127port = 80 # used by server
128# url # used by client; default from first `addrs' and `port'
129
130# [<client-ip4-or-ipv6-address>]
131# password = <password> # used by both, must match
132
133[limits]
134max_batch_down = 262144 # used by server
135max_queue_time = 121 # used by server
ba5630fd 136http_timeout = 121 # used by server
ca732796
IJ
137target_requests_outstanding = 10 # used by server
138'''
139
87a7c0c7 140# these need to be defined here so that they can be imported by import *
cae50358 141cfg = ConfigParser(strict=False)
ae7c7784
IJ
142optparser = OptionParser()
143
e4006ac4 144_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
145def mime_translate(s):
146 # SLIP-encoded packets cannot contain ESC ESC.
147 # Swap `-' and ESC. The result cannot contain `--'
148 return s.translate(_mimetrans)
149
87a7c0c7
IJ
150class ConfigResults:
151 def __init__(self, d = { }):
152 self.__dict__ = d
153 def __repr__(self):
154 return 'ConfigResults('+repr(self.__dict__)+')'
155
156c = ConfigResults()
157
a8827d59 158def log_discard(packet, iface, saddr, daddr, why):
b68c0739 159 log_debug(DBG.DROP,
a8827d59 160 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
b68c0739 161 d=packet)
1321ad5f 162
b0cfbfce
IJ
163#---------- packet parsing ----------
164
165def packet_addrs(packet):
166 version = packet[0] >> 4
167 if version == 4:
168 addrlen = 4
169 saddroff = 3*4
170 factory = ipaddress.IPv4Address
171 elif version == 6:
172 addrlen = 16
173 saddroff = 2*4
174 factory = ipaddress.IPv6Address
175 else:
176 raise ValueError('unsupported IP version %d' % version)
177 saddr = factory(packet[ saddroff : saddroff + addrlen ])
178 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
179 return (saddr, daddr)
180
181#---------- address handling ----------
182
183def ipaddr(input):
184 try:
185 r = ipaddress.IPv4Address(input)
186 except AddressValueError:
187 r = ipaddress.IPv6Address(input)
188 return r
189
190def ipnetwork(input):
191 try:
192 r = ipaddress.IPv4Network(input)
193 except NetworkValueError:
194 r = ipaddress.IPv6Network(input)
195 return r
040ff511
IJ
196
197#---------- ipif (SLIP) subprocess ----------
198
a95cfeb2 199class SlipStreamDecoder():
db6ba584 200 def __init__(self, desc, on_packet):
040ff511 201 self._buffer = b''
a95cfeb2 202 self._on_packet = on_packet
db6ba584
IJ
203 self._desc = desc
204 self._log('__init__')
205
206 def _log(self, msg, **kwargs):
3297cac1 207 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
a95cfeb2
IJ
208
209 def inputdata(self, data):
db6ba584 210 self._log('inputdata', d=data)
7a68893f 211 packets = slip.decode(data)
ba5630fd 212 packets[0] = self._buffer + packets[0]
040ff511
IJ
213 self._buffer = packets.pop()
214 for packet in packets:
a95cfeb2 215 self._maybe_packet(packet)
54890d4d 216 self._log('bufremain', d=self._buffer)
a95cfeb2
IJ
217
218 def _maybe_packet(self, packet):
54890d4d 219 self._log('maybepacket', d=packet)
db6ba584
IJ
220 if len(packet):
221 self._on_packet(packet)
a95cfeb2 222
4f991c0c 223 def flush(self):
54890d4d 224 self._log('flush')
a95cfeb2
IJ
225 self._maybe_packet(self._buffer)
226 self._buffer = b''
4f991c0c 227
e4006ac4 228class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
229 def __init__(self, router):
230 self._router = router
db6ba584 231 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
a95cfeb2
IJ
232 def connectionMade(self): pass
233 def outReceived(self, data):
234 self._decoder.inputdata(data)
235 def slip_on_packet(self, packet):
4f991c0c
IJ
236 (saddr, daddr) = packet_addrs(packet)
237 if saddr.is_link_local or daddr.is_link_local:
a8827d59 238 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
4f991c0c
IJ
239 return
240 self._router(packet, saddr, daddr)
040ff511
IJ
241 def processEnded(self, status):
242 status.raiseException()
243
244def start_ipif(command, router):
245 global ipif
246 ipif = _IpifProcessProtocol(router)
247 reactor.spawnProcess(ipif,
248 '/bin/sh',['sh','-xc', command],
ff613365
IJ
249 childFDs={0:'w', 1:'r', 2:2},
250 env=None)
040ff511
IJ
251
252def queue_inbound(packet):
15407d80 253 log_debug(DBG.FLOW, "queue_inbound", d=packet)
040ff511
IJ
254 ipif.transport.write(slip.delimiter)
255 ipif.transport.write(slip.encode(packet))
256 ipif.transport.write(slip.delimiter)
257
650a3251
IJ
258#---------- packet queue ----------
259
260class PacketQueue():
d579a048
IJ
261 def __init__(self, desc, max_queue_time):
262 self._desc = desc
8718b02c 263 assert(desc + '')
650a3251
IJ
264 self._max_queue_time = max_queue_time
265 self._pq = collections.deque() # packets
266
b68c0739 267 def _log(self, dflag, msg, **kwargs):
8c3b6620 268 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
d579a048 269
650a3251 270 def append(self, packet):
8c3b6620 271 self._log(DBG.QUEUE, 'append', d=packet)
650a3251
IJ
272 self._pq.append((time.monotonic(), packet))
273
274 def nonempty(self):
8c3b6620 275 self._log(DBG.QUEUE, 'nonempty ?')
650a3251
IJ
276 while True:
277 try: (queuetime, packet) = self._pq[0]
8c3b6620
IJ
278 except IndexError:
279 self._log(DBG.QUEUE, 'nonempty ? empty.')
280 return False
650a3251
IJ
281
282 age = time.monotonic() - queuetime
84e763c7 283 if age > self._max_queue_time:
650a3251 284 # strip old packets off the front
8c3b6620 285 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
650a3251
IJ
286 self._pq.popleft()
287 continue
288
8c3b6620 289 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
650a3251
IJ
290 return True
291
7b07f0b5
IJ
292 def process(self, sizequery, moredata, max_batch):
293 # sizequery() should return size of batch so far
294 # moredata(s) should add s to batch
8c3b6620 295 self._log(DBG.QUEUE, 'process...')
7b07f0b5
IJ
296 while True:
297 try: (dummy, packet) = self._pq[0]
8c3b6620
IJ
298 except IndexError:
299 self._log(DBG.QUEUE, 'process... empty')
300 break
301
302 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
7b07f0b5
IJ
303
304 encoded = slip.encode(packet)
305 sofar = sizequery()
306
8c3b6620
IJ
307 self._log(DBG.QUEUE_CTRL,
308 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
b68c0739 309 d=encoded)
8c3b6620 310
7b07f0b5
IJ
311 if sofar > 0:
312 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
8c3b6620 313 self._log(DBG.QUEUE_CTRL, 'process... overflow')
7b07f0b5
IJ
314 break
315 moredata(slip.delimiter)
316
317 moredata(encoded)
84e763c7 318 self._pq.popleft()
ae7c7784
IJ
319
320#---------- error handling ----------
321
b68c0739
IJ
322_crashing = False
323
ae7c7784 324def crash(err):
b68c0739
IJ
325 global _crashing
326 _crashing = True
e8ed0029
IJ
327 print('========== CRASH ==========', err,
328 '===========================', file=sys.stderr)
ae7c7784
IJ
329 try: reactor.stop()
330 except twisted.internet.error.ReactorNotRunning: pass
331
332def crash_on_defer(defer):
333 defer.addErrback(lambda err: crash(err))
334
e4006ac4 335def crash_on_critical(event):
ae7c7784
IJ
336 if event.get('log_level') >= LogLevel.critical:
337 crash(twisted.logger.formatEvent(event))
338
87a7c0c7
IJ
339#---------- config processing ----------
340
341def process_cfg_common_always():
342 global mtu
343 c.mtu = cfg.get('virtual','mtu')
344
88487243
IJ
345def process_cfg_ipif(section, varmap):
346 for d, s in varmap:
347 try: v = getattr(c, s)
034284c3 348 except AttributeError: continue
88487243
IJ
349 setattr(c, d, v)
350
b68c0739 351 #print(repr(c))
88487243
IJ
352
353 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
354
355def process_cfg_network():
356 c.network = ipnetwork(cfg.get('virtual','network'))
357 if c.network.num_addresses < 3 + 2:
358 raise ValueError('network needs at least 2^3 addresses')
359
360def process_cfg_server():
361 try:
362 c.server = cfg.get('virtual','server')
363 except NoOptionError:
364 process_cfg_network()
365 c.server = next(c.network.hosts())
366
367class ServerAddr():
368 def __init__(self, port, addrspec):
369 self.port = port
370 # also self.addr
371 try:
372 self.addr = ipaddress.IPv4Address(addrspec)
373 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 374 self._inurl = b'%s'
88487243
IJ
375 except AddressValueError:
376 self.addr = ipaddress.IPv6Address(addrspec)
377 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 378 self._inurl = b'[%s]'
88487243
IJ
379 def make_endpoint(self):
380 return self._endpointfactory(reactor, self.port, self.addr)
381 def url(self):
84e763c7
IJ
382 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
383 if self.port != 80: url += b':%d' % self.port
384 url += b'/'
88487243
IJ
385 return url
386
387def process_cfg_saddrs():
1d023c89
IJ
388 try: port = cfg.getint('server','port')
389 except NoOptionError: port = 80
88487243
IJ
390
391 c.saddrs = [ ]
392 for addrspec in cfg.get('server','addrs').split():
393 sa = ServerAddr(port, addrspec)
394 c.saddrs.append(sa)
395
396def process_cfg_clients(constructor):
397 c.clients = [ ]
398 for cs in cfg.sections():
399 if not (':' in cs or '.' in cs): continue
400 ci = ipaddr(cs)
401 pw = cfg.get(cs, 'password')
84e763c7 402 pw = pw.encode('utf-8')
88487243
IJ
403 constructor(ci,cs,pw)
404
ae7c7784
IJ
405#---------- startup ----------
406
5510890e 407def common_startup(process_cfg):
cae50358
IJ
408 re = regexp.compile('#.*')
409 cfg.read_string(re.sub('', defcfg))
410 need_defcfg = True
411
412 def readconfig(pathname, mandatory=True):
413 def log(m, p=pathname):
414 if not DBG.CONFIG in debug_set: return
415 print('DBG.CONFIG: %s: %s' % (m, pathname))
416
417 try:
418 files = os.listdir(pathname)
419
420 except FileNotFoundError:
421 if mandatory: raise
422 log('skipped')
423 return
424
425 except NotADirectoryError:
426 cfg.read(pathname)
427 log('read file')
428 return
429
430 # is a directory
431 log('directory')
432 re = regexp.compile('[^-A-Za-z0-9_]')
433 for f in os.listdir(cdir):
434 if re.search(f): continue
435 subpath = pathname + '/' + f
436 try:
437 os.stat(subpath)
438 except FileNotFoundError:
439 log('entry skipped', subpath)
440 continue
441 cfg.read(subpath)
442 log('entry read', subpath)
443
444 def oc_config(od,os, value, op):
445 nonlocal need_defcfg
446 need_defcfg = False
447 readconfig(value)
2e68eb10 448
9acb0eca
IJ
449 def dfs_less_detailed(dl):
450 return [df for df in DBG.iterconstants() if df <= dl]
451
452 def ds_default(od,os,dl,op):
2e68eb10 453 global debug_set
9acb0eca 454 debug_set = set(dfs_less_detailed(debug_def_detail))
2e68eb10 455
9acb0eca 456 def ds_select(od,os, spec, op):
9acb0eca
IJ
457 for it in spec.split(','):
458
9acb0eca
IJ
459 if it.startswith('-'):
460 mutator = debug_set.discard
461 it = it[1:]
462 else:
463 mutator = debug_set.add
2cf75145
IJ
464
465 if it == '+':
466 dfs = DBG.iterconstants()
467
468 else:
469 if it.endswith('+'):
470 mapper = dfs_less_detailed
471 it = it[0:len(it)-1]
472 else:
473 mapper = lambda x: [x]
474
475 try:
476 dfspec = DBG.lookupByName(it)
477 except ValueError:
478 optparser.error('unknown debug flag %s in --debug-select' % it)
479
480 dfs = mapper(dfspec)
481
482 for df in dfs:
483 mutator(df)
9acb0eca
IJ
484
485 optparser.add_option('-D', '--debug',
2e68eb10
IJ
486 nargs=0,
487 action='callback',
9acb0eca
IJ
488 help='enable default debug (to stdout)',
489 callback= ds_default)
490
491 optparser.add_option('--debug-select',
492 nargs=1,
493 type='string',
2cf75145 494 metavar='[-]DFLAG[+]|[-]+,...',
9acb0eca 495 help=
2cf75145
IJ
496'''enable (`-': disable) each specified DFLAG;
497`+': do same for all "more interesting" DFLAGSs;
498just `+': all DFLAGs.
499 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
9acb0eca
IJ
500 action='callback',
501 callback= ds_select)
2e68eb10 502
cae50358
IJ
503 optparser.add_option('-c', '--config',
504 nargs=1,
505 type='string',
506 metavar='CONFIGFILE',
507 dest='configfile',
508 action='callback',
509 callback= oc_config)
510
2e68eb10
IJ
511 (opts, args) = optparser.parse_args()
512 if len(args): optparser.error('no non-option arguments please')
513
cae50358
IJ
514 if need_defcfg:
515 readconfig('/etc/hippotat/config', False)
516 readconfig('/etc/hippotat/config.d', False)
9acb0eca 517
5510890e
IJ
518 try: process_cfg()
519 except (configparser.Error, ValueError):
520 traceback.print_exc(file=sys.stderr)
521 print('\nInvalid configuration, giving up.', file=sys.stderr)
522 sys.exit(12)
523
cae50358 524 #print(repr(debug_set), file=sys.stderr)
2e68eb10 525
8c3b6620 526 log_formatter = twisted.logger.formatEventAsClassicLogText
389236df
IJ
527 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
528 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
529 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
b83d422a 530 stdsomething_obs = twisted.logger.FilteringLogObserver(
389236df
IJ
531 stderr_obs, [pred], stdout_obs
532 )
b83d422a
IJ
533 log_observer = twisted.logger.FilteringLogObserver(
534 stdsomething_obs, [LogNotBoringTwisted()]
535 )
536 #log_observer = stdsomething_obs
8c3b6620
IJ
537 twisted.logger.globalLogBeginner.beginLoggingTo(
538 [ log_observer, crash_on_critical ]
539 )
ae7c7784 540
ae7c7784 541def common_run():
b68c0739
IJ
542 log_debug(DBG.INIT, 'entering reactor')
543 if not _crashing: reactor.run()
ae7c7784 544 print('CRASHED (end)', file=sys.stderr)