6 import twisted
.web
.client
# set_client(ci, cs, pw): copy the per-client request limits from the
# configuration onto the shared settings object `c`.
# It is handed to process_cfg_clients() as a callback.  `ci` and `pw` are not
# used by any line visible here; some lines of this function are not visible.
12 def set_client(ci
,cs
,pw
):
# Guard: client_cs must still be None when this runs.
# NOTE(review): presumably a non-visible line then records `cs` in client_cs,
# making this a "called at most once" check -- confirm.
15 assert(client_cs
is None)
# Both limits are fetched with cfg.getint(cs, <option name>).
18 c
.max_outstanding
= cfg
.getint(cs
, 'max_requests_outstanding')
19 c
.target_outstanding
= cfg
.getint(cs
, 'target_requests_outstanding')
# Configuration set-up fragment.  The enclosing `def` line is not visible in
# this view, and the final process_cfg_ipif(...) call is cut off.
24 global max_requests_outstanding
26 process_cfg_common_always()
# c.url is assigned twice in the visible lines: once from the 'url' option of
# the 'server' section, once from the first entry of c.saddrs.
# NOTE(review): the statements selecting between the two are not visible;
# presumably the second is a fallback -- confirm.
30 c
.url
= cfg
.get('server','url')
33 c
.url
= c
.saddrs
[0].url()
# set_client is passed as the callback to process_cfg_clients.
35 process_cfg_clients(set_client
)
37 c
.routes
= cfg
.get('virtual','routes')
# The remaining settings are integers read with cfg.getint(client_cs, ...).
38 c
.max_queue_time
= cfg
.getint(client_cs
, 'max_queue_time')
39 c
.max_batch_up
= cfg
.getint(client_cs
, 'max_batch_up')
40 c
.http_retry
= cfg
.getint(client_cs
, 'http_retry')
# Effective HTTP timeout = http_timeout + http_timeout_grace.
41 c
.http_timeout
= (cfg
.getint(client_cs
, 'http_timeout') +
42 cfg
.getint(client_cs
, 'http_timeout_grace'))
44 process_cfg_ipif(client_cs
,
def log_outstanding():
    """Dump the current `outstanding` table at CTRL_DUMP debug level."""
    dump_text = 'OS %s' % outstanding
    log_debug(DBG.CTRL_DUMP, dump_text)
# Fragment (enclosing definition, if any, is not visible).
# Packet queue constructed with the label 'up' and c.max_queue_time.
57 queue
= PacketQueue('up', c
.max_queue_time
)
# HTTP agent bound to the reactor; its connectTimeout is c.http_timeout.
58 agent
= twisted
.web
.client
.Agent(reactor
, connectTimeout
= c
.http_timeout
)
# outbound(packet, saddr, daddr): handler passed to start_ipif() as its second
# argument.  Its body is not visible in this view apart from the disabled
# debug print below.
60 def outbound(packet
, saddr
, daddr
):
61 #print('OUT ', saddr, daddr, repr(packet))
# Common base for the response consumers: a twisted Protocol that provides
# the _log() helper and logs connectionMade.
65 class GeneralResponseConsumer(twisted
.internet
.protocol
.Protocol
):
# Constructor taking the request and a short description string.
# NOTE(review): the body is not visible; _log() reads self._req and
# self._desc, so presumably they are stored here -- confirm.
66 def __init__(self
, req
, desc
):
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message tagged with this consumer's description.

    The message is prefixed with ``self._desc``, the request held in
    ``self._req`` is passed to ``log_debug`` as ``idof``, and any further
    keyword arguments are forwarded unchanged.
    """
    tagged = '%s: %s' % (self._desc, msg)
    log_debug(dflag, tagged, idof=self._req, **kwargs)
def connectionMade(self):
    """Record, at HTTP_CTRL debug level, that the connection was made."""
    event = 'connectionMade'
    self._log(DBG.HTTP_CTRL, event)
76 class ResponseConsumer(GeneralResponseConsumer
):
def __init__(self, req):
    """Set up a consumer for the body of a successful response.

    Initialises the base class with the description 'RC', then creates the
    SlipStreamDecoder that will hand decoded data to ``queue_inbound``.
    The decoder is labelled with the request's id and this consumer's
    description.
    """
    super().__init__(req, 'RC')
    decoder_label = '[%s] %s' % (id(req), self._desc)
    self._ssd = SlipStreamDecoder(decoder_label, queue_inbound)
    self._log(DBG.HTTP_CTRL, '__init__')
# dataReceived(data): log the chunk at HTTP debug level and feed it to the
# SlipStreamDecoder held in self._ssd.
# NOTE(review): the `try:` matching the `except` below is not visible here.
83 def dataReceived(self
, data
):
84 self
._log(DBG
.HTTP
, 'dataReceived', d
=data
)
86 self
._ssd
.inputdata(data
)
# Any exception is reported via _handleexception() instead of propagating.
87 except Exception as e
:
88 self
._handleexception()
# connectionLost(reason): log the reason the connection ended, then handle the
# end of the response.  Several lines of this method are not visible here.
90 def connectionLost(self
, reason
):
91 self
._log(DBG
.HTTP_CTRL
, 'connectionLost ' + str(reason
))
# A reason other than ResponseDone is treated separately.
# NOTE(review): the body of this branch is not visible -- confirm what it does.
92 if not reason
.check(twisted
.web
.client
.ResponseDone
):
96 self
._log(DBG
.HTTP
, 'ResponseDone')
# Any exception is reported via _handleexception() instead of propagating.
99 except Exception as e
:
100 self
._handleexception()
def _handleexception(self):
    """Report the exception currently being handled as a late failure.

    The formatted traceback text is what gets passed on as the reason.
    """
    trace_text = traceback.format_exc()
    self._latefailure(trace_text)
def _latefailure(self, reason):
    """Log a failure found after the response started, then report it.

    The failure is logged at HTTP_CTRL level and passed to ``req_err``
    together with the request this consumer belongs to.
    """
    note = '_latefailure ' + str(reason)
    self._log(DBG.HTTP_CTRL, note)
    req_err(self._req, reason)
109 class ErrorResponseConsumer(twisted
.internet
.protocol
.Protocol
):
110 def __init__(self
, req
, resp
):
111 super().__init__(req
, 'ERROR-RC')
115 self
._phrase
= resp
.phrase
.decode('utf-8')
117 self
._phrase
= repr(resp
.phrase
)
118 self
._log(DBG
.HTTP_CTRL
, '__init__ %d %s' %
(resp
.code
, self
._phrase
))
120 def dataReceived(self
, data
):
121 self
._log(DBG
.HTTP_CTRL
, 'dataReceived ' + repr(data
))
124 def connectionLost(self
, reason
):
126 mbody
= self
._m
.decode('utf-8')
128 mbody
= repr(self
._m
)
129 if not reason
.check(twisted
.web
.client
.ResponseDone
):
130 mbody
+= ' || ' + str(reason
)
133 %
(self
._resp
.code
, self
._phrase
, mbody
))
# req_ok(req, resp): callback attached to the request Deferred in
# check_outbound(); it receives the response object.
# It logs the response code and phrase, then picks a consumer for the body.
# Several lines of this function are not visible in this view.
135 def req_ok(req
, resp
):
136 log_debug(DBG
.HTTP_CTRL
,
137 'req_ok %d %s %s' %
(resp
.code
, repr(resp
.phrase
), str(resp
)),
# Two consumers are possible: ResponseConsumer or ErrorResponseConsumer.
# NOTE(review): the condition choosing between them is not visible;
# presumably it tests resp.code -- confirm.
140 rc
= ResponseConsumer(req
)
142 rc
= ErrorResponseConsumer(req
, resp
)
145 # now rc is responsible for calling req_fin
# req_err(req, err): record that request `req` failed with `err`, report it on
# stderr, and schedule req_fin(req) after c.http_retry.
# Some lines of this function (including the `try:` matching the `except`
# below) are not visible in this view.
147 def req_err(req
, err
):
148 # called when the Deferred fails, or (if it completes),
149 # later, by ResponseConsumer or ErrorResponseConsumer
151 log_debug(DBG
.HTTP_CTRL
, 'req_err ' + str(err
), idof
=req
)
# A twisted Failure is flattened to its traceback text.
152 if isinstance(err
, twisted
.python
.failure
.Failure
):
153 err
= err
.getTraceback()
154 print('[%#x] %s' %
(id(req
), err
), file=sys
.stderr
)
# check_outbound() stores an int in outstanding[req] when the request is sent,
# and this function overwrites it with the error.  A non-int therefore means
# an error was already recorded for this request.
155 if not isinstance(outstanding
[req
], int):
156 raise RuntimeError('[%#x] previously %s' %
(id(req
), outstanding
[req
]))
157 outstanding
[req
] = err
# Finish the request later, after the retry delay.
159 reactor
.callLater(c
.http_retry
, partial(req_fin
, req
))
# NOTE(review): the concatenation below requires `err` to be a str at this
# point -- confirm that every caller passes a str or a Failure.
160 except Exception as e
:
161 crash(traceback
.format_exc() + '\n----- handling -----\n' + err
)
# Fragment: the enclosing `def` line is not visible (going by the log text it
# is req_fin).  Logs the number of entries in `outstanding` for request `req`.
165 log_debug(DBG
.HTTP_CTRL
, 'req_fin OS=%d' %
len(outstanding
), idof
=req
)
# Fragment of a class whose header is not visible: a constructor taking the
# request, followed by a line from a later, non-visible method.
# NOTE(review): the enclosing class and the method containing the req_err
# call cannot be identified from this view.
169 def __init__(self
, req
):
# Reports `err` for this object's request through req_err().
172 req_err(self
._req
, err
)
# check_outbound(): issue HTTP POST requests carrying queued outbound data,
# subject to the configured limits on requests in flight.
# Many lines of this function (including the loop header that the `break`
# statements belong to) are not visible in this view.
174 def check_outbound():
# Stop once c.max_outstanding requests are in flight, or once the queue is
# empty and at least c.target_outstanding requests are in flight.
178 if len(outstanding
) >= c
.max_outstanding
: break
179 if not queue
.nonempty() and len(outstanding
) >= c
.target_outstanding
: break
# moredata() appends to the enclosing function's buffer `d`; queue.process()
# is given a callable reporting the current length of `d`.
182 def moredata(s
): nonlocal d
; d
+= s
183 queue
.process((lambda: len(d
)),
187 d
= mime_translate(d
)
# Build the multipart/form-data body by hand, using the fixed boundary "b".
# Part "m" is text and includes c.client, c.target_outstanding and
# c.http_timeout, one per line; part "d" is an octet-stream and, per the
# `if len(d)` test, is included only when there is data.
191 mime
= (b
'--b' + crlf
+
192 b
'Content-Type: text/plain; charset="utf-8"' + crlf
+
193 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
194 str(c
.client
) .encode('ascii') + crlf
+
196 str(c
.target_outstanding
) .encode('ascii') + crlf
+
197 str(c
.http_timeout
) .encode('ascii') + crlf
+
200 b
'Content-Type: application/octet-stream' + crlf
+
201 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
203 ) if len(d
) else b
'') +
206 #df = open('data.dump.dbg', mode='wb')
209 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
211 log_debug(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
# Request headers; Content-Length is the byte length of the assembled body.
213 hh
= { 'User-Agent': ['hippotat'],
214 'Content-Type': ['multipart/form-data; boundary="b"'],
215 'Content-Length': [str(len(mime
))] }
# Wrap the body bytes in a file-like object for twisted's FileBodyProducer.
217 bytesreader
= io
.BytesIO(mime
)
218 producer
= twisted
.web
.client
.FileBodyProducer(bytesreader
)
220 req
= agent
.request(b
'POST',
222 twisted
.web
.client
.Headers(hh
),
# Track the request in `outstanding`, keyed by the request object, with the
# payload length as its value (req_err() relies on this value being an int).
225 outstanding
[req
] = len(d
)
226 log_debug(DBG
.HTTP_CTRL
, 'request OS=%d' %
len(outstanding
), idof
=req
, d
=d
)
# Apply the overall timeout, then attach the success and failure handlers.
# The errback is added after the callback, so per Deferred chaining it also
# receives failures raised by req_ok.
227 req
.addTimeout(c
.http_timeout
, reactor
)
228 req
.addCallback(partial(req_ok
, req
))
229 req
.addErrback(partial(req_err
, req
))
# Start the ipif helper, passing it c.ipif_command and outbound() as handler.
236 start_ipif(c
.ipif_command
, outbound
)