move sigint
[hippotat] / server
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import sys
6 import os
7
8 import twisted.internet
9 import twisted.internet.endpoints
10 from twisted.web.server import NOT_DONE_YET
11 from twisted.logger import LogLevel
12
13 #import twisted.web.server import Site
14 #from twisted.web.resource import Resource
15
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
19
20 import collections
21
22 import syslog
23
# Registry of configured clients.  process_cfg() fills it in, keyed by
# the client's address as returned by ipaddr(), with Client instances
# as values; route() and IphttpResource look clients up here.
clients = { }

# Built-in configuration.  startup() loads this string before reading
# the configuration file, so anything set in the file overrides it.
# The [limits] section holds the caps which Client.__init__ applies
# (via min) to the per-client tuning values.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54
target_requests_outstanding = 3

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 8099

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
target_requests_outstanding = 10
'''
50
51 #---------- error handling ----------
52
def crash(err):
    """Report a fatal condition on stderr and stop the reactor.

    err is printed as-is after a 'CRASH ' tag.  If the reactor is not
    running, the ReactorNotRunning raised by stop() is swallowed so
    that crash() can be called more than once.
    """
    print('CRASH ', err, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        # already stopped (or never started): nothing left to do
        pass
57
def crash_on_defer(defer):
    """Make any failure of `defer` fatal by routing it to crash().

    defer must offer addErrback(); the failure object it delivers is
    handed to crash() unchanged.
    """
    def _fail(failure):
        crash(failure)

    defer.addErrback(_fail)
60
def crash_on_critical(event):
    """Log observer: crash the server on events of critical level.

    event is a log event mapping.  Events which carry no 'log_level'
    (or carry None) are ignored; previously they were compared against
    LogLevel.critical anyway, which raises TypeError for None.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
64
65 #---------- "router" ----------
66
def route(packet, saddr, daddr):
    """Decide what to do with one packet and do it.

    packet is the raw packet; saddr and daddr are its source and
    destination addresses (objects offering is_link_local).

    The checks are tried in this order:
      * daddr is a configured client: queue it on that client
      * either address is link-local: discard
      * daddr is the host, or lies outside the virtual network:
        pass it on via queue_inbound
      * daddr is the relay: discard
      * anything else (inside the network, but unknown): discard
    """
    print('TRACE ', saddr, daddr, packet)

    # The lookup result used to be stored in `client` while the code
    # below tested `dclient`, so every packet addressed to a known
    # client died with UnboundLocalError.
    dclient = clients.get(daddr)

    if dclient is not None:
        dclient.queue_outbound(packet)
    elif saddr.is_link_local or daddr.is_link_local:
        log_discard(packet, saddr, daddr, 'link-local')
    elif daddr == host or daddr not in network:
        print('TRACE INBOUND ', saddr, daddr, packet)
        queue_inbound(packet)
    elif daddr == relay:
        log_discard(packet, saddr, daddr, 'relay')
    else:
        log_discard(packet, saddr, daddr, 'no client')
82
def log_discard(packet, saddr, daddr, why):
    """Note on stdout that a packet was dropped, and for what reason.

    Only the addresses and the reason are printed; packet itself is
    not used.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(' '.join(str(field) for field in fields))
    # Not enabled: the same report via syslog,
    #   syslog.syslog(syslog.LOG_DEBUG,
    #                 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
87
88 #---------- client ----------
89
class Client():
    """Server-side state for one configured client.

    Instance data members:
      _ip   the client's address (checked against packet sources)
      _cs   name of the client's config section
      pw    the client's password, from its config section
      _rq   deque of HTTP requests waiting for downbound data
      _pq   PacketQueue of packets waiting to be sent to the client
    plus, from config (each capped by the [limits] section):
      max_batch_down
      max_queue_time
      max_request_time
      target_requests_outstanding
    """

    def __init__(self, ip, cs):
        """Set up the client with address ip from config section cs."""
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque() # requests
        for k in ('max_batch_down','max_queue_time','max_request_time',
                  'target_requests_outstanding'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits',k)
            # the client's own setting never exceeds the server's limit
            setattr(self, k, min(req, limit))
        self._pq = PacketQueue(self.max_queue_time)

    def process_arriving_data(self, d):
        """Route every packet which slip.decode finds in d.

        Raises ValueError if a packet's source address is not this
        client's own address.
        """
        for packet in slip.decode(d):
            (saddr, daddr) = packet_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, saddr, daddr)

    def _req_cancel(self, request):
        """Complete request with whatever has been written so far."""
        request.finish()

    def _req_error(self, err, request):
        """Errback for request.notifyFinish(): give up on request."""
        self._req_cancel(request)

    def queue_outbound(self, packet):
        """Queue packet for delivery to this client."""
        self._pq.append(packet)

    def http_request(self, request):
        """Take charge of a new HTTP request from this client.

        The request is scheduled to be finished after max_request_time,
        added to the queue of outstanding requests, and offered any
        packets which are already waiting.
        """
        request.setHeader('Content-Type','application/octet-stream')
        reactor.callLater(self.max_request_time, self._req_cancel, request)
        request.notifyFinish().addErrback(self._req_error, request)
        self._rq.append(request)
        self._check_outbound()

    def _check_outbound(self):
        """Pair waiting packets with waiting requests, then trim.

        While there are both a live request and queued packets, packets
        are written into the oldest request (up to max_batch_down bytes)
        and that request is finished.  Afterwards the oldest requests
        are finished early until no more than
        target_requests_outstanding remain.
        """
        while True:
            try: request = self._rq[0]
            except IndexError: request = None
            if request and request.finished:
                self._rq.popleft()
                continue

            if not self._pq.nonempty():
                # no packets, oh well
                # (this used to `continue`, which spins forever inside
                # the reactor: nothing in this loop adds packets)
                break

            if request is None:
                # no request
                break

            # request, and also some non-expired packets
            while True:
                # NOTE(review): popleft() is called here and again after
                # the write, as in the original, which reads as "look at
                # the head, then remove it once sent".  PacketQueue is
                # defined elsewhere: confirm that the first call does
                # not already remove the packet.
                # (was self.pq, an attribute which does not exist)
                packet = self._pq.popleft()
                if packet is None: break

                encoded = slip.encode(packet)

                if request.sentLength > 0:
                    if (request.sentLength + len(slip.delimiter)
                        + len(encoded) > self.max_batch_down):
                        break
                    request.write(slip.delimiter)

                request.write(encoded)
                # was spelled popLeft; assumes PacketQueue uses the same
                # spelling as the call above -- TODO confirm
                self._pq.popleft()

            assert(request.sentLength)
            self._rq.popleft() # deque has popleft, not popLeft
            request.finish()
            # round again, looking for more to do

        while len(self._rq) > self.target_requests_outstanding:
            request = self._rq.popleft()
            request.finish()
173
class IphttpResource(twisted.web.resource.Resource):
    """The single HTTP resource through which clients exchange packets."""

    isLeaf = True

    def render_POST(self, request):
        """Handle one client POST.

        Form fields read from request.args:
          i    client address (selects the entry in clients)
          pw   password, must equal the client's configured one
          mbd, mqt, mrt, tro   optional new values for max_batch_down,
               max_queue_time, max_request_time and
               target_requests_outstanding
          d    optional upbound data, fed to process_arriving_data

        Raises KeyError for a missing field or unknown client and
        ValueError for a wrong password.  Returns NOT_DONE_YET: the
        response is completed later by the Client.

        NOTE(review): under Python 3, Twisted is documented to key
        request.args by bytes and to store a list of values per key;
        the str keys and scalar use here look unported -- confirm.
        """
        # find client, update config, etc.
        ci = ipaddr(request.args['i'])
        c = clients[ci]
        pw = request.args['pw']
        if pw != c.pw: raise ValueError('bad password')

        # update config
        for r, w in (('mbd', 'max_batch_down'),
                     ('mqt', 'max_queue_time'),
                     ('mrt', 'max_request_time'),
                     ('tro', 'target_requests_outstanding')):
            try: v = request.args[r]
            except KeyError: continue
            # Apply the same [limits] cap which Client.__init__ applies,
            # so a request cannot raise a value beyond the server limit.
            setattr(c, w, min(int(v), cfg.getint('limits', w)))

        d = request.args.get('d', '')

        c.process_arriving_data(d)
        # Client's method is http_request; new_request does not exist.
        c.http_request(request)
        # The Client finishes the request later; without this return
        # value Twisted treats the None result as a rendering error.
        return NOT_DONE_YET

    def render_GET(self, request):
        """Serve a fixed placeholder page."""
        return b'<html><body>hippotat</body></html>'
201
def start_http():
    """Start listening for HTTP on every address in [server] addrs.

    Each whitespace-separated entry is tried as an IPv4 address first
    and as an IPv6 address otherwise (ipaddress raises if it is
    neither).  All listeners share one Site and the configured port.
    A failure to listen is made fatal via crash_on_defer.
    """
    resource = IphttpResource()
    site = twisted.web.server.Site(resource)
    port = cfg.getint('server','port')

    for addrspec in cfg.get('server','addrs').split():
        try:
            addr = ipaddress.IPv4Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
        except ipaddress.AddressValueError:
            addr = ipaddress.IPv6Address(addrspec)
            endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
        # The third positional parameter of these endpoints is the
        # listen backlog, so the address must be given by keyword (and
        # as a string) to end up as the interface to bind.
        ep = endpointfactory(reactor, port, interface=str(addr))
        crash_on_defer(ep.listen(site))
214
215 #---------- config and setup ----------
216
def process_cfg():
    """Turn the loaded configuration into the module's global state.

    Sets the globals network, host, relay, mtu and ipif_command, and
    fills `clients` with one Client per config section whose name
    contains ':' or '.' (i.e. looks like an address).

    Raises ValueError if the network is too small, if a client's
    address lies outside the network, or if two sections name the same
    client address.
    """
    global network
    global host
    global relay
    global mtu
    global ipif_command

    network = ipnetwork(cfg.get('virtual','network'))
    # NOTE(review): 3 + 2 presumably allows for network, broadcast,
    # host, relay and one client -- confirm against the message below.
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')

    # Configured addresses are converted with ipaddr() so that host and
    # relay are address objects either way; left as plain strings they
    # could never compare equal to a packet's destination in route().
    try:
        host_cfg = cfg.get('virtual','host')
    except NoOptionError:
        host = next(network.hosts())
    else:
        host = ipaddr(host_cfg)

    try:
        relay_cfg = cfg.get('virtual','relay')
    except NoOptionError:
        # first usable address which is not the host
        for search in network.hosts():
            if search == host: continue
            relay = search
            break
    else:
        relay = ipaddr(relay_cfg)

    for cs in cfg.sections():
        if not (':' in cs or '.' in cs): continue
        ci = ipaddr(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)

    mtu = cfg.get('virtual','mtu')

    # values offered to the %(...)s substitutions in [server] ipif
    iic_vars = {
        'host': host,
        'relay': relay,
        'mtu': mtu,
        'network': network,
    }

    ipif_command = cfg.get('server','ipif', vars=iic_vars)
257
def startup():
    """Parse the command line, load the configuration, start services.

    Sets the globals opts and cfg.  The only option is -c/--config
    (default /etc/hippottd/server.conf); positional arguments are
    rejected.  The built-in defaults are loaded before the config file.
    """
    global cfg
    global opts

    parser = OptionParser()
    parser.add_option(
        '-c', '--config',
        dest='configfile',
        default='/etc/hippottd/server.conf',
    )
    opts, leftover = parser.parse_args()
    if leftover:
        parser.error('no non-option arguments please')

    # from here on, critical log events bring the server down
    twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

    cfg = ConfigParser()
    cfg.read_string(defcfg)
    cfg.read(opts.configfile)
    process_cfg()

    start_ipif(ipif_command, route)
    start_http()
277
# Script entry point: set everything up, then hand control to the
# reactor.
startup()
reactor.run()
# Reached only once reactor.run() has returned -- presumably after
# crash() stopped the reactor.
print('CRASHED (end)', file=sys.stderr)