config redo
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784
IJ
22from optparse import OptionParser
23from configparser import ConfigParser
24from configparser import NoOptionError
25
c13ee6e6
IJ
26from functools import partial
27
ae7c7784 28import collections
84e763c7 29import time
8c3b6620 30import codecs
eedc8b30 31import traceback
ae7c7784 32
1321ad5f
IJ
33import re as regexp
34
35import hippotat.slip as slip
36
class DBG(twisted.python.constants.Names):
    """Debug flags that can be enabled for log_debug output.

    Definition order matters: the flags are compared with `<=` when a
    `FLAG+` selector is expanded, so do not reorder the entries.
    """
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
d579a048 52
# Encoder used to render packet data as hex digits in debug messages.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# The stderr in effect at import time; the log filter reports its own
# failures here.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# Set of DBG flags currently enabled.
debug_set = set()
# Most detailed flag switched on by the plain --debug option.
debug_def_detail = DBG.HTTP
3e35fc99 63
def log_debug(dflag, msg, idof=None, d=None):
    """Log a debug message if `dflag` is currently enabled.

    dflag -- DBG constant; nothing happens unless it is in debug_set.
    msg   -- message text.
    idof  -- optional object whose id() is prefixed to the message in hex.
    d     -- optional bytes appended as hex digits; only the first 64
             bytes are shown unless DBG.DATA_COMPLETE is enabled.
    """
    if dflag not in debug_set:
        return

    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)

    if d is not None:
        shorten = DBG.DATA_COMPLETE not in debug_set and len(d) > 64
        if shorten:
            d = d[0:64]
        hexdigits = _hex_codec(d)[0].decode('ascii')
        msg += ' ' + hexdigits + ('...' if shorten else '')

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
78
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate that suppresses uninteresting info-level events.

    Events at any level other than info always pass.  Info events pass
    when their `dflag` is enabled in debug_set, or when they carry no
    `dflag` and DBG.TWISTED is enabled.
    """
    def __call__(self, event):
        accept = twisted.logger.PredicateResult.yes
        reject = twisted.logger.PredicateResult.no
        try:
            if event.get('log_level') != LogLevel.info:
                return accept
            dflag = event.get('dflag')
            if dflag in debug_set:
                return accept
            if dflag is None and DBG.TWISTED in debug_set:
                return accept
            return reject
        except Exception:
            # Report the failure on the original stderr and let the event
            # through rather than lose it.
            print(traceback.format_exc(), file=org_stderr)
            return accept
94
95#---------- default config ----------
96
ca732796
IJ
# Built-in default configuration.  `#' comments are stripped from this
# text before it is handed to the config parser.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536           # used by server, subject to [limits]
max_queue_time = 10              # used by server, subject to [limits]
target_requests_outstanding = 3  # must match; subject to [limits] on server
http_timeout = 30                # used by both } must be
http_timeout_grace = 5           # used by both }  compatible
max_requests_outstanding = 4     # used by client
max_batch_up = 4000              # used by client
http_retry = 5                   # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations:  %(local)s        %(peer)s         %(rnet)s
# obtained   on server   [virtual]server  [virtual]relay   [virtual]network
#      from  on client   <client>         [virtual]server  [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len>  # mandatory for server
# server  = <ipaddr>        # used by both, default is computed from `network'
# relay   = <ipaddr>        # used by server, default from `network' and `server'
#  default server is first host in network
#  default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1     # mandatory for server
port = 80                   # used by server
# url                       # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password>     # used by both, must match

[limits]
max_batch_down = 262144           # used by server
max_queue_time = 121              # used by server
http_timeout = 121                # used by server
target_requests_outstanding = 10  # used by server
'''
138
# these need to be defined here so that they can be imported by import *
cfg = ConfigParser(strict=False)   # strict=False: repeated sections/options allowed
optparser = OptionParser()
142
# Byte translation table exchanging `-' with slip.esc and vice versa.
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')

def mime_translate(s):
    """Return `s` with every `-' and every SLIP ESC byte swapped."""
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    swapped = s.translate(_mimetrans)
    return swapped
148
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding processed configuration values.

    d -- optional dict adopted (not copied) as the instance's attribute
         dictionary.  When omitted, each instance gets its own new dict.
    """
    def __init__(self, d=None):
        # A `{ }` default is evaluated once, so it would be shared by every
        # instance created without an argument; build a new dict instead.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'

# Module-wide configuration results, filled in by the process_cfg_* functions.
c = ConfigResults()
156
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that `packet` was discarded and the reason."""
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 161
b0cfbfce
IJ
162#---------- packet parsing ----------
163
def packet_addrs(packet):
    """Return the (source, destination) addresses of a raw IP packet.

    packet -- bytes of an IPv4 or IPv6 packet, starting at the IP header.

    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises ValueError if the version nibble is neither 4 nor 6.
    """
    # version -> (address length, offset of source address, address class)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    version = packet[0] >> 4
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)

    addrlen, saddroff, factory = layout
    daddroff = saddroff + addrlen   # destination directly follows source
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
179
180#---------- address handling ----------
181
def ipaddr(input):
    """Parse `input` as an IPv4 address, falling back to IPv6.

    If `input` is not a valid IPv4 address, the result (or the exception)
    is whatever ipaddress.IPv6Address gives for it.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
188
def ipnetwork(input):
    """Parse `input` as an IPv4 network, falling back to IPv6.

    input -- network in <prefix>/<len> form.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  If the
    address part is not valid IPv4, the result (or the exception) is
    whatever ipaddress.IPv6Network gives for it.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The ipaddress module has no NetworkValueError; a non-IPv4 address
        # part is reported as AddressValueError, the same as in ipaddr().
        return ipaddress.IPv6Network(input)
040ff511
IJ
195
196#---------- ipif (SLIP) subprocess ----------
197
class SlipStreamDecoder():
    """Turn a stream of SLIP data, arriving in chunks, into packets.

    desc      -- label included in this decoder's debug messages.
    on_packet -- callable invoked with each non-empty decoded packet.
    """
    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Consume another chunk of the stream, delivering finished packets."""
        self._log('inputdata', d=data)
        pieces = slip.decode(data)
        # The first piece continues the data held back from the previous
        # call; the last piece is held back in turn.
        # NOTE(review): presumably slip.decode splits at delimiters, so the
        # last piece is a packet still in progress -- confirm in slip.
        pieces[0] = self._buffer + pieces[0]
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        # Empty packets are logged but not delivered.
        self._log('maybepacket', d=packet)
        if len(packet) > 0:
            self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet and empty the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 226
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data received from the process is SLIP-decoded; each packet is passed
    to `router(packet, saddr, daddr)` unless either address is link-local.
    """
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        saddr, daddr = packet_addrs(packet)
        link_local = saddr.is_link_local or daddr.is_link_local
        if link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the failure describing how the process ended.
        status.raiseException()
242
def start_ipif(command, router):
    """Spawn the ipif subprocess and store its protocol in global `ipif`.

    command -- shell command line, run as `sh -xc command`.
    router  -- callable handed to _IpifProcessProtocol to receive packets.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    shell_argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from it; fd 2: our stderr.
    fd_map = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', shell_argv,
                         childFDs=fd_map,
                         env=None)
040ff511
IJ
250
def queue_inbound(packet):
    """Write `packet` to the ipif subprocess, SLIP-encoded between delimiters."""
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
256
650a3251
IJ
257#---------- packet queue ----------
258
class PacketQueue():
    """FIFO of packets, each stamped with its arrival time.

    desc           -- non-empty string used as a prefix in debug messages.
    max_queue_time -- age in seconds (by time.monotonic) beyond which
                      nonempty() drops a queued packet.
    """
    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (arrival time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet` to the back of the queue."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queuetime, packet = self._pq[0]
            if time.monotonic() - queuetime <= self._max_queue_time:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
            # strip old packets off the front
            self._log(DBG.QUEUE, 'dropping (old)', d=packet)
            self._pq.popleft()

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets into a batch until it is full or we run out.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch
        max_batch is the size the batch must not exceed; the first packet
        of an empty batch is added regardless of its size.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            if not self._pq:
                self._log(DBG.QUEUE, 'process... empty')
                break
            packet = self._pq[0][1]

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                # A delimiter goes between packets, never before the first.
                wanted = sofar + len(slip.delimiter) + len(encoded)
                if wanted > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
318
319#---------- error handling ----------
320
b68c0739
IJ
# Set once crash() has been called; common_run() then skips the reactor.
_crashing = False

def crash(err):
    """Report `err` on stderr, mark the program as crashing, stop the reactor."""
    global _crashing
    _crashing = True
    header = '========== CRASH =========='
    footer = '==========================='
    print(header, err, footer, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Nothing to stop.
        pass
330
def crash_on_defer(defer):
    """Arrange for any failure of `defer` to be passed to crash()."""
    defer.addErrback(crash)
333
def crash_on_critical(event):
    """Log observer: crash() on any event at critical level or above."""
    level = event.get('log_level')
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
337
87a7c0c7
IJ
338#---------- config processing ----------
339
def process_cfg_common_always():
    """Copy the [virtual] mtu setting into c.mtu (as the configured string)."""
    mtu_setting = cfg.get('virtual', 'mtu')
    c.mtu = mtu_setting
343
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting of `section`.

    varmap -- iterable of (dest, source) attribute-name pairs; each c.source
              that exists is copied to c.dest first.  All attributes of `c`
              are then available for interpolation into the `ipif' value.
    """
    absent = object()
    for dest, source in varmap:
        value = getattr(c, source, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
353
def process_cfg_network():
    """Set c.network from the [virtual] network setting.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    network = ipnetwork(cfg.get('virtual', 'network'))
    c.network = network
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
358
def process_cfg_server():
    """Set c.server from [virtual] server, defaulting to the network's first host.

    The default is only computed (via process_cfg_network) when the option
    is not configured.
    """
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
365
class ServerAddr():
    """One address/port on which the server listens.

    port     -- TCP port number.
    addrspec -- IPv4 or IPv6 address in text form.

    Attributes: `port` as given, and `addr`, the parsed ipaddress object.
    """
    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are written in brackets inside a URL.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of TCP4/TCP6ServerEndpoint is
        # `backlog`, so the address must be passed by keyword as `interface`.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is included only when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
385
def process_cfg_saddrs():
    """Build c.saddrs: one ServerAddr per word of the [server] addrs setting.

    The port comes from [server] port, or is 80 if that option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    addrspecs = cfg.get('server', 'addrs').split()
    c.saddrs.extend(ServerAddr(port, addrspec) for addrspec in addrspecs)
394
def process_cfg_clients(constructor):
    """Call `constructor` once for every client section in the config.

    A section counts as a client when its name contains `:' or `.'.
    constructor(ci, cs, pw) receives the parsed address, the section name
    and the section's password encoded as UTF-8.  c.clients is reset to an
    empty list first.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
403
ae7c7784
IJ
404#---------- startup ----------
405
def common_startup():
    """Load configuration, parse the command line and start logging.

    The built-in defaults (defcfg) are read first.  Each -c/--config option
    names a file or directory to read; if none is given,
    /etc/hippotat/config and /etc/hippotat/config.d are read when present.
    -D/--debug and --debug-select adjust the global debug_set.
    Non-option arguments are rejected via optparser.error.
    """
    comment_re = regexp.compile('#.*')
    cfg.read_string(comment_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        """Read config from `pathname`, which may be a file or a directory."""
        def log(m, p=pathname):
            if not DBG.CONFIG in debug_set: return
            # Report the path this message is about, which for directory
            # entries is the entry, not the directory.
            print('DBG.CONFIG: %s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            log('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            log('read file')
            return

        # is a directory
        log('directory')
        unsafe_re = regexp.compile('[^-A-Za-z0-9_]')
        # Use the listing obtained above; sorted so that the order in which
        # entries override one another is predictable.
        for f in sorted(files):
            # Only names made up entirely of [-A-Za-z0-9_] are read.
            if unsafe_re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                log('entry skipped', subpath)
                continue
            cfg.read(subpath)
            log('entry read', subpath)

    def oc_config(option, opt_str, value, parser):
        # An explicit --config suppresses the default config locations.
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(option, opt_str, value, parser):
        global debug_set
        debug_set = set(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    optparser.error('unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        readconfig('/etc/hippotat/config', False)
        readconfig('/etc/hippotat/config.d', False)

    # Events matching the error-level predicate go to stderr, the rest to
    # stdout; LogNotBoringTwisted is applied in front of both.
    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )
ae7c7784 533
def common_run():
    """Run the reactor unless crash() was already called, then report the end.

    The closing message is printed whenever this function gets past the
    reactor, whether or not the reactor was run.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)