wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
aa663282
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
e2d41dc1
IJ
6import sys
7import os
8
5bae5ba3 9import twisted
e2d41dc1
IJ
10import twisted.internet
11import twisted.internet.endpoints
12from twisted.internet import reactor
13from twisted.web.server import NOT_DONE_YET
14from twisted.logger import LogLevel
15
16import ipaddress
17from ipaddress import AddressValueError
5bae5ba3 18
5da7763e
IJ
19#import twisted.web.server import Site
20#from twisted.web.resource import Resource
3fba9787 21
e75e9c17
IJ
22from optparse import OptionParser
23from configparser import ConfigParser
24from configparser import NoOptionError
3fba9787 25
0ac316c8
IJ
26import collections
27
c4b6d990
IJ
28import syslog
29
3fba9787
IJ
30clients = { }
31
e2d41dc1 32def ipaddr(input):
3fba9787 33 try:
ec88b1f1 34 r = ipaddress.IPv4Address(input)
3fba9787 35 except AddressValueError:
ec88b1f1 36 r = ipaddress.IPv6Address(input)
3fba9787
IJ
37 return r
38
39def ipnetwork(input):
40 try:
ec88b1f1 41 r = ipaddress.IPv4Network(input)
3fba9787 42 except NetworkValueError:
ec88b1f1 43 r = ipaddress.IPv6Network(input)
3fba9787
IJ
44 return r
45
e75e9c17 46defcfg = '''
094ee3a2
IJ
47[DEFAULT]
48max_batch_down = 65536
49max_queue_time = 10
50max_request_time = 54
51
e75e9c17
IJ
52[virtual]
53mtu = 1500
54# network
55# [host]
56# [relay]
57
58[server]
e2d41dc1 59ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
5da7763e 60addrs = 127.0.0.1 ::1
aa663282 61port = 8099
e75e9c17 62
094ee3a2
IJ
63[limits]
64max_batch_down = 262144
65max_queue_time = 121
66max_request_time = 121
ec88b1f1
IJ
67'''
68
aa663282
IJ
69#---------- error handling ----------
70
71def crash(err):
72 print('CRASH ', err, file=sys.stderr)
73 try: reactor.stop()
74 except twisted.internet.error.ReactorNotRunning: pass
75
76def crash_on_defer(defer):
77 defer.addErrback(lambda err: crash(err))
78
79def crash_on_critical(event):
80 if event.get('log_level') >= LogLevel.critical:
81 crash(twisted.logger.formatEvent(event))
82
5da7763e
IJ
83#---------- "router" ----------
84
ec0c4d95
IJ
85def route(packet, saddr, daddr):
86 print('TRACE ', saddr, daddr, packet)
5da7763e
IJ
87 try: client = clients[daddr]
88 except KeyError: dclient = None
89 if dclient is not None:
90 dclient.queue_outbound(packet)
3a6076b4 91 elif saddr.is_link_local or daddr.is_link_local:
ec0c4d95 92 log_discard(packet, saddr, daddr, 'link-local')
e2d41dc1 93 elif daddr == host or daddr not in network:
ec0c4d95 94 print('TRACE INBOUND ', saddr, daddr, packet)
5da7763e 95 queue_inbound(packet)
e2d41dc1 96 elif daddr == relay:
5da7763e
IJ
97 log_discard(packet, saddr, daddr, 'relay')
98 else:
99 log_discard(packet, saddr, daddr, 'no client')
100
101def log_discard(packet, saddr, daddr, why):
3a6076b4 102 print('DROP ', saddr, daddr, why)
ec0c4d95
IJ
103# syslog.syslog(syslog.LOG_DEBUG,
104# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e
IJ
105
106#---------- ipif (slip subprocess) ----------
107
5bae5ba3
IJ
108class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
109 def __init__(self):
110 self._buffer = b''
111 def connectionMade(self): pass
112 def outReceived(self, data):
ce7f1431 113 #print('RECV ', repr(data))
2b95da16
IJ
114 self._buffer += data
115 packets = slip_decode(self._buffer)
116 self._buffer = packets.pop()
5bae5ba3 117 for packet in packets:
ec0c4d95 118 if not len(packet): continue
5bae5ba3 119 (saddr, daddr) = packet_addrs(packet)
ec0c4d95 120 route(packet, saddr, daddr)
5da7763e
IJ
121 def processEnded(self, status):
122 status.raiseException()
5bae5ba3
IJ
123
124def start_ipif():
5da7763e
IJ
125 global ipif
126 ipif = IpifProcessProtocol()
127 reactor.spawnProcess(ipif,
ce7f1431 128 '/bin/sh',['sh','-xc', ipif_command],
5bae5ba3
IJ
129 childFDs={0:'w', 1:'r', 2:2})
130
5da7763e
IJ
131def queue_inbound(packet):
132 ipif.transport.write(slip_delimiter)
133 ipif.transport.write(slip_encode(packet))
134 ipif.transport.write(slip_delimiter)
5bae5ba3 135
ce7f1431
IJ
136#---------- SLIP handling ----------
137
138slip_end = b'\300'
139slip_esc = b'\333'
140slip_esc_end = b'\334'
141slip_esc_esc = b'\335'
142slip_delimiter = slip_end
143
144def slip_encode(packet):
145 return (packet
146 .replace(slip_esc, slip_esc + slip_esc_esc)
147 .replace(slip_end, slip_esc + slip_esc_end))
148
149def slip_decode(data):
150 print('DECODE ', repr(data))
151 out = []
152 for packet in data.split(slip_end):
153 pdata = b''
154 while True:
155 eix = packet.find(slip_esc)
156 if eix == -1:
157 pdata += packet
158 break
159 #print('ESC ', repr((pdata, packet, eix)))
160 pdata += packet[0 : eix]
161 ck = packet[eix+1]
162 if ck == slip_esc_esc: pdata += slip_esc
163 elif ck == slip_esc_end: pdata += slip_end
164 else: raise ValueError('invalid SLIP escape')
165 packet = packet[eix+2 : ]
166 out.append(pdata)
167 print('DECODED ', repr(out))
168 return out
169
170#---------- packet parsing ----------
171
172def packet_addrs(packet):
ec0c4d95
IJ
173 version = packet[0] >> 4
174 if version == 4:
175 addrlen = 4
176 saddroff = 3*4
177 factory = ipaddress.IPv4Address
178 elif version == 6:
179 addrlen = 16
180 saddroff = 2*4
181 factory = ipaddress.IPv6Address
182 else:
183 raise ValueError('unsupported IP version %d' % version)
184 saddr = factory(packet[ saddroff : saddroff + addrlen ])
185 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
186 return (saddr, daddr)
ce7f1431 187
5da7763e 188#---------- client ----------
c4b6d990 189
ec88b1f1 190class Client():
c4b6d990 191 def __init__(self, ip, cs):
ec88b1f1
IJ
192 # instance data members
193 self._ip = ip
194 self._cs = cs
195 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
196 self._rq = collections.deque() # requests
197 self._pq = collections.deque() # packets
c4b6d990
IJ
198 # plus from config:
199 # .max_batch_down
200 # .max_queue_time
201 # .max_request_time
ec88b1f1
IJ
202 for k in ('max_batch_down','max_queue_time','max_request_time'):
203 req = cfg.getint(cs, k)
094ee3a2 204 limit = cfg.getint('limits',k)
c4b6d990
IJ
205 self.__dict__[k] = min(req, limit)
206
207 def process_arriving_data(self, d):
208 for packet in slip_decode(d):
5bae5ba3 209 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
210 if saddr != self._ip:
211 raise ValueError('wrong source address %s' % saddr)
ec0c4d95 212 route(packet, saddr, daddr)
ec88b1f1 213
c4b6d990
IJ
214 def _req_cancel(self, request):
215 request.finish()
216
217 def _req_error(self, err, request):
218 self._req_cancel(request)
219
0ac316c8 220 def queue_outbound(self, packet):
094ee3a2 221 self._pq.append((time.monotonic(), packet))
0ac316c8 222
c4b6d990
IJ
223 def http_request(self, request):
224 request.setHeader('Content-Type','application/octet-stream')
225 reactor.callLater(self.max_request_time, self._req_cancel, request)
226 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
227 self._rq.append(request)
228 self._check_outbound()
229
230 def _check_outbound(self):
231 while True:
232 try: request = self._rq[0]
233 except IndexError: request = None
234 if request and request.finished:
235 self._rq.popleft()
236 continue
237
238 # now request is an unfinished request, or None
239 try: (queuetime, packet) = self._pq[0]
e2d41dc1 240 except IndexError:
0ac316c8 241 # no packets, oh well
094ee3a2
IJ
242 break
243
244 age = time.monotonic() - queuetime
245 if age > self.max_queue_time:
246 self._pq.popleft()
0ac316c8
IJ
247 continue
248
094ee3a2
IJ
249 if request is None:
250 # no request
251 break
252
253 # request, and also some non-expired packets
254 while True:
255 try: (dummy, packet) = self._pq[0]
256 except IndexError: break
257
258 encoded = slip_encode(packet)
259
260 if request.sentLength > 0:
261 if (request.sentLength + len(slip_delimiter)
262 + len(encoded) > self.max_batch_down):
263 break
264 request.write(slip_delimiter)
265
266 request.write(encoded)
267 self._pq.popLeft()
268
269 assert(request.sentLength)
270 self._rq.popLeft()
271 request.finish()
272 # round again, looking for more to do
ec88b1f1 273
5da7763e
IJ
274class IphttpResource(twisted.web.resource.Resource):
275 def render_POST(self, request):
276 # find client, update config, etc.
e2d41dc1 277 ci = ipaddr(request.args['i'])
5da7763e
IJ
278 c = clients[ci]
279 pw = request.args['pw']
280 if pw != c.pw: raise ValueError('bad password')
281
282 # update config
283 for r, w in (('mbd', 'max_batch_down'),
284 ('mqt', 'max_queue_time'),
285 ('mrt', 'max_request_time')):
286 try: v = request.args[r]
287 except KeyError: continue
288 v = int(v)
289 c.__dict__[w] = v
290
291 try: d = request.args['d']
292 except KeyError: d = ''
293
294 c.process_arriving_data(d)
295 c.new_request(request)
296
297def start_http():
298 resource = IphttpResource()
299 sitefactory = twisted.web.server.Site(resource)
e2d41dc1 300 for addrspec in cfg.get('server','addrs').split():
5da7763e
IJ
301 try:
302 addr = ipaddress.IPv4Address(addrspec)
303 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
304 except AddressValueError:
305 addr = ipaddress.IPv6Address(addrspec)
306 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
307 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
aa663282 308 crash_on_defer(ep.listen(sitefactory))
5da7763e
IJ
309
310#---------- config and setup ----------
311
3fba9787
IJ
312def process_cfg():
313 global network
e75e9c17
IJ
314 global host
315 global relay
5bae5ba3 316 global ipif_command
3fba9787 317
ec88b1f1 318 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
319 if network.num_addresses < 3 + 2:
320 raise ValueError('network needs at least 2^3 addresses')
321
3fba9787 322 try:
e75e9c17
IJ
323 host = cfg.get('virtual','host')
324 except NoOptionError:
e2d41dc1 325 host = next(network.hosts())
e75e9c17
IJ
326
327 try:
328 relay = cfg.get('virtual','relay')
e2d41dc1 329 except NoOptionError:
e75e9c17 330 for search in network.hosts():
e2d41dc1 331 if search == host: continue
e75e9c17
IJ
332 relay = search
333 break
3fba9787 334
ec88b1f1
IJ
335 for cs in cfg.sections():
336 if not (':' in cs or '.' in cs): continue
e2d41dc1 337 ci = ipaddr(cs)
ec88b1f1
IJ
338 if ci not in network:
339 raise ValueError('client %s not in network' % ci)
340 if ci in clients:
341 raise ValueError('multiple client cfg sections for %s' % ci)
342 clients[ci] = Client(ci, cs)
3fba9787 343
e2d41dc1
IJ
344 global mtu
345 mtu = cfg.get('virtual','mtu')
346
5bae5ba3
IJ
347 iic_vars = { }
348 for k in ('host','relay','mtu','network'):
349 iic_vars[k] = globals()[k]
350
351 ipif_command = cfg.get('server','ipif', vars=iic_vars)
352
e75e9c17 353def startup():
e2d41dc1
IJ
354 global cfg
355
e75e9c17
IJ
356 op = OptionParser()
357 op.add_option('-c', '--config', dest='configfile',
358 default='/etc/hippottd/server.conf')
359 global opts
360 (opts, args) = op.parse_args()
361 if len(args): op.error('no non-option arguments please')
362
e2d41dc1
IJ
363 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
364
e75e9c17 365 cfg = ConfigParser()
5bae5ba3 366 cfg.read_string(defcfg)
e2d41dc1 367 cfg.read(opts.configfile)
5bae5ba3
IJ
368 process_cfg()
369
370 start_ipif()
371 start_http()
e2d41dc1
IJ
372
373startup()
374reactor.run()
aa663282 375print('CRASHED (end)', file=sys.stderr)