6 import twisted
.web
.client
10 def set_client(ci
,cs
,pw
):
13 assert(client_cs
is None)
16 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
17 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
22 global max_requests_outstanding
24 process_cfg_common_always()
28 c
.url
= cfg
.get('server','url')
31 sa
= c
.saddrs
[0].url()
33 process_cfg_clients(set_client
)
35 c
.routes
= cfg
.get('virtual','routes')
36 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
37 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
38 c
.http_timeout
= cfg
.getint(client_cs
, 'http_timeout')
40 process_cfg_ipif(client_cs
,
50 queue
= PacketQueue(c
.max_queue_time
)
51 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
53 def outbound(packet
, saddr
, daddr
):
54 #print('OUT ', saddr, daddr, repr(packet))
58 class ResponseConsumer(twisted
.internet
.protocol
.Protocol
):
60 self
._ssd
= SlipStreamDecoder(queue_inbound
)
61 def dataReceived(self
, data
):
62 self
._ssd
.inputdata(mime_translate(data
))
63 def connectionMade(self
): pass
64 def connectionLost(self
, reason
):
65 if isinstance(reason
, twisted
.internet
.error
.ConnectionDone
):
68 print(reason
, file=sys
.stderr
)
71 resp
.deliverBody(ResponseConsumer())
74 print(err
, file=sys
.stderr
)
81 if outstanding
>= c
.max_outstanding
: break
82 if not queue
.nonempty() and outstanding
>= c
.target_outstanding
: break
85 def moredata(s
): global d
; d
+= s
86 queue
.process((lambda: len(d
)),
92 mime
= (b
'--b' + crlf
+
93 b
'Content-Disposition: form-data; name="m"' + crlf
+
96 c
.target_outstanding
+ crlf
+
98 b
'Content-Disposition: form-data; name="d"' + crlf
+
99 mime_translate(d
) + crlf
+
102 hh
= { 'User-Agent': ['hippotat'],
103 'Content-Type': ['multipart/form-data; boundary="b"'] }
104 req
= agent
.request(b
'POST',
106 twisted
.web
.client
.Headers(hh
))
107 req
.addTimeout(c
.http_timeout
)
108 req
.addCallbacks(req_ok
, req_err
)
115 start_ipif(c
.ipif_command
, outbound
)