4 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
9 from twisted
.internet
import reactor
10 import twisted
.internet
.endpoints
12 from twisted
.logger
import LogLevel
13 import twisted
.python
.constants
14 from twisted
.python
.constants
import NamedConstant
17 from ipaddress
import AddressValueError
19 from optparse
import OptionParser
20 from configparser
import ConfigParser
21 from configparser
import NoOptionError
23 from functools
import partial
32 import hippotat
.slip
as slip
34 class DBG(twisted
.python
.constants
.Names
):
35 INIT
= NamedConstant()
36 ROUTE
= NamedConstant()
37 DROP
= NamedConstant()
38 FLOW
= NamedConstant()
39 HTTP
= NamedConstant()
40 TWISTED
= NamedConstant()
41 QUEUE
= NamedConstant()
42 HTTP_CTRL
= NamedConstant()
43 QUEUE_CTRL
= NamedConstant()
44 HTTP_FULL
= NamedConstant()
45 CTRL_DUMP
= NamedConstant()
46 SLIP_FULL
= NamedConstant()
48 _hex_codec
= codecs
.getencoder('hex_codec')
50 log
= twisted
.logger
.Logger()
52 debug_set
= set([x
for x
in DBG
.iterconstants() if x
<= DBG
.HTTP
])
54 def log_debug(dflag
, msg
, idof
=None, d
=None):
55 if dflag
not in debug_set
: return
56 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
58 msg
= '[%#x] %s' %
(id(idof
), msg
)
61 d
= _hex_codec(d
)[0].decode('ascii')
63 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
68 max_batch_down = 65536 # used by server, subject to [limits]
69 max_queue_time = 10 # used by server, subject to [limits]
70 target_requests_outstanding = 3 # must match; subject to [limits] on server
71 http_timeout = 30 # used by both } must be
72 http_timeout_grace = 5 # used by both } compatible
73 max_requests_outstanding = 4 # used by client
74 max_batch_up = 4000 # used by client
75 http_retry = 5 # used by client
77 #[server] or [<client>] overrides
78 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
79 # extra interpolations: %(local)s %(peer)s %(rnet)s
80 # obtained on server [virtual]server [virtual]relay [virtual]network
81 # from on client <client> [virtual]server [virtual]routes
86 # network = <prefix>/<len> # mandatory for server
87 # server = <ipaddr> # used by both, default is computed from `network'
88 # relay = <ipaddr> # used by server, default from `network' and `server'
89 # default server is first host in network
90 # default relay is first host which is not server
93 # addrs = 127.0.0.1 ::1 # mandatory for server
94 port = 80 # used by server
95 # url # used by client; default from first `addrs' and `port'
97 # [<client-ip4-or-ipv6-address>]
98 # password = <password> # used by both, must match
101 max_batch_down = 262144 # used by server
102 max_queue_time = 121 # used by server
103 http_timeout = 121 # used by server
104 target_requests_outstanding = 10 # used by server
107 # these need to be defined here so that they can be imported by import *
109 optparser
= OptionParser()
111 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
112 def mime_translate(s
):
113 # SLIP-encoded packets cannot contain ESC ESC.
114 # Swap `-' and ESC. The result cannot contain `--'
115 return s
.translate(_mimetrans
)
118 def __init__(self
, d
= { }):
121 return 'ConfigResults('+repr(self
.__dict__
)+')'
125 def log_discard(packet
, iface
, saddr
, daddr
, why
):
127 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
130 #---------- packet parsing ----------
132 def packet_addrs(packet
):
133 version
= packet
[0] >> 4
137 factory
= ipaddress
.IPv4Address
141 factory
= ipaddress
.IPv6Address
143 raise ValueError('unsupported IP version %d' % version
)
144 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
145 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
146 return (saddr
, daddr
)
148 #---------- address handling ----------
152 r
= ipaddress
.IPv4Address(input)
153 except AddressValueError
:
154 r
= ipaddress
.IPv6Address(input)
157 def ipnetwork(input):
159 r
= ipaddress
.IPv4Network(input)
160 except NetworkValueError
:
161 r
= ipaddress
.IPv6Network(input)
164 #---------- ipif (SLIP) subprocess ----------
166 class SlipStreamDecoder():
167 def __init__(self
, desc
, on_packet
):
169 self
._on_packet
= on_packet
171 self
._log('__init__')
173 def _log(self
, msg
, **kwargs
):
174 log_debug(DBG
.SLIP_FULL
, 'slip %s: %s' %
(self
._desc
, msg
), **kwargs
)
176 def inputdata(self
, data
):
177 self
._log('inputdata', d
=data
)
178 packets
= slip
.decode(data
)
179 packets
[0] = self
._buffer
+ packets
[0]
180 self
._buffer
= packets
.pop()
181 for packet
in packets
:
182 self
._maybe_packet(packet
)
183 self
._log('bufremain', d
=self
._buffer
)
185 def _maybe_packet(self
, packet
):
186 self
._log('maybepacket', d
=packet
)
188 self
._on_packet(packet
)
192 self
._maybe_packet(self
._buffer
)
195 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
196 def __init__(self
, router
):
197 self
._router
= router
198 self
._decoder
= SlipStreamDecoder('ipif', self
.slip_on_packet
)
199 def connectionMade(self
): pass
200 def outReceived(self
, data
):
201 self
._decoder
.inputdata(data
)
202 def slip_on_packet(self
, packet
):
203 (saddr
, daddr
) = packet_addrs(packet
)
204 if saddr
.is_link_local
or daddr
.is_link_local
:
205 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
207 self
._router(packet
, saddr
, daddr
)
208 def processEnded(self
, status
):
209 status
.raiseException()
211 def start_ipif(command
, router
):
213 ipif
= _IpifProcessProtocol(router
)
214 reactor
.spawnProcess(ipif
,
215 '/bin/sh',['sh','-xc', command
],
216 childFDs
={0:'w', 1:'r', 2:2},
219 def queue_inbound(packet
):
220 log_debug(DBG
.FLOW
, "queue_inbound", d
=packet
)
221 ipif
.transport
.write(slip
.delimiter
)
222 ipif
.transport
.write(slip
.encode(packet
))
223 ipif
.transport
.write(slip
.delimiter
)
225 #---------- packet queue ----------
228 def __init__(self
, desc
, max_queue_time
):
231 self
._max_queue_time
= max_queue_time
232 self
._pq
= collections
.deque() # packets
234 def _log(self
, dflag
, msg
, **kwargs
):
235 log_debug(dflag
, self
._desc
+' pq: '+msg
, **kwargs
)
237 def append(self
, packet
):
238 self
._log(DBG
.QUEUE
, 'append', d
=packet
)
239 self
._pq
.append((time
.monotonic(), packet
))
242 self
._log(DBG
.QUEUE
, 'nonempty ?')
244 try: (queuetime
, packet
) = self
._pq
[0]
246 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
249 age
= time
.monotonic() - queuetime
250 if age
> self
._max_queue_time
:
251 # strip old packets off the front
252 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
256 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
259 def process(self
, sizequery
, moredata
, max_batch
):
260 # sizequery() should return size of batch so far
261 # moredata(s) should add s to batch
262 self
._log(DBG
.QUEUE
, 'process...')
264 try: (dummy
, packet
) = self
._pq
[0]
266 self
._log(DBG
.QUEUE
, 'process... empty')
269 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
271 encoded
= slip
.encode(packet
)
274 self
._log(DBG
.QUEUE_CTRL
,
275 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
279 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
280 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
282 moredata(slip
.delimiter
)
287 #---------- error handling ----------
294 print('========== CRASH ==========', err
,
295 '===========================', file=sys
.stderr
)
297 except twisted
.internet
.error
.ReactorNotRunning
: pass
299 def crash_on_defer(defer
):
300 defer
.addErrback(lambda err
: crash(err
))
302 def crash_on_critical(event
):
303 if event
.get('log_level') >= LogLevel
.critical
:
304 crash(twisted
.logger
.formatEvent(event
))
306 #---------- config processing ----------
308 def process_cfg_common_always():
310 c
.mtu
= cfg
.get('virtual','mtu')
312 def process_cfg_ipif(section
, varmap
):
314 try: v
= getattr(c
, s
)
315 except AttributeError: continue
320 c
.ipif_command
= cfg
.get(section
,'ipif', vars=c
.__dict__
)
322 def process_cfg_network():
323 c
.network
= ipnetwork(cfg
.get('virtual','network'))
324 if c
.network
.num_addresses
< 3 + 2:
325 raise ValueError('network needs at least 2^3 addresses')
327 def process_cfg_server():
329 c
.server
= cfg
.get('virtual','server')
330 except NoOptionError
:
331 process_cfg_network()
332 c
.server
= next(c
.network
.hosts())
335 def __init__(self
, port
, addrspec
):
339 self
.addr
= ipaddress
.IPv4Address(addrspec
)
340 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
342 except AddressValueError
:
343 self
.addr
= ipaddress
.IPv6Address(addrspec
)
344 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
345 self
._inurl
= b
'[%s]'
346 def make_endpoint(self
):
347 return self
._endpointfactory(reactor
, self
.port
, self
.addr
)
349 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
350 if self
.port
!= 80: url
+= b
':%d' % self
.port
354 def process_cfg_saddrs():
355 try: port
= cfg
.getint('server','port')
356 except NoOptionError
: port
= 80
359 for addrspec
in cfg
.get('server','addrs').split():
360 sa
= ServerAddr(port
, addrspec
)
363 def process_cfg_clients(constructor
):
365 for cs
in cfg
.sections():
366 if not (':' in cs
or '.' in cs
): continue
368 pw
= cfg
.get(cs
, 'password')
369 pw
= pw
.encode('utf-8')
370 constructor(ci
,cs
,pw
)
372 #---------- startup ----------
374 def common_startup():
375 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
376 stdout_obs
= twisted
.logger
.FileLogObserver(sys
.stdout
, log_formatter
)
377 stderr_obs
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
378 pred
= twisted
.logger
.LogLevelFilterPredicate(LogLevel
.error
)
379 log_observer
= twisted
.logger
.FilteringLogObserver(
380 stderr_obs
, [pred
], stdout_obs
382 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
383 [ log_observer
, crash_on_critical
]
386 optparser
.add_option('-c', '--config', dest
='configfile',
387 default
='/etc/hippotat/config')
388 (opts
, args
) = optparser
.parse_args()
389 if len(args
): optparser
.error('no non-option arguments please')
391 re
= regexp
.compile('#.*')
392 cfg
.read_string(re
.sub('', defcfg
))
393 cfg
.read(opts
.configfile
)
396 log_debug(DBG
.INIT
, 'entering reactor')
397 if not _crashing
: reactor
.run()
398 print('CRASHED (end)', file=sys
.stderr
)