5d0f427d354714c37f1cd544ac54fd82f5d06ec0
6 import twisted
.web
.client
10 def set_client(ci
,cs
,pw
):
13 assert(client_cs
is None)
16 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
17 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
22 global max_requests_outstanding
24 process_cfg_common_always()
28 c
.url
= cfg
.get('server','url')
31 c
.url
= c
.saddrs
[0].url()
33 process_cfg_clients(set_client
)
35 c
.routes
= cfg
.get('virtual','routes')
36 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
37 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
38 c
.http_timeout
= cfg
.getint(client_cs
, 'http_timeout')
39 c
.http_retry
= cfg
.getint(client_cs
, 'http_retry')
41 process_cfg_ipif(client_cs
,
51 queue
= PacketQueue('up', c
.max_queue_time
)
52 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
54 def outbound(packet
, saddr
, daddr
):
55 #print('OUT ', saddr, daddr, repr(packet))
59 class ResponseConsumer(twisted
.internet
.protocol
.Protocol
):
61 self
._ssd
= SlipStreamDecoder(queue_inbound
)
62 def dataReceived(self
, data
):
63 try: self
._ssd
.inputdata(mime_translate(data
))
64 except Exception as e
: asyncfailure(e
)
65 def connectionMade(self
): pass
66 def connectionLost(self
, reason
):
67 if isinstance(reason
, twisted
.internet
.error
.ConnectionDone
):
68 try: self
._ssd
.flush()
69 except Exception as e
: asyncfailure(e
)
74 resp
.deliverBody(ResponseConsumer())
78 print(err
, file=sys
.stderr
)
79 reactor
.callLater(c
.http_retry
, req_fin
)
86 def asyncfailure(reason
):
95 if outstanding
>= c
.max_outstanding
: break
96 if not queue
.nonempty() and outstanding
>= c
.target_outstanding
: break
99 def moredata(s
): nonlocal d
; d
+= s
100 queue
.process((lambda: len(d
)),
106 mime
= (b
'--b' + crlf
+
107 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
109 str(c
.client
) .encode('ascii') + lf
+
110 str(c
.target_outstanding
) .encode('ascii') + crlf
+
113 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
114 mime_translate(d
) + crlf
115 ) if len(d
) else b
'') +
118 df
= open('data.dump.dbg', mode
='wb')
121 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
123 log_debug(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
125 hh
= { 'User-Agent': ['hippotat'],
126 'Content-Type': ['multipart/form-data; boundary="b"'] }
127 req
= agent
.request(b
'POST',
129 twisted
.web
.client
.Headers(hh
))
130 req
.addTimeout(c
.http_timeout
, reactor
)
131 req
.addCallbacks(req_ok
, req_err
)
137 start_ipif(c
.ipif_command
, outbound
)