init debug
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
c13ee6e6
IJ
23from functools import partial
24
ae7c7784 25import collections
84e763c7 26import time
8c3b6620 27import codecs
eedc8b30 28import traceback
ae7c7784 29
1321ad5f
IJ
30import re as regexp
31
32import hippotat.slip as slip
33
class DBG(twisted.python.constants.Names):
    """Debug-message categories passed as `dflag' to log_debug().

    The definition order matters: the module-level debug_set is built by
    comparing every constant against DBG.HTTP, so moving a constant
    changes whether its messages are emitted.
    """
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
d579a048 47
# Encoder used by log_debug() to render packet bytes as hex text.
_hex_codec = codecs.getencoder('hex_codec')

log = twisted.logger.Logger()

# Categories that log_debug() actually emits: every DBG constant that
# compares less than or equal to DBG.HTTP.
debug_set = {x for x in DBG.iterconstants() if x <= DBG.HTTP}
53
def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message via `log' if its category is enabled.

    dflag: DBG constant; the message is dropped unless it is in debug_set.
    msg:   message text.
    idof:  optional object whose id() is prefixed to the message in hex.
    d:     optional bytes, appended to the message as a hex string.
    """
    if dflag not in debug_set:
        return
    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)
    if d is not None:
        d = _hex_codec(d)[0].decode('ascii')
        msg += ' ' + d
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
64
ca732796
IJ
# Built-in default configuration.  common_startup() strips everything
# from `#' to end of line before parsing, so the inline comments below
# never reach ConfigParser.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536           # used by server, subject to [limits]
max_queue_time = 10              # used by server, subject to [limits]
target_requests_outstanding = 3  # must match; subject to [limits] on server
http_timeout = 30                # used by both } must be
http_timeout_grace = 5           # used by both }  compatible
max_requests_outstanding = 4     # used by client
max_batch_up = 4000              # used by client
http_retry = 5                   # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations:  %(local)s        %(peer)s          %(rnets)s
# obtained   on server   [virtual]server  [virtual]relay    [virtual]network
#      from  on client   <client>         [virtual]server   [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len>  # mandatory for server
# server  = <ipaddr>        # used by both, default is computed from `network'
# relay   = <ipaddr>        # used by server, default from `network' and `server'
#  default server is first host in network
#  default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1     # mandatory for server
port = 80                   # used by server
# url                       # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password>     # used by both, must match

[limits]
max_batch_down = 262144           # used by server
max_queue_time = 121              # used by server
http_timeout = 121                # used by server
target_requests_outstanding = 10  # used by server
'''
106
# these need to be defined here so that they can be imported by import *
cfg = ConfigParser()
optparser = OptionParser()
110
# Translation table exchanging the `-' byte with slip.esc.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return bytes `s' with every `-' and slip.esc byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.
    """
    return s.translate(_mimetrans)
116
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding processed configuration values.

    d: optional dict to use directly as the instance's attribute
       dictionary (it is aliased, not copied).  When omitted, each
       instance gets its own fresh dict; a literal `{ }' default would
       be shared by every default-constructed instance.
    """
    def __init__(self, d=None):
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'

# Module-wide results object filled in by the process_cfg_* functions.
c = ConfigResults()
124
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that `packet' was discarded and why.

    iface, saddr, daddr and why are formatted into the message; the
    packet bytes are passed to log_debug() as the hex-dumped payload.
    """
    log_debug(DBG.DROP,
              'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
              d=packet)
1321ad5f 129
b0cfbfce
IJ
#---------- packet parsing ----------
131
def packet_addrs(packet):
    """Return (saddr, daddr) read from the header of raw IP `packet'.

    The version is taken from the high nibble of the first byte.  IPv4
    addresses are 4 bytes starting at offset 12; IPv6 addresses are 16
    bytes starting at offset 8.  The destination immediately follows
    the source in both cases.

    Raises ValueError for any other IP version.
    """
    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff : daddroff])
    daddr = factory(packet[daddroff : daddroff + addrlen])
    return (saddr, daddr)
147
#---------- address handling ----------
149
def ipaddr(input):
    """Parse `input' as an IPv4 address, falling back to IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If the
    value is neither, the AddressValueError from the IPv6 attempt
    propagates.
    """
    try:
        r = ipaddress.IPv4Address(input)
    except AddressValueError:
        r = ipaddress.IPv6Address(input)
    return r
156
def ipnetwork(input):
    """Parse `input' as an IPv4 network, falling back to IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Only AddressValueError (the address part is not IPv4) triggers the
    IPv6 attempt.  Other errors in an IPv4 spec, such as a bad prefix
    length or host bits set, propagate unchanged.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
163
#---------- ipif (SLIP) subprocess ----------
165
class SlipStreamDecoder():
    """Reassemble packets from a SLIP byte stream delivered in chunks.

    desc:      label included in this decoder's debug messages.
    on_packet: callable invoked with each complete, non-empty packet.
    """
    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed a chunk of stream data; deliver every packet it completes."""
        self._log('inputdata', d=data)
        packets = slip.decode(data)
        # The first piece continues whatever was left over from the
        # previous chunk; the last piece is kept back as the new leftover.
        packets[0] = self._buffer + packets[0]
        self._buffer = packets.pop()
        for packet in packets:
            self._maybe_packet(packet)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Pass `packet' to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if len(packet):
            self._on_packet(packet)

    def flush(self):
        """Deliver any buffered data as a packet and clear the buffer."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 194
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data from the subprocess's stdout is SLIP-decoded; each packet is
    handed to router(packet, saddr, daddr), except packets with a
    link-local source or destination, which are logged and dropped.
    """
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the exit status carried by the Failure.
        status.raiseException()
210
def start_ipif(command, router):
    """Spawn the ipif subprocess and store its protocol in global `ipif'.

    command: shell command line, run as `sh -xc <command>'.
    router:  callable given (packet, saddr, daddr) for each packet read
             from the subprocess.

    The child's stdin and stdout are connected to this process; its
    stderr is inherited (fd 2).
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    reactor.spawnProcess(ipif,
                         '/bin/sh', ['sh', '-xc', command],
                         childFDs={0: 'w', 1: 'r', 2: 2},
                         env=None)
040ff511
IJ
218
def queue_inbound(packet):
    """Write `packet' to the ipif subprocess, SLIP-encoded.

    The encoded packet is written with a slip.delimiter before and
    after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    ipif.transport.write(slip.delimiter)
    ipif.transport.write(slip.encode(packet))
    ipif.transport.write(slip.delimiter)
224
650a3251
IJ
#---------- packet queue ----------
226
class PacketQueue():
    """FIFO of packets that discards entries older than max_queue_time.

    desc:           label for debug messages; must be a non-empty str.
    max_queue_time: maximum age, in seconds of time.monotonic(), that
                    nonempty() tolerates before dropping a packet.
    """
    def __init__(self, desc, max_queue_time):
        self._desc = desc
        # TypeError if desc is not a str; assertion failure if it is ''.
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (enqueue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet' to the tail, stamped with the current time."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while True:
            try:
                (queuetime, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'nonempty ? empty.')
                return False

            age = time.monotonic() - queuetime
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the queue into a batch until it is full.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are SLIP-encoded and separated by slip.delimiter.  The
        first packet of an empty batch is always added; later packets
        are added only while the batch stays within max_batch.  A
        packet is removed from the queue only after it has been added.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            try:
                (dummy, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'process... empty')
                break

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                if sofar + len(slip.delimiter) + len(encoded) > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
286
#---------- error handling ----------
288
b68c0739
IJ
# Set by crash(); common_run() does not start the reactor once it is set.
_crashing = False

def crash(err):
    """Report `err' on stderr, mark the program as crashing, stop the reactor.

    A reactor that is not running is tolerated.
    """
    global _crashing
    _crashing = True
    print('========== CRASH ==========', err,
          '===========================', file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
298
def crash_on_defer(defer):
    """Arrange for a failure of Deferred `defer' to call crash()."""
    defer.addErrback(crash)
301
def crash_on_critical(event):
    """Log observer: call crash() for events at LogLevel.critical or above.

    event: twisted.logger event dict; its 'log_level' entry is compared
           against LogLevel.critical.
    """
    if event.get('log_level') >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
305
87a7c0c7
IJ
#---------- config processing ----------
307
def process_cfg_common_always():
    """Copy [virtual] mtu from the config into c.mtu (as a string)."""
    c.mtu = cfg.get('virtual', 'mtu')
311
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' option of `section'.

    varmap: iterable of (dest, src) attribute-name pairs.  For each
            pair, c.<src> is copied to c.<dest>; pairs whose source
            attribute is missing are skipped.

    The attributes of c are then supplied as interpolation variables
    when the `ipif' option is fetched.
    """
    for d, s in varmap:
        try:
            v = getattr(c, s)
        except AttributeError:
            continue
        setattr(c, d, v)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
321
def process_cfg_network():
    """Parse [virtual] network into c.network.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    c.network = ipnetwork(cfg.get('virtual', 'network'))
    if c.network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
326
def process_cfg_server():
    """Set c.server from [virtual] server, or default it from the network.

    When the option is absent, c.network is computed and its first
    host address is used.
    """
    try:
        c.server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
333
class ServerAddr():
    """One address and port on which the server listens.

    port:     TCP port number.
    addrspec: IPv4 or IPv6 address text; parsed into self.addr.
    """
    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed inside URLs.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The address must go in by keyword: the third positional
        # parameter of the TCP server endpoints is `backlog'.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http:// URL for this address, as bytes.

        The port is included only when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
353
def process_cfg_saddrs():
    """Build c.saddrs: one ServerAddr per entry in [server] addrs.

    `addrs' is split on whitespace.  Every entry uses [server] port,
    defaulting to 80 when that option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = [ServerAddr(port, addrspec)
                for addrspec in cfg.get('server', 'addrs').split()]
362
def process_cfg_clients(constructor):
    """Call constructor(ci, cs, pw) for every client section in the config.

    A section counts as a client when its name contains `:' or `.'.
    ci is the section name parsed as an IP address, cs is the section
    name, and pw is its `password' option encoded as UTF-8 bytes.
    c.clients is reset to an empty list first.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password')
        pw = pw.encode('utf-8')
        constructor(ci, cs, pw)
371
ae7c7784
IJ
#---------- startup ----------
373
def common_startup():
    """Set up logging, parse the command line and load the configuration.

    Logging: a filtering observer sends events to stderr or stdout
    according to a LogLevel.error predicate, and crash_on_critical is
    installed as a second observer.

    Options: -c / --config names the config file (default
    /etc/hippotat/config).  Non-option arguments are rejected.

    Configuration: the built-in defcfg is loaded first, with everything
    from `#' to end of line stripped, then the config file is read on
    top of it.
    """
    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    log_observer = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    (opts, args) = optparser.parse_args()
    if args:
        optparser.error('no non-option arguments please')

    comment_re = regexp.compile('#.*')
    cfg.read_string(comment_re.sub('', defcfg))
    cfg.read(opts.configfile)
394
def common_run():
    """Run the reactor unless crash() has already been called.

    The final message is printed whenever this function reaches its
    end, i.e. after the reactor stops or if it was never started.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)