# Restore the default SIGINT disposition so ^C simply kills the process
# (presumably undoing a handler installed by an import above — TODO confirm).
signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope
.interface
import implementer
12 from twisted
.internet
import reactor
13 import twisted
.internet
.endpoints
15 from twisted
.logger
import LogLevel
16 import twisted
.python
.constants
17 from twisted
.python
.constants
import NamedConstant
20 from ipaddress
import AddressValueError
22 from optparse
import OptionParser
24 from configparser
import ConfigParser
25 from configparser
import NoOptionError
27 from functools
import partial
36 import hippotat
.slip
as slip
class DBG(twisted.python.constants.Names):
    """Debug message categories.

    NOTE: declaration order is significant — twisted NamedConstants
    compare by declaration order, and dfs_less_detailed() selects all
    flags <= a given flag, so later flags are "more detailed".
    """
    INIT = NamedConstant()           # startup/initialisation
    CONFIG = NamedConstant()         # configuration processing
    ROUTE = NamedConstant()          # packet routing decisions
    DROP = NamedConstant()           # discarded packets
    FLOW = NamedConstant()           # packet flow
    HTTP = NamedConstant()           # HTTP activity (default detail cutoff)
    TWISTED = NamedConstant()        # events from twisted itself
    QUEUE = NamedConstant()          # packet queue operations
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()      # byte-level SLIP encode/decode tracing
    DATA_COMPLETE = NamedConstant()  # log full packet data, untruncated
# Incremental hex encoder used to render packet payloads in debug output.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# stderr as it was at import time, so tracebacks can be printed directly
# even after the logging machinery takes over the standard streams.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# Default debug verbosity cutoff (see ds_default / dfs_less_detailed).
debug_def_detail = DBG.HTTP
def log_debug(dflag, msg, idof=None, d=None):
    # Emit a debug message if its category `dflag` is enabled in debug_set.
    # idof: object whose id() tags the message; d: packet data to hex-dump.
    # NOTE(review): several original lines are missing from this view
    # (apparently the None-guards for idof/d and the truncation logic that
    # sets `trunc`), so the nesting below is a best-effort reconstruction —
    # confirm against the full file.
    if dflag not in debug_set: return
    #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
    msg = '[%#x] %s' % (id(idof), msg)
    if not DBG.DATA_COMPLETE in debug_set:
        d = _hex_codec(d)[0].decode('ascii')
        msg += ' ' + d + trunc   # `trunc` is assigned by lines missing here
    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    # Twisted log-filter predicate: passes non-info events, and info events
    # whose `dflag` is enabled in debug_set (an absent dflag counts as
    # enabled iff DBG.TWISTED is selected).
    # NOTE(review): several original lines are missing from this view — the
    # consequent of the first test, the final `return no`, and the
    # try/except that the traceback print at the bottom belongs to.
    def __call__(self, event):
        yes = twisted.logger.PredicateResult.yes
        no = twisted.logger.PredicateResult.no

        if event.get('log_level') != LogLevel.info:
        # (consequent missing from this view — presumably `return yes`)
        dflag = event.get('dflag')
        if dflag is False : return yes
        if dflag in debug_set: return yes
        if dflag is None and DBG.TWISTED in debug_set: return yes

        # (reached via a missing except clause: never let a filter bug
        # swallow the event silently — dump the traceback to real stderr)
        print(traceback.format_exc(), file=org_stderr)
97 #---------- default config ----------
101 max_batch_down = 65536
103 target_requests_outstanding = 3
105 http_timeout_grace = 5
106 max_requests_outstanding = 6
112 #[server] or [<client>] overrides
113 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
115 # relating to virtual network
120 # addrs = 127.0.0.1 ::1
123 # relating to virtual network
124 vvnetwork = 172.24.230.192
125 # vnetwork = <prefix>/<len>
130 # [<client-ip4-or-ipv6-address>]
131 # password = <password> # used by both, must match
134 max_batch_down = 262144
137 target_requests_outstanding = 10
# these need to be defined here so that they can be imported by import *
# Global configuration accumulator (strict=False tolerates duplicate
# sections/options) and the shared command-line option parser.
cfg = ConfigParser(strict=False)
optparser = OptionParser()
# Translation table exchanging b'-' and the SLIP ESC byte (both directions).
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return s with `-' and the SLIP ESC byte swapped throughout.

    SLIP-encoded packets cannot contain ESC ESC, so after this swap the
    result cannot contain `--'.
    """
    return s.translate(_mimetrans)
        # Body of ConfigResults.__repr__ (the class and `def` lines are not
        # visible in this chunk): render the attribute dict for debugging.
        return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, iface, saddr, daddr, why):
    # Record that a packet was dropped, on which interface, and why.
    # NOTE(review): the enclosing call (orig. 157/159-160, presumably
    # log_debug(DBG.DROP, ..., d=packet)) is missing from this view; only
    # its message argument survives below.
    'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
161 #---------- packet parsing ----------
def packet_addrs(packet):
    # Extract (source, destination) addresses from a raw IP packet header.
    # NOTE(review): the version dispatch that sets `saddroff`/`addrlen`
    # (orig. 165-173) is mostly missing from this view.
    version = packet[0] >> 4    # IP version from the first header nibble
    # (if version == 4: offsets for IPv4 — missing here)
    factory = ipaddress.IPv4Address
    # (elif version == 6: offsets for IPv6 — missing here)
    factory = ipaddress.IPv6Address
    # (else:)
    raise ValueError('unsupported IP version %d' % version)
    saddr = factory(packet[ saddroff : saddroff + addrlen ])
    daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
    return (saddr, daddr)
179 #---------- address handling ----------
# NOTE(review): interior of an address-parsing helper (presumably
# `def ipaddr(input):` with its `try:` and `return r` lines missing from
# this view): try IPv4 first, fall back to IPv6 on AddressValueError.
    r = ipaddress.IPv4Address(input)
    except AddressValueError:
        r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse input as an IPv4 or IPv6 network (e.g. '10.0.0.0/8').

    Tries IPv4 first and falls back to IPv6, mirroring the ipaddr()
    helper above.  Raises ValueError if input is neither.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ValueError:
        # BUGFIX: the original caught `NetworkValueError`, a name that does
        # not exist anywhere (ipaddress raises AddressValueError /
        # NetmaskValueError, both ValueError subclasses), so any non-IPv4
        # input raised NameError instead of falling back to IPv6.
        r = ipaddress.IPv6Network(input)
    return r
195 #---------- ipif (SLIP) subprocess ----------
class SlipStreamDecoder():
    # Reassembles SLIP packets from an arbitrarily-chunked byte stream and
    # hands each complete packet to on_packet(packet).
    # NOTE(review): several original lines are missing from this view —
    # the __init__ assignments of self._desc and self._buffer, a guard in
    # _maybe_packet, and the `def flush(self):` header whose body appears
    # at the end — so some nesting below is a best-effort reconstruction.
    def __init__(self, desc, on_packet):
        self._on_packet = on_packet
        self._log('__init__')

    def _log(self, msg, **kwargs):
        # Route through the global debug logger, tagged with our descriptor.
        log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)

    def inputdata(self, data):
        # Feed raw bytes in; any packets completed by this chunk are
        # delivered, the trailing incomplete fragment is buffered.
        self._log('inputdata', d=data)
        packets = slip.decode(data)
        # first fragment continues whatever was buffered; the last
        # fragment is (possibly empty) leftover awaiting more input
        packets[0] = self._buffer + packets[0]
        self._buffer = packets.pop()
        for packet in packets:
            self._maybe_packet(packet)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        self._log('maybepacket', d=packet)
        # NOTE(review): a guard line (orig. 218) is missing here —
        # presumably skipping empty packets before delivery.
        self._on_packet(packet)

    # NOTE(review): the `def flush(self):` header (orig. 220-222) is
    # missing; the statement below is its body, delivering any buffered
    # remainder as a final packet.
        self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    # Speaks SLIP over the ipif subprocess's stdin/stdout; decoded packets
    # are handed to router(packet, saddr, daddr).
    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
    def connectionMade(self): pass
    def outReceived(self, data):
        # Bytes from ipif's stdout: feed the SLIP reassembler.
        self._decoder.inputdata(data)
    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            # NOTE(review): a line (orig. 237, presumably `return`) is
            # missing here — without it the packet would still be routed.
        self._router(packet, saddr, daddr)
    def processEnded(self, status):
        # Propagate subprocess exit as an exception (crashes the program).
        status.raiseException()
def start_ipif(command, router):
    # Spawn the ipif (SLIP tunnel) subprocess via /bin/sh and wire its
    # stdin/stdout to an _IpifProcessProtocol feeding `router`.
    # NOTE(review): a line (orig. 243, presumably `global ipif` — the
    # queue_inbound function below uses `ipif` as a global) and the end of
    # the spawnProcess call (orig. 248-249) are missing from this view.
    ipif = _IpifProcessProtocol(router)
    reactor.spawnProcess(ipif,
                         '/bin/sh',['sh','-xc', command],
                         childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Send one packet down to the ipif subprocess, SLIP-framed."""
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    write = ipif.transport.write
    for chunk in (slip.delimiter, slip.encode(packet), slip.delimiter):
        write(chunk)
256 #---------- packet queue ----------
def __init__(self, desc, max_queue_time):
    # Queue of (enqueue-time, packet) pairs; entries older than
    # max_queue_time seconds are dropped when the queue is inspected.
    # NOTE(review): lines orig. 260-261 are missing from this view
    # (presumably `self._desc = desc`, which _log relies on).
    self._max_queue_time = max_queue_time
    self._pq = collections.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Debug-log msg, prefixed with this queue's descriptor."""
    prefixed = '%s pq: %s' % (self._desc, msg)
    log_debug(dflag, prefixed, **kwargs)
def append(self, packet):
    """Enqueue packet, stamped with the current monotonic time."""
    self._log(DBG.QUEUE, 'append', d=packet)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
# NOTE(review): interior of a PacketQueue method (presumably
# `def nonempty(self):`, orig. 272) whose header and several lines (the
# loop construct, `except IndexError:`, the returns, and the popleft of
# stale entries) are missing from this view.
    self._log(DBG.QUEUE, 'nonempty ?')
    try: (queuetime, packet) = self._pq[0]
    # (except IndexError — queue empty — missing here)
    self._log(DBG.QUEUE, 'nonempty ? empty.')

    # Head entry exists: drop it if it has been queued too long.
    age = time.monotonic() - queuetime
    if age > self._max_queue_time:
        # strip old packets off the front
        self._log(DBG.QUEUE, 'dropping (old)', d=packet)
    self._log(DBG.QUEUE, 'nonempty ? nonempty.')
def process(self, sizequery, moredata, max_batch):
    # Drain queued packets into a batch being assembled by the caller,
    # stopping before the batch would exceed max_batch bytes.
    # sizequery() should return size of batch so far
    # moredata(s) should add s to batch
    # NOTE(review): many lines are missing from this view (the loop
    # construct, `except IndexError:`/returns, the popleft, the
    # `sofar = sizequery()` assignment, and the call adding the encoded
    # packet), so the structure below is partial.
    self._log(DBG.QUEUE, 'process...')
    try: (dummy, packet) = self._pq[0]
    self._log(DBG.QUEUE, 'process... empty')

    self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
    encoded = slip.encode(packet)

    self._log(DBG.QUEUE_CTRL,
              'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
    # Stop if this packet plus a delimiter would overflow the batch.
    if sofar + len(slip.delimiter) + len(encoded) > max_batch:
        self._log(DBG.QUEUE_CTRL, 'process... overflow')
    moredata(slip.delimiter)
318 #---------- error handling ----------
# NOTE(review): interior of the crash handler (`def crash(err):`,
# orig. ~320-324, is missing from this view, as is the reactor.stop()
# call that the except clause below evidently guards).
    print('========== CRASH ==========', err,
          '===========================', file=sys.stderr)
    except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Arrange for any failure of this Deferred to crash the program."""
    defer.addErrback(lambda failure: crash(failure))
def crash_on_critical(event):
    """Log observer: abort the whole program on critical-severity events."""
    level = event.get('log_level')
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
337 #---------- config processing ----------
def _cfg_process_putatives():
    # Scan cfg's sections and classify each as naming a server, a client,
    # or a (server, client) pair; returns the two classification maps.
    # maps from abstract object to canonical name for cs's
    # NOTE(review): many original lines are missing from this view — the
    # initialisation of `servers`/`clients`, the `try:` statements paired
    # with the `except AddressValueError:` clauses below, and the parsing
    # that binds `ci` — so the nesting shown is a best-effort
    # reconstruction; confirm against the full file.
    def putative(cmap, abstract, canoncs):
        # Record abstract -> canoncs, insisting any earlier entry agrees.
        # (the try/except around the lookup is missing from this view)
        current_canoncs = cmap[abstract]
        assert(current_canoncs == canoncs)
        cmap[abstract] = canoncs
    server_pat = r'[-.0-9A-Za-z]+'
    client_pat = r'[.:0-9a-f]+'
    server_re = regexp.compile(server_pat)
    serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
    for cs in cfg.sections():
        # plan B "[<client>]" part 1
        except AddressValueError:
            if server_re.fullmatch(cs):
                # plan C "[<servername>]"
                putative(servers, cs, cs)
            if serverclient_re.fullmatch(cs):
                # plan D "[<servername> <client>]" part 1
                (pss,pcs) = cs.split(' ')
                # plan E "[<servername> LIMIT]"
                # plan D "[<servername> <client>]" part 2
                except AddressValueError:
                    # plan F "[<some thing we do not understand>]"
                    # well, we ignore this
                    print('warning: ignoring config section %s' % cs, file=sys.stderr)
                else: # no AddressValueError
                    # plan D "[<servername> <client]" part 3
                    putative(clients, ci, pcs)
                    putative(servers, pss, pss)
        else: # no AddressValueError
            # plan B "[<client>" part 2
            putative(clients, ci, cs)
    return (servers, clients)
def cfg_process_common(ss):
    # Read settings shared by server and clients from section ss.
    # NOTE(review): `c` is not a parameter in this view — either a
    # parameter was lost from the signature here or the target object
    # comes from an enclosing scope; confirm against the full file.
    c.mtu = cfg.getint(ss, 'mtu')
def cfg_process_saddrs(c, ss):
    # Parse the `port` and `addrs` settings of section ss into ServerAddr
    # objects on c.
    # NOTE(review): missing from this view: the `class ServerAddr` header,
    # `self.port = port`, the `try:` before the IPv4 attempt, the IPv4
    # branch's `_inurl`, the `def url(self):` header and its return, and
    # the c.saddrs initialisation/append — structure below is partial.
    def __init__(self, port, addrspec):
        # Try IPv4 first; fall back to IPv6 (whose URL form needs brackets).
        self.addr = ipaddress.IPv4Address(addrspec)
        self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
        except AddressValueError:
            self.addr = ipaddress.IPv6Address(addrspec)
            self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
            self._inurl = b'[%s]'
    def make_endpoint(self):
        return self._endpointfactory(reactor, self.port, self.addr)
        # (body of the missing `def url(self):` follows — builds the HTTP
        # URL for this address, omitting ':80' for the default port)
        url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
        if self.port != 80: url += b':%d' % self.port
    c.port = cfg.getint(ss,'port')
    for addrspec in cfg.get(ss, 'addrs').split():
        sa = ServerAddr(c.port, addrspec)
def cfg_process_vnetwork(c, ss):
    """Read and validate section ss's `vnetwork` (virtual network prefix).

    The network must provide at least 3 + 2 = 5 addresses (enough for the
    special virtual addresses plus at least one client — TODO confirm the
    exact accounting against the rest of the project).
    Raises ValueError if it is too small.
    """
    c.vnetwork = ipnetwork(cfg.get(ss,'vnetwork'))
    if c.vnetwork.num_addresses < 3 + 2:
        # BUGFIX: the message used to claim "at least 2^3 addresses" (8),
        # but the check above requires 3+2 = 5; make the message agree
        # with the actual test.
        raise ValueError('vnetwork needs at least %d addresses' % (3 + 2))
def cfg_process_vaddr(c, ss):
    # Determine the server's virtual address: the explicit `vaddr` setting
    # if present, otherwise the first host of the configured vnetwork.
    # NOTE(review): the `try:` line (orig. 439) is missing from this view.
    c.vaddr = cfg.get(ss,'vaddr')
    except NoOptionError:
        cfg_process_vnetwork(c, ss)
        c.vaddr = next(c.vnetwork.hosts())
def cfg_search_section(key,sections):
    # Return the first section in `sections` that defines `key`, raising
    # NoOptionError if none does.
    # NOTE(review): the loop body's `return section` (orig. 448) is
    # missing from this view.
    for section in sections:
        if cfg.has_option(section, key):
    raise NoOptionError(key, repr(sections))
def cfg_search(getter,key,sections):
    """Fetch key via getter() from the first section that defines it."""
    return getter(cfg_search_section(key, sections), key)
def cfg_process_client_limited(cc,ss,sections,key):
    """Set cc.<key> to the configured value, capped by the LIMIT sections.

    The requested value is looked up in `sections`; the cap comes from
    '<ss> LIMIT' or the global 'LIMIT' section; the smaller value wins.
    """
    wanted = cfg_search(cfg.getint, key, sections)
    cap = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    cc.__dict__[key] = min(wanted, cap)
def cfg_process_client_common(cc,ss,cs,ci):
    # Configure per-client settings shared by server and client code.
    # returns sections to search in, iff password is defined, otherwise None
    # NOTE(review): the remainder of the `sections` list (orig. 465-468)
    # and the final `return sections` are missing from this view.
    sections = ['%s %s' % (ss,cs),
    try: pwsection = cfg_search_section('password', sections)
    except NoOptionError: return None

    pw = cfg.get(pwsection, 'password')
    cc.password = pw.encode('utf-8')

    cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
    cfg_process_client_limited(cc,ss,sections,'http_timeout')
def cfg_process_ipif(c, sections, varmap):
    # Build c.ipif_command by interpolating c's attributes into the
    # configured `ipif` command template (ConfigParser `vars=` expansion).
    # NOTE(review): the loop header over `varmap` (orig. 481) and the
    # attribute-copy lines (orig. 484-485) are missing from this view.
        try: v = getattr(c, s)
        except AttributeError: continue
    #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
    section = cfg_search_section('ipif', sections)
    c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
491 #---------- startup ----------
def common_startup(process_cfg):
    # Shared startup: load built-in defaults, parse options and config
    # files, then let the caller process the result.
    # calls process_cfg(putative_clients, putative_servers)
    # NOTE(review): lines orig. 495 and 499-500 are missing from this view.

    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile(r'#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
def readconfig(pathname, mandatory=True):
    # Read configuration from pathname, which may be a file or a directory
    # of files; missing paths are presumably tolerated unless `mandatory`.
    # NOTE(review): many lines are missing from this view (the try:
    # statements matching the except clauses, the directory/file dispatch
    # that binds `cdir`, and the actual cfg.read calls), so the structure
    # below is partial.
    def log(m, p=pathname):
        # Trace config processing when DBG.CONFIG is enabled.
        if not DBG.CONFIG in debug_set: return
        print('DBG.CONFIG: %s: %s' % (m, pathname))

    files = os.listdir(pathname)
    except FileNotFoundError:
    except NotADirectoryError:

    # Skip directory entries containing anything but word characters
    # (presumably `regexp` is the stdlib re module — confirm imports).
    re = regexp.compile('[^-A-Za-z0-9_]')
    for f in os.listdir(cdir):
        if re.search(f): continue
        subpath = pathname + '/' + f
        except FileNotFoundError:
            log('entry skipped', subpath)
        log('entry read', subpath)
def oc_config(od,os, value, op):
    # optparse callback for -c/--config.
    # NOTE(review): the body (orig. 534-537) is missing from this view —
    # presumably it calls readconfig(value).
def dfs_less_detailed(dl):
    """Return every debug flag at or below detail level dl."""
    return [flag for flag in DBG.iterconstants() if flag <= dl]
def ds_default(od,os,dl,op):
    # optparse callback for -D/--debug: enable the default debug detail.
    # NOTE(review): a line (orig. 542, presumably `global debug_set`) is
    # missing from this view.
    debug_set = set(dfs_less_detailed(debug_def_detail))
def ds_select(od,os, spec, op):
    # optparse callback for --debug-select: spec is a comma-separated list
    # of [-]DFLAG[+] items toggling members of debug_set.
    # NOTE(review): many lines are missing from this view (stripping the
    # leading '-', the '+' suffix handling, the try/except around the flag
    # lookup, and the loop applying `mutator` to each of `mapper(dfspec)`),
    # so the structure below is partial.
    for it in spec.split(','):
        if it.startswith('-'):
            mutator = debug_set.discard
            # (else branch, presumably:)
            mutator = debug_set.add
            # bare "+" selects every flag
            dfs = DBG.iterconstants()
            # trailing "+" selects a range of flags via dfs_less_detailed
            mapper = dfs_less_detailed
            # (else: just the one named flag)
            mapper = lambda x: [x]
            dfspec = DBG.lookupByName(it)
        optparser.error('unknown debug flag %s in --debug-select' % it)
# Command-line option declarations and parsing.
# NOTE(review): several argument lines of these add_option calls
# (action=/type=/callback= arguments and closing parentheses) are missing
# from this view; the fragments are kept as-is.
optparser.add_option('-D', '--debug',
                     help='enable default debug (to stdout)',
                     callback= ds_default)

optparser.add_option('--debug-select',
                     metavar='[-]DFLAG[+]|[-]+,...',
                     '''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),

optparser.add_option('-c', '--config',
                     metavar='CONFIGFILE',

(opts, args) = optparser.parse_args()
if len(args): optparser.error('no non-option arguments please')
# Read the system config locations (non-mandatory), then classify and
# process the assembled configuration, reporting any errors fatally.
# NOTE(review): lines orig. 602-603 (presumably handling -c options),
# 606-607 (the `try:`) and 613-614 (presumably the exit call) are missing
# from this view.
readconfig('/etc/hippotat/config', False)
readconfig('/etc/hippotat/config.d', False)

(pss, pcs) = _cfg_process_putatives()
process_cfg(pss, pcs)
except (configparser.Error, ValueError):
    traceback.print_exc(file=sys.stderr)
    print('\nInvalid configuration, giving up.', file=sys.stderr)
#print(repr(debug_set), file=sys.stderr)

# NOTE(review): a line (orig. 616) is missing just above — possibly a
# `def` header for this run/logging-setup code; closing-parenthesis lines
# of several calls below (orig. 623, 626, 630-632) are also missing.
#
# Observer chain: events at/above LogLevel.error go to stderr, the rest
# to stdout, all filtered through LogNotBoringTwisted; critical events
# additionally crash the program via crash_on_critical.
log_formatter = twisted.logger.formatEventAsClassicLogText
stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
stdsomething_obs = twisted.logger.FilteringLogObserver(
    stderr_obs, [pred], stdout_obs
log_observer = twisted.logger.FilteringLogObserver(
    stdsomething_obs, [LogNotBoringTwisted()]
#log_observer = stdsomething_obs
twisted.logger.globalLogBeginner.beginLoggingTo(
    [ log_observer, crash_on_critical ]

log_debug(DBG.INIT, 'entering reactor')
# Run the reactor unless a crash was already recorded during startup.
if not _crashing: reactor.run()
print('CRASHED (end)', file=sys.stderr)