it pings!
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
c13ee6e6
IJ
23from functools import partial
24
ae7c7784 25import collections
84e763c7 26import time
8c3b6620 27import codecs
eedc8b30 28import traceback
ae7c7784 29
1321ad5f
IJ
30import re as regexp
31
32import hippotat.slip as slip
33
d579a048
IJ
class DBG(twisted.python.constants.Names):
    # Debug categories.  One of these is passed as the `dflag'
    # argument of log_debug() and ends up in the emitted log message.
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    SLIP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
d579a048 46
# Encoder used by log_debug() to render packet data as hex digits.
_hex_codec = codecs.getencoder('hex_codec')

# Module-wide Twisted logger; log_debug() emits through it.
log = twisted.logger.Logger()
50
def log_debug(dflag, msg, idof=None, d=None):
    # Emit one debug message at info level.
    #
    #  dflag   debug category (a DBG constant); logged ahead of the text
    #  msg     message text
    #  idof    optional object; its id() is prepended as `[0x...]'
    #  d       optional bytes; the first 64 are appended as hex
    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)
    if d is not None:
        hexdump = _hex_codec(d[0:64])[0].decode('ascii')
        msg = msg + ' ' + hexdump
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
60
ca732796
IJ
# Built-in default configuration, in ConfigParser (INI) syntax.
# common_startup() strips everything from `#' to end of line before
# parsing, so the trailing remarks below are documentation only.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536           # used by server, subject to [limits]
max_queue_time = 10              # used by server, subject to [limits]
max_request_time = 54            # used by server, subject to [limits]
target_requests_outstanding = 3  # must match; subject to [limits] on server
max_requests_outstanding = 4     # used by client
max_batch_up = 4000              # used by client
http_timeout = 30                # used by client
http_retry = 5                   # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations:  %(local)s        %(peer)s          %(rnet)s
#  obtained   on server  [virtual]server  [virtual]relay    [virtual]network
#      from   on client  <client>         [virtual]server   [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len>  # mandatory for server
# server  = <ipaddr>   # used by both, default is computed from `network'
# relay   = <ipaddr>   # used by server, default from `network' and `server'
#  default server is first host in network
#  default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1    # mandatory for server
port = 80                  # used by server
# url              # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password>    # used by both, must match

[limits]
max_batch_down = 262144           # used by server
max_queue_time = 121              # used by server
max_request_time = 121            # used by server
target_requests_outstanding = 10  # used by server
'''
102
# these need to be defined here so that they can be imported by import *
cfg = ConfigParser()
optparser = OptionParser()

# Byte translation table that exchanges `-' with the SLIP escape byte;
# used by mime_translate().
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')
7b07f0b5
IJ
def mime_translate(s):
    # Return s with every `-' byte exchanged for the SLIP ESC byte and
    # vice versa.
    #
    # SLIP-encoded packets cannot contain ESC ESC, so after the swap
    # the result cannot contain `--'.
    swapped = s.translate(_mimetrans)
    return swapped
112
87a7c0c7
IJ
class ConfigResults:
    # Plain attribute bag holding processed configuration values.
    #
    #  d   optional dict to use *as* the instance's attribute dict
    #      (it is aliased, not copied, so attribute writes show up in
    #      the caller's dict).  When omitted, each instance gets its
    #      own fresh dict; a shared `{ }' default would make every
    #      default-constructed instance share the same attributes.
    def __init__(self, d=None):
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'

# Module-wide configuration results, filled in by the process_cfg_*
# functions.
c = ConfigResults()
120
def log_discard(packet, iface, saddr, daddr, why):
    # Record (under DBG.DROP) that `packet', seen on `iface' travelling
    # saddr -> daddr, is being thrown away for reason `why'.
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 125
b0cfbfce
IJ
126#---------- packet parsing ----------
127
def packet_addrs(packet):
    # Extract (source, destination) addresses from a raw IP packet.
    #
    # The IP version is the high nibble of the first byte.  Returns a
    # pair of ipaddress.IPv4Address or ipaddress.IPv6Address objects;
    # raises ValueError for any other version.
    version = packet[0] >> 4
    if version == 4:
        factory, addrlen, saddroff = ipaddress.IPv4Address, 4, 3*4
    elif version == 6:
        factory, addrlen, saddroff = ipaddress.IPv6Address, 16, 2*4
    else:
        raise ValueError('unsupported IP version %d' % version)

    # the destination address follows the source address directly
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
143
144#---------- address handling ----------
145
def ipaddr(input):
    # Parse `input' as an IPv4 address, falling back to IPv6 if it is
    # not a valid IPv4 address.  If it is neither, the IPv6
    # AddressValueError propagates.
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
152
def ipnetwork(input):
    # Parse `input' (<prefix>/<len>) as an IPv4 network, falling back
    # to IPv6 if the address part is not a valid IPv4 address.
    #
    # IPv4Network reports an unparseable address with
    # AddressValueError; that is the exception to catch here (there is
    # no `NetworkValueError').  If `input' is not a valid IPv6 network
    # either, the IPv6 error propagates.
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        return ipaddress.IPv6Network(input)
040ff511
IJ
159
160#---------- ipif (SLIP) subprocess ----------
161
class SlipStreamDecoder():
    # Incremental decoder for a SLIP byte stream.
    #
    #  desc        label included in this decoder's debug messages
    #  on_packet   callable invoked with each nonempty decoded packet
    #
    # Feed data with inputdata(); call flush() to hand over whatever is
    # still buffered.

    def __init__(self, desc, on_packet):
        self._desc = desc
        self._on_packet = on_packet
        self._buffer = b''  # undecoded tail carried between inputdata calls
        self._log('__init__')

    def _log(self, msg, **kwargs):
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        self._log('inputdata', d=data)
        pending = self._buffer + data
        self._buffer = b''
        chunks = slip.decode(pending)
        # The final element is kept back as the new buffer --
        # presumably the not-yet-terminated remainder; see slip.decode.
        self._buffer = chunks.pop()
        for chunk in chunks:
            self._maybe_packet(chunk)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        # Deliver `packet' unless it is empty.
        self._log('maybepacket', d=packet)
        if len(packet):
            self._on_packet(packet)

    def flush(self):
        # Treat the buffered tail as a complete packet, then reset.
        self._log('flush')
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 191
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    # Twisted process protocol for the ipif subprocess: SLIP-decodes
    # the child's stdout and passes each packet to `router'.

    def __init__(self, router):
        # router: callable taking (packet, saddr, daddr)
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        # Called by the decoder for each packet; link-local traffic is
        # discarded, everything else goes to the router.
        saddr, daddr = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # The ipif process ending is always reported as an exception.
        status.raiseException()
207
def start_ipif(command, router):
    # Spawn `command' via /bin/sh -xc as the ipif subprocess and record
    # its protocol object in the module global `ipif'.
    #
    # We write to the child's fd 0 and read from its fd 1; its fd 2 is
    # our own fd 2.
    global ipif
    ipif = _IpifProcessProtocol(router)
    argv = ['sh', '-xc', command]
    fdmap = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fdmap, env=None)
040ff511
IJ
215
def queue_inbound(packet):
    # Write `packet' to the ipif subprocess, SLIP-encoded and with a
    # delimiter on each side.
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    transport = ipif.transport
    transport.write(slip.delimiter)
    transport.write(slip.encode(packet))
    transport.write(slip.delimiter)
221
650a3251
IJ
222#---------- packet queue ----------
223
class PacketQueue():
    # FIFO of packets, each stamped with its arrival time.
    #
    #  desc             string label used in debug messages
    #  max_queue_time   age (seconds, time.monotonic) beyond which a
    #                   packet at the head is dropped by nonempty()

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')  # fails early unless desc concatenates with str
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # packets

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)

    def append(self, packet):
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        # Drop over-age packets from the head, then report whether
        # anything is left.
        self._log(DBG.QUEUE, 'nonempty ?')
        while True:
            try:
                queued_at, head = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'nonempty ? empty.')
                return False

            age = time.monotonic() - queued_at
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=head)
                self._pq.popleft()
            else:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True

    def process(self, sizequery, moredata, max_batch):
        # Move packets from the head of the queue into a batch.
        #
        # sizequery() should return size of batch so far
        # moredata(s) should add s to batch
        #
        # Stops when the queue is empty or when adding the next packet
        # to a non-empty batch would exceed max_batch.  The first
        # packet of an empty batch is always added.
        self._log(DBG.QUEUE, 'process...')
        while True:
            try:
                _stamp, head = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'process... empty')
                break

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=head)

            encoded = slip.encode(head)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
283
284#---------- error handling ----------
285
b68c0739
IJ
# Set by crash(); common_run() will not start the reactor once it is set.
_crashing = False

def crash(err):
    # Report `err' on stderr, mark the program as crashing, and stop
    # the reactor if it is running.
    global _crashing
    _crashing = True
    banner_top = '========== CRASH =========='
    banner_bottom = '==========================='
    print(banner_top, err, banner_bottom, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
295
def crash_on_defer(defer):
    # Arrange that a failure of the Deferred `defer' calls crash().
    def _errback(err):
        return crash(err)
    defer.addErrback(_errback)
298
def crash_on_critical(event):
    # Log observer (installed by common_startup): crash on any event
    # whose level is critical or above.
    level = event.get('log_level')
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
302
87a7c0c7
IJ
303#---------- config processing ----------
304
def process_cfg_common_always():
    # Copy [virtual] mtu from the config into c.mtu (as the string
    # ConfigParser returns).
    global mtu  # declared but never assigned in this function
    mtu_value = cfg.get('virtual', 'mtu')
    c.mtu = mtu_value
308
88487243
IJ
def process_cfg_ipif(section, varmap):
    # Compute c.ipif_command from the `ipif' option of `section'.
    #
    #  varmap   iterable of (dest, src) name pairs: each attribute
    #           c.<src> that exists is copied to c.<dest> first, so it
    #           is available for %(...)s interpolation.  Missing
    #           sources are silently skipped.
    for dest, src in varmap:
        try:
            value = getattr(c, src)
        except AttributeError:
            continue
        setattr(c, dest, value)

    # all attributes of c are offered as interpolation variables
    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
318
def process_cfg_network():
    # Parse [virtual] network into c.network and reject networks that
    # are too small.
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    # NOTE(review): the check requires 5 addresses but the message
    # says 2^3 -- confirm which is intended.
    if c.network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
323
def process_cfg_server():
    # Set c.server from [virtual] server; if that option is absent,
    # process [virtual] network and use its first host address.
    try:
        configured = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        c.server = configured
330
class ServerAddr():
    # One address/port the server listens on (or the client connects to).
    #
    #  port       TCP port number
    #  addrspec   IPv4 or IPv6 address in text form
    #
    # Attributes: self.port, self.addr (an ipaddress address object).
    # Raises AddressValueError if addrspec is neither IPv4 nor IPv6.

    def __init__(self, port, addrspec):
        self.port = port
        # also self.addr
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            self._inurl = b'[%s]'  # IPv6 literals are bracketed in URLs

    def make_endpoint(self):
        # Return a Twisted server endpoint bound to this address.
        #
        # The address must be passed by keyword: the third positional
        # parameter of the TCP server endpoints is `backlog', not
        # `interface'.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        # Return the base URL (bytes) for this address; the port is
        # included only when it is not 80.
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
350
def process_cfg_saddrs():
    # Build c.saddrs: one ServerAddr per whitespace-separated entry of
    # [server] addrs, all using [server] port (80 if that is unset).
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = []
    addrspecs = cfg.get('server', 'addrs').split()
    for addrspec in addrspecs:
        c.saddrs.append(ServerAddr(port, addrspec))
359
def process_cfg_clients(constructor):
    # Call constructor(ci, cs, pw) for every client section in the config.
    #
    # A section counts as a client when its name contains `:' or `.';
    # the name is parsed as an IP address.
    #
    #  ci   the parsed address
    #  cs   the section name
    #  pw   the section's password, UTF-8 encoded
    #
    # c.clients is reset to an empty list; nothing is added to it here.
    c.clients = []
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
368
ae7c7784
IJ
369#---------- startup ----------
370
def common_startup():
    # Shared start-of-day work: set up logging, parse the command
    # line, and load configuration into `cfg'.

    # Log to stderr in the classic text format; crash_on_critical is
    # installed as a second observer.
    stderr_observer = twisted.logger.FileLogObserver(
        sys.stderr, twisted.logger.formatEventAsClassicLogText)
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [stderr_observer, crash_on_critical]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # Built-in defaults first (with `#' comments stripped), then the
    # config file, whose settings take precedence.
    comment_re = regexp.compile('#.*')
    cfg.read_string(comment_re.sub('', defcfg))
    cfg.read(opts.configfile)
386
def common_run():
    # Run the reactor unless crash() has already been called, then
    # announce on stderr that we have fallen out of it.
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)