wip new config
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784 22from optparse import OptionParser
5510890e 23import configparser
ae7c7784
IJ
24from configparser import ConfigParser
25from configparser import NoOptionError
26
c13ee6e6
IJ
27from functools import partial
28
ae7c7784 29import collections
84e763c7 30import time
8c3b6620 31import codecs
eedc8b30 32import traceback
ae7c7784 33
1321ad5f
IJ
34import re as regexp
35
36import hippotat.slip as slip
37
class DBG(twisted.python.constants.Names):
    # Debug flags, selectable with --debug-select.
    #
    # The order of definition is significant: common_startup's
    # dfs_less_detailed() compares these constants with `<=', so (assuming
    # twisted orders NamedConstants by definition order -- TODO confirm)
    # flags listed earlier count as "less detailed".
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    # When set, log_debug dumps packet data in full instead of truncating it.
    DATA_COMPLETE = NamedConstant()
d579a048 53
# Encoder used by log_debug to render packet data as hexadecimal text.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# sys.stderr as it was at import time; LogNotBoringTwisted reports its
# own failures here.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# Debug flags currently enabled.  log_debug emits a message only when
# its flag is a member of this set.
debug_set = set()
# Most detailed flag switched on by plain --debug (see ds_default in
# common_startup).
debug_def_detail = DBG.HTTP
3e35fc99 64
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message, provided `dflag' is enabled in debug_set.

    dflag -- the DBG flag this message belongs to
    msg   -- the message text
    idof  -- optional object; its id() is prefixed to the message in hex
    d     -- optional bytes; appended to the message as a hex dump, cut
             down to the first 64 bytes (and marked `...') unless
             DBG.DATA_COMPLETE is enabled
    """
    if dflag not in debug_set:
        return

    text = msg
    if idof is not None:
        text = '[%#x] %s' % (id(idof), text)

    if d is not None:
        shorten = DBG.DATA_COMPLETE not in debug_set and len(d) > 64
        payload = d[0:64] if shorten else d
        hexed = _hex_codec(payload)[0].decode('ascii')
        text += ' ' + hexed + ('...' if shorten else '')

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
79
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate which suppresses uninteresting info events.

    Events at any level other than info always pass.  An info event
    passes if its `dflag' is in debug_set, or if it carries no `dflag'
    at all and DBG.TWISTED is enabled.
    """

    def __call__(self, event):
        accept = twisted.logger.PredicateResult.yes
        reject = twisted.logger.PredicateResult.no
        try:
            if event.get('log_level') != LogLevel.info:
                return accept
            flag = event.get('dflag')
            wanted = (flag in debug_set or
                      (flag is None and DBG.TWISTED in debug_set))
            return accept if wanted else reject
        except Exception:
            # If the filter itself breaks, say so on the original stderr
            # and let the event through rather than losing it.
            print(traceback.format_exc(), file=org_stderr)
            return accept
95
96#---------- default config ----------
97
ca732796
IJ
# Built-in default configuration.  common_startup() feeds this to `cfg'
# before reading any configuration file, after first deleting everything
# from each `#' to the end of its line -- so the trailing remarks below
# are documentation only and never reach ConfigParser.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
http_timeout = 30 # used by both } must be
http_timeout_grace = 5 # used by both } compatible
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
http_timeout = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
139
# these need to be defined here so that they can be imported by import *
# cfg accumulates the built-in defaults and every configuration file read
# by common_startup(); strict=False makes ConfigParser accept duplicate
# sections and options instead of raising.
cfg = ConfigParser(strict=False)
# Shared command line parser; common_startup() adds the common options
# to it and then runs it.
optparser = OptionParser()
143
# Byte translation table which exchanges `-' and slip.esc (presumably a
# single byte -- confirm against hippotat.slip).
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return `s' with every `-' and every SLIP ESC byte exchanged.

    Applying the translation twice gives back the original data.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    return s.translate(_mimetrans)
149
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding processed configuration values.

    d -- optional dict to adopt as the instance's attribute dictionary
         (it is used directly, not copied).  When omitted, the instance
         starts out with no attributes.
    """

    def __init__(self, d=None):
        # Make a fresh dict for each instance.  A `{ }' default argument
        # would be created once and then become the shared __dict__ of
        # every instance constructed without `d'.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'

# The processed configuration, filled in by the process_cfg_* functions.
c = ConfigResults()
157
def log_discard(packet, iface, saddr, daddr, why):
    """Record (under DBG.DROP) that `packet' has been thrown away.

    iface, saddr, daddr and why are interpolated into the message;
    the packet itself is attached as the data dump.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 162
b0cfbfce
IJ
163#---------- packet parsing ----------
164
def packet_addrs(packet):
    """Return (source, destination) addresses read from an IP packet.

    The IP version is taken from the top four bits of the first byte.
    IPv4 addresses are 4 bytes starting at offset 12; IPv6 addresses are
    16 bytes starting at offset 8.  In both cases the destination
    address directly follows the source address.

    Raises ValueError for any other IP version.
    """
    version = packet[0] >> 4

    # version -> (address length, offset of source address, constructor)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)

    size, src_at, make = layout
    dst_at = src_at + size
    source = make(packet[src_at:dst_at])
    destination = make(packet[dst_at:dst_at + size])
    return (source, destination)
180
181#---------- address handling ----------
182
def ipaddr(input):
    """Parse `input' as an IPv4 address or, failing that, an IPv6 one.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises AddressValueError if `input' is neither.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
189
def ipnetwork(input):
    """Parse `input' as an IPv4 network or, failing that, an IPv6 one.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Raises a ValueError subclass from the ipaddress module if `input'
    is neither (for example a bad prefix length, or host bits set).
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4.  (This used to name a
        # nonexistent `NetworkValueError', so every IPv6 network
        # failed with NameError instead of reaching this fallback.)
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
196
197#---------- ipif (SLIP) subprocess ----------
198
class SlipStreamDecoder():
    """Reassemble packets from a SLIP byte stream delivered in pieces.

    desc      -- short description, used only in debug messages
    on_packet -- called with each complete, nonempty packet

    Data after the last delimiter seen so far is held back until more
    input arrives or flush() is called.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        """Feed more stream data in, delivering any packets completed."""
        self._log('inputdata', d=data)
        pieces = slip.decode(data)
        # The first piece continues whatever was left over last time;
        # the last piece is (so far) incomplete and becomes the new
        # leftover.
        pieces[0] = self._buffer + pieces[0]
        self._buffer = pieces.pop()
        for complete in pieces:
            self._maybe_packet(complete)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        self._log('maybepacket', d=packet)
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Treat the held-back data as a complete packet."""
        self._log('flush')
        leftover = self._buffer
        self._maybe_packet(leftover)
        self._buffer = b''
4f991c0c 227
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    The subprocess's output is SLIP-decoded; each packet is passed to
    router(packet, saddr, daddr), except that packets with a link-local
    source or destination address are discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        src, dst = packet_addrs(packet)
        if src.is_link_local or dst.is_link_local:
            log_discard(packet, 'ipif', src, dst, 'link-local')
        else:
            self._router(packet, src, dst)

    def processEnded(self, status):
        # Turn the subprocess's end into an exception.
        status.raiseException()
243
def start_ipif(command, router):
    """Start the ipif subprocess, running `command' via `sh -xc'.

    Sets the module global `ipif' to the new process protocol; packets
    the subprocess emits are handed to `router'.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from the child;
    # fd 2: the child shares our own fd 2.
    fds = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
251
def queue_inbound(packet):
    """Send `packet' to the ipif subprocess, SLIP-encoded.

    A delimiter is written both before and after the encoded packet.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    pipe = ipif.transport
    pipe.write(slip.delimiter)
    pipe.write(slip.encode(packet))
    pipe.write(slip.delimiter)
257
650a3251
IJ
258#---------- packet queue ----------
259
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- description string, used as debug message prefix
    max_queue_time -- packets which have waited longer than this many
                      seconds are dropped by nonempty()
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert desc + ''
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # of (time queued, packet)

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)

    def append(self, packet):
        """Add `packet' to the back of the queue."""
        self._log(DBG.QUEUE, 'append', d=packet)
        stamped = (time.monotonic(), packet)
        self._pq.append(stamped)

    def nonempty(self):
        """Drop over-age packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        queue = self._pq
        while queue:
            queued_at, packet = queue[0]
            waited = time.monotonic() - queued_at
            if waited > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                queue.popleft()
                continue
            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the queue into a batch, SLIP-encoded.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are separated by slip.delimiter.  Stops when the queue
        is empty, or when adding the next packet to an already nonempty
        batch would take it past max_batch; that packet stays queued.
        An empty batch accepts its first packet whatever its size.
        """
        self._log(DBG.QUEUE, 'process...')
        queue = self._pq
        while queue:
            _queued_at, packet = queue[0]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(
                DBG.QUEUE_CTRL,
                'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                d=encoded)

            if sofar > 0:
                wanted = sofar + len(slip.delimiter) + len(encoded)
                if wanted > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    return
                moredata(slip.delimiter)

            moredata(encoded)
            queue.popleft()

        self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
319
320#---------- error handling ----------
321
b68c0739
IJ
# Set once crash() has been called; common_run() then declines to
# start the reactor.
_crashing = False

def crash(err):
    """Report `err' on stderr and stop the reactor, if it is running."""
    global _crashing
    _crashing = True

    banner_top = '========== CRASH =========='
    banner_end = '==========================='
    print(banner_top, err, banner_end, file=sys.stderr)

    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
331
def crash_on_defer(defer):
    """Arrange that a failure reaching `defer' crashes the program."""
    def _failed(failure):
        return crash(failure)
    defer.addErrback(_failed)
334
def crash_on_critical(event):
    """Log observer: crash on any event of at least critical level."""
    level = event.get('log_level')
    is_critical = level >= LogLevel.critical
    if is_critical:
        report = twisted.logger.formatEvent(event)
        crash(report)
338
87a7c0c7
IJ
339#---------- config processing ----------
340
def process_cfg_common_always():
    """Store the [virtual] `mtu' setting in c.mtu, as a string."""
    global mtu  # declared but never assigned here
    c.mtu = cfg.get(section='virtual', option='mtu')
344
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting in `section'.

    varmap is an iterable of (dest, source) attribute name pairs.  For
    each pair where c already has attribute `source', its value is also
    stored as c.<dest>.  All of c's attributes are then available as
    interpolation variables when the `ipif' value is looked up.
    """
    absent = object()
    for dest, source in varmap:
        value = getattr(c, source, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
354
def process_cfg_network():
    """Set c.network from the [virtual] `network' setting.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    network = ipnetwork(cfg.get('virtual', 'network'))
    c.network = network
    minimum = 3 + 2
    if network.num_addresses < minimum:
        raise ValueError('network needs at least 2^3 addresses')
359
def process_cfg_server():
    """Set c.server.

    Uses the [virtual] `server' setting if there is one (stored as the
    string from the config).  Otherwise the network is processed and
    c.server becomes its first host address.
    """
    try:
        configured = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        c.server = configured
366
class ServerAddr():
    """One address (plus port) on which the server listens.

    port     -- TCP port number
    addrspec -- IPv4 or IPv6 address, as a string

    Attributes: port, and addr (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).  Raises AddressValueError if addrspec is
    neither kind of address.
    """

    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals need brackets inside a URL
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a twisted server endpoint listening on this address."""
        # The address must be given by keyword: the endpoints' third
        # positional parameter is `backlog', not the interface.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL (bytes) for this address.

        The port is left out when it is 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
386
def process_cfg_saddrs():
    """Set c.saddrs to a ServerAddr for each address in [server] `addrs'.

    `addrs' is a whitespace-separated list.  The port comes from
    [server] `port', or is 80 if that option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = listeners = []
    for spec in cfg.get('server', 'addrs').split():
        listeners.append(ServerAddr(port, spec))
395
def process_cfg_clients(constructor):
    """Call constructor(address, section_name, password) for each client.

    A config section is a client section if its name contains `:' or
    `.'; the name is parsed as an IP address.  The password is passed
    as UTF-8 encoded bytes.  c.clients is reset to an empty list first.
    """
    c.clients = []
    for name in cfg.sections():
        if ':' not in name and '.' not in name:
            continue
        address = ipaddr(name)
        password = cfg.get(name, 'password').encode('utf-8')
        constructor(address, name, password)
404
ae7c7784
IJ
405#---------- startup ----------
406
def common_startup(process_cfg):
    """Common program startup: configuration, command line, logging.

    Loads the built-in defaults (defcfg) into cfg, adds the common
    options to optparser and parses the command line, reads the
    configuration files, calls process_cfg() (no arguments) and finally
    starts twisted logging.

    Each -c/--config option is read as soon as it is parsed.  If none
    is given, /etc/hippotat/config and /etc/hippotat/config.d are read,
    where they exist.

    If process_cfg raises configparser.Error or ValueError, the
    traceback is printed and the program exits with status 12.
    """
    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile('#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        # Reads pathname into cfg.  If it is a directory, reads each
        # entry in it whose name consists only of letters, digits,
        # `-' and `_'.  A missing pathname is an error only if mandatory.
        def log(m, p=pathname):
            if not DBG.CONFIG in debug_set: return
            # report p, not pathname, so that per-entry messages name
            # the entry rather than its directory
            print('DBG.CONFIG: %s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            log('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            log('read file')
            return

        # is a directory
        log('directory')
        re = regexp.compile('[^-A-Za-z0-9_]')
        # Use the listing we already have.  (This used to list an
        # undefined `cdir', so every directory gave NameError.)
        for f in files:
            if re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                log('entry skipped', subpath)
                continue
            cfg.read(subpath)
            log('entry read', subpath)

    def oc_config(od,os, value, op):
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(od,os,dl,op):
        global debug_set
        debug_set = set(dfs_less_detailed(debug_def_detail))

    def ds_select(od,os, spec, op):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    optparser.error('unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        readconfig('/etc/hippotat/config', False)
        readconfig('/etc/hippotat/config.d', False)

    try: process_cfg()
    except (configparser.Error, ValueError):
        traceback.print_exc(file=sys.stderr)
        print('\nInvalid configuration, giving up.', file=sys.stderr)
        sys.exit(12)

    #print(repr(debug_set), file=sys.stderr)

    # Events for which the level predicate says no go to stdout, the
    # rest to stderr; LogNotBoringTwisted is applied in front of both.
    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    #log_observer = stdsomething_obs
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )
ae7c7784 541
ae7c7784 542def common_run():
b68c0739
IJ
543 log_debug(DBG.INIT, 'entering reactor')
544 if not _crashing: reactor.run()
ae7c7784 545 print('CRASHED (end)', file=sys.stderr)