4 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
9 from twisted
.internet
import reactor
10 import twisted
.internet
.endpoints
12 from twisted
.logger
import LogLevel
13 import twisted
.python
.constants
14 from twisted
.python
.constants
import NamedConstant
17 from ipaddress
import AddressValueError
19 from optparse
import OptionParser
20 from configparser
import ConfigParser
21 from configparser
import NoOptionError
29 import hippotat
.slip
as slip
class DBG(twisted.python.constants.Names):
    """Debug-message categories.

    Members are passed as the ``dflag`` argument of log_debug() to tag
    each debug message with the subsystem it came from.
    """

    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
42 _hex_codec
= codecs
.getencoder('hex_codec')
44 log
= twisted
.logger
.Logger()
46 def log_debug(dflag
, msg
, idof
=None, d
=None):
47 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
49 msg
= '[%d] %s' %
(id(idof
), msg
)
52 d
= _hex_codec(d
)[0].decode('ascii')
54 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
59 max_batch_down = 65536 # used by server, subject to [limits]
60 max_queue_time = 10 # used by server, subject to [limits]
61 max_request_time = 54 # used by server, subject to [limits]
62 target_requests_outstanding = 3 # must match; subject to [limits] on server
63 max_requests_outstanding = 4 # used by client
64 max_batch_up = 4000 # used by client
65 http_timeout = 30 # used by client
66 http_retry = 5 # used by client
68 #[server] or [<client>] overrides
69 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
70 # extra interpolations: %(local)s %(peer)s %(rnets)s
71 # obtained on server [virtual]server [virtual]relay [virtual]network
72 # from on client <client> [virtual]server [virtual]routes
77 # network = <prefix>/<len> # mandatory for server
78 # server = <ipaddr> # used by both, default is computed from `network'
79 # relay = <ipaddr> # used by server, default from `network' and `server'
80 # default server is first host in network
81 # default relay is first host which is not server
84 # addrs = 127.0.0.1 ::1 # mandatory for server
85 port = 80 # used by server
86 # url # used by client; default from first `addrs' and `port'
88 # [<client-ip4-or-ipv6-address>]
89 # password = <password> # used by both, must match
92 max_batch_down = 262144 # used by server
93 max_queue_time = 121 # used by server
94 max_request_time = 121 # used by server
95 target_requests_outstanding = 10 # used by server
98 # these need to be defined here so that they can be imported by import *
100 optparser
= OptionParser()
102 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
def mime_translate(s):
    """Return *s* with every `-' byte and every SLIP ESC byte exchanged.

    The mapping is its own inverse, so applying it twice gives back the
    original data.
    """
    # SLIP-encoded packets cannot contain ESC ESC.
    # Swap `-' and ESC.  The result cannot contain `--'
    swapped = s.translate(_mimetrans)
    return swapped
109 def __init__(self
, d
= { }):
112 return 'ConfigResults('+repr(self
.__dict__
)+')'
116 def log_discard(packet
, iface
, saddr
, daddr
, why
):
118 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
121 #---------- packet parsing ----------
123 def packet_addrs(packet
):
124 version
= packet
[0] >> 4
128 factory
= ipaddress
.IPv4Address
132 factory
= ipaddress
.IPv6Address
134 raise ValueError('unsupported IP version %d' % version
)
135 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
136 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
137 return (saddr
, daddr
)
139 #---------- address handling ----------
143 r
= ipaddress
.IPv4Address(input)
144 except AddressValueError
:
145 r
= ipaddress
.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Parameters:
      input -- network specification such as '192.0.2.0/24' or
               '2001:db8::/32'.  (The name shadows the builtin; it is
               kept so that keyword callers continue to work.)

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises ValueError (or one of its ipaddress subclasses) if *input* is
    not a valid network of either family.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4, so try IPv6.  Only
        # AddressValueError triggers the fallback: a bad netmask on an
        # otherwise valid IPv4 address is reported as such rather than
        # being masked by a confusing IPv6 parse error.
        r = ipaddress.IPv6Network(input)
    return r
155 #---------- ipif (SLIP) subprocess ----------
157 class SlipStreamDecoder():
158 def __init__(self
, on_packet
):
159 # we will call packet(<packet>)
161 self
._on_packet
= on_packet
163 def inputdata(self
, data
):
164 #print('SLIP-GOT ', repr(data))
166 packets
= slip
.decode(self
._buffer
)
167 self
._buffer
= packets
.pop()
168 for packet
in packets
:
169 self
._maybe_packet(packet
)
171 def _maybe_packet(self
, packet
):
173 self
._on_packet(packet
)
176 self
._maybe_packet(self
._buffer
)
179 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
def __init__(self, router):
    """Remember *router* and set up SLIP decoding of the child's output.

    router -- callable invoked as router(packet, saddr, daddr) for each
              packet accepted by slip_on_packet().
    """
    self._router = router
    # Decoded packets are delivered back to this object.
    packet_sink = self.slip_on_packet
    self._decoder = SlipStreamDecoder(packet_sink)
def connectionMade(self):
    """Do nothing when the child process starts."""
    return None
def outReceived(self, data):
    """Feed *data*, read from the child's stdout, to the SLIP decoder."""
    decoder = self._decoder
    decoder.inputdata(data)
186 def slip_on_packet(self
, packet
):
187 (saddr
, daddr
) = packet_addrs(packet
)
188 if saddr
.is_link_local
or daddr
.is_link_local
:
189 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
191 self
._router(packet
, saddr
, daddr
)
def processEnded(self, status):
    """Re-raise the exception carried by *status* when the child exits.

    status -- object supplied by the reactor; its raiseException()
              method is called unconditionally.
    """
    reraise = status.raiseException
    reraise()
195 def start_ipif(command
, router
):
197 ipif
= _IpifProcessProtocol(router
)
198 reactor
.spawnProcess(ipif
,
199 '/bin/sh',['sh','-xc', command
],
200 childFDs
={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write *packet* to the ipif child process, SLIP-framed.

    The encoded packet is preceded and followed by a SLIP delimiter.
    """
    transport = ipif.transport
    transport.write(slip.delimiter)
    transport.write(slip.encode(packet))
    transport.write(slip.delimiter)
208 #---------- packet queue ----------
211 def __init__(self
, desc
, max_queue_time
):
214 self
._max_queue_time
= max_queue_time
215 self
._pq
= collections
.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message prefixed with this queue's description.

    dflag, kwargs -- passed through unchanged to log_debug().
    msg           -- message text; '<desc> pq: ' is put in front of it.
    """
    prefixed = self._desc + ' pq: ' + msg
    log_debug(dflag, prefixed, **kwargs)
def append(self, packet):
    """Add *packet* to the tail of the queue, stamped with the time now."""
    self._log(DBG.QUEUE, 'append', d=packet)
    # Entries are (monotonic enqueue time, packet) pairs.
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
225 self
._log(DBG
.QUEUE
, 'nonempty ?')
227 try: (queuetime
, packet
) = self
._pq
[0]
229 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
232 age
= time
.monotonic() - queuetime
233 if age
> self
._max_queue_time
:
234 # strip old packets off the front
235 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
239 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
242 def process(self
, sizequery
, moredata
, max_batch
):
243 # sizequery() should return size of batch so far
244 # moredata(s) should add s to batch
245 self
._log(DBG
.QUEUE
, 'process...')
247 try: (dummy
, packet
) = self
._pq
[0]
249 self
._log(DBG
.QUEUE
, 'process... empty')
252 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
254 encoded
= slip
.encode(packet
)
257 self
._log(DBG
.QUEUE_CTRL
,
258 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
262 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
263 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
265 moredata(slip
.delimiter
)
270 #---------- error handling ----------
277 print('CRASH ', err
, file=sys
.stderr
)
279 except twisted
.internet
.error
.ReactorNotRunning
: pass
def crash_on_defer(defer):
    """Arrange for any failure of *defer* to be handed to crash()."""
    def _on_failure(err):
        # crash is looked up at call time, exactly as a lambda would.
        return crash(err)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer: crash if *event* is at critical level or above.

    event -- log event mapping; its 'log_level' entry is compared with
             LogLevel.critical.
    """
    is_critical = event.get('log_level') >= LogLevel.critical
    if not is_critical:
        return
    crash(twisted.logger.formatEvent(event))
288 #---------- config processing ----------
290 def process_cfg_common_always():
292 c
.mtu
= cfg
.get('virtual','mtu')
294 def process_cfg_ipif(section
, varmap
):
296 try: v
= getattr(c
, s
)
297 except AttributeError: continue
302 c
.ipif_command
= cfg
.get(section
,'ipif', vars=c
.__dict__
)
def process_cfg_network():
    """Read [virtual] network from the config into c.network.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    too_small = c.network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
    """Set c.server from [virtual] server, or derive it from the network.

    When the option is absent, process_cfg_network() is run and the
    first host address of c.network is used instead.
    """
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
317 def __init__(self
, port
, addrspec
):
321 self
.addr
= ipaddress
.IPv4Address(addrspec
)
322 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
324 except AddressValueError
:
325 self
.addr
= ipaddress
.IPv6Address(addrspec
)
326 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
327 self
._inurl
= b
'[%s]'
def make_endpoint(self):
    """Return a Twisted server endpoint listening on this address and port.

    The address is passed by keyword as ``interface``.  The third
    positional parameter of TCP4ServerEndpoint and TCP6ServerEndpoint is
    ``backlog``, so passing the address positionally would set the
    listen backlog to an address object and leave the endpoint bound to
    the default interface.  The endpoint expects the interface as a
    string, hence the str() conversion of the ipaddress object.
    """
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
331 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
332 if self
.port
!= 80: url
+= b
':%d' % self
.port
336 def process_cfg_saddrs():
337 try: port
= cfg
.getint('server','port')
338 except NoOptionError
: port
= 80
341 for addrspec
in cfg
.get('server','addrs').split():
342 sa
= ServerAddr(port
, addrspec
)
345 def process_cfg_clients(constructor
):
347 for cs
in cfg
.sections():
348 if not (':' in cs
or '.' in cs
): continue
350 pw
= cfg
.get(cs
, 'password')
351 pw
= pw
.encode('utf-8')
352 constructor(ci
,cs
,pw
)
354 #---------- startup ----------
356 def common_startup():
357 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
358 log_observer
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
359 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
360 [ log_observer
, crash_on_critical
]
363 optparser
.add_option('-c', '--config', dest
='configfile',
364 default
='/etc/hippotat/config')
365 (opts
, args
) = optparser
.parse_args()
366 if len(args
): optparser
.error('no non-option arguments please')
368 re
= regexp
.compile('#.*')
369 cfg
.read_string(re
.sub('', defcfg
))
370 cfg
.read(opts
.configfile
)
373 log_debug(DBG
.INIT
, 'entering reactor')
374 if not _crashing
: reactor
.run()
375 print('CRASHED (end)', file=sys
.stderr
)