wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
5bae5ba3
IJ
3import twisted
4
5da7763e
IJ
5#import twisted.web.server import Site
6#from twisted.web.resource import Resource
3fba9787
IJ
7from twisted.web.server import NOT_DONE_YET
8from twisted.internet import reactor
9
e75e9c17
IJ
10from optparse import OptionParser
11from configparser import ConfigParser
12from configparser import NoOptionError
ec88b1f1 13import ipaddress
3fba9787 14
0ac316c8
IJ
15import collections
16
c4b6d990
IJ
17import syslog
18
3fba9787
IJ
19clients = { }
20
21def ipaddress(input):
22 try:
ec88b1f1 23 r = ipaddress.IPv4Address(input)
3fba9787 24 except AddressValueError:
ec88b1f1 25 r = ipaddress.IPv6Address(input)
3fba9787
IJ
26 return r
27
28def ipnetwork(input):
29 try:
ec88b1f1 30 r = ipaddress.IPv4Network(input)
3fba9787 31 except NetworkValueError:
ec88b1f1 32 r = ipaddress.IPv6Network(input)
3fba9787
IJ
33 return r
34
e75e9c17 35defcfg = '''
094ee3a2
IJ
36[DEFAULT]
37max_batch_down = 65536
38max_queue_time = 10
39max_request_time = 54
40
e75e9c17
IJ
41[virtual]
42mtu = 1500
43# network
44# [host]
45# [relay]
46
47[server]
5bae5ba3 48ipif = userv root ipif %(host),%(relay),%(mtu),slip %(network)
5da7763e
IJ
49addrs = 127.0.0.1 ::1
50port = 80
e75e9c17 51
094ee3a2
IJ
52[limits]
53max_batch_down = 262144
54max_queue_time = 121
55max_request_time = 121
ec88b1f1
IJ
56'''
57
5da7763e
IJ
58#---------- "router" ----------
59
60def route(packet. daddr):
61 try: client = clients[daddr]
62 except KeyError: dclient = None
63 if dclient is not None:
64 dclient.queue_outbound(packet)
65 else if daddr = host or daddr not in network:
66 queue_inbound(packet)
67 else if daddr = relay:
68 log_discard(packet, saddr, daddr, 'relay')
69 else:
70 log_discard(packet, saddr, daddr, 'no client')
71
72def log_discard(packet, saddr, daddr, why):
73 syslog.syslog(syslog.LOG_DEBUG,
74 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
75
76#---------- ipif (slip subprocess) ----------
77
5bae5ba3
IJ
78class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
79 def __init__(self):
80 self._buffer = b''
81 def connectionMade(self): pass
82 def outReceived(self, data):
83 buffer += data
84 packets = slip_decode(buffer)
85 buffer = packets.pop()
86 for packet in packets:
87 (saddr, daddr) = packet_addrs(packet)
88 route(packet, daddr)
5da7763e
IJ
89 def processEnded(self, status):
90 status.raiseException()
5bae5ba3
IJ
91
92def start_ipif():
5da7763e
IJ
93 global ipif
94 ipif = IpifProcessProtocol()
95 reactor.spawnProcess(ipif,
5bae5ba3
IJ
96 '/bin/sh',['-c', ipif_command],
97 childFDs={0:'w', 1:'r', 2:2})
98
5da7763e
IJ
99def queue_inbound(packet):
100 ipif.transport.write(slip_delimiter)
101 ipif.transport.write(slip_encode(packet))
102 ipif.transport.write(slip_delimiter)
5bae5ba3 103
5da7763e 104#---------- client ----------
c4b6d990 105
ec88b1f1 106class Client():
c4b6d990 107 def __init__(self, ip, cs):
ec88b1f1
IJ
108 # instance data members
109 self._ip = ip
110 self._cs = cs
111 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
112 self._rq = collections.deque() # requests
113 self._pq = collections.deque() # packets
c4b6d990
IJ
114 # plus from config:
115 # .max_batch_down
116 # .max_queue_time
117 # .max_request_time
ec88b1f1
IJ
118 for k in ('max_batch_down','max_queue_time','max_request_time'):
119 req = cfg.getint(cs, k)
094ee3a2 120 limit = cfg.getint('limits',k)
c4b6d990
IJ
121 self.__dict__[k] = min(req, limit)
122
123 def process_arriving_data(self, d):
124 for packet in slip_decode(d):
5bae5ba3 125 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
126 if saddr != self._ip:
127 raise ValueError('wrong source address %s' % saddr)
128 route(packet, daddr)
ec88b1f1 129
c4b6d990
IJ
130 def _req_cancel(self, request):
131 request.finish()
132
133 def _req_error(self, err, request):
134 self._req_cancel(request)
135
0ac316c8 136 def queue_outbound(self, packet):
094ee3a2 137 self._pq.append((time.monotonic(), packet))
0ac316c8 138
c4b6d990
IJ
139 def http_request(self, request):
140 request.setHeader('Content-Type','application/octet-stream')
141 reactor.callLater(self.max_request_time, self._req_cancel, request)
142 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
143 self._rq.append(request)
144 self._check_outbound()
145
146 def _check_outbound(self):
147 while True:
148 try: request = self._rq[0]
149 except IndexError: request = None
150 if request and request.finished:
151 self._rq.popleft()
152 continue
153
154 # now request is an unfinished request, or None
155 try: (queuetime, packet) = self._pq[0]
156 except: IndexError:
157 # no packets, oh well
094ee3a2
IJ
158 break
159
160 age = time.monotonic() - queuetime
161 if age > self.max_queue_time:
162 self._pq.popleft()
0ac316c8
IJ
163 continue
164
094ee3a2
IJ
165 if request is None:
166 # no request
167 break
168
169 # request, and also some non-expired packets
170 while True:
171 try: (dummy, packet) = self._pq[0]
172 except IndexError: break
173
174 encoded = slip_encode(packet)
175
176 if request.sentLength > 0:
177 if (request.sentLength + len(slip_delimiter)
178 + len(encoded) > self.max_batch_down):
179 break
180 request.write(slip_delimiter)
181
182 request.write(encoded)
183 self._pq.popLeft()
184
185 assert(request.sentLength)
186 self._rq.popLeft()
187 request.finish()
188 # round again, looking for more to do
ec88b1f1 189
5da7763e
IJ
190class IphttpResource(twisted.web.resource.Resource):
191 def render_POST(self, request):
192 # find client, update config, etc.
193 ci = ipaddress(request.args['i'])
194 c = clients[ci]
195 pw = request.args['pw']
196 if pw != c.pw: raise ValueError('bad password')
197
198 # update config
199 for r, w in (('mbd', 'max_batch_down'),
200 ('mqt', 'max_queue_time'),
201 ('mrt', 'max_request_time')):
202 try: v = request.args[r]
203 except KeyError: continue
204 v = int(v)
205 c.__dict__[w] = v
206
207 try: d = request.args['d']
208 except KeyError: d = ''
209
210 c.process_arriving_data(d)
211 c.new_request(request)
212
213def start_http():
214 resource = IphttpResource()
215 sitefactory = twisted.web.server.Site(resource)
216 for addrspec in cfg.get('server','addresses').split():
217 try:
218 addr = ipaddress.IPv4Address(addrspec)
219 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
220 except AddressValueError:
221 addr = ipaddress.IPv6Address(addrspec)
222 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
223 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
224 ep.listen(sitefactory)
225
226#---------- config and setup ----------
227
3fba9787
IJ
228def process_cfg():
229 global network
e75e9c17
IJ
230 global host
231 global relay
5bae5ba3 232 global ipif_command
3fba9787 233
ec88b1f1 234 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
235 if network.num_addresses < 3 + 2:
236 raise ValueError('network needs at least 2^3 addresses')
237
3fba9787 238 try:
e75e9c17
IJ
239 host = cfg.get('virtual','host')
240 except NoOptionError:
241 host = network.hosts().next()
242
243 try:
244 relay = cfg.get('virtual','relay')
245 except OptionError:
246 for search in network.hosts():
247 if search = host: continue
248 relay = search
249 break
3fba9787 250
ec88b1f1
IJ
251 for cs in cfg.sections():
252 if not (':' in cs or '.' in cs): continue
253 ci = ipaddress(cs)
254 if ci not in network:
255 raise ValueError('client %s not in network' % ci)
256 if ci in clients:
257 raise ValueError('multiple client cfg sections for %s' % ci)
258 clients[ci] = Client(ci, cs)
3fba9787 259
5bae5ba3
IJ
260 iic_vars = { }
261 for k in ('host','relay','mtu','network'):
262 iic_vars[k] = globals()[k]
263
264 ipif_command = cfg.get('server','ipif', vars=iic_vars)
265
e75e9c17
IJ
266def startup():
267 op = OptionParser()
268 op.add_option('-c', '--config', dest='configfile',
269 default='/etc/hippottd/server.conf')
270 global opts
271 (opts, args) = op.parse_args()
272 if len(args): op.error('no non-option arguments please')
273
274 cfg = ConfigParser()
5bae5ba3
IJ
275 cfg.read_string(defcfg)
276 cfg.read_file(opts['configfile'])
277 process_cfg()
278
279 start_ipif()
280 start_http()