wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
5bae5ba3
IJ
3import twisted
4
094ee3a2 5import twisted.web.server import Site
3fba9787
IJ
6from twisted.web.resource import Resource
7from twisted.web.server import NOT_DONE_YET
8from twisted.internet import reactor
9
e75e9c17
IJ
10from optparse import OptionParser
11from configparser import ConfigParser
12from configparser import NoOptionError
ec88b1f1 13import ipaddress
3fba9787 14
0ac316c8
IJ
15import collections
16
c4b6d990
IJ
17import syslog
18
3fba9787
IJ
19clients = { }
20
21def ipaddress(input):
22 try:
ec88b1f1 23 r = ipaddress.IPv4Address(input)
3fba9787 24 except AddressValueError:
ec88b1f1 25 r = ipaddress.IPv6Address(input)
3fba9787
IJ
26 return r
27
28def ipnetwork(input):
29 try:
ec88b1f1 30 r = ipaddress.IPv4Network(input)
3fba9787 31 except NetworkValueError:
ec88b1f1 32 r = ipaddress.IPv6Network(input)
3fba9787
IJ
33 return r
34
e75e9c17 35defcfg = '''
094ee3a2
IJ
36[DEFAULT]
37max_batch_down = 65536
38max_queue_time = 10
39max_request_time = 54
40
e75e9c17
IJ
41[virtual]
42mtu = 1500
43# network
44# [host]
45# [relay]
46
47[server]
5bae5ba3 48ipif = userv root ipif %(host),%(relay),%(mtu),slip %(network)
e75e9c17 49
094ee3a2
IJ
50[limits]
51max_batch_down = 262144
52max_queue_time = 121
53max_request_time = 121
ec88b1f1
IJ
54'''
55
5bae5ba3
IJ
56class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
57 def __init__(self):
58 self._buffer = b''
59 def connectionMade(self): pass
60 def outReceived(self, data):
61 buffer += data
62 packets = slip_decode(buffer)
63 buffer = packets.pop()
64 for packet in packets:
65 (saddr, daddr) = packet_addrs(packet)
66 route(packet, daddr)
67
68def start_ipif():
69 reactor.spawnProcess(IpifProcessProtocol(),
70 '/bin/sh',['-c', ipif_command],
71 childFDs={0:'w', 1:'r', 2:2})
72
73def log_discard(packet, saddr, daddr, why):
74 syslog.syslog(syslog.LOG_DEBUG,
75 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
76
c4b6d990
IJ
77def route(packet. daddr):
78 try: client = clients[daddr]
79 except KeyError: dclient = None
80 if dclient is not None:
0ac316c8 81 dclient.queue_outbound(packet)
5bae5ba3 82 else if daddr = host or daddr not in network:
0ac316c8 83 queue_inbound(packet)
5bae5ba3
IJ
84 else if daddr = relay:
85 log_discard(packet, saddr, daddr, 'relay')
c4b6d990 86 else:
5bae5ba3 87 log_discard(packet, saddr, daddr, 'no client')
c4b6d990 88
ec88b1f1 89class Client():
c4b6d990 90 def __init__(self, ip, cs):
ec88b1f1
IJ
91 # instance data members
92 self._ip = ip
93 self._cs = cs
94 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
95 self._rq = collections.deque() # requests
96 self._pq = collections.deque() # packets
c4b6d990
IJ
97 # plus from config:
98 # .max_batch_down
99 # .max_queue_time
100 # .max_request_time
ec88b1f1
IJ
101 for k in ('max_batch_down','max_queue_time','max_request_time'):
102 req = cfg.getint(cs, k)
094ee3a2 103 limit = cfg.getint('limits',k)
c4b6d990
IJ
104 self.__dict__[k] = min(req, limit)
105
106 def process_arriving_data(self, d):
107 for packet in slip_decode(d):
5bae5ba3 108 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
109 if saddr != self._ip:
110 raise ValueError('wrong source address %s' % saddr)
111 route(packet, daddr)
ec88b1f1 112
c4b6d990
IJ
113 def _req_cancel(self, request):
114 request.finish()
115
116 def _req_error(self, err, request):
117 self._req_cancel(request)
118
0ac316c8 119 def queue_outbound(self, packet):
094ee3a2 120 self._pq.append((time.monotonic(), packet))
0ac316c8 121
c4b6d990
IJ
122 def http_request(self, request):
123 request.setHeader('Content-Type','application/octet-stream')
124 reactor.callLater(self.max_request_time, self._req_cancel, request)
125 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
126 self._rq.append(request)
127 self._check_outbound()
128
129 def _check_outbound(self):
130 while True:
131 try: request = self._rq[0]
132 except IndexError: request = None
133 if request and request.finished:
134 self._rq.popleft()
135 continue
136
137 # now request is an unfinished request, or None
138 try: (queuetime, packet) = self._pq[0]
139 except: IndexError:
140 # no packets, oh well
094ee3a2
IJ
141 break
142
143 age = time.monotonic() - queuetime
144 if age > self.max_queue_time:
145 self._pq.popleft()
0ac316c8
IJ
146 continue
147
094ee3a2
IJ
148 if request is None:
149 # no request
150 break
151
152 # request, and also some non-expired packets
153 while True:
154 try: (dummy, packet) = self._pq[0]
155 except IndexError: break
156
157 encoded = slip_encode(packet)
158
159 if request.sentLength > 0:
160 if (request.sentLength + len(slip_delimiter)
161 + len(encoded) > self.max_batch_down):
162 break
163 request.write(slip_delimiter)
164
165 request.write(encoded)
166 self._pq.popLeft()
167
168 assert(request.sentLength)
169 self._rq.popLeft()
170 request.finish()
171 # round again, looking for more to do
ec88b1f1 172
3fba9787
IJ
173def process_cfg():
174 global network
e75e9c17
IJ
175 global host
176 global relay
5bae5ba3 177 global ipif_command
3fba9787 178
ec88b1f1 179 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
180 if network.num_addresses < 3 + 2:
181 raise ValueError('network needs at least 2^3 addresses')
182
3fba9787 183 try:
e75e9c17
IJ
184 host = cfg.get('virtual','host')
185 except NoOptionError:
186 host = network.hosts().next()
187
188 try:
189 relay = cfg.get('virtual','relay')
190 except OptionError:
191 for search in network.hosts():
192 if search = host: continue
193 relay = search
194 break
3fba9787 195
ec88b1f1
IJ
196 for cs in cfg.sections():
197 if not (':' in cs or '.' in cs): continue
198 ci = ipaddress(cs)
199 if ci not in network:
200 raise ValueError('client %s not in network' % ci)
201 if ci in clients:
202 raise ValueError('multiple client cfg sections for %s' % ci)
203 clients[ci] = Client(ci, cs)
3fba9787 204
5bae5ba3
IJ
205 iic_vars = { }
206 for k in ('host','relay','mtu','network'):
207 iic_vars[k] = globals()[k]
208
209 ipif_command = cfg.get('server','ipif', vars=iic_vars)
210
3fba9787
IJ
211class FormPage(Resource):
212 def render_POST(self, request):
ec88b1f1
IJ
213 # find client, update config, etc.
214 ci = ipaddress(request.args['i'])
215 c = clients[ci]
216 pw = request.args['pw']
217 if pw != c.pw: raise ValueError('bad password')
218
219 # update config
220 for r, w in (('mbd', 'max_batch_down'),
221 ('mqt', 'max_queue_time'),
222 ('mrt', 'max_request_time')):
223 try: v = request.args[r]
224 except KeyError: continue
225 v = int(v)
c4b6d990 226 c.__dict__[w] = v
ec88b1f1
IJ
227
228 try: d = request.args['d']
229 except KeyError: d = ''
230
231 c.process_arriving_data(d)
c4b6d990 232 c.new_request(request)
e75e9c17
IJ
233
234def startup():
235 op = OptionParser()
236 op.add_option('-c', '--config', dest='configfile',
237 default='/etc/hippottd/server.conf')
238 global opts
239 (opts, args) = op.parse_args()
240 if len(args): op.error('no non-option arguments please')
241
242 cfg = ConfigParser()
5bae5ba3
IJ
243 cfg.read_string(defcfg)
244 cfg.read_file(opts['configfile'])
245 process_cfg()
246
247 start_ipif()
248 start_http()