3 # Hippotat - Asinine IP Over HTTP program
4 # ./hippotat - client main program
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
24 #@ import sys; sys.path.append('@PYBUILD_INSTALL_DIR@')
25 from hippotatlib
import *
28 import twisted
.web
.client
32 class GeneralResponseConsumer(twisted
.internet
.protocol
.Protocol
):
33 def __init__(self
, cl
, req
, resp
, desc
):
39 def _log(self
, dflag
, msg
, **kwargs
):
40 self
._cl
.log(dflag
, '%s: %s' %
(self
._desc
, msg
), idof
=self
._req
, **kwargs
)
def connectionMade(self):
    """Record, at the HTTP_CTRL debug level, that the connection was made."""
    event = 'connectionMade'
    self._log(DBG.HTTP_CTRL, event)
def connectionLostOK(self, reason):
    """Return whether *reason* represents a normal end of the response.

    The result is whatever ``reason.check(ResponseDone)`` yields, so it
    should be used for its truthiness.
    NOTE(review): *reason* is presumably a twisted Failure - confirm.
    """
    normal_end = twisted.web.client.ResponseDone
    return reason.check(normal_end)
48 class ResponseConsumer(GeneralResponseConsumer
):
def __init__(self, cl, req, resp):
    """Set up a consumer for the body of a response to *req*.

    Initialises the base consumer with the description 'RC', then builds
    a SlipStreamDecoder whose output callback is ``queue_inbound`` bound
    to the client's ``ipif``, and finally logs the construction.
    """
    super().__init__(cl, req, resp, 'RC')
    # The decoder gets its own description which includes id(req).
    ssddesc = '[%s] %s' % (id(req), self._desc)
    deliver = partial(queue_inbound, cl.ipif)
    self._ssd = SlipStreamDecoder(ssddesc, deliver)
    self._log(DBG.HTTP_CTRL, '__init__')
def dataReceived(self, data):
    """Log the received chunk and feed it to the stream decoder.

    Any exception raised while the decoder handles *data* is not allowed
    to escape; it is reported via ``_handleexception`` instead.
    """
    self._log(DBG.HTTP, 'dataReceived', d=data)
    try:
        self._ssd.inputdata(data)
    except Exception:
        # The exception object itself is not needed: _handleexception
        # formats the traceback of the exception being handled.
        self._handleexception()
62 def connectionLost(self
, reason
):
63 reason_msg
= 'connectionLost ' + str(reason
)
64 self
._log(DBG
.HTTP_CTRL
, reason_msg
)
65 if not self
.connectionLostOK(reason
):
66 self
._latefailure(reason_msg
)
69 self
._log(DBG
.HTTP
, 'ResponseDone')
71 self
._cl
.req_fin(self
._req
)
72 except Exception as e
:
73 self
._handleexception()
74 self
._cl
.report_running()
76 def _handleexception(self
):
77 self
._latefailure(traceback
.format_exc())
def _latefailure(self, reason):
    """Log *reason* and report it to the client as an error on the request.

    *reason* is stringified for the log line only; the original object
    is what gets handed to the client's ``req_err``.
    """
    level = DBG.HTTP_CTRL
    text = '_latefailure ' + str(reason)
    self._log(level, text)
    self._cl.req_err(self._req, reason)
83 class ErrorResponseConsumer(GeneralResponseConsumer
):
84 def __init__(self
, cl
, req
, resp
):
85 super().__init__(cl
, req
, resp
, 'ERROR-RC')
88 self
._phrase
= resp
.phrase
.decode('utf-8')
90 self
._phrase
= repr(resp
.phrase
)
91 self
._log(DBG
.HTTP_CTRL
, '__init__ %d %s' %
(resp
.code
, self
._phrase
))
93 def dataReceived(self
, data
):
94 self
._log(DBG
.HTTP_CTRL
, 'dataReceived ' + repr(data
))
97 def connectionLost(self
, reason
):
99 mbody
= self
._m
.decode('utf-8')
101 mbody
= repr(self
._m
)
102 if not self
.connectionLostOK(reason
):
103 mbody
+= ' || ' + str(reason
)
104 self
._cl
.req_err(self
._req
,
106 %
(self
._resp
.code
, self
._phrase
, mbody
))
109 def __init__(cl
, c
,ss
,cs
):
112 cl
.desc
= '[%s %s] ' %
(ss
,cs
)
113 cl
.running_reported
= False
114 cl
.log_info('setting up')
def log_info(cl, msg):
    """Emit *msg* via ``log.info``, prefixed with this client's description.

    ``dflag=False`` is passed along with the message.
    """
    text = cl.desc + msg
    log.info(text, dflag=False)
def report_running(cl):
    """Announce 'running OK' once, remembering that it has been reported.

    Nothing is logged while ``running_reported`` is already set; the
    flag is cleared again elsewhere in this file when a request error is
    recorded, which re-arms the announcement.
    """
    if cl.running_reported:
        return
    cl.log_info('running OK')
    cl.running_reported = True
def log(cl, dflag, msg, **kwargs):
    """Pass a debug message to ``log_debug`` with this client's prefix.

    *dflag* and any extra keyword arguments are forwarded unchanged.
    """
    prefixed = cl.desc + msg
    log_debug(dflag, prefixed, **kwargs)
def log_outstanding(cl):
    """Dump the table of outstanding requests at the CTRL_DUMP debug level."""
    summary = 'OS %s' % cl.outstanding
    cl.log(DBG.CTRL_DUMP, summary)
131 cl
.queue
= PacketQueue('up', cl
.c
.max_queue_time
)
132 cl
.agent
= twisted
.web
.client
.Agent(
133 reactor
, connectTimeout
= cl
.c
.http_timeout
)
135 def outbound(cl
, packet
, saddr
, daddr
):
136 #print('OUT ', saddr, daddr, repr(packet))
137 cl
.queue
.append(packet
)
140 def req_ok(cl
, req
, resp
):
141 cl
.log(DBG
.HTTP_CTRL
,
142 'req_ok %d %s %s' %
(resp
.code
, repr(resp
.phrase
), str(resp
)),
145 rc
= ResponseConsumer(cl
, req
, resp
)
147 rc
= ErrorResponseConsumer(cl
, req
, resp
)
150 # now rc is responsible for calling req_fin
152 def req_err(cl
, req
, err
):
153 # called when the Deferred fails, or (if it completes),
# later, by ResponseConsumer or ErrorResponseConsumer
156 cl
.log(DBG
.HTTP_CTRL
, 'req_err ' + str(err
), idof
=req
)
157 cl
.running_reported
= False
158 if isinstance(err
, twisted
.python
.failure
.Failure
):
159 err
= err
.getTraceback()
160 print('%s[%#x] %s' %
(cl
.desc
, id(req
), err
.strip('\n').replace('\n',' / ')),
162 if not isinstance(cl
.outstanding
[req
], int):
163 raise RuntimeError('[%#x] previously %s' %
164 (id(req
), cl
.outstanding
[req
]))
165 cl
.outstanding
[req
] = err
167 reactor
.callLater(cl
.c
.http_retry
, partial(cl
.req_fin
, req
))
168 except Exception as e
:
169 crash(traceback
.format_exc() + '\n----- handling -----\n' + err
)
171 def req_fin(cl
, req
):
172 del cl
.outstanding
[req
]
173 cl
.log(DBG
.HTTP_CTRL
, 'req_fin OS=%d' %
len(cl
.outstanding
), idof
=req
)
176 def check_outbound(cl
):
178 if len(cl
.outstanding
) >= cl
.c
.max_outstanding
:
181 if (not cl
.queue
.nonempty() and
182 len(cl
.outstanding
) >= cl
.c
.target_requests_outstanding
):
186 def moredata(s
): nonlocal d
; d
+= s
187 cl
.queue
.process((lambda: len(d
)),
191 d
= mime_translate(d
)
193 token
= authtoken_make(cl
.c
.secret
)
197 mime
= (b
'--b' + crlf
+
198 b
'Content-Type: text/plain; charset="utf-8"' + crlf
+
199 b
'Content-Disposition: form-data; name="m"' + crlf
+ crlf
+
200 str(cl
.c
.client
) .encode('ascii') + crlf
+
202 str(cl
.c
.target_requests_outstanding
)
203 .encode('ascii') + crlf
+
204 str(cl
.c
.http_timeout
) .encode('ascii') + crlf
+
207 b
'Content-Type: application/octet-stream' + crlf
+
208 b
'Content-Disposition: form-data; name="d"' + crlf
+ crlf
+
210 ) if len(d
) else b
'') +
213 #df = open('data.dump.dbg', mode='wb')
216 # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
218 cl
.log(DBG
.HTTP_FULL
, 'requesting: ' + str(mime
))
220 hh
= { 'User-Agent': ['hippotat'],
221 'Content-Type': ['multipart/form-data; boundary="b"'],
222 'Content-Length': [str(len(mime
))] }
224 bytesreader
= io
.BytesIO(mime
)
225 producer
= twisted
.web
.client
.FileBodyProducer(bytesreader
)
227 req
= cl
.agent
.request(b
'POST',
229 twisted
.web
.client
.Headers(hh
),
232 cl
.outstanding
[req
] = len(d
)
233 cl
.log(DBG
.HTTP_CTRL
,
234 'request OS=%d' %
len(cl
.outstanding
),
236 req
.addTimeout(cl
.c
.http_timeout
, reactor
)
237 req
.addCallback(partial(cl
.req_ok
, req
))
238 req
.addErrback(partial(cl
.req_err
, req
))
244 def process_cfg(_opts
, putative_servers
, putative_clients
):
247 for ss
in putative_servers
.values():
248 for (ci
,cs
) in putative_clients
.items():
251 sections
= cfg_process_client_common(c
,ss
,cs
,ci
)
252 if not sections
: continue
254 log_debug_config('processing client [%s %s]' %
(ss
, cs
))
256 def srch(getter
,key
): return cfg_search(getter
,key
,sections
)
258 c
.http_timeout
+= srch(cfg
.getint
, 'http_timeout_grace')
259 c
.max_outstanding
= srch(cfg
.getint
, 'max_requests_outstanding')
260 c
.max_batch_up
= srch(cfg
.getint
, 'max_batch_up')
261 c
.http_retry
= srch(cfg
.getint
, 'http_retry')
262 c
.max_queue_time
= srch(cfg
.getint
, 'max_queue_time')
263 c
.vroutes
= srch(cfg
.get
, 'vroutes')
265 try: c
.ifname
= srch(cfg_get_raw
, 'ifname_client')
266 except NoOptionError
: pass
268 try: c
.url
= srch(cfg
.get
,'url')
269 except NoOptionError
:
270 cfg_process_saddrs(c
, ss
)
271 c
.url
= c
.saddrs
[0].url()
275 cfg_process_vaddr(c
,ss
)
281 ('rnets','vroutes')))
283 clients
.append(Client(c
,ss
,cs
))
285 common_startup(process_cfg
)
289 cl
.ipif
= start_ipif(cl
.c
.ipif_command
, cl
.outbound
)