Commit | Line | Data |
---|---|---|
c55f394e IJ |
1 | #!/usr/bin/python3 |
2 | ||
3 | from hippotat import * | |
4 | ||
c0c90673 IJ |
5 | import twisted.web |
6 | import twisted.web.client | |
7 | ||
dd6665ee IJ |
8 | import io |
9 | ||
# Config section name of the client this process acts as.  Stays None
# until set_client() runs; process_cfg() reads it afterwards.
client_cs = None


def set_client(ci, cs, pw):
    """Record the one client identity this process runs as.

    Handed to process_cfg_clients() as its callback.

    ci -- client identifier, stored as c.client
    cs -- config section name; the request limits are read from it
    pw -- password, stored in the module-level `password`

    Asserts that no client has been recorded yet, so a second call fails.
    """
    global client_cs, password

    assert client_cs is None
    client_cs = cs

    c.client = ci
    c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
    c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')

    # Stored last, after the config lookups above have succeeded.
    password = pw
21 | ||
def process_cfg():
    """Load the client-side configuration into the shared `c` object.

    Steps, in order:
      * run the common and server config processing;
      * take the server URL from [server] url, or, when that option is
        absent, from the first configured server address;
      * let process_cfg_clients() call set_client(), which records
        `client_cs` (the config section used below);
      * read the routes and the per-client queue/HTTP tunables;
      * hand the interface substitutions to process_cfg_ipif().
    """
    process_cfg_common_always()
    process_cfg_server()

    try:
        c.url = cfg.get('server', 'url')
    except NoOptionError:
        # No explicit URL: fall back to the first server address.
        process_cfg_saddrs()
        c.url = c.saddrs[0].url()

    process_cfg_clients(set_client)

    c.routes = cfg.get('virtual', 'routes')

    # Integer tunables; each is stored on `c` under its config key name.
    for key in ('max_queue_time',
                'max_batch_up',
                'http_timeout',
                'http_retry'):
        setattr(c, key, cfg.getint(client_cs, key))

    process_cfg_ipif(client_cs,
                     (('local', 'client'),
                      ('peer', 'server'),
                      ('rnets', 'routes')))
47 | ||
# Count of HTTP requests currently accounted as in flight.  Incremented
# when a request is issued (and before an error is reported), decremented
# by req_fin().
outstanding = 0


def start_client():
    """Create the module-level upload queue and HTTP agent.

    Must run after process_cfg(), since it reads c.max_queue_time and
    c.http_timeout.
    """
    global queue, agent

    queue = PacketQueue('up', c.max_queue_time)
    agent = twisted.web.client.Agent(
        reactor,
        connectTimeout=c.http_timeout,
    )
ca732796 | 55 | |
def outbound(packet, saddr, daddr):
    """Accept one packet for upload to the server.

    Passed to start_ipif() as its packet callback.  The packet is appended
    to the upload queue and check_outbound() is given the chance to start
    a request straight away.

    saddr, daddr -- accepted for the callback signature; not used here.
    """
    queue.append(packet)
    check_outbound()
60 | ||
class ResponseConsumer(twisted.internet.protocol.Protocol):
    """Receives the body of one HTTP response.

    Body data is passed through mime_translate() and fed to a
    SlipStreamDecoder constructed with queue_inbound.  Any failure is
    reported through req_err() for the originating request.
    """

    def __init__(self, req):
        """req -- the originating request, used as the log identity."""
        self._req = req
        self._ssd = SlipStreamDecoder(queue_inbound)
        self._log(DBG.HTTP_CTRL, '__init__')

    def _log(self, dflag, msg, **kwargs):
        """Emit a debug message prefixed 'RC ' and tagged with the request."""
        log_debug(dflag, 'RC ' + msg, idof=self._req, **kwargs)

    def dataReceived(self, data):
        """Decode one chunk of response body; report any decoding error."""
        self._log(DBG.HTTP_CTRL, 'dataReceived', d=data)
        try:
            self._ssd.inputdata(mime_translate(data))
        except Exception as e:
            self._asyncfailure(e)

    def connectionMade(self):
        self._log(DBG.HTTP_CTRL, 'connectionMade')

    def connectionLost(self, reason):
        """Flush the decoder on a clean end of body, else report failure.

        reason -- a twisted Failure.  It wraps the exception, so it has to
        be examined with Failure.check(); isinstance() on the Failure
        itself never matches an exception class.  A body delivered via
        deliverBody() that finished normally is signalled with
        ResponseDone; ConnectionDone is accepted too, as the original
        code intended.
        """
        self._log(DBG.HTTP_CTRL, 'connectionLost')
        if reason.check(twisted.web.client.ResponseDone,
                        twisted.internet.error.ConnectionDone):
            try:
                self._ssd.flush()
            except Exception as e:
                self._asyncfailure(e)
        else:
            self._asyncfailure(reason)

    def _asyncfailure(self, reason):
        """Report a failure that happened after req_ok() already ran.

        req_ok() has already decremented `outstanding` for this request,
        and req_err() schedules another req_fin(); the count is bumped
        first so that the later decrement balances.
        """
        global outstanding
        outstanding += 1
        req_err(self._req, reason)
bd9e77fb | 93 | |
def req_ok(req, resp):
    """Success callback for a request: start consuming the response body.

    req  -- the request this response belongs to
    resp -- the response object; its body is delivered to a fresh
            ResponseConsumer

    The request is then accounted as finished via req_fin(); later body
    errors are handled by the consumer itself.
    """
    consumer = ResponseConsumer(req)
    resp.deliverBody(consumer)
    req_fin()
7b07f0b5 | 98 | |
def req_err(req, err):
    """Report a failed request and schedule its completion accounting.

    req -- the request concerned (may be None), used as the log identity
    err -- the error; logged and also printed to stderr

    req_fin() is not called immediately but after c.http_retry, which
    delays the point at which check_outbound() may issue a replacement.
    """
    message = 'req_err ' + str(err)
    log_debug(DBG.HTTP_CTRL, message, idof=req)
    print(err, file=sys.stderr)
    reactor.callLater(c.http_retry, req_fin)
7b07f0b5 | 103 | |
def req_fin(*args):
    """Account one request as finished and try to start more.

    Any positional arguments are accepted and ignored, so this can be
    used directly as a callback.
    """
    global outstanding

    outstanding = outstanding - 1
    check_outbound()
108 | ||
def asyncfailure(reason):
    """Report a failure that is not tied to a particular request.

    `outstanding` is incremented first because req_err() schedules a
    req_fin(), which will decrement it again.
    """
    global outstanding

    outstanding = outstanding + 1
    req_err(None, reason)
9aa11043 | 113 | |
def check_outbound():
    """Issue upload requests until the configured limits say stop.

    Loops, starting one HTTP POST per iteration, until either
      * `outstanding` has reached c.max_outstanding, or
      * the upload queue is empty and `outstanding` has reached
        c.target_outstanding.

    Each request is a multipart/form-data body (boundary "b") with a
    metadata part "m" (client, password, target outstanding count) and,
    only when there is queued data, a data part "d".
    """
    global outstanding

    crlf = b'\r\n'

    while True:
        if outstanding >= c.max_outstanding:
            break
        if not queue.nonempty() and outstanding >= c.target_outstanding:
            break

        # Collect queued packets into d, up to c.max_batch_up.
        d = b''

        def moredata(s):
            nonlocal d
            d += s

        queue.process((lambda: len(d)),
                      moredata,
                      c.max_batch_up)

        d = mime_translate(d)

        if len(d):
            data_part = (
                b'--b' + crlf +
                b'Content-Type: application/octet-stream' + crlf +
                b'Content-Disposition: form-data; name="d"' + crlf + crlf +
                d + crlf
            )
        else:
            # Nothing queued: send the metadata part only.
            data_part = b''

        mime = (
            b'--b' + crlf +
            b'Content-Type: text/plain; charset="utf-8"' + crlf +
            b'Content-Disposition: form-data; name="m"' + crlf + crlf +
            str(c.client).encode('ascii') + crlf +
            password + crlf +
            str(c.target_outstanding).encode('ascii') + crlf +
            data_part +
            b'--b--' + crlf
        )

        # To replay a request by hand, dump `mime` to a file and run:
        # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg

        log_debug(DBG.HTTP_FULL, 'requesting: ' + str(mime))

        hh = {'User-Agent': ['hippotat'],
              'Content-Type': ['multipart/form-data; boundary="b"'],
              'Content-Length': [str(len(mime))]}

        bytesreader = io.BytesIO(mime)
        producer = twisted.web.client.FileBodyProducer(bytesreader)

        req = agent.request(b'POST',
                            c.url,
                            twisted.web.client.Headers(hh),
                            producer)
        req.addTimeout(c.http_timeout, reactor)
        # Bind req as a default argument: the callbacks run after this loop
        # has moved on, and a plain closure would see whichever request was
        # created last rather than the one they belong to.
        req.addCallback((lambda resp, req=req: req_ok(req, resp)))
        req.addErrback((lambda err, req=req: req_err(req, err)))
        outstanding += 1
034284c3 | 167 | |
# --- program start ---
# Order matters: configuration has to be loaded before the queue and HTTP
# agent are created, and both must exist before packets can arrive.
common_startup()
process_cfg()
start_client()

# Packets from the interface command are delivered to outbound().
start_ipif(c.ipif_command, outbound)

# Issue the initial requests, then hand over to the common main loop.
check_outbound()
common_run()