4 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
9 from twisted
.internet
import reactor
10 import twisted
.internet
.endpoints
12 from twisted
.logger
import LogLevel
13 import twisted
.python
.constants
14 from twisted
.python
.constants
import NamedConstant
17 from ipaddress
import AddressValueError
19 from optparse
import OptionParser
20 from configparser
import ConfigParser
21 from configparser
import NoOptionError
23 from functools
import partial
32 import hippotat
.slip
as slip
34 class DBG(twisted
.python
.constants
.Names
):
35 ROUTE
= NamedConstant()
36 DROP
= NamedConstant()
37 FLOW
= NamedConstant()
38 HTTP
= NamedConstant()
39 HTTP_CTRL
= NamedConstant()
40 INIT
= NamedConstant()
41 QUEUE
= NamedConstant()
42 QUEUE_CTRL
= NamedConstant()
43 HTTP_FULL
= NamedConstant()
44 SLIP_FULL
= NamedConstant()
45 CTRL_DUMP
= NamedConstant()
47 _hex_codec
= codecs
.getencoder('hex_codec')
49 log
= twisted
.logger
.Logger()
51 def log_debug(dflag
, msg
, idof
=None, d
=None):
52 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
54 msg
= '[%#x] %s' %
(id(idof
), msg
)
57 d
= _hex_codec(d
)[0].decode('ascii')
59 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
64 max_batch_down = 65536 # used by server, subject to [limits]
65 max_queue_time = 10 # used by server, subject to [limits]
66 max_request_time = 54 # used by server, subject to [limits]
67 target_requests_outstanding = 3 # must match; subject to [limits] on server
68 max_requests_outstanding = 4 # used by client
69 max_batch_up = 4000 # used by client
70 http_timeout = 30 # used by client
71 http_retry = 5 # used by client
73 #[server] or [<client>] overrides
74 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
75 # extra interpolations: %(local)s %(peer)s %(rnet)s
76 # obtained on server [virtual]server [virtual]relay [virtual]network
77 # from on client <client> [virtual]server [virtual]routes
82 # network = <prefix>/<len> # mandatory for server
83 # server = <ipaddr> # used by both, default is computed from `network'
84 # relay = <ipaddr> # used by server, default from `network' and `server'
85 # default server is first host in network
86 # default relay is first host which is not server
89 # addrs = 127.0.0.1 ::1 # mandatory for server
90 port = 80 # used by server
91 # url # used by client; default from first `addrs' and `port'
93 # [<client-ip4-or-ipv6-address>]
94 # password = <password> # used by both, must match
97 max_batch_down = 262144 # used by server
98 max_queue_time = 121 # used by server
99 max_request_time = 121 # used by server
100 target_requests_outstanding = 10 # used by server
103 # these need to be defined here so that they can be imported by import *
105 optparser
= OptionParser()
107 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
108 def mime_translate(s
):
109 # SLIP-encoded packets cannot contain ESC ESC.
110 # Swap `-' and ESC. The result cannot contain `--'
111 return s
.translate(_mimetrans
)
114 def __init__(self
, d
= { }):
117 return 'ConfigResults('+repr(self
.__dict__
)+')'
121 def log_discard(packet
, iface
, saddr
, daddr
, why
):
123 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
126 #---------- packet parsing ----------
128 def packet_addrs(packet
):
129 version
= packet
[0] >> 4
133 factory
= ipaddress
.IPv4Address
137 factory
= ipaddress
.IPv6Address
139 raise ValueError('unsupported IP version %d' % version
)
140 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
141 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
142 return (saddr
, daddr
)
144 #---------- address handling ----------
148 r
= ipaddress
.IPv4Address(input)
149 except AddressValueError
:
150 r
= ipaddress
.IPv6Address(input)
153 def ipnetwork(input):
155 r
= ipaddress
.IPv4Network(input)
156 except NetworkValueError
:
157 r
= ipaddress
.IPv6Network(input)
160 #---------- ipif (SLIP) subprocess ----------
162 class SlipStreamDecoder():
163 def __init__(self
, desc
, on_packet
):
165 self
._on_packet
= on_packet
167 self
._log('__init__')
169 def _log(self
, msg
, **kwargs
):
170 log_debug(DBG
.SLIP_FULL
, 'slip %s: %s' %
(self
._desc
, msg
), **kwargs
)
172 def inputdata(self
, data
):
173 self
._log('inputdata', d
=data
)
174 data
= self
._buffer
+ data
176 packets
= slip
.decode(data
)
177 self
._buffer
= packets
.pop()
178 for packet
in packets
:
179 self
._maybe_packet(packet
)
180 self
._log('bufremain', d
=self
._buffer
)
182 def _maybe_packet(self
, packet
):
183 self
._log('maybepacket', d
=packet
)
185 self
._on_packet(packet
)
189 self
._maybe_packet(self
._buffer
)
192 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
193 def __init__(self
, router
):
194 self
._router
= router
195 self
._decoder
= SlipStreamDecoder('ipif', self
.slip_on_packet
)
196 def connectionMade(self
): pass
197 def outReceived(self
, data
):
198 self
._decoder
.inputdata(data
)
199 def slip_on_packet(self
, packet
):
200 (saddr
, daddr
) = packet_addrs(packet
)
201 if saddr
.is_link_local
or daddr
.is_link_local
:
202 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
204 self
._router(packet
, saddr
, daddr
)
205 def processEnded(self
, status
):
206 status
.raiseException()
208 def start_ipif(command
, router
):
210 ipif
= _IpifProcessProtocol(router
)
211 reactor
.spawnProcess(ipif
,
212 '/bin/sh',['sh','-xc', command
],
213 childFDs
={0:'w', 1:'r', 2:2},
216 def queue_inbound(packet
):
217 log_debug(DBG
.FLOW
, "queue_inbound", d
=packet
)
218 ipif
.transport
.write(slip
.delimiter
)
219 ipif
.transport
.write(slip
.encode(packet
))
220 ipif
.transport
.write(slip
.delimiter
)
222 #---------- packet queue ----------
225 def __init__(self
, desc
, max_queue_time
):
228 self
._max_queue_time
= max_queue_time
229 self
._pq
= collections
.deque() # packets
231 def _log(self
, dflag
, msg
, **kwargs
):
232 log_debug(dflag
, self
._desc
+' pq: '+msg
, **kwargs
)
234 def append(self
, packet
):
235 self
._log(DBG
.QUEUE
, 'append', d
=packet
)
236 self
._pq
.append((time
.monotonic(), packet
))
239 self
._log(DBG
.QUEUE
, 'nonempty ?')
241 try: (queuetime
, packet
) = self
._pq
[0]
243 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
246 age
= time
.monotonic() - queuetime
247 if age
> self
._max_queue_time
:
248 # strip old packets off the front
249 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
253 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
256 def process(self
, sizequery
, moredata
, max_batch
):
257 # sizequery() should return size of batch so far
258 # moredata(s) should add s to batch
259 self
._log(DBG
.QUEUE
, 'process...')
261 try: (dummy
, packet
) = self
._pq
[0]
263 self
._log(DBG
.QUEUE
, 'process... empty')
266 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
268 encoded
= slip
.encode(packet
)
271 self
._log(DBG
.QUEUE_CTRL
,
272 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
276 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
277 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
279 moredata(slip
.delimiter
)
284 #---------- error handling ----------
291 print('========== CRASH ==========', err
,
292 '===========================', file=sys
.stderr
)
294 except twisted
.internet
.error
.ReactorNotRunning
: pass
296 def crash_on_defer(defer
):
297 defer
.addErrback(lambda err
: crash(err
))
299 def crash_on_critical(event
):
300 if event
.get('log_level') >= LogLevel
.critical
:
301 crash(twisted
.logger
.formatEvent(event
))
303 #---------- config processing ----------
305 def process_cfg_common_always():
307 c
.mtu
= cfg
.get('virtual','mtu')
309 def process_cfg_ipif(section
, varmap
):
311 try: v
= getattr(c
, s
)
312 except AttributeError: continue
317 c
.ipif_command
= cfg
.get(section
,'ipif', vars=c
.__dict__
)
319 def process_cfg_network():
320 c
.network
= ipnetwork(cfg
.get('virtual','network'))
321 if c
.network
.num_addresses
< 3 + 2:
322 raise ValueError('network needs at least 2^3 addresses')
324 def process_cfg_server():
326 c
.server
= cfg
.get('virtual','server')
327 except NoOptionError
:
328 process_cfg_network()
329 c
.server
= next(c
.network
.hosts())
332 def __init__(self
, port
, addrspec
):
336 self
.addr
= ipaddress
.IPv4Address(addrspec
)
337 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
339 except AddressValueError
:
340 self
.addr
= ipaddress
.IPv6Address(addrspec
)
341 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
342 self
._inurl
= b
'[%s]'
343 def make_endpoint(self
):
344 return self
._endpointfactory(reactor
, self
.port
, self
.addr
)
346 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
347 if self
.port
!= 80: url
+= b
':%d' % self
.port
351 def process_cfg_saddrs():
352 try: port
= cfg
.getint('server','port')
353 except NoOptionError
: port
= 80
356 for addrspec
in cfg
.get('server','addrs').split():
357 sa
= ServerAddr(port
, addrspec
)
360 def process_cfg_clients(constructor
):
362 for cs
in cfg
.sections():
363 if not (':' in cs
or '.' in cs
): continue
365 pw
= cfg
.get(cs
, 'password')
366 pw
= pw
.encode('utf-8')
367 constructor(ci
,cs
,pw
)
369 #---------- startup ----------
371 def common_startup():
372 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
373 log_observer
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
374 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
375 [ log_observer
, crash_on_critical
]
378 optparser
.add_option('-c', '--config', dest
='configfile',
379 default
='/etc/hippotat/config')
380 (opts
, args
) = optparser
.parse_args()
381 if len(args
): optparser
.error('no non-option arguments please')
383 re
= regexp
.compile('#.*')
384 cfg
.read_string(re
.sub('', defcfg
))
385 cfg
.read(opts
.configfile
)
388 log_debug(DBG
.INIT
, 'entering reactor')
389 if not _crashing
: reactor
.run()
390 print('CRASHED (end)', file=sys
.stderr
)