log diversion stdout
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
c13ee6e6
IJ
23from functools import partial
24
ae7c7784 25import collections
84e763c7 26import time
8c3b6620 27import codecs
eedc8b30 28import traceback
ae7c7784 29
1321ad5f
IJ
30import re as regexp
31
32import hippotat.slip as slip
33
d579a048
IJ
class DBG(twisted.python.constants.Names):
    """Debug categories; one of these is passed as `dflag` to log_debug()."""

    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    # The categories below are used for logging that includes dumps of
    # the data involved (see SLIP_FULL in SlipStreamDecoder._log).
    HTTP_FULL = NamedConstant()
    SLIP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
d579a048 46
# Encoder turning a bytes object into its hexadecimal representation.
_hex_codec = codecs.getencoder('hex_codec')

log = twisted.logger.Logger()


def log_debug(dflag, msg, idof=None, d=None):
    """Log `msg` at info level, tagged with the debug category `dflag`.

    dflag -- category tag, formatted at the start of the log line
    msg   -- message text
    idof  -- optional object; when given, the message is prefixed with
             `[<id(idof) in hex>]` so lines about one object can be
             correlated
    d     -- optional bytes-like data; when given, its hex dump is
             appended to the message after a space
    """
    text = msg if idof is None else '[%#x] %s' % (id(idof), msg)
    if d is not None:
        # The encoder returns (encoded_bytes, length_consumed).
        hexdigits, _consumed = _hex_codec(d)
        text = text + ' ' + hexdigits.decode('ascii')
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
60
ca732796
IJ
# Built-in default configuration.  common_startup() removes everything
# from a `#' to the end of each line before handing this text to the
# config parser, so the trailing `# ...' notes are documentation only.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
http_timeout = 30 # used by both } must be
http_timeout_grace = 5 # used by both } compatible
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
http_timeout = 121 # used by server
target_requests_outstanding = 10 # used by server
'''

# These are created at module level (rather than inside a startup
# function) so that they are visible to code doing `import *`.
cfg = ConfigParser()
optparser = OptionParser()
106
# Byte translation table exchanging `-' with the SLIP escape byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return `s` with every `-' and every SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  The table is symmetric, so applying
    the function twice gives back the original bytes.
    """
    swapped = s.translate(_mimetrans)
    return swapped
112
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding processed configuration values.

    d -- optional dict to use directly as the instance's attribute
         dictionary (it is aliased, not copied).  When omitted, each
         instance gets its own fresh, empty dictionary.
    """

    def __init__(self, d=None):
        # A literal `{}` default would be created once and then become
        # the shared __dict__ of every instance built without an
        # argument, so attributes set on one would appear on all.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# Module-wide results object filled in by the process_cfg_* functions.
c = ConfigResults()
120
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that `packet` was thrown away.

    packet -- the packet data; dumped in hex after the message
    iface  -- label of where the packet was seen (shown in brackets)
    saddr  -- source address, formatted with %s
    daddr  -- destination address, formatted with %s
    why    -- short reason for the discard
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 125
b0cfbfce
IJ
126#---------- packet parsing ----------
127
def packet_addrs(packet):
    """Extract the (source, destination) addresses from an IP packet.

    packet -- bytes of the packet, starting at the IP header

    Returns a 2-tuple of ipaddress.IPv4Address or ipaddress.IPv6Address
    objects, according to the version nibble in the first byte.

    Raises ValueError if the version is neither 4 nor 6.
    """
    version = packet[0] >> 4

    # version -> (address class, address length, offset of source addr);
    # the destination address directly follows the source address.
    layouts = {
        4: (ipaddress.IPv4Address, 4, 3 * 4),
        6: (ipaddress.IPv6Address, 16, 2 * 4),
    }
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)

    make_addr, addrlen, src_start = layout
    dst_start = src_start + addrlen
    dst_end = src_start + addrlen * 2
    saddr = make_addr(packet[src_start:dst_start])
    daddr = make_addr(packet[dst_start:dst_end])
    return (saddr, daddr)
143
144#---------- address handling ----------
145
def ipaddr(input):
    """Parse `input` as an IP address, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises AddressValueError (from the IPv6 attempt) if neither fits.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
152
def ipnetwork(input):
    """Parse `input` as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Raises AddressValueError (from the IPv6 attempt) if the address
    part fits neither family.  Other errors from the IPv4 attempt, such
    as a bad prefix length or host bits being set, propagate unchanged.
    """
    try:
        r = ipaddress.IPv4Network(input)
    # The ipaddress module reports an unparseable address part with
    # AddressValueError; that is the signal to retry as IPv6.  (There
    # is no `NetworkValueError`; naming it here raised NameError for
    # every non-IPv4 input.)
    except AddressValueError:
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
159
160#---------- ipif (SLIP) subprocess ----------
161
class SlipStreamDecoder():
    """Reassemble packets from a SLIP byte stream delivered in pieces.

    desc      -- label used in this decoder's debug messages
    on_packet -- callable invoked with each complete, nonempty packet

    Data after the last delimiter seen so far is kept in a buffer until
    more input arrives or flush() is called.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        # All decoder chatter goes out under DBG.SLIP_FULL.
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        """Feed another piece of the stream; deliver completed packets."""
        self._log('inputdata', d=data)
        pieces = slip.decode(data)
        # The first piece continues whatever was left over last time.
        pieces[0] = self._buffer + pieces[0]
        # The last piece is not yet terminated: hold it back.
        *complete, self._buffer = pieces
        for piece in complete:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        # Empty packets are logged but not passed on.
        self._log('maybepacket', d=packet)
        if len(packet) == 0:
            return
        self._on_packet(packet)

    def flush(self):
        """Treat any buffered data as a final packet and clear the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 190
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    router -- callable invoked as router(packet, saddr, daddr) for each
              packet read from the subprocess's stdout that is not
              link-local
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        # Subprocess stdout is a SLIP stream; let the decoder split it.
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, discarding link-local traffic."""
        saddr, daddr = packet_addrs(packet)
        link_local = saddr.is_link_local or daddr.is_link_local
        if not link_local:
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # Re-raise the failure describing how the process ended.
        status.raiseException()
206
def start_ipif(command, router):
    """Spawn the ipif subprocess and remember its protocol globally.

    command -- shell command line, run as `sh -xc <command>`
    router  -- callable handed to _IpifProcessProtocol for packets
               coming out of the subprocess

    Sets the module global `ipif`, which queue_inbound() writes to.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from it;
    # fd 2: the child shares our stderr.
    fdmap = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fdmap, env=None)
040ff511
IJ
214
def queue_inbound(packet):
    """Send `packet` to the ipif subprocess, SLIP-encoded.

    The encoded packet is written with a SLIP delimiter both before and
    after it.  Requires start_ipif() to have set the global `ipif`.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
220
650a3251
IJ
221#---------- packet queue ----------
222
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- string label used in this queue's debug messages
    max_queue_time -- packets older than this (in time.monotonic()
                      units, i.e. seconds) are dropped by nonempty()
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # Fails early with TypeError if desc cannot be concatenated to
        # a str; also asserts that the label is not empty.
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # entries are (queuetime, packet)

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)

    def append(self, packet):
        """Add `packet` at the back of the queue, stamped with now."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; report whether any remain.

        Returns True as soon as the front packet is young enough, False
        if the queue runs out first.
        """
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queuetime, packet = self._pq[0]
            age = time.monotonic() - queuetime
            if not (age > self._max_queue_time):
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
            # strip old packets off the front
            self._log(DBG.QUEUE, 'dropping (old)', d=packet)
            self._pq.popleft()

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery -- callable returning the size of the batch so far
        moredata  -- callable taking bytes to add to the batch
        max_batch -- size limit for the batch

        Packets are SLIP-encoded and separated by SLIP delimiters.  The
        first packet put into an empty batch is added regardless of its
        size; after that, processing stops at the first packet which
        (with its delimiter) would take the batch past max_batch, and
        that packet stays queued.
        """
        self._log(DBG.QUEUE, 'process...')
        while self._pq:
            _queuetime, packet = self._pq[0]
            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded'
                      % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                wanted = sofar + len(slip.delimiter) + len(encoded)
                if wanted > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
        else:
            # Loop ended because the queue ran dry, not by overflow.
            self._log(DBG.QUEUE, 'process... empty')
ae7c7784
IJ
282
283#---------- error handling ----------
284
b68c0739
IJ
# Set once crash() has been called; common_run() checks it before
# starting the reactor.
_crashing = False


def crash(err):
    """Report `err` on stderr and stop the reactor.

    Marks the module as crashing first.  If the reactor is not running
    there is nothing to stop, and that condition is ignored.
    """
    global _crashing
    _crashing = True
    banner_top = '========== CRASH =========='
    banner_bottom = '==========================='
    print(banner_top, err, banner_bottom, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
294
def crash_on_defer(defer):
    """Arrange for any failure of `defer` to be passed to crash()."""
    def _on_failure(failure):
        return crash(failure)
    defer.addErrback(_on_failure)
297
def crash_on_critical(event):
    """Log observer which calls crash() for critical events.

    event -- log event dictionary; its 'log_level' entry, if present,
             is compared against LogLevel.critical

    Events with no 'log_level' (or with it set to None) are ignored.
    """
    level = event.get('log_level')
    # .get() yields None for a missing key, and None cannot be ordered
    # against a LogLevel constant (TypeError), so check for it first.
    if level is not None and level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
301
87a7c0c7
IJ
302#---------- config processing ----------
303
def process_cfg_common_always():
    """Copy [virtual] mtu from the configuration into `c.mtu`.

    The value is stored as returned by cfg.get(), i.e. as a string.
    """
    mtu_setting = cfg.get('virtual', 'mtu')
    c.mtu = mtu_setting
307
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute `c.ipif_command` from the `ipif` option of `section`.

    section -- configuration section to read `ipif` from
    varmap  -- iterable of (dest, src) name pairs; for each pair where
               `c` has attribute `src`, its value is copied to `c.dest`
               (pairs whose `src` is not set are skipped)

    After the copying, every attribute of `c` is available as an
    interpolation variable for the `ipif` option.
    """
    unset = object()
    for dest, src in varmap:
        value = getattr(c, src, unset)
        if value is unset:
            continue
        setattr(c, dest, value)

    interpolation_vars = c.__dict__
    c.ipif_command = cfg.get(section, 'ipif', vars=interpolation_vars)
317
def process_cfg_network():
    """Parse [virtual] network into `c.network` and check its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    minimum = 3 + 2
    if c.network.num_addresses < minimum:
        raise ValueError('network needs at least 2^3 addresses')
322
def process_cfg_server():
    """Set `c.server` from [virtual] server, or default it.

    When the option is absent, [virtual] network is processed (see
    process_cfg_network) and the first host address of that network is
    used.  NOTE(review): the configured value is stored as the string
    from cfg.get(), the default as an ipaddress object; confirm that
    callers cope with both.
    """
    try:
        configured = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        first_host = next(c.network.hosts())
        c.server = first_host
    else:
        c.server = configured
329
class ServerAddr():
    """One address/port pair on which the server is reachable.

    port     -- TCP port number
    addrspec -- IPv4 or IPv6 address, in a form accepted by the
                ipaddress module

    Attributes: `port`, and `addr` (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).  Raises AddressValueError if `addrspec` is
    valid in neither family.
    """

    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals must be bracketed inside a URL.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint listening on this address."""
        # The endpoint constructors take (reactor, port, backlog,
        # interface); the address has to be given by keyword, as a
        # string, or it would be taken as the listen backlog.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is left out when it is 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
349
def process_cfg_saddrs():
    """Build `c.saddrs`: one ServerAddr per entry of [server] addrs.

    [server] addrs is a whitespace-separated list of addresses.  All of
    them share the port from [server] port, or 80 if that option is
    not set.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = []
    addrspecs = cfg.get('server', 'addrs').split()
    c.saddrs.extend(ServerAddr(port, addrspec) for addrspec in addrspecs)
358
def process_cfg_clients(constructor):
    """Call `constructor` for every per-client configuration section.

    A section counts as a client section when its name contains `:' or
    `.'; the name is then parsed as an IP address.

    constructor -- called as constructor(ci, cs, pw) with the parsed
                   address, the section name, and the section's
                   `password` option encoded as UTF-8 bytes

    `c.clients` is reset to an empty list first; this function does not
    add to it itself (presumably `constructor` does -- verify against
    the caller).
    """
    c.clients = []
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
367
ae7c7784
IJ
368#---------- startup ----------
369
def common_startup():
    """Set up logging, parse the command line and load configuration.

    Logging: events selected by a LogLevelFilterPredicate built with
    LogLevel.error go to stderr and all other events to stdout, both in
    classic log text format; crash_on_critical is installed as a second
    observer.

    Command line: the only option is -c/--config (default
    /etc/hippotat/config); positional arguments are an error.

    Configuration: the built-in defaults in `defcfg` are loaded first,
    with `#' comments removed, and then the configuration file.
    """
    formatter = twisted.logger.formatEventAsClassicLogText
    to_stdout = twisted.logger.FileLogObserver(sys.stdout, formatter)
    to_stderr = twisted.logger.FileLogObserver(sys.stderr, formatter)
    level_filter = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    split_observer = twisted.logger.FilteringLogObserver(
        to_stderr, [level_filter], to_stdout)
    observers = [split_observer, crash_on_critical]
    twisted.logger.globalLogBeginner.beginLoggingTo(observers)

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # Remove everything from `#' to end of line before parsing defcfg.
    comment = regexp.compile('#.*')
    defaults_text = comment.sub('', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
390
def common_run():
    """Run the reactor, unless crash() has already been called.

    When the reactor returns (or was never started), a final notice is
    printed on stderr.
    """
    log_debug(DBG.INIT, 'entering reactor')
    already_crashed = _crashing
    if not already_crashed:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)