Commit | Line | Data |
---|---|---|
094ee3a2 | 1 | #!/usr/bin/python3 |
3fba9787 | 2 | |
5a37bac8 | 3 | from hippotatlib import * |
aa663282 | 4 | |
e2d41dc1 | 5 | import os |
4a780703 IJ |
6 | import tempfile |
7 | import atexit | |
8 | import shutil | |
e2d41dc1 | 9 | |
e2d41dc1 | 10 | import twisted.internet |
e2d41dc1 | 11 | from twisted.web.server import NOT_DONE_YET |
e2d41dc1 | 12 | |
4a780703 IJ |
13 | import hippotatlib.ownsource |
14 | from hippotatlib.ownsource import SourceShipmentPreparer | |
15 | ||
5da7763e IJ |
16 | #import twisted.web.server import Site |
17 | #from twisted.web.resource import Resource | |
3fba9787 | 18 | |
c4b6d990 IJ |
19 | import syslog |
20 | ||
4a780703 IJ |
21 | cleanups = [ ] |
22 | ||
b0cfbfce | 23 | clients = { } |
3fba9787 | 24 | |
5da7763e IJ |
25 | #---------- "router" ---------- |
26 | ||
a8827d59 | 27 | def route(packet, iface, saddr, daddr): |
d579a048 | 28 | def lt(dest): |
8c3b6620 | 29 | log_debug(DBG.ROUTE, 'route: %s -> %s: %s' % (saddr,daddr,dest), d=packet) |
84e763c7 | 30 | try: dclient = clients[daddr] |
5da7763e IJ |
31 | except KeyError: dclient = None |
32 | if dclient is not None: | |
d579a048 | 33 | lt('client') |
5da7763e | 34 | dclient.queue_outbound(packet) |
8d374606 | 35 | elif daddr == c.vaddr or daddr not in c.vnetwork: |
d579a048 | 36 | lt('inbound') |
909e0ff3 | 37 | queue_inbound(ipif, packet) |
74934d63 | 38 | elif daddr == c.relay: |
d579a048 | 39 | lt('discard relay') |
a8827d59 | 40 | log_discard(packet, iface, saddr, daddr, 'relay') |
5da7763e | 41 | else: |
d579a048 | 42 | lt('discard no-client') |
a8827d59 | 43 | log_discard(packet, iface, saddr, daddr, 'no-client') |
5da7763e | 44 | |
5da7763e | 45 | #---------- client ---------- |
c4b6d990 | 46 | |
ec88b1f1 | 47 | class Client(): |
8d374606 | 48 | def __init__(self, ip, cc): |
ec88b1f1 IJ |
49 | # instance data members |
50 | self._ip = ip | |
8d374606 | 51 | self.cc = cc |
0ac316c8 | 52 | self._rq = collections.deque() # requests |
74934d63 | 53 | self._pq = PacketQueue(str(ip), self.cc.max_queue_time) |
88487243 | 54 | |
8d374606 | 55 | if ip not in c.vnetwork: |
c7f134ce | 56 | raise ValueError('client %s not in vnetwork' % ip) |
88487243 | 57 | |
88487243 IJ |
58 | if ip in clients: |
59 | raise ValueError('multiple client cfg sections for %s' % ip) | |
60 | clients[ip] = self | |
61 | ||
8c3b6620 IJ |
62 | self._log(DBG.INIT, 'new') |
63 | ||
b68c0739 IJ |
64 | def _log(self, dflag, msg, **kwargs): |
65 | log_debug(dflag, ('client %s: ' % self._ip)+msg, **kwargs) | |
d579a048 | 66 | |
88487243 | 67 | def process_arriving_data(self, d): |
e8fcf3b7 | 68 | self._log(DBG.FLOW, "req data (enc'd)", d=d) |
8718b02c | 69 | if not len(d): return |
88487243 IJ |
70 | for packet in slip.decode(d): |
71 | (saddr, daddr) = packet_addrs(packet) | |
72 | if saddr != self._ip: | |
73 | raise ValueError('wrong source address %s' % saddr) | |
a8827d59 | 74 | route(packet, self._ip, saddr, daddr) |
88487243 IJ |
75 | |
76 | def _req_cancel(self, request): | |
8718b02c | 77 | self._log(DBG.HTTP_CTRL, 'cancel', idof=request) |
88487243 IJ |
78 | request.finish() |
79 | ||
80 | def _req_error(self, err, request): | |
8718b02c | 81 | self._log(DBG.HTTP_CTRL, 'error %s' % err, idof=request) |
88487243 IJ |
82 | self._req_cancel(request) |
83 | ||
84 | def queue_outbound(self, packet): | |
85 | self._pq.append(packet) | |
ca732796 | 86 | self._check_outbound() |
88487243 | 87 | |
7432045d IJ |
88 | def _req_fin(self, dummy, request, cl): |
89 | self._log(DBG.HTTP_CTRL, '_req_fin ' + repr(dummy), idof=request) | |
90 | try: cl.cancel() | |
91 | except twisted.internet.error.AlreadyCalled: pass | |
92 | ||
d579a048 | 93 | def new_request(self, request): |
88487243 | 94 | request.setHeader('Content-Type','application/octet-stream') |
74934d63 | 95 | cl = reactor.callLater(self.cc.http_timeout, self._req_cancel, request) |
7432045d IJ |
96 | nf = request.notifyFinish() |
97 | nf.addErrback(self._req_error, request) | |
98 | nf.addCallback(self._req_fin, request, cl) | |
88487243 IJ |
99 | self._rq.append(request) |
100 | self._check_outbound() | |
101 | ||
3d003cdd IJ |
102 | def _req_write(self, req, d): |
103 | self._log(DBG.HTTP, 'req_write ', idof=req, d=d) | |
104 | req.write(d) | |
105 | ||
88487243 | 106 | def _check_outbound(self): |
8718b02c | 107 | log_debug(DBG.HTTP_CTRL, 'CHKO') |
88487243 IJ |
108 | while True: |
109 | try: request = self._rq[0] | |
110 | except IndexError: request = None | |
111 | if request and request.finished: | |
8c3b6620 | 112 | self._log(DBG.HTTP_CTRL, 'CHKO req finished, discard', idof=request) |
88487243 IJ |
113 | self._rq.popleft() |
114 | continue | |
115 | ||
116 | if not self._pq.nonempty(): | |
117 | # no packets, oh well | |
8c3b6620 | 118 | self._log(DBG.HTTP_CTRL, 'CHKO no packets, OUT-DONE', idof=request) |
d579a048 | 119 | break |
88487243 IJ |
120 | |
121 | if request is None: | |
122 | # no request | |
8c3b6620 | 123 | self._log(DBG.HTTP_CTRL, 'CHKO no request, OUT-DONE', idof=request) |
88487243 IJ |
124 | break |
125 | ||
8c3b6620 | 126 | self._log(DBG.HTTP_CTRL, 'CHKO processing', idof=request) |
88487243 | 127 | # request, and also some non-expired packets |
7b07f0b5 | 128 | self._pq.process((lambda: request.sentLength), |
3d003cdd | 129 | (lambda d: self._req_write(request, d)), |
74934d63 | 130 | self.cc.max_batch_down) |
0ac316c8 | 131 | |
88487243 | 132 | assert(request.sentLength) |
84e763c7 | 133 | self._rq.popleft() |
88487243 | 134 | request.finish() |
8c3b6620 | 135 | self._log(DBG.HTTP, 'complete', idof=request) |
88487243 | 136 | # round again, looking for more to do |
0ac316c8 | 137 | |
74934d63 | 138 | while len(self._rq) > self.cc.target_requests_outstanding: |
88487243 | 139 | request = self._rq.popleft() |
8c3b6620 | 140 | self._log(DBG.HTTP, 'CHKO above target, returning empty', idof=request) |
88487243 | 141 | request.finish() |
650a3251 | 142 | |
d579a048 | 143 | def process_request(request, desca): |
a4e03162 | 144 | # find client, update config, etc. |
5dd3275b | 145 | metadata = request.args[b'm'][0] |
00192d6a | 146 | metadata = metadata.split(b'\r\n') |
ba5630fd IJ |
147 | (ci_s, pw, tro, cto) = metadata[0:4] |
148 | desca['m[0,2:3]'] = [ci_s, tro, cto] | |
a9a369c7 | 149 | ci_s = ci_s.decode('utf-8') |
ba5630fd IJ |
150 | tro = int(tro); desca['tro']= tro |
151 | cto = int(cto); desca['cto']= cto | |
a4e03162 | 152 | ci = ipaddr(ci_s) |
d579a048 | 153 | desca['ci'] = ci |
a4e03162 | 154 | cl = clients[ci] |
74934d63 | 155 | if pw != cl.cc.password: raise ValueError('bad password') |
b68c0739 | 156 | desca['pwok']=True |
1672ded0 | 157 | |
74934d63 IJ |
158 | if tro != cl.cc.target_requests_outstanding: |
159 | raise ValueError('tro must be %d' % cl.cc.target_requests_outstanding) | |
5da7763e | 160 | |
74934d63 IJ |
161 | if cto < cl.cc.http_timeout: |
162 | raise ValueError('cto must be >= %d' % cl.cc.http_timeout) | |
ba5630fd | 163 | |
d579a048 | 164 | try: |
e8fcf3b7 | 165 | d = request.args[b'd'][0] |
d579a048 | 166 | desca['d'] = d |
19f5f9b5 IJ |
167 | desca['dlen'] = len(d) |
168 | except KeyError: | |
169 | d = b'' | |
170 | desca['dlen'] = None | |
171 | ||
172 | log_http(desca, 'processing', idof=id(request), d=d) | |
5da7763e | 173 | |
6f387df3 IJ |
174 | d = mime_translate(d) |
175 | ||
a4e03162 IJ |
176 | cl.process_arriving_data(d) |
177 | cl.new_request(request) | |
5da7763e | 178 | |
19f5f9b5 | 179 | def log_http(desca, msg, **kwargs): |
8c3b6620 | 180 | try: |
19f5f9b5 | 181 | kwargs['d'] = desca['d'] |
8c3b6620 IJ |
182 | del desca['d'] |
183 | except KeyError: | |
19f5f9b5 IJ |
184 | pass |
185 | log_debug(DBG.HTTP, msg + repr(desca), **kwargs) | |
8c3b6620 | 186 | |
eb113b2c IJ |
187 | class NotStupidResource(twisted.web.resource.Resource): |
188 | # why this is not the default is a mystery! | |
189 | def getChild(self, name, request): | |
190 | if name == b'': return self | |
191 | else: return twisted.web.resource.Resource.getChild(name, request) | |
192 | ||
193 | class IphttpResource(NotStupidResource): | |
a4e03162 | 194 | def render_POST(self, request): |
297b3ebf IJ |
195 | log_debug(DBG.HTTP_FULL, |
196 | 'req recv: ' + repr(request) + ' ' + repr(request.args), | |
197 | idof=id(request)) | |
d579a048 IJ |
198 | desca = {'d': None} |
199 | try: process_request(request, desca) | |
0d10f35f | 200 | except Exception as e: |
68afd97b | 201 | emsg = traceback.format_exc() |
6f387df3 | 202 | log_http(desca, 'RETURNING EXCEPTION ' + emsg) |
0d10f35f IJ |
203 | request.setHeader('Content-Type','text/plain; charset="utf-8"') |
204 | request.setResponseCode(400) | |
a9a369c7 | 205 | return (emsg + ' # ' + repr(desca) + '\r\n').encode('utf-8') |
19f5f9b5 | 206 | log_debug(DBG.HTTP_CTRL, '...', idof=id(request)) |
d579a048 | 207 | return NOT_DONE_YET |
84f2d011 | 208 | |
8e279651 | 209 | def render_GET(self, request): |
d579a048 | 210 | log_debug(DBG.HTTP, 'GET request') |
040ff511 | 211 | return b'<html><body>hippotat</body></html>' |
8e279651 | 212 | |
de2b215e | 213 | class SourceResource(NotStupidResource): |
4a780703 IJ |
214 | def __init__(self): |
215 | td = tempfile.mkdtemp() | |
216 | ||
217 | def cleanup(): | |
218 | try: shutil.rmtree(td) | |
219 | except FileNotFoundError: pass | |
220 | ||
221 | cleanups.append(cleanup) | |
222 | ||
223 | self._ssp = SourceShipmentPreparer(td) | |
224 | self._ssp.logger = self.log | |
225 | self._ssp.generate() | |
226 | ||
227 | def log(self, m): | |
228 | log_debug(DBG.OWNSOURCE, m) | |
229 | ||
de2b215e IJ |
230 | def render_GET(self, request): |
231 | return b'<html><body>SUBDIR</body></html>' | |
232 | ||
5da7763e IJ |
233 | def start_http(): |
234 | resource = IphttpResource() | |
de2b215e | 235 | resource.putChild(b'source',SourceResource()) |
b11c6e7a | 236 | site = twisted.web.server.Site(resource) |
88487243 IJ |
237 | for sa in c.saddrs: |
238 | ep = sa.make_endpoint() | |
b11c6e7a | 239 | crash_on_defer(ep.listen(site)) |
d579a048 | 240 | log_debug(DBG.INIT, 'listening on %s' % sa) |
5da7763e IJ |
241 | |
242 | #---------- config and setup ---------- | |
4a780703 | 243 | |
c7fb640e | 244 | def process_cfg(putative_servers, putative_clients): |
8d374606 | 245 | global c |
c7fb640e IJ |
246 | c = ConfigResults() |
247 | c.server = cfg.get('SERVER','server') | |
248 | ||
8d374606 IJ |
249 | cfg_process_common(c, c.server) |
250 | cfg_process_saddrs(c, c.server) | |
251 | cfg_process_vnetwork(c, c.server) | |
252 | cfg_process_vaddr(c, c.server) | |
c7fb640e IJ |
253 | |
254 | for (ci,cs) in putative_clients.items(): | |
255 | cc = ConfigResults() | |
74934d63 | 256 | sections = cfg_process_client_common(cc,c.server,cs,ci) |
c7fb640e IJ |
257 | if not sections: continue |
258 | cfg_process_client_limited(cc,c.server,sections, 'max_batch_down') | |
259 | cfg_process_client_limited(cc,c.server,sections, 'max_queue_time') | |
74934d63 | 260 | Client(ci, cc) |
e75e9c17 IJ |
261 | |
262 | try: | |
74934d63 | 263 | c.vrelay = cfg.get(c.server, 'vrelay') |
e2d41dc1 | 264 | except NoOptionError: |
8d374606 IJ |
265 | for search in c.vnetwork.hosts(): |
266 | if search == c.vaddr: continue | |
74934d63 | 267 | c.vrelay = search |
e75e9c17 | 268 | break |
3fba9787 | 269 | |
8d374606 | 270 | cfg_process_ipif(c, |
c7fb640e IJ |
271 | [c.server, 'DEFAULT'], |
272 | (('local','vaddr'), | |
273 | ('peer', 'vrelay'), | |
274 | ('rnets','vnetwork'))) | |
5bae5ba3 | 275 | |
4a780703 IJ |
276 | def catch_termination(): |
277 | def run_cleanups(): | |
278 | for cleanup in cleanups: | |
279 | cleanup() | |
280 | ||
281 | atexit.register(run_cleanups) | |
282 | ||
283 | def signal_handler(name, sig, *args): | |
284 | signal.signal(sig, signal.SIG_DFL) | |
285 | print('exiting due to %s' % name, file=sys.stderr) | |
286 | run_cleanups() | |
287 | os.kill(os.getpid(), sig) | |
288 | raise RuntimeError('did not die due to signal %s !' % name) | |
289 | ||
290 | for sig in (signal.SIGINT, signal.SIGTERM): | |
291 | signal.signal(sig, partial(signal_handler, sig.name)) | |
292 | ||
5510890e | 293 | common_startup(process_cfg) |
4a780703 | 294 | catch_termination() |
909e0ff3 | 295 | ipif = start_ipif(c.ipif_command, (lambda p,s,d: route(p,"[ipif]",s,d))) |
87a7c0c7 | 296 | start_http() |
ae7c7784 | 297 | common_run() |