wip, and move PacketQueue
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
aa663282
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
e2d41dc1
IJ
6import sys
7import os
8
e2d41dc1
IJ
9import twisted.internet
10import twisted.internet.endpoints
e2d41dc1
IJ
11from twisted.web.server import NOT_DONE_YET
12from twisted.logger import LogLevel
13
5da7763e
IJ
14#import twisted.web.server import Site
15#from twisted.web.resource import Resource
3fba9787 16
e75e9c17
IJ
17from optparse import OptionParser
18from configparser import ConfigParser
19from configparser import NoOptionError
3fba9787 20
0ac316c8
IJ
21import collections
22
c4b6d990
IJ
23import syslog
24
040ff511 25from hippotat import *
3fba9787 26
# Registry of configured clients, keyed by client address; filled in by
# process_cfg().
clients = {}

# Built-in configuration, loaded before the site configuration file so
# that the file only has to override what differs.  [DEFAULT] supplies
# per-client tunables; [limits] supplies the caps those tunables are
# clamped to.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54
target_requests_outstanding = 3

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 8099

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
target_requests_outstanding = 10
'''
53
aa663282
IJ
54#---------- error handling ----------
55
def crash(err):
    """Report *err* on stderr and ask the reactor to stop.

    Safe to call when the reactor is not running: the resulting
    ReactorNotRunning error is ignored.
    """
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # Nothing to stop; the report above is all we can do.
        pass
60
def crash_on_defer(defer):
    """Arrange for a failure of *defer* to be handed to crash()."""
    def _on_failure(failure):
        return crash(failure)

    defer.addErrback(_on_failure)
63
def crash_on_critical(event):
    """Log observer: crash the server when a critical event is logged.

    event is a log event dict.  Events that carry no 'log_level' are
    ignored; comparing None against a LogLevel would otherwise raise
    from inside the observer.
    """
    level = event.get('log_level')
    if level is not None and level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
67
5da7763e
IJ
68#---------- "router" ----------
69
ec0c4d95
IJ
def route(packet, saddr, daddr):
    """Decide what to do with one packet travelling from saddr to daddr.

    In order of precedence:
      - destination is a configured client: queue it for that client;
      - either address is link-local: discard;
      - destination is the host itself, or lies outside the virtual
        network: pass it to queue_inbound();
      - destination is the relay address: discard;
      - anything else (an address in the network with no client): discard.
    """
    print('TRACE ', saddr, daddr, packet)
    # The lookup result must land in dclient, the name tested below;
    # binding a different name left dclient undefined whenever the
    # destination *was* a known client.
    dclient = clients.get(daddr)
    if dclient is not None:
        dclient.queue_outbound(packet)
    elif saddr.is_link_local or daddr.is_link_local:
        log_discard(packet, saddr, daddr, 'link-local')
    elif daddr == host or daddr not in network:
        print('TRACE INBOUND ', saddr, daddr, packet)
        queue_inbound(packet)
    elif daddr == relay:
        log_discard(packet, saddr, daddr, 'relay')
    else:
        log_discard(packet, saddr, daddr, 'no client')
85
def log_discard(packet, saddr, daddr, why):
    """Print a DROP line naming the addresses and the reason.

    packet is accepted for symmetry with the callers but is not printed.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # Disabled syslog variant, kept for reference:
    #  syslog.syslog(syslog.LOG_DEBUG,
    #                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e 90
5da7763e 91#---------- client ----------
c4b6d990 92
class Client():
    """State for one configured client.

    Holds the client's address, its password, its tunables, a deque of
    parked HTTP requests (_rq) and a PacketQueue of packets waiting to
    be sent down to the client (_pq).
    """

    def __init__(self, ip, cs):
        """Set up the client with address ip from config section cs."""
        # instance data members
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque()  # requests
        # A packet taken off _pq that did not fit into the batch being
        # built; it goes out first in the next batch.
        self._held_packet = None
        # plus from config, each capped by the value in [limits]:
        #  .max_batch_down
        #  .max_queue_time
        #  .max_request_time
        #  .target_requests_outstanding
        for k in ('max_batch_down', 'max_queue_time', 'max_request_time',
                  'target_requests_outstanding'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits', k)
            setattr(self, k, min(req, limit))
        self._pq = PacketQueue(self.max_queue_time)

    def process_arriving_data(self, d):
        """Decode the SLIP data d sent by the client and route each packet.

        Raises ValueError if a packet's source address is not this
        client's own address.
        """
        for packet in slip.decode(d):
            (saddr, daddr) = packet_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, saddr, daddr)

    def _req_cancel(self, request):
        """Finish request (timeout path), unless it is finished already."""
        # The timer set in http_request() fires for every request,
        # including those already answered; do not finish those twice.
        if request.finished:
            return
        request.finish()

    def _req_error(self, err, request):
        """Errback for request.notifyFinish(): give up on the request."""
        # NOTE(review): Twisted documents that Request.finish() raises
        # RuntimeError once the connection has been lost - confirm
        # whether this path should call finish() at all.
        self._req_cancel(request)

    def queue_outbound(self, packet):
        """Queue packet for delivery to this client."""
        self._pq.append(packet)

    def http_request(self, request):
        """Park an incoming HTTP request and use it if packets are waiting.

        The request is cancelled after max_request_time seconds.
        """
        request.setHeader('Content-Type', 'application/octet-stream')
        reactor.callLater(self.max_request_time, self._req_cancel, request)
        request.notifyFinish().addErrback(self._req_error, request)
        self._rq.append(request)
        self._check_outbound()

    def _next_packet(self):
        """Return the next packet to send, or None if there is none."""
        packet = self._held_packet
        if packet is not None:
            self._held_packet = None
            return packet
        if not self._pq.nonempty():
            return None
        # NOTE(review): assumes PacketQueue.popleft() removes and returns
        # the head packet - confirm against PacketQueue.
        return self._pq.popleft()

    def _check_outbound(self):
        """Send queued packets on parked requests, then trim spare requests.

        Each request carries one batch: packets joined by slip.delimiter,
        with at most max_batch_down bytes unless a single packet alone
        is larger.  Afterwards requests beyond
        target_requests_outstanding are finished empty.
        """
        while True:
            try:
                request = self._rq[0]
            except IndexError:
                request = None
            if request and request.finished:
                self._rq.popleft()
                continue

            if self._held_packet is None and not self._pq.nonempty():
                # no packets, oh well; leave the loop rather than spin
                break

            if request is None:
                # no request
                break

            # request, and also some non-expired packets
            while True:
                packet = self._next_packet()
                if packet is None:
                    break

                encoded = slip.encode(packet)

                if request.sentLength > 0:
                    if (request.sentLength + len(slip.delimiter)
                            + len(encoded) > self.max_batch_down):
                        # Does not fit: keep it for the next batch
                        # instead of losing it.
                        self._held_packet = packet
                        break
                    request.write(slip.delimiter)

                request.write(encoded)

            assert(request.sentLength)
            self._rq.popleft()
            request.finish()
            # round again, looking for more to do

        while len(self._rq) > self.target_requests_outstanding:
            request = self._rq.popleft()
            request.finish()
176
class IphttpResource(twisted.web.resource.Resource):
    """The single HTTP resource through which clients exchange packets."""

    isLeaf = True

    @staticmethod
    def _arg(request, name):
        """Return the first value of form argument name, as bytes.

        Twisted presents request.args as a mapping from bytes names to
        lists of bytes values.  Raises KeyError if the argument is
        absent.
        """
        return request.args[name][0]

    def render_POST(self, request):
        """Handle one client request.

        Arguments: 'i' client address, 'pw' password, optional tunables
        'mbd'/'mqt'/'mrt'/'tro', and optional 'd' carrying upbound data.
        The request is handed to the client object, which finishes it
        later, so NOT_DONE_YET is returned.

        Raises KeyError for a missing argument or an unknown client and
        ValueError for a wrong password.
        """
        # find client, update config, etc.
        ci = ipaddr(self._arg(request, b'i').decode('ascii'))
        c = clients[ci]
        pw = self._arg(request, b'pw')
        if pw != c.pw.encode('utf-8'):
            raise ValueError('bad password')

        # update config; requested values are capped by [limits] in the
        # same way as the values read at startup
        for r, w in ((b'mbd', 'max_batch_down'),
                     (b'mqt', 'max_queue_time'),
                     (b'mrt', 'max_request_time'),
                     (b'tro', 'target_requests_outstanding')):
            try:
                v = self._arg(request, r)
            except KeyError:
                continue
            setattr(c, w, min(int(v), cfg.getint('limits', w)))

        try:
            d = self._arg(request, b'd')
        except KeyError:
            d = b''

        c.process_arriving_data(d)
        c.http_request(request)
        # The client object now owns the request and finishes it.
        return NOT_DONE_YET

    def render_GET(self, request):
        """Serve a fixed placeholder page."""
        return b'<html><body>hippotat</body></html>'
8e279651 204
5da7763e
IJ
def start_http():
    """Start listening for HTTP on every address in [server] addrs.

    Each entry is tried as an IPv4 address first and as IPv6 otherwise.
    A failure to listen crashes the server.
    """
    resource = IphttpResource()
    site = twisted.web.server.Site(resource)
    port = cfg.getint('server', 'port')
    for addrspec in cfg.get('server', 'addrs').split():
        try:
            addr = ipaddress.IPv4Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
        except ipaddress.AddressValueError:
            addr = ipaddress.IPv6Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
        # The third positional parameter of these endpoints is the
        # listen backlog, so the address must be given as interface=.
        ep = endpointfactory(reactor, port, interface=str(addr))
        crash_on_defer(ep.listen(site))
5da7763e
IJ
217
218#---------- config and setup ----------
219
3fba9787
IJ
def process_cfg():
    """Derive the module-level settings from the loaded configuration.

    Sets the globals network, host, relay, mtu and ipif_command, and
    fills clients with one Client per config section whose name looks
    like an address (contains ':' or '.').

    Raises ValueError if the network is too small, if a client lies
    outside the network, or if a client address is configured twice.
    """
    global network
    global host
    global relay
    global ipif_command
    global mtu

    network = ipnetwork(cfg.get('virtual', 'network'))
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')

    # host and relay are compared with packet addresses in route(), so a
    # configured value is parsed into an address just like the fallback.
    try:
        host = ipaddr(cfg.get('virtual', 'host'))
    except NoOptionError:
        host = next(network.hosts())

    try:
        relay = ipaddr(cfg.get('virtual', 'relay'))
    except NoOptionError:
        # first host address that is not the host itself
        relay = next(a for a in network.hosts() if a != host)

    for cs in cfg.sections():
        if not (':' in cs or '.' in cs):
            continue
        ci = ipaddr(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)

    mtu = cfg.get('virtual', 'mtu')

    # values substituted into the %(...)s placeholders of [server] ipif
    iic_vars = {
        'host': host,
        'relay': relay,
        'mtu': mtu,
        'network': network,
    }

    ipif_command = cfg.get('server', 'ipif', vars=iic_vars)
260
def startup():
    """Parse the command line, load the configuration and start services.

    Sets the globals opts and cfg.  The built-in defaults are loaded
    first and the file named by -c/--config is read on top of them.
    """
    global cfg
    global opts

    parser = OptionParser()
    parser.add_option('-c', '--config', dest='configfile',
                      default='/etc/hippottd/server.conf')
    opts, leftover = parser.parse_args()
    if leftover:
        parser.error('no non-option arguments please')

    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    cfg = ConfigParser()
    cfg.read_string(defcfg)
    cfg.read(opts.configfile)
    process_cfg()

    start_ipif(ipif_command, route)
    start_http()
e2d41dc1
IJ
280
# Script body: set everything up, then hand control to the reactor.
startup()
reactor.run()
# Reached only once the reactor has stopped.
print('CRASHED (end)', file=sys.stderr)