wip, towards target
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
1d023c89 10import twisted.internet.endpoints
8c3b6620
IJ
11import twisted.logger
12from twisted.logger import LogLevel
13import twisted.python.constants
14from twisted.python.constants import NamedConstant
b0cfbfce
IJ
15
16import ipaddress
17from ipaddress import AddressValueError
18
ae7c7784
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
22
23import collections
84e763c7 24import time
8c3b6620 25import codecs
ae7c7784 26
1321ad5f
IJ
27import re as regexp
28
29import hippotat.slip as slip
30
d579a048
IJ
class DBG(twisted.python.constants.Names):
    # Debug categories.  One of these is passed as the `dflag' argument
    # of log_debug and appears as the first field of the logged message.
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
d579a048 41
# Encoder used to render binary data as hex text in debug messages.
_hex_codec = codecs.getencoder('hex_codec')

log = twisted.logger.Logger()


def log_debug(dflag, msg, idof=None, d=None):
    """Emit a debug message via the module logger, at info level.

    dflag -- debug category; logged as the `dflag' field
    msg   -- message text
    idof  -- if not None, the message is prefixed with `[<id(idof)>] '
    d     -- if not None, binary data appended to the message as hex
    """
    if idof is not None:
        msg = '[%d] %s' % (id(idof), msg)
    if d is not None:
        # the codec returns (encoded bytes, length consumed)
        hexbytes, _consumed = _hex_codec(d)
        msg += ' ' + hexbytes.decode('ascii')
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
55
ca732796
IJ
# Built-in default configuration.  common_startup strips everything
# from `#' to end of line out of this text and loads the remainder into
# `cfg' before reading the user's configuration file.
defcfg = '''
[DEFAULT]
#[<client>] overrides
max_batch_down = 65536 # used by server, subject to [limits]
max_queue_time = 10 # used by server, subject to [limits]
max_request_time = 54 # used by server, subject to [limits]
target_requests_outstanding = 3 # must match; subject to [limits] on server
max_requests_outstanding = 4 # used by client
max_batch_up = 4000 # used by client
http_timeout = 30 # used by client
http_retry = 5 # used by client

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnet)s
# obtained on server [virtual]server [virtual]relay [virtual]network
# from on client <client> [virtual]server [virtual]routes

[virtual]
mtu = 1500
routes = ''
# network = <prefix>/<len> # mandatory for server
# server = <ipaddr> # used by both, default is computed from `network'
# relay = <ipaddr> # used by server, default from `network' and `server'
# default server is first host in network
# default relay is first host which is not server

[server]
# addrs = 127.0.0.1 ::1 # mandatory for server
port = 80 # used by server
# url # used by client; default from first `addrs' and `port'

# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[limits]
max_batch_down = 262144 # used by server
max_queue_time = 121 # used by server
max_request_time = 121 # used by server
target_requests_outstanding = 10 # used by server
'''
97
# These need to be defined here, at module level, so that they can be
# imported by `import *'.  common_startup adds the command line options
# to `optparser' and loads the configuration text into `cfg'.
cfg = ConfigParser()
optparser = OptionParser()
101
# Translation table which exchanges `-' with the SLIP escape byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return s with each `-' and each SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
107
87a7c0c7
IJ
class ConfigResults:
    """Plain attribute bag holding processed configuration values.

    d -- optional dict which is adopted (not copied) as the instance's
         attribute dictionary.  When omitted, the instance gets a fresh
         empty dict of its own.
    """
    def __init__(self, d=None):
        # A literal `{}' default would be created once and then shared
        # by every instance constructed without an argument, so that
        # setting an attribute on one would set it on all of them.
        self.__dict__ = {} if d is None else d

    def __repr__(self):
        return 'ConfigResults(' + repr(self.__dict__) + ')'


# Module-wide results object; the process_cfg_* functions store into it.
c = ConfigResults()
115
def log_discard(packet, iface, saddr, daddr, why):
    """Log, under DBG.DROP, that a packet has been discarded.

    packet       -- the packet data, included in the log as hex
    iface        -- label for where the packet came from
    saddr, daddr -- source and destination addresses, for the message
    why          -- reason for the discard
    """
    summary = 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why)
    log_debug(DBG.DROP, summary, d=packet)
1321ad5f 120
b0cfbfce
IJ
121#---------- packet parsing ----------
122
def packet_addrs(packet):
    """Return (saddr, daddr) read from the header of an IP packet.

    packet -- bytes of an IPv4 or IPv6 packet, starting at the IP header

    The IP version is taken from the top four bits of the first byte.
    Raises ValueError if that version is neither 4 nor 6.
    """
    # version -> (address length, offset of source address, address class)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    version = packet[0] >> 4
    layout = layouts.get(version)
    if layout is None:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, saddroff, factory = layout
    # the destination address immediately follows the source address
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
138
139#---------- address handling ----------
140
def ipaddr(input):
    """Parse input as an IP address.

    Returns an ipaddress.IPv4Address if input is a valid IPv4 address,
    otherwise an ipaddress.IPv6Address.  If it is not valid as either,
    the AddressValueError from the IPv6 attempt propagates.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
147
def ipnetwork(input):
    """Parse input as an IP network (`<prefix>/<len>').

    Returns an ipaddress.IPv4Network if the address part is a valid
    IPv4 address, otherwise an ipaddress.IPv6Network.  Errors from the
    IPv6 attempt, and IPv4 errors other than a bad address (for example
    a bad prefix length), propagate to the caller.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # Not an IPv4 address: this is the exception which IPv4Network
        # raises for an IPv6 address part, so fall back to IPv6.
        r = ipaddress.IPv6Network(input)
    return r
040ff511
IJ
154
155#---------- ipif (SLIP) subprocess ----------
156
a95cfeb2
IJ
class SlipStreamDecoder():
    """Incremental SLIP decoder which hands packets to a callback.

    on_packet -- we will call on_packet(<packet>) for each non-empty
                 packet
    """
    def __init__(self, on_packet):
        self._on_packet = on_packet
        self._buffer = b''

    def inputdata(self, data):
        """Accept more input and deliver the packets it yields."""
        self._buffer = self._buffer + data
        pieces = slip.decode(self._buffer)
        # The final piece is kept back as the new buffer contents
        # (presumably the not-yet-terminated tail; confirm against
        # slip.decode).
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)

    def _maybe_packet(self, packet):
        """Pass packet to the callback unless it is empty."""
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Deliver whatever is buffered as a packet, and empty the buffer."""
        self._maybe_packet(self._buffer)
        self._buffer = b''
4f991c0c 178
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Output from the subprocess is SLIP-decoded; each resulting packet
    is passed to router(packet, saddr, daddr), except that packets with
    a link-local source or destination address are logged and dropped.
    """
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if not (saddr.is_link_local or daddr.is_link_local):
            self._router(packet, saddr, daddr)
            return
        log_discard(packet, 'ipif', saddr, daddr, 'link-local')

    def processEnded(self, status):
        # re-raise whatever the reported status carries
        status.raiseException()
194
def start_ipif(command, router):
    """Spawn the ipif subprocess and remember its protocol in `ipif'.

    command -- shell command, run as `sh -xc <command>'
    router  -- callable given to _IpifProcessProtocol; it receives the
               packets which the subprocess outputs
    """
    global ipif
    ipif = _IpifProcessProtocol(router)
    shell_argv = ['sh', '-xc', command]
    fd_map = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', shell_argv,
                         childFDs=fd_map,
                         env=None)
040ff511
IJ
202
def queue_inbound(packet):
    """Write packet to the ipif subprocess, SLIP-encoded, with a
    delimiter before and after it."""
    framed = (slip.delimiter, slip.encode(packet), slip.delimiter)
    for chunk in framed:
        ipif.transport.write(chunk)
207
650a3251
IJ
208#---------- packet queue ----------
209
class PacketQueue():
    """FIFO of packets, each stamped with its arrival time.

    desc           -- non-empty string identifying the queue in logs
    max_queue_time -- packets which have been queued for longer than
                      this (in time.monotonic() units) are dropped
                      by nonempty()
    """
    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque() # (arrival time, packet)

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add packet to the tail of the queue."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop over-age packets from the front of the queue, then
        return whether any packet remains."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            (queuetime, packet) = self._pq[0]
            age = time.monotonic() - queuetime
            if age <= self._max_queue_time:
                self._log(DBG.QUEUE, 'nonempty ? nonempty.')
                return True
            # strip old packets off the front
            self._log(DBG.QUEUE, 'dropping (old)', d=packet)
            self._pq.popleft()
        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return size of batch so far
        moredata(s) should add s to batch

        Packets are SLIP-encoded and separated by slip.delimiter.
        Stops when the queue is empty, or when adding the next packet
        to a batch which is already non-empty would take it over
        max_batch.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            if not self._pq:
                self._log(DBG.QUEUE, 'process... empty')
                break
            packet = self._pq[0][1]

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            self._log(DBG.QUEUE_CTRL,
                      'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
                      d=encoded)

            if sofar > 0:
                needed = sofar + len(slip.delimiter) + len(encoded)
                if needed > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    break
                moredata(slip.delimiter)

            moredata(encoded)
            self._pq.popleft()
ae7c7784
IJ
269
270#---------- error handling ----------
271
b68c0739
IJ
# Set by crash(); common_run checks it before starting the reactor.
_crashing = False


def crash(err):
    """Record that we are crashing, report err on stderr, and stop
    the reactor.  It is not an error if the reactor is not running."""
    global _crashing
    _crashing = True
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
280
def crash_on_defer(defer):
    """Add an errback to defer which passes the failure to crash()."""
    def _on_failure(err):
        return crash(err)
    defer.addErrback(_on_failure)
283
def crash_on_critical(event):
    """Log observer: crash if event is at critical level or above.

    event -- a twisted.logger event dict

    Events which carry no `log_level' are ignored; comparing a missing
    level (None) against LogLevel.critical would raise TypeError from
    inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
287
87a7c0c7
IJ
288#---------- config processing ----------
289
def process_cfg_common_always():
    """Store the [virtual] `mtu' setting in c.mtu, as the string which
    the config parser returns."""
    c.mtu = cfg.get('virtual', 'mtu')
293
88487243
IJ
def process_cfg_ipif(section, varmap):
    """Compute c.ipif_command from the `ipif' setting in section.

    section -- config section to read `ipif' from
    varmap  -- iterable of (dest, src) attribute name pairs: each c.src
               which exists is copied to c.dest, so that it is available
               under that name when `ipif' is interpolated

    The attributes of c are supplied as the interpolation variables.
    """
    for dest, src in varmap:
        try:
            value = getattr(c, src)
        except AttributeError:
            # nothing to copy under this name
            pass
        else:
            setattr(c, dest, value)

    c.ipif_command = cfg.get(section, 'ipif', vars=c.__dict__)
303
def process_cfg_network():
    """Parse the [virtual] `network' setting into c.network.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    too_small = c.network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
308
def process_cfg_server():
    """Set c.server from the [virtual] `server' setting.

    If that option is absent, the network is processed instead and
    c.server becomes the first host address in c.network.
    """
    try:
        configured = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    else:
        c.server = configured
315
class ServerAddr():
    """One address (and port) on which the server listens.

    port     -- TCP port number
    addrspec -- IPv4 or IPv6 address, as text

    Attributes: port, and addr (an ipaddress.IPv4Address or
    ipaddress.IPv6Address).  Raises AddressValueError if addrspec is
    not valid as either kind of address.
    """
    def __init__(self, port, addrspec):
        self.port = port
        try:
            self.addr = ipaddress.IPv4Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
            self._inurl = b'%s'
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            # IPv6 address literals are bracketed in URLs
            self._inurl = b'[%s]'

    def make_endpoint(self):
        """Return a Twisted server endpoint bound to this address and port."""
        # The third positional parameter of the TCP server endpoint
        # classes is `backlog', so the address must be passed by
        # keyword, as text.
        return self._endpointfactory(reactor, self.port,
                                     interface=str(self.addr))

    def url(self):
        """Return the http URL for this address, as bytes.

        The port is included only if it is not 80.
        """
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
        url += b'/'
        return url
335
def process_cfg_saddrs():
    """Set c.saddrs to a list with one ServerAddr for each
    whitespace-separated entry in the [server] `addrs' setting.

    The port is the [server] `port' setting, or 80 if that option
    is absent.
    """
    try:
        port = cfg.getint('server', 'port')
    except NoOptionError:
        port = 80

    c.saddrs = [ ]
    addrspecs = cfg.get('server', 'addrs').split()
    c.saddrs.extend(ServerAddr(port, addrspec) for addrspec in addrspecs)
344
def process_cfg_clients(constructor):
    """Call constructor(ci, cs, pw) for each client section in cfg.

    A section is a client section if its name contains `:' or `.'.
    ci is the section name parsed as an IP address, cs is the section
    name itself, and pw is the section's `password' setting encoded
    as UTF-8.  c.clients is reset to an empty list first.
    """
    c.clients = [ ]
    for cs in cfg.sections():
        if ':' not in cs and '.' not in cs:
            continue
        ci = ipaddr(cs)
        pw = cfg.get(cs, 'password').encode('utf-8')
        constructor(ci, cs, pw)
353
ae7c7784
IJ
354#---------- startup ----------
355
def common_startup():
    """Set up logging, parse the command line, and load configuration.

    Log events go to stderr in classic log text format, and are also
    passed to crash_on_critical.  The built-in defaults from defcfg
    (with `#' comments removed) are loaded into cfg, followed by the
    file named by -c/--config.
    """
    stderr_observer = twisted.logger.FileLogObserver(
        sys.stderr, twisted.logger.formatEventAsClassicLogText)
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ stderr_observer, crash_on_critical ]
    )

    optparser.add_option('-c', '--config', dest='configfile',
                         default='/etc/hippotat/config')
    (opts, args) = optparser.parse_args()
    if len(args):
        optparser.error('no non-option arguments please')

    # defcfg has comments on the same line as settings, so remove
    # everything from `#' to end of line before parsing it
    defaults_text = regexp.sub('#.*', '', defcfg)
    cfg.read_string(defaults_text)
    cfg.read(opts.configfile)
371
def common_run():
    """Run the reactor, unless crash() has already been called, and
    report on stderr when that is over."""
    log_debug(DBG.INIT, 'entering reactor')
    if not _crashing:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)
ae7c7784 375 print('CRASHED (end)', file=sys.stderr)