wip, progress!
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
ae7c7784 26
1321ad5f
IJ
27import re as regexp
28
29import hippotat.slip as slip
30
d579a048
IJ
class DBG(twisted.python.constants.Names):
    """Debug categories, passed as the `dflag' argument of log_debug()."""

    # Definition order is kept as in the original.
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
d579a048 41
# Encoder used to render binary payloads as hex text in debug messages.
_hex_codec = codecs.getencoder('hex_codec')

# Logger through which log_debug() emits its messages.
log = twisted.logger.Logger()


def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message, at info level, through the module logger.

    dflag -- debug category (callers in this file pass DBG constants);
             it is logged ahead of the message text
    msg   -- message text
    idof  -- optional object; when given, `[<id(idof)>] ' is prepended
    d     -- optional binary payload; when given, its hex encoding is
             appended after a space
    """
    if idof is None:
        text = msg
    else:
        text = '[%d] %s' % (id(idof), msg)
    if d is not None:
        hexed = _hex_codec(d)[0].decode('ascii')
        text = text + ' ' + hexed
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=text)
55
ca732796
IJ
# Built-in default configuration.  common_startup() strips `#' comments
# from this text and loads it into `cfg' before reading the config file.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
97
# Module-level singletons.  They are created here, at import time, so
# that code importing this module with `import *' gets these objects.
optparser = OptionParser()
cfg = ConfigParser()
101
# Byte translation table exchanging `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return s with every `-' byte and SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap
    the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
107
87a7c0c7
IJ
class ConfigResults:
    """Attribute bag holding processed configuration values.

    d -- optional dict adopted (not copied) as the instance's attribute
         dictionary.  When omitted, the instance gets its own fresh,
         empty dictionary.
    """

    def __init__(self, d=None):
        # A literal `{ }' default would be a single dict object shared
        # by every default-constructed instance, so attributes set on
        # one instance would appear on all of them.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# The processed configuration; filled in by the process_cfg_* functions.
c = ConfigResults()
115
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that a packet was discarded.

    packet       -- the packet contents (logged via log_debug's `d')
    iface        -- label for where the packet came from
    saddr, daddr -- source and destination addresses
    why          -- reason for discarding
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 120
b0cfbfce
IJ
121#---------- packet parsing ----------
122
def packet_addrs(packet):
    """Return (saddr, daddr) read from the header of an IP packet.

    packet -- bytes of the packet, starting with the IP header

    The IP version is taken from the top four bits of the first byte.
    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address
    objects.  Raises ValueError for any version other than 4 or 6.
    """
    # version -> (address length, offset of source address, constructor)
    layouts = {
        4: (4, 3 * 4, ipaddress.IPv4Address),
        6: (16, 2 * 4, ipaddress.IPv6Address),
    }
    version = packet[0] >> 4
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, src_start, make_addr = layout
    # The destination address immediately follows the source address.
    dst_start = src_start + addrlen
    saddr = make_addr(packet[src_start:dst_start])
    daddr = make_addr(packet[dst_start:dst_start + addrlen])
    return (saddr, daddr)
138
139#---------- address handling ----------
140
def ipaddr(input):
    """Parse input as an IP address, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Address or ipaddress.IPv6Address.  If
    input is valid as neither, the IPv6 parser's exception propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
147
def ipnetwork(input):
    """Parse input as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Only AddressValueError from the IPv4 attempt triggers the IPv6
    attempt (the same rule ipaddr() uses).  Other errors from the IPv4
    parser, such as an invalid prefix length on a valid IPv4 address,
    propagate unchanged.  If input is valid as neither, the IPv6
    parser's exception propagates.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4; try IPv6.  (There is no
        # `NetworkValueError' in the ipaddress module.)
        return ipaddress.IPv6Network(input)
040ff511
IJ
154
155#---------- ipif (SLIP) subprocess ----------
156
a95cfeb2
IJ
class SlipStreamDecoder():
    """Incrementally split a stream of SLIP data into packets.

    on_packet -- callable; invoked as on_packet(<packet>) for every
                 non-empty packet extracted from the stream
    """

    def __init__(self, on_packet):
        self._on_packet = on_packet
        self._buffer = b''

    def inputdata(self, data):
        """Accept more stream bytes and deliver the packets found."""
        self._buffer += data
        pieces = slip.decode(self._buffer)
        # The final piece is held back as the new buffer; everything
        # before it is delivered now.
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)

    def _maybe_packet(self, packet):
        """Deliver packet to the callback unless it is empty."""
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet and empty the buffer."""
        pending = self._buffer
        self._maybe_packet(pending)
        self._buffer = b''
4f991c0c 178
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data received from the subprocess is SLIP-decoded; each packet is
    handed to router(packet, saddr, daddr) unless either address is
    link-local, in which case it is logged and discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, or discard it if link-local."""
        saddr, daddr = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # Raises the exception carried by the exit status.
        status.raiseException()
194
def start_ipif(command, router):
    """Spawn the ipif command and install the global `ipif' protocol.

    command -- shell command line, run as `/bin/sh -xc <command>'
    router  -- callable given to _IpifProcessProtocol to receive packets

    The child's fd 0 is writable by us, fd 1 is readable by us, and
    fd 2 is mapped to our own fd 2.
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    shell_argv = ['sh', '-xc', command]
    fd_map = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', shell_argv,
                         childFDs=fd_map,
                         env=None)
040ff511
IJ
202
def queue_inbound(packet):
    """Write packet to the ipif subprocess, SLIP-encoded.

    The encoded packet is written with a SLIP delimiter both before
    and after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
208
650a3251
IJ
209#---------- packet queue ----------
210
class PacketQueue():
    """FIFO of packets, each stamped with the time it was queued.

    desc           -- str prefixed to this queue's debug messages
    max_queue_time -- maximum age, in time.monotonic() seconds, before
                      nonempty() drops a queued packet

    Raises TypeError if desc is not a str.
    """

    def __init__(self, desc, max_queue_time):
        # Explicit type check.  The former `assert(desc + '')' also
        # rejected the empty string (its result is falsy) and is
        # removed entirely when Python runs with -O.
        if not isinstance(desc, str):
            raise TypeError(
                'desc must be a str, not %s' % type(desc).__name__)
        self._desc = desc
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # (queue time, packet) pairs

    def _log(self, dflag, msg, **kwargs):
        """log_debug() with this queue's description prefixed."""
        log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)

    def append(self, packet):
        """Add packet at the tail, stamped with the current time."""
        self._log(DBG.QUEUE, 'append', d=packet)
        self._pq.append((time.monotonic(), packet))

    def nonempty(self):
        """Drop over-age packets from the head; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while True:
            try:
                (queuetime, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'nonempty ? empty.')
                return False

            age = time.monotonic() - queuetime
            if age > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue

            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the head of the queue into a batch.

        sizequery() -- should return size of batch so far
        moredata(s) -- should add s to batch
        max_batch   -- size limit for the batch

        Packets are SLIP-encoded and separated by slip.delimiter.
        Stops when the queue is empty, or when the batch is non-empty
        and the next packet would take it over max_batch.  A packet is
        always added to an empty batch, whatever its size.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            try:
                (dummy, packet) = self._pq[0]
            except IndexError:
                self._log(DBG.QUEUE, 'process... empty')
                break

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded'
                      % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                if sofar + len(slip.delimiter) + len(encoded) > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            # Only now that it is in the batch does the packet leave
            # the queue.
            self._pq.popleft()
ae7c7784
IJ
270
271#---------- error handling ----------
272
b68c0739
IJ
# Set by crash(); common_run() does not start the reactor once it is set.
_crashing = False


def crash(err):
    """Record that we are crashing, report err and stop the reactor.

    err is printed to stderr after the text `CRASH '.  A reactor that
    is not running is left alone.
    """
    global _crashing
    _crashing = True
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Nothing to stop.
        pass
281
def crash_on_defer(defer):
    """Attach an errback to defer that passes the error to crash()."""
    def _on_error(err):
        return crash(err)

    defer.addErrback(_on_error)
284
def crash_on_critical(event):
    """Log observer: crash() on events at critical level or above.

    event -- log event dict; its `log_level' entry is compared with
             LogLevel.critical, and the formatted event text is what
             gets passed to crash()
    """
    level = event.get('log_level')
    if not level >= LogLevel.critical:
        return
    crash(twisted.logger.formatEvent(event))
288
87a7c0c7
IJ
289#---------- config processing ----------
290
def process_cfg_common_always():
    """Copy [virtual] mtu from the configuration into c.mtu.

    The value is stored as the string returned by cfg.get().
    """
    # No `global mtu' here: nothing in this function assigns a module
    # level `mtu'; the value lives on the shared results object `c'.
    c.mtu = cfg.get('virtual', 'mtu')
294
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' option of section.

    section -- config section to read `ipif' from
    varmap  -- iterable of (dest, src) attribute-name pairs; for each
               pair, c.<src> is copied to c.<dest>, and pairs whose
               source attribute does not exist on c are skipped

    The attributes of c are then supplied as extra interpolation
    variables when the `ipif' option is read.
    """
    absent = object()
    for dest, src in varmap:
        value = getattr(c, src, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
304
def process_cfg_network():
    """Parse [virtual] network into c.network and check its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    if c.network.num_addresses >= 3 + 2:
        return
    raise ValueError('network needs at least 2^3 addresses')
309
def process_cfg_server():
    """Set c.server from [virtual] server, or default it from the network.

    When the option is absent, the network is processed (see
    process_cfg_network) and its first host address is used.
    """
    # NOTE(review): the configured value is kept as the str returned by
    # cfg.get(), whereas the default is an ipaddress object -- confirm
    # that callers cope with both.
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
316
class ServerAddr():
    """One address and port on which the server is reachable.

    port     -- TCP port number
    addrspec -- IPv4 or IPv6 address, in a form accepted by
                ipaddress.IPv4Address or ipaddress.IPv6Address

    Sets self.port and self.addr (the parsed address object).  If
    addrspec is valid as neither family, the IPv6 parser's exception
    propagates.
    """

    def __init__(self, port, addrspec):
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = \
                twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = \
                twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 literals are bracketed inside URLs.
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint for this address and port."""
        # The third positional parameter of TCP4ServerEndpoint and
        # TCP6ServerEndpoint is `backlog', so the listen address has to
        # be given by keyword, as a string.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is included only when it is not 80.
        """
        host = self._inurl % str(self.addr).encode('ascii')
        url = b'http://' + host
        if self.port != 80:
            url += b':%d' % self.port
        url += b'/'
        return url
336
def process_cfg_saddrs():
    """Build c.saddrs: one ServerAddr per entry of [server] addrs.

    `addrs' is split on whitespace.  Every entry shares the [server]
    port, which falls back to 80 when the option is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = []
    c.saddrs.extend(ServerAddr(port, addrspec)
                    for addrspec in cfg.get('server', 'addrs').split())
345
def process_cfg_clients(constructor):
    """Call constructor once for each client section of the config.

    A section counts as a client when its name contains `:' or `.';
    the name is then parsed as an IP address.

    constructor -- called as constructor(ci, cs, pw), where ci is the
                   parsed address, cs the section name, and pw the
                   section's `password' encoded as UTF-8 bytes

    c.clients is reset to an empty list first; nothing is appended to
    it here.
    """
    c.clients = []
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
354
ae7c7784
IJ
355#---------- startup ----------
356
def common_startup():
    """Start logging, parse the command line and load configuration.

    Log events go to stderr in classic log text format, and also to
    crash_on_critical.  The only option added here is -c/--config
    (default /etc/hippotat/config); non-option arguments are an error.
    The built-in defaults (defcfg, with `#' comments stripped) are
    loaded first, then the config file.
    """
    stderr_observer = twisted.logger.FileLogObserver(
        sys.stderr, twisted.logger.formatEventAsClassicLogText)
    observers = [stderr_observer, crash_on_critical]
    twisted.logger.globalLogBeginner.beginLoggingTo(observers)

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    opts, args = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    comment = regexp.compile('#.*')
    defaults_text = comment.sub('', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
372
def common_run():
    """Run the reactor, unless crash() has already been called.

    `CRASHED (end)' is printed to stderr afterwards in either case.
    """
    log_debug(DBG.INIT, 'entering reactor')
    if _crashing:
        pass
    else:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)