6 import twisted
.web
.client
12 def set_client(ci
,cs
,pw
):
15 assert(client_cs
is None)
18 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
19 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
24 global max_requests_outstanding
26 process_cfg_common_always()
30 c
.url
= cfg
.get('server','url')
33 c
.url
= c
.saddrs
[0].url()
35 process_cfg_clients(set_client
)
37 c
.routes
= cfg
.get('virtual','routes')
38 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
39 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
40 c
.http_retry
= cfg
.getint(client_cs
, 'http_retry')
41 c
.http_timeout
= (cfg
.getint(client_cs
, 'http_timeout') +
42 cfg
.getint(client_cs
, 'http_timeout_grace'))
44 process_cfg_ipif(client_cs
,
51 def log_outstanding():
52 log_debug(DBG
.CTRL_DUMP
, 'OS %s' % outstanding
)
57 queue
= PacketQueue('up', c
.max_queue_time
)
58 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
60 def outbound(packet
, saddr
, daddr
):
61 #print('OUT ', saddr, daddr, repr(packet))
65 class GeneralResponseConsumer(twisted
.internet
.protocol
.Protocol
):
66 def __init__(self
, req
, desc
):
70 def _log(self
, dflag
, msg
, **kwargs
):
71 log_debug(dflag
, '%s: %s' %
(self
._desc
, msg
), idof
=self
._req
, **kwargs
)
73 def connectionMade(self
):
74 self
._log(DBG
.HTTP_CTRL
, 'connectionMade')
76 class ResponseConsumer(GeneralResponseConsumer
):
77 def __init__(self
, req
):
78 super().__init__(req
, 'RC')
79 ssddesc
= '[%s] %s' %
(id(req
), self
._desc
)
80 self
._ssd
= SlipStreamDecoder(ssddesc
, queue_inbound
)
81 self
._log(DBG
.HTTP_CTRL
, '__init__')
83 def dataReceived(self
, data
):
84 self
._log(DBG
.HTTP_CTRL
, 'dataReceived', d
=data
)
86 self
._ssd
.inputdata(data
)
87 except Exception as e
:
88 self
._handleexception()
90 def connectionLost(self
, reason
):
91 self
._log(DBG
.HTTP_CTRL
, 'connectionLost ' + str(reason
))
92 if not reason
.check(twisted
.web
.client
.ResponseDone
):
98 except Exception as e
:
99 self
._handleexception()
101 def _handleexception(self
):
102 self
._latefailure(traceback
.format_exc())
104 def _latefailure(self
, reason
):
105 self
._log(DBG
.HTTP_CTRL
, '_asyncFailure ' + str(reason
))
106 req_err(self
._req
, reason
)
108 class ErrorResponseConsumer(twisted
.internet
.protocol
.Protocol
):
109 def __init__(self
, req
, resp
):
110 super().__init__(req
, 'ERROR-RC')
114 self
._phrase
= resp
.phrase
.decode('utf-8')
116 self
._phrase
= repr(resp
.phrase
)
117 self
._log(DBG
.HTTP_CTRL
, '__init__ %d %s' %
(resp
.code
, self
._phrase
))
119 def dataReceived(self
, data
):
120 self
._log(DBG
.HTTP_CTRL
, 'dataReceived ' + repr(data
))
123 def connectionLost(self
, reason
):
125 mbody
= self
._m
.decode('utf-8')
127 mbody
= repr(self
._m
)
128 if not reason
.check(twisted
.web
.client
.ResponseDone
):
129 mbody
+= ' || ' + str(reason
)
132 %
(self
._resp
.code
, self
._phrase
, mbody
))
134 def req_ok(req
, resp
):
135 log_debug(DBG
.HTTP_CTRL
,
136 'req_ok %d %s %s' %
(resp
.code
, repr(resp
.phrase
), str(resp
)),
139 rc
= ResponseConsumer(req
)
141 rc
= ErrorResponseConsumer(req
, resp
)
144 # now rc is responsible for calling req_fin
146 def req_err(req
, err
):
147 # called when the Deferred fails, or (if it completes),
148 # later, by ResponsConsumer or ErrorResponsConsumer
150 log_debug(DBG
.HTTP_CTRL
, 'req_err ' + str(err
), idof
=req
)
151 if isinstance(err
, twisted
.python
.failure
.Failure
):
152 err
= err
.getTraceback()
153 print('[%#x] %s' %
(id(req
), err
), file=sys
.stderr
)
154 if not isinstance(outstanding
[req
], int):
155 raise RuntimeError('[%#x] previously %s' %
(id(req
), outstanding
[req
]))
156 outstanding
[req
] = err
158 reactor
.callLater(c
.http_retry
, partial(req_fin
, req
))
159 except Exception as e
:
160 crash(traceback
.format_exc() + '\n----- handling -----\n' + err
)
164 log_debug(DBG
.HTTP_CTRL
, 'req_fin OS=%d' %
len(outstanding
), idof
=req
)
168 def __init__(self
, req
):
171 req_err(self
._req
, err
)
173 def check_outbound():
177 if len(outstanding
) >= c
.max_outstanding
: break
178 if not queue
.nonempty() and len(outstanding
) >= c
.target_outstanding
: break
181 def moredata(s
): nonlocal d
; d
+= s
182 queue
.process((lambda: len(d
)),
186 d
= mime_translate(d
)
190 mime
= (b
'--b' + crlf
+
191 b
'Content-Type: text/plain; charset="utf-8"' + crlf
+
192 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
193 str(c
.client
) .encode('ascii') + crlf
+
195 str(c
.target_outstanding
) .encode('ascii') + crlf
+
196 str(c
.http_timeout
) .encode('ascii') + crlf
+
199 b
'Content-Type: application/octet-stream' + crlf
+
200 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
202 ) if len(d
) else b
'') +
205 #df = open('data.dump.dbg', mode='wb')
208 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
210 log_debug(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
212 hh
= { 'User-Agent': ['hippotat'],
213 'Content-Type': ['multipart/form-data; boundary="b"'],
214 'Content-Length': [str(len(mime
))] }
216 bytesreader
= io
.BytesIO(mime
)
217 producer
= twisted
.web
.client
.FileBodyProducer(bytesreader
)
219 req
= agent
.request(b
'POST',
221 twisted
.web
.client
.Headers(hh
),
224 outstanding
[req
] = len(d
)
225 log_debug(DBG
.HTTP_CTRL
, 'request OS=%d' %
len(outstanding
), idof
=req
, d
=d
)
226 req
.addTimeout(c
.http_timeout
, reactor
)
227 req
.addCallback(partial(req_ok
, req
))
228 req
.addErrback(partial(req_err
, req
))
235 start_ipif(c
.ipif_command
, outbound
)