wip, and move PacketQueue
[hippotat] / server
1 #!/usr/bin/python3
2
3 import signal
4 signal.signal(signal.SIGINT, signal.SIG_DFL)
5
6 import sys
7 import os
8
9 import twisted.internet
10 import twisted.internet.endpoints
11 from twisted.web.server import NOT_DONE_YET
12 from twisted.logger import LogLevel
13
14 #import twisted.web.server import Site
15 #from twisted.web.resource import Resource
16
17 from optparse import OptionParser
18 from configparser import ConfigParser
19 from configparser import NoOptionError
20
21 import collections
22
23 import syslog
24
25 from hippotat import *
26
27 clients = { }
28
29 defcfg = '''
30 [DEFAULT]
31 max_batch_down = 65536
32 max_queue_time = 10
33 max_request_time = 54
34 target_requests_outstanding = 3
35
36 [virtual]
37 mtu = 1500
38 # network
39 # [host]
40 # [relay]
41
42 [server]
43 ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
44 addrs = 127.0.0.1 ::1
45 port = 8099
46
47 [limits]
48 max_batch_down = 262144
49 max_queue_time = 121
50 max_request_time = 121
51 target_requests_outstanding = 10
52 '''
53
54 #---------- error handling ----------
55
56 def crash(err):
57 print('CRASH ', err, file=sys.stderr)
58 try: reactor.stop()
59 except twisted.internet.error.ReactorNotRunning: pass
60
61 def crash_on_defer(defer):
62 defer.addErrback(lambda err: crash(err))
63
64 def crash_on_critical(event):
65 if event.get('log_level') >= LogLevel.critical:
66 crash(twisted.logger.formatEvent(event))
67
68 #---------- "router" ----------
69
70 def route(packet, saddr, daddr):
71 print('TRACE ', saddr, daddr, packet)
72 try: client = clients[daddr]
73 except KeyError: dclient = None
74 if dclient is not None:
75 dclient.queue_outbound(packet)
76 elif saddr.is_link_local or daddr.is_link_local:
77 log_discard(packet, saddr, daddr, 'link-local')
78 elif daddr == host or daddr not in network:
79 print('TRACE INBOUND ', saddr, daddr, packet)
80 queue_inbound(packet)
81 elif daddr == relay:
82 log_discard(packet, saddr, daddr, 'relay')
83 else:
84 log_discard(packet, saddr, daddr, 'no client')
85
86 def log_discard(packet, saddr, daddr, why):
87 print('DROP ', saddr, daddr, why)
88 # syslog.syslog(syslog.LOG_DEBUG,
89 # 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
90
91 #---------- client ----------
92
93 class Client():
94 def __init__(self, ip, cs):
95 # instance data members
96 self._ip = ip
97 self._cs = cs
98 self.pw = cfg.get(cs, 'password')
99 self._rq = collections.deque() # requests
100 # self._pq = PacketQueue(...)
101 # plus from config:
102 # .max_batch_down
103 # .max_queue_time
104 # .max_request_time
105 # .target_requests_outstanding
106 for k in ('max_batch_down','max_queue_time','max_request_time',
107 'target_requests_outstanding'):
108 req = cfg.getint(cs, k)
109 limit = cfg.getint('limits',k)
110 self.__dict__[k] = min(req, limit)
111 self._pq = PacketQueue(self.max_queue_time)
112
113 def process_arriving_data(self, d):
114 for packet in slip.decode(d):
115 (saddr, daddr) = packet_addrs(packet)
116 if saddr != self._ip:
117 raise ValueError('wrong source address %s' % saddr)
118 route(packet, saddr, daddr)
119
120 def _req_cancel(self, request):
121 request.finish()
122
123 def _req_error(self, err, request):
124 self._req_cancel(request)
125
126 def queue_outbound(self, packet):
127 self._pq.append(packet)
128
129 def http_request(self, request):
130 request.setHeader('Content-Type','application/octet-stream')
131 reactor.callLater(self.max_request_time, self._req_cancel, request)
132 request.notifyFinish().addErrback(self._req_error, request)
133 self._rq.append(request)
134 self._check_outbound()
135
136 def _check_outbound(self):
137 while True:
138 try: request = self._rq[0]
139 except IndexError: request = None
140 if request and request.finished:
141 self._rq.popleft()
142 continue
143
144 if not self._pq.nonempty():
145 # no packets, oh well
146 continue
147
148 if request is None:
149 # no request
150 break
151
152 # request, and also some non-expired packets
153 while True:
154 packet = self.pq.popleft()
155 if packet is None: break
156
157 encoded = slip.encode(packet)
158
159 if request.sentLength > 0:
160 if (request.sentLength + len(slip.delimiter)
161 + len(encoded) > self.max_batch_down):
162 break
163 request.write(slip.delimiter)
164
165 request.write(encoded)
166 self._pq.popLeft()
167
168 assert(request.sentLength)
169 self._rq.popLeft()
170 request.finish()
171 # round again, looking for more to do
172
173 while len(self._rq) > self.target_requests_outstanding:
174 request = self._rq.popleft()
175 request.finish()
176
177 class IphttpResource(twisted.web.resource.Resource):
178 isLeaf = True
179 def render_POST(self, request):
180 # find client, update config, etc.
181 ci = ipaddr(request.args['i'])
182 c = clients[ci]
183 pw = request.args['pw']
184 if pw != c.pw: raise ValueError('bad password')
185
186 # update config
187 for r, w in (('mbd', 'max_batch_down'),
188 ('mqt', 'max_queue_time'),
189 ('mrt', 'max_request_time'),
190 ('tro', 'target_requests_outstanding')):
191 try: v = request.args[r]
192 except KeyError: continue
193 v = int(v)
194 c.__dict__[w] = v
195
196 try: d = request.args['d']
197 except KeyError: d = ''
198
199 c.process_arriving_data(d)
200 c.new_request(request)
201
202 def render_GET(self, request):
203 return b'<html><body>hippotat</body></html>'
204
205 def start_http():
206 resource = IphttpResource()
207 site = twisted.web.server.Site(resource)
208 for addrspec in cfg.get('server','addrs').split():
209 try:
210 addr = ipaddress.IPv4Address(addrspec)
211 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
212 except AddressValueError:
213 addr = ipaddress.IPv6Address(addrspec)
214 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
215 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
216 crash_on_defer(ep.listen(site))
217
218 #---------- config and setup ----------
219
220 def process_cfg():
221 global network
222 global host
223 global relay
224 global ipif_command
225
226 network = ipnetwork(cfg.get('virtual','network'))
227 if network.num_addresses < 3 + 2:
228 raise ValueError('network needs at least 2^3 addresses')
229
230 try:
231 host = cfg.get('virtual','host')
232 except NoOptionError:
233 host = next(network.hosts())
234
235 try:
236 relay = cfg.get('virtual','relay')
237 except NoOptionError:
238 for search in network.hosts():
239 if search == host: continue
240 relay = search
241 break
242
243 for cs in cfg.sections():
244 if not (':' in cs or '.' in cs): continue
245 ci = ipaddr(cs)
246 if ci not in network:
247 raise ValueError('client %s not in network' % ci)
248 if ci in clients:
249 raise ValueError('multiple client cfg sections for %s' % ci)
250 clients[ci] = Client(ci, cs)
251
252 global mtu
253 mtu = cfg.get('virtual','mtu')
254
255 iic_vars = { }
256 for k in ('host','relay','mtu','network'):
257 iic_vars[k] = globals()[k]
258
259 ipif_command = cfg.get('server','ipif', vars=iic_vars)
260
261 def startup():
262 global cfg
263
264 op = OptionParser()
265 op.add_option('-c', '--config', dest='configfile',
266 default='/etc/hippottd/server.conf')
267 global opts
268 (opts, args) = op.parse_args()
269 if len(args): op.error('no non-option arguments please')
270
271 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
272
273 cfg = ConfigParser()
274 cfg.read_string(defcfg)
275 cfg.read(opts.configfile)
276 process_cfg()
277
278 start_ipif(ipif_command, route)
279 start_http()
280
281 startup()
282 reactor.run()
283 print('CRASHED (end)', file=sys.stderr)