before undo GeneralResponseConsumer
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
eedc8b30 26import traceback
ae7c7784 27
1321ad5f
IJ
28import re as regexp
29
30import hippotat.slip as slip
31
d579a048
IJ
class DBG(twisted.python.constants.Names):
    """Debug categories; passed as the `dflag' argument of log_debug()."""

    # Packet routing and discarding
    ROUTE = NamedConstant()
    DROP = NamedConstant()

    # Data flow, HTTP and queue tracing
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()

    # Verbose dumps
    HTTP_FULL = NamedConstant()
    SLIP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
d579a048 44
# Encoder used to render packet data as hex in debug messages.
_hex_codec = codecs.getencoder('hex_codec')

log = twisted.logger.Logger()


def log_debug(dflag, msg, idof=None, d=None):
    """Log a debug message at info level through the module logger.

    dflag -- debug category (a DBG constant), logged before the message
    msg   -- message text
    idof  -- optional object; when given, `[<id(idof)>] ' is prefixed
    d     -- optional data; when given, its first 64 items are appended
             hex-encoded (assumes bytes-like input -- TODO confirm callers)
    """
    if idof is not None:
        msg = '[%d] %s' % (id(idof), msg)
    if d is not None:
        # Only a 64-byte prefix is dumped, to bound the line length.
        hexed, _consumed = _hex_codec(d[0:64])
        msg += ' ' + hexed.decode('ascii')
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
58
ca732796
IJ
# Built-in default configuration.  common_startup() strips everything from
# `#' to end of line before handing this text to the config parser, so the
# `#' annotations below are documentation only.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
100
# Module-level singletons.  They have to exist at import time so that a
# program doing `import *' from this module picks them up.
cfg = ConfigParser()
optparser = OptionParser()
104
# Byte translation table that exchanges `-' and the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return `s' with every `-' and every SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.
    """
    return s.translate(_mimetrans)
110
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag for processed configuration values.

    d -- optional dict used directly as the instance's attribute storage
         (the instance aliases it; it is not copied).  When omitted, the
         instance gets a fresh, private dict.
    """

    def __init__(self, d=None):
        # A literal `{}' default would be created once and shared, making
        # every default-constructed instance alias the same attributes.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# Global results object filled in by the process_cfg_* functions.
c = ConfigResults()
118
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that `packet' was discarded.

    iface, saddr, daddr and why are interpolated into the message; the
    packet itself is passed to log_debug() as the data to dump.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 123
b0cfbfce
IJ
124#---------- packet parsing ----------
125
def packet_addrs(packet):
    """Return (source, destination) addresses read from a raw IP packet.

    The IP version is taken from the high nibble of the first byte.
    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises ValueError for any version other than 4 or 6.
    """
    version = packet[0] >> 4
    if version == 4:
        # Source address starts at byte 12, 4 bytes each.
        factory, addrlen, saddroff = ipaddress.IPv4Address, 4, 3*4
    elif version == 6:
        # Source address starts at byte 8, 16 bytes each.
        factory, addrlen, saddroff = ipaddress.IPv6Address, 16, 2*4
    else:
        raise ValueError('unsupported IP version %d' % version)

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
141
142#---------- address handling ----------
143
def ipaddr(input):
    """Parse `input' as an IPv4 address, or failing that as IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If the
    IPv6 parse fails too, its exception propagates to the caller.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
150
def ipnetwork(input):
    """Parse `input' as an IPv4 network, or failing that as IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.  If the
    IPv6 parse fails too, its exception (a ValueError subclass)
    propagates to the caller.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # IPv4Network raises AddressValueError when the address part is
        # not IPv4.  (The previous `NetworkValueError' does not exist, so
        # this fallback used to die with NameError.)
        return ipaddress.IPv6Network(input)
040ff511
IJ
157
158#---------- ipif (SLIP) subprocess ----------
159
class SlipStreamDecoder():
    """Incremental decoder turning a SLIP byte stream into packets.

    desc      -- label used in this decoder's debug messages
    on_packet -- callable invoked with each non-empty decoded packet
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        """Log `msg' under DBG.SLIP_FULL, tagged with this decoder's desc."""
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed more stream bytes; deliver every packet completed so far."""
        self._log('inputdata', d=data)
        pending = self._buffer + data
        self._buffer = b''
        decoded = slip.decode(pending)
        # The last element from slip.decode is kept back as the buffer
        # for the next call; everything before it is offered as a packet.
        self._buffer = decoded.pop()
        for pkt in decoded:
            self._maybe_packet(pkt)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Pass `packet' to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if packet:
            self._on_packet(packet)

    def flush(self):
        """Offer whatever is buffered as a final packet, then clear it."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 189
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Output received from the child is SLIP-decoded; each decoded packet
    is handed to `router(packet, saddr, daddr)' unless either address is
    link-local, in which case it is logged and dropped.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Decoder callback: filter link-local traffic, route the rest."""
        saddr, daddr = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # Re-raise the exit reason supplied by Twisted.
        status.raiseException()
205
def start_ipif(command, router):
    """Spawn the ipif subprocess and install it as the global `ipif'.

    command -- shell command line, run as `sh -xc <command>'
    router  -- callable given to _IpifProcessProtocol for decoded packets
    """
    global ipif
    ipif = _IpifProcessProtocol(router)

    argv = ['sh', '-xc', command]
    # fd 0 and fd 1 are connected to us; the child's fd 2 is our own fd 2.
    fds = {0: 'w', 1: 'r', 2: 2}
    # NOTE(review): env=None presumably inherits our environment -- confirm
    # against the Twisted spawnProcess documentation.
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
213
def queue_inbound(packet):
    """Write `packet' to the ipif subprocess, SLIP-encoded.

    The encoded packet is sent with a SLIP delimiter both before and
    after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
219
650a3251
IJ
220#---------- packet queue ----------
221
class PacketQueue():
    """FIFO of packets, each stamped with its time.monotonic() entry time.

    desc           -- label used in this queue's debug messages
    max_queue_time -- maximum age, in time.monotonic() units, before
                      nonempty() drops a packet
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # Sanity check: desc must support `+ str' and be non-empty.
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # (entry time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        """Log `msg' under `dflag', prefixed with this queue's desc."""
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet' at the back of the queue, stamped with now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Drop over-age packets from the front; return True if any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        queue = self._pq
        while queue:
            queuetime, packet = queue[0]
            if time.monotonic() - queuetime > self._max_queue_time:
                # Too old: strip it off the front and look at the next one.
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                queue.popleft()
            else:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets, SLIP-encoded, into a batch.

        sizequery() -- should return the size of the batch so far
        moredata(s) -- should add s to the batch
        max_batch   -- size limit; not applied to the first packet of an
                       empty batch

        Packets are separated by slip.delimiter.  Stops when the queue is
        empty or the next packet would take the batch over max_batch; in
        the latter case that packet stays queued.
        """
        self._log(DBG.QUEUE, 'process...')
        queue = self._pq
        while queue:
            packet = queue[0][1]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    return
                moredata(slip.delimiter)

            moredata(encoded)
            queue.popleft()

        self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
281
282#---------- error handling ----------
283
b68c0739
IJ
# Set once crash() has been called; common_run() checks it before
# starting the reactor.
_crashing = False


def crash(err):
    """Report `err' on stderr, mark the program as crashing, stop the reactor.

    A reactor that is not running is tolerated.
    """
    global _crashing
    _crashing = True
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # crash() may be called before reactor.run(); nothing to stop then.
        pass
292
def crash_on_defer(defer):
    """Attach an errback to `defer' that passes the failure to crash()."""
    def _on_failure(err):
        return crash(err)
    defer.addErrback(_on_failure)
295
def crash_on_critical(event):
    """Log observer: crash() on any event at LogLevel.critical or above.

    The formatted event text is passed to crash().
    """
    # NOTE(review): assumes every event carries 'log_level'; a missing key
    # would make this comparison fail -- confirm against event sources.
    level = event.get('log_level')
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
299
87a7c0c7
IJ
300#---------- config processing ----------
301
def process_cfg_common_always():
    """Store the [virtual] `mtu' setting in c.mtu, as returned by cfg.get()."""
    c.mtu = cfg.get('virtual', 'mtu')
305
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting of `section'.

    varmap -- iterable of (dest, source) attribute-name pairs; each source
              attribute present on `c' is copied to c.<dest> first, and
              missing ones are skipped.  The attributes of `c' are then
              offered as interpolation variables for the `ipif' value.
    """
    for dest, source in varmap:
        if hasattr(c, source):
            setattr(c, dest, getattr(c, source))

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
315
def process_cfg_network():
    """Parse [virtual] `network' into c.network and check its size.

    Raises ValueError if the network has fewer than 5 addresses.
    c.network is assigned before the check is made.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    too_small = c.network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
320
def process_cfg_server():
    """Set c.server from [virtual] `server', or default it from the network.

    When the option is absent, process_cfg_network() is run and the first
    host of c.network is used.
    """
    # NOTE(review): the configured value is stored as the raw string while
    # the computed default is an ipaddress object -- confirm callers cope.
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
327
class ServerAddr():
    """One address/port pair the server listens on.

    port     -- TCP port number (int)
    addrspec -- IPv4 or IPv6 address literal

    Attributes: `port', and `addr' (ipaddress.IPv4Address or
    ipaddress.IPv6Address).  An addrspec that parses as neither makes
    the constructor raise AddressValueError.
    """

    def __init__(self, port, addrspec):
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals must be bracketed inside a URL.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of TCP4/TCP6ServerEndpoint is
        # `backlog', so the address has to go in by keyword, as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http:// URL (bytes) for this address.

        The port is omitted when it is 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
347
def process_cfg_saddrs():
    """Build c.saddrs from the [server] `addrs' and `port' settings.

    `addrs' is split on whitespace and each entry becomes a ServerAddr.
    `port' falls back to 80 when the option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = []
    c.saddrs.extend(ServerAddr(port, addrspec)
                    for addrspec in cfg.get('server', 'addrs').split())
356
def process_cfg_clients(constructor):
    """Call `constructor' once per client section of the configuration.

    A client section is one whose name contains `:' or `.'.  For each,
    constructor(ci, cs, pw) is called with the name parsed by ipaddr(),
    the section name, and its `password' encoded as UTF-8 bytes.
    c.clients is reset to an empty list first; this function does not
    itself add to it.
    """
    c.clients = []
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
365
ae7c7784
IJ
366#---------- startup ----------
367
def common_startup():
    """Start logging, parse the command line, and load the configuration.

    Log events go to stderr in classic text format, and crash_on_critical
    is installed as a second observer.  The only option added here is
    -c/--config (default /etc/hippotat/config); any non-option argument
    is a usage error.  The built-in defaults in `defcfg' are loaded
    first, with `#' comments stripped, then the config file.
    """
    stderr_observer = twisted.logger.FileLogObserver(
        sys.stderr, twisted.logger.formatEventAsClassicLogText)
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [stderr_observer, crash_on_critical]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    (opts, args) = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # Remove everything from `#' to end of line from the defaults text.
    comment_re = regexp.compile('#.*')
    defaults_text = comment_re.sub('', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
383
def common_run():
    """Run the reactor unless crash() has already been called.

    'CRASHED (end)' is printed to stderr whenever this function gets
    past that point, whether or not the reactor ran.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if _crashing:
        pass
    else:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)