move ipif slip process handling
[hippotat] / server
1 #!/usr/bin/python3
2
3 import signal
4 signal.signal(signal.SIGINT, signal.SIG_DFL)
5
6 import sys
7 import os
8
9 import twisted
10 import twisted.internet
11 import twisted.internet.endpoints
12 from twisted.internet import reactor
13 from twisted.web.server import NOT_DONE_YET
14 from twisted.logger import LogLevel
15
16 #import twisted.web.server import Site
17 #from twisted.web.resource import Resource
18
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
22
23 import collections
24
25 import syslog
26
27 from hippotat import *
28
29 clients = { }
30
31 defcfg = '''
32 [DEFAULT]
33 max_batch_down = 65536
34 max_queue_time = 10
35 max_request_time = 54
36
37 [virtual]
38 mtu = 1500
39 # network
40 # [host]
41 # [relay]
42
43 [server]
44 ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
45 addrs = 127.0.0.1 ::1
46 port = 8099
47
48 [limits]
49 max_batch_down = 262144
50 max_queue_time = 121
51 max_request_time = 121
52 '''
53
54 #---------- error handling ----------
55
56 def crash(err):
57 print('CRASH ', err, file=sys.stderr)
58 try: reactor.stop()
59 except twisted.internet.error.ReactorNotRunning: pass
60
61 def crash_on_defer(defer):
62 defer.addErrback(lambda err: crash(err))
63
64 def crash_on_critical(event):
65 if event.get('log_level') >= LogLevel.critical:
66 crash(twisted.logger.formatEvent(event))
67
68 #---------- "router" ----------
69
70 def route(packet, saddr, daddr):
71 print('TRACE ', saddr, daddr, packet)
72 try: client = clients[daddr]
73 except KeyError: dclient = None
74 if dclient is not None:
75 dclient.queue_outbound(packet)
76 elif saddr.is_link_local or daddr.is_link_local:
77 log_discard(packet, saddr, daddr, 'link-local')
78 elif daddr == host or daddr not in network:
79 print('TRACE INBOUND ', saddr, daddr, packet)
80 queue_inbound(packet)
81 elif daddr == relay:
82 log_discard(packet, saddr, daddr, 'relay')
83 else:
84 log_discard(packet, saddr, daddr, 'no client')
85
86 def log_discard(packet, saddr, daddr, why):
87 print('DROP ', saddr, daddr, why)
88 # syslog.syslog(syslog.LOG_DEBUG,
89 # 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
90
91 #---------- client ----------
92
93 class Client():
94 def __init__(self, ip, cs):
95 # instance data members
96 self._ip = ip
97 self._cs = cs
98 self.pw = cfg.get(cs, 'password')
99 self._rq = collections.deque() # requests
100 self._pq = collections.deque() # packets
101 # plus from config:
102 # .max_batch_down
103 # .max_queue_time
104 # .max_request_time
105 for k in ('max_batch_down','max_queue_time','max_request_time'):
106 req = cfg.getint(cs, k)
107 limit = cfg.getint('limits',k)
108 self.__dict__[k] = min(req, limit)
109
110 def process_arriving_data(self, d):
111 for packet in slip.decode(d):
112 (saddr, daddr) = packet_addrs(packet)
113 if saddr != self._ip:
114 raise ValueError('wrong source address %s' % saddr)
115 route(packet, saddr, daddr)
116
117 def _req_cancel(self, request):
118 request.finish()
119
120 def _req_error(self, err, request):
121 self._req_cancel(request)
122
123 def queue_outbound(self, packet):
124 self._pq.append((time.monotonic(), packet))
125
126 def http_request(self, request):
127 request.setHeader('Content-Type','application/octet-stream')
128 reactor.callLater(self.max_request_time, self._req_cancel, request)
129 request.notifyFinish().addErrback(self._req_error, request)
130 self._rq.append(request)
131 self._check_outbound()
132
133 def _check_outbound(self):
134 while True:
135 try: request = self._rq[0]
136 except IndexError: request = None
137 if request and request.finished:
138 self._rq.popleft()
139 continue
140
141 # now request is an unfinished request, or None
142 try: (queuetime, packet) = self._pq[0]
143 except IndexError:
144 # no packets, oh well
145 break
146
147 age = time.monotonic() - queuetime
148 if age > self.max_queue_time:
149 self._pq.popleft()
150 continue
151
152 if request is None:
153 # no request
154 break
155
156 # request, and also some non-expired packets
157 while True:
158 try: (dummy, packet) = self._pq[0]
159 except IndexError: break
160
161 encoded = slip.encode(packet)
162
163 if request.sentLength > 0:
164 if (request.sentLength + len(slip.delimiter)
165 + len(encoded) > self.max_batch_down):
166 break
167 request.write(slip.delimiter)
168
169 request.write(encoded)
170 self._pq.popLeft()
171
172 assert(request.sentLength)
173 self._rq.popLeft()
174 request.finish()
175 # round again, looking for more to do
176
177 class IphttpResource(twisted.web.resource.Resource):
178 isLeaf = True
179 def render_POST(self, request):
180 # find client, update config, etc.
181 ci = ipaddr(request.args['i'])
182 c = clients[ci]
183 pw = request.args['pw']
184 if pw != c.pw: raise ValueError('bad password')
185
186 # update config
187 for r, w in (('mbd', 'max_batch_down'),
188 ('mqt', 'max_queue_time'),
189 ('mrt', 'max_request_time')):
190 try: v = request.args[r]
191 except KeyError: continue
192 v = int(v)
193 c.__dict__[w] = v
194
195 try: d = request.args['d']
196 except KeyError: d = ''
197
198 c.process_arriving_data(d)
199 c.new_request(request)
200
201 def render_GET(self, request):
202 return b'<html><body>hippotat</body></html>'
203
204 def start_http():
205 resource = IphttpResource()
206 site = twisted.web.server.Site(resource)
207 for addrspec in cfg.get('server','addrs').split():
208 try:
209 addr = ipaddress.IPv4Address(addrspec)
210 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
211 except AddressValueError:
212 addr = ipaddress.IPv6Address(addrspec)
213 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
214 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
215 crash_on_defer(ep.listen(site))
216
217 #---------- config and setup ----------
218
219 def process_cfg():
220 global network
221 global host
222 global relay
223 global ipif_command
224
225 network = ipnetwork(cfg.get('virtual','network'))
226 if network.num_addresses < 3 + 2:
227 raise ValueError('network needs at least 2^3 addresses')
228
229 try:
230 host = cfg.get('virtual','host')
231 except NoOptionError:
232 host = next(network.hosts())
233
234 try:
235 relay = cfg.get('virtual','relay')
236 except NoOptionError:
237 for search in network.hosts():
238 if search == host: continue
239 relay = search
240 break
241
242 for cs in cfg.sections():
243 if not (':' in cs or '.' in cs): continue
244 ci = ipaddr(cs)
245 if ci not in network:
246 raise ValueError('client %s not in network' % ci)
247 if ci in clients:
248 raise ValueError('multiple client cfg sections for %s' % ci)
249 clients[ci] = Client(ci, cs)
250
251 global mtu
252 mtu = cfg.get('virtual','mtu')
253
254 iic_vars = { }
255 for k in ('host','relay','mtu','network'):
256 iic_vars[k] = globals()[k]
257
258 ipif_command = cfg.get('server','ipif', vars=iic_vars)
259
260 def startup():
261 global cfg
262
263 op = OptionParser()
264 op.add_option('-c', '--config', dest='configfile',
265 default='/etc/hippottd/server.conf')
266 global opts
267 (opts, args) = op.parse_args()
268 if len(args): op.error('no non-option arguments please')
269
270 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
271
272 cfg = ConfigParser()
273 cfg.read_string(defcfg)
274 cfg.read(opts.configfile)
275 process_cfg()
276
277 start_ipif(ipif_command, route)
278 start_http()
279
280 startup()
281 reactor.run()
282 print('CRASHED (end)', file=sys.stderr)