wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
e2d41dc1
IJ
3import sys
4import os
5
5bae5ba3 6import twisted
e2d41dc1
IJ
7import twisted.internet
8import twisted.internet.endpoints
9from twisted.internet import reactor
10from twisted.web.server import NOT_DONE_YET
11from twisted.logger import LogLevel
12
13import ipaddress
14from ipaddress import AddressValueError
5bae5ba3 15
5da7763e
IJ
16#import twisted.web.server import Site
17#from twisted.web.resource import Resource
3fba9787 18
e75e9c17
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
3fba9787 22
0ac316c8
IJ
23import collections
24
c4b6d990
IJ
25import syslog
26
3fba9787
IJ
27clients = { }
28
e2d41dc1 29def ipaddr(input):
3fba9787 30 try:
ec88b1f1 31 r = ipaddress.IPv4Address(input)
3fba9787 32 except AddressValueError:
ec88b1f1 33 r = ipaddress.IPv6Address(input)
3fba9787
IJ
34 return r
35
36def ipnetwork(input):
37 try:
ec88b1f1 38 r = ipaddress.IPv4Network(input)
3fba9787 39 except NetworkValueError:
ec88b1f1 40 r = ipaddress.IPv6Network(input)
3fba9787
IJ
41 return r
42
e75e9c17 43defcfg = '''
094ee3a2
IJ
44[DEFAULT]
45max_batch_down = 65536
46max_queue_time = 10
47max_request_time = 54
48
e75e9c17
IJ
49[virtual]
50mtu = 1500
51# network
52# [host]
53# [relay]
54
55[server]
e2d41dc1 56ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
5da7763e
IJ
57addrs = 127.0.0.1 ::1
58port = 80
e75e9c17 59
094ee3a2
IJ
60[limits]
61max_batch_down = 262144
62max_queue_time = 121
63max_request_time = 121
ec88b1f1
IJ
64'''
65
5da7763e
IJ
66#---------- "router" ----------
67
ec0c4d95
IJ
68def route(packet, saddr, daddr):
69 print('TRACE ', saddr, daddr, packet)
5da7763e
IJ
70 try: client = clients[daddr]
71 except KeyError: dclient = None
72 if dclient is not None:
73 dclient.queue_outbound(packet)
3a6076b4 74 elif saddr.is_link_local or daddr.is_link_local:
ec0c4d95 75 log_discard(packet, saddr, daddr, 'link-local')
e2d41dc1 76 elif daddr == host or daddr not in network:
ec0c4d95 77 print('TRACE INBOUND ', saddr, daddr, packet)
5da7763e 78 queue_inbound(packet)
e2d41dc1 79 elif daddr == relay:
5da7763e
IJ
80 log_discard(packet, saddr, daddr, 'relay')
81 else:
82 log_discard(packet, saddr, daddr, 'no client')
83
84def log_discard(packet, saddr, daddr, why):
3a6076b4 85 print('DROP ', saddr, daddr, why)
ec0c4d95
IJ
86# syslog.syslog(syslog.LOG_DEBUG,
87# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e
IJ
88
89#---------- ipif (slip subprocess) ----------
90
5bae5ba3
IJ
91class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
92 def __init__(self):
93 self._buffer = b''
94 def connectionMade(self): pass
95 def outReceived(self, data):
ce7f1431 96 #print('RECV ', repr(data))
2b95da16
IJ
97 self._buffer += data
98 packets = slip_decode(self._buffer)
99 self._buffer = packets.pop()
5bae5ba3 100 for packet in packets:
ec0c4d95 101 if not len(packet): continue
5bae5ba3 102 (saddr, daddr) = packet_addrs(packet)
ec0c4d95 103 route(packet, saddr, daddr)
5da7763e
IJ
104 def processEnded(self, status):
105 status.raiseException()
5bae5ba3
IJ
106
107def start_ipif():
5da7763e
IJ
108 global ipif
109 ipif = IpifProcessProtocol()
110 reactor.spawnProcess(ipif,
ce7f1431 111 '/bin/sh',['sh','-xc', ipif_command],
5bae5ba3
IJ
112 childFDs={0:'w', 1:'r', 2:2})
113
5da7763e
IJ
114def queue_inbound(packet):
115 ipif.transport.write(slip_delimiter)
116 ipif.transport.write(slip_encode(packet))
117 ipif.transport.write(slip_delimiter)
5bae5ba3 118
ce7f1431
IJ
119#---------- SLIP handling ----------
120
121slip_end = b'\300'
122slip_esc = b'\333'
123slip_esc_end = b'\334'
124slip_esc_esc = b'\335'
125slip_delimiter = slip_end
126
127def slip_encode(packet):
128 return (packet
129 .replace(slip_esc, slip_esc + slip_esc_esc)
130 .replace(slip_end, slip_esc + slip_esc_end))
131
132def slip_decode(data):
133 print('DECODE ', repr(data))
134 out = []
135 for packet in data.split(slip_end):
136 pdata = b''
137 while True:
138 eix = packet.find(slip_esc)
139 if eix == -1:
140 pdata += packet
141 break
142 #print('ESC ', repr((pdata, packet, eix)))
143 pdata += packet[0 : eix]
144 ck = packet[eix+1]
145 if ck == slip_esc_esc: pdata += slip_esc
146 elif ck == slip_esc_end: pdata += slip_end
147 else: raise ValueError('invalid SLIP escape')
148 packet = packet[eix+2 : ]
149 out.append(pdata)
150 print('DECODED ', repr(out))
151 return out
152
153#---------- packet parsing ----------
154
155def packet_addrs(packet):
ec0c4d95
IJ
156 version = packet[0] >> 4
157 if version == 4:
158 addrlen = 4
159 saddroff = 3*4
160 factory = ipaddress.IPv4Address
161 elif version == 6:
162 addrlen = 16
163 saddroff = 2*4
164 factory = ipaddress.IPv6Address
165 else:
166 raise ValueError('unsupported IP version %d' % version)
167 saddr = factory(packet[ saddroff : saddroff + addrlen ])
168 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
169 return (saddr, daddr)
ce7f1431 170
5da7763e 171#---------- client ----------
c4b6d990 172
ec88b1f1 173class Client():
c4b6d990 174 def __init__(self, ip, cs):
ec88b1f1
IJ
175 # instance data members
176 self._ip = ip
177 self._cs = cs
178 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
179 self._rq = collections.deque() # requests
180 self._pq = collections.deque() # packets
c4b6d990
IJ
181 # plus from config:
182 # .max_batch_down
183 # .max_queue_time
184 # .max_request_time
ec88b1f1
IJ
185 for k in ('max_batch_down','max_queue_time','max_request_time'):
186 req = cfg.getint(cs, k)
094ee3a2 187 limit = cfg.getint('limits',k)
c4b6d990
IJ
188 self.__dict__[k] = min(req, limit)
189
190 def process_arriving_data(self, d):
191 for packet in slip_decode(d):
5bae5ba3 192 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
193 if saddr != self._ip:
194 raise ValueError('wrong source address %s' % saddr)
ec0c4d95 195 route(packet, saddr, daddr)
ec88b1f1 196
c4b6d990
IJ
197 def _req_cancel(self, request):
198 request.finish()
199
200 def _req_error(self, err, request):
201 self._req_cancel(request)
202
0ac316c8 203 def queue_outbound(self, packet):
094ee3a2 204 self._pq.append((time.monotonic(), packet))
0ac316c8 205
c4b6d990
IJ
206 def http_request(self, request):
207 request.setHeader('Content-Type','application/octet-stream')
208 reactor.callLater(self.max_request_time, self._req_cancel, request)
209 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
210 self._rq.append(request)
211 self._check_outbound()
212
213 def _check_outbound(self):
214 while True:
215 try: request = self._rq[0]
216 except IndexError: request = None
217 if request and request.finished:
218 self._rq.popleft()
219 continue
220
221 # now request is an unfinished request, or None
222 try: (queuetime, packet) = self._pq[0]
e2d41dc1 223 except IndexError:
0ac316c8 224 # no packets, oh well
094ee3a2
IJ
225 break
226
227 age = time.monotonic() - queuetime
228 if age > self.max_queue_time:
229 self._pq.popleft()
0ac316c8
IJ
230 continue
231
094ee3a2
IJ
232 if request is None:
233 # no request
234 break
235
236 # request, and also some non-expired packets
237 while True:
238 try: (dummy, packet) = self._pq[0]
239 except IndexError: break
240
241 encoded = slip_encode(packet)
242
243 if request.sentLength > 0:
244 if (request.sentLength + len(slip_delimiter)
245 + len(encoded) > self.max_batch_down):
246 break
247 request.write(slip_delimiter)
248
249 request.write(encoded)
250 self._pq.popLeft()
251
252 assert(request.sentLength)
253 self._rq.popLeft()
254 request.finish()
255 # round again, looking for more to do
ec88b1f1 256
5da7763e
IJ
257class IphttpResource(twisted.web.resource.Resource):
258 def render_POST(self, request):
259 # find client, update config, etc.
e2d41dc1 260 ci = ipaddr(request.args['i'])
5da7763e
IJ
261 c = clients[ci]
262 pw = request.args['pw']
263 if pw != c.pw: raise ValueError('bad password')
264
265 # update config
266 for r, w in (('mbd', 'max_batch_down'),
267 ('mqt', 'max_queue_time'),
268 ('mrt', 'max_request_time')):
269 try: v = request.args[r]
270 except KeyError: continue
271 v = int(v)
272 c.__dict__[w] = v
273
274 try: d = request.args['d']
275 except KeyError: d = ''
276
277 c.process_arriving_data(d)
278 c.new_request(request)
279
280def start_http():
281 resource = IphttpResource()
282 sitefactory = twisted.web.server.Site(resource)
e2d41dc1 283 for addrspec in cfg.get('server','addrs').split():
5da7763e
IJ
284 try:
285 addr = ipaddress.IPv4Address(addrspec)
286 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
287 except AddressValueError:
288 addr = ipaddress.IPv6Address(addrspec)
289 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
290 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
a63dd501
IJ
291 defer = ep.listen(sitefactory)
292 defer.addErrback(lambda err: err.raiseException())
5da7763e
IJ
293
294#---------- config and setup ----------
295
3fba9787
IJ
296def process_cfg():
297 global network
e75e9c17
IJ
298 global host
299 global relay
5bae5ba3 300 global ipif_command
3fba9787 301
ec88b1f1 302 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
303 if network.num_addresses < 3 + 2:
304 raise ValueError('network needs at least 2^3 addresses')
305
3fba9787 306 try:
e75e9c17
IJ
307 host = cfg.get('virtual','host')
308 except NoOptionError:
e2d41dc1 309 host = next(network.hosts())
e75e9c17
IJ
310
311 try:
312 relay = cfg.get('virtual','relay')
e2d41dc1 313 except NoOptionError:
e75e9c17 314 for search in network.hosts():
e2d41dc1 315 if search == host: continue
e75e9c17
IJ
316 relay = search
317 break
3fba9787 318
ec88b1f1
IJ
319 for cs in cfg.sections():
320 if not (':' in cs or '.' in cs): continue
e2d41dc1 321 ci = ipaddr(cs)
ec88b1f1
IJ
322 if ci not in network:
323 raise ValueError('client %s not in network' % ci)
324 if ci in clients:
325 raise ValueError('multiple client cfg sections for %s' % ci)
326 clients[ci] = Client(ci, cs)
3fba9787 327
e2d41dc1
IJ
328 global mtu
329 mtu = cfg.get('virtual','mtu')
330
5bae5ba3
IJ
331 iic_vars = { }
332 for k in ('host','relay','mtu','network'):
333 iic_vars[k] = globals()[k]
334
335 ipif_command = cfg.get('server','ipif', vars=iic_vars)
336
e2d41dc1
IJ
337def crash_on_critical(event):
338 if event.get('log_level') >= LogLevel.critical:
339 print('crashing: ', twisted.logger.formatEvent(event), file=sys.stderr)
340 #print('crashing!', file=sys.stderr)
341 #os._exit(1)
342 try: reactor.stop()
343 except twisted.internet.error.ReactorNotRunning: pass
344
e75e9c17 345def startup():
e2d41dc1
IJ
346 global cfg
347
e75e9c17
IJ
348 op = OptionParser()
349 op.add_option('-c', '--config', dest='configfile',
350 default='/etc/hippottd/server.conf')
351 global opts
352 (opts, args) = op.parse_args()
353 if len(args): op.error('no non-option arguments please')
354
e2d41dc1
IJ
355 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
356
e75e9c17 357 cfg = ConfigParser()
5bae5ba3 358 cfg.read_string(defcfg)
e2d41dc1 359 cfg.read(opts.configfile)
5bae5ba3
IJ
360 process_cfg()
361
362 start_ipif()
363 start_http()
e2d41dc1
IJ
364
365startup()
366reactor.run()