new config definition
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784 22from optparse import OptionParser
5510890e 23import configparser
ae7c7784
IJ
24from configparser import ConfigParser
25from configparser import NoOptionError
26
c13ee6e6
IJ
27from functools import partial
28
ae7c7784 29import collections
84e763c7 30import time
8c3b6620 31import codecs
eedc8b30 32import traceback
ae7c7784 33
1321ad5f
IJ
34import re as regexp
35
36import hippotat.slip as slip
37
# Debug message categories, used as the `dflag' of log_debug and as the
# members of debug_set.
#
# The definition order matters: dfs_less_detailed() in common_startup
# selects flags with `df <= dl', which (assuming Twisted orders
# NamedConstants by definition order) treats earlier entries as less
# detailed than later ones.
class DBG(twisted.python.constants.Names):
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
d579a048 53
# Encoder used by log_debug to render payload bytes as hex digits.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# sys.stderr as it was when this module was imported; LogNotBoringTwisted
# prints its own failures here.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# DBG flags currently enabled; log_debug ignores messages whose flag is
# not a member.
debug_set = set()

# Most detailed flag that the plain -D/--debug option switches on.
debug_def_detail = DBG.HTTP
3e35fc99 64
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message if its flag `dflag` is enabled in debug_set.

    dflag -- DBG constant categorising the message
    msg   -- message text
    idof  -- optional object; its id() is prefixed to the message in hex
    d     -- optional payload (bytes); appended as hex digits, cut to the
             first 64 bytes and marked `...' unless DBG.DATA_COMPLETE is
             enabled
    """
    if dflag not in debug_set:
        return

    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)

    if d is not None:
        if DBG.DATA_COMPLETE in debug_set or len(d) <= 64:
            suffix = ''
        else:
            d = d[0:64]
            suffix = '...'
        hexed = _hex_codec(d)[0].decode('ascii')
        msg += ' ' + hexed + suffix

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
79
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate that suppresses uninteresting info-level events.

    Events whose level is not LogLevel.info always pass.  An info-level
    event passes only if its `dflag' is in debug_set, or if it carries no
    `dflag' and DBG.TWISTED is enabled.
    """

    def __call__(self, event):
        accept = twisted.logger.PredicateResult.yes
        reject = twisted.logger.PredicateResult.no
        try:
            if event.get('log_level') != LogLevel.info:
                return accept
            flag = event.get('dflag')
            if flag in debug_set:
                return accept
            if flag is None and DBG.TWISTED in debug_set:
                return accept
            return reject
        except Exception:
            # Report the failure on the original stderr and let the event
            # through rather than losing it.
            print(traceback.format_exc(), file=org_stderr)
            return accept
95
96#---------- default config ----------
97
ca732796
IJ
# Built-in default configuration.  common_startup strips everything from
# `#' to end of line and feeds the result to cfg.read_string() before any
# configuration file is read.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
target_requests_outstanding = 3
http_timeout = 30
http_timeout_grace = 5
max_requests_outstanding = 6
max_batch_up = 4000
http_retry = 5

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s

# relating to virtual network
mtu = 1500

[server]
# addrs = 127.0.0.1 ::1
port = 80
# url

# relating to virtual network
routes = ''
vnetwork = 172.24.230.192
# network = <prefix>/<len>
# server = <ipaddr>
# relay = <ipaddr>


# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144
max_queue_time = 121
http_timeout = 121
target_requests_outstanding = 10
'''
137
# These need to be defined here so that they can be imported by import *.
# cfg accumulates the built-in defaults plus every configuration file read;
# optparser receives its options in common_startup.
cfg = ConfigParser(strict=False)
optparser = OptionParser()
141
# Byte translation table exchanging `-' and the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return `s` with every `-' and SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.
    """
    return s.translate(_mimetrans)
147
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag for processed configuration values.

    d -- optional dict adopted (not copied) as the instance's attribute
         dictionary; when omitted the instance starts out empty.
    """

    def __init__(self, d=None):
        # Build a fresh dict for each instance.  A literal `{ }' default
        # would be created once and then shared, so that every
        # default-constructed instance saw the others' attributes.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'

# The module-wide results object filled in by the process_cfg_* functions.
c = ConfigResults()
155
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that `packet` was discarded.

    iface, saddr, daddr and why are formatted into the message; the packet
    itself is passed to log_debug as the payload.
    """
    description = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, description, d=packet)
1321ad5f 160
b0cfbfce
IJ
161#---------- packet parsing ----------
162
def packet_addrs(packet):
    """Extract the source and destination addresses of an IP packet.

    packet -- the raw packet as bytes; the high nibble of its first byte
              selects IPv4 or IPv6

    Returns (saddr, daddr) as ipaddress.IPv4Address or IPv6Address objects.
    Raises ValueError for any other IP version.
    """
    version = packet[0] >> 4
    if version == 4:
        # the source address starts at the fourth 32-bit word
        start, width, make_addr = 3*4, 4, ipaddress.IPv4Address
    elif version == 6:
        # the source address starts at the third 32-bit word
        start, width, make_addr = 2*4, 16, ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)

    # the destination address immediately follows the source address
    middle = start + width
    saddr = make_addr(packet[start:middle])
    daddr = make_addr(packet[middle:middle + width])
    return (saddr, daddr)
178
179#---------- address handling ----------
180
def ipaddr(input):
    """Parse `input` as an IPv4 address, falling back to IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  When
    neither form parses, the error from the IPv6 attempt propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
187
def ipnetwork(input):
    """Parse `input` as an IPv4 network, falling back to IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  When the
    address part is not valid IPv4 the IPv6 parse is attempted and any
    error from it propagates.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # IPv4Network reports an unparseable address part with
        # AddressValueError, as IPv4Address does in ipaddr().  (The name
        # previously caught here, NetworkValueError, does not exist.)
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
194
195#---------- ipif (SLIP) subprocess ----------
196
class SlipStreamDecoder():
    """Reassemble packets from a SLIP byte stream delivered in pieces.

    desc      -- label included in this decoder's debug messages
    on_packet -- callable invoked with each complete, non-empty packet
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        """Feed in more stream data, delivering each packet it completes."""
        self._log('inputdata', d=data)
        pieces = slip.decode(data)
        # What was left over from the previous call belongs in front of the
        # first piece; the last piece is kept back as the new leftover.
        pieces[0] = self._buffer + pieces[0]
        *complete, self._buffer = pieces
        for piece in complete:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        self._log('maybepacket', d=packet)
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet and empty the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 225
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    The subprocess's output is SLIP-decoded; each packet that is not
    link-local at either end is handed to `router` as
    router(packet, saddr, daddr).
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        saddr, daddr = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # re-raise the reason the subprocess ended
        status.raiseException()
241
def start_ipif(command, router):
    """Spawn the ipif subprocess and store its protocol in the global `ipif`.

    command -- shell command line, run as `sh -xc command'
    router  -- callable given to _IpifProcessProtocol for received packets
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh','-xc', command]
    # fd 0 is written by us, fd 1 is read by us, fd 2 is mapped to our fd 2
    fdmap = {0:'w', 1:'r', 2:2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fdmap, env=None)
040ff511
IJ
249
def queue_inbound(packet):
    """Write `packet`, SLIP-encoded and delimited on both sides, to ipif."""
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
255
650a3251
IJ
256#---------- packet queue ----------
257
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- string label used in this queue's debug messages
    max_queue_time -- age limit, compared against time.monotonic()
                      differences; nonempty() drops older packets
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # Fails unless desc is a string that can be concatenated (and,
        # being an assert, also when the result is empty).
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (queue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet` at the back of the queue."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop over-age packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queued_at, packet = self._pq[0]
            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
            else:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return the size of the batch so far and
        moredata(s) should add s to the batch.  Packets are SLIP-encoded
        and separated by slip.delimiter.  Processing stops when the queue
        is empty or when the next packet would take a non-empty batch
        beyond max_batch; the first packet of an empty batch is always
        accepted.
        """
        self._log(DBG.QUEUE, 'process...')
        while self._pq:
            packet = self._pq[0][1]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded'
                      % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
        else:
            # the loop ran out of packets rather than hitting the limit
            self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
317
318#---------- error handling ----------
319
b68c0739
IJ
# Set by crash(); common_run does not enter the reactor once it is set.
_crashing = False

def crash(err):
    """Report `err` on stderr, mark the program as crashing, stop the reactor.

    A reactor that is not running is not an error.
    """
    global _crashing
    _crashing = True
    print(
        '========== CRASH ==========',
        err,
        '===========================',
        file=sys.stderr,
    )
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
329
def crash_on_defer(defer):
    """Add an errback to `defer` that passes the failure to crash()."""
    def _failed(err):
        return crash(err)
    defer.addErrback(_failed)
332
def crash_on_critical(event):
    """Log observer: crash() on events at LogLevel.critical or above.

    The formatted event text is what gets passed to crash().
    """
    # NOTE(review): assumes every event carries a 'log_level' -- confirm.
    is_critical = event.get('log_level') >= LogLevel.critical
    if is_critical:
        crash(twisted.logger.formatEvent(event))
336
87a7c0c7
IJ
337#---------- config processing ----------
338
def process_cfg_common_always():
    """Copy the `mtu' setting of the [virtual] section into c.mtu."""
    # The value is stored exactly as configparser returns it (a string).
    mtu_setting = cfg.get('virtual','mtu')
    c.mtu = mtu_setting
342
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting of `section`.

    varmap is an iterable of (dest, source) attribute-name pairs.  For each
    pair whose source attribute exists on c, its value is copied to the
    dest attribute; pairs with a missing source are skipped.  The `ipif'
    value is then interpolated with c's attributes as extra variables.
    """
    absent = object()
    for dest, source in varmap:
        value = getattr(c, source, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
352
def process_cfg_network():
    """Parse the [virtual] `network' setting into c.network.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual','network')
    c.network = ipnetwork(spec)
    # NOTE(review): the threshold here is 5 addresses while the message
    # says 2^3 -- confirm which is intended.
    too_small = c.network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
357
def process_cfg_server():
    """Set c.server from the [virtual] `server' setting.

    When that option is absent the network is processed instead and
    c.server becomes its first host address.
    """
    try:
        configured = cfg.get('virtual','server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        # stored as the string configparser returned
        c.server = configured
364
class ServerAddr():
    """One address and port on which the server listens.

    port     -- TCP port number
    addrspec -- IPv4 or IPv6 address as text

    Attributes: `port`, and `addr` (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).
    """

    def __init__(self, port, addrspec):
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed inside URLs
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The address must be passed by keyword: the third positional
        # parameter of TCP4ServerEndpoint / TCP6ServerEndpoint is `backlog',
        # not the interface to bind.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL of this address as bytes.

        The port is included only when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
384
def process_cfg_saddrs():
    """Build c.saddrs, one ServerAddr per word of the [server] `addrs'.

    Every entry uses the [server] `port' setting, or 80 when that option
    is absent.
    """
    try:
        port = cfg.getint('server','port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    c.saddrs.extend(
        ServerAddr(port, addrspec)
        for addrspec in cfg.get('server','addrs').split()
    )
393
def process_cfg_clients(constructor):
    """Call `constructor` for every client section of the configuration.

    A section counts as a client when its name contains `:' or `.'; the
    name is parsed as an IP address.  constructor(ci, cs, pw) receives the
    parsed address, the section name and the section's `password' encoded
    as UTF-8.  c.clients is reset to an empty list first.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci,cs,pw)
402
ae7c7784
IJ
403#---------- startup ----------
404
def common_startup(process_cfg):
    """Load configuration, parse the command line and start logging.

    The built-in defaults in `defcfg` are loaded first.  Each -c/--config
    option reads the named file or directory as soon as the option is
    parsed.  If no such option was given, /etc/hippotat/config and
    /etc/hippotat/config.d are read when they exist.

    process_cfg -- callable taking no arguments, invoked once all
                   configuration has been read.  A configparser.Error or
                   ValueError raised by it is reported on stderr and the
                   program exits with status 12.

    Finally Twisted logging is started, filtered through
    LogNotBoringTwisted and with crash_on_critical as an extra observer.
    """
    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile('#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        # Read `pathname`: a single file, or a directory of files.
        def log(m, p=pathname):
            if not DBG.CONFIG in debug_set: return
            # report the path this message is about, not always the
            # top-level one
            print('DBG.CONFIG: %s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            log('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            log('read file')
            return

        # is a directory
        log('directory')
        # entries whose name contains any other character are ignored
        re = regexp.compile('[^-A-Za-z0-9_]')
        # iterate the listing obtained above (this used to list an
        # undefined name, `cdir')
        for f in files:
            if re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                log('entry skipped', subpath)
                continue
            cfg.read(subpath)
            log('entry read', subpath)

    # The optparse callbacks below are called positionally with
    # (option, opt_str, value, parser).

    def oc_config(option, opt_str, value, parser):
        # An explicit --config suppresses the default config locations.
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(option, opt_str, value, parser):
        global debug_set
        debug_set = set(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    optparser.error('unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
  DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        readconfig('/etc/hippotat/config', False)
        readconfig('/etc/hippotat/config.d', False)

    try: process_cfg()
    except (configparser.Error, ValueError):
        traceback.print_exc(file=sys.stderr)
        print('\nInvalid configuration, giving up.', file=sys.stderr)
        sys.exit(12)

    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    # Events accepted by the level predicate go to stderr_obs; stdout_obs
    # is supplied as the observer for the rest.
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )
ae7c7784 539
def common_run():
    """Run the reactor, unless a crash has already been recorded.

    `CRASHED (end)' is printed on stderr once the reactor returns, or
    immediately if it was never started.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)
ae7c7784 543 print('CRASHED (end)', file=sys.stderr)