7 import twisted
.internet
8 from twisted
.web
.server
import NOT_DONE_YET
10 #import twisted.web.server import Site
11 #from twisted.web.resource import Resource
17 #---------- "router" ----------
# Route one packet according to its destination address.
# Parameters visible here: packet, saddr (source address), daddr (destination
# address).
# NOTE(review): this is a fragment -- the embedded original line numbers jump
# (19->21, 24->26, 27->32, 32->34), so some statements and branch headers are
# not visible; the nesting of the statements below cannot be confirmed here.
19 def route(packet
, saddr
, daddr
):
# NOTE(review): `dest` is not a parameter of route(); presumably this log call
# belongs to a nested helper (cf. the lt(...) call further down) whose `def`
# line is elided -- confirm against the full file.
21 log_data(DBG
.ROUTE
, packet
, 'route: %s -> %s: %s' %
(saddr
,daddr
,dest
))
# Look up the client registered for the destination; None when there is none.
22 try: dclient
= clients
[daddr
]
23 except KeyError: dclient
= None
# A known destination client gets the packet appended to its outbound queue.
24 if dclient
is not None:
26 dclient
.queue_outbound(packet
)
# Destination is the server's own address, or lies outside the network.
# NOTE(review): the body of this branch is elided (original lines 28-31).
27 elif daddr
== c
.server
or daddr
not in c
.network
:
# Discard paths: the packet is logged as discarded with a reason string.
# NOTE(review): the conditions selecting 'relay' vs 'no-client' are elided.
32 log_discard(packet
, saddr
, daddr
, 'relay')
34 lt('discard no-client')
35 log_discard(packet
, saddr
, daddr
, 'no-client')
37 #---------- client ----------
# Constructor for a per-client object (the class header is not visible here).
# Parameters visible here: ip (checked against c.network), cs (config section
# name passed to cfg.getint), pw (not used in the visible lines).
# Raises ValueError when ip is not in c.network.
# NOTE(review): fragment -- original lines 42-44, 47-50, 61, 63-64 and 66-67
# are elided, so some attribute assignments and conditions are not shown.
40 def __init__(self
, ip
, cs
, pw
):
41 # instance data members
45 self
._rq
= collections
.deque() # requests
46 # self._pq = PacketQueue(...)
51 # .target_requests_outstanding
# Refuse clients whose address is outside the configured network.
53 if ip
not in c
.network
:
54 raise ValueError('client %s not in network' % ip
)
# For each tunable, read the client's value from section `cs` and the cap from
# section 'limits', and store the smaller of the two as an instance attribute
# of the same name (written straight into self.__dict__).
56 for k
in ('max_batch_down','max_queue_time','max_request_time',
57 'target_requests_outstanding'):
58 req
= cfg
.getint(cs
, k
)
59 limit
= cfg
.getint('limits',k
)
60 self
.__dict__
[k
] = min(req
, limit
)
# The packet queue is built from the max_queue_time value capped above.
62 self
._pq
= PacketQueue(self
.max_queue_time
)
# NOTE(review): the condition guarding this raise is elided (lines 63-64).
65 raise ValueError('multiple client cfg sections for %s' % ip
)
# NOTE(review): the first argument here is the string 'DBG.INIT', whereas other
# log_debug calls in this file pass the attribute DBG.INIT -- looks like a
# stray pair of quotes; confirm.
68 log_debug('DBG.INIT', 'new client %s' % self
)
# Handle data `d` received from this client: decode it into packets with
# slip.decode, extract each packet's addresses, and pass each packet to
# route().
# NOTE(review): fragment -- original line 74 is elided; it presumably holds the
# condition guarding the 'wrong source address' raise below. Confirm.
70 def process_arriving_data(self
, d
):
# NOTE(review): `'...%s' % saddr=self._ip` is not a valid expression as shown
# (a keyword argument directly after the % operator), and `saddr` is only
# assigned inside the loop below -- this log call looks malformed; confirm
# against the full file.
71 log_data(DBG
.FLOW
, d
, 'client req data: %s' % saddr
=self
._ip
)
72 for packet
in slip
.decode(d
):
73 (saddr
, daddr
) = packet_addrs(packet
)
75 raise ValueError('wrong source address %s' % saddr
)
76 route(packet
, saddr
, daddr
)
# Cancel an outstanding HTTP request; the visible part only logs 'cancel'.
# NOTE(review): fragment -- original line 80 is elided, so whatever actually
# ends the request is not visible here.
78 def _req_cancel(self
, request
):
79 log_httpreq(DBG
.HTTPCTRL
, request
, 'cancel')
def _req_error(self, err, request):
    """Log an error reported for `request`, then cancel the request.

    Registered by new_request() as the errback of the request's finish
    notification. `err` is formatted with %s into the log message.
    Returns None.
    """
    message = 'error %s' % err
    log_httpreq(DBG.HTTPCTRL, request, message)
    self._req_cancel(request)
def queue_outbound(self, packet):
    """Queue `packet` for delivery to this client and try to send it.

    The packet is appended to the client's packet queue first; only then
    is _check_outbound() invoked, so the check sees the new packet.
    Returns None.
    """
    pending = self._pq
    pending.append(packet)
    self._check_outbound()
def new_request(self, request):
    """Register a freshly arrived HTTP request with this client.

    Sets the response content type, schedules _req_cancel(request) to run
    after self.max_request_time, attaches _req_error as the errback of the
    request's finish notification, appends the request to the request
    queue, and finally runs _check_outbound(). Returns None.
    """
    request.setHeader('Content-Type', 'application/octet-stream')

    # Timeout: the request is cancelled once max_request_time has elapsed.
    reactor.callLater(self.max_request_time, self._req_cancel, request)

    # A failed finish notification is routed to _req_error.
    finished = request.notifyFinish()
    finished.addErrback(self._req_error, request)

    self._rq.append(request)
    self._check_outbound()
# Try to match queued packets (self._pq) with outstanding requests (self._rq),
# logging each step with a 'CHKO' tag.
# NOTE(review): heavily elided fragment -- the embedded original line numbers
# skip 99, 104-106, 110-113, 115-116, 120-122, 124-125, 128 and 132-133, so
# the loop/branch structure and the actions taken after each log line are not
# visible here. Comments below describe only the visible statements.
# NOTE(review): this block uses both DBG.HTTPCTRL and DBG.HTTP_CTRL; presumably
# only one of these names exists -- confirm and unify.
97 def _check_outbound(self
):
98 log_httpreq(DBG
.HTTPCTRL
, None, 'CHKO')
# Peek at the oldest outstanding request; None when the queue is empty.
100 try: request
= self
._rq
[0]
101 except IndexError: request
= None
# A request that has already finished is logged as discarded.
102 if request
and request
.finished
:
103 log_httpreq(DBG
.HTTP_CTRL
, request
, 'CHKO request finished, discard')
107 if not self
._pq
.nonempty():
108 # no packets, oh well
109 log_httpreq(DBG
.HTTP_CTRL
, request
, 'CHKO no packets, OUT-DONE')
# NOTE(review): the condition leading to this log line is elided.
114 log_httpreq(DBG
.HTTP_CTRL
, request
, 'CHKO no request, OUT-DONE')
117 log_httpreq(DBG
.HTTP_CTRL
, request
, 'CHKO processing')
118 # request, and also some non-expired packets
# The first argument is a callable returning request.sentLength; the remaining
# arguments of this call are elided (lines 120-122).
119 self
._pq
.process((lambda: request
.sentLength
),
123 assert(request
.sentLength
)
126 log_httpreq(DBG
.HTTP
, request
, 'complete')
127 # round again, looking for more to do
# While more requests are outstanding than the target, take the oldest off the
# queue and log it; what is done with it afterwards is elided (lines 132-133).
129 while len(self
._rq
) > self
.target_requests_outstanding
:
130 request
= self
._rq
.popleft()
131 log_httpreq(DBG
.HTTP
, request
, 'CHKO above target, returning empty')
# Process one incoming HTTP request: parse the 'm' (metadata) argument, check
# the password and the client's target_requests_outstanding value, hand any
# 'd' (data) argument to the client, then register the request with it.
# `desca` is a dict the caller supplies; its 'm' entry is set here.
# Raises ValueError on a bad password or a mismatched tro value.
# NOTE(review): fragment -- original lines 140-142, 144-145, 148-149, 151 and
# 153 are elided; in particular the assignment of `cl` and the `try:` matching
# the `except KeyError` below are not visible.
134 def process_request(request
, desca
):
135 # find client, update config, etc.
136 metadata
= request
.args
['m']
# NOTE(review): line 137 rebinds `metadata` to the result of split(), and line
# 138 then calls .split() on that result again. If the first split returns a
# list, the second call fails -- one of the two looks redundant; confirm.
137 metadata
= metadata
.split(b
'\n')
138 (ci_s
, pw
, tro
) = metadata
.split(b
'\n')[0:3]
139 desca
['m'] = [ci_s
, tro
]
143 if pw
!= cl
.pw
: raise ValueError('bad password')
# NOTE(review): `tro` comes from splitting on b'\n', so it is presumably a
# bytes value, whereas target_requests_outstanding is stored as an int by the
# client constructor -- the two would never compare equal; confirm whether a
# conversion happens in the elided lines.
146 if tro
!= cl
.target_requests_outstanding
:
147 raise ValueError('tro must be %d' % cl
.target_requests_outstanding
)
# The data argument is optional; a missing 'd' falls back to ''.
150 d
= request
.args
['d']
152 except KeyError: d
= ''
154 cl
.process_arriving_data(d
)
155 cl
.new_request(request
)
# Twisted web resource for the service: POST requests are passed to
# process_request(); GET requests receive a fixed HTML body.
# NOTE(review): fragment -- original lines 158, 160, 167 and 169-170 are
# elided, so the initialisation of `desca` and the return statements of
# render_POST are not visible here.
157 class IphttpResource(twisted
.web
.resource
.Resource
):
159 def render_POST(self
, request
):
# Any exception from process_request is logged and answered with HTTP 400 and
# a text/plain content type.
161 try: process_request(request
, desca
)
162 except Exception as e
:
163 emsg
= str(e
).encode('utf-8')
# NOTE(review): `emsg` is bytes (result of .encode) and is concatenated to a
# str literal here; on Python 3 that raises TypeError -- confirm and fix.
164 log_http(desca
, 'EXCEPTION ' + emsg
)
165 request
.setHeader('Content-Type','text/plain; charset="utf-8"')
166 request
.setResponseCode(400)
# Logs the id() of the request object.
168 log_http(desca
, '... [%s]' % id
(request
))
# GET handler: logs the request and returns a constant HTML page as bytes.
171 def render_GET(self
, request
):
172 log_debug(DBG
.HTTP
, 'GET request')
173 return b
'<html><body>hippotat</body></html>'
# HTTP listener setup: build the resource and the Twisted Site, then start
# listening on the endpoint made from `sa` and log it.
# NOTE(review): fragment -- the enclosing definition (original line 175) and
# whatever binds `sa` (original line 178, presumably a loop header) are
# elided; confirm against the full file.
176 resource
= IphttpResource()
177 site
= twisted
.web
.server
.Site(resource
)
179 ep
= sa
.make_endpoint()
# The result of ep.listen(site) is handed to crash_on_defer.
180 crash_on_defer(ep
.listen(site
))
181 log_debug(DBG
.INIT
, 'listening on %s' % sa
)
183 #---------- config and setup ----------
# Server configuration and startup sequence.
# NOTE(review): fragment -- the enclosing definition and original lines 187,
# 189-190, 195-198, 200, 202-203 and 205-207 are elided, including the `try:`
# matching the `except NoOptionError` below and the rest of the search loop.
186 process_cfg_common_always()
188 process_cfg_network()
# The relay address is read from the 'virtual' section when it is configured.
191 c
.relay
= cfg
.get('virtual','relay')
# With no 'relay' option, the hosts of c.network are scanned, skipping the
# server's own address; what the loop does with a candidate is elided.
192 except NoOptionError
:
193 for search
in c
.network
.hosts():
194 if search
== c
.server
: continue
# The Client class is handed to process_cfg_clients.
199 process_cfg_clients(Client
)
201 process_cfg_ipif('server',
204 ('rnets','network')))
# Start the ipif command, passing route as the second argument.
208 start_ipif(c
.ipif_command
, route
)