hippotit -> hippotat
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
aa663282
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
e2d41dc1
IJ
6import sys
7import os
8
5bae5ba3 9import twisted
e2d41dc1
IJ
10import twisted.internet
11import twisted.internet.endpoints
12from twisted.internet import reactor
13from twisted.web.server import NOT_DONE_YET
14from twisted.logger import LogLevel
15
5da7763e
IJ
16#import twisted.web.server import Site
17#from twisted.web.resource import Resource
3fba9787 18
e75e9c17
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
3fba9787 22
0ac316c8
IJ
23import collections
24
c4b6d990
IJ
25import syslog
26
b0cfbfce 27from hippotit import *
3fba9787 28
b0cfbfce 29clients = { }
3fba9787 30
e75e9c17 31defcfg = '''
094ee3a2
IJ
32[DEFAULT]
33max_batch_down = 65536
34max_queue_time = 10
35max_request_time = 54
36
e75e9c17
IJ
37[virtual]
38mtu = 1500
39# network
40# [host]
41# [relay]
42
43[server]
e2d41dc1 44ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
5da7763e 45addrs = 127.0.0.1 ::1
aa663282 46port = 8099
e75e9c17 47
094ee3a2
IJ
48[limits]
49max_batch_down = 262144
50max_queue_time = 121
51max_request_time = 121
ec88b1f1
IJ
52'''
53
aa663282
IJ
54#---------- error handling ----------
55
56def crash(err):
57 print('CRASH ', err, file=sys.stderr)
58 try: reactor.stop()
59 except twisted.internet.error.ReactorNotRunning: pass
60
61def crash_on_defer(defer):
62 defer.addErrback(lambda err: crash(err))
63
64def crash_on_critical(event):
65 if event.get('log_level') >= LogLevel.critical:
66 crash(twisted.logger.formatEvent(event))
67
5da7763e
IJ
68#---------- "router" ----------
69
ec0c4d95
IJ
70def route(packet, saddr, daddr):
71 print('TRACE ', saddr, daddr, packet)
5da7763e
IJ
72 try: client = clients[daddr]
73 except KeyError: dclient = None
74 if dclient is not None:
75 dclient.queue_outbound(packet)
3a6076b4 76 elif saddr.is_link_local or daddr.is_link_local:
ec0c4d95 77 log_discard(packet, saddr, daddr, 'link-local')
e2d41dc1 78 elif daddr == host or daddr not in network:
ec0c4d95 79 print('TRACE INBOUND ', saddr, daddr, packet)
5da7763e 80 queue_inbound(packet)
e2d41dc1 81 elif daddr == relay:
5da7763e
IJ
82 log_discard(packet, saddr, daddr, 'relay')
83 else:
84 log_discard(packet, saddr, daddr, 'no client')
85
86def log_discard(packet, saddr, daddr, why):
3a6076b4 87 print('DROP ', saddr, daddr, why)
ec0c4d95
IJ
88# syslog.syslog(syslog.LOG_DEBUG,
89# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e
IJ
90
91#---------- ipif (slip subprocess) ----------
92
5bae5ba3
IJ
93class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
94 def __init__(self):
95 self._buffer = b''
96 def connectionMade(self): pass
97 def outReceived(self, data):
ce7f1431 98 #print('RECV ', repr(data))
2b95da16 99 self._buffer += data
b0cfbfce 100 packets = slip.decode(self._buffer)
2b95da16 101 self._buffer = packets.pop()
5bae5ba3 102 for packet in packets:
ec0c4d95 103 if not len(packet): continue
5bae5ba3 104 (saddr, daddr) = packet_addrs(packet)
ec0c4d95 105 route(packet, saddr, daddr)
5da7763e
IJ
106 def processEnded(self, status):
107 status.raiseException()
5bae5ba3
IJ
108
109def start_ipif():
5da7763e
IJ
110 global ipif
111 ipif = IpifProcessProtocol()
112 reactor.spawnProcess(ipif,
ce7f1431 113 '/bin/sh',['sh','-xc', ipif_command],
5bae5ba3
IJ
114 childFDs={0:'w', 1:'r', 2:2})
115
5da7763e 116def queue_inbound(packet):
b0cfbfce
IJ
117 ipif.transport.write(slip.delimiter)
118 ipif.transport.write(slip.encode(packet))
119 ipif.transport.write(slip.delimiter)
ce7f1431 120
5da7763e 121#---------- client ----------
c4b6d990 122
ec88b1f1 123class Client():
c4b6d990 124 def __init__(self, ip, cs):
ec88b1f1
IJ
125 # instance data members
126 self._ip = ip
127 self._cs = cs
128 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
129 self._rq = collections.deque() # requests
130 self._pq = collections.deque() # packets
c4b6d990
IJ
131 # plus from config:
132 # .max_batch_down
133 # .max_queue_time
134 # .max_request_time
ec88b1f1
IJ
135 for k in ('max_batch_down','max_queue_time','max_request_time'):
136 req = cfg.getint(cs, k)
094ee3a2 137 limit = cfg.getint('limits',k)
c4b6d990
IJ
138 self.__dict__[k] = min(req, limit)
139
140 def process_arriving_data(self, d):
b0cfbfce 141 for packet in slip.decode(d):
5bae5ba3 142 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
143 if saddr != self._ip:
144 raise ValueError('wrong source address %s' % saddr)
ec0c4d95 145 route(packet, saddr, daddr)
ec88b1f1 146
c4b6d990
IJ
147 def _req_cancel(self, request):
148 request.finish()
149
150 def _req_error(self, err, request):
151 self._req_cancel(request)
152
0ac316c8 153 def queue_outbound(self, packet):
094ee3a2 154 self._pq.append((time.monotonic(), packet))
0ac316c8 155
c4b6d990
IJ
156 def http_request(self, request):
157 request.setHeader('Content-Type','application/octet-stream')
158 reactor.callLater(self.max_request_time, self._req_cancel, request)
159 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
160 self._rq.append(request)
161 self._check_outbound()
162
163 def _check_outbound(self):
164 while True:
165 try: request = self._rq[0]
166 except IndexError: request = None
167 if request and request.finished:
168 self._rq.popleft()
169 continue
170
171 # now request is an unfinished request, or None
172 try: (queuetime, packet) = self._pq[0]
e2d41dc1 173 except IndexError:
0ac316c8 174 # no packets, oh well
094ee3a2
IJ
175 break
176
177 age = time.monotonic() - queuetime
178 if age > self.max_queue_time:
179 self._pq.popleft()
0ac316c8
IJ
180 continue
181
094ee3a2
IJ
182 if request is None:
183 # no request
184 break
185
186 # request, and also some non-expired packets
187 while True:
188 try: (dummy, packet) = self._pq[0]
189 except IndexError: break
190
b0cfbfce 191 encoded = slip.encode(packet)
094ee3a2
IJ
192
193 if request.sentLength > 0:
b0cfbfce 194 if (request.sentLength + len(slip.delimiter)
094ee3a2
IJ
195 + len(encoded) > self.max_batch_down):
196 break
b0cfbfce 197 request.write(slip.delimiter)
094ee3a2
IJ
198
199 request.write(encoded)
200 self._pq.popLeft()
201
202 assert(request.sentLength)
203 self._rq.popLeft()
204 request.finish()
205 # round again, looking for more to do
ec88b1f1 206
5da7763e 207class IphttpResource(twisted.web.resource.Resource):
c1e4910b 208 isLeaf = True
5da7763e
IJ
209 def render_POST(self, request):
210 # find client, update config, etc.
e2d41dc1 211 ci = ipaddr(request.args['i'])
5da7763e
IJ
212 c = clients[ci]
213 pw = request.args['pw']
214 if pw != c.pw: raise ValueError('bad password')
215
216 # update config
217 for r, w in (('mbd', 'max_batch_down'),
218 ('mqt', 'max_queue_time'),
219 ('mrt', 'max_request_time')):
220 try: v = request.args[r]
221 except KeyError: continue
222 v = int(v)
223 c.__dict__[w] = v
224
225 try: d = request.args['d']
226 except KeyError: d = ''
227
228 c.process_arriving_data(d)
229 c.new_request(request)
230
8e279651 231 def render_GET(self, request):
b11c6e7a 232 return b'<html><body>hippotit</body></html>'
8e279651 233
5da7763e
IJ
234def start_http():
235 resource = IphttpResource()
b11c6e7a 236 site = twisted.web.server.Site(resource)
e2d41dc1 237 for addrspec in cfg.get('server','addrs').split():
5da7763e
IJ
238 try:
239 addr = ipaddress.IPv4Address(addrspec)
240 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
241 except AddressValueError:
242 addr = ipaddress.IPv6Address(addrspec)
243 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
244 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
b11c6e7a 245 crash_on_defer(ep.listen(site))
5da7763e
IJ
246
247#---------- config and setup ----------
248
3fba9787
IJ
249def process_cfg():
250 global network
e75e9c17
IJ
251 global host
252 global relay
5bae5ba3 253 global ipif_command
3fba9787 254
ec88b1f1 255 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
256 if network.num_addresses < 3 + 2:
257 raise ValueError('network needs at least 2^3 addresses')
258
3fba9787 259 try:
e75e9c17
IJ
260 host = cfg.get('virtual','host')
261 except NoOptionError:
e2d41dc1 262 host = next(network.hosts())
e75e9c17
IJ
263
264 try:
265 relay = cfg.get('virtual','relay')
e2d41dc1 266 except NoOptionError:
e75e9c17 267 for search in network.hosts():
e2d41dc1 268 if search == host: continue
e75e9c17
IJ
269 relay = search
270 break
3fba9787 271
ec88b1f1
IJ
272 for cs in cfg.sections():
273 if not (':' in cs or '.' in cs): continue
e2d41dc1 274 ci = ipaddr(cs)
ec88b1f1
IJ
275 if ci not in network:
276 raise ValueError('client %s not in network' % ci)
277 if ci in clients:
278 raise ValueError('multiple client cfg sections for %s' % ci)
279 clients[ci] = Client(ci, cs)
3fba9787 280
e2d41dc1
IJ
281 global mtu
282 mtu = cfg.get('virtual','mtu')
283
5bae5ba3
IJ
284 iic_vars = { }
285 for k in ('host','relay','mtu','network'):
286 iic_vars[k] = globals()[k]
287
288 ipif_command = cfg.get('server','ipif', vars=iic_vars)
289
e75e9c17 290def startup():
e2d41dc1
IJ
291 global cfg
292
e75e9c17
IJ
293 op = OptionParser()
294 op.add_option('-c', '--config', dest='configfile',
295 default='/etc/hippottd/server.conf')
296 global opts
297 (opts, args) = op.parse_args()
298 if len(args): op.error('no non-option arguments please')
299
e2d41dc1
IJ
300 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
301
e75e9c17 302 cfg = ConfigParser()
5bae5ba3 303 cfg.read_string(defcfg)
e2d41dc1 304 cfg.read(opts.configfile)
5bae5ba3
IJ
305 process_cfg()
306
307 start_ipif()
308 start_http()
e2d41dc1
IJ
309
310startup()
311reactor.run()
aa663282 312print('CRASHED (end)', file=sys.stderr)