3 # Hippotat - Asinine IP Over HTTP program
4 # ./hippotatd - server main program
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or
11 # modify it under the terms of the GNU Affero General Public
12 # License as published by the Free Software Foundation, either
13 # version 3 of the License, or (at your option) any later version,
14 # with the "CAF Login Exception" as published by Ian Jackson
15 # (version 2, or at your option any later version) as an Additional
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 # Affero General Public License for more details.
23 # You should have received a copy of the GNU Affero General Public
24 # License and the CAF Login Exception along with this program, in
25 # the file AGPLv3+CAFv2. If not, email Ian Jackson
26 # <ijackson@chiark.greenend.org.uk>.
29 from hippotatlib
import *
36 import twisted
.internet
37 from twisted
.web
.server
import NOT_DONE_YET
39 import twisted
.web
.static
41 import hippotatlib
.ownsource
42 from hippotatlib
.ownsource
import SourceShipmentPreparer
44 #import twisted.web.server import Site
45 #from twisted.web.resource import Resource
53 #---------- "router" ----------
55 def route(packet
, iface
, saddr
, daddr
):
57 log_debug(DBG
.ROUTE
, 'route: %s -> %s: %s' %
(saddr
,daddr
,dest
), d
=packet
)
58 try: dclient
= clients
[daddr
]
59 except KeyError: dclient
= None
60 if dclient
is not None:
62 dclient
.queue_outbound(packet
)
63 elif daddr
== c
.vaddr
or daddr
not in c
.vnetwork
:
65 queue_inbound(ipif
, packet
)
66 elif daddr
== c
.relay
:
68 log_discard(packet
, iface
, saddr
, daddr
, 'relay')
70 lt('discard no-client')
71 log_discard(packet
, iface
, saddr
, daddr
, 'no-client')
73 #---------- client ----------
76 def __init__(self
, ip
, cc
):
77 # instance data members
80 self
._rq
= collections
.deque() # requests
81 self
._pq
= PacketQueue(str(ip
), self
.cc
.max_queue_time
)
83 if ip
not in c
.vnetwork
:
84 raise ValueError('client %s not in vnetwork' % ip
)
87 raise ValueError('multiple client cfg sections for %s' % ip
)
90 self
._log(DBG
.INIT
, 'new')
92 def _log(self
, dflag
, msg
, **kwargs
):
93 log_debug(dflag
, ('client %s: ' % self
._ip
)+msg
, **kwargs
)
95 def process_arriving_data(self
, d
):
96 self
._log(DBG
.FLOW
, "req data (enc'd)", d
=d
)
98 for packet
in slip
.decode(d
):
99 (saddr
, daddr
) = packet_addrs(packet
)
100 if saddr
!= self
._ip
:
101 raise ValueError('wrong source address %s' % saddr
)
102 route(packet
, self
._ip
, saddr
, daddr
)
104 def _req_cancel(self
, request
):
105 self
._log(DBG
.HTTP_CTRL
, 'cancel', idof
=request
)
108 def _req_error(self
, err
, request
):
109 self
._log(DBG
.HTTP_CTRL
, 'error %s' % err
, idof
=request
)
110 self
._req_cancel(request
)
112 def queue_outbound(self
, packet
):
113 self
._pq
.append(packet
)
114 self
._check_outbound()
116 def _req_fin(self
, dummy
, request
, cl
):
117 self
._log(DBG
.HTTP_CTRL
, '_req_fin ' + repr(dummy
), idof
=request
)
119 except twisted
.internet
.error
.AlreadyCalled
: pass
121 def new_request(self
, request
):
122 request
.setHeader('Content-Type','application/octet-stream')
123 cl
= reactor
.callLater(self
.cc
.http_timeout
, self
._req_cancel
, request
)
124 nf
= request
.notifyFinish()
125 nf
.addErrback(self
._req_error
, request
)
126 nf
.addCallback(self
._req_fin
, request
, cl
)
127 self
._rq
.append(request
)
128 self
._check_outbound()
130 def _req_write(self
, req
, d
):
131 self
._log(DBG
.HTTP
, 'req_write ', idof
=req
, d
=d
)
134 def _check_outbound(self
):
135 log_debug(DBG
.HTTP_CTRL
, 'CHKO')
137 try: request
= self
._rq
[0]
138 except IndexError: request
= None
139 if request
and request
.finished
:
140 self
._log(DBG
.HTTP_CTRL
, 'CHKO req finished, discard', idof
=request
)
144 if not self
._pq
.nonempty():
145 # no packets, oh well
146 self
._log(DBG
.HTTP_CTRL
, 'CHKO no packets, OUT-DONE', idof
=request
)
151 self
._log(DBG
.HTTP_CTRL
, 'CHKO no request, OUT-DONE', idof
=request
)
154 self
._log(DBG
.HTTP_CTRL
, 'CHKO processing', idof
=request
)
155 # request, and also some non-expired packets
156 self
._pq
.process((lambda: request
.sentLength
),
157 (lambda d
: self
._req_write(request
, d
)),
158 self
.cc
.max_batch_down
)
160 assert(request
.sentLength
)
163 self
._log(DBG
.HTTP
, 'complete', idof
=request
)
164 # round again, looking for more to do
166 while len(self
._rq
) > self
.cc
.target_requests_outstanding
:
167 request
= self
._rq
.popleft()
168 self
._log(DBG
.HTTP
, 'CHKO above target, returning empty', idof
=request
)
171 def process_request(request
, desca
):
172 # find client, update config, etc.
173 metadata
= request
.args
[b
'm'][0]
174 metadata
= metadata
.split(b
'\r\n')
175 (ci_s
, pw
, tro
, cto
) = metadata
[0:4]
176 desca
['m[0,2:3]'] = [ci_s
, tro
, cto
]
177 ci_s
= ci_s
.decode('utf-8')
178 tro
= int(tro
); desca
['tro']= tro
179 cto
= int(cto
); desca
['cto']= cto
183 if pw
!= cl
.cc
.password
: raise ValueError('bad password')
186 if tro
!= cl
.cc
.target_requests_outstanding
:
187 raise ValueError('tro must be %d' % cl
.cc
.target_requests_outstanding
)
189 if cto
< cl
.cc
.http_timeout
:
190 raise ValueError('cto must be >= %d' % cl
.cc
.http_timeout
)
193 d
= request
.args
[b
'd'][0]
195 desca
['dlen'] = len(d
)
200 log_http(desca
, 'processing', idof
=id(request
), d
=d
)
202 d
= mime_translate(d
)
204 cl
.process_arriving_data(d
)
205 cl
.new_request(request
)
207 def log_http(desca
, msg
, **kwargs
):
209 kwargs
['d'] = desca
['d']
213 log_debug(DBG
.HTTP
, msg
+ repr(desca
), **kwargs
)
215 class NotStupidResource(twisted
.web
.resource
.Resource
):
216 # why this is not the default is a mystery!
217 def getChild(self
, name
, request
):
218 if name
== b
'': return self
219 else: return twisted
.web
.resource
.Resource
.getChild(name
, request
)
221 class IphttpResource(NotStupidResource
):
222 def render_POST(self
, request
):
223 log_debug(DBG
.HTTP_FULL
,
224 'req recv: ' + repr(request
) + ' ' + repr(request
.args
),
227 try: process_request(request
, desca
)
228 except Exception as e
:
229 emsg
= traceback
.format_exc()
230 log_http(desca
, 'RETURNING EXCEPTION ' + emsg
)
231 request
.setHeader('Content-Type','text/plain; charset="utf-8"')
232 request
.setResponseCode(400)
233 return (emsg
+ ' # ' + repr(desca
) + '\r\n').encode('utf-8')
234 log_debug(DBG
.HTTP_CTRL
, '...', idof
=id(request
))
237 # instantiator should set
238 # self.hippotat_sources = (source_names[0], source_names[1])
240 self
.hippotat_sources
= [None, None]
243 def render_GET(self
, request
):
244 log_debug(DBG
.HTTP
, 'GET request')
249 <a href="%s">source</a>
250 (and that of dependency <a href="%s">packages</a>)
254 %
tuple(self
.hippotat_sources
)).encode('utf-8')
257 resource
= IphttpResource()
258 site
= twisted
.web
.server
.Site(resource
)
261 ep
= sa
.make_endpoint()
262 crash_on_defer(ep
.listen(site
))
263 log_debug(DBG
.INIT
, 'listening on %s' % sa
)
265 td
= tempfile
.mkdtemp()
268 try: shutil
.rmtree(td
)
269 except FileNotFoundError
: pass
271 cleanups
.append(cleanup
)
273 ssp
= SourceShipmentPreparer(td
)
274 ssp
.logger
= partial(log_debug
, DBG
.OWNSOURCE
)
275 if DBG
.OWNSOURCE
in debug_set
: ssp
.stream_debug
= sys
.stdout
279 bn
= ssp
.output_names
[ix
]
280 op
= ssp
.output_paths
[ix
]
281 resource
.hippotat_sources
[ix
] = bn
282 subresource
=twisted
.web
.static
.File(op
)
283 resource
.putChild(bn
.encode('utf-8'), subresource
)
285 reactor
.callLater(0.1, (lambda: log
.info('hippotatd started', dflag
=False)))
287 #---------- config and setup ----------
289 def process_cfg(putative_servers
, putative_clients
):
292 c
.server
= cfg
.get('SERVER','server')
294 cfg_process_common(c
, c
.server
)
295 cfg_process_saddrs(c
, c
.server
)
296 cfg_process_vnetwork(c
, c
.server
)
297 cfg_process_vaddr(c
, c
.server
)
299 for (ci
,cs
) in putative_clients
.items():
301 sections
= cfg_process_client_common(cc
,c
.server
,cs
,ci
)
302 if not sections
: continue
303 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_batch_down')
304 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_queue_time')
308 c
.vrelay
= cfg
.get(c
.server
, 'vrelay')
309 except NoOptionError
:
310 for search
in c
.vnetwork
.hosts():
311 if search
== c
.vaddr
: continue
316 [c
.server
, 'DEFAULT'],
319 ('rnets','vnetwork')))
321 def catch_termination():
323 for cleanup
in cleanups
:
326 atexit
.register(run_cleanups
)
328 def signal_handler(name
, sig
, *args
):
329 signal
.signal(sig
, signal
.SIG_DFL
)
330 print('exiting due to %s' % name
, file=sys
.stderr
)
332 os
.kill(os
.getpid(), sig
)
333 raise RuntimeError('did not die due to signal %s !' % name
)
335 for sig
in (signal
.SIGINT
, signal
.SIGTERM
):
336 signal
.signal(sig
, partial(signal_handler
, sig
.name
))
338 common_startup(process_cfg
)
340 ipif
= start_ipif(c
.ipif_command
, (lambda p
,s
,d
: route(p
,"[ipif]",s
,d
)))