filter out twisted stuff
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
b83d422a
IJ
8from zope.interface import implementer
9
040ff511
IJ
10import twisted
11from twisted.internet import reactor
1d023c89 12import twisted.internet.endpoints
8c3b6620
IJ
13import twisted.logger
14from twisted.logger import LogLevel
15import twisted.python.constants
16from twisted.python.constants import NamedConstant
b0cfbfce
IJ
17
18import ipaddress
19from ipaddress import AddressValueError
20
ae7c7784
IJ
21from optparse import OptionParser
22from configparser import ConfigParser
23from configparser import NoOptionError
24
c13ee6e6
IJ
25from functools import partial
26
ae7c7784 27import collections
84e763c7 28import time
8c3b6620 29import codecs
eedc8b30 30import traceback
ae7c7784 31
1321ad5f
IJ
32import re as regexp
33
34import hippotat.slip as slip
35
class DBG(twisted.python.constants.Names):
    """Debug categories used to tag and filter log messages.

    The declaration order matters: the module's default debug set is
    built by comparing each constant against HTTP (``x <= DBG.HTTP``),
    so do not reorder these without revisiting that default.
    """
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
d579a048 49
b68c0739 50_hex_codec = codecs.getencoder('hex_codec')
8c3b6620 51
b83d422a
IJ
52#---------- logging ----------
53
54org_stderr = sys.stderr
55
8c3b6620
IJ
56log = twisted.logger.Logger()
57
3e35fc99
IJ
58debug_set = set([x for x in DBG.iterconstants() if x <= DBG.HTTP])
59
def log_debug(dflag, msg, idof=None, d=None):
    """Log `msg` at info level if debug category `dflag` is enabled.

    dflag -- a DBG constant; the message is dropped unless it is in
             the module-level debug_set.
    msg   -- message text.
    idof  -- optional object; its id() is prefixed to the message in hex.
    d     -- optional bytes-like data; appended to the message as hex.
    """
    if dflag not in debug_set:
        return

    text = msg
    if idof is not None:
        text = '[%#x] %s' % (id(idof), text)
    if d is not None:
        # The encoder returns (encoded_bytes, length_consumed).
        (hexed, _consumed) = _hex_codec(d)
        text = text + ' ' + hexed.decode('ascii')

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
70
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate which suppresses uninteresting info events.

    Events at any level other than info always pass.  Info-level events
    pass only if their debug category is in debug_set; events which
    carry no 'dflag' key (i.e. those not emitted via log_debug) are
    treated as belonging to DBG.TWISTED.
    """
    def __call__(self, event):
        yes = twisted.logger.PredicateResult.yes
        no = twisted.logger.PredicateResult.no
        try:
            if event.get('log_level') != LogLevel.info:
                return yes
            # dict.get() never raises KeyError, so the fallback has to
            # be supplied as the default rather than via try/except.
            dflag = event.get('dflag', DBG.TWISTED)
            return yes if (dflag in debug_set) else no
        except Exception:
            # A broken filter must not lose messages: report the problem
            # on the original stderr and let the event through.
            print(traceback.format_exc(), file=org_stderr)
            return yes
87
88#---------- default config ----------
89
ca732796
IJ
90defcfg = '''
91[DEFAULT]
92#[<client>] overrides
93max_batch_down = 65536 # used by server, subject to [limits]
94max_queue_time = 10 # used by server, subject to [limits]
ca732796 95target_requests_outstanding = 3 # must match; subject to [limits] on server
ba5630fd
IJ
96http_timeout = 30 # used by both } must be
97http_timeout_grace = 5 # used by both } compatible
ca732796
IJ
98max_requests_outstanding = 4 # used by client
99max_batch_up = 4000 # used by client
4edf77a3 100http_retry = 5 # used by client
ca732796
IJ
101
102#[server] or [<client>] overrides
103ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
104# extra interpolations: %(local)s %(peer)s %(rnet)s
105# obtained on server [virtual]server [virtual]relay [virtual]network
106# from on client <client> [virtual]server [virtual]routes
107
108[virtual]
109mtu = 1500
110routes = ''
111# network = <prefix>/<len> # mandatory for server
112# server = <ipaddr> # used by both, default is computed from `network'
113# relay = <ipaddr> # used by server, default from `network' and `server'
114# default server is first host in network
115# default relay is first host which is not server
116
117[server]
118# addrs = 127.0.0.1 ::1 # mandatory for server
119port = 80 # used by server
120# url # used by client; default from first `addrs' and `port'
121
122# [<client-ip4-or-ipv6-address>]
123# password = <password> # used by both, must match
124
125[limits]
126max_batch_down = 262144 # used by server
127max_queue_time = 121 # used by server
ba5630fd 128http_timeout = 121 # used by server
ca732796
IJ
129target_requests_outstanding = 10 # used by server
130'''
131
87a7c0c7 132# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
133cfg = ConfigParser()
134optparser = OptionParser()
135
e4006ac4 136_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
def mime_translate(s):
    """Return bytes `s` with every `-' and SLIP ESC byte exchanged.

    Applies the module-level _mimetrans table, which maps `-' to
    slip.esc and slip.esc to `-'.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    swapped = s.translate(_mimetrans)
    return swapped
141
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding the processed configuration.

    d -- optional dict to use directly as the instance's attribute
         dictionary (it is aliased, not copied).  If omitted, the
         instance gets a fresh empty dict of its own.
    """
    def __init__(self, d=None):
        # A `{ }' default would be created once and then shared by every
        # instance constructed without an argument.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults('+repr(self.__dict__)+')'
147
148c = ConfigResults()
149
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under debug category DBG.DROP, that `packet' was dropped.

    iface, saddr, daddr and why are interpolated into the message;
    the packet itself is passed to log_debug as the data to hex-dump.
    """
    description = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, description, d=packet)
1321ad5f 154
b0cfbfce
IJ
155#---------- packet parsing ----------
156
def packet_addrs(packet):
    """Extract (source, destination) addresses from a raw IP packet.

    packet -- bytes-like object starting with the IP header.

    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address
    objects, according to the version nibble in the first byte.

    Raises ValueError if the packet is empty or the IP version is
    neither 4 nor 6.  A packet too short to contain both addresses is
    rejected by the address constructor (AddressValueError, which is
    itself a ValueError).
    """
    if len(packet) == 0:
        # Indexing would raise IndexError; report it the same way as
        # every other kind of malformed packet.
        raise ValueError('empty packet')

    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4    # source address follows three 32-bit words
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4    # source address follows two 32-bit words
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff : daddroff])
    daddr = factory(packet[daddroff : daddroff + addrlen])
    return (saddr, daddr)
172
173#---------- address handling ----------
174
def ipaddr(input):
    """Parse `input' as an IPv4 address, or failing that as IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises AddressValueError if it is valid as neither.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
181
def ipnetwork(input):
    """Parse `input' as an IPv4 network, or failing that as IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises a ValueError subclass (AddressValueError or
    NetmaskValueError) if `input' is valid as neither.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4, so try IPv6.  (There is no
        # `NetworkValueError'; naming it here raised NameError instead
        # of falling back.)  An IPv4 address with a bad netmask raises
        # NetmaskValueError, which is deliberately left to propagate.
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
188
189#---------- ipif (SLIP) subprocess ----------
190
class SlipStreamDecoder():
    """Reassembles packets from a SLIP byte stream arriving in pieces.

    desc      -- short label used in this decoder's debug messages.
    on_packet -- callable invoked with each complete, nonempty packet.
    """
    def __init__(self, desc, on_packet):
        self._buffer = b''          # trailing data of an unfinished packet
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        """Emit a DBG.SLIP_FULL debug message labelled with our desc."""
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed newly received bytes; deliver any packets they complete."""
        self._log('inputdata', d=data)
        # presumably slip.decode returns a nonempty list of decoded
        # pieces, the last being a possibly unfinished tail -- confirm
        # against hippotat.slip.
        pieces = slip.decode(data)
        # The first piece continues whatever was left over last time.
        pieces[0] = self._buffer + pieces[0]
        # Everything but the last piece is complete; keep the last.
        *complete, self._buffer = pieces
        for pkt in complete:
            self._maybe_packet(pkt)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Pass `packet' to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if len(packet) == 0:
            return
        self._on_packet(packet)

    def flush(self):
        """Treat any buffered data as a complete packet and reset."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 219
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    SLIP data read from the subprocess's stdout is decoded into
    packets, which are handed to `router' as (packet, saddr, daddr).
    """
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, discarding link-local traffic."""
        (src, dst) = packet_addrs(packet)
        if not (src.is_link_local or dst.is_link_local):
            self._router(packet, src, dst)
            return
        log_discard(packet, 'ipif', src, dst, 'link-local')

    def processEnded(self, status):
        # The subprocess ending is not handled here: re-raise whatever
        # `status' carries.
        status.raiseException()
235
def start_ipif(command, router):
    """Spawn the ipif subprocess and remember its protocol in `ipif'.

    command -- shell command line, run via `/bin/sh -xc'.
    router  -- callable given to the protocol for routing packets read
               from the subprocess.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    # fd 0 is written by us, fd 1 is read by us, fd 2 is our own fd 2.
    fds = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
243
def queue_inbound(packet):
    """Write `packet' to the ipif subprocess, SLIP-encoded.

    The encoded packet is sent with a SLIP delimiter on either side.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
249
650a3251
IJ
250#---------- packet queue ----------
251
class PacketQueue():
    """FIFO of packets which expire after a maximum time in the queue.

    desc           -- string label used in debug messages.
    max_queue_time -- maximum age (same units as time.monotonic())
                      before a queued packet is dropped by nonempty().
    """
    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert desc + ''
        self._max_queue_time = max_queue_time
        # Entries are (time queued, packet), oldest first.
        self._pq = collections.deque()

    def _log(self, dflag, msg, **kwargs):
        """Emit a debug message prefixed with this queue's label."""
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet' to the back of the queue, timestamped now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; say whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            (queued_at, packet) = self._pq[0]
            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the queue into a batch until it is full.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch
        max_batch   is the size limit; a packet which would push a
                    nonempty batch beyond it is left in the queue.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            if not self._pq:
                self._log(DBG.QUEUE, 'process... empty')
                break

            packet = self._pq[0][1]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded'
                      % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                # Not the first packet in the batch, so a delimiter is
                # needed too.  (The first packet is always accepted,
                # whatever its size.)
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
311
312#---------- error handling ----------
313
b68c0739
IJ
314_crashing = False
315
def crash(err):
    """Report `err' on stderr, note that we are crashing, stop the reactor.

    Sets the module-level _crashing flag.  It is not an error if the
    reactor is not running.
    """
    global _crashing
    _crashing = True

    banner_top = '========== CRASH =========='
    banner_end = '==========================='
    print(banner_top, err, banner_end, file=sys.stderr)

    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
323
def crash_on_defer(defer):
    """Arrange for a failure of the Deferred `defer' to call crash()."""
    def report_failure(err):
        return crash(err)
    defer.addErrback(report_failure)
326
def crash_on_critical(event):
    """Log observer: crash the program on any critical-level event.

    event -- a twisted.logger event dict.  Events which carry no
             'log_level' are ignored, since comparing None with a
             LogLevel would itself raise inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
330
87a7c0c7
IJ
331#---------- config processing ----------
332
def process_cfg_common_always():
    """Read the settings needed by every program into `c'.

    Currently this is just c.mtu, from `mtu' in [virtual], stored as
    the string that the config parser returns.
    """
    # (A `global mtu' declaration used to sit here; nothing was ever
    # assigned to that name, the value lives in c.mtu.)
    c.mtu = cfg.get('virtual','mtu')
336
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting in `section'.

    varmap -- iterable of (dest, src) attribute-name pairs.  For each
              pair, if `c' has attribute `src' its value is copied to
              attribute `dest'; pairs whose `src' is absent are skipped.

    The attributes of `c' are then supplied as extra interpolation
    variables when the `ipif' value is looked up.
    """
    absent = object()
    for (dest, src) in varmap:
        value = getattr(c, src, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
346
def process_cfg_network():
    """Set c.network from `network' in [virtual], checking its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    net = ipnetwork(cfg.get('virtual','network'))
    c.network = net
    too_small = net.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
351
def process_cfg_server():
    """Set c.server from `server' in [virtual], or default it.

    If the option is absent, the network is processed (setting
    c.network) and the first host address in it is used.
    """
    try:
        configured = cfg.get('virtual','server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        c.server = configured
358
class ServerAddr():
    """One address and port on which the server listens.

    port     -- TCP port number.
    addrspec -- IPv4 or IPv6 address, as a string.

    Attributes: self.port, and self.addr (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).  Raises AddressValueError if `addrspec' is
    valid as neither kind of address.
    """
    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed in URLs.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The endpoints' third positional parameter is the listen
        # backlog, so the address must be given by keyword, as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is omitted when it is 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
378
def process_cfg_saddrs():
    """Set c.saddrs to a ServerAddr for each address in [server] addrs.

    `addrs' is a whitespace-separated list; every address shares the
    [server] port, which defaults to 80 if the option is absent.
    """
    try:
        port = cfg.getint('server','port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    addrspecs = cfg.get('server','addrs').split()
    c.saddrs.extend(ServerAddr(port, spec) for spec in addrspecs)
387
def process_cfg_clients(constructor):
    """Call `constructor' for each client section of the configuration.

    A section counts as a client if its name contains `:' or `.'; the
    name is then parsed as an IP address.  The constructor is called as
    constructor(address, section_name, password), where the password is
    the section's `password' value encoded as UTF-8.

    Also resets c.clients to an empty list.
    """
    c.clients = [ ]
    for section in cfg.sections():
        if ':' not in section and '.' not in section:
            continue
        client_addr = ipaddr(section)
        password = cfg.get(section, 'password').encode('utf-8')
        constructor(client_addr, section, password)
396
ae7c7784
IJ
397#---------- startup ----------
398
def common_startup():
    """Set up logging, parse the command line and load the configuration.

    Logging: events go through LogNotBoringTwisted, then to one of two
    classic-format text observers on stderr and stdout, selected by a
    LogLevel.error level predicate; crash_on_critical also observes
    every event.

    Configuration: the built-in defaults (defcfg, with `#' comments
    stripped) are loaded first, then the file named by -c/--config.
    """
    fmt = twisted.logger.formatEventAsClassicLogText
    to_stdout = twisted.logger.FileLogObserver(sys.stdout, fmt)
    to_stderr = twisted.logger.FileLogObserver(sys.stderr, fmt)
    level_pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    # Events accepted by the predicate go to stderr, the rest to stdout.
    by_level = twisted.logger.FilteringLogObserver(
        to_stderr, [level_pred], to_stdout
    )
    interesting = twisted.logger.FilteringLogObserver(
        by_level, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ interesting, crash_on_critical ]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    (opts, args) = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # The built-in defaults use trailing `#' comments, so strip them
    # before handing the text to the config parser.
    defaults_text = regexp.sub('#.*', '', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
423
def common_run():
    """Run the reactor (unless a crash has already been recorded).

    Always prints `CRASHED (end)' to stderr before returning.
    """
    log_debug(DBG.INIT, 'entering reactor')
    already_crashed = _crashing
    if not already_crashed:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)