7 import twisted
.internet
8 from twisted
.web
.server
import NOT_DONE_YET
10 #from twisted.web.server import Site
11 #from twisted.web.resource import Resource
17 #---------- "router" ----------
# Route one packet according to its destination address `daddr`.
#
# Visible behavior: `daddr` is looked up in `clients`; a KeyError is
# treated as "no such client" (dclient = None).  When a client is found
# the packet is handed to that client's queue_outbound().  The remaining
# visible statements test whether `daddr` equals c.server or lies outside
# c.network, and call log_discard() with the reasons 'relay' and
# 'no-client'.
#
# NOTE(review): the embedded listing numbers skip 20, 25, 28-31 and 33,
# so some statements of this function are not visible here.  `dest`
# (used in the log_debug format below) and `lt` are not defined in the
# visible lines -- presumably a nested helper `lt(dest)` wraps the
# log_debug call; confirm against the full file.  Which branch each
# log_discard() call belongs to cannot be determined from this view.
19 def route(packet
, saddr
, daddr
):
21 log_debug(DBG
.ROUTE
, 'route: %s -> %s: %s' %
(saddr
,daddr
,dest
), d
=packet
)
# Look up the destination client; a missing key means "no client".
22 try: dclient
= clients
[daddr
]
23 except KeyError: dclient
= None
24 if dclient
is not None:
26 dclient
.queue_outbound(packet
)
27 elif daddr
== c
.server
or daddr
not in c
.network
:
32 log_discard(packet
, saddr
, daddr
, 'relay')
34 lt('discard no-client')
35 log_discard(packet
, saddr
, daddr
, 'no-client')
37 #---------- client ----------
# Construct the per-client state.
#
# Parameters (as used in the visible lines):
#   ip -- the client's address; must be contained in c.network, otherwise
#         ValueError('client %s not in network') is raised.
#   cs -- name of the config section the per-client limits are read from
#         (passed to cfg.getint).
#   pw -- not used in any visible line; presumably stored as self.pw
#         (process_request compares against cl.pw) -- TODO confirm.
#
# NOTE(review): the embedded listing numbers skip 42-44, 47-50, 52, 55,
# 61, 63-64 and 66-67, so several statements are not visible here.  In
# particular the condition guarding the 'multiple client cfg sections'
# ValueError is missing from this view.
40 def __init__(self
, ip
, cs
, pw
):
41 # instance data members
45 self
._rq
= collections
.deque() # requests
46 # self._pq = PacketQueue(...)
51 # .target_requests_outstanding
53 if ip
not in c
.network
:
54 raise ValueError('client %s not in network' % ip
)
# Each limit is read as an integer both from the client's own section and
# from the 'limits' section; the smaller of the two is stored on the
# instance under the same name.
56 for k
in ('max_batch_down','max_queue_time','max_request_time',
57 'target_requests_outstanding'):
58 req
= cfg
.getint(cs
, k
)
59 limit
= cfg
.getint('limits',k
)
60 self
.__dict__
[k
] = min(req
, limit
)
# The packet queue is built from the address and the max_queue_time set
# by the loop above.
62 self
._pq
= PacketQueue(ip
, self
.max_queue_time
)
65 raise ValueError('multiple client cfg sections for %s' % ip
)
68 self
._log(DBG
.INIT
, 'new')
def _log(self, pri, msg, **kwargs):
    """Emit a debug message tagged with this client's address.

    The text passed on is 'client ' + self._ip + ': ' + msg; it is
    forwarded to log_debug at priority `pri` together with any extra
    keyword arguments.
    """
    tagged = 'client ' + self._ip + ': ' + msg
    log_debug(pri, tagged, **kwargs)
# Handle a blob of data `d` that arrived from this client.
#
# Visible behavior: the data is logged at DBG.FLOW, decoded into packets
# with slip.decode(d), and for each packet the (source, destination)
# addresses are obtained from packet_addrs() and the packet is passed to
# route().
#
# NOTE(review): embedded listing number 77 is missing, so the condition
# guarding the 'wrong source address' ValueError is not visible; as shown
# here the raise would be unconditional.  Presumably the missing line
# compares `saddr` with this client's own address -- confirm against the
# full file.
73 def process_arriving_data(self
, d
):
74 self
._log(DBG
.FLOW
, 'req data', d
=d
)
75 for packet
in slip
.decode(d
):
76 (saddr
, daddr
) = packet_addrs(packet
)
78 raise ValueError('wrong source address %s' % saddr
)
79 route(packet
, saddr
, daddr
)
def _req_cancel(self, request):
    """Log, at DBG.HTTPCTRL, that `request` is being cancelled.

    NOTE(review): only the logging call is visible for this method; any
    further action on the request is not shown in this view.
    """
    level = DBG.HTTPCTRL
    self._log(level, 'cancel', idof=request)
def _req_error(self, err, request):
    """Log an error reported for `request`, then cancel the request.

    `err` is interpolated into the log text with '%s'; `request` is
    passed to the logger as `idof` and then to self._req_cancel().
    """
    description = 'error %s' % err
    self._log(DBG.HTTPCTRL, description, idof=request)
    self._req_cancel(request)
def queue_outbound(self, packet):
    """Append `packet` to this client's packet queue and re-check output."""
    outbound = self._pq
    outbound.append(packet)
    # A new packet may make an outstanding request answerable.
    self._check_outbound()
def new_request(self, request):
    """Register a newly arrived HTTP request with this client.

    Sets the response Content-Type, schedules self._req_cancel for the
    request after self.max_request_time, attaches self._req_error as the
    errback of the request's notifyFinish() deferred, appends the request
    to self._rq and finally re-checks outbound delivery.
    """
    request.setHeader('Content-Type','application/octet-stream')

    # Schedule the cancel callback for this request.
    timeout = self.max_request_time
    reactor.callLater(timeout, self._req_cancel, request)

    # Errors reported through notifyFinish() go to _req_error.
    finished = request.notifyFinish()
    finished.addErrback(self._req_error, request)

    self._rq.append(request)
    self._check_outbound()
# Try to make progress on outbound delivery for this client.
#
# Visible steps: look at the oldest queued request (self._rq[0], or None
# when the deque is empty); log when that request is already finished;
# check whether the packet queue has anything (self._pq.nonempty());
# hand work to self._pq.process(); and finally pop requests from the left
# of self._rq while more than self.target_requests_outstanding remain.
#
# NOTE(review): the embedded listing numbers skip 102, 107-109, 113-116,
# 118-119, 123-125, 127-128, 131 and 135 onwards, so the loop structure,
# the branch conditions and the rest of the self._pq.process(...) call
# are not visible here.
#
# NOTE(review): this method spells the debug class both DBG.HTTPCTRL
# (first call below, also used by _req_cancel/_req_error) and
# DBG.HTTP_CTRL (later calls).  Unless DBG defines both names, one of the
# spellings raises AttributeError -- confirm against the DBG definition.
100 def _check_outbound(self
):
101 log_debug(DBG
.HTTPCTRL
, 'CHKO')
# Peek at the oldest outstanding request without removing it.
103 try: request
= self
._rq
[0]
104 except IndexError: request
= None
105 if request
and request
.finished
:
106 self
._log(DBG
.HTTP_CTRL
, 'CHKO req finished, discard', idof
=request
)
110 if not self
._pq
.nonempty():
111 # no packets, oh well
112 self
._log(DBG
.HTTP_CTRL
, 'CHKO no packets, OUT-DONE', idof
=request
)
117 self
._log(DBG
.HTTP_CTRL
, 'CHKO no request, OUT-DONE', idof
=request
)
120 self
._log(DBG
.HTTP_CTRL
, 'CHKO processing', idof
=request
)
121 # request, and also some non-expired packets
# The first argument is a callable returning request.sentLength; the
# remaining arguments of this call are not visible in this view.
122 self
._pq
.process((lambda: request
.sentLength
),
# NOTE(review): assert statements are removed under `python -O`; if this
# check matters at runtime it should be an explicit test.
126 assert(request
.sentLength
)
129 self
._log(DBG
.HTTP
, 'complete', idof
=request
)
130 # round again, looking for more to do
# Trim the request queue from the oldest end down to the target count.
# What is done with each popped request beyond logging is not visible.
132 while len(self
._rq
) > self
.target_requests_outstanding
:
133 request
= self
._rq
.popleft()
134 self
._log(DBG
.HTTP
, 'CHKO above target, returning empty', idof
=request
)
# Validate an incoming HTTP request and hand it to the matching client.
#
# Visible behavior: the 'm' argument of the request is split on b'\n'
# into (ci_s, pw, tro); [ci_s, tro] is recorded in desca['m']; the
# password and `tro` are compared with the client's `pw` and
# `target_requests_outstanding` (ValueError on mismatch); the 'd'
# argument (or '' on KeyError) is passed to cl.process_arriving_data();
# and the request itself is passed to cl.new_request().
#
# NOTE(review): the embedded listing numbers skip 143-145, 147-148,
# 151-152, 154 and 156.  `cl` is never bound in the visible lines, and
# the `try:` matching `except KeyError` is not visible either.
137 def process_request(request
, desca
):
138 # find client, update config, etc.
139 metadata
= request
.args
['m']
# NOTE(review): after this statement `metadata` is the list returned by
# split(); the next statement calls .split() on it again, which a list
# does not support.  One of the two splits looks redundant -- confirm.
140 metadata
= metadata
.split(b
'\n')
141 (ci_s
, pw
, tro
) = metadata
.split(b
'\n')[0:3]
142 desca
['m'] = [ci_s
, tro
]
146 if pw
!= cl
.pw
: raise ValueError('bad password')
# NOTE(review): `tro` comes from splitting the metadata, while
# Client.__init__ stores target_requests_outstanding as min() of two
# cfg.getint() values; a conversion of `tro` is presumably in the lines
# not visible here -- confirm.
149 if tro
!= cl
.target_requests_outstanding
:
150 raise ValueError('tro must be %d' % cl
.target_requests_outstanding
)
153 d
= request
.args
['d']
155 except KeyError: d
= ''
157 cl
.process_arriving_data(d
)
158 cl
.new_request(request
)
# Log an HTTP-level event at DBG.HTTP: the message text is `msg` followed
# by repr(desca), and `d` is passed to log_debug as the `d` keyword.
#
# NOTE(review): embedded listing numbers 161-165 are missing; `d` is not
# bound in the visible lines, so where it comes from cannot be determined
# from this view.
160 def log_http(desca
, msg
):
166 log_debug(DBG
.HTTP
, msg
+ repr(desca
), d
=d
)
# Twisted web resource serving the tunnel endpoint.
#
# render_POST passes the request to process_request(); on any Exception
# it logs the error via log_http(), sets a text/plain Content-Type and
# responds with status 400.  render_GET logs the request and returns a
# fixed HTML body.
#
# NOTE(review): the embedded listing numbers skip 169, 171, 178 and
# 180-181, so the binding of `desca` and the return statements of
# render_POST are not visible here.  The visible imports at the top of
# the file do not import twisted.web.resource (that import is commented
# out) -- confirm it is imported elsewhere.
168 class IphttpResource(twisted
.web
.resource
.Resource
):
170 def render_POST(self
, request
):
172 try: process_request(request
, desca
)
173 except Exception as e
:
# NOTE(review): `emsg` is the result of .encode('utf-8'); on Python 3
# that is bytes, and the str + bytes concatenation on the next statement
# raises TypeError.  Confirm the intended Python version / types.
174 emsg
= str(e
).encode('utf-8')
175 log_http(desca
, 'EXCEPTION ' + emsg
)
176 request
.setHeader('Content-Type','text/plain; charset="utf-8"')
177 request
.setResponseCode(400)
179 log_http(desca
, '... [%s]' % id
(request
))
182 def render_GET(self
, request
):
183 log_debug(DBG
.HTTP
, 'GET request')
184 return b
'<html><body>hippotat</body></html>'
# HTTP listener setup: build the IphttpResource, wrap it in a
# twisted.web.server.Site, obtain an endpoint from `sa.make_endpoint()`,
# start listening on it (the deferred returned by listen() is passed to
# crash_on_defer) and log the address at DBG.INIT.
#
# NOTE(review): embedded listing numbers 185-186 and 189 are missing; the
# enclosing `def` line (if any) and the binding of `sa` are not visible
# in this view.
187 resource
= IphttpResource()
188 site
= twisted
.web
.server
.Site(resource
)
190 ep
= sa
.make_endpoint()
191 crash_on_defer(ep
.listen(site
))
192 log_debug(DBG
.INIT
, 'listening on %s' % sa
)
194 #---------- config and setup ----------
# Server configuration and startup sequence.
#
# Visible steps, in order: process_cfg_common_always(),
# process_cfg_network(), read c.relay from the 'relay' option of the
# 'virtual' section, fall back to a scan over c.network.hosts() (skipping
# c.server) when that option is absent, process_cfg_clients(Client),
# process_cfg_ipif('server', ...), and finally start_ipif() with
# c.ipif_command and the `route` function.
#
# NOTE(review): the embedded listing numbers skip 195-196, 198, 200-201,
# 206-209, 211, 213-214 and 216-218.  The `try:` matching
# `except NoOptionError`, the rest of the host-scan loop body, most
# arguments of process_cfg_ipif(), and any enclosing `def` are not
# visible in this view.
197 process_cfg_common_always()
199 process_cfg_network()
202 c
.relay
= cfg
.get('virtual','relay')
203 except NoOptionError
:
# No explicit relay configured: walk the hosts of the network, never
# considering the server's own address.
204 for search
in c
.network
.hosts():
205 if search
== c
.server
: continue
210 process_cfg_clients(Client
)
212 process_cfg_ipif('server',
215 ('rnets','network')))
219 start_ipif(c
.ipif_command
, route
)