wip
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
eedc8b30 26import traceback
ae7c7784 27
1321ad5f
IJ
28import re as regexp
29
30import hippotat.slip as slip
31
d579a048
IJ
class DBG(twisted.python.constants.Names):
    """Debug categories; one of these is passed as `dflag' to log_debug()."""
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    SLIP_FULL = NamedConstant()
d579a048 43
# Encoder used by log_debug() to render packet data as hex digits.
_hex_codec = codecs.getencoder('hex_codec')

# Module-wide Twisted logger; log_debug() emits its messages through it.
log = twisted.logger.Logger()
47
def log_debug(dflag, msg, idof=None, d=None):
    """Emit one debug message via the module logger at `info' level.

    dflag -- debug category (a DBG constant), logged in front of the text
    msg   -- message text
    idof  -- optional object; when given, '[<id(idof)>] ' is prefixed
    d     -- optional data; when given, a hex dump of at most its first
             64 bytes is appended after a space
    """
    text = msg
    if idof is not None:
        text = '[%d] %s' % (id(idof), text)
    if d is not None:
        # the encoder returns (hexbytes, length_consumed); only the
        # hex digits are wanted
        hexbytes = _hex_codec(d[0:64])[0]
        text = text + ' ' + hexbytes.decode('ascii')
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
57
ca732796
IJ
# Built-in default configuration.  common_startup() removes everything
# from `#' to end of line and feeds the result to cfg.read_string()
# before the user's config file is read.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
99
# These need to be defined here, at module level, so that they can be
# imported by `import *'.  cfg is filled in by common_startup();
# optparser gets its `--config' option added there too.
cfg = ConfigParser()
optparser = OptionParser()
103
# Byte translation table exchanging `-' and slip.esc.
# (assumes slip.esc is a single byte -- TODO confirm against hippotat.slip)
_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return bytes `s' with every `-' and every slip.esc exchanged."""
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    return s.translate(_mimetrans)
109
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding processed configuration values.

    ConfigResults(d) uses the mapping `d' itself as the instance's
    attribute dictionary, so attributes and `d' stay in sync.  With no
    argument each instance gets its own fresh, empty dictionary.
    """
    def __init__(self, d=None):
        # A literal `{ }' default would be created once and then shared
        # by every default-constructed instance, because it is assigned
        # (not copied) to __dict__.  Make a new dict per instance.
        if d is None:
            d = {}
        self.__dict__ = d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'

# The module-wide results object filled in by the process_cfg_* functions.
c = ConfigResults()
117
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that `packet' was discarded.

    The message names the interface, source and destination addresses
    and the reason; the packet itself is passed to log_debug as the
    data to hex-dump.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 122
b0cfbfce
IJ
123#---------- packet parsing ----------
124
def packet_addrs(packet):
    """Return (source, destination) addresses parsed from an IP packet.

    The IP version is the high nibble of the first byte.  Version 4
    yields ipaddress.IPv4Address objects, version 6 yields
    ipaddress.IPv6Address objects; any other version raises ValueError.
    """
    # version -> (offset of source address, address length, constructor)
    layouts = {
        4: (3*4, 4, ipaddress.IPv4Address),
        6: (2*4, 16, ipaddress.IPv6Address),
    }
    version = packet[0] >> 4
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    src_start, addr_len, make_addr = layout
    # the destination address immediately follows the source address
    dst_start = src_start + addr_len
    dst_end = dst_start + addr_len
    saddr = make_addr(packet[src_start:dst_start])
    daddr = make_addr(packet[dst_start:dst_end])
    return (saddr, daddr)
140
141#---------- address handling ----------
142
def ipaddr(input):
    """Parse `input' as an IPv4 address, or failing that as IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If the
    value is not a valid IPv6 address either, the exception from the
    IPv6 constructor propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
149
def ipnetwork(input):
    """Parse `input' as an IPv4 network, or failing that as IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  If the
    address part is not valid IPv4 the value is retried as IPv6, and
    any exception from the IPv6 constructor propagates.  Other IPv4
    errors (for example a bad prefix length on a valid IPv4 address)
    propagate directly.
    """
    try:
        r = ipaddress.IPv4Network(input)
    # The ipaddress module has no `NetworkValueError'; a non-IPv4
    # address part is reported as AddressValueError, the same exception
    # ipaddr() handles.
    except AddressValueError:
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
156
157#---------- ipif (SLIP) subprocess ----------
158
class SlipStreamDecoder():
    """Incremental decoder turning a SLIP byte stream into packets.

    Data arrives in arbitrary chunks via inputdata(); each non-empty
    decoded packet is handed to the on_packet callback.  `desc' is only
    used to label debug messages.
    """

    def __init__(self, desc, on_packet):
        self._desc = desc
        self._on_packet = on_packet
        self._buffer = b''        # undecoded remainder carried between calls
        self._log('__init__')

    def _log(self, msg, **kwargs):
        """Debug-log `msg' under DBG.SLIP_FULL, tagged with our desc."""
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        """Feed more stream bytes; deliver every packet completed so far."""
        self._log('inputdata', d=data)
        pending = self._buffer + data
        self._buffer = b''
        pieces = slip.decode(pending)
        # the last element is kept back as the new remainder
        # (presumably the not-yet-terminated tail -- see hippotat.slip)
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Pass `packet' to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if len(packet) == 0:
            return
        self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a final packet, then reset."""
        self._log('flush')
        leftover = self._buffer
        self._maybe_packet(leftover)
        self._buffer = b''
4f991c0c 188
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Bytes from the child's stdout are SLIP-decoded; each packet is
    passed to router(packet, saddr, daddr) unless either address is
    link-local, in which case it is logged and dropped.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Callback from the decoder for each complete packet."""
        src, dst = packet_addrs(packet)
        if not (src.is_link_local or dst.is_link_local):
            self._router(packet, src, dst)
            return
        log_discard(packet, 'ipif', src, dst, 'link-local')

    def processEnded(self, status):
        # re-raise the failure describing how the child ended
        status.raiseException()
204
def start_ipif(command, router):
    """Spawn the ipif subprocess and remember its protocol in global `ipif'.

    command -- shell command line, run as `sh -xc <command>'
    router  -- callable(packet, saddr, daddr) given each packet the
               subprocess emits (see _IpifProcessProtocol)
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    shell_argv = ['sh', '-xc', command]
    # fd 0 is written by us, fd 1 is read by us, fd 2 is our own fd 2
    fd_map = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', shell_argv,
                         childFDs=fd_map,
                         env=None)
040ff511
IJ
212
def queue_inbound(packet):
    """Write `packet' to the ipif subprocess, SLIP-encoded.

    The encoded packet is sent with a slip.delimiter both before and
    after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    to_ipif = ipif.transport
    to_ipif.write(slip.delimiter)
    to_ipif.write(slip.encode(packet))
    to_ipif.write(slip.delimiter)
218
650a3251
IJ
219#---------- packet queue ----------
220
class PacketQueue():
    """FIFO of packets which discards entries older than max_queue_time.

    Each entry records the time.monotonic() value at which it was
    appended.  `desc' labels this queue in debug messages.
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # NOTE(review): raises TypeError unless desc can be concatenated
        # with a str, and AssertionError if desc is empty -- looks like
        # a type check; confirm the empty-string case is intended.
        assert desc + ''
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # (enqueue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        """Debug-log `msg' under `dflag', prefixed with our description."""
        log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)

    def append(self, packet):
        """Add `packet' to the tail of the queue, timestamped now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queued_at, packet = self._pq[0]
            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets, SLIP-encoded, into the caller's batch.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are separated by slip.delimiter.  A packet is left on
        the queue (and processing stops) if adding it to an already
        non-empty batch would take the batch beyond max_batch; the
        first packet of an empty batch is always added.
        """
        self._log(DBG.QUEUE, 'process...')
        while self._pq:
            _queued_at, packet = self._pq[0]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            progress = 'process... (sofar=%d, max=%d) encoded' % (
                sofar, max_batch)
            self._log(DBG.QUEUE_CTRL, progress, d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
        else:
            # reached only when the queue ran dry, not on overflow
            self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
280
281#---------- error handling ----------
282
b68c0739
IJ
# Set once crash() has been called; common_run() checks it before
# starting the reactor.
_crashing = False

def crash(err):
    """Report `err' on stderr, mark the program as crashing, stop the reactor.

    It is not an error to call this while the reactor is not running.
    """
    global _crashing
    _crashing = True
    print('CRASH ', err, file=sys.stderr)
    # the reactor may not have been started yet (or already stopped)
    try: reactor.stop()
    except twisted.internet.error.ReactorNotRunning: pass
291
def crash_on_defer(defer):
    """Arrange for a failure of the Deferred `defer' to call crash()."""
    def _on_failure(failure):
        return crash(failure)
    defer.addErrback(_on_failure)
294
def crash_on_critical(event):
    """Log observer: call crash() for events at critical level or above.

    event -- a twisted.logger event dictionary.  Events that carry no
    `log_level' are ignored; comparing a missing (None) level against
    LogLevel.critical would raise TypeError inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
298
87a7c0c7
IJ
299#---------- config processing ----------
300
def process_cfg_common_always():
    """Store the [virtual] `mtu' setting from cfg as c.mtu.

    The value is kept exactly as cfg.get() returns it.
    """
    # No `global' declaration is needed: nothing here rebinds a
    # module-level name, only an attribute of c is assigned.
    c.mtu = cfg.get('virtual','mtu')
304
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting in `section'.

    varmap -- iterable of (dest, source) attribute-name pairs; for each
    pair, if c has attribute `source' its value is copied to c.<dest>.
    Pairs whose source attribute is missing are skipped.  The `ipif'
    value is then looked up with c's attributes supplied as additional
    interpolation variables.
    """
    for dest, source in varmap:
        if not hasattr(c, source):
            continue
        setattr(c, dest, getattr(c, source))

    extra_vars = c.__dict__
    c.ipif_command = cfg.get(section, 'ipif', vars=extra_vars)
314
def process_cfg_network():
    """Parse [virtual] `network' into c.network and check it is big enough.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual','network')
    c.network = ipnetwork(spec)
    min_addresses = 3 + 2
    if c.network.num_addresses < min_addresses:
        raise ValueError('network needs at least 2^3 addresses')
319
def process_cfg_server():
    """Set c.server from [virtual] `server', or default it from the network.

    When the option is absent, the network is processed (setting
    c.network) and the first host address in it is used.
    NOTE(review): the configured value is stored as returned by
    cfg.get(), whereas the default is an ipaddress object -- confirm
    callers cope with both.
    """
    try:
        server = cfg.get('virtual','server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
326
class ServerAddr():
    """One address and port on which the server listens.

    Attributes:
      port -- TCP port number
      addr -- ipaddress.IPv4Address or ipaddress.IPv6Address
    """

    def __init__(self, port, addrspec):
        """Parse `addrspec' as IPv4, or failing that as IPv6.

        If it is not a valid IPv6 address either, the exception from
        ipaddress.IPv6Address propagates.
        """
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed inside URLs
            self._inurl = b'[%s]'
        else:
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of TCP4ServerEndpoint and
        # TCP6ServerEndpoint is `backlog', so the address must be given
        # by keyword (and as a string) to select the listening interface.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is included only when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
346
def process_cfg_saddrs():
    """Build c.saddrs, one ServerAddr per entry of [server] `addrs'.

    `addrs' is a whitespace-separated list.  Every entry uses the
    [server] `port' setting, or 80 if that option is not set.
    """
    try:
        port = cfg.getint('server','port')
    except NoOptionError:
        port = 80

    saddrs = c.saddrs = [ ]
    addrspecs = cfg.get('server','addrs').split()
    for addrspec in addrspecs:
        saddrs.append(ServerAddr(port, addrspec))
355
def process_cfg_clients(constructor):
    """Call constructor(ipaddr, section_name, password) for each client.

    A config section counts as a client section when its name contains
    `:' or `.'; the name is parsed as an IP address.  The section's
    `password' is passed UTF-8 encoded.  c.clients is reset to an empty
    list first.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
364
ae7c7784
IJ
365#---------- startup ----------
366
def common_startup():
    """Start logging, parse the command line and load configuration.

    Log events go to stderr in classic text format and are also passed
    to crash_on_critical.  The only option is -c/--config (default
    /etc/hippotat/config); non-option arguments are rejected.  The
    built-in defaults in defcfg are loaded first, with everything from
    `#' to end of line removed, followed by the config file.
    """
    stderr_observer = twisted.logger.FileLogObserver(
        sys.stderr, twisted.logger.formatEventAsClassicLogText)
    observers = [ stderr_observer, crash_on_critical ]
    twisted.logger.globalLogBeginner.beginLoggingTo(observers)

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if args:
        optparser.error('no non-option arguments please')

    comment_re = regexp.compile('#.*')
    defaults_text = comment_re.sub('', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
382
def common_run():
    """Run the reactor (unless crash() was already called), then report.

    `CRASHED (end)' is printed to stderr whenever this function gets
    past the reactor, whether the reactor ran and returned or was
    skipped because _crashing was already set.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing: reactor.run()
    print('CRASHED (end)', file=sys.stderr)