6 import twisted
.web
.client
# set_client(ci, cs, pw): asserts that client_cs is still None, then reads
# the integer options 'max_requests_outstanding' and
# 'target_requests_outstanding' from config section cs into
# c.max_outstanding and c.target_outstanding respectively.
12 def set_client(ci
,cs
,pw
):
15 assert(client_cs
is None)
18 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
19 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
24 global max_requests_outstanding
26 process_cfg_common_always()
30 c
.url
= cfg
.get('server','url')
33 c
.url
= c
.saddrs
[0].url()
35 process_cfg_clients(set_client
)
37 c
.routes
= cfg
.get('virtual','routes')
38 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
39 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
40 c
.http_timeout
= cfg
.getint(client_cs
, 'http_timeout')
41 c
.http_retry
= cfg
.getint(client_cs
, 'http_retry')
43 process_cfg_ipif(client_cs
,
# Create the PacketQueue labelled 'up', constructed with c.max_queue_time.
53 queue
= PacketQueue('up', c
.max_queue_time
)
# Create the Twisted web client Agent on `reactor`; its connectTimeout is
# taken from c.http_timeout.
54 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
56 def outbound(packet
, saddr
, daddr
):
57 #print('OUT ', saddr, daddr, repr(packet))
def crashy():
    """Fail unconditionally with AssertionError.

    The error is raised explicitly instead of via ``assert False`` so that
    the failure still occurs when Python runs with -O, which strips assert
    statements and would otherwise turn this function into a silent no-op.

    Raises:
        AssertionError: always.
    """
    raise AssertionError('crashy() called')
# Twisted Protocol that receives the body of a response: incoming data is
# passed through mime_translate() into a SlipStreamDecoder, and failures are
# reported through req_err() for the associated request.
63 class ResponseConsumer(twisted
.internet
.protocol
.Protocol
):
# Builds the SlipStreamDecoder (constructed with crashy) and logs '__init__'.
64 def __init__(self
, req
):
66 self
._ssd
= SlipStreamDecoder(crashy
)
67 self
._log(DBG
.HTTP_CTRL
, '__init__')
# Debug-log helper: prefixes msg with 'RC ' and tags the entry with
# idof=self._req; extra keyword arguments are forwarded to log_debug.
69 def _log(self
, dflag
, msg
, **kwargs
):
70 log_debug(dflag
, 'RC ' + msg
, idof
=self
._req
, **kwargs
)
# Logs the received chunk, then feeds mime_translate(data) to the decoder.
# Any Exception is routed to _handleexception().
72 def dataReceived(self
, data
):
73 self
._log(DBG
.HTTP_CTRL
, 'dataReceived', d
=data
)
75 self
._ssd
.inputdata(mime_translate(data
))
76 except Exception as e
:
77 self
._handleexception()
# Only logs the event.
79 def connectionMade(self
):
80 self
._log(DBG
.HTTP_CTRL
, 'connectionMade')
# Logs the reason; when the reason is not twisted.web.client.ResponseDone
# it is reported through _asyncfailure(). Any Exception is routed to
# _handleexception().
82 def connectionLost(self
, reason
):
83 self
._log(DBG
.HTTP_CTRL
, 'connectionLost ' + str(reason
))
84 if not reason
.check(twisted
.web
.client
.ResponseDone
):
85 self
._asyncfailure(reason
)
89 except Exception as e
:
90 self
._handleexception()
# Reports the formatted traceback of the exception currently being handled.
92 def _handleexception(self
):
93 self
._asyncfailure(traceback
.format_exc())
# Logs the failure and hands it, with this consumer's request, to req_err().
95 def _asyncfailure(self
, reason
):
96 self
._log(DBG
.HTTP_CTRL
, '_asyncFailure ' + str(reason
))
97 req_err(self
._req
, reason
)
# Twisted Protocol whose log entries carry the 'ERROR-RC ' prefix; on
# connection loss it formats the response code, phrase and body text
# together.
99 class ErrorResponseConsumer(twisted
.internet
.protocol
.Protocol
):
# self._phrase is set from resp.phrase, either decoded as UTF-8 or as its
# repr(); the code and phrase are then logged.
# NOTE(review): presumably the repr() form is the fallback when decoding
# fails - confirm.
100 def __init__(self
, req
, resp
):
105 self
._phrase
= resp
.phrase
.decode('utf-8')
107 self
._phrase
= repr(resp
.phrase
)
109 self
._log(DBG
.HTTP_CTRL
, '__init__ %d %s' %
(resp
.code
, self
._phrase
))
# Debug-log helper: prefixes msg with 'ERROR-RC ' and tags the entry with
# idof=self._req; extra keyword arguments are forwarded to log_debug.
111 def _log(self
, dflag
, msg
, **kwargs
):
112 log_debug(dflag
,'ERROR-RC '+msg
, idof
=self
._req
, **kwargs
)
114 def connectionMade(self
):
# Logs the repr() of each received chunk.
117 def dataReceived(self
, data
):
118 self
._log(DBG
.HTTP_CTRL
, 'dataReceived ' + repr(data
))
# mbody is derived from self._m, either decoded as UTF-8 or as its repr();
# when the reason is not twisted.web.client.ResponseDone, ' || ' plus the
# reason text is appended. The result is formatted with the response code
# and phrase.
# NOTE(review): presumably the repr() form is the fallback when decoding
# fails - confirm.
121 def connectionLost(self
, reason
):
123 mbody
= self
._m
.decode('utf-8')
125 mbody
= repr(self
._m
)
126 if not reason
.check(twisted
.web
.client
.ResponseDone
):
127 mbody
+= ' || ' + str(reason
)
130 %
(self
._resp
.code
, self
._phrase
, mbody
))
# req_ok(req, resp): logs the response code, the repr() of its phrase and
# str(resp), then creates the body consumer rc as either
# ResponseConsumer(req) or ErrorResponseConsumer(req, resp).
# NOTE(review): presumably the choice depends on resp.code - confirm.
132 def req_ok(req
, resp
):
133 log_debug(DBG
.HTTP_CTRL
,
134 'req_ok %d %s %s' %
(resp
.code
, repr(resp
.phrase
), str(resp
)),
137 rc
= ResponseConsumer(req
)
139 rc
= ErrorResponseConsumer(req
, resp
)
def req_err(req, err):
    """Log a failed request, print the error to stderr, and schedule req_fin.

    Args:
        req: the request; used as the log entry's ``idof`` and passed on to
            req_fin.
        err: the error. A twisted Failure is rendered as its traceback text
            before printing; anything else is printed unchanged.

    req_fin(req) is scheduled through reactor.callLater with a delay of
    c.http_retry.
    """
    log_debug(DBG.HTTP_CTRL, 'req_err ' + str(err), idof=req)
    is_failure = isinstance(err, twisted.python.failure.Failure)
    report = err.getTraceback() if is_failure else err
    print(report, file=sys.stderr)
    # callLater forwards extra positional arguments to the callable.
    reactor.callLater(c.http_retry, req_fin, req)
153 log_debug(DBG
.HTTP_CTRL
, 'req_fin OS=%d' % outstanding
, idof
=req
)
# check_outbound(): issues HTTP POST requests through `agent`, subject to
# the limits on outstanding requests held in `c`.
156 def check_outbound():
# Stop once the hard cap on outstanding requests is reached.
160 if outstanding
>= c
.max_outstanding
: break
# With nothing queued, stop once the target number of outstanding requests
# is reached.
161 if not queue
.nonempty() and outstanding
>= c
.target_outstanding
: break
# moredata appends s to the accumulating buffer d; queue.process is given a
# callable that returns the current len(d).
164 def moredata(s
): nonlocal d
; d
+= s
165 queue
.process((lambda: len(d
)),
# d is passed through mime_translate before being placed in the form body.
169 d
= mime_translate(d
)
# Assemble the multipart/form-data body (boundary "b"): a text part named
# "m" carrying the client and target_outstanding values as ASCII, and an
# octet-stream part named "d" that is included only when d is non-empty.
173 mime
= (b
'--b' + crlf
+
174 b
'Content-Type: text/plain; charset="utf-8"' + crlf
+
175 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
176 str(c
.client
) .encode('ascii') + crlf
+
178 str(c
.target_outstanding
) .encode('ascii') + crlf
+
181 b
'Content-Type: application/octet-stream' + crlf
+
182 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
184 ) if len(d
) else b
'') +
187 #df = open('data.dump.dbg', mode='wb')
190 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
192 log_debug(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
# Request headers; Content-Length is the length of the assembled body.
194 hh
= { 'User-Agent': ['hippotat'],
195 'Content-Type': ['multipart/form-data; boundary="b"'],
196 'Content-Length': [str(len(mime
))] }
# Wrap the body bytes in a BytesIO for Twisted's FileBodyProducer.
198 bytesreader
= io
.BytesIO(mime
)
199 producer
= twisted
.web
.client
.FileBodyProducer(bytesreader
)
201 req
= agent
.request(b
'POST',
203 twisted
.web
.client
.Headers(hh
),
207 log_debug(DBG
.HTTP_CTRL
, 'request OS=%d' % outstanding
, idof
=req
, d
=d
)
# Apply the HTTP timeout to the request, then route its outcome to req_ok
# (callback) or req_err (errback).
208 req
.addTimeout(c
.http_timeout
, reactor
)
209 req
.addCallback((lambda resp
: req_ok(req
, resp
)))
210 req
.addErrback((lambda err
: req_err(req
, err
)))
215 start_ipif(c
.ipif_command
, outbound
)