reorg config - will break
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784 22from optparse import OptionParser
5510890e 23import configparser
ae7c7784
IJ
24from configparser import ConfigParser
25from configparser import NoOptionError
26
c13ee6e6
IJ
27from functools import partial
28
ae7c7784 29import collections
84e763c7 30import time
8c3b6620 31import codecs
eedc8b30 32import traceback
ae7c7784 33
1321ad5f
IJ
34import re as regexp
35
36import hippotat.slip as slip
37
class DBG(twisted.python.constants.Names):
    """Debug flags accepted by log_debug() and the debug command line options.

    Declaration order is significant: the `+' handling in common_startup()
    compares flags with `<=' to pick "this flag and every earlier one",
    so new flags must be inserted at the position matching their level
    of detail.
    """
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
d579a048 53
# Encoder used by log_debug() to render packet data as hex.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# stderr as it was at import time; LogNotBoringTwisted reports its own
# failures here rather than through the logging machinery.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# DBG flags that are currently enabled (filled in by common_startup()).
debug_set = set()
# Most detailed flag that the plain `--debug' option enables.
debug_def_detail = DBG.HTTP
3e35fc99 64
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message if the flag `dflag' is enabled.

    dflag: DBG constant; the message is dropped unless it is in debug_set.
    msg:   message text.
    idof:  optional object; its id() is prepended to the message in hex.
    d:     optional bytes; appended to the message as hex.  Truncated to
           64 bytes (with a trailing `...') unless DBG.DATA_COMPLETE is
           enabled.
    """
    if dflag not in debug_set: return
    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)
    if d is not None:
        trunc = ''
        if DBG.DATA_COMPLETE not in debug_set:
            if len(d) > 64:
                d = d[0:64]
                trunc = '...'
        d = _hex_codec(d)[0].decode('ascii')
        msg += ' ' + d + trunc
    # dflag is passed as an event key so that LogNotBoringTwisted can
    # filter on it.
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
79
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate which suppresses uninteresting info-level events.

    Events at any level other than info always pass.  Info-level events
    pass only if their `dflag' is enabled in debug_set, or if they carry
    no `dflag' at all and DBG.TWISTED is enabled.
    """
    def __call__(self, event):
        yes = twisted.logger.PredicateResult.yes
        no = twisted.logger.PredicateResult.no
        try:
            if event.get('log_level') != LogLevel.info:
                return yes
            dflag = event.get('dflag')
            if dflag in debug_set: return yes
            if dflag is None and DBG.TWISTED in debug_set: return yes
            return no
        except Exception:
            # A failure here must not be lost: report it on the original
            # stderr and let the event through.
            print(traceback.format_exc(), file=org_stderr)
            return yes
95
96#---------- default config ----------
97
ca732796
IJ
# Built-in default configuration.  common_startup() strips `#' comments
# from this text and loads it into `cfg' before any config file is read.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
target_requests_outstanding = 3
http_timeout = 30
http_timeout_grace = 5
max_requests_outstanding = 6
max_batch_up = 4000
http_retry = 5
port = 80

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s

# relating to virtual network
mtu = 1500

[SERVER]
server = SERVER
# addrs = 127.0.0.1 ::1
# url

# relating to virtual network
routes = ''
vnetwork = 172.24.230.192
# network = <prefix>/<len>
# server  = <ipaddr>
# relay   = <ipaddr>


# [<client-ip4-or-ipv6-address>]
# password = <password>   # used by both, must match

[LIMIT]
max_batch_down = 262144
max_queue_time = 121
http_timeout = 121
target_requests_outstanding = 10
'''

# these need to be defined here so that they can be imported by import *
cfg = ConfigParser(strict=False)
optparser = OptionParser()
142
# Byte translation table which exchanges `-' and the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')

def mime_translate(s):
    """Return bytes `s' with every `-' and SLIP ESC byte exchanged.

    The same table maps each byte to the other, so applying the function
    twice gives back the input.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    return s.translate(_mimetrans)
148
class ConfigResults:
    """Plain attribute container for processed configuration values.

    The cfg_process_* functions store their results as attributes on an
    instance; repr() shows every attribute that has been set.
    """
    def __init__(self):
        pass

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'
154
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that `packet' was discarded.

    iface, saddr, daddr and why are only formatted into the message;
    the packet bytes are appended as hex by log_debug().
    """
    log_debug(DBG.DROP,
              'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
              d=packet)
1321ad5f 159
b0cfbfce
IJ
160#---------- packet parsing ----------
161
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    packet: bytes starting with the IP header.
    Returns (saddr, daddr) as ipaddress.IPv4Address or IPv6Address objects.
    Raises ValueError if the packet is empty, has an IP version other
    than 4 or 6, or is too short to contain both addresses (in which case
    the ipaddress constructor raises AddressValueError, a ValueError).
    """
    if not packet:
        # packet[0] would raise IndexError; report it like any other
        # malformed packet instead.
        raise ValueError('empty packet')
    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    # The destination address immediately follows the source address.
    saddr = factory(packet[ saddroff           : saddroff + addrlen   ])
    daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
    return (saddr, daddr)
177
178#---------- address handling ----------
179
def ipaddr(input):
    """Parse `input' as an IPv4 address, or failing that as an IPv6 address.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises AddressValueError if it is neither.
    """
    try:
        r = ipaddress.IPv4Address(input)
    except AddressValueError:
        r = ipaddress.IPv6Address(input)
    return r
186
def ipnetwork(input):
    """Parse `input' as an IPv4 network, or failing that as an IPv6 network.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Only a failure to parse the address part as IPv4 triggers the IPv6
    attempt; other errors (bad netmask, host bits set) propagate as the
    ValueError raised by the ipaddress module.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # (The previous `NetworkValueError' does not exist anywhere, so
        # every IPv6 network ended in a NameError.)
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
193
194#---------- ipif (SLIP) subprocess ----------
195
class SlipStreamDecoder():
    """Incremental decoder turning a SLIP byte stream into packets.

    desc:      short description used in debug messages.
    on_packet: callable invoked with each complete, non-empty packet.

    NOTE(review): relies on slip.decode() returning a non-empty list whose
    last element is the (possibly empty) unterminated tail -- confirm
    against hippotat.slip.
    """
    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed more stream data; calls on_packet for each packet completed."""
        self._log('inputdata', d=data)
        packets = slip.decode(data)
        # Whatever was left over last time is the start of the first piece.
        packets[0] = self._buffer + packets[0]
        # The final piece is kept back until more data (or flush) arrives.
        self._buffer = packets.pop()
        for packet in packets:
            self._maybe_packet(packet)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        # Empty pieces are not passed on.
        self._log('maybepacket', d=packet)
        if len(packet):
            self._on_packet(packet)

    def flush(self):
        """Treat any buffered data as a final packet and empty the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 224
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    SLIP data from the child's stdout is decoded into packets; each packet
    is handed to router(packet, saddr, daddr), except that packets with a
    link-local source or destination are discarded.
    """
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self): pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # The subprocess ending is always treated as an error.
        status.raiseException()
240
def start_ipif(command, router):
    """Spawn the ipif subprocess and remember its protocol in global `ipif'.

    command: shell command line, run via `/bin/sh -xc'.
    router:  callable passed to _IpifProcessProtocol to receive packets.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    # fd 0: we write to the child; fd 1: we read from the child;
    # fd 2: the child shares our stderr.
    reactor.spawnProcess(ipif,
                         '/bin/sh',['sh','-xc', command],
                         childFDs={0:'w', 1:'r', 2:2},
                         env=None)
040ff511
IJ
248
def queue_inbound(packet):
    """Write `packet', SLIP-encoded and delimited on both sides, to ipif.

    Requires start_ipif() to have been called, since it uses the global
    `ipif' protocol's transport.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    ipif.transport.write(slip.delimiter)
    ipif.transport.write(slip.encode(packet))
    ipif.transport.write(slip.delimiter)
254
650a3251
IJ
255#---------- packet queue ----------
256
class PacketQueue():
    """FIFO of packets which expire after max_queue_time seconds.

    desc:           string used as a prefix in debug messages.
    max_queue_time: maximum age, in seconds of time.monotonic(), a packet
                    may reach before nonempty() drops it.
    """
    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (enqueue time, packet)

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet' at the back of the queue, stamped with the time now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while True:
            try: (queuetime, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'nonempty ? empty.')
                return False

            age = time.monotonic() - queuetime
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are SLIP-encoded and separated by slip.delimiter.  Stops
        when the queue is empty or the next packet would take a non-empty
        batch over max_batch; a packet is always added to an empty batch
        regardless of its size.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            try: (dummy, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'process... empty')
                break

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                if sofar + len(slip.delimiter) + len(encoded) > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            # Only now that it is in the batch is the packet dequeued.
            self._pq.popleft()
ae7c7784
IJ
316
317#---------- error handling ----------
318
b68c0739
IJ
# Set by crash(); common_run() declines to start the reactor once it is set.
_crashing = False

def crash(err):
    """Report `err' on stderr, mark the program as crashing, stop the reactor.

    It is not an error for the reactor not to be running.
    """
    global _crashing
    _crashing = True
    print('========== CRASH ==========', err,
          '===========================', file=sys.stderr)
    try: reactor.stop()
    except twisted.internet.error.ReactorNotRunning: pass
328
def crash_on_defer(defer):
    """Arrange for any failure of the deferred `defer' to call crash()."""
    defer.addErrback(lambda err: crash(err))
331
def crash_on_critical(event):
    """Log observer: crash() on any event at critical level or above.

    common_startup() installs this alongside the normal log observer.
    """
    if event.get('log_level') >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
335
87a7c0c7
IJ
336#---------- config processing ----------
337
c7fb640e
IJ
def _cfg_process_putatives():
    """Classify the section names in `cfg' into putative servers and clients.

    Returns (servers, clients):
      servers maps server name -> section-name form of that server
      clients maps client address object -> section-name form of it
    Sections named `LIMIT' or `<servername> LIMIT' are skipped; sections
    which cannot be understood are ignored with a warning on stderr.
    Raises ValueError if the same client or server appears under two
    different spellings.
    """
    servers = { }
    clients = { }
    # maps from abstract object to canonical name for cs's

    def putative(cmap, abstract, canoncs):
        try:
            current_canoncs = cmap[abstract]
        except KeyError:
            pass
        else:
            # This is a property of the user's configuration, so report it
            # as ValueError (which common_startup turns into "invalid
            # configuration") rather than with an assert.
            if current_canoncs != canoncs:
                raise ValueError(
                    'config sections name %s inconsistently: %s vs. %s'
                    % (repr(abstract), repr(current_canoncs), repr(canoncs)))
        cmap[abstract] = canoncs

    server_pat = r'[-.0-9A-Za-z]+'
    client_pat = r'[.:0-9a-f]+'
    server_re = regexp.compile(server_pat)
    # `LIMIT' must be accepted in the client position, or plan E below
    # could never be reached.
    serverclient_re = regexp.compile(
        server_pat + r' ' + '(?:' + client_pat + '|LIMIT)')

    def warn_ignored(cs):
        print('warning: ignoring config section %s' % cs, file=sys.stderr)

    for cs in cfg.sections():
        if cs == 'LIMIT':
            # plan A "[LIMIT]"
            continue

        try:
            # plan B "[<client>]" part 1
            ci = ipaddr(cs)
        except AddressValueError:

            if server_re.fullmatch(cs):
                # plan C "[<servername>]"
                putative(servers, cs, cs)
                continue

            if serverclient_re.fullmatch(cs):
                # plan D "[<servername> <client>]" part 1
                (pss,pcs) = cs.split(' ')

                if pcs == 'LIMIT':
                    # plan E "[<servername> LIMIT]"
                    continue

                try:
                    # plan D "[<servername> <client>]" part 2
                    ci = ipaddr(pcs)
                except AddressValueError:
                    # plan F "[<some thing we do not understand>]"
                    # well, we ignore this
                    warn_ignored(cs)
                    continue

                else: # no AddressValueError
                    # plan D "[<servername> <client>]" part 3
                    putative(clients, ci, pcs)
                    putative(servers, pss, pss)
                    continue

            else:
                # plan F "[<some thing we do not understand>]"
                warn_ignored(cs)
                continue

        else: # no AddressValueError
            # plan B "[<client>]" part 2
            putative(clients, ci, cs)
            continue

    return (servers, clients)
401
def cfg_process_common(c, ss):
    """Store the integer `mtu' option of config section `ss' as c.mtu.

    c:  results object (e.g. a ConfigResults) to fill in.
    ss: name of the server's config section.

    The results object is now an explicit first parameter, matching
    cfg_process_saddrs() and cfg_process_vnetwork(); previously the body
    referred to a name `c' which was not defined anywhere, so every call
    failed with NameError.
    """
    c.mtu = cfg.getint(ss, 'mtu')
404
def cfg_process_saddrs(c, ss):
    """Fill in c.port and c.saddrs from config section `ss'.

    c.port is the integer `port' option.  c.saddrs is a list with one
    ServerAddr for each whitespace-separated entry of the `addrs' option.
    Raises AddressValueError if an entry is neither an IPv4 nor an IPv6
    address.
    """
    class ServerAddr():
        """One address and port the server is reachable on."""
        def __init__(self, port, addrspec):
            self.port = port
            # also self.addr
            try:
                self.addr = ipaddress.IPv4Address(addrspec)
                self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
                self._inurl = b'%s'
            except AddressValueError:
                self.addr = ipaddress.IPv6Address(addrspec)
                self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
                # IPv6 literals have to be bracketed inside a URL
                self._inurl = b'[%s]'

        def make_endpoint(self):
            # The third positional parameter of the TCP server endpoints
            # is `backlog', so the address must go in by keyword, and as
            # a string.
            return self._endpointfactory(reactor, self.port,
                                         interface=str(self.addr))

        def url(self):
            url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
            if self.port != 80: url += b':%d' % self.port
            url += b'/'
            return url

    c.port = cfg.getint(ss,'port')
    c.saddrs = [ ]
    for addrspec in cfg.get(ss, 'addrs').split():
        sa = ServerAddr(c.port, addrspec)
        c.saddrs.append(sa)
431
def cfg_process_vnetwork(c, ss):
    """Parse the `network' option of config section `ss' into c.network.

    Raises ValueError if the network has fewer than 5 addresses; since
    network sizes are powers of two, that means at least 2^3 are needed.
    """
    c.network = ipnetwork(cfg.get(ss,'network'))
    if c.network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
436
def cfg_process_vaddr(c, ss='virtual'):
    """Set c.server from the `server' option, or else from the network.

    c:  results object to fill in.
    ss: config section to consult; defaults to `virtual', the section
        name that used to be hard-coded here.

    If the option is absent, the network is processed with
    cfg_process_vnetwork() and c.server becomes its first host address.

    Previously this took no parameters but used an undefined `c' and
    called a nonexistent process_cfg_network(), so it could not run.
    NOTE(review): a missing *section* raises NoSectionError, which is
    deliberately not caught here -- confirm that is what callers want.
    """
    try:
        c.server = cfg.get(ss,'server')
    except NoOptionError:
        cfg_process_vnetwork(c, ss)
        c.server = next(c.network.hosts())
443
c7fb640e
IJ
def cfg_search_section(key,sections):
    """Return the first of `sections' in which option `key' is set.

    Raises NoOptionError if none of them has it.
    """
    for section in sections:
        if cfg.has_option(section, key):
            return section
    # NoOptionError takes (option, section); giving it one preformatted
    # string raised TypeError instead of the intended exception.
    raise NoOptionError(key, repr(sections))
449
def cfg_search(getter,key,sections):
    """Look up option `key' in the first of `sections' which has it.

    getter is called as getter(section, key), e.g. cfg.get or cfg.getint,
    and its result is returned.  Raises NoOptionError if no section has
    the option.
    """
    section = cfg_search_section(key,sections)
    return getter(section, key)
453
def cfg_process_client_limited(cc,ss,sections,key):
    """Set attribute `key' of cc to the configured value, capped by the limit.

    The value is the integer option `key' from the first of `sections'
    which has it; the cap is the same option from `<ss> LIMIT', or
    failing that from `LIMIT'.  The smaller of the two is stored.
    """
    val = cfg_search(cfg.getint, key, sections)
    # cfg_search takes one list of sections, not separate arguments.
    lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    setattr(cc, key, min(val,lim))
458
def cfg_process_client_common(cc,ss,cs,ci):
    """Fill in the per-client settings shared by client and server.

    cc: results object for this client
    ss: server section name
    cs: client section name
    ci: client address object, stored as cc.ci

    returns sections to search in, iff password is defined, otherwise None
    """
    cc.ci = ci

    # Most specific first.
    sections = ['%s %s' % (ss, cs),
                cs,
                ss,
                'DEFAULT']

    try: pwsection = cfg_search_section('password', sections)
    except NoOptionError: return None

    pw = cfg.get(pwsection, 'password')
    # The encoded password used to be computed and then thrown away;
    # keep it on the results object.
    cc.password = pw.encode('utf-8')

    cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
    cfg_process_client_limited(cc,ss,sections,'http_timeout')

    return sections
478
def process_cfg_ipif(c, sections, varmap):
    """Compute c.ipif_command from the `ipif' option.

    varmap is an iterable of (dest, source) attribute-name pairs: each
    attribute `source' that exists on c is copied to attribute `dest',
    and missing ones are skipped.  The `ipif' option is then taken from
    the first of `sections' which has it, with c's attributes supplied
    as extra interpolation variables.
    """
    for d, s in varmap:
        try: v = getattr(c, s)
        except AttributeError: continue
        setattr(c, d, v)

    section = cfg_search_section('ipif', sections)
    c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
88487243 487
ae7c7784
IJ
488#---------- startup ----------
489
def common_startup(process_cfg):
    """Parse the command line, load the configuration, and start logging.

    The built-in defaults (defcfg) are loaded first.  Each --config option
    is then read as it is parsed; if there is none, /etc/hippotat/config
    and /etc/hippotat/config.d are read if they exist.  Finally
    process_cfg is called and the twisted log observers are installed.

    calls process_cfg(putative_servers, putative_clients)

    Exits with status 12 if configuration processing raises
    configparser.Error or ValueError.
    """
    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile(r'#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        # Read one config file, or every suitably-named file in a directory.
        def dbg(m, p=pathname):
            if DBG.CONFIG not in debug_set: return
            print('DBG.CONFIG: %s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            dbg('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            dbg('read file')
            return

        # is a directory
        dbg('directory')
        badname_re = regexp.compile('[^-A-Za-z0-9_]')
        # Sorted, so that which file overrides which does not depend on
        # the order the filesystem happens to return.
        for f in sorted(files):
            if badname_re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                dbg('entry skipped', subpath)
                continue
            cfg.read(subpath)
            dbg('entry read', subpath)

    # The functions below are optparse callbacks, called as
    # callback(option, opt_str, value, parser).

    def oc_config(option, opt_str, value, parser):
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(option, opt_str, value, parser):
        # Updated in place, so that every reference to the set sees it.
        debug_set.clear()
        debug_set.update(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    # optparser.error() does not return
                    optparser.error('unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
  DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        readconfig('/etc/hippotat/config',   False)
        readconfig('/etc/hippotat/config.d', False)

    try:
        (pss, pcs) = _cfg_process_putatives()
        process_cfg(pss, pcs)
    except (configparser.Error, ValueError):
        traceback.print_exc(file=sys.stderr)
        print('\nInvalid configuration, giving up.', file=sys.stderr)
        sys.exit(12)

    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    # Events which pass `pred' go to stderr_obs, the rest to stdout_obs.
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )
ae7c7784 628
def common_run():
    """Run the reactor, unless crash() has already been called.

    Whenever this gets as far as returning, it first prints
    `CRASHED (end)' on stderr.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing: reactor.run()
    print('CRASHED (end)', file=sys.stderr)