6080d4d6ae685ba30d22d62ba890a4bda773c316
6 import twisted
.web
.client
10 class GeneralResponseConsumer(twisted
.internet
.protocol
.Protocol
):
11 def __init__(self
, cl
, req
, desc
):
16 def _log(self
, dflag
, msg
, **kwargs
):
17 self
.cl
.log(dflag
, '%s: %s' %
(self
._desc
, msg
), idof
=self
._req
, **kwargs
)
19 def connectionMade(self
):
20 self
._log(DBG
.HTTP_CTRL
, 'connectionMade')
22 class ResponseConsumer(GeneralResponseConsumer
):
23 def __init__(self
, cl
, req
):
24 super().__init__(cl
, req
, 'RC')
25 ssddesc
= '[%s] %s' %
(id(req
), self
._desc
)
26 self
._ssd
= SlipStreamDecoder(ssddesc
, cl
.queue_inbound
)
27 self
._log(DBG
.HTTP_CTRL
, '__init__')
29 def dataReceived(self
, data
):
30 self
._log(DBG
.HTTP
, 'dataReceived', d
=data
)
32 self
._ssd
.inputdata(data
)
33 except Exception as e
:
34 self
._handleexception()
36 def connectionLost(self
, reason
):
37 self
._log(DBG
.HTTP_CTRL
, 'connectionLost ' + str(reason
))
38 if not reason
.check(twisted
.web
.client
.ResponseDone
):
42 self
._log(DBG
.HTTP
, 'ResponseDone')
44 self
.cl
.req_fin(self
._req
)
45 except Exception as e
:
46 self
._handleexception()
48 def _handleexception(self
):
49 self
._latefailure(traceback
.format_exc())
51 def _latefailure(self
, reason
):
52 self
._log(DBG
.HTTP_CTRL
, '_latefailure ' + str(reason
))
53 self
.cl
.req_err(self
._req
, reason
)
55 class ErrorResponseConsumer(twisted
.internet
.protocol
.Protocol
):
56 def __init__(self
, cl
, req
, resp
):
57 super().__init__(cl
, req
, 'ERROR-RC')
61 self
._phrase
= resp
.phrase
.decode('utf-8')
63 self
._phrase
= repr(resp
.phrase
)
64 self
._log(DBG
.HTTP_CTRL
, '__init__ %d %s' %
(resp
.code
, self
._phrase
))
66 def dataReceived(self
, data
):
67 self
._log(DBG
.HTTP_CTRL
, 'dataReceived ' + repr(data
))
70 def connectionLost(self
, reason
):
72 mbody
= self
._m
.decode('utf-8')
75 if not reason
.check(twisted
.web
.client
.ResponseDone
):
76 mbody
+= ' || ' + str(reason
)
77 self
.cl
.req_err(self
._req
,
79 %
(self
._resp
.code
, self
._phrase
, mbody
))
82 def __init__(cl
, c
,ss
,cs
):
85 cl
.desc
= '[%s %s] ' %
(ss
,cs
)
87 def log(cl
, dflag
, msg
, **kwargs
):
88 log_debug(dflag
, cl
.desc
+ msg
, **kwargs
)
90 def log_outstanding(cl
):
91 cl
.log(DBG
.CTRL_DUMP
, 'OS %s' % outstanding
)
94 cl
.queue
= PacketQueue('up', c
.max_queue_time
)
95 cl
.agent
= twisted
.web
.client
.Agent(
96 reactor
, connectTimeout
= c
.http_timeout
)
98 def outbound(cl
, packet
, saddr
, daddr
):
99 #print('OUT ', saddr, daddr, repr(packet))
100 cl
.queue
.append(packet
)
103 def req_ok(cl
, req
, resp
):
104 cl
.log(DBG
.HTTP_CTRL
,
105 'req_ok %d %s %s' %
(resp
.code
, repr(resp
.phrase
), str(resp
)),
108 rc
= ResponseConsumer(cl
, req
)
110 rc
= ErrorResponseConsumer(cl
, req
, resp
)
113 # now rc is responsible for calling req_fin
115 def req_err(cl
, req
, err
):
116 # called when the Deferred fails, or (if it completes),
117 # later, by ResponsConsumer or ErrorResponsConsumer
119 cl
.log(DBG
.HTTP_CTRL
, 'req_err ' + str(err
), idof
=req
)
120 if isinstance(err
, twisted
.python
.failure
.Failure
):
121 err
= err
.getTraceback()
122 print('[%#x] %s' %
(id(req
), err
), file=sys
.stderr
)
123 if not isinstance(outstanding
[req
], int):
124 raise RuntimeError('[%#x] previously %s' %
(id(req
), outstanding
[req
]))
125 cl
.outstanding
[req
] = err
127 reactor
.callLater(c
.http_retry
, partial(cl
.req_fin
, req
))
128 except Exception as e
:
129 crash(traceback
.format_exc() + '\n----- handling -----\n' + err
)
131 def req_fin(cl
, req
):
132 del cl
.outstanding
[req
]
133 cl
.log(DBG
.HTTP_CTRL
, 'req_fin OS=%d' %
len(outstanding
), idof
=req
)
136 def check_outbound(cl
):
138 if len(cl
.outstanding
) >= cl
.c
.max_outstanding
:
141 if (not queue
.nonempty() and
142 len(cl
.outstanding
) >= cl
.c
.target_outstanding
):
146 def moredata(s
): nonlocal d
; d
+= s
147 queue
.process((lambda: len(d
)),
151 d
= mime_translate(d
)
155 mime
= (b
'--b' + crlf
+
156 b
'Content-Type: text/plain; charset="utf-8"' + crlf
+
157 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
158 str(cl
.c
.client
) .encode('ascii') + crlf
+
159 cl
.c
.password
+ crlf
+
160 str(cl
.c
.target_outstanding
).encode('ascii') + crlf
+
161 str(cl
.c
.http_timeout
) .encode('ascii') + crlf
+
164 b
'Content-Type: application/octet-stream' + crlf
+
165 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
167 ) if len(d
) else b
'') +
170 #df = open('data.dump.dbg', mode='wb')
173 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
175 cl
.log(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
177 hh
= { 'User-Agent': ['hippotat'],
178 'Content-Type': ['multipart/form-data; boundary="b"'],
179 'Content-Length': [str(len(mime
))] }
181 bytesreader
= io
.BytesIO(mime
)
182 producer
= twisted
.web
.client
.FileBodyProducer(bytesreader
)
184 req
= agent
.request(b
'POST',
186 twisted
.web
.client
.Headers(hh
),
189 cl
.outstanding
[req
] = len(d
)
190 cl
.log(DBG
.HTTP_CTRL
,
191 'request OS=%d' %
len(cl
.outstanding
),
193 req
.addTimeout(cl
.c
.http_timeout
, reactor
)
194 req
.addCallback(partial(cl
.req_ok
, req
))
195 req
.addErrback(partial(cl
.req_err
, req
))
201 def process_cfg(putative_servers
, putative_clients
):
204 for ss
in putative_servers
.values():
205 for (ci
,cs
) in putative_clients
.items():
208 sections
= process_cfg_client_common(c
,ss
,cs
,ci
):
209 if not sections
: continue
211 def srch(getter
,key
): return cfg_search(getter
,key
,sections
)
213 c
.http_timeout
+= srch(cfg
.getint
, 'http_timeout_grace')
214 c
.max_outstanding
= srch(cfg
.getint
, 'max_requests_outstanding')
215 c
.max_batch_up
= srch(cfg
.getint
, 'max_batch_up')
216 c
.http_retry
= srch(cfg
.getint
, 'http_retry')
217 c
.vroutes
= srch(cfg
.get
, 'vroutes')
219 process_cfg_common(c
,ss
)
220 try: c
.url
= srch(cfg
.get
,'url')
221 except NoOptionError
:
223 c
.url
= c
.saddrs
[0].url()
229 ('rnets','vroutes')))
231 clients
.append(Client(c
,ss
,cs
))
233 common_startup(process_cfg
)
237 start_ipif(cl
.c
.ipif_command
, cl
.outbound
)