wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
aa663282
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
e2d41dc1
IJ
import sys
import os
import time
8
5bae5ba3 9import twisted
e2d41dc1
IJ
10import twisted.internet
11import twisted.internet.endpoints
12from twisted.internet import reactor
13from twisted.web.server import NOT_DONE_YET
14from twisted.logger import LogLevel
15
16import ipaddress
17from ipaddress import AddressValueError
5bae5ba3 18
5da7763e
IJ
19#import twisted.web.server import Site
20#from twisted.web.resource import Resource
3fba9787 21
e75e9c17
IJ
22from optparse import OptionParser
23from configparser import ConfigParser
24from configparser import NoOptionError
3fba9787 25
0ac316c8
IJ
26import collections
27
c4b6d990
IJ
28import syslog
29
3fba9787
IJ
# Registry of configured clients.  process_cfg() stores one Client per
# client config section here, keyed by the parsed IP address of that
# section's name; route() and the HTTP resource look clients up in it.
clients = { }
31
def ipaddr(input):
    """Parse *input* as an IP address, preferring IPv4.

    Returns the ipaddress.IPv4Address built from *input* when that
    constructor accepts it; otherwise returns
    ipaddress.IPv6Address(input), whose AddressValueError propagates
    if *input* is not a valid IPv6 address either.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        # not IPv4: fall back to IPv6 and let its error (if any) escape
        return ipaddress.IPv6Address(input)
38
def ipnetwork(input):
    """Parse *input* as an IP network, preferring IPv4.

    Returns ipaddress.IPv4Network(input) when the address part is a
    valid IPv4 address, otherwise ipaddress.IPv6Network(input).

    Raises whatever the ipaddress constructors raise for invalid input
    (AddressValueError / NetmaskValueError, both ValueError subclasses).
    """
    try:
        return ipaddress.IPv4Network(input)
    # The ipaddress module has no NetworkValueError; a non-IPv4 address
    # part is reported as AddressValueError, which is what selects the
    # IPv6 fallback here.
    except ipaddress.AddressValueError:
        return ipaddress.IPv6Network(input)
45
# Built-in default configuration in ConfigParser syntax.  startup() loads
# this with read_string() before reading the config file named on the
# command line.  The [limits] values are the ceilings the Client class
# applies (via min()) to the per-client max_* settings.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 8099

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
'''
68
aa663282
IJ
69#---------- error handling ----------
70
def crash(err):
    """Report *err* on stderr and ask the reactor to stop.

    A reactor that is not running is tolerated: the resulting
    ReactorNotRunning error is deliberately ignored.
    """
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # nothing to stop; the message above is all we can do
        pass
75
def crash_on_defer(defer):
    """Attach an errback to *defer* that hands any failure to crash()."""
    def _failed(failure):
        # crash() is looked up when the errback fires, not when it is added
        return crash(failure)

    defer.addErrback(_failed)
78
def crash_on_critical(event):
    """Log observer: crash the server when a critical event is logged.

    *event* is a log event dict.  Events whose 'log_level' is at least
    LogLevel.critical are formatted and passed to crash(); everything
    else is ignored.
    """
    level = event.get('log_level')
    # An event without a level cannot be compared against a LogLevel
    # constant (that comparison raises TypeError), so treat it as
    # non-critical instead of failing inside the observer.
    if level is not None and level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
82
5da7763e
IJ
83#---------- "router" ----------
84
ec0c4d95
IJ
def route(packet, saddr, daddr):
    """Decide what to do with one IP packet and do it.

    packet -- the raw packet
    saddr, daddr -- its source and destination addresses (objects that
        support .is_link_local, equality and "in network")

    In order of precedence:
      * destination is a configured client: queue it for that client;
      * either end is link-local: discard;
      * destination is the host address or lies outside the virtual
        network: pass it to the ipif subprocess via queue_inbound();
      * destination is the relay address: discard;
      * anything else (an unconfigured address inside the network):
        discard.
    """
    print('TRACE ', saddr, daddr, packet)
    # The lookup result must land in the same name that is tested below;
    # .get() yields None for an unknown destination.
    dclient = clients.get(daddr)
    if dclient is not None:
        dclient.queue_outbound(packet)
    elif saddr.is_link_local or daddr.is_link_local:
        log_discard(packet, saddr, daddr, 'link-local')
    elif daddr == host or daddr not in network:
        print('TRACE INBOUND ', saddr, daddr, packet)
        queue_inbound(packet)
    elif daddr == relay:
        log_discard(packet, saddr, daddr, 'relay')
    else:
        log_discard(packet, saddr, daddr, 'no client')
100
def log_discard(packet, saddr, daddr, why):
    """Print a 'DROP' trace line for a packet that is being discarded.

    Only the addresses and the reason *why* are reported; *packet*
    itself is not printed.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # Disabled syslog variant:
    #  syslog.syslog(syslog.LOG_DEBUG,
    #                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e
IJ
105
106#---------- ipif (slip subprocess) ----------
107
5bae5ba3
IJ
class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess started by start_ipif().

    Data received from the subprocess is treated as a SLIP byte stream:
    it is accumulated, cut into packets at SLIP delimiters, and every
    non-empty packet is handed to route().
    """

    def __init__(self):
        # raw (still SLIP-encoded) bytes after the last delimiter seen
        self._buffer = b''

    def connectionMade(self):
        pass

    def outReceived(self, data):
        """Consume a chunk of subprocess output and route whole packets."""
        #print('RECV ', repr(data))
        self._buffer += data
        # Only bytes up to the last delimiter form complete frames.  The
        # remainder is kept *undecoded*: decoding it now and again after
        # more data arrives would unescape it twice, and an escape
        # sequence split across two reads could not be decoded at all.
        complete, found, remainder = self._buffer.rpartition(slip_end)
        if not found:
            return
        self._buffer = remainder
        for packet in slip_decode(complete):
            if not len(packet): continue
            (saddr, daddr) = packet_addrs(packet)
            route(packet, saddr, daddr)

    def processEnded(self, status):
        """Re-raise the exit status as an exception when the child ends."""
        status.raiseException()
5bae5ba3
IJ
123
def start_ipif():
    """Spawn the ipif subprocess and publish its protocol as global `ipif`.

    The command string in the global `ipif_command` is run through
    '/bin/sh' with the options '-xc'.
    """
    global ipif
    ipif = IpifProcessProtocol()
    argv = ['sh', '-xc', ipif_command]
    # fd 0 is a pipe we write, fd 1 a pipe we read, fd 2 is our own fd 2
    fdmap = {0:'w', 1:'r', 2:2}
    reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fdmap)
130
5da7763e
IJ
def queue_inbound(packet):
    """Send *packet* to the ipif subprocess as one SLIP frame.

    The encoded packet is written between two delimiters, using three
    separate transport writes.
    """
    write = ipif.transport.write
    write(slip_delimiter)
    write(slip_encode(packet))
    write(slip_delimiter)
5bae5ba3 135
ce7f1431
IJ
136#---------- SLIP handling ----------
137
# SLIP framing bytes, written as octal escapes.  The values are the ones
# RFC 1055 assigns to END, ESC, ESC_END and ESC_ESC respectively.
slip_end = b'\300'      # frame delimiter
slip_esc = b'\333'      # introduces a two-byte escape sequence
slip_esc_end = b'\334'  # after slip_esc: stands for a literal slip_end
slip_esc_esc = b'\335'  # after slip_esc: stands for a literal slip_esc
# byte written between/around packets by this program
slip_delimiter = slip_end
143
def slip_encode(packet):
    """Return *packet* with SLIP escaping applied (no delimiters added)."""
    # Escape ESC bytes first: doing END first would introduce new ESC
    # bytes that the second pass would then escape again.
    escaped = packet.replace(slip_esc, slip_esc + slip_esc_esc)
    escaped = escaped.replace(slip_end, slip_esc + slip_esc_end)
    return escaped
148
def slip_decode(data):
    """Split SLIP-framed *data* into packets and undo the escaping.

    *data* (bytes) is split at every slip_end byte, so the result always
    has one more element than there are delimiters; elements may be
    empty (e.g. between adjacent delimiters or after a trailing one).

    Returns the list of decoded packets.
    Raises ValueError('invalid SLIP escape') if an escape byte is
    followed by anything other than slip_esc_esc / slip_esc_end, or by
    nothing at all.
    """
    print('DECODE ', repr(data))
    out = []
    for packet in data.split(slip_end):
        pieces = []
        while True:
            eix = packet.find(slip_esc)
            if eix == -1:
                pieces.append(packet)
                break
            #print('ESC ', repr((pieces, packet, eix)))
            pieces.append(packet[0 : eix])
            # Slice rather than index: for an escape byte at the very end
            # this yields b'' and takes the ValueError branch below
            # instead of raising IndexError.
            ck = packet[eix+1 : eix+2]
            if ck == slip_esc_esc: pieces.append(slip_esc)
            elif ck == slip_esc_end: pieces.append(slip_end)
            else: raise ValueError('invalid SLIP escape')
            packet = packet[eix+2 : ]
        out.append(b''.join(pieces))
    print('DECODED ', repr(out))
    return out
170
171#---------- packet parsing ----------
172
def packet_addrs(packet):
    """Extract (source, destination) addresses from a raw IP packet.

    The version is taken from the high nibble of the first byte.  For
    version 4 the addresses are the two 4-byte fields starting at byte
    12; for version 6 the two 16-byte fields starting at byte 8.

    Returns a pair of ipaddress.IPv4Address or ipaddress.IPv6Address.
    Raises ValueError for an empty packet, an unsupported version, or a
    packet too short to contain both addresses.
    """
    if not packet:
        raise ValueError('empty packet')
    version = packet[0] >> 4
    if version == 4:
        addrlen = 4
        saddroff = 3*4
        factory = ipaddress.IPv4Address
    elif version == 6:
        addrlen = 16
        saddroff = 2*4
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    daddroff = saddroff + addrlen
    if len(packet) < daddroff + addrlen:
        raise ValueError('truncated IPv%d packet (%d bytes)'
                         % (version, len(packet)))
    saddr = factory(packet[ saddroff : daddroff ])
    daddr = factory(packet[ daddroff : daddroff + addrlen ])
    return (saddr, daddr)
ce7f1431 188
5da7763e 189#---------- client ----------
c4b6d990 190
class Client():
    """Server-side state for one configured client.

    Holds the client's address and password, the HTTP requests that are
    currently waiting for downstream data (_rq) and the packets waiting
    to be sent to the client (_pq, as (enqueue time, packet) pairs).
    """

    def __init__(self, ip, cs):
        """ip -- the client's address; cs -- name of its config section."""
        # instance data members
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque() # requests
        self._pq = collections.deque() # packets
        # plus from config:
        #  .max_batch_down
        #  .max_queue_time
        #  .max_request_time
        # each is the section's value capped by the [limits] section
        for k in ('max_batch_down','max_queue_time','max_request_time'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits',k)
            setattr(self, k, min(req, limit))

    def process_arriving_data(self, d):
        """Decode SLIP data *d* sent by the client and route each packet.

        Raises ValueError if a packet's source address is not this
        client's address.
        """
        for packet in slip_decode(d):
            # slip_decode yields empty elements for empty input and for
            # adjacent/trailing delimiters; skip them, as the ipif
            # protocol's receive path does.
            if not len(packet): continue
            (saddr, daddr) = packet_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, saddr, daddr)

    def _req_cancel(self, request):
        """Timeout handler: finish *request* if it is still waiting."""
        # The timer set in http_request is never cancelled, so it also
        # fires for requests that were already answered or whose
        # connection was lost.  Those are no longer in _rq and must not
        # be finished again.
        try: self._rq.remove(request)
        except ValueError: return
        request.finish()

    def _req_error(self, err, request):
        """Errback for notifyFinish(): the request ended abnormally."""
        # The request cannot be finished any more (calling finish() here
        # would raise); just stop considering it for outbound data.
        try: self._rq.remove(request)
        except ValueError: pass

    def queue_outbound(self, packet):
        """Queue *packet* for delivery to the client."""
        self._pq.append((time.monotonic(), packet))
        # A request may already be waiting for data; without this the
        # packet would sit in the queue until the client's next request.
        self._check_outbound()

    def http_request(self, request):
        """Register an incoming HTTP request as a channel for outbound data.

        The request is answered as soon as packets are available, or
        finished empty after max_request_time seconds.
        """
        request.setHeader('Content-Type','application/octet-stream')
        reactor.callLater(self.max_request_time, self._req_cancel, request)
        request.notifyFinish().addErrback(self._req_error, request)
        self._rq.append(request)
        self._check_outbound()

    def _check_outbound(self):
        """Pair waiting requests with queued packets for as long as possible.

        Each request receives as many packets as fit in max_batch_down
        bytes (always at least one), separated by slip_delimiter, and is
        then finished.  Packets older than max_queue_time are dropped
        when they reach the head of the queue.
        """
        while True:
            try: request = self._rq[0]
            except IndexError: request = None
            if request and request.finished:
                self._rq.popleft()
                continue

            # now request is an unfinished request, or None
            try: (queuetime, packet) = self._pq[0]
            except IndexError:
                # no packets, oh well
                break

            age = time.monotonic() - queuetime
            if age > self.max_queue_time:
                self._pq.popleft()
                continue

            if request is None:
                # no request
                break

            # request, and also some non-expired packets
            while True:
                try: (dummy, packet) = self._pq[0]
                except IndexError: break

                encoded = slip_encode(packet)

                if request.sentLength > 0:
                    if (request.sentLength + len(slip_delimiter)
                        + len(encoded) > self.max_batch_down):
                        break
                    request.write(slip_delimiter)

                request.write(encoded)
                self._pq.popleft()

            assert(request.sentLength)
            self._rq.popleft()
            request.finish()
            # round again, looking for more to do
ec88b1f1 274
class IphttpResource(twisted.web.resource.Resource):
    """The single HTTP resource through which clients exchange packets."""

    isLeaf = True

    def render_POST(self, request):
        """Handle one client request.

        Form arguments (Twisted supplies names and values as bytes, each
        value as a list):
          i   -- client address
          pw  -- client password
          mbd, mqt, mrt -- optional new max_batch_down / max_queue_time /
                 max_request_time for this client
          d   -- optional SLIP-encoded packets from the client

        The response is produced later by the Client object, hence
        NOT_DONE_YET is returned.
        Raises KeyError for a missing argument or unknown client and
        ValueError for a wrong password.
        """
        args = request.args

        # find client, update config, etc.
        ci = ipaddr(args[b'i'][0].decode('ascii'))
        c = clients[ci]
        pw = args[b'pw'][0].decode('utf-8')
        if pw != c.pw: raise ValueError('bad password')

        # update config
        for r, w in ((b'mbd', 'max_batch_down'),
                     (b'mqt', 'max_queue_time'),
                     (b'mrt', 'max_request_time')):
            try: v = args[r][0]
            except KeyError: continue
            v = int(v)
            # keep the client within the server's [limits], exactly as
            # is done for the values read from its config section
            setattr(c, w, min(v, cfg.getint('limits', w)))

        try: d = args[b'd'][0]
        except KeyError: d = b''

        c.process_arriving_data(d)
        c.http_request(request)
        # the Client finishes the request itself, possibly much later
        return NOT_DONE_YET

    def render_GET(self, request):
        """Return a fixed placeholder page."""
        return b'<html><body>hippotit</body></html>'
8e279651 301
5da7763e
IJ
def start_http():
    """Start listening for HTTP on every address in [server] addrs.

    Each whitespace-separated entry is tried as an IPv4 address first
    and as IPv6 otherwise; a TCP server endpoint of the matching family
    is created on [server] port.  A failure to listen is passed to
    crash() via crash_on_defer().
    """
    resource = IphttpResource()
    site = twisted.web.server.Site(resource)
    port = cfg.getint('server','port')
    for addrspec in cfg.get('server','addrs').split():
        try:
            addr = ipaddress.IPv4Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
        except AddressValueError:
            addr = ipaddress.IPv6Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
        # The third positional parameter of these endpoints is the
        # listen backlog, so the bind address has to be given by keyword
        # (and as a string).
        ep = endpointfactory(reactor, port, interface=str(addr))
        crash_on_defer(ep.listen(site))
5da7763e
IJ
314
315#---------- config and setup ----------
316
3fba9787
IJ
def process_cfg():
    """Derive the global runtime settings from the loaded configuration.

    Sets the globals network, host, relay, mtu and ipif_command, and
    fills `clients` with one Client per config section whose name looks
    like an IP address (contains ':' or '.').

    Raises ValueError if the network is too small, a client address is
    outside the network, or two sections name the same client.
    """
    global network
    global host
    global relay
    global ipif_command
    global mtu

    network = ipnetwork(cfg.get('virtual','network'))
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')

    # host and relay are compared against packet addresses in route(), so
    # configured values are parsed into address objects, matching the
    # type of the values picked from network.hosts() below.
    try:
        hostspec = cfg.get('virtual','host')
    except NoOptionError:
        host = next(network.hosts())
    else:
        host = ipaddr(hostspec)

    try:
        relayspec = cfg.get('virtual','relay')
    except NoOptionError:
        # first host address in the network that is not `host`
        for search in network.hosts():
            if search == host: continue
            relay = search
            break
    else:
        relay = ipaddr(relayspec)

    for cs in cfg.sections():
        if not (':' in cs or '.' in cs): continue
        ci = ipaddr(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)

    mtu = cfg.get('virtual','mtu')

    # values substituted for %(host)s etc. in the [server] ipif setting
    iic_vars = {
        'host': str(host),
        'relay': str(relay),
        'mtu': str(mtu),
        'network': str(network),
    }

    ipif_command = cfg.get('server','ipif', vars=iic_vars)
357
def startup():
    """Parse the command line, load configuration and start the services.

    Sets the globals `opts` (parsed options) and `cfg` (a ConfigParser
    holding the built-in defaults overlaid with the file given by
    -c/--config), installs crash_on_critical as a log observer, then
    runs process_cfg(), start_ipif() and start_http().
    """
    global cfg
    global opts

    parser = OptionParser()
    parser.add_option('-c', '--config', dest='configfile',
                      default='/etc/hippottd/server.conf')
    (opts, leftover) = parser.parse_args()
    if leftover:
        parser.error('no non-option arguments please')

    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    cfg = ConfigParser()
    # built-in defaults first, so that the file's settings override them
    cfg.read_string(defcfg)
    cfg.read(opts.configfile)
    process_cfg()

    start_ipif()
    start_http()
e2d41dc1
IJ
377
# Script entry point: configure and start everything, then hand control
# to the reactor.
startup()
reactor.run()
# Only reached once reactor.run() has returned, e.g. after crash()
# stopped the reactor.
print('CRASHED (end)', file=sys.stderr)