less absurd
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
eedc8b30 26import traceback
ae7c7784 27
1321ad5f
IJ
28import re as regexp
29
30import hippotat.slip as slip
31
d579a048
IJ
32class DBG(twisted.python.constants.Names):
33 ROUTE = NamedConstant()
b68c0739 34 DROP = NamedConstant()
d579a048
IJ
35 FLOW = NamedConstant()
36 HTTP = NamedConstant()
37 HTTP_CTRL = NamedConstant()
38 INIT = NamedConstant()
39 QUEUE = NamedConstant()
40 QUEUE_CTRL = NamedConstant()
297b3ebf 41 HTTP_FULL = NamedConstant()
db6ba584 42 SLIP_FULL = NamedConstant()
0accf0d3 43 CTRL_DUMP = NamedConstant()
d579a048 44
b68c0739 45_hex_codec = codecs.getencoder('hex_codec')
8c3b6620
IJ
46
47log = twisted.logger.Logger()
48
49def log_debug(dflag, msg, idof=None, d=None):
e8fcf3b7 50 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
8c3b6620 51 if idof is not None:
e8ed0029 52 msg = '[%#x] %s' % (id(idof), msg)
8c3b6620 53 if d is not None:
ca386ea2 54 d = d[0:64]
b68c0739 55 d = _hex_codec(d)[0].decode('ascii')
8c3b6620
IJ
56 msg += ' ' + d
57 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
58
ca732796
IJ
59defcfg = '''
60[DEFAULT]
61#[<client>] overrides
62max_batch_down = 65536 # used by server, subject to [limits]
63max_queue_time = 10 # used by server, subject to [limits]
64max_request_time = 54 # used by server, subject to [limits]
65target_requests_outstanding = 3 # must match; subject to [limits] on server
66max_requests_outstanding = 4 # used by client
67max_batch_up = 4000 # used by client
7b07f0b5 68http_timeout = 30 # used by client
4edf77a3 69http_retry = 5 # used by client
ca732796
IJ
70
71#[server] or [<client>] overrides
72ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
73# extra interpolations: %(local)s %(peer)s %(rnet)s
74# obtained on server [virtual]server [virtual]relay [virtual]network
75# from on client <client> [virtual]server [virtual]routes
76
77[virtual]
78mtu = 1500
79routes = ''
80# network = <prefix>/<len> # mandatory for server
81# server = <ipaddr> # used by both, default is computed from `network'
82# relay = <ipaddr> # used by server, default from `network' and `server'
83# default server is first host in network
84# default relay is first host which is not server
85
86[server]
87# addrs = 127.0.0.1 ::1 # mandatory for server
88port = 80 # used by server
89# url # used by client; default from first `addrs' and `port'
90
91# [<client-ip4-or-ipv6-address>]
92# password = <password> # used by both, must match
93
94[limits]
95max_batch_down = 262144 # used by server
96max_queue_time = 121 # used by server
97max_request_time = 121 # used by server
98target_requests_outstanding = 10 # used by server
99'''
100
87a7c0c7 101# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
102cfg = ConfigParser()
103optparser = OptionParser()
104
e4006ac4 105_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
106def mime_translate(s):
107 # SLIP-encoded packets cannot contain ESC ESC.
108 # Swap `-' and ESC. The result cannot contain `--'
109 return s.translate(_mimetrans)
110
87a7c0c7
IJ
111class ConfigResults:
112 def __init__(self, d = { }):
113 self.__dict__ = d
114 def __repr__(self):
115 return 'ConfigResults('+repr(self.__dict__)+')'
116
117c = ConfigResults()
118
a8827d59 119def log_discard(packet, iface, saddr, daddr, why):
b68c0739 120 log_debug(DBG.DROP,
a8827d59 121 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
b68c0739 122 d=packet)
1321ad5f 123
b0cfbfce
IJ
124#---------- packet parsing ----------
125
126def packet_addrs(packet):
127 version = packet[0] >> 4
128 if version == 4:
129 addrlen = 4
130 saddroff = 3*4
131 factory = ipaddress.IPv4Address
132 elif version == 6:
133 addrlen = 16
134 saddroff = 2*4
135 factory = ipaddress.IPv6Address
136 else:
137 raise ValueError('unsupported IP version %d' % version)
138 saddr = factory(packet[ saddroff : saddroff + addrlen ])
139 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
140 return (saddr, daddr)
141
142#---------- address handling ----------
143
144def ipaddr(input):
145 try:
146 r = ipaddress.IPv4Address(input)
147 except AddressValueError:
148 r = ipaddress.IPv6Address(input)
149 return r
150
151def ipnetwork(input):
152 try:
153 r = ipaddress.IPv4Network(input)
154 except NetworkValueError:
155 r = ipaddress.IPv6Network(input)
156 return r
040ff511
IJ
157
158#---------- ipif (SLIP) subprocess ----------
159
a95cfeb2 160class SlipStreamDecoder():
db6ba584 161 def __init__(self, desc, on_packet):
040ff511 162 self._buffer = b''
a95cfeb2 163 self._on_packet = on_packet
db6ba584
IJ
164 self._desc = desc
165 self._log('__init__')
166
167 def _log(self, msg, **kwargs):
3297cac1 168 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
a95cfeb2
IJ
169
170 def inputdata(self, data):
db6ba584 171 self._log('inputdata', d=data)
7a68893f
IJ
172 data = self._buffer + data
173 self._buffer = b''
174 packets = slip.decode(data)
040ff511
IJ
175 self._buffer = packets.pop()
176 for packet in packets:
a95cfeb2 177 self._maybe_packet(packet)
54890d4d 178 self._log('bufremain', d=self._buffer)
a95cfeb2
IJ
179
180 def _maybe_packet(self, packet):
54890d4d 181 self._log('maybepacket', d=packet)
db6ba584
IJ
182 if len(packet):
183 self._on_packet(packet)
a95cfeb2 184
4f991c0c 185 def flush(self):
54890d4d 186 self._log('flush')
a95cfeb2
IJ
187 self._maybe_packet(self._buffer)
188 self._buffer = b''
4f991c0c 189
e4006ac4 190class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
191 def __init__(self, router):
192 self._router = router
db6ba584 193 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
a95cfeb2
IJ
194 def connectionMade(self): pass
195 def outReceived(self, data):
196 self._decoder.inputdata(data)
197 def slip_on_packet(self, packet):
4f991c0c
IJ
198 (saddr, daddr) = packet_addrs(packet)
199 if saddr.is_link_local or daddr.is_link_local:
a8827d59 200 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
4f991c0c
IJ
201 return
202 self._router(packet, saddr, daddr)
040ff511
IJ
203 def processEnded(self, status):
204 status.raiseException()
205
206def start_ipif(command, router):
207 global ipif
208 ipif = _IpifProcessProtocol(router)
209 reactor.spawnProcess(ipif,
210 '/bin/sh',['sh','-xc', command],
ff613365
IJ
211 childFDs={0:'w', 1:'r', 2:2},
212 env=None)
040ff511
IJ
213
214def queue_inbound(packet):
15407d80 215 log_debug(DBG.FLOW, "queue_inbound", d=packet)
040ff511
IJ
216 ipif.transport.write(slip.delimiter)
217 ipif.transport.write(slip.encode(packet))
218 ipif.transport.write(slip.delimiter)
219
650a3251
IJ
220#---------- packet queue ----------
221
222class PacketQueue():
d579a048
IJ
223 def __init__(self, desc, max_queue_time):
224 self._desc = desc
8718b02c 225 assert(desc + '')
650a3251
IJ
226 self._max_queue_time = max_queue_time
227 self._pq = collections.deque() # packets
228
b68c0739 229 def _log(self, dflag, msg, **kwargs):
8c3b6620 230 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
d579a048 231
650a3251 232 def append(self, packet):
8c3b6620 233 self._log(DBG.QUEUE, 'append', d=packet)
650a3251
IJ
234 self._pq.append((time.monotonic(), packet))
235
236 def nonempty(self):
8c3b6620 237 self._log(DBG.QUEUE, 'nonempty ?')
650a3251
IJ
238 while True:
239 try: (queuetime, packet) = self._pq[0]
8c3b6620
IJ
240 except IndexError:
241 self._log(DBG.QUEUE, 'nonempty ? empty.')
242 return False
650a3251
IJ
243
244 age = time.monotonic() - queuetime
84e763c7 245 if age > self._max_queue_time:
650a3251 246 # strip old packets off the front
8c3b6620 247 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
650a3251
IJ
248 self._pq.popleft()
249 continue
250
8c3b6620 251 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
650a3251
IJ
252 return True
253
7b07f0b5
IJ
254 def process(self, sizequery, moredata, max_batch):
255 # sizequery() should return size of batch so far
256 # moredata(s) should add s to batch
8c3b6620 257 self._log(DBG.QUEUE, 'process...')
7b07f0b5
IJ
258 while True:
259 try: (dummy, packet) = self._pq[0]
8c3b6620
IJ
260 except IndexError:
261 self._log(DBG.QUEUE, 'process... empty')
262 break
263
264 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
7b07f0b5
IJ
265
266 encoded = slip.encode(packet)
267 sofar = sizequery()
268
8c3b6620
IJ
269 self._log(DBG.QUEUE_CTRL,
270 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
b68c0739 271 d=encoded)
8c3b6620 272
7b07f0b5
IJ
273 if sofar > 0:
274 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
8c3b6620 275 self._log(DBG.QUEUE_CTRL, 'process... overflow')
7b07f0b5
IJ
276 break
277 moredata(slip.delimiter)
278
279 moredata(encoded)
84e763c7 280 self._pq.popleft()
ae7c7784
IJ
281
282#---------- error handling ----------
283
b68c0739
IJ
284_crashing = False
285
ae7c7784 286def crash(err):
b68c0739
IJ
287 global _crashing
288 _crashing = True
e8ed0029
IJ
289 print('========== CRASH ==========', err,
290 '===========================', file=sys.stderr)
ae7c7784
IJ
291 try: reactor.stop()
292 except twisted.internet.error.ReactorNotRunning: pass
293
294def crash_on_defer(defer):
295 defer.addErrback(lambda err: crash(err))
296
e4006ac4 297def crash_on_critical(event):
ae7c7784
IJ
298 if event.get('log_level') >= LogLevel.critical:
299 crash(twisted.logger.formatEvent(event))
300
87a7c0c7
IJ
301#---------- config processing ----------
302
303def process_cfg_common_always():
304 global mtu
305 c.mtu = cfg.get('virtual','mtu')
306
88487243
IJ
307def process_cfg_ipif(section, varmap):
308 for d, s in varmap:
309 try: v = getattr(c, s)
034284c3 310 except AttributeError: continue
88487243
IJ
311 setattr(c, d, v)
312
b68c0739 313 #print(repr(c))
88487243
IJ
314
315 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
316
317def process_cfg_network():
318 c.network = ipnetwork(cfg.get('virtual','network'))
319 if c.network.num_addresses < 3 + 2:
320 raise ValueError('network needs at least 2^3 addresses')
321
322def process_cfg_server():
323 try:
324 c.server = cfg.get('virtual','server')
325 except NoOptionError:
326 process_cfg_network()
327 c.server = next(c.network.hosts())
328
329class ServerAddr():
330 def __init__(self, port, addrspec):
331 self.port = port
332 # also self.addr
333 try:
334 self.addr = ipaddress.IPv4Address(addrspec)
335 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 336 self._inurl = b'%s'
88487243
IJ
337 except AddressValueError:
338 self.addr = ipaddress.IPv6Address(addrspec)
339 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 340 self._inurl = b'[%s]'
88487243
IJ
341 def make_endpoint(self):
342 return self._endpointfactory(reactor, self.port, self.addr)
343 def url(self):
84e763c7
IJ
344 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
345 if self.port != 80: url += b':%d' % self.port
346 url += b'/'
88487243
IJ
347 return url
348
349def process_cfg_saddrs():
1d023c89
IJ
350 try: port = cfg.getint('server','port')
351 except NoOptionError: port = 80
88487243
IJ
352
353 c.saddrs = [ ]
354 for addrspec in cfg.get('server','addrs').split():
355 sa = ServerAddr(port, addrspec)
356 c.saddrs.append(sa)
357
358def process_cfg_clients(constructor):
359 c.clients = [ ]
360 for cs in cfg.sections():
361 if not (':' in cs or '.' in cs): continue
362 ci = ipaddr(cs)
363 pw = cfg.get(cs, 'password')
84e763c7 364 pw = pw.encode('utf-8')
88487243
IJ
365 constructor(ci,cs,pw)
366
ae7c7784
IJ
367#---------- startup ----------
368
1321ad5f 369def common_startup():
8c3b6620
IJ
370 log_formatter = twisted.logger.formatEventAsClassicLogText
371 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
372 twisted.logger.globalLogBeginner.beginLoggingTo(
373 [ log_observer, crash_on_critical ]
374 )
ae7c7784
IJ
375
376 optparser.add_option('-c', '--config', dest='configfile',
377 default='/etc/hippotat/config')
378 (opts, args) = optparser.parse_args()
379 if len(args): optparser.error('no non-option arguments please')
380
1321ad5f
IJ
381 re = regexp.compile('#.*')
382 cfg.read_string(re.sub('', defcfg))
ae7c7784
IJ
383 cfg.read(opts.configfile)
384
385def common_run():
b68c0739
IJ
386 log_debug(DBG.INIT, 'entering reactor')
387 if not _crashing: reactor.run()
ae7c7784 388 print('CRASHED (end)', file=sys.stderr)