wip
[hippotat] / server
1 #!/usr/bin/python3
2
import collections
import hmac
import ipaddress
import os
import sys
import syslog
import time
from configparser import ConfigParser
from configparser import NoOptionError
from ipaddress import AddressValueError
from optparse import OptionParser

import twisted
import twisted.internet
import twisted.internet.endpoints
from twisted.internet import reactor
from twisted.logger import LogLevel
from twisted.web.server import NOT_DONE_YET

#import twisted.web.server import Site
#from twisted.web.resource import Resource
26
# Registry of configured clients: maps each client's virtual IP address
# (an ipaddress object, as produced by ipaddr()) to its Client instance.
# Populated by process_cfg() from the per-client config sections.
clients = { }
28
def ipaddr(input):
    """Parse *input* as an IP address, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Address when *input* is a valid IPv4 address,
    otherwise an ipaddress.IPv6Address.  If it is neither, the error raised
    by the IPv6 parse propagates to the caller.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        # Not IPv4; the IPv6 constructor either succeeds or raises.
        return ipaddress.IPv6Address(input)
35
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Only a failure to parse the address part as IPv4 triggers the IPv6
    attempt (mirroring ipaddr()).  Other errors, such as an invalid netmask
    or host bits being set, propagate unchanged so the message refers to
    the address family the user actually wrote.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # The ipaddress module has no "NetworkValueError"; an unparseable
        # IPv4 address part is reported as AddressValueError.
        return ipaddress.IPv6Network(input)
42
# Built-in configuration defaults.  startup() loads this text first and
# then reads the user's config file on top of it.
#
# The three settings in [DEFAULT] are the values a client section gets when
# it does not set them itself; [limits] holds the caps that Client.__init__
# applies to those same three settings.  In [virtual], "network" has no
# default and must be supplied by the config file; process_cfg() picks
# "host" and "relay" from the network when they are not configured.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 80

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
'''
65
66 #---------- "router" ----------
67
def route(packet, daddr, saddr=None):
    """Deliver one decoded IP packet according to its destination address.

    packet -- the raw packet (bytes)
    daddr  -- destination address, as returned by packet_addrs()
    saddr  -- source address, used only in the discard log message; may be
              omitted, in which case the log shows None

    Routing rules, in order:
      * destination is a configured client: queue it for that client;
      * destination is the server's own virtual address, or lies outside
        the virtual network: hand it to the ipif subprocess;
      * destination is the relay address, or an unassigned address inside
        the virtual network: discard it and log why.
    """
    # The original bound the lookup result to `client` but then tested
    # `dclient`, so a successful lookup raised NameError.
    dclient = clients.get(daddr)
    if dclient is not None:
        dclient.queue_outbound(packet)
    elif daddr == host or daddr not in network:
        queue_inbound(packet)
    elif daddr == relay:
        log_discard(packet, saddr, daddr, 'relay')
    else:
        log_discard(packet, saddr, daddr, 'no client')
79
def log_discard(packet, saddr, daddr, why):
    """Write a debug-priority syslog entry recording a dropped packet.

    The entry names the source and destination addresses and the reason
    *why*; the packet contents themselves are not logged.
    """
    message = 'discarded packet %s -> %s (%s)' % (saddr, daddr, why)
    syslog.syslog(syslog.LOG_DEBUG, message)
83
84 #---------- ipif (slip subprocess) ----------
85
class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Twisted process protocol for the ipif subprocess.

    The subprocess's stdout carries a SLIP-framed packet stream.  Output
    arrives in arbitrary chunks, so bytes are accumulated until a frame
    delimiter is seen; each completed frame is decoded and routed.
    """

    def __init__(self):
        # Raw (still SLIP-escaped) bytes of the frame currently being
        # received, i.e. everything after the last delimiter seen so far.
        self._buffer = b''

    def connectionMade(self):
        pass

    def outReceived(self, data):
        """Handle a chunk of the subprocess's stdout."""
        #print('RECV ', repr(data))
        self._buffer += data
        # Split on the delimiter *before* decoding and keep the unfinished
        # tail in its raw form.  Storing a decoded tail (as the original
        # did) means its bytes get unescaped a second time on the next
        # call, which corrupts any packet whose escape sequence happens to
        # have been decoded already.
        frames = self._buffer.split(slip_end)
        self._buffer = frames.pop()
        for frame in frames:
            if not frame:
                # Back-to-back delimiters produce empty frames; SLIP
                # receivers ignore those (RFC 1055).
                continue
            for packet in slip_decode(frame):
                (saddr, daddr) = packet_addrs(packet)
                route(packet, daddr)

    def processEnded(self, status):
        """Propagate the subprocess's exit status as an exception."""
        status.raiseException()
100
def start_ipif():
    """Spawn the ipif subprocess and publish its protocol as global `ipif`.

    The configured command line (global ipif_command) is run through
    /bin/sh with -x.  File descriptors: the child's stdin is a pipe this
    process writes to, its stdout is a pipe this process reads from, and
    its stderr is this process's own fd 2.
    """
    global ipif
    ipif = IpifProcessProtocol()
    shell_argv = ['sh', '-xc', ipif_command]
    fd_map = {0: 'w', 1: 'r', 2: 2}
    reactor.spawnProcess(ipif, '/bin/sh', shell_argv, childFDs=fd_map)
107
def queue_inbound(packet):
    """Send *packet* to the ipif subprocess as one SLIP frame.

    A delimiter is written both before and after the encoded packet.
    """
    send = ipif.transport.write
    send(slip_delimiter)
    send(slip_encode(packet))
    send(slip_delimiter)
112
113 #---------- SLIP handling ----------
114
# SLIP framing bytes (octal 300, 333, 334, 335).
slip_end = b'\xc0'        # frame delimiter
slip_esc = b'\xdb'        # escape introducer
slip_esc_end = b'\xdc'    # after slip_esc: stands for a literal slip_end
slip_esc_esc = b'\xdd'    # after slip_esc: stands for a literal slip_esc
slip_delimiter = slip_end
120
def slip_encode(packet):
    """Return *packet* with SLIP escaping applied (no delimiters added).

    Escape bytes are replaced first, so that the escape bytes introduced
    when replacing the delimiter bytes are not themselves escaped again.
    """
    escaped = packet.replace(slip_esc, slip_esc + slip_esc_esc)
    escaped = escaped.replace(slip_end, slip_esc + slip_esc_end)
    return escaped
125
def slip_decode(data):
    """Split *data* at SLIP delimiters and undo the escaping of each piece.

    Returns a list of bytes objects, one per delimiter-separated piece, in
    order.  The list always has at least one element; pieces may be empty
    (for example when *data* begins or ends with a delimiter).

    Raises ValueError('invalid SLIP escape') if an escape byte is followed
    by anything other than the two defined escape codes, including when it
    is the last byte of a piece.
    """
    # Debug trace on stdout, kept from the original.
    print('DECODE ', repr(data))
    out = []
    for frame in data.split(slip_end):
        pieces = []
        while True:
            eix = frame.find(slip_esc)
            if eix == -1:
                pieces.append(frame)
                break
            #print('ESC ', repr((pieces, frame, eix)))
            pieces.append(frame[:eix])
            # Take a one-byte *slice*: indexing bytes yields an int in
            # Python 3, which never compares equal to the bytes constants.
            # The slice is empty when the escape byte is the last byte, so
            # that case falls through to the ValueError below.
            ck = frame[eix + 1 : eix + 2]
            if ck == slip_esc_esc:
                pieces.append(slip_esc)
            elif ck == slip_esc_end:
                pieces.append(slip_end)
            else:
                raise ValueError('invalid SLIP escape')
            frame = frame[eix + 2:]
        out.append(b''.join(pieces))
    print('DECODED ', repr(out))
    return out
146
147 #---------- packet parsing ----------
148
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    packet -- bytes starting at the IP header (IPv4 or IPv6)

    Returns (saddr, daddr) as ipaddress.IPv4Address or IPv6Address objects,
    the same types that ipaddr() produces, so the results can be compared
    with configured addresses and used as keys of `clients`.

    Raises ValueError if the packet is empty, is too short to contain both
    addresses, or has an IP version other than 4 or 6.
    """
    if not packet:
        raise ValueError('empty packet')
    version = packet[0] >> 4
    if version == 4:
        # IPv4 header: source at byte offset 12, destination at 16.
        addrlen = 4
        saddroff = 12
        factory = ipaddress.IPv4Address
    elif version == 6:
        # IPv6 header: source at byte offset 8, destination at 24.
        addrlen = 16
        saddroff = 8
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    daddroff = saddroff + addrlen
    if len(packet) < daddroff + addrlen:
        raise ValueError('truncated IPv%d packet' % version)
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
151
152 #---------- client ----------
153
class Client():
    """State for one configured client of the server.

    Holds the client's pending HTTP requests and the packets waiting to be
    sent down to it, and pairs the two up: whenever there is both an
    unfinished request and a non-expired packet, as many packets as fit are
    written to the oldest request and that request is finished.
    """

    def __init__(self, ip, cs):
        """ip -- the client's virtual address; cs -- its config section."""
        # instance data members
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque()  # pending requests, oldest first
        self._pq = collections.deque()  # (enqueue time, packet), oldest first
        # plus from config, each capped by the [limits] section:
        #  .max_batch_down
        #  .max_queue_time
        #  .max_request_time
        for k in ('max_batch_down', 'max_queue_time', 'max_request_time'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits', k)
            setattr(self, k, min(req, limit))

    def process_arriving_data(self, d):
        """Decode SLIP data uploaded by the client and route each packet.

        Raises ValueError if a packet's source address is not this
        client's own address (packets before it have already been routed).
        """
        for packet in slip_decode(d):
            if not packet:
                # Empty frames (empty upload, doubled delimiters) carry
                # no packet.
                continue
            (saddr, daddr) = packet_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, daddr)

    def _req_forget(self, request, timer=None):
        """Drop *request* from the pending queue and disarm its timeout."""
        if timer is not None and timer.active():
            timer.cancel()
        try:
            self._rq.remove(request)
        except ValueError:
            pass  # already taken off the queue

    def _req_cancel(self, request):
        """Timeout handler: finish *request* with whatever was written."""
        self._req_forget(request)
        # finish() must not be called twice on the same request.
        if not request.finished:
            request.finish()

    def _req_done(self, result, request, timer=None):
        """notifyFinish callback: the request completed normally."""
        self._req_forget(request, timer)

    def _req_error(self, err, request, timer=None):
        """notifyFinish errback: the connection was lost.

        The request must not be finished in this state (Twisted raises if
        finish() is called after the connection is gone), so it is only
        forgotten.  Returning None marks the failure as handled.
        """
        self._req_forget(request, timer)

    def queue_outbound(self, packet):
        """Queue *packet* for delivery to this client.

        If a request is already waiting, the packet is delivered at once
        instead of sitting in the queue until the next request arrives.
        """
        self._pq.append((time.monotonic(), packet))
        self._check_outbound()

    def http_request(self, request):
        """Accept a new HTTP request as a carrier for downbound packets.

        The request is answered as soon as packets are available, or
        finished empty after max_request_time seconds.
        """
        request.setHeader('Content-Type', 'application/octet-stream')
        timer = reactor.callLater(self.max_request_time,
                                  self._req_cancel, request)
        # However the request ends, take it off the queue and stop the
        # timeout from firing on it afterwards.
        request.notifyFinish().addCallbacks(
            self._req_done, self._req_error,
            callbackArgs=(request, timer), errbackArgs=(request, timer))
        self._rq.append(request)
        self._check_outbound()

    def _check_outbound(self):
        """Send queued packets on pending requests for as long as possible."""
        while True:
            request = self._rq[0] if self._rq else None
            if request is not None and request.finished:
                self._rq.popleft()
                continue

            # now request is an unfinished request, or None
            if not self._pq:
                # no packets, oh well
                break
            (queuetime, packet) = self._pq[0]

            age = time.monotonic() - queuetime
            if age > self.max_queue_time:
                # expired while waiting: drop it
                self._pq.popleft()
                continue

            if request is None:
                # no request
                break

            # request, and also some non-expired packets
            while self._pq:
                (dummy, packet) = self._pq[0]
                encoded = slip_encode(packet)

                if request.sentLength > 0:
                    # Not the first packet in this response: it needs a
                    # delimiter and must still fit within the batch limit.
                    if (request.sentLength + len(slip_delimiter)
                            + len(encoded) > self.max_batch_down):
                        break
                    request.write(slip_delimiter)

                request.write(encoded)
                self._pq.popleft()

            assert(request.sentLength)
            self._rq.popleft()
            request.finish()
            # round again, looking for more to do
237
class IphttpResource(twisted.web.resource.Resource):
    """HTTP endpoint through which clients exchange packets with the server.

    A POST carries these form arguments:
      i   -- the client's virtual IP address (required)
      pw  -- the client's password (required)
      mbd, mqt, mrt -- optional new values for the client's max_batch_down,
             max_queue_time and max_request_time
      d   -- optional SLIP-encoded packets from the client

    The response body, produced later by the Client object, carries
    SLIP-encoded packets for the client.
    """

    # Serve every request path from this resource rather than looking up
    # child resources (which would give 404 for everything).
    isLeaf = True

    @staticmethod
    def _arg(request, name, default=None):
        """Return the first value of form argument *name* (bytes).

        Twisted presents request.args as bytes keys mapping to lists of
        bytes values.  Returns *default* when the argument is absent, or
        raises ValueError when it is absent and no default was given.
        """
        values = request.args.get(name)
        if values:
            return values[0]
        if default is None:
            raise ValueError('missing parameter %r' % name)
        return default

    @staticmethod
    def _reject(request, code, text):
        """Answer *request* immediately with an error status and body."""
        request.setResponseCode(code)
        request.setHeader('Content-Type', 'text/plain')
        return text

    def render_POST(self, request):
        """Authenticate the client, take its upload, and park the request.

        Returns NOT_DONE_YET on success, since the response is written
        asynchronously.  Malformed requests get 400 and failed
        authentication gets 403; neither raises, because an exception
        escaping from here is logged by Twisted as a failure.
        """
        # find client, update config, etc.
        try:
            ci = ipaddr(self._arg(request, b'i').decode('ascii'))
            pw = self._arg(request, b'pw')
        except ValueError:
            return self._reject(request, 400, b'bad request\n')

        c = clients.get(ci)
        # Unknown client and wrong password are deliberately
        # indistinguishable; the comparison is constant-time.
        if c is None or not hmac.compare_digest(pw, c.pw.encode('utf-8')):
            return self._reject(request, 403, b'forbidden\n')

        try:
            # update config, never exceeding the server-side [limits]
            for r, w in ((b'mbd', 'max_batch_down'),
                         (b'mqt', 'max_queue_time'),
                         (b'mrt', 'max_request_time')):
                v = self._arg(request, r, b'')
                if not v:
                    continue
                v = int(v)
                if v < 0:
                    raise ValueError('negative %s' % w)
                setattr(c, w, min(v, cfg.getint('limits', w)))

            c.process_arriving_data(self._arg(request, b'd', b''))
        except ValueError:
            return self._reject(request, 400, b'bad request\n')

        c.http_request(request)
        return NOT_DONE_YET
260
def start_http():
    """Start listening for HTTP on every address in [server] addrs.

    Each whitespace-separated entry is tried as an IPv4 address first and
    then as IPv6, and gets a listening endpoint of the matching family on
    the configured [server] port.
    """
    resource = IphttpResource()
    sitefactory = twisted.web.server.Site(resource)
    port = cfg.getint('server', 'port')
    for addrspec in cfg.get('server', 'addrs').split():
        try:
            addr = ipaddress.IPv4Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
        except AddressValueError:
            addr = ipaddress.IPv6Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
        # The third positional parameter of these endpoints is the listen
        # backlog, so the address must be passed by keyword, as a string.
        ep = endpointfactory(reactor, port, interface=str(addr))
        ep.listen(sitefactory)
273
274 #---------- config and setup ----------
275
def process_cfg():
    """Derive the runtime globals and the client registry from `cfg`.

    Sets the globals network, host, relay, mtu and ipif_command, and adds
    a Client to `clients` for every config section whose name looks like
    an IP address (contains ':' or '.').

    Raises ValueError if the network is too small, if a client address
    lies outside the network, or if two sections name the same client.
    Errors from parsing addresses and from missing required options
    propagate unchanged.
    """
    global network
    global host
    global relay
    global ipif_command
    global mtu

    network = ipnetwork(cfg.get('virtual', 'network'))
    # NOTE(review): presumably room for host, relay and a client plus the
    # network and broadcast addresses - confirm.  The smallest IPv4
    # network with that many addresses is a /29, hence "2^3".
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')

    # Configured addresses are parsed into address objects so that they
    # compare equal to packet addresses and to the values derived from
    # the network below; a raw config string never would.
    try:
        host = ipaddr(cfg.get('virtual', 'host'))
    except NoOptionError:
        host = next(network.hosts())

    try:
        relay = ipaddr(cfg.get('virtual', 'relay'))
    except NoOptionError:
        # first usable address that is not the host's
        relay = next(a for a in network.hosts() if a != host)

    for cs in cfg.sections():
        if not (':' in cs or '.' in cs):
            continue
        ci = ipaddr(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)

    mtu = cfg.get('virtual', 'mtu')

    # Values substituted into the [server] ipif command template;
    # ConfigParser converts them to strings.
    iic_vars = {
        'host': host,
        'relay': relay,
        'mtu': mtu,
        'network': network,
    }

    ipif_command = cfg.get('server', 'ipif', vars=iic_vars)
316
def crash_on_critical(event):
    """Log observer that stops the reactor on any critical log event.

    event -- a twisted.logger event dict

    Events at LogLevel.critical or above are printed to stderr and the
    reactor is asked to stop.  Events that carry no log level are ignored
    instead of being compared against one, which would raise.
    """
    level = event.get('log_level')
    if level is None or not (level >= LogLevel.critical):
        return
    print('crashing: ', twisted.logger.formatEvent(event), file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Critical event before the reactor started, or after it stopped.
        pass
324
def startup():
    """Parse the command line, load the configuration, start everything.

    Sets the globals `opts` and `cfg`, installs the crash-on-critical log
    observer, processes the configuration, then starts the ipif
    subprocess and the HTTP listeners.  The reactor itself is not run
    here.
    """
    global cfg
    global opts

    op = OptionParser()
    op.add_option('-c', '--config', dest='configfile',
                  default='/etc/hippottd/server.conf')
    (opts, args) = op.parse_args()
    if len(args):
        op.error('no non-option arguments please')

    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    cfg = ConfigParser()
    cfg.read_string(defcfg)
    # ConfigParser.read() silently skips files it cannot open and returns
    # the list of files it did read.  Without the user's file there is no
    # [virtual] network, so report the real problem here rather than
    # failing later on a missing option.
    if not cfg.read(opts.configfile):
        op.error('cannot read config file %s' % opts.configfile)
    process_cfg()

    start_ipif()
    start_http()
344
# Script entry point: load the configuration and start the ipif subprocess
# and the HTTP listeners, then hand control to the Twisted reactor.
startup()
reactor.run()