7 import twisted
.internet
8 from twisted
.web
.server
import NOT_DONE_YET
10 #import twisted.web.server import Site
11 #from twisted.web.resource import Resource
17 #---------- "router" ----------
# route(packet, iface, saddr, daddr): dispatch one packet by its destination
# address.  The visible code looks daddr up in `clients`; when a client object
# is found the packet is handed to its queue_outbound().  Otherwise the
# destination is compared against c.vaddr / c.vnetwork, and the remaining
# visible statements report discards through log_discard() with the reasons
# 'relay' and 'no-client'.
#
# NOTE(review): this text is not valid Python as it stands.  Statements are
# split across lines, each one carries a stray leading number, and the names
# `dest` and `lt` are used below without any visible definition.  Lines appear
# to have been lost from this block -- restore it from the original file
# before changing behaviour.
19 def route(packet
, iface
, saddr
, daddr
):
# NOTE(review): `dest` is not a parameter of route(); presumably this call
# belongs to a nested helper (the `lt` called further down) -- TODO confirm.
21 log_debug(DBG
.ROUTE
, 'route: %s -> %s: %s' %
(saddr
,daddr
,dest
), d
=packet
)
# Look up the destination client; a missing key means "no such client".
22 try: dclient
= clients
[daddr
]
23 except KeyError: dclient
= None
24 if dclient
is not None:
26 dclient
.queue_outbound(packet
)
# NOTE(review): no body is visible for this elif branch.
27 elif daddr
== c
.vaddr
or daddr
not in c
.vnetwork
:
# NOTE(review): the condition guarding the 'relay' discard is not visible.
32 log_discard(packet
, iface
, saddr
, daddr
, 'relay')
34 lt('discard no-client')
35 log_discard(packet
, iface
, saddr
, daddr
, 'no-client')
37 #---------- client ----------
# Constructor (ip, cc).  The visible statements create an empty deque of
# requests (self._rq), raise ValueError when ip is not in c.vnetwork, build
# self._pq as PacketQueue(str(ip), self.max_queue_time), and finally log 'new'
# through self._log().
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation).  self._ip (read by _log) and
# self.max_queue_time are never assigned in the visible lines, `cc` is never
# used, and the 'multiple client cfg sections' raise has no visible guard.
# Lines appear to be missing -- restore from the original file before editing.
40 def __init__(self
, ip
, cc
):
41 # instance data members
44 self
._rq
= collections
.deque() # requests
45 # self._pq = PacketQueue(...)
49 # .target_requests_outstanding
51 if ip
not in c
.vnetwork
:
52 raise ValueError('client %s not in vnetwork' % ip
)
54 self
._pq
= PacketQueue(str(ip
), self
.max_queue_time
)
# NOTE(review): as written this raise is unconditional; presumably it was
# guarded by a duplicate-client check -- TODO confirm.
57 raise ValueError('multiple client cfg sections for %s' % ip
)
60 self
._log(DBG
.INIT
, 'new')
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message prefixed with this client's address.

    dflag   -- debug flag, passed to log_debug() as its first argument
    msg     -- message text; 'client <self._ip>: ' is prepended to it
    kwargs  -- forwarded to log_debug() unchanged
    """
    prefix = 'client %s: ' % self._ip
    log_debug(dflag, prefix + msg, **kwargs)
def process_arriving_data(self, d):
    """Decode data received from this client and route every packet in it.

    d -- encoded data; it is logged, then split into packets by slip.decode()

    Each packet's (source, destination) pair comes from packet_addrs().
    Packets are handed to route() with this client's address as the
    interface argument.

    Raises ValueError when a packet's source address is not this client's.
    """
    self._log(DBG.FLOW, "req data (enc'd)", d=d)

    for packet in slip.decode(d):
        (saddr, daddr) = packet_addrs(packet)
        # NOTE(review): the text this was restored from raised
        # unconditionally, which made the route() call below unreachable.
        # The guard is reconstructed from the error message and from
        # self._ip being the address passed on to route() -- confirm
        # against the original file.
        if saddr != self._ip:
            raise ValueError('wrong source address %s' % saddr)
        route(packet, self._ip, saddr, daddr)
def _req_cancel(self, request):
    """Log that `request` is being cancelled.

    Used as the callLater() timeout callback in new_request() and called
    from _req_error().

    NOTE(review): only the log call is visible for this method; if the
    request is also meant to be finished here, that statement is not in the
    text this was restored from -- confirm against the original file.
    """
    self._log(DBG.HTTP_CTRL, 'cancel', idof=request)
def _req_error(self, err, request):
    """Errback for a request's notifyFinish() deferred.

    err     -- the failure passed in by the deferred; only used in the log
    request -- the request concerned

    Logs the error, then applies the same handling as a cancellation.
    Returns None.
    """
    self._log(DBG.HTTP_CTRL, 'error %s' % err, idof=request)
    self._req_cancel(request)
def queue_outbound(self, packet):
    """Queue `packet` for delivery to this client and try to send it.

    The packet is appended to self._pq first, so that the following
    _check_outbound() call already sees it.
    """
    self._pq.append(packet)
    self._check_outbound()
def _req_fin(self, dummy, request, cl):
    """Callback run when a request's notifyFinish() deferred fires.

    dummy   -- the deferred's result; only logged
    request -- the request that finished
    cl      -- the delayed call created by reactor.callLater() in
               new_request(), i.e. this request's pending timeout

    The pending timeout is cancelled.  If it has already run there is
    nothing left to cancel, so AlreadyCalled is deliberately ignored.
    """
    self._log(DBG.HTTP_CTRL, '_req_fin ' + repr(dummy), idof=request)
    # NOTE(review): the text this was restored from had only the `except`
    # clause; the guarded cl.cancel() call is reconstructed from it --
    # confirm against the original file.
    try:
        cl.cancel()
    except twisted.internet.error.AlreadyCalled:
        pass
def new_request(self, request):
    """Register a newly arrived HTTP request as a carrier for outbound data.

    Sets the response content type, arms a timeout that calls _req_cancel()
    after self.http_timeout, hooks the request's notifyFinish() deferred,
    appends the request to self._rq and then runs _check_outbound().
    """
    request.setHeader('Content-Type','application/octet-stream')
    timeout_call = reactor.callLater(self.http_timeout,
                                     self._req_cancel, request)

    finished = request.notifyFinish()
    # The errback is added before the callback, so _req_fin (which cancels
    # the timeout) also runs after _req_error has handled a failure.
    finished.addErrback(self._req_error, request)
    finished.addCallback(self._req_fin, request, timeout_call)

    self._rq.append(request)
    self._check_outbound()
def _req_write(self, req, d):
    """Log data `d` that is being written to request `req`.

    NOTE(review): only the log call is visible for this method; the
    statement that actually writes `d` to `req` is not in the text this was
    restored from -- confirm against the original file.
    """
    self._log(DBG.HTTP, 'req_write ', idof=req, d=d)
# _check_outbound(): match queued outbound packets (self._pq) with waiting
# requests (self._rq).  The visible statements: peek at the oldest request
# (None when the deque is empty), log and discard one that has already
# finished, log and stop when there are no packets or no request, otherwise
# let self._pq.process() feed data to the request through _req_write(), and
# finally pop requests while more than self.target_requests_outstanding are
# held.
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation), and several control-flow lines
# (loop header, discards, breaks/returns, the tail of the process() call,
# finishing of requests) are not visible.  Restore the block from the original
# file before changing behaviour.
104 def _check_outbound(self
):
105 log_debug(DBG
.HTTP_CTRL
, 'CHKO')
# Peek at the oldest outstanding request without removing it.
107 try: request
= self
._rq
[0]
108 except IndexError: request
= None
109 if request
and request
.finished
:
110 self
._log(DBG
.HTTP_CTRL
, 'CHKO req finished, discard', idof
=request
)
114 if not self
._pq
.nonempty():
115 # no packets, oh well
116 self
._log(DBG
.HTTP_CTRL
, 'CHKO no packets, OUT-DONE', idof
=request
)
# NOTE(review): the condition guarding the 'no request' case is not visible.
121 self
._log(DBG
.HTTP_CTRL
, 'CHKO no request, OUT-DONE', idof
=request
)
124 self
._log(DBG
.HTTP_CTRL
, 'CHKO processing', idof
=request
)
125 # request, and also some non-expired packets
# process() is given two callables: one reading request.sentLength and one
# writing data to the request.  NOTE(review): the rest of the argument list
# is not visible.
126 self
._pq
.process((lambda: request
.sentLength
),
127 (lambda d
: self
._req_write(request
, d
)),
130 assert(request
.sentLength
)
133 self
._log(DBG
.HTTP
, 'complete', idof
=request
)
134 # round again, looking for more to do
# Trim the queue of waiting requests down to the configured target.
136 while len(self
._rq
) > self
.target_requests_outstanding
:
137 request
= self
._rq
.popleft()
138 self
._log(DBG
.HTTP
, 'CHKO above target, returning empty', idof
=request
)
# process_request(request, desca): handle the form fields of one POST.
# The b'm' field is split on CRLF and its first four entries are taken as
# (ci_s, pw, tro, cto); ci_s is decoded as UTF-8 and tro/cto are converted
# with int().  Details are recorded in the caller's `desca` dict under
# 'm[0,2:3]', 'tro', 'cto' and 'dlen'.  ValueError is raised for a wrong
# password, a tro different from cl.target_requests_outstanding, or a cto
# below cl.http_timeout.  The b'd' field is then passed through
# mime_translate() and given to cl.process_arriving_data(), and the request
# is registered with cl.new_request().
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation).  `cl` is never bound in the visible
# lines -- presumably it is looked up from ci_s; TODO confirm -- and other
# lines appear to be missing.  Restore from the original file before editing.
141 def process_request(request
, desca
):
142 # find client, update config, etc.
143 metadata
= request
.args
[b
'm'][0]
144 metadata
= metadata
.split(b
'\r\n')
145 (ci_s
, pw
, tro
, cto
) = metadata
[0:4]
# The password is left out of the logged description.
146 desca
['m[0,2:3]'] = [ci_s
, tro
, cto
]
147 ci_s
= ci_s
.decode('utf-8')
148 tro
= int(tro
); desca
['tro']= tro
149 cto
= int(cto
); desca
['cto']= cto
153 if pw
!= cl
.pw
: raise ValueError('bad password')
156 if tro
!= cl
.target_requests_outstanding
:
157 raise ValueError('tro must be %d' % cl
.target_requests_outstanding
)
159 if cto
< cl
.http_timeout
:
160 raise ValueError('cto must be >= %d' % cl
.http_timeout
)
163 d
= request
.args
[b
'd'][0]
165 desca
['dlen'] = len(d
)
170 log_http(desca
, 'processing', idof
=id(request
), d
=d
)
172 d
= mime_translate(d
)
174 cl
.process_arriving_data(d
)
175 cl
.new_request(request
)
def log_http(desca, msg, **kwargs):
    """Log an HTTP-level debug message together with its description dict.

    desca  -- dict describing the request; repr(desca) is appended to msg
    msg    -- message text
    kwargs -- forwarded to log_debug(); when desca has a 'd' entry it is
              passed as the d= keyword, replacing any d= given by the caller
    """
    # Not every caller stores 'd' in desca (process_request passes it as a
    # keyword instead), so a missing key must not turn logging into an error.
    try:
        kwargs['d'] = desca['d']
    except KeyError:
        pass
    log_debug(DBG.HTTP, msg + repr(desca), **kwargs)
# IphttpResource: the twisted.web resource served by this program.
#   render_POST -- logs the request, runs process_request(); if that raises,
#                  logs the traceback and returns it as a UTF-8 text/plain
#                  body with response code 400.
#   render_GET  -- logs the request and returns a fixed HTML page.
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation).  In render_POST the first
# log_debug() call is never closed, `desca` is never assigned, and no return
# statement is visible on the success path -- lines appear to be missing.
# Restore from the original file before editing.
185 class IphttpResource(twisted
.web
.resource
.Resource
):
187 def render_POST(self
, request
):
188 log_debug(DBG
.HTTP_FULL
,
189 'req recv: ' + repr(request
) + ' ' + repr(request
.args
),
192 try: process_request(request
, desca
)
# Any failure while processing is reported back to the HTTP client.
193 except Exception as e
:
194 emsg
= traceback
.format_exc()
195 log_http(desca
, 'RETURNING EXCEPTION ' + emsg
)
196 request
.setHeader('Content-Type','text/plain; charset="utf-8"')
197 request
.setResponseCode(400)
198 return (emsg
+ ' # ' + repr(desca
) + '\r\n').encode('utf-8')
199 log_debug(DBG
.HTTP_CTRL
, '...', idof
=id(request
))
202 def render_GET(self
, request
):
203 log_debug(DBG
.HTTP
, 'GET request')
204 return b
'<html><body>hippotat</body></html>'
# HTTP listener setup: wraps an IphttpResource in a twisted.web.server.Site,
# listens on the endpoint returned by sa.make_endpoint(), hands the result of
# listen() to crash_on_defer(), and logs the address.
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation).  `sa` is not defined in the visible
# lines; presumably these statements sit inside a function and/or a loop over
# server addresses whose header is missing -- TODO confirm.
207 resource
= IphttpResource()
208 site
= twisted
.web
.server
.Site(resource
)
210 ep
= sa
.make_endpoint()
211 crash_on_defer(ep
.listen(site
))
212 log_debug(DBG
.INIT
, 'listening on %s' % sa
)
214 #---------- config and setup ----------
# process_cfg(putative_servers, putative_clients): fill the global config
# object `c` from `cfg`.  The visible statements read the server name from
# ('SERVER', 'server'), run the common / saddrs / vnetwork / vaddr config
# helpers for it, walk putative_clients.items() applying the per-client
# helpers (including the 'max_batch_down' and 'max_queue_time' limits), and
# read the 'relay' option, with a fallback path that iterates
# c.vnetwork.hosts() and skips c.vaddr.
#
# NOTE(review): this text is not valid Python as it stands (split statements,
# stray leading numbers, no indentation).  `cc` is never assigned,
# `putative_servers` is never used, the `except NoOptionError` has no visible
# `try`, and the final lines are the tail of a call whose start is not
# visible.  Restore from the original file before editing.
216 def process_cfg(putative_servers
, putative_clients
):
219 c
.server
= cfg
.get('SERVER','server')
221 cfg_process_common(c
, c
.server
)
222 cfg_process_saddrs(c
, c
.server
)
223 cfg_process_vnetwork(c
, c
.server
)
224 cfg_process_vaddr(c
, c
.server
)
226 for (ci
,cs
) in putative_clients
.items():
# NOTE(review): the ':' ending this assignment is not valid syntax -- confirm
# whether it is present in the original file.
228 sections
= cfg_process_client_common(cc
,c
.server
,cs
,ci
):
229 if not sections
: continue
230 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_batch_down')
231 cfg_process_client_limited(cc
,c
.server
,sections
, 'max_queue_time')
235 c
.relay
= cfg
.get(c
.server
, 'relay')
# Fallback used when no 'relay' option is configured.
236 except NoOptionError
:
237 for search
in c
.vnetwork
.hosts():
238 if search
== c
.vaddr
: continue
243 [c
.server
, 'DEFAULT'],
246 ('rnets','vnetwork')))
# Program start: run the shared startup code with this file's config
# processor, then start the ipif command.  Every packet it delivers is fed
# into route() with the fixed interface label "[ipif]".
common_startup(process_cfg)
start_ipif(c.ipif_command, (lambda p, s, d: route(p, "[ipif]", s, d)))