Commit | Line | Data |
---|---|---|
c55f394e IJ |
1 | #!/usr/bin/python3 |
2 | ||
3 | from hippotat import * | |
4 | ||
c0c90673 IJ |
5 | import twisted.web |
6 | import twisted.web.client | |
7 | ||
dd6665ee IJ |
8 | import io |
9 | ||
034284c3 | 10 | client_cs = None |
88487243 IJ |
11 | |
12 | def set_client(ci,cs,pw): | |
034284c3 | 13 | global client_cs |
88487243 | 14 | global password |
034284c3 IJ |
15 | assert(client_cs is None) |
16 | client_cs = cs | |
17 | c.client = ci | |
88487243 | 18 | c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding') |
7b07f0b5 | 19 | c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding') |
88487243 IJ |
20 | password = pw |
21 | ||
87a7c0c7 IJ |
22 | def process_cfg(): |
23 | global url | |
24 | global max_requests_outstanding | |
c55f394e | 25 | |
87a7c0c7 | 26 | process_cfg_common_always() |
88487243 IJ |
27 | process_cfg_server() |
28 | ||
29 | try: | |
30 | c.url = cfg.get('server','url') | |
31 | except NoOptionError: | |
32 | process_cfg_saddrs() | |
84e763c7 | 33 | c.url = c.saddrs[0].url() |
88487243 IJ |
34 | |
35 | process_cfg_clients(set_client) | |
87a7c0c7 | 36 | |
ca732796 | 37 | c.routes = cfg.get('virtual','routes') |
7b07f0b5 IJ |
38 | c.max_queue_time = cfg.getint(client_cs, 'max_queue_time') |
39 | c.max_batch_up = cfg.getint(client_cs, 'max_batch_up') | |
4edf77a3 | 40 | c.http_retry = cfg.getint(client_cs, 'http_retry') |
ba5630fd IJ |
41 | c.http_timeout = (cfg.getint(client_cs, 'http_timeout') + |
42 | cfg.getint(client_cs, 'http_timeout_grace')) | |
034284c3 IJ |
43 | |
44 | process_cfg_ipif(client_cs, | |
45 | (('local', 'client'), | |
46 | ('peer', 'server'), | |
47 | ('rnets', 'routes'))) | |
48 | ||
0accf0d3 IJ |
49 | outstanding = { } |
50 | ||
51 | def log_outstanding(): | |
52 | log_debug(DBG.CTRL_DUMP, 'OS %s' % outstanding) | |
ca732796 IJ |
53 | |
54 | def start_client(): | |
55 | global queue | |
7b07f0b5 | 56 | global agent |
297b3ebf | 57 | queue = PacketQueue('up', c.max_queue_time) |
7b07f0b5 | 58 | agent = twisted.web.client.Agent(reactor, connectTimeout = c.http_timeout) |
ca732796 | 59 | |
034284c3 | 60 | def outbound(packet, saddr, daddr): |
ca732796 IJ |
61 | #print('OUT ', saddr, daddr, repr(packet)) |
62 | queue.append(packet) | |
63 | check_outbound() | |
64 | ||
0accf0d3 IJ |
65 | class GeneralResponseConsumer(twisted.internet.protocol.Protocol): |
66 | def __init__(self, req, desc): | |
8b62cd2c | 67 | self._req = req |
0accf0d3 | 68 | self._desc = desc |
14c6d55c IJ |
69 | |
70 | def _log(self, dflag, msg, **kwargs): | |
0accf0d3 IJ |
71 | log_debug(dflag, '%s: %s' % (self._desc, msg), idof=self._req, **kwargs) |
72 | ||
73 | def connectionMade(self): | |
74 | self._log(DBG.HTTP_CTRL, 'connectionMade') | |
75 | ||
76 | class ResponseConsumer(GeneralResponseConsumer): | |
77 | def __init__(self, req): | |
78 | super().__init__(req, 'RC') | |
79 | ssddesc = '[%s] %s' % (id(req), self._desc) | |
80 | self._ssd = SlipStreamDecoder(ssddesc, queue_inbound) | |
81 | self._log(DBG.HTTP_CTRL, '__init__') | |
bd9e77fb | 82 | |
62b51bcf | 83 | def dataReceived(self, data): |
9b65cdd4 IJ |
84 | self._log(DBG.HTTP_CTRL, 'dataReceived', d=data) |
85 | try: | |
02cdcb52 | 86 | self._ssd.inputdata(data) |
9b65cdd4 | 87 | except Exception as e: |
eedc8b30 | 88 | self._handleexception() |
ccd371b3 | 89 | |
62b51bcf | 90 | def connectionLost(self, reason): |
15407d80 | 91 | self._log(DBG.HTTP_CTRL, 'connectionLost ' + str(reason)) |
765aba55 | 92 | if not reason.check(twisted.web.client.ResponseDone): |
0accf0d3 | 93 | self.latefailure() |
765aba55 IJ |
94 | return |
95 | try: | |
96 | self._ssd.flush() | |
c13ee6e6 | 97 | req_fin(self._req) |
765aba55 | 98 | except Exception as e: |
eedc8b30 IJ |
99 | self._handleexception() |
100 | ||
101 | def _handleexception(self): | |
0accf0d3 | 102 | self._latefailure(traceback.format_exc()) |
33932420 | 103 | |
0accf0d3 | 104 | def _latefailure(self, reason): |
15407d80 | 105 | self._log(DBG.HTTP_CTRL, '_asyncFailure ' + str(reason)) |
fd87d3f3 | 106 | req_err(self._req, reason) |
bd9e77fb | 107 | |
6e4af0a2 IJ |
108 | class ErrorResponseConsumer(twisted.internet.protocol.Protocol): |
109 | def __init__(self, req, resp): | |
0accf0d3 | 110 | super().__init__(req, 'ERROR-RC') |
6e4af0a2 | 111 | self._resp = resp |
0accf0d3 | 112 | self._m = b'' |
6e4af0a2 IJ |
113 | try: |
114 | self._phrase = resp.phrase.decode('utf-8') | |
115 | except Exception: | |
116 | self._phrase = repr(resp.phrase) | |
6e4af0a2 IJ |
117 | self._log(DBG.HTTP_CTRL, '__init__ %d %s' % (resp.code, self._phrase)) |
118 | ||
765aba55 IJ |
119 | def dataReceived(self, data): |
120 | self._log(DBG.HTTP_CTRL, 'dataReceived ' + repr(data)) | |
121 | self._m += data | |
122 | ||
6e4af0a2 IJ |
123 | def connectionLost(self, reason): |
124 | try: | |
125 | mbody = self._m.decode('utf-8') | |
126 | except Exception: | |
127 | mbody = repr(self._m) | |
765aba55 IJ |
128 | if not reason.check(twisted.web.client.ResponseDone): |
129 | mbody += ' || ' + str(reason) | |
130 | req_err(self._req, | |
131 | "FAILED %d %s | %s" | |
132 | % (self._resp.code, self._phrase, mbody)) | |
6e4af0a2 | 133 | |
8b62cd2c | 134 | def req_ok(req, resp): |
5dd3275b IJ |
135 | log_debug(DBG.HTTP_CTRL, |
136 | 'req_ok %d %s %s' % (resp.code, repr(resp.phrase), str(resp)), | |
137 | idof=req) | |
6e4af0a2 IJ |
138 | if resp.code == 200: |
139 | rc = ResponseConsumer(req) | |
140 | else: | |
141 | rc = ErrorResponseConsumer(req, resp) | |
5dd3275b | 142 | |
8b62cd2c | 143 | resp.deliverBody(rc) |
0accf0d3 | 144 | # now rc is responsible for calling req_fin |
7b07f0b5 | 145 | |
fd87d3f3 | 146 | def req_err(req, err): |
0accf0d3 IJ |
147 | # called when the Deferred fails, or (if it completes), |
148 | # later, by ResponsConsumer or ErrorResponsConsumer | |
e8ed0029 IJ |
149 | try: |
150 | log_debug(DBG.HTTP_CTRL, 'req_err ' + str(err), idof=req) | |
151 | if isinstance(err, twisted.python.failure.Failure): | |
152 | err = err.getTraceback() | |
153 | print('[%#x] %s' % (id(req), err), file=sys.stderr) | |
154 | if not isinstance(outstanding[req], int): | |
155 | raise RuntimeError('[%#x] previously %s' % (id(req), outstanding[req])) | |
156 | outstanding[req] = err | |
157 | log_outstanding() | |
c13ee6e6 | 158 | reactor.callLater(c.http_retry, partial(req_fin, req)) |
e8ed0029 IJ |
159 | except Exception as e: |
160 | crash(traceback.format_exc() + '\n----- handling -----\n' + err) | |
7b07f0b5 | 161 | |
60b58030 | 162 | def req_fin(req): |
0accf0d3 IJ |
163 | del outstanding[req] |
164 | log_debug(DBG.HTTP_CTRL, 'req_fin OS=%d' % len(outstanding), idof=req) | |
4edf77a3 IJ |
165 | check_outbound() |
166 | ||
c13ee6e6 IJ |
167 | class Errb: |
168 | def __init__(self, req): | |
169 | self._req = req | |
170 | def call(self, err): | |
171 | req_err(self._req, err) | |
172 | ||
ca732796 | 173 | def check_outbound(): |
84e763c7 | 174 | global outstanding |
4edf77a3 | 175 | |
ca732796 | 176 | while True: |
0accf0d3 IJ |
177 | if len(outstanding) >= c.max_outstanding : break |
178 | if not queue.nonempty() and len(outstanding) >= c.target_outstanding: break | |
7b07f0b5 IJ |
179 | |
180 | d = b'' | |
84e763c7 | 181 | def moredata(s): nonlocal d; d += s |
7b07f0b5 | 182 | queue.process((lambda: len(d)), |
c0c90673 | 183 | moredata, |
7b07f0b5 | 184 | c.max_batch_up) |
7b07f0b5 | 185 | |
fc0ba433 IJ |
186 | d = mime_translate(d) |
187 | ||
7b07f0b5 | 188 | crlf = b'\r\n' |
60dc70f9 | 189 | lf = b'\n' |
5e234983 IJ |
190 | mime = (b'--b' + crlf + |
191 | b'Content-Type: text/plain; charset="utf-8"' + crlf + | |
192 | b'Content-Disposition: form-data; name="m"' + crlf + crlf + | |
193 | str(c.client) .encode('ascii') + crlf + | |
194 | password + crlf + | |
195 | str(c.target_outstanding) .encode('ascii') + crlf + | |
ba5630fd | 196 | str(c.http_timeout) .encode('ascii') + crlf + |
60dc70f9 | 197 | (( |
5e234983 IJ |
198 | b'--b' + crlf + |
199 | b'Content-Type: application/octet-stream' + crlf + | |
200 | b'Content-Disposition: form-data; name="d"' + crlf + crlf + | |
fc0ba433 | 201 | d + crlf |
60dc70f9 | 202 | ) if len(d) else b'') + |
5e234983 | 203 | b'--b--' + crlf) |
ca732796 | 204 | |
a518aa4b IJ |
205 | #df = open('data.dump.dbg', mode='wb') |
206 | #df.write(mime) | |
207 | #df.close() | |
534f07df | 208 | # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg |
60dc70f9 | 209 | |
297b3ebf | 210 | log_debug(DBG.HTTP_FULL, 'requesting: ' + str(mime)) |
4edf77a3 | 211 | |
7b07f0b5 | 212 | hh = { 'User-Agent': ['hippotat'], |
b37c6b53 IJ |
213 | 'Content-Type': ['multipart/form-data; boundary="b"'], |
214 | 'Content-Length': [str(len(mime))] } | |
dd6665ee IJ |
215 | |
216 | bytesreader = io.BytesIO(mime) | |
217 | producer = twisted.web.client.FileBodyProducer(bytesreader) | |
218 | ||
3dbadade | 219 | req = agent.request(b'POST', |
7b07f0b5 | 220 | c.url, |
b37c6b53 IJ |
221 | twisted.web.client.Headers(hh), |
222 | producer) | |
47191df1 | 223 | |
0accf0d3 IJ |
224 | outstanding[req] = len(d) |
225 | log_debug(DBG.HTTP_CTRL, 'request OS=%d' % len(outstanding), idof=req, d=d) | |
84e763c7 | 226 | req.addTimeout(c.http_timeout, reactor) |
c13ee6e6 IJ |
227 | req.addCallback(partial(req_ok, req)) |
228 | req.addErrback(partial(req_err, req)) | |
034284c3 | 229 | |
0accf0d3 IJ |
230 | log_outstanding() |
231 | ||
1321ad5f | 232 | common_startup() |
87a7c0c7 | 233 | process_cfg() |
7b07f0b5 | 234 | start_client() |
034284c3 | 235 | start_ipif(c.ipif_command, outbound) |
4edf77a3 | 236 | check_outbound() |
034284c3 | 237 | common_run() |