wip debug
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
c13ee6e6
IJ
23from functools import partial
24
ae7c7784 25import collections
84e763c7 26import time
8c3b6620 27import codecs
eedc8b30 28import traceback
ae7c7784 29
1321ad5f
IJ
30import re as regexp
31
32import hippotat.slip as slip
33
class DBG(twisted.python.constants.Names):
    """Debug message categories passed as `dflag` to log_debug().

    The declaration order matters: log_debug() compares a flag against
    DBG.HTTP with `>`, so where a constant sits in this list decides
    whether its messages are emitted.
    """
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    # Everything from here on is currently suppressed by log_debug().
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
d579a048 47
# Encoder used to render binary payloads as hex text in debug output.
_hex_codec = codecs.getencoder('hex_codec')

log = twisted.logger.Logger()

def log_debug(dflag, msg, idof=None, d=None):
    """Emit one debug message through the module logger at info level.

    dflag -- DBG category; categories ordered after DBG.HTTP are dropped.
    msg   -- message text.
    idof  -- optional object; when given, its id() is prefixed in hex.
    d     -- optional binary payload; when given, it is appended hex-encoded.
    """
    if dflag > DBG.HTTP:
        return
    text = msg if idof is None else '[%#x] %s' % (id(idof), msg)
    if d is not None:
        hexed = _hex_codec(d)[0].decode('ascii')
        text = text + ' ' + hexed
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
62
ca732796
IJ
# Built-in default configuration.  common_startup() strips everything from
# `#' to end of line and feeds the result to cfg.read_string() before the
# user's configuration file is read.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
http_timeout = 30 # used by both } must be
http_timeout_grace = 5 # used by both } compatible
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
http_timeout = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
104
# Shared parser objects.  They have to be created at module level so that
# other modules pick them up via `import *`.
optparser = OptionParser()
cfg = ConfigParser()
108
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return `s` with every `-' and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
114
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding processed configuration values.

    d -- optional dict to use *as* the instance's attribute dictionary
         (it is adopted, not copied).  When omitted, each instance gets
         its own fresh dict.
    """
    def __init__(self, d=None):
        # A literal `{}` default would be created once and then shared by
        # every default-constructed instance, so attributes set on one
        # would appear on all of them.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'

# Module-wide configuration results, filled in by the process_cfg_*
# functions.
c = ConfigResults()
122
def log_discard(packet, iface, saddr, daddr, why):
    """Record under DBG.DROP that `packet` was discarded.

    iface, saddr, daddr and why are formatted into the message; the
    packet itself is passed on as the payload to be hex-dumped.
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 127
b0cfbfce
IJ
128#---------- packet parsing ----------
129
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    packet -- bytes of an IPv4 or IPv6 packet, starting at the IP header.

    Returns (saddr, daddr) as ipaddress.IPv4Address or IPv6Address
    objects, depending on the version nibble in the first byte.

    Raises ValueError if the packet is empty, is too short to contain
    both addresses, or carries an IP version other than 4 or 6.
    """
    if len(packet) == 0:
        # Indexing packet[0] below would otherwise raise IndexError.
        raise ValueError('empty packet')

    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)

    # The destination address immediately follows the source address.
    daddroff = saddroff + addrlen
    end = daddroff + addrlen
    if len(packet) < end:
        raise ValueError('truncated IPv%d packet (%d bytes, need %d)'
                         % (version, len(packet), end))

    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:end])
    return (saddr, daddr)
145
146#---------- address handling ----------
147
def ipaddr(input):
    """Parse `input` as an IP address.

    IPv4 is tried first; if that raises AddressValueError the value is
    parsed as IPv6 instead, and any error from that attempt propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
154
def ipnetwork(input):
    """Parse `input` as an IP network (`<prefix>/<len>`).

    IPv4 is tried first; if the address part is not valid IPv4
    (AddressValueError) the value is parsed as an IPv6 network instead,
    and any error from that attempt propagates.  This mirrors ipaddr().
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # Previously this clause named the undefined `NetworkValueError',
        # so every IPv6 network ended in NameError instead of reaching
        # this fallback.
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
161
162#---------- ipif (SLIP) subprocess ----------
163
class SlipStreamDecoder():
    """Split an incoming byte stream into SLIP packets.

    Data arrives in arbitrary chunks via inputdata(); each complete,
    non-empty packet is handed to the `on_packet` callback.  Bytes after
    the last delimiter are kept until more data arrives or flush() is
    called.

    desc      -- label used in debug messages.
    on_packet -- callable invoked with each non-empty packet.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        """Debug-log `msg` under DBG.SLIP_FULL, tagged with our label."""
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        """Feed another chunk of the stream into the decoder."""
        self._log('inputdata', d=data)
        # NOTE(review): relies on slip.decode returning a non-empty list
        # whose last element is the (possibly empty) unterminated tail --
        # inferred from the usage here; confirm against hippotat.slip.
        pieces = slip.decode(data)
        # Whatever was left over last time belongs to the first piece.
        pieces[0] = self._buffer + pieces[0]
        # The final piece has not been terminated yet; hold it back.
        self._buffer = pieces.pop()
        for complete in pieces:
            self._maybe_packet(complete)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Deliver `packet` to the callback unless it is empty."""
        self._log('maybepacket', d=packet)
        if len(packet) == 0:
            return
        self._on_packet(packet)

    def flush(self):
        """Treat the buffered tail as a finished packet and clear it."""
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 192
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Bytes from the child's stdout are SLIP-decoded; each packet that is
    not link-local is passed to `router(packet, saddr, daddr)`.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Handle one decoded packet coming from the subprocess."""
        src, dst = packet_addrs(packet)
        if not (src.is_link_local or dst.is_link_local):
            self._router(packet, src, dst)
            return
        log_discard(packet, 'ipif', src, dst, 'link-local')

    def processEnded(self, status):
        # Re-raise the failure describing how the subprocess ended.
        status.raiseException()
208
def start_ipif(command, router):
    """Spawn the ipif subprocess and publish its protocol as global `ipif`.

    command -- shell command line, run as `sh -xc <command>`.
    router  -- callable given to _IpifProcessProtocol for decoded packets.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    # fd 0: we write to the child; fd 1: we read from it; fd 2: shared
    # with our own stderr.
    fds = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds, env=None)
040ff511
IJ
216
def queue_inbound(packet):
    """Write `packet` to the ipif subprocess, SLIP-encoded.

    The encoded packet is preceded and followed by a SLIP delimiter.
    Requires start_ipif() to have set the global `ipif`.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    out = ipif.transport
    out.write(slip.delimiter)
    out.write(slip.encode(packet))
    out.write(slip.delimiter)
222
650a3251
IJ
223#---------- packet queue ----------
224
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- label used in debug messages.
    max_queue_time -- age in seconds (time.monotonic() based) beyond which
                      nonempty() drops a packet from the front.
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (queue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        """Debug-log `msg` in category `dflag`, tagged with our label."""
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add `packet` at the back, stamped with the current time."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while len(self._pq) > 0:
            queued_at, packet = self._pq[0]
            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue
            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True
        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move queued packets into a batch until it is full.

        sizequery() should return the size of the batch so far.
        moredata(s) should add s to the batch.
        max_batch   is the size limit; it is only enforced once the batch
                    already holds something, so the first packet is always
                    added.

        Packets are SLIP-encoded and separated by slip.delimiter.  A
        packet that would overflow the batch stays at the front of the
        queue.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            if len(self._pq) == 0:
                self._log(DBG.QUEUE, 'process... empty')
                break
            _queued_at, packet = self._pq[0]

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
284
285#---------- error handling ----------
286
b68c0739
IJ
# Set once crash() has been called; common_run() checks it before
# starting the reactor.
_crashing = False

def crash(err):
    """Report `err` on stderr and stop the reactor.

    Also sets the module-level _crashing flag.  A reactor that is not
    running is tolerated.
    """
    global _crashing
    _crashing = True
    print('========== CRASH ==========', err,
          '===========================', file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
296
def crash_on_defer(defer):
    """Attach an errback to `defer` that passes any failure to crash()."""
    def _errback(err):
        return crash(err)
    defer.addErrback(_errback)
299
def crash_on_critical(event):
    """Log observer: call crash() for events at LogLevel.critical or above.

    The formatted event text is what gets passed to crash().
    """
    level = event.get('log_level')
    if level >= LogLevel.critical:
        text = twisted.logger.formatEvent(event)
        crash(text)
303
87a7c0c7
IJ
304#---------- config processing ----------
305
def process_cfg_common_always():
    """Copy the `mtu` setting from the [virtual] section into c.mtu."""
    global mtu
    configured_mtu = cfg.get('virtual', 'mtu')
    c.mtu = configured_mtu
309
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif` option of `section`.

    varmap -- iterable of (dest, src) attribute-name pairs.  For each
              pair, c.<src> is copied to c.<dest> when c.<src> exists;
              pairs whose source is absent are skipped.

    The attributes of `c` are then offered as extra interpolation
    variables when the `ipif` option is read.
    """
    absent = object()
    for dest, src in varmap:
        value = getattr(c, src, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
319
def process_cfg_network():
    """Parse [virtual] `network` into c.network and check that it is big enough.

    Raises ValueError when the network holds fewer than 3 + 2 addresses.
    """
    net = ipnetwork(cfg.get('virtual', 'network'))
    c.network = net
    if net.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
324
def process_cfg_server():
    """Set c.server from [virtual] `server`, defaulting to the first host.

    When the option is absent, the network is processed first and the
    first host address of c.network is used.
    """
    # NOTE(review): the configured value is the raw string from the config
    # file, whereas the computed default is an ipaddress object -- confirm
    # that callers cope with both.
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
331
class ServerAddr():
    """One address/port on which the server listens.

    port     -- TCP port number (int).
    addrspec -- IPv4 or IPv6 address in text form.

    Attributes: `port` as given, and `addr` as an ipaddress.IPv4Address
    or IPv6Address.  Raises AddressValueError if addrspec is neither.
    """

    def __init__(self, port, addrspec):
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals have to be bracketed inside a URL.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The endpoint constructors take (reactor, port, backlog, interface);
        # the address must be given by keyword, otherwise it lands in
        # `backlog'.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http:// URL for this address, as bytes.

        The port is only included when it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
351
def process_cfg_saddrs():
    """Build c.saddrs from the [server] `addrs` and `port` options.

    `addrs` is a whitespace-separated list of addresses; each becomes a
    ServerAddr.  `port` falls back to 80 when the option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    addrspecs = cfg.get('server', 'addrs').split()
    for addrspec in addrspecs:
        c.saddrs.append(ServerAddr(port, addrspec))
360
def process_cfg_clients(constructor):
    """Call `constructor` once for every client section in the config.

    A section counts as a client when its name contains `:' or `.'; the
    name is parsed as an IP address.  The constructor receives
    (address, section name, password as UTF-8 bytes).

    c.clients is reset to an empty list first; nothing here appends to
    it (presumably the constructor does -- verify against callers).
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
369
ae7c7784
IJ
370#---------- startup ----------
371
def common_startup():
    """Set up logging, parse the command line and load configuration.

    Logging: events accepted by a LogLevel.error filter go to stderr, the
    rest to stdout; crash_on_critical is installed as a second observer.

    Command line: only -c/--config is added here (default
    /etc/hippotat/config); positional arguments are rejected.

    Configuration: the built-in defaults (with `#' comments stripped) are
    loaded first, then the configuration file.
    """
    fmt = twisted.logger.formatEventAsClassicLogText
    to_stdout = twisted.logger.FileLogObserver(sys.stdout, fmt)
    to_stderr = twisted.logger.FileLogObserver(sys.stderr, fmt)
    error_filter = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    splitter = twisted.logger.FilteringLogObserver(
        to_stderr, [error_filter], to_stdout
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ splitter, crash_on_critical ]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # configparser is handed the defaults with all comments removed.
    stripped_defaults = regexp.sub('#.*', '', defcfg)
    cfg.read_string(stripped_defaults)
    cfg.read(opts.configfile)
392
def common_run():
    """Run the reactor, unless crash() has already been called.

    Whenever this function gets past the reactor (or skips it), it prints
    `CRASHED (end)' on stderr.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)