7 def set_client(ci
,cs
,pw
):
10 assert(client_cs
is None)
13 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
14 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
19 global max_requests_outstanding
21 process_cfg_common_always()
25 c
.url
= cfg
.get('server','url')
28 sa
= c
.saddrs
[0].url()
30 process_cfg_clients(set_client
)
32 c
.routes
= cfg
.get('virtual','routes')
33 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
34 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
36 process_cfg_ipif(client_cs
,
46 queue
= PacketQueue(c
.max_queue_time
)
47 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
49 def outbound(packet
, saddr
, daddr
):
50 #print('OUT ', saddr, daddr, repr(packet))
54 class ResponseConsumer(twisted
.internet
.protocol
.Protocol
):
56 self
._ssd
= SlipStreamDecoder(queue_inbound
)
57 def dataReceived(self
, data
):
58 self
._ssd
.inputdata(mime_translate(data
))
59 def connectionMade(self
): pass
60 def connectionLost(self
, reason
):
61 if isinstance(reason
, twisted
.internet
.error
.ConnectionDone
):
64 print(reason
, file=sys
.stderr
)
67 resp
.deliverBody(ResponseConsumer())
70 print(err
, >>sys
.stderr
)
77 if outstanding
>= c
.max_outstanding
: break
78 if not queue
.nonempty() && outstanding
>= c
.target_outstanding
: break
81 queue
.process((lambda: len(d
)),
87 mime
= (b
'--b' + crlf
+
88 b
'Content-Disposition: form-data; name="m"' + crlf
+
91 c
.target_outstanding
+ crlf
+
93 b
'Content-Disposition: form-data; name="d"' + crlf
+
94 mime_translate(d
) + crlf
+
97 hh
= { 'User-Agent': ['hippotat'],
98 'Content-Type': ['multipart/form-data; boundary="b"'] }
99 req
= agent
.request(b
'POST',
101 twisted
.web
.client
.Headers(hh
))
102 req
.addTimeout(c
.http_timeout
,
103 req
.addCallbacks(req_ok
, req_err
)
110 start_ipif(c
.ipif_command
, outbound
)