wip debug
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
b83d422a
IJ
8from zope.interface import implementer
9
040ff511
IJ
10import twisted
11from twisted.internet import reactor
1d023c89 12import twisted.internet.endpoints
8c3b6620
IJ
13import twisted.logger
14from twisted.logger import LogLevel
15import twisted.python.constants
16from twisted.python.constants import NamedConstant
b0cfbfce
IJ
17
18import ipaddress
19from ipaddress import AddressValueError
20
ae7c7784
IJ
21from optparse import OptionParser
22from configparser import ConfigParser
23from configparser import NoOptionError
24
c13ee6e6
IJ
25from functools import partial
26
ae7c7784 27import collections
84e763c7 28import time
8c3b6620 29import codecs
eedc8b30 30import traceback
ae7c7784 31
1321ad5f
IJ
32import re as regexp
33
34import hippotat.slip as slip
35
class DBG(twisted.python.constants.Names):
    # Debug flags understood by log_debug().
    #
    # Definition order is significant: common_startup() selects "all flags
    # up to X" by comparing constants with `<=`, which presumably follows
    # definition order (see Twisted's constants documentation).
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()           # used by log_discard()
    FLOW = NamedConstant()           # used by queue_inbound()
    HTTP = NamedConstant()           # default level of detail for -D
    TWISTED = NamedConstant()        # lets unflagged info events through
    QUEUE = NamedConstant()          # used by PacketQueue
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()     # used by PacketQueue.process()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()      # used by SlipStreamDecoder
    DATA_COMPLETE = NamedConstant()  # disables data truncation in log_debug()
d579a048 50
# Encoder used by log_debug() to render packet data as hex digits.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# The stderr stream as it was at import time; LogNotBoringTwisted prints
# its own tracebacks here.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# Set of DBG flags currently enabled; log_debug() ignores all others.
debug_set = set()
# Level of detail enabled by the plain -D/--debug option.
debug_def_detail = DBG.HTTP
3e35fc99 61
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message if `dflag` is enabled in `debug_set`.

    dflag -- DBG flag classifying the message; also attached to the event.
    msg   -- message text.
    idof  -- optional object; its id() is prefixed to the message in hex.
    d     -- optional bytes-like data, appended to the message as hex.
             Unless DBG.DATA_COMPLETE is enabled, only the first 64 bytes
             are shown, followed by '...'.
    """
    if dflag not in debug_set:
        return

    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)

    if d is not None:
        suffix = ''
        if DBG.DATA_COMPLETE not in debug_set and len(d) > 64:
            d = d[:64]
            suffix = '...'
        # The encoder returns (encoded bytes, length consumed).
        hexbytes, _ = _hex_codec(d)
        msg = msg + ' ' + hexbytes.decode('ascii') + suffix

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
76
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate that suppresses uninteresting info-level events.

    Events at any level other than info always pass.  An info event passes
    if its 'dflag' is enabled in `debug_set`, or if it has no 'dflag' at
    all and DBG.TWISTED is enabled.
    """

    def __call__(self, event):
        verdict = twisted.logger.PredicateResult
        accept, reject = verdict.yes, verdict.no
        try:
            if event.get('log_level') != LogLevel.info:
                return accept

            dflag = event.get('dflag')
            if dflag in debug_set:
                return accept
            if dflag is None and DBG.TWISTED in debug_set:
                return accept
            return reject
        except Exception:
            # If the filter itself fails, report that on the original
            # stderr and let the event through rather than losing it.
            print(traceback.format_exc(), file=org_stderr)
            return accept
92
93#---------- default config ----------
94
ca732796
IJ
# Built-in default configuration.  common_startup() deletes everything from
# each `#' to the end of its line before handing this text to the config
# parser, which is why trailing comments can be used here.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
http_timeout = 30 # used by both } must be
http_timeout_grace = 5 # used by both } compatible
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
http_timeout = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
136
# these need to be defined here so that they can be imported by import *
cfg = ConfigParser()
optparser = OptionParser()

# Byte translation table exchanging `-' with the SLIP escape byte; applied
# by mime_translate().
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
def mime_translate(s):
    """Return `s` with every `-' byte and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC have
    been swapped the result cannot contain `--'.
    """
    translated = s.translate(_mimetrans)
    return translated
146
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding processed configuration values.

    d -- optional dict to use directly (not copied) as the instance's
         attribute dictionary.  When omitted, the instance gets a fresh,
         private dictionary.
    """

    def __init__(self, d=None):
        # A `{ }` default would be created once and shared by every
        # instance constructed without an argument, so attributes set on
        # one would appear on all of them.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'

# Module-wide configuration results, filled in by the process_cfg_*
# functions.
c = ConfigResults()
154
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that `packet` was discarded.

    iface, saddr, daddr and why are formatted into the message; the packet
    contents are passed to log_debug() as the data to dump.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 159
b0cfbfce
IJ
160#---------- packet parsing ----------
161
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    packet -- bytes of the packet, starting at the IP header.

    Returns (saddr, daddr) as ipaddress.IPv4Address or IPv6Address objects,
    according to the version nibble in the first byte.
    Raises ValueError if the version is neither 4 nor 6.
    """
    version = packet[0] >> 4

    # version -> (address length, offset of source address, address class)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, saddroff, factory = layout

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
177
178#---------- address handling ----------
179
def ipaddr(input):
    """Parse `input` as an IPv4 address, or failing that as an IPv6 address.

    Raises AddressValueError if it is neither.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
186
def ipnetwork(input):
    """Parse `input` as an IPv4 network, or failing that as an IPv6 network.

    Mirrors ipaddr(): only a failure to parse the address part as IPv4
    (AddressValueError) triggers the IPv6 attempt.  Other errors from
    IPv4Network, such as an invalid netmask, propagate unchanged.

    Raises AddressValueError if the address part is neither IPv4 nor IPv6.
    """
    try:
        r = ipaddress.IPv4Network(input)
    # The name `NetworkValueError' caught here before exists neither in
    # this module nor in `ipaddress', so the fallback raised NameError.
    except AddressValueError:
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
193
194#---------- ipif (SLIP) subprocess ----------
195
class SlipStreamDecoder():
    """Reassembles packets from SLIP data that arrives in arbitrary pieces.

    desc      -- short label used in debug messages.
    on_packet -- callable invoked with each complete, non-empty packet.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        # All decoder messages are logged under DBG.SLIP_FULL.
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed in more stream data and deliver any packets it completes."""
        self._log('inputdata', d=data)
        # assumes slip.decode() returns a non-empty list of pieces whose
        # last element is the (possibly empty) unterminated tail -- TODO
        # confirm against hippotat.slip
        pieces = slip.decode(data)
        # The first piece continues whatever was left over from last time.
        pieces[0] = self._buffer + pieces[0]
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        # Empty packets are logged but not passed on.
        self._log('maybepacket', d=packet)
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Treat any buffered partial data as a packet and clear the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 224
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data from the subprocess's stdout is SLIP-decoded; each packet is passed
    to router(packet, saddr, daddr) unless it involves a link-local address.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Handle one decoded packet: discard link-local traffic, route the rest."""
        saddr, daddr = packet_addrs(packet)
        link_local = saddr.is_link_local or daddr.is_link_local
        if link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
        else:
            self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the subprocess's termination status as an exception.
        status.raiseException()
240
def start_ipif(command, router):
    """Start the ipif subprocess and remember its protocol in global `ipif`.

    command -- shell command line, run via `/bin/sh -xc`.
    router  -- callable handed to _IpifProcessProtocol to receive packets.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)

    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from it; fd 2: the child
    # shares our own stderr.
    fds = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
248
def queue_inbound(packet):
    """Send `packet` to the ipif subprocess, SLIP-encoded.

    The encoded packet is written with a SLIP delimiter both before and
    after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
254
650a3251
IJ
255#---------- packet queue ----------
256
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- non-empty string used as a prefix in debug messages.
    max_queue_time -- age in seconds beyond which nonempty() drops packets.
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # Checks that desc can be concatenated with a str and is not empty.
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (queue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet` to the back of the queue, stamped with the current time."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Drop over-age packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queued_at, packet = self._pq[0]
            if time.monotonic() - queued_at <= self._max_queue_time:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
            # strip old packets off the front
            self._log(DBG.QUEUE, 'dropping (old)', d=packet)
            self._pq.popleft()

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are SLIP-encoded and separated by slip.delimiter.  Transfer
        stops when the queue is empty, or when adding the next packet to a
        non-empty batch would exceed max_batch; the first packet of an
        empty batch is added regardless of its size.  Packet age is not
        checked here.
        """
        self._log(DBG.QUEUE, 'process...')
        while self._pq:
            _, packet = self._pq[0]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
        else:
            # Reached only when the queue ran dry, not after the break.
            self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
316
317#---------- error handling ----------
318
b68c0739
IJ
# Set once crash() has been called; common_run() then does not start the
# reactor.
_crashing = False

def crash(err):
    """Report `err` on stderr and stop the reactor.

    Also sets the global `_crashing` flag.  A reactor that is not running
    is tolerated.
    """
    global _crashing
    _crashing = True

    banner_top = '========== CRASH =========='
    banner_end = '==========================='
    print(banner_top, err, banner_end, file=sys.stderr)

    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
328
def crash_on_defer(defer):
    """Arrange for a failure of `defer` to be reported through crash()."""
    def _on_failure(err):
        return crash(err)
    defer.addErrback(_on_failure)
331
def crash_on_critical(event):
    """Log observer which calls crash() for events at critical level or above.

    event -- log event dict.  Events without a 'log_level' are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Comparing None against a LogLevel constant would raise TypeError
        # inside the observer.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
335
87a7c0c7
IJ
336#---------- config processing ----------
337
def process_cfg_common_always():
    """Copy the [virtual] mtu setting into `c.mtu`, as the string read."""
    # No `global mtu` here: the function never assigns a module-level `mtu`.
    c.mtu = cfg.get('virtual','mtu')
341
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute `c.ipif_command` from the `ipif` setting in `section`.

    varmap -- iterable of (dest, source) attribute-name pairs.  Each source
              attribute present on `c` is copied to the dest attribute;
              missing sources are skipped.

    The attributes of `c` are then offered as extra interpolation variables
    when the `ipif` option is read.
    """
    for dest, source in varmap:
        if not hasattr(c, source):
            continue
        setattr(c, dest, getattr(c, source))

    interpolations = c.__dict__
    c.ipif_command = cfg.get(section, 'ipif', vars=interpolations)
351
def process_cfg_network():
    """Parse the [virtual] network setting into `c.network`.

    Raises ValueError if the network contains fewer than 3 + 2 addresses.
    """
    network = ipnetwork(cfg.get('virtual','network'))
    c.network = network
    minimum = 3 + 2
    if network.num_addresses < minimum:
        raise ValueError('network needs at least 2^3 addresses')
356
def process_cfg_server():
    """Set `c.server` from [virtual] server, or default it from the network.

    When no `server` option is configured, the network is processed and its
    first host address is used.  In that case c.server is an address object
    rather than the configured string.
    """
    try:
        server = cfg.get('virtual','server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
363
class ServerAddr():
    """One address and port on which the server listens.

    port     -- TCP port number.
    addrspec -- IPv4 or IPv6 address, as accepted by the ipaddress module.

    Attributes: `port` and `addr` (an IPv4Address or IPv6Address).
    Raises AddressValueError if addrspec is neither kind of address.
    """

    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are written in square brackets in URLs.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint listening on this address and port."""
        # The third positional parameter of the TCP server endpoints is the
        # listen backlog, so the address has to go in by keyword, as a
        # string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL of this address as bytes; port 80 is left implicit."""
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
383
def process_cfg_saddrs():
    """Build `c.saddrs`, a list of ServerAddr, from [server] addrs and port.

    `addrs` is split on whitespace; every address uses the same port, which
    defaults to 80 when no `port` option is available.
    """
    try:
        port = cfg.getint('server','port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    addrspecs = cfg.get('server','addrs').split()
    c.saddrs.extend(ServerAddr(port, addrspec) for addrspec in addrspecs)
392
def process_cfg_clients(constructor):
    """Call `constructor` once for each client section in the configuration.

    A section counts as a client if its name contains `:' or `.'; the name
    is parsed as an IP address.  constructor(ci, cs, pw) receives the
    parsed address, the section name, and the section's password encoded
    as UTF-8.

    `c.clients` is reset to an empty list first; nothing is appended to it
    here.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        looks_like_address = ':' in cs or '.' in cs
        if not looks_like_address:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
401
ae7c7784
IJ
402#---------- startup ----------
403
def common_startup():
    """Parse the command line, load configuration and start logging.

    Registers -c/--config, -D/--debug and --debug-select on `optparser` and
    parses the command line, rejecting non-option arguments.  Then loads
    `defcfg` (with `#' comments stripped) followed by the config file into
    `cfg`.  Finally starts Twisted logging, filtered by LogNotBoringTwisted
    and with crash_on_critical as an additional observer.
    """
    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')

    def dfs_less_detailed(limit):
        # Every debug flag that compares `<=` to the given one.
        return [flag for flag in DBG.iterconstants() if flag <= limit]

    def ds_default(option, opt_str, value, parser):
        # -D: replace the enabled set with the default level of detail.
        global debug_set
        debug_set = set(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        # --debug-select: apply each comma-separated entry in turn.
        last_df = next(DBG.iterconstants())
        mutator = debug_set.add

        for item in spec.split(','):
            if not item:
                # Empty entry: repeat the previous entry's action for all
                # flags up to the previously named one.
                for df in dfs_less_detailed(last_df):
                    mutator(df)
                continue

            disable = item.startswith('-')
            if disable:
                item = item[1:]
            mutator = debug_set.discard if disable else debug_set.add

            try:
                df = DBG.lookupByName(item)
            except ValueError:
                optparser.error('unknown debug flag %s in --debug-select' % item)
            mutator(df)
            last_df = df

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback=ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[,],...',
                         help=
'''enable (or with -, disable) each specified DFLAG;
empty entry means do the same for all DFLAGS "more interesting"
than the last (or, all DFLAGs)''',
                         action='callback',
                         callback=ds_select)

    (opts, args) = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    print(repr(debug_set), file=sys.stderr)

    # Only the built-in defaults have `#' comments stripped; the config
    # file is read as it is.
    comment_re = regexp.compile('#.*')
    cfg.read_string(comment_re.sub('', defcfg))
    cfg.read(opts.configfile)

    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    # Events accepted by the level predicate go to stderr, the others to
    # stdout.
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    observers = [ log_observer, crash_on_critical ]
    twisted.logger.globalLogBeginner.beginLoggingTo(observers)
ae7c7784 479
def common_run():
    """Run the reactor, unless crash() has already been called.

    'CRASHED (end)' is printed to stderr once the reactor returns, or
    straight away if it was never started.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)