before reorg debug opts again
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
b83d422a
IJ
8from zope.interface import implementer
9
040ff511
IJ
10import twisted
11from twisted.internet import reactor
1d023c89 12import twisted.internet.endpoints
8c3b6620
IJ
13import twisted.logger
14from twisted.logger import LogLevel
15import twisted.python.constants
16from twisted.python.constants import NamedConstant
b0cfbfce
IJ
17
18import ipaddress
19from ipaddress import AddressValueError
20
ae7c7784
IJ
21from optparse import OptionParser
22from configparser import ConfigParser
23from configparser import NoOptionError
24
c13ee6e6
IJ
25from functools import partial
26
ae7c7784 27import collections
84e763c7 28import time
8c3b6620 29import codecs
eedc8b30 30import traceback
ae7c7784 31
1321ad5f
IJ
32import re as regexp
33
34import hippotat.slip as slip
35
d579a048 36class DBG(twisted.python.constants.Names):
380ed56c 37 INIT = NamedConstant()
d579a048 38 ROUTE = NamedConstant()
b68c0739 39 DROP = NamedConstant()
d579a048
IJ
40 FLOW = NamedConstant()
41 HTTP = NamedConstant()
380ed56c 42 TWISTED = NamedConstant()
d579a048 43 QUEUE = NamedConstant()
380ed56c 44 HTTP_CTRL = NamedConstant()
d579a048 45 QUEUE_CTRL = NamedConstant()
297b3ebf 46 HTTP_FULL = NamedConstant()
0accf0d3 47 CTRL_DUMP = NamedConstant()
380ed56c 48 SLIP_FULL = NamedConstant()
d579a048 49
b68c0739 50_hex_codec = codecs.getencoder('hex_codec')
8c3b6620 51
b83d422a
IJ
52#---------- logging ----------
53
54org_stderr = sys.stderr
55
8c3b6620
IJ
56log = twisted.logger.Logger()
57
2e68eb10
IJ
58debug_set = set()
59debug_def_detail = DBG.HTTP
3e35fc99 60
8c3b6620 61def log_debug(dflag, msg, idof=None, d=None):
3e35fc99 62 if dflag not in debug_set: return
e8fcf3b7 63 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
8c3b6620 64 if idof is not None:
e8ed0029 65 msg = '[%#x] %s' % (id(idof), msg)
8c3b6620 66 if d is not None:
ba5630fd 67 #d = d[0:64]
b68c0739 68 d = _hex_codec(d)[0].decode('ascii')
8c3b6620
IJ
69 msg += ' ' + d
70 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
71
b83d422a
IJ
72@implementer(twisted.logger.ILogFilterPredicate)
73class LogNotBoringTwisted:
74 def __call__(self, event):
75 yes = twisted.logger.PredicateResult.yes
76 no = twisted.logger.PredicateResult.no
2e68eb10 77 return yes
b83d422a
IJ
78 try:
79 if event.get('log_level') != LogLevel.info:
80 return yes
81 try:
82 dflag = event.get('dflag')
83 except KeyError:
84 dflag = DBG.TWISTED
85 return yes if (dflag in debug_set) else no
86 except Exception:
87 print(traceback.format_exc(), file=org_stderr)
88 return yes
89
90#---------- default config ----------
91
ca732796
IJ
92defcfg = '''
93[DEFAULT]
94#[<client>] overrides
95max_batch_down = 65536 # used by server, subject to [limits]
96max_queue_time = 10 # used by server, subject to [limits]
ca732796 97target_requests_outstanding = 3 # must match; subject to [limits] on server
ba5630fd
IJ
98http_timeout = 30 # used by both } must be
99http_timeout_grace = 5 # used by both } compatible
ca732796
IJ
100max_requests_outstanding = 4 # used by client
101max_batch_up = 4000 # used by client
4edf77a3 102http_retry = 5 # used by client
ca732796
IJ
103
104#[server] or [<client>] overrides
105ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
106# extra interpolations: %(local)s %(peer)s %(rnet)s
107# obtained on server [virtual]server [virtual]relay [virtual]network
108# from on client <client> [virtual]server [virtual]routes
109
110[virtual]
111mtu = 1500
112routes = ''
113# network = <prefix>/<len> # mandatory for server
114# server = <ipaddr> # used by both, default is computed from `network'
115# relay = <ipaddr> # used by server, default from `network' and `server'
116# default server is first host in network
117# default relay is first host which is not server
118
119[server]
120# addrs = 127.0.0.1 ::1 # mandatory for server
121port = 80 # used by server
122# url # used by client; default from first `addrs' and `port'
123
124# [<client-ip4-or-ipv6-address>]
125# password = <password> # used by both, must match
126
127[limits]
128max_batch_down = 262144 # used by server
129max_queue_time = 121 # used by server
ba5630fd 130http_timeout = 121 # used by server
ca732796
IJ
131target_requests_outstanding = 10 # used by server
132'''
133
87a7c0c7 134# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
135cfg = ConfigParser()
136optparser = OptionParser()
137
e4006ac4 138_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
139def mime_translate(s):
140 # SLIP-encoded packets cannot contain ESC ESC.
141 # Swap `-' and ESC. The result cannot contain `--'
142 return s.translate(_mimetrans)
143
87a7c0c7
IJ
144class ConfigResults:
145 def __init__(self, d = { }):
146 self.__dict__ = d
147 def __repr__(self):
148 return 'ConfigResults('+repr(self.__dict__)+')'
149
150c = ConfigResults()
151
a8827d59 152def log_discard(packet, iface, saddr, daddr, why):
b68c0739 153 log_debug(DBG.DROP,
a8827d59 154 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
b68c0739 155 d=packet)
1321ad5f 156
b0cfbfce
IJ
157#---------- packet parsing ----------
158
159def packet_addrs(packet):
160 version = packet[0] >> 4
161 if version == 4:
162 addrlen = 4
163 saddroff = 3*4
164 factory = ipaddress.IPv4Address
165 elif version == 6:
166 addrlen = 16
167 saddroff = 2*4
168 factory = ipaddress.IPv6Address
169 else:
170 raise ValueError('unsupported IP version %d' % version)
171 saddr = factory(packet[ saddroff : saddroff + addrlen ])
172 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
173 return (saddr, daddr)
174
175#---------- address handling ----------
176
177def ipaddr(input):
178 try:
179 r = ipaddress.IPv4Address(input)
180 except AddressValueError:
181 r = ipaddress.IPv6Address(input)
182 return r
183
184def ipnetwork(input):
185 try:
186 r = ipaddress.IPv4Network(input)
187 except NetworkValueError:
188 r = ipaddress.IPv6Network(input)
189 return r
040ff511
IJ
190
191#---------- ipif (SLIP) subprocess ----------
192
a95cfeb2 193class SlipStreamDecoder():
db6ba584 194 def __init__(self, desc, on_packet):
040ff511 195 self._buffer = b''
a95cfeb2 196 self._on_packet = on_packet
db6ba584
IJ
197 self._desc = desc
198 self._log('__init__')
199
200 def _log(self, msg, **kwargs):
3297cac1 201 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
a95cfeb2
IJ
202
203 def inputdata(self, data):
db6ba584 204 self._log('inputdata', d=data)
7a68893f 205 packets = slip.decode(data)
ba5630fd 206 packets[0] = self._buffer + packets[0]
040ff511
IJ
207 self._buffer = packets.pop()
208 for packet in packets:
a95cfeb2 209 self._maybe_packet(packet)
54890d4d 210 self._log('bufremain', d=self._buffer)
a95cfeb2
IJ
211
212 def _maybe_packet(self, packet):
54890d4d 213 self._log('maybepacket', d=packet)
db6ba584
IJ
214 if len(packet):
215 self._on_packet(packet)
a95cfeb2 216
4f991c0c 217 def flush(self):
54890d4d 218 self._log('flush')
a95cfeb2
IJ
219 self._maybe_packet(self._buffer)
220 self._buffer = b''
4f991c0c 221
e4006ac4 222class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
223 def __init__(self, router):
224 self._router = router
db6ba584 225 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
a95cfeb2
IJ
226 def connectionMade(self): pass
227 def outReceived(self, data):
228 self._decoder.inputdata(data)
229 def slip_on_packet(self, packet):
4f991c0c
IJ
230 (saddr, daddr) = packet_addrs(packet)
231 if saddr.is_link_local or daddr.is_link_local:
a8827d59 232 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
4f991c0c
IJ
233 return
234 self._router(packet, saddr, daddr)
040ff511
IJ
235 def processEnded(self, status):
236 status.raiseException()
237
238def start_ipif(command, router):
239 global ipif
240 ipif = _IpifProcessProtocol(router)
241 reactor.spawnProcess(ipif,
242 '/bin/sh',['sh','-xc', command],
ff613365
IJ
243 childFDs={0:'w', 1:'r', 2:2},
244 env=None)
040ff511
IJ
245
246def queue_inbound(packet):
15407d80 247 log_debug(DBG.FLOW, "queue_inbound", d=packet)
040ff511
IJ
248 ipif.transport.write(slip.delimiter)
249 ipif.transport.write(slip.encode(packet))
250 ipif.transport.write(slip.delimiter)
251
650a3251
IJ
252#---------- packet queue ----------
253
254class PacketQueue():
d579a048
IJ
255 def __init__(self, desc, max_queue_time):
256 self._desc = desc
8718b02c 257 assert(desc + '')
650a3251
IJ
258 self._max_queue_time = max_queue_time
259 self._pq = collections.deque() # packets
260
b68c0739 261 def _log(self, dflag, msg, **kwargs):
8c3b6620 262 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
d579a048 263
650a3251 264 def append(self, packet):
8c3b6620 265 self._log(DBG.QUEUE, 'append', d=packet)
650a3251
IJ
266 self._pq.append((time.monotonic(), packet))
267
268 def nonempty(self):
8c3b6620 269 self._log(DBG.QUEUE, 'nonempty ?')
650a3251
IJ
270 while True:
271 try: (queuetime, packet) = self._pq[0]
8c3b6620
IJ
272 except IndexError:
273 self._log(DBG.QUEUE, 'nonempty ? empty.')
274 return False
650a3251
IJ
275
276 age = time.monotonic() - queuetime
84e763c7 277 if age > self._max_queue_time:
650a3251 278 # strip old packets off the front
8c3b6620 279 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
650a3251
IJ
280 self._pq.popleft()
281 continue
282
8c3b6620 283 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
650a3251
IJ
284 return True
285
7b07f0b5
IJ
286 def process(self, sizequery, moredata, max_batch):
287 # sizequery() should return size of batch so far
288 # moredata(s) should add s to batch
8c3b6620 289 self._log(DBG.QUEUE, 'process...')
7b07f0b5
IJ
290 while True:
291 try: (dummy, packet) = self._pq[0]
8c3b6620
IJ
292 except IndexError:
293 self._log(DBG.QUEUE, 'process... empty')
294 break
295
296 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
7b07f0b5
IJ
297
298 encoded = slip.encode(packet)
299 sofar = sizequery()
300
8c3b6620
IJ
301 self._log(DBG.QUEUE_CTRL,
302 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
b68c0739 303 d=encoded)
8c3b6620 304
7b07f0b5
IJ
305 if sofar > 0:
306 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
8c3b6620 307 self._log(DBG.QUEUE_CTRL, 'process... overflow')
7b07f0b5
IJ
308 break
309 moredata(slip.delimiter)
310
311 moredata(encoded)
84e763c7 312 self._pq.popleft()
ae7c7784
IJ
313
314#---------- error handling ----------
315
b68c0739
IJ
316_crashing = False
317
ae7c7784 318def crash(err):
b68c0739
IJ
319 global _crashing
320 _crashing = True
e8ed0029
IJ
321 print('========== CRASH ==========', err,
322 '===========================', file=sys.stderr)
ae7c7784
IJ
323 try: reactor.stop()
324 except twisted.internet.error.ReactorNotRunning: pass
325
326def crash_on_defer(defer):
327 defer.addErrback(lambda err: crash(err))
328
e4006ac4 329def crash_on_critical(event):
ae7c7784
IJ
330 if event.get('log_level') >= LogLevel.critical:
331 crash(twisted.logger.formatEvent(event))
332
87a7c0c7
IJ
333#---------- config processing ----------
334
335def process_cfg_common_always():
336 global mtu
337 c.mtu = cfg.get('virtual','mtu')
338
88487243
IJ
339def process_cfg_ipif(section, varmap):
340 for d, s in varmap:
341 try: v = getattr(c, s)
034284c3 342 except AttributeError: continue
88487243
IJ
343 setattr(c, d, v)
344
b68c0739 345 #print(repr(c))
88487243
IJ
346
347 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
348
349def process_cfg_network():
350 c.network = ipnetwork(cfg.get('virtual','network'))
351 if c.network.num_addresses < 3 + 2:
352 raise ValueError('network needs at least 2^3 addresses')
353
354def process_cfg_server():
355 try:
356 c.server = cfg.get('virtual','server')
357 except NoOptionError:
358 process_cfg_network()
359 c.server = next(c.network.hosts())
360
361class ServerAddr():
362 def __init__(self, port, addrspec):
363 self.port = port
364 # also self.addr
365 try:
366 self.addr = ipaddress.IPv4Address(addrspec)
367 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 368 self._inurl = b'%s'
88487243
IJ
369 except AddressValueError:
370 self.addr = ipaddress.IPv6Address(addrspec)
371 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 372 self._inurl = b'[%s]'
88487243
IJ
373 def make_endpoint(self):
374 return self._endpointfactory(reactor, self.port, self.addr)
375 def url(self):
84e763c7
IJ
376 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
377 if self.port != 80: url += b':%d' % self.port
378 url += b'/'
88487243
IJ
379 return url
380
381def process_cfg_saddrs():
1d023c89
IJ
382 try: port = cfg.getint('server','port')
383 except NoOptionError: port = 80
88487243
IJ
384
385 c.saddrs = [ ]
386 for addrspec in cfg.get('server','addrs').split():
387 sa = ServerAddr(port, addrspec)
388 c.saddrs.append(sa)
389
390def process_cfg_clients(constructor):
391 c.clients = [ ]
392 for cs in cfg.sections():
393 if not (':' in cs or '.' in cs): continue
394 ci = ipaddr(cs)
395 pw = cfg.get(cs, 'password')
84e763c7 396 pw = pw.encode('utf-8')
88487243
IJ
397 constructor(ci,cs,pw)
398
ae7c7784
IJ
399#---------- startup ----------
400
1321ad5f 401def common_startup():
2e68eb10
IJ
402 optparser.add_option('-c', '--config', dest='configfile',
403 default='/etc/hippotat/config')
404
405 def ds_by_detail(od,os,detail_level,op):
406 global debug_set
407 debug_set = set([df for df in DBG.iterconstants() if df <= detail_level])
408
409 def ds_one(mutator,df, od,os,value,op):
410 mutator(df)
411
412 optparser.add_option('-D', '--debug',
413 default=debug_def_detail.name,
414 type='choice',
415 choices=[dl.name for dl in DBG.iterconstants()],
416 action='callback',
417 callback= ds_by_detail)
418
419 optparser.add_option('--no-debug',
420 nargs=0,
421 action='callback',
422 callback= partial(ds_by_detail,DBG.INIT))
423
424 for df in DBG.iterconstants():
425 optparser.add_option('--debug-'+df.name,
426 action='callback',
427 callback= partial(ds_one, debug_set.add, df))
428 optparser.add_option('--no-debug-'+df.name,
429 action='callback',
430 callback= partial(ds_one, debug_set.discard, df))
431
432 (opts, args) = optparser.parse_args()
433 if len(args): optparser.error('no non-option arguments please')
434
435 re = regexp.compile('#.*')
436 cfg.read_string(re.sub('', defcfg))
437 cfg.read(opts.configfile)
438
8c3b6620 439 log_formatter = twisted.logger.formatEventAsClassicLogText
389236df
IJ
440 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
441 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
442 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
b83d422a 443 stdsomething_obs = twisted.logger.FilteringLogObserver(
389236df
IJ
444 stderr_obs, [pred], stdout_obs
445 )
b83d422a
IJ
446 log_observer = twisted.logger.FilteringLogObserver(
447 stdsomething_obs, [LogNotBoringTwisted()]
448 )
449 #log_observer = stdsomething_obs
8c3b6620
IJ
450 twisted.logger.globalLogBeginner.beginLoggingTo(
451 [ log_observer, crash_on_critical ]
452 )
ae7c7784 453
ae7c7784 454def common_run():
b68c0739
IJ
455 log_debug(DBG.INIT, 'entering reactor')
456 if not _crashing: reactor.run()
ae7c7784 457 print('CRASHED (end)', file=sys.stderr)