wip, python3
[hippotat] / server
1 #!/usr/bin/python3
2
import collections
import configparser
import ipaddress
import syslog
import time

from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.server import Site
14
# Registry of configured clients.  process_cfg() fills it with one Client per
# client config section, keyed by the parsed address; route() and
# FormPage.render_POST() look clients up here by address.
clients = { }
16
def ipaddress(input):
    """Parse *input* as an IP address, trying IPv4 first and then IPv6.

    Args:
        input: Address in any form accepted by ``ipaddress.IPv4Address`` or
            ``ipaddress.IPv6Address`` (typically a string).

    Returns:
        An ``IPv4Address`` when *input* is a valid IPv4 address, otherwise
        an ``IPv6Address``.

    Raises:
        ipaddress.AddressValueError: (a ``ValueError`` subclass) if *input*
            is valid as neither address family.
    """
    # This function has the same name as the standard-library module and so
    # hides it at module level; import the module under a private alias to
    # reach its classes and exception types.
    import ipaddress as _ipaddress_mod

    try:
        return _ipaddress_mod.IPv4Address(input)
    except _ipaddress_mod.AddressValueError:
        # Not IPv4; let any IPv6 parse failure propagate to the caller.
        return _ipaddress_mod.IPv6Address(input)
23
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Args:
        input: Network in any form accepted by ``ipaddress.IPv4Network`` or
            ``ipaddress.IPv6Network`` (typically ``'addr/prefix'``).

    Returns:
        An ``IPv4Network`` when *input* parses as one, otherwise an
        ``IPv6Network``.

    Raises:
        ValueError: if *input* is valid as neither (this includes the
            ``AddressValueError`` / ``NetmaskValueError`` subclasses), or if
            it is an IPv4 network with host bits set.
    """
    # The module-level name `ipaddress` is the address-parsing function in
    # this file, not the standard-library module; import the module under a
    # private alias.
    import ipaddress as _ipaddress_mod

    try:
        return _ipaddress_mod.IPv4Network(input)
    except (_ipaddress_mod.AddressValueError,
            _ipaddress_mod.NetmaskValueError):
        # Address or mask is not IPv4-shaped; retry as IPv6 and let that
        # failure propagate.  A plain ValueError (host bits set) is NOT
        # retried, since the input was recognisably IPv4.
        return _ipaddress_mod.IPv6Network(input)
30
# Built-in configuration text in configparser (INI) syntax.
#  [DEFAULT] - fallback values for the three per-client tunables.
#  [limits]  - upper bounds; Client.__init__ takes min(client value, limit).
# NOTE(review): the code that feeds this string to the config parser is not
# in this part of the file - confirm where it is loaded.
defcfg = u'''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
'''
42
def route(packet, daddr):
    """Dispatch one packet according to its destination address.

    * If *daddr* belongs to a configured client, the packet is queued for
      download by that client.
    * If *daddr* is the server's own address, or lies outside the virtual
      network, the packet is handed to ``queue_inbound``.
    * Otherwise (an in-network address with no client) the packet is dropped
      and a debug message is logged.

    Args:
        packet: The packet to route (passed through unchanged).
        daddr: Destination address, comparable with the keys of ``clients``
            and testable for membership in ``network``.
    """
    # NOTE(review): the original compared against an undefined name `server`;
    # `ourself` is the global that process_cfg() sets from the 'server'
    # option, so it is assumed to be what was meant - confirm.
    # NOTE(review): queue_inbound() is not defined in this part of the file.
    dclient = clients.get(daddr)
    if dclient is not None:
        dclient.queue_outbound(packet)
    elif daddr == ourself or daddr not in network:
        queue_inbound(packet)
    else:
        syslog.syslog(syslog.LOG_DEBUG, 'no client for %s' % daddr)
52
class Client():
    """State for one configured client.

    Holds two FIFO queues: outstanding HTTP requests from the client
    (``_rq``) and packets waiting to be sent down to it (``_pq``).  Packets
    are delivered by writing them, SLIP-encoded, into the oldest unfinished
    request and then finishing that request.

    Attributes:
        pw: The client's password, from its config section.
        max_batch_down: Cap on ``request.sentLength`` plus pending data for
            one response (compared in ``_check_outbound``).
        max_queue_time: Maximum age of a queued packet, in seconds
            (compared against ``time.monotonic()`` differences).
        max_request_time: Delay, in seconds, after which a held request is
            finished regardless (passed to ``reactor.callLater``).
    """

    def __init__(self, ip, cs):
        """Create the client for address *ip*, configured by section *cs*.

        Reads from the module-level config object ``cfg`` (defined outside
        this part of the file).  Each tunable is the smaller of the client
        section's value and the corresponding value in ``[limits]``.
        """
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque()  # pending HTTP requests, oldest first
        self._pq = collections.deque()  # (enqueue time, packet), oldest first
        for k in ('max_batch_down', 'max_queue_time', 'max_request_time'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits', k)
            setattr(self, k, min(req, limit))

    def process_arriving_data(self, d):
        """Decode the SLIP data *d* and route every packet it contains.

        Raises:
            ValueError: if a packet's source address is not this client's
                address.  Packets before the offending one have already
                been routed.
        """
        # slip_decode() and ip_64_addrs() are defined outside this part of
        # the file; the latter yields (source, destination) for a packet.
        for packet in slip_decode(d):
            (saddr, daddr) = ip_64_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, daddr)

    def _req_cancel(self, request):
        """Finish *request* unless it has already been finished.

        Used as the max_request_time timer callback, which fires whether or
        not the request was completed in the meantime; the guard avoids
        calling finish() a second time.
        """
        if not request.finished:
            request.finish()

    def _req_error(self, err, request):
        """Errback for ``request.notifyFinish()``: give up on *request*."""
        # NOTE(review): Twisted documents that finish() must not be called
        # once the connection is lost, which is when this errback fires -
        # confirm whether the request should just be dropped from _rq.
        self._req_cancel(request)

    def queue_outbound(self, packet):
        """Append *packet*, stamped with the current monotonic time."""
        self._pq.append((time.monotonic(), packet))

    def http_request(self, request):
        """Register a new HTTP *request* as a carrier for outbound packets.

        The request is held open until packets are available or until
        ``max_request_time`` seconds have passed.
        """
        request.setHeader('Content-Type', 'application/octet-stream')
        reactor.callLater(self.max_request_time, self._req_cancel, request)
        request.notifyFinish().addErrback(self._req_error, request)
        self._rq.append(request)
        self._check_outbound()

    def _check_outbound(self):
        """Match queued packets to waiting requests until one side runs dry.

        Each round: drop already-finished requests from the head of the
        request queue, drop expired packets from the head of the packet
        queue, then fill the oldest live request with as many packets as fit
        within ``max_batch_down`` and finish it.
        """
        while True:
            request = self._rq[0] if self._rq else None
            if request is not None and request.finished:
                self._rq.popleft()
                continue

            # Here request is an unfinished request, or None.
            if not self._pq:
                # No packets, nothing to send.
                break

            queuetime, _packet = self._pq[0]
            if time.monotonic() - queuetime > self.max_queue_time:
                # Head packet is too old; discard it and look again.
                self._pq.popleft()
                continue

            if request is None:
                # Packets are waiting but no request can carry them.
                break

            # A live request and at least one non-expired packet.
            # slip_encode / slip_delimiter are defined outside this part of
            # the file.
            while self._pq:
                _queuetime, packet = self._pq[0]
                encoded = slip_encode(packet)

                if request.sentLength > 0:
                    # Not the first packet: needs a delimiter and must fit.
                    if (request.sentLength + len(slip_delimiter)
                            + len(encoded) > self.max_batch_down):
                        break
                    request.write(slip_delimiter)

                # The first packet is always written, whatever its size.
                request.write(encoded)
                self._pq.popleft()

            assert request.sentLength
            self._rq.popleft()
            request.finish()
            # Round again, looking for more to do.
135 # round again, looking for more to do
136
def process_cfg():
    """Populate ``network``, ``ourself`` and ``clients`` from the config.

    Reads the module-level config object ``cfg`` (defined outside this part
    of the file):

    * ``[virtual] network`` gives the virtual network.
    * ``[virtual] server`` gives the server's own address; when the option
      is absent, the first host address of the network is used.
    * Every section whose name contains ``':'`` or ``'.'`` is taken to be a
      client address and gets a ``Client`` entry in ``clients``.

    Raises:
        ValueError: if a client address is outside the network, is
            configured more than once, or does not parse as an address.
    """
    global network
    global ourself

    network = ipnetwork(cfg.get('virtual', 'network'))
    try:
        server = cfg.get('virtual', 'server')
    except configparser.NoOptionError:
        # hosts() may be an iterator or a list depending on prefix length
        # and Python version; iter() copes with both.
        ourself = next(iter(network.hosts()))
    else:
        # Parse it so that ourself is an address object on both paths and
        # compares equal to parsed packet/client addresses.
        ourself = ipaddress(server)

    for cs in cfg.sections():
        if not (':' in cs or '.' in cs):
            # Not an address-like section name, so not a client.
            continue
        ci = ipaddress(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            # Distinct section names can spell the same address.
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)
155
class FormPage(Resource):
    """HTTP endpoint through which clients upload packets and poll for more.

    Form fields read by ``render_POST``:
        i:   the client's address (selects the entry in ``clients``).
        pw:  the client's password.
        mbd, mqt, mrt: optional new values for max_batch_down,
             max_queue_time and max_request_time.
        d:   optional SLIP-encoded upload data.
    """

    def render_POST(self, request):
        """Authenticate the client, apply its settings, and park the request.

        Returns:
            ``NOT_DONE_YET``: the response is written and finished later by
            the client's outbound machinery.

        Raises:
            KeyError: if 'i' or 'pw' is missing or the client is unknown.
            ValueError: on a bad password, a malformed address or number,
                or upload data with a wrong source address.
        """
        import hmac  # stdlib; local so this block needs no new file import

        form = request.args

        def first(name):
            # On Python 3, request.args maps bytes keys to lists of bytes
            # values; take the first value for `name`.
            return form[name][0]

        # Find the client and check its password.
        ci = ipaddress(first(b'i').decode('ascii'))
        c = clients[ci]
        pw = first(b'pw')
        # Constant-time comparison: pw is untrusted network input.
        if not hmac.compare_digest(pw, c.pw.encode('utf-8')):
            raise ValueError('bad password')

        # Update per-client tunables, still capped by the [limits] section
        # exactly as Client.__init__ caps the configured values.
        for r, w in ((b'mbd', 'max_batch_down'),
                     (b'mqt', 'max_queue_time'),
                     (b'mrt', 'max_request_time')):
            try:
                v = first(r)
            except KeyError:
                continue
            setattr(c, w, min(int(v), cfg.getint('limits', w)))

        try:
            d = first(b'd')
        except KeyError:
            d = b''

        c.process_arriving_data(d)
        # Client's method for registering a request is http_request().
        c.http_request(request)
        # The request is finished asynchronously; tell Twisted not to
        # finish it when this method returns.
        return NOT_DONE_YET
176 c.process_arriving_data(d)
177 c.new_request(request)