3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
26 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
31 from zope
.interface
import implementer
34 from twisted
.internet
import reactor
35 import twisted
.internet
.endpoints
37 from twisted
.logger
import LogLevel
38 import twisted
.python
.constants
39 from twisted
.python
.constants
import NamedConstant
42 from ipaddress
import AddressValueError
44 from optparse
import OptionParser
46 from configparser
import ConfigParser
47 from configparser
import NoOptionError
49 from functools
import partial
61 import hippotatlib
.slip
as slip
63 class DBG(twisted
.python
.constants
.Names
):
64 INIT
= NamedConstant()
65 CONFIG
= NamedConstant()
66 ROUTE
= NamedConstant()
67 DROP
= NamedConstant()
68 OWNSOURCE
= NamedConstant()
69 FLOW
= NamedConstant()
70 HTTP
= NamedConstant()
71 TWISTED
= NamedConstant()
72 QUEUE
= NamedConstant()
73 HTTP_CTRL
= NamedConstant()
74 QUEUE_CTRL
= NamedConstant()
75 HTTP_FULL
= NamedConstant()
76 CTRL_DUMP
= NamedConstant()
77 SLIP_FULL
= NamedConstant()
78 DATA_COMPLETE
= NamedConstant()
80 _hex_codec
= codecs
.getencoder('hex_codec')
82 #---------- logging ----------
84 org_stderr
= sys
.stderr
86 log
= twisted
.logger
.Logger()
89 debug_def_detail
= DBG
.HTTP
91 def log_debug(dflag
, msg
, idof
=None, d
=None):
92 if dflag
not in debug_set
: return
93 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
95 msg
= '[%#x] %s' %
(id(idof
), msg
)
98 if not DBG
.DATA_COMPLETE
in debug_set
:
102 d
= _hex_codec(d
)[0].decode('ascii')
103 msg
+= ' ' + d
+ trunc
104 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
106 def logevent_is_boringtwisted(event
):
108 if event
.get('log_level') != LogLevel
.info
:
110 dflag
= event
.get('dflag')
111 if dflag
is False : return False
112 if dflag
in debug_set
: return False
113 if dflag
is None and DBG
.TWISTED
in debug_set
: return False
116 print('EXCEPTION (IN BORINGTWISTED CHECK)',
117 traceback
.format_exc(), file=org_stderr
)
120 @implementer(twisted
.logger
.ILogFilterPredicate
)
121 class LogNotBoringTwisted
:
122 def __call__(self
, event
):
124 twisted
.logger
.PredicateResult
.no
125 if logevent_is_boringtwisted(event
) else
126 twisted
.logger
.PredicateResult
.yes
129 #---------- default config ----------
133 max_batch_down = 65536
135 target_requests_outstanding = 3
137 http_timeout_grace = 5
138 max_requests_outstanding = 6
143 ifname_client = hippo%%d
144 ifname_server = shippo%%d
147 #[server] or [<client>] overrides
148 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip,%(ifname)s %(rnets)s
150 # relating to virtual network
153 # addrs = 127.0.0.1 ::1
156 # relating to virtual network
157 vvnetwork = 172.24.230.192
158 # vnetwork = <prefix>/<len>
163 # [<client-ip4-or-ipv6-address>]
164 # secret = <secret> # used by both, must match
167 max_batch_down = 262144
170 target_requests_outstanding = 10
173 # these need to be defined here so that they can be imported by import *
174 cfg
= ConfigParser(strict
=False)
175 optparser
= OptionParser()
177 _mimetrans
= bytes
.maketrans(b
'-'+slip
.esc
, slip
.esc
+b
'-')
178 def mime_translate(s
):
179 # SLIP-encoded packets cannot contain ESC ESC.
180 # Swap `-' and ESC. The result cannot contain `--'
181 return s
.translate(_mimetrans
)
187 return 'ConfigResults('+repr(self
.__dict__
)+')'
189 def log_discard(packet
, iface
, saddr
, daddr
, why
):
191 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
194 #---------- packet parsing ----------
196 def packet_addrs(packet
):
197 version
= packet
[0] >> 4
201 factory
= ipaddress
.IPv4Address
205 factory
= ipaddress
.IPv6Address
207 raise ValueError('unsupported IP version %d' % version
)
208 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
209 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
210 return (saddr
, daddr
)
212 #---------- address handling ----------
216 r
= ipaddress
.IPv4Address(input)
217 except AddressValueError
:
218 r
= ipaddress
.IPv6Address(input)
221 def ipnetwork(input):
223 r
= ipaddress
.IPv4Network(input)
224 except NetworkValueError
:
225 r
= ipaddress
.IPv6Network(input)
228 #---------- ipif (SLIP) subprocess ----------
230 class SlipStreamDecoder():
231 def __init__(self
, desc
, on_packet
):
233 self
._on_packet
= on_packet
235 self
._log('__init__')
237 def _log(self
, msg
, **kwargs
):
238 log_debug(DBG
.SLIP_FULL
, 'slip %s: %s' %
(self
._desc
, msg
), **kwargs
)
240 def inputdata(self
, data
):
241 self
._log('inputdata', d
=data
)
242 data
= self
._buffer
+ data
244 packets
= slip
.decode(data
, True)
245 self
._buffer
= packets
.pop()
246 for packet
in packets
:
247 self
._maybe_packet(packet
)
248 self
._log('bufremain', d
=self
._buffer
)
250 def _maybe_packet(self
, packet
):
251 self
._log('maybepacket', d
=packet
)
253 self
._on_packet(packet
)
259 packets
= slip
.decode(data
)
260 assert(len(packets
) == 1)
261 self
._maybe_packet(packets
[0])
263 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
264 def __init__(self
, router
):
265 self
._router
= router
266 self
._decoder
= SlipStreamDecoder('ipif', self
.slip_on_packet
)
267 def connectionMade(self
): pass
268 def outReceived(self
, data
):
269 self
._decoder
.inputdata(data
)
270 def slip_on_packet(self
, packet
):
271 (saddr
, daddr
) = packet_addrs(packet
)
272 if saddr
.is_link_local
or daddr
.is_link_local
:
273 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
275 self
._router(packet
, saddr
, daddr
)
276 def processEnded(self
, status
):
277 status
.raiseException()
279 def start_ipif(command
, router
):
280 ipif
= _IpifProcessProtocol(router
)
281 reactor
.spawnProcess(ipif
,
282 '/bin/sh',['sh','-xc', command
],
283 childFDs
={0:'w', 1:'r', 2:2},
287 def queue_inbound(ipif
, packet
):
288 log_debug(DBG
.FLOW
, "queue_inbound", d
=packet
)
289 ipif
.transport
.write(slip
.delimiter
)
290 ipif
.transport
.write(slip
.encode(packet
))
291 ipif
.transport
.write(slip
.delimiter
)
293 #---------- packet queue ----------
296 def __init__(self
, desc
, max_queue_time
):
299 self
._max_queue_time
= max_queue_time
300 self
._pq
= collections
.deque() # packets
302 def _log(self
, dflag
, msg
, **kwargs
):
303 log_debug(dflag
, self
._desc
+' pq: '+msg
, **kwargs
)
305 def append(self
, packet
):
306 self
._log(DBG
.QUEUE
, 'append', d
=packet
)
307 self
._pq
.append((time
.monotonic(), packet
))
310 self
._log(DBG
.QUEUE
, 'nonempty ?')
312 try: (queuetime
, packet
) = self
._pq
[0]
314 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
317 age
= time
.monotonic() - queuetime
318 if age
> self
._max_queue_time
:
319 # strip old packets off the front
320 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
324 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
327 def process(self
, sizequery
, moredata
, max_batch
):
328 # sizequery() should return size of batch so far
329 # moredata(s) should add s to batch
330 self
._log(DBG
.QUEUE
, 'process...')
332 try: (dummy
, packet
) = self
._pq
[0]
334 self
._log(DBG
.QUEUE
, 'process... empty')
337 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
339 encoded
= slip
.encode(packet
)
342 self
._log(DBG
.QUEUE_CTRL
,
343 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
347 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
348 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
350 moredata(slip
.delimiter
)
355 #---------- error handling ----------
362 print('========== CRASH ==========', err
,
363 '===========================', file=sys
.stderr
)
365 except twisted
.internet
.error
.ReactorNotRunning
: pass
367 def crash_on_defer(defer
):
368 defer
.addErrback(lambda err
: crash(err
))
370 def crash_on_critical(event
):
371 if event
.get('log_level') >= LogLevel
.critical
:
372 crash(twisted
.logger
.formatEvent(event
))
374 #---------- authentication tokens ----------
376 _authtoken_digest
= hashlib
.sha256
378 def _authtoken_time():
379 return int(time
.time())
381 def _authtoken_hmac(secret
, hextime
):
382 return hmac
.new(secret
, hextime
, _authtoken_digest
).digest()
384 def authtoken_make(secret
):
385 hextime
= ('%x' %
_authtoken_time()).encode('ascii')
386 mac
= _authtoken_hmac(secret
, hextime
)
387 return hextime
+ b
' ' + base64
.b64encode(mac
)
389 def authtoken_check(secret
, token
, maxskew
):
390 (hextime
, theirmac64
) = token
.split(b
' ')
391 now
= _authtoken_time()
392 then
= int(hextime
, 16)
394 if (abs(skew
) > maxskew
):
395 raise ValueError('too much clock skew (client %ds ahead)' % skew
)
396 theirmac
= base64
.b64decode(theirmac64
)
397 ourmac
= _authtoken_hmac(secret
, hextime
)
398 if not hmac
.compare_digest(theirmac
, ourmac
):
399 raise ValueError('invalid token (wrong secret?)')
402 #---------- config processing ----------
404 def _cfg_process_putatives():
407 # maps from abstract object to canonical name for cs's
409 def putative(cmap
, abstract
, canoncs
):
411 current_canoncs
= cmap
[abstract
]
415 assert(current_canoncs
== canoncs
)
416 cmap
[abstract
] = canoncs
418 server_pat
= r
'[-.0-9A-Za-z]+'
419 client_pat
= r
'[.:0-9a-f]+'
420 server_re
= regexp
.compile(server_pat
)
421 serverclient_re
= regexp
.compile(
422 server_pat
+ r
' ' + '(?:' + client_pat
+ '|LIMIT)')
424 for cs
in cfg
.sections():
426 log_debug_config('putatives: section [%s] %s' %
(cs
, m
))
429 dbg('X ignore: %s' %
(why
))
430 print('warning: ignoring config section [%s] (%s)' %
(cs
, why
),
433 if cs
== 'LIMIT' or cs
== 'COMMON':
434 # plan A "[LIMIT]" or "[COMMON]"
439 # plan B "[<client>]" part 1
441 except AddressValueError
:
443 if server_re
.fullmatch(cs
):
444 # plan C "[<servername>]"
446 putative(servers
, cs
, cs
)
449 if serverclient_re
.fullmatch(cs
):
450 # plan D "[<servername> <client>]" part 1
451 (pss
,pcs
) = cs
.split(' ')
454 # plan E "[<servername> LIMIT]"
455 dbg('E <server> LIMIT')
459 # plan D "[<servername> <client>]" part 2
461 except AddressValueError
:
462 # plan F branch 1 "[<some thing we do not understand>]"
463 log_ignore('bad-addr')
466 else: # no AddressValueError
467 # plan D "[<servername> <client>]" part 3
468 dbg('D <server> <client>')
469 putative(clients
, ci
, pcs
)
470 putative(servers
, pss
, pss
)
473 # plan F branch 2 "[<some thing we do not understand>]"
474 log_ignore('nomatch '+ repr(serverclient_re
))
476 else: # no AddressValueError
477 # plan B "[<client>" part 2
479 putative(clients
, ci
, cs
)
482 return (servers
, clients
)
484 def cfg_process_general(c
, ss
):
485 c
.mtu
= cfg1getint(ss
, 'mtu')
487 def cfg_process_saddrs(c
, ss
):
489 def __init__(self
, port
, addrspec
):
493 self
.addr
= ipaddress
.IPv4Address(addrspec
)
494 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
496 except AddressValueError
:
497 self
.addr
= ipaddress
.IPv6Address(addrspec
)
498 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
499 self
._inurl
= b
'[%s]'
500 def make_endpoint(self
):
501 return self
._endpointfactory(reactor
, self
.port
,
502 interface
= '%s' % self
.addr
)
504 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
505 if self
.port
!= 80: url
+= b
':%d' % self
.port
509 return 'ServerAddr'+repr((self
.port
,self
.addr
))
511 c
.port
= cfg1getint(ss
,'port')
513 for addrspec
in cfg1get(ss
, 'addrs').split():
514 sa
= ServerAddr(c
.port
, addrspec
)
517 def cfg_process_vnetwork(c
, ss
):
518 c
.vnetwork
= ipnetwork(cfg1get(ss
,'vnetwork'))
519 if c
.vnetwork
.num_addresses
< 3 + 2:
520 raise ValueError('vnetwork needs at least 2^3 addresses')
522 def cfg_process_vaddr(c
, ss
):
524 c
.vaddr
= cfg1get(ss
,'vaddr')
525 except NoOptionError
:
526 cfg_process_vnetwork(c
, ss
)
527 c
.vaddr
= next(c
.vnetwork
.hosts())
529 def cfg_search_section(key
,sections
):
530 for section
in sections
:
531 if cfg
.has_option(section
, key
):
533 raise NoOptionError(key
, repr(sections
))
535 def cfg_get_raw(*args
, **kwargs
):
536 # for passing to cfg_search
537 return cfg
.get(*args
, raw
=True, **kwargs
)
539 def cfg_search(getter
,key
,sections
):
540 section
= cfg_search_section(key
,sections
)
541 return getter(section
, key
)
543 def cfg1get(section
,key
, getter
=cfg
.get
,**kwargs
):
544 section
= cfg_search_section(key
,[section
,'COMMON'])
545 return getter(section
,key
,**kwargs
)
547 def cfg1getint(section
,key
, **kwargs
):
548 return cfg1get(section
,key
, getter
=cfg
.getint
,**kwargs
);
550 def cfg_process_client_limited(cc
,ss
,sections
,key
):
551 val
= cfg_search(cfg1getint
, key
, sections
)
552 lim
= cfg_search(cfg1getint
, key
, ['%s LIMIT' % ss
, 'LIMIT'])
553 cc
.__dict__
[key
] = min(val
,lim
)
555 def cfg_process_client_common(cc
,ss
,cs
,ci
):
556 # returns sections to search in, iff secret is defined, otherwise None
559 sections
= ['%s %s' %
(ss
,cs
),
564 try: pwsection
= cfg_search_section('secret', sections
)
565 except NoOptionError
: return None
567 pw
= cfg1get(pwsection
, 'secret')
568 cc
.secret
= pw
.encode('utf-8')
570 cfg_process_client_limited(cc
,ss
,sections
,'target_requests_outstanding')
571 cfg_process_client_limited(cc
,ss
,sections
,'http_timeout')
575 def cfg_process_ipif(c
, sections
, varmap
):
577 try: v
= getattr(c
, s
)
578 except AttributeError: continue
581 v
= cfg_search(cfg
.get
, d
, sections
)
584 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
586 section
= cfg_search_section('ipif', sections
)
587 c
.ipif_command
= cfg1get(section
,'ipif', vars=c
.__dict__
)
589 #---------- startup ----------
591 def log_debug_config(m
):
592 if not DBG
.CONFIG
in debug_set
: return
593 print('DBG.CONFIG:', m
)
595 def common_startup(process_cfg
):
596 # calls process_cfg(putative_clients, putative_servers)
598 # ConfigParser hates #-comments after values
599 trailingcomments_re
= regexp
.compile(r
'#.*')
600 cfg
.read_string(trailingcomments_re
.sub('', defcfg
))
603 def readconfig(pathname
, mandatory
=True):
604 def log(m
, p
=pathname
):
605 if not DBG
.CONFIG
in debug_set
: return
606 log_debug_config('%s: %s' %
(m
, p
))
609 files
= os
.listdir(pathname
)
611 except FileNotFoundError
:
616 except NotADirectoryError
:
623 re
= regexp
.compile('[^-A-Za-z0-9_]')
624 for f
in os
.listdir(pathname
):
625 if re
.search(f
): continue
626 subpath
= pathname
+ '/' + f
629 except FileNotFoundError
:
630 log('entry skipped', subpath
)
633 log('entry read', subpath
)
635 def oc_config(od
,os
, value
, op
):
640 def oc_extra_config(od
,os
, value
, op
):
643 def read_defconfig():
644 readconfig('/etc/hippotat/config.d', False)
645 readconfig('/etc/hippotat/secrets.d', False)
646 readconfig('/etc/hippotat/master.cfg', False)
648 def oc_defconfig(od
,os
, value
, op
):
651 read_defconfig(value
)
653 def dfs_less_detailed(dl
):
654 return [df
for df
in DBG
.iterconstants() if df
<= dl
]
656 def ds_default(od
,os
,dl
,op
):
659 debug_set |
= set(dfs_less_detailed(debug_def_detail
))
661 def ds_select(od
,os
, spec
, op
):
662 for it
in spec
.split(','):
664 if it
.startswith('-'):
665 mutator
= debug_set
.discard
668 mutator
= debug_set
.add
671 dfs
= DBG
.iterconstants()
675 mapper
= dfs_less_detailed
678 mapper
= lambda x
: [x
]
681 dfspec
= DBG
.lookupByName(it
)
683 optparser
.error('unknown debug flag %s in --debug-select' % it
)
690 optparser
.add_option('-D', '--debug',
693 help='enable default debug (to stdout)',
694 callback
= ds_default
)
696 optparser
.add_option('--debug-select',
699 metavar
='[-]DFLAG[+]|[-]+,...',
701 '''enable (`-': disable) each specified DFLAG;
702 `+': do same for all "more interesting" DFLAGSs;
703 just `+': all DFLAGs.
704 DFLAGS: ''' + ' '.join([df
.name
for df
in DBG
.iterconstants()]),
708 optparser
.add_option('-c', '--config',
711 metavar
='CONFIGFILE',
716 optparser
.add_option('--extra-config',
719 metavar
='CONFIGFILE',
722 callback
= oc_extra_config
)
724 optparser
.add_option('--default-config',
726 callback
= oc_defconfig
)
728 (opts
, args
) = optparser
.parse_args()
729 if len(args
): optparser
.error('no non-option arguments please')
735 (pss
, pcs
) = _cfg_process_putatives()
736 process_cfg(opts
, pss
, pcs
)
737 except (configparser
.Error
, ValueError):
738 traceback
.print_exc(file=sys
.stderr
)
739 print('\nInvalid configuration, giving up.', file=sys
.stderr
)
743 #print('X', debug_set, file=sys.stderr)
745 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
746 stdout_obs
= twisted
.logger
.FileLogObserver(sys
.stdout
, log_formatter
)
747 stderr_obs
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
748 pred
= twisted
.logger
.LogLevelFilterPredicate(LogLevel
.error
)
749 stdsomething_obs
= twisted
.logger
.FilteringLogObserver(
750 stderr_obs
, [pred
], stdout_obs
752 global file_log_observer
753 file_log_observer
= twisted
.logger
.FilteringLogObserver(
754 stdsomething_obs
, [LogNotBoringTwisted()]
756 #log_observer = stdsomething_obs
757 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
758 [ file_log_observer
, crash_on_critical
]
762 log_debug(DBG
.INIT
, 'entering reactor')
763 if not _crashing
: reactor
.run()
764 print('ENDED', file=sys
.stderr
)