wip
[hippotat] / hippotat / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f
IJ
6import sys
7
040ff511
IJ
8import twisted
9from twisted.internet import reactor
ae7c7784 10from twisted.logger import LogLevel
1d023c89 11import twisted.internet.endpoints
b0cfbfce
IJ
12
13import ipaddress
14from ipaddress import AddressValueError
15
ae7c7784
IJ
16from optparse import OptionParser
17from configparser import ConfigParser
18from configparser import NoOptionError
19
20import collections
84e763c7 21import time
ae7c7784 22
1321ad5f
IJ
23import re as regexp
24
25import hippotat.slip as slip
26
ca732796
IJ
27defcfg = '''
28[DEFAULT]
29#[<client>] overrides
30max_batch_down = 65536 # used by server, subject to [limits]
31max_queue_time = 10 # used by server, subject to [limits]
32max_request_time = 54 # used by server, subject to [limits]
33target_requests_outstanding = 3 # must match; subject to [limits] on server
34max_requests_outstanding = 4 # used by client
35max_batch_up = 4000 # used by client
7b07f0b5 36http_timeout = 30 # used by client
4edf77a3 37http_retry = 5 # used by client
ca732796
IJ
38
39#[server] or [<client>] overrides
40ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
41# extra interpolations: %(local)s %(peer)s %(rnet)s
42# obtained on server [virtual]server [virtual]relay [virtual]network
43# from on client <client> [virtual]server [virtual]routes
44
45[virtual]
46mtu = 1500
47routes = ''
48# network = <prefix>/<len> # mandatory for server
49# server = <ipaddr> # used by both, default is computed from `network'
50# relay = <ipaddr> # used by server, default from `network' and `server'
51# default server is first host in network
52# default relay is first host which is not server
53
54[server]
55# addrs = 127.0.0.1 ::1 # mandatory for server
56port = 80 # used by server
57# url # used by client; default from first `addrs' and `port'
58
59# [<client-ip4-or-ipv6-address>]
60# password = <password> # used by both, must match
61
62[limits]
63max_batch_down = 262144 # used by server
64max_queue_time = 121 # used by server
65max_request_time = 121 # used by server
66target_requests_outstanding = 10 # used by server
67'''
68
87a7c0c7 69# these need to be defined here so that they can be imported by import *
ae7c7784
IJ
70cfg = ConfigParser()
71optparser = OptionParser()
72
e4006ac4 73_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
7b07f0b5
IJ
74def mime_translate(s):
75 # SLIP-encoded packets cannot contain ESC ESC.
76 # Swap `-' and ESC. The result cannot contain `--'
77 return s.translate(_mimetrans)
78
87a7c0c7
IJ
79class ConfigResults:
80 def __init__(self, d = { }):
81 self.__dict__ = d
82 def __repr__(self):
83 return 'ConfigResults('+repr(self.__dict__)+')'
84
85c = ConfigResults()
86
1321ad5f
IJ
87def log_discard(packet, saddr, daddr, why):
88 print('DROP ', saddr, daddr, why)
89# syslog.syslog(syslog.LOG_DEBUG,
90# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
91
b0cfbfce
IJ
92#---------- packet parsing ----------
93
94def packet_addrs(packet):
95 version = packet[0] >> 4
96 if version == 4:
97 addrlen = 4
98 saddroff = 3*4
99 factory = ipaddress.IPv4Address
100 elif version == 6:
101 addrlen = 16
102 saddroff = 2*4
103 factory = ipaddress.IPv6Address
104 else:
105 raise ValueError('unsupported IP version %d' % version)
106 saddr = factory(packet[ saddroff : saddroff + addrlen ])
107 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
108 return (saddr, daddr)
109
110#---------- address handling ----------
111
112def ipaddr(input):
113 try:
114 r = ipaddress.IPv4Address(input)
115 except AddressValueError:
116 r = ipaddress.IPv6Address(input)
117 return r
118
119def ipnetwork(input):
120 try:
121 r = ipaddress.IPv4Network(input)
122 except NetworkValueError:
123 r = ipaddress.IPv6Network(input)
124 return r
040ff511
IJ
125
126#---------- ipif (SLIP) subprocess ----------
127
a95cfeb2
IJ
128class SlipStreamDecoder():
129 def __init__(self, on_packet):
130 # we will call packet(<packet>)
040ff511 131 self._buffer = b''
a95cfeb2
IJ
132 self._on_packet = on_packet
133
134 def inputdata(self, data):
4f991c0c 135 #print('SLIP-GOT ', repr(data))
040ff511
IJ
136 self._buffer += data
137 packets = slip.decode(self._buffer)
138 self._buffer = packets.pop()
139 for packet in packets:
a95cfeb2
IJ
140 self._maybe_packet(packet)
141
142 def _maybe_packet(self, packet):
143 if len(packet):
144 self._on_packet(packet)
145
4f991c0c 146 def flush(self):
a95cfeb2
IJ
147 self._maybe_packet(self._buffer)
148 self._buffer = b''
4f991c0c 149
e4006ac4 150class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
4f991c0c
IJ
151 def __init__(self, router):
152 self._router = router
a95cfeb2
IJ
153 self._decoder = SlipStreamDecoder(self.slip_on_packet)
154 def connectionMade(self): pass
155 def outReceived(self, data):
156 self._decoder.inputdata(data)
157 def slip_on_packet(self, packet):
4f991c0c
IJ
158 (saddr, daddr) = packet_addrs(packet)
159 if saddr.is_link_local or daddr.is_link_local:
160 log_discard(packet, saddr, daddr, 'link-local')
161 return
162 self._router(packet, saddr, daddr)
040ff511
IJ
163 def processEnded(self, status):
164 status.raiseException()
165
166def start_ipif(command, router):
167 global ipif
168 ipif = _IpifProcessProtocol(router)
169 reactor.spawnProcess(ipif,
170 '/bin/sh',['sh','-xc', command],
ff613365
IJ
171 childFDs={0:'w', 1:'r', 2:2},
172 env=None)
040ff511
IJ
173
174def queue_inbound(packet):
175 ipif.transport.write(slip.delimiter)
176 ipif.transport.write(slip.encode(packet))
177 ipif.transport.write(slip.delimiter)
178
650a3251
IJ
179#---------- packet queue ----------
180
181class PacketQueue():
182 def __init__(self, max_queue_time):
183 self._max_queue_time = max_queue_time
184 self._pq = collections.deque() # packets
185
186 def append(self, packet):
187 self._pq.append((time.monotonic(), packet))
188
189 def nonempty(self):
190 while True:
191 try: (queuetime, packet) = self._pq[0]
192 except IndexError: return False
193
194 age = time.monotonic() - queuetime
84e763c7 195 if age > self._max_queue_time:
650a3251
IJ
196 # strip old packets off the front
197 self._pq.popleft()
198 continue
199
200 return True
201
7b07f0b5
IJ
202 def process(self, sizequery, moredata, max_batch):
203 # sizequery() should return size of batch so far
204 # moredata(s) should add s to batch
205 while True:
206 try: (dummy, packet) = self._pq[0]
207 except IndexError: break
208
209 encoded = slip.encode(packet)
210 sofar = sizequery()
211
212 if sofar > 0:
213 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
214 break
215 moredata(slip.delimiter)
216
217 moredata(encoded)
84e763c7 218 self._pq.popleft()
ae7c7784
IJ
219
220#---------- error handling ----------
221
222def crash(err):
223 print('CRASH ', err, file=sys.stderr)
224 try: reactor.stop()
225 except twisted.internet.error.ReactorNotRunning: pass
226
227def crash_on_defer(defer):
228 defer.addErrback(lambda err: crash(err))
229
e4006ac4 230def crash_on_critical(event):
ae7c7784
IJ
231 if event.get('log_level') >= LogLevel.critical:
232 crash(twisted.logger.formatEvent(event))
233
87a7c0c7
IJ
234#---------- config processing ----------
235
236def process_cfg_common_always():
237 global mtu
238 c.mtu = cfg.get('virtual','mtu')
239
88487243
IJ
240def process_cfg_ipif(section, varmap):
241 for d, s in varmap:
242 try: v = getattr(c, s)
034284c3 243 except AttributeError: continue
88487243
IJ
244 setattr(c, d, v)
245
246 print(repr(c))
247
248 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
249
250def process_cfg_network():
251 c.network = ipnetwork(cfg.get('virtual','network'))
252 if c.network.num_addresses < 3 + 2:
253 raise ValueError('network needs at least 2^3 addresses')
254
255def process_cfg_server():
256 try:
257 c.server = cfg.get('virtual','server')
258 except NoOptionError:
259 process_cfg_network()
260 c.server = next(c.network.hosts())
261
262class ServerAddr():
263 def __init__(self, port, addrspec):
264 self.port = port
265 # also self.addr
266 try:
267 self.addr = ipaddress.IPv4Address(addrspec)
268 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
84e763c7 269 self._inurl = b'%s'
88487243
IJ
270 except AddressValueError:
271 self.addr = ipaddress.IPv6Address(addrspec)
272 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
84e763c7 273 self._inurl = b'[%s]'
88487243
IJ
274 def make_endpoint(self):
275 return self._endpointfactory(reactor, self.port, self.addr)
276 def url(self):
84e763c7
IJ
277 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
278 if self.port != 80: url += b':%d' % self.port
279 url += b'/'
88487243
IJ
280 return url
281
282def process_cfg_saddrs():
1d023c89
IJ
283 try: port = cfg.getint('server','port')
284 except NoOptionError: port = 80
88487243
IJ
285
286 c.saddrs = [ ]
287 for addrspec in cfg.get('server','addrs').split():
288 sa = ServerAddr(port, addrspec)
289 c.saddrs.append(sa)
290
291def process_cfg_clients(constructor):
292 c.clients = [ ]
293 for cs in cfg.sections():
294 if not (':' in cs or '.' in cs): continue
295 ci = ipaddr(cs)
296 pw = cfg.get(cs, 'password')
84e763c7 297 pw = pw.encode('utf-8')
88487243
IJ
298 constructor(ci,cs,pw)
299
ae7c7784
IJ
300#---------- startup ----------
301
1321ad5f 302def common_startup():
ae7c7784
IJ
303 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
304
305 optparser.add_option('-c', '--config', dest='configfile',
306 default='/etc/hippotat/config')
307 (opts, args) = optparser.parse_args()
308 if len(args): optparser.error('no non-option arguments please')
309
1321ad5f
IJ
310 re = regexp.compile('#.*')
311 cfg.read_string(re.sub('', defcfg))
ae7c7784
IJ
312 cfg.read(opts.configfile)
313
314def common_run():
315 reactor.run()
316 print('CRASHED (end)', file=sys.stderr)