3 # Hippotat - Asinine IP Over HTTP program
4 # ./hippotatd - server main program
6 # Copyright 2017 Ian Jackson
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as
10 # published by the Free Software Foundation, either version 3 of the
11 # License, or (at your option) any later version, with the "CAF Login
12 # Exception" as published by Ian Jackson (version 2, or at your option
13 # any later version) as an Additional Permission.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU Affero General Public License for more details.
20 # You should have received a copy of the GNU Affero General Public
21 # License and the CAF Login Exception along with this program, in the
22 # file AGPLv3+CAFv2. If not, email Ian Jackson
23 # <ijackson@chiark.greenend.org.uk>.
26 from hippotatlib
import *
33 import twisted
.internet
34 from twisted
.web
.server
import NOT_DONE_YET
36 import twisted
.web
.static
38 import hippotatlib
.ownsource
39 from hippotatlib
.ownsource
import SourceShipmentPreparer
41 #import twisted.web.server import Site
42 #from twisted.web.resource import Resource
50 #---------- "router" ----------
# Route one packet according to its destination address `daddr`.
# From the visible branches:
#  - if `clients` has an entry for daddr, the packet is handed to that
#    client's queue_outbound();
#  - if daddr equals c.vaddr, or is not in c.vnetwork, the packet goes to
#    queue_inbound(ipif, packet);
#  - the remaining visible calls are log_discard() with reason 'relay'
#    or 'no-client'.
# NOTE(review): lines inside this function are missing from this extract
# (the embedded numbering skips 53, 58, 61, 64 and 66).  `dest` and `lt`
# are used below but not defined in the visible text; presumably a nested
# helper `lt(dest)` wraps the log_debug() call and a final `else:` guards
# the 'no-client' branch -- confirm against the full file.
# NOTE(review): the relay test reads `c.relay`, whereas process_cfg in
# this file assigns `c.vrelay`; confirm which attribute name is intended.
52 def route(packet
, iface
, saddr
, daddr
):
54 log_debug(DBG
.ROUTE
, 'route: %s -> %s: %s' %
(saddr
,daddr
,dest
), d
=packet
)
55 try: dclient
= clients
[daddr
]
56 except KeyError: dclient
= None
57 if dclient
is not None:
59 dclient
.queue_outbound(packet
)
60 elif daddr
== c
.vaddr
or daddr
not in c
.vnetwork
:
62 queue_inbound(ipif
, packet
)
63 elif daddr
== c
.relay
:
65 log_discard(packet
, iface
, saddr
, daddr
, 'relay')
67 lt('discard no-client')
68 log_discard(packet
, iface
, saddr
, daddr
, 'no-client')
70 #---------- client ----------
73 def __init__(self
, ip
, cc
):
74 # instance data members
77 self
._rq
= collections
.deque() # requests
78 self
._pq
= PacketQueue(str(ip
), self
.cc
.max_queue_time
)
80 if ip
not in c
.vnetwork
:
81 raise ValueError('client %s not in vnetwork' % ip
)
84 raise ValueError('multiple client cfg sections for %s' % ip
)
87 self
._log(DBG
.INIT
, 'new')
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message prefixed with this client's address.

    dflag is passed to log_debug() as its first argument, msg is
    prefixed with 'client <self._ip>: ', and any keyword arguments are
    forwarded to log_debug() unchanged.
    """
    prefix = 'client %s: ' % self._ip
    log_debug(dflag, prefix + msg, **kwargs)
92 def process_arriving_data(self
, d
):
93 self
._log(DBG
.FLOW
, "req data (enc'd)", d
=d
)
95 for packet
in slip
.decode(d
):
96 (saddr
, daddr
) = packet_addrs(packet
)
98 raise ValueError('wrong source address %s' % saddr
)
99 route(packet
, self
._ip
, saddr
, daddr
)
101 def _req_cancel(self
, request
):
102 self
._log(DBG
.HTTP_CTRL
, 'cancel', idof
=request
)
def _req_error(self, err, request):
    """Log an error reported for request, then cancel the request.

    new_request() attaches this as the errback of the request's
    notifyFinish() deferred.  err is formatted into the log message with
    %s.  Returns None.
    """
    description = 'error %s' % err
    self._log(DBG.HTTP_CTRL, description, idof=request)
    self._req_cancel(request)
def queue_outbound(self, packet):
    """Queue packet for this client and try to send straight away.

    Appends packet to self._pq, then calls self._check_outbound().
    """
    pending = self._pq
    pending.append(packet)
    self._check_outbound()
113 def _req_fin(self
, dummy
, request
, cl
):
114 self
._log(DBG
.HTTP_CTRL
, '_req_fin ' + repr(dummy
), idof
=request
)
116 except twisted
.internet
.error
.AlreadyCalled
: pass
def new_request(self, request):
    """Register a newly arrived HTTP request with this client.

    Sets the response Content-Type, schedules self._req_cancel(request)
    through reactor.callLater with delay self.cc.http_timeout, hooks
    _req_error and _req_fin onto request.notifyFinish(), appends the
    request to self._rq and finally calls self._check_outbound().
    """
    request.setHeader('Content-Type', 'application/octet-stream')

    # The scheduled cancel call is also handed to _req_fin below.
    timeout_call = reactor.callLater(
        self.cc.http_timeout, self._req_cancel, request)

    finished = request.notifyFinish()
    # The errback is attached before the callback; keep this order.
    finished.addErrback(self._req_error, request)
    finished.addCallback(self._req_fin, request, timeout_call)

    self._rq.append(request)
    self._check_outbound()
127 def _req_write(self
, req
, d
):
128 self
._log(DBG
.HTTP
, 'req_write ', idof
=req
, d
=d
)
131 def _check_outbound(self
):
132 log_debug(DBG
.HTTP_CTRL
, 'CHKO')
134 try: request
= self
._rq
[0]
135 except IndexError: request
= None
136 if request
and request
.finished
:
137 self
._log(DBG
.HTTP_CTRL
, 'CHKO req finished, discard', idof
=request
)
141 if not self
._pq
.nonempty():
142 # no packets, oh well
143 self
._log(DBG
.HTTP_CTRL
, 'CHKO no packets, OUT-DONE', idof
=request
)
148 self
._log(DBG
.HTTP_CTRL
, 'CHKO no request, OUT-DONE', idof
=request
)
151 self
._log(DBG
.HTTP_CTRL
, 'CHKO processing', idof
=request
)
152 # request, and also some non-expired packets
153 self
._pq
.process((lambda: request
.sentLength
),
154 (lambda d
: self
._req_write(request
, d
)),
155 self
.cc
.max_batch_down
)
157 assert(request
.sentLength
)
160 self
._log(DBG
.HTTP
, 'complete', idof
=request
)
161 # round again, looking for more to do
163 while len(self
._rq
) > self
.cc
.target_requests_outstanding
:
164 request
= self
._rq
.popleft()
165 self
._log(DBG
.HTTP
, 'CHKO above target, returning empty', idof
=request
)
168 def process_request(request
, desca
):
169 # find client, update config, etc.
170 metadata
= request
.args
[b
'm'][0]
171 metadata
= metadata
.split(b
'\r\n')
172 (ci_s
, pw
, tro
, cto
) = metadata
[0:4]
173 desca
['m[0,2:3]'] = [ci_s
, tro
, cto
]
174 ci_s
= ci_s
.decode('utf-8')
175 tro
= int(tro
); desca
['tro']= tro
176 cto
= int(cto
); desca
['cto']= cto
180 if pw
!= cl
.cc
.password
: raise ValueError('bad password')
183 if tro
!= cl
.cc
.target_requests_outstanding
:
184 raise ValueError('tro must be %d' % cl
.cc
.target_requests_outstanding
)
186 if cto
< cl
.cc
.http_timeout
:
187 raise ValueError('cto must be >= %d' % cl
.cc
.http_timeout
)
190 d
= request
.args
[b
'd'][0]
192 desca
['dlen'] = len(d
)
197 log_http(desca
, 'processing', idof
=id(request
), d
=d
)
199 d
= mime_translate(d
)
201 cl
.process_arriving_data(d
)
202 cl
.new_request(request
)
204 def log_http(desca
, msg
, **kwargs
):
206 kwargs
['d'] = desca
['d']
210 log_debug(DBG
.HTTP
, msg
+ repr(desca
), **kwargs
)
class NotStupidResource(twisted.web.resource.Resource):
    """Resource that answers for itself when the path segment is empty."""

    # why this is not the default is a mystery!
    def getChild(self, name, request):
        """Return the child resource for path segment name.

        An empty segment (b'') resolves to this resource itself; any
        other segment is delegated to Resource.getChild.
        """
        if name == b'':
            return self
        # Delegate via super() so that self is supplied.  The previous
        # code called Resource.getChild(name, request) on the class,
        # which bound name as self and raised TypeError for every
        # non-empty segment.
        return super().getChild(name, request)
218 class IphttpResource(NotStupidResource
):
219 def render_POST(self
, request
):
220 log_debug(DBG
.HTTP_FULL
,
221 'req recv: ' + repr(request
) + ' ' + repr(request
.args
),
224 try: process_request(request
, desca
)
225 except Exception as e
:
226 emsg
= traceback
.format_exc()
227 log_http(desca
, 'RETURNING EXCEPTION ' + emsg
)
228 request
.setHeader('Content-Type','text/plain; charset="utf-8"')
229 request
.setResponseCode(400)
230 return (emsg
+ ' # ' + repr(desca
) + '\r\n').encode('utf-8')
231 log_debug(DBG
.HTTP_CTRL
, '...', idof
=id(request
))
234 def render_GET(self
, request
):
235 log_debug(DBG
.HTTP
, 'GET request')
240 <a href="source">source</a>
241 (and that of dependency <a href="srcpkgs">packages</a>)
247 resource
= IphttpResource()
248 site
= twisted
.web
.server
.Site(resource
)
251 ep
= sa
.make_endpoint()
252 crash_on_defer(ep
.listen(site
))
253 log_debug(DBG
.INIT
, 'listening on %s' % sa
)
255 td
= tempfile
.mkdtemp()
258 try: shutil
.rmtree(td
)
259 except FileNotFoundError
: pass
261 cleanups
.append(cleanup
)
263 ssp
= SourceShipmentPreparer(td
)
264 ssp
.logger
= partial(log_debug
, DBG
.OWNSOURCE
)
267 resource
.putChild(b
'source', twisted
.web
.static
.File(ssp
.output_paths
[0]))
268 resource
.putChild(b
'srcpkgs', twisted
.web
.static
.File(ssp
.output_paths
[1]))
270 reactor
.callLater(0.1, (lambda: log
.info('hippotatd started', dflag
=False)))
272 #---------- config and setup ----------
274 def process_cfg(putative_servers
, putative_clients
):
277 c
.server
= cfg
.get('SERVER','server')
279 cfg_process_common(c
, c
.server
)
280 cfg_process_saddrs(c
, c
.server
)
281 cfg_process_vnetwork(c
, c
.server
)
282 cfg_process_vaddr(c
, c
.server
)
284 for (ci
,cs
) in putative_clients
.items():
286 sections
= cfg_process_client_common(cc
,c
.server
,cs
,ci
)
287 if not sections
: continue
288 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_batch_down')
289 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_queue_time')
293 c
.vrelay
= cfg
.get(c
.server
, 'vrelay')
294 except NoOptionError
:
295 for search
in c
.vnetwork
.hosts():
296 if search
== c
.vaddr
: continue
301 [c
.server
, 'DEFAULT'],
304 ('rnets','vnetwork')))
306 def catch_termination():
308 for cleanup
in cleanups
:
311 atexit
.register(run_cleanups
)
313 def signal_handler(name
, sig
, *args
):
314 signal
.signal(sig
, signal
.SIG_DFL
)
315 print('exiting due to %s' % name
, file=sys
.stderr
)
317 os
.kill(os
.getpid(), sig
)
318 raise RuntimeError('did not die due to signal %s !' % name
)
320 for sig
in (signal
.SIGINT
, signal
.SIGTERM
):
321 signal
.signal(sig
, partial(signal_handler
, sig
.name
))
323 common_startup(process_cfg
)
325 ipif
= start_ipif(c
.ipif_command
, (lambda p
,s
,d
: route(p
,"[ipif]",s
,d
)))