3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
26 signal
.signal(signal
.SIGINT
, signal
.SIG_DFL
)
31 from zope
.interface
import implementer
34 from twisted
.internet
import reactor
35 import twisted
.internet
.endpoints
37 from twisted
.logger
import LogLevel
38 import twisted
.python
.constants
39 from twisted
.python
.constants
import NamedConstant
42 from ipaddress
import AddressValueError
44 from optparse
import OptionParser
46 from configparser
import ConfigParser
47 from configparser
import NoOptionError
49 from functools
import partial
58 import hippotatlib
.slip
as slip
class DBG(twisted.python.constants.Names):
    """Debug flags naming the categories of debug message.

    The order of declaration matters: dfs_less_detailed() compares flags
    with ``<=`` to pick "everything up to this level of detail", and
    Twisted orders the constants of a Names container by the order in
    which they are defined.
    """

    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    OWNSOURCE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
77 _hex_codec
= codecs
.getencoder('hex_codec')
79 #---------- logging ----------
81 org_stderr
= sys
.stderr
83 log
= twisted
.logger
.Logger()
86 debug_def_detail
= DBG
.HTTP
88 def log_debug(dflag
, msg
, idof
=None, d
=None):
89 if dflag
not in debug_set
: return
90 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
92 msg
= '[%#x] %s' %
(id(idof
), msg
)
95 if not DBG
.DATA_COMPLETE
in debug_set
:
99 d
= _hex_codec(d
)[0].decode('ascii')
100 msg
+= ' ' + d
+ trunc
101 log
.info('{dflag} {msgcore}', dflag
=dflag
, msgcore
=msg
)
103 def logevent_is_boringtwisted(event
):
105 if event
.get('log_level') != LogLevel
.info
:
107 dflag
= event
.get('dflag')
108 if dflag
is False : return False
109 if dflag
in debug_set
: return False
110 if dflag
is None and DBG
.TWISTED
in debug_set
: return False
113 print(traceback
.format_exc(), file=org_stderr
)
116 @implementer(twisted
.logger
.ILogFilterPredicate
)
117 class LogNotBoringTwisted
:
118 def __call__(self
, event
):
120 twisted
.logger
.PredicateResult
.no
121 if logevent_is_boringtwisted(event
) else
122 twisted
.logger
.PredicateResult
.yes
125 #---------- default config ----------
129 max_batch_down = 65536
131 target_requests_outstanding = 3
133 http_timeout_grace = 5
134 max_requests_outstanding = 6
140 #[server] or [<client>] overrides
141 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
143 # relating to virtual network
148 # addrs = 127.0.0.1 ::1
151 # relating to virtual network
152 vvnetwork = 172.24.230.192
153 # vnetwork = <prefix>/<len>
158 # [<client-ip4-or-ipv6-address>]
159 # password = <password> # used by both, must match
162 max_batch_down = 262144
165 target_requests_outstanding = 10
168 # these need to be defined here so that they can be imported by import *
169 cfg
= ConfigParser(strict
=False)
170 optparser
= OptionParser()
# Byte translation table that exchanges `-' and the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return the bytes *s* with every `-' and SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been exchanged the result cannot contain `--'.  The mapping is
    its own inverse: applying it twice gives back the original bytes.
    """
    swapped = s.translate(_mimetrans)
    return swapped
182 return 'ConfigResults('+repr(self
.__dict__
)+')'
184 def log_discard(packet
, iface
, saddr
, daddr
, why
):
186 'discarded packet [%s] %s -> %s: %s' %
(iface
, saddr
, daddr
, why
),
189 #---------- packet parsing ----------
191 def packet_addrs(packet
):
192 version
= packet
[0] >> 4
196 factory
= ipaddress
.IPv4Address
200 factory
= ipaddress
.IPv6Address
202 raise ValueError('unsupported IP version %d' % version
)
203 saddr
= factory(packet
[ saddroff
: saddroff
+ addrlen
])
204 daddr
= factory(packet
[ saddroff
+ addrlen
: saddroff
+ addrlen
*2 ])
205 return (saddr
, daddr
)
207 #---------- address handling ----------
211 r
= ipaddress
.IPv4Address(input)
212 except AddressValueError
:
213 r
= ipaddress
.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises a ValueError subclass (from the ipaddress module) if *input*
    is not a valid network of either family.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4.  (The previous handler named
        # `NetworkValueError', which does not exist, so this fallback
        # died with NameError instead of trying IPv6.)
        return ipaddress.IPv6Network(input)
223 #---------- ipif (SLIP) subprocess ----------
225 class SlipStreamDecoder():
226 def __init__(self
, desc
, on_packet
):
228 self
._on_packet
= on_packet
230 self
._log('__init__')
def _log(self, msg, **kwargs):
    """Log *msg* under DBG.SLIP_FULL, prefixed with this decoder's description.

    Extra keyword arguments are handed through to log_debug() unchanged.
    """
    text = 'slip %s: %s' % (self._desc, msg)
    log_debug(DBG.SLIP_FULL, text, **kwargs)
235 def inputdata(self
, data
):
236 self
._log('inputdata', d
=data
)
237 data
= self
._buffer
+ data
239 packets
= slip
.decode(data
, True)
240 self
._buffer
= packets
.pop()
241 for packet
in packets
:
242 self
._maybe_packet(packet
)
243 self
._log('bufremain', d
=self
._buffer
)
245 def _maybe_packet(self
, packet
):
246 self
._log('maybepacket', d
=packet
)
248 self
._on_packet(packet
)
254 packets
= slip
.decode(data
)
255 assert(len(packets
) == 1)
256 self
._maybe_packet(packets
[0])
258 class _IpifProcessProtocol(twisted
.internet
.protocol
.ProcessProtocol
):
def __init__(self, router):
    """Remember *router* and set up the SLIP decoder for subprocess output.

    *router* is later called as router(packet, saddr, daddr) for each
    packet accepted by slip_on_packet().
    """
    self._router = router
    # Decoded packets are delivered back to this object.
    packet_sink = self.slip_on_packet
    self._decoder = SlipStreamDecoder('ipif', packet_sink)
def connectionMade(self):
    """Do nothing: no action is needed when the subprocess connection is made."""
    return None
def outReceived(self, data):
    """Pass *data* received from the subprocess to the SLIP stream decoder."""
    decoder = self._decoder
    decoder.inputdata(data)
265 def slip_on_packet(self
, packet
):
266 (saddr
, daddr
) = packet_addrs(packet
)
267 if saddr
.is_link_local
or daddr
.is_link_local
:
268 log_discard(packet
, 'ipif', saddr
, daddr
, 'link-local')
270 self
._router(packet
, saddr
, daddr
)
def processEnded(self, status):
    """Re-raise the termination reason when the subprocess ends.

    *status* is presumably the Twisted Failure describing how the
    process ended; its exception is raised here rather than ignored.
    """
    reason = status
    reason.raiseException()
274 def start_ipif(command
, router
):
275 ipif
= _IpifProcessProtocol(router
)
276 reactor
.spawnProcess(ipif
,
277 '/bin/sh',['sh','-xc', command
],
278 childFDs
={0:'w', 1:'r', 2:2},
def queue_inbound(ipif, packet):
    """Write *packet* to the ipif transport as a delimited SLIP frame.

    The frame is sent as three separate writes: a delimiter, the
    SLIP-encoded packet, and a closing delimiter.  *ipif* must expose a
    ``transport`` with a ``write`` method (presumably the process
    protocol created by start_ipif -- confirm against the caller).
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    transport = ipif.transport
    transport.write(slip.delimiter)
    transport.write(slip.encode(packet))
    transport.write(slip.delimiter)
288 #---------- packet queue ----------
291 def __init__(self
, desc
, max_queue_time
):
294 self
._max_queue_time
= max_queue_time
295 self
._pq
= collections
.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Log *msg* under debug flag *dflag*, prefixed with "<desc> pq: ".

    Extra keyword arguments are handed through to log_debug() unchanged.
    """
    text = self._desc + ' pq: ' + msg
    log_debug(dflag, text, **kwargs)
def append(self, packet):
    """Add *packet* to the back of the queue, stamped with the current time.

    The monotonic timestamp stored with the packet is what the queue
    later compares against its maximum queue time.
    """
    self._log(DBG.QUEUE, 'append', d=packet)
    enqueued_at = time.monotonic()
    self._pq.append((enqueued_at, packet))
305 self
._log(DBG
.QUEUE
, 'nonempty ?')
307 try: (queuetime
, packet
) = self
._pq
[0]
309 self
._log(DBG
.QUEUE
, 'nonempty ? empty.')
312 age
= time
.monotonic() - queuetime
313 if age
> self
._max_queue_time
:
314 # strip old packets off the front
315 self
._log(DBG
.QUEUE
, 'dropping (old)', d
=packet
)
319 self
._log(DBG
.QUEUE
, 'nonempty ? nonempty.')
322 def process(self
, sizequery
, moredata
, max_batch
):
323 # sizequery() should return size of batch so far
324 # moredata(s) should add s to batch
325 self
._log(DBG
.QUEUE
, 'process...')
327 try: (dummy
, packet
) = self
._pq
[0]
329 self
._log(DBG
.QUEUE
, 'process... empty')
332 self
._log(DBG
.QUEUE_CTRL
, 'process... packet', d
=packet
)
334 encoded
= slip
.encode(packet
)
337 self
._log(DBG
.QUEUE_CTRL
,
338 'process... (sofar=%d, max=%d) encoded' %
(sofar
, max_batch
),
342 if sofar
+ len(slip
.delimiter
) + len(encoded
) > max_batch
:
343 self
._log(DBG
.QUEUE_CTRL
, 'process... overflow')
345 moredata(slip
.delimiter
)
350 #---------- error handling ----------
357 print('========== CRASH ==========', err
,
358 '===========================', file=sys
.stderr
)
360 except twisted
.internet
.error
.ReactorNotRunning
: pass
def crash_on_defer(defer):
    """Arrange for any failure of the Deferred *defer* to call crash()."""

    def _on_failure(err):
        # Look up crash() at call time and pass only the failure.
        return crash(err)

    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer: call crash() for events at critical level or above.

    The event is rendered with twisted.logger.formatEvent() and the
    resulting text is passed to crash().  Other events are ignored.
    """
    level = event.get('log_level')
    if not (level >= LogLevel.critical):
        return
    rendered = twisted.logger.formatEvent(event)
    crash(rendered)
369 #---------- config processing ----------
371 def _cfg_process_putatives():
374 # maps from abstract object to canonical name for cs's
376 def putative(cmap
, abstract
, canoncs
):
378 current_canoncs
= cmap
[abstract
]
382 assert(current_canoncs
== canoncs
)
383 cmap
[abstract
] = canoncs
385 server_pat
= r
'[-.0-9A-Za-z]+'
386 client_pat
= r
'[.:0-9a-f]+'
387 server_re
= regexp
.compile(server_pat
)
388 serverclient_re
= regexp
.compile(server_pat
+ r
' ' + client_pat
)
390 for cs
in cfg
.sections():
396 # plan B "[<client>]" part 1
398 except AddressValueError
:
400 if server_re
.fullmatch(cs
):
401 # plan C "[<servername>]"
402 putative(servers
, cs
, cs
)
405 if serverclient_re
.fullmatch(cs
):
406 # plan D "[<servername> <client>]" part 1
407 (pss
,pcs
) = cs
.split(' ')
410 # plan E "[<servername> LIMIT]"
414 # plan D "[<servername> <client>]" part 2
416 except AddressValueError
:
417 # plan F "[<some thing we do not understand>]"
418 # well, we ignore this
419 print('warning: ignoring config section %s' % cs
, file=sys
.stderr
)
422 else: # no AddressValueError
423 # plan D "[<servername> <client>]" part 3
424 putative(clients
, ci
, pcs
)
425 putative(servers
, pss
, pss
)
428 else: # no AddressValueError
429 # plan B "[<client>]" part 2
430 putative(clients
, ci
, cs
)
433 return (servers
, clients
)
def cfg_process_common(c, ss):
    """Read the integer option 'mtu' from config section *ss* into c.mtu."""
    mtu = cfg.getint(ss, 'mtu')
    c.mtu = mtu
438 def cfg_process_saddrs(c
, ss
):
440 def __init__(self
, port
, addrspec
):
444 self
.addr
= ipaddress
.IPv4Address(addrspec
)
445 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP4ServerEndpoint
447 except AddressValueError
:
448 self
.addr
= ipaddress
.IPv6Address(addrspec
)
449 self
._endpointfactory
= twisted
.internet
.endpoints
.TCP6ServerEndpoint
450 self
._inurl
= b
'[%s]'
def make_endpoint(self):
    """Build the server endpoint for this address and port.

    Calls the endpoint factory chosen in __init__ with the reactor, the
    port, and the address formatted as a string for ``interface``.
    """
    listen_on = '%s' % self.addr
    factory = self._endpointfactory
    return factory(reactor, self.port, interface=listen_on)
455 url
= b
'http://' + (self
._inurl % str
(self
.addr
).encode('ascii'))
456 if self
.port
!= 80: url
+= b
':%d' % self
.port
460 return 'ServerAddr'+repr((self
.port
,self
.addr
))
462 c
.port
= cfg
.getint(ss
,'port')
464 for addrspec
in cfg
.get(ss
, 'addrs').split():
465 sa
= ServerAddr(c
.port
, addrspec
)
def cfg_process_vnetwork(c, ss):
    """Parse option 'vnetwork' of section *ss* into c.vnetwork and size-check it.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    Network sizes are powers of two, so in practice this rejects
    networks of 1, 2 or 4 addresses, which matches the message.
    """
    spec = cfg.get(ss, 'vnetwork')
    network = ipnetwork(spec)
    # Stored before the check, so c.vnetwork is set even when we raise.
    c.vnetwork = network
    too_small = network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('vnetwork needs at least 2^3 addresses')
473 def cfg_process_vaddr(c
, ss
):
475 c
.vaddr
= cfg
.get(ss
,'vaddr')
476 except NoOptionError
:
477 cfg_process_vnetwork(c
, ss
)
478 c
.vaddr
= next(c
.vnetwork
.hosts())
480 def cfg_search_section(key
,sections
):
481 for section
in sections
:
482 if cfg
.has_option(section
, key
):
484 raise NoOptionError(key
, repr(sections
))
def cfg_search(getter, key, sections):
    """Fetch *key* via *getter* from the section chosen by cfg_search_section().

    *getter* is called as getter(section, key), e.g. cfg.getint.
    Raises NoOptionError (from cfg_search_section) when no listed
    section has the option.
    """
    found_in = cfg_search_section(key, sections)
    value = getter(found_in, key)
    return value
def cfg_process_client_limited(cc, ss, sections, key):
    """Set attribute *key* on *cc* to the configured value capped by its limit.

    The value is the integer option *key* found by searching
    *sections*; the limit is the same option searched for in
    "<ss> LIMIT" and then "LIMIT".  The smaller of the two is stored
    directly in cc.__dict__.
    """
    limit_sections = ['%s LIMIT' % ss, 'LIMIT']
    requested = cfg_search(cfg.getint, key, sections)
    ceiling = cfg_search(cfg.getint, key, limit_sections)
    cc.__dict__[key] = min(requested, ceiling)
495 def cfg_process_client_common(cc
,ss
,cs
,ci
):
496 # returns sections to search in, iff password is defined, otherwise None
499 sections
= ['%s %s' %
(ss
,cs
),
504 try: pwsection
= cfg_search_section('password', sections
)
505 except NoOptionError
: return None
507 pw
= cfg
.get(pwsection
, 'password')
508 cc
.password
= pw
.encode('utf-8')
510 cfg_process_client_limited(cc
,ss
,sections
,'target_requests_outstanding')
511 cfg_process_client_limited(cc
,ss
,sections
,'http_timeout')
515 def cfg_process_ipif(c
, sections
, varmap
):
517 try: v
= getattr(c
, s
)
518 except AttributeError: continue
521 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
523 section
= cfg_search_section('ipif', sections
)
524 c
.ipif_command
= cfg
.get(section
,'ipif', vars=c
.__dict__
)
526 #---------- startup ----------
528 def common_startup(process_cfg
):
529 # calls process_cfg(opts, putative_servers, putative_clients)
531 # ConfigParser hates #-comments after values
532 trailingcomments_re
= regexp
.compile(r
'#.*')
533 cfg
.read_string(trailingcomments_re
.sub('', defcfg
))
536 def readconfig(pathname
, mandatory
=True):
def log(m, p=pathname):
    """Print a DBG.CONFIG trace line "<m>: <p>" if that debug flag is enabled.

    Helper nested in readconfig().  *p* defaults to the path readconfig
    was called with; callers handling a directory entry pass the
    entry's sub-path instead.
    """
    if DBG.CONFIG not in debug_set:
        return
    # Report the path actually passed in.  This used to print the outer
    # `pathname' unconditionally, so per-entry messages named the
    # directory rather than the entry.
    print('DBG.CONFIG: %s: %s' % (m, p))
542 files
= os
.listdir(pathname
)
544 except FileNotFoundError
:
549 except NotADirectoryError
:
556 re
= regexp
.compile('[^-A-Za-z0-9_]')
557 for f
in os
.listdir(pathname
):
558 if re
.search(f
): continue
559 subpath
= pathname
+ '/' + f
562 except FileNotFoundError
:
563 log('entry skipped', subpath
)
566 log('entry read', subpath
)
568 def oc_config(od
,os
, value
, op
):
573 def oc_extra_config(od
,os
, value
, op
):
def read_defconfig(_optvalue=None):
    """Read the default configuration locations, all of them optional.

    Reads, in order, /etc/hippotat/config.d, /etc/hippotat/passwords.d
    and /etc/hippotat/master.cfg, each with mandatory=False.

    *_optvalue* is accepted and ignored.  oc_defconfig() calls this
    function with the option value, which raised TypeError while the
    function took no parameters.
    """
    default_paths = (
        '/etc/hippotat/config.d',
        '/etc/hippotat/passwords.d',
        '/etc/hippotat/master.cfg',
    )
    for path in default_paths:
        readconfig(path, False)
581 def oc_defconfig(od
,os
, value
, op
):
584 read_defconfig(value
)
def dfs_less_detailed(dl):
    """Return the DBG flags that compare ``<=`` to *dl*, including *dl*.

    The flags are listed in the order DBG.iterconstants() yields them.
    """
    selected = []
    for flag in DBG.iterconstants():
        if flag <= dl:
            selected.append(flag)
    return selected
589 def ds_default(od
,os
,dl
,op
):
592 debug_set |
= set(dfs_less_detailed(debug_def_detail
))
594 def ds_select(od
,os
, spec
, op
):
595 for it
in spec
.split(','):
597 if it
.startswith('-'):
598 mutator
= debug_set
.discard
601 mutator
= debug_set
.add
604 dfs
= DBG
.iterconstants()
608 mapper
= dfs_less_detailed
611 mapper
= lambda x
: [x
]
614 dfspec
= DBG
.lookupByName(it
)
616 optparser
.error('unknown debug flag %s in --debug-select' % it
)
623 optparser
.add_option('-D', '--debug',
626 help='enable default debug (to stdout)',
627 callback
= ds_default
)
629 optparser
.add_option('--debug-select',
632 metavar
='[-]DFLAG[+]|[-]+,...',
634 '''enable (`-': disable) each specified DFLAG;
635 `+': do same for all "more interesting" DFLAGSs;
636 just `+': all DFLAGs.
637 DFLAGS: ''' + ' '.join([df
.name
for df
in DBG
.iterconstants()]),
641 optparser
.add_option('-c', '--config',
644 metavar
='CONFIGFILE',
649 optparser
.add_option('--extra-config',
652 metavar
='CONFIGFILE',
655 callback
= oc_extra_config
)
657 optparser
.add_option('--default-config',
659 callback
= oc_defconfig
)
661 (opts
, args
) = optparser
.parse_args()
662 if len(args
): optparser
.error('no non-option arguments please')
668 (pss
, pcs
) = _cfg_process_putatives()
669 process_cfg(opts
, pss
, pcs
)
670 except (configparser
.Error
, ValueError):
671 traceback
.print_exc(file=sys
.stderr
)
672 print('\nInvalid configuration, giving up.', file=sys
.stderr
)
676 #print('X', debug_set, file=sys.stderr)
678 log_formatter
= twisted
.logger
.formatEventAsClassicLogText
679 stdout_obs
= twisted
.logger
.FileLogObserver(sys
.stdout
, log_formatter
)
680 stderr_obs
= twisted
.logger
.FileLogObserver(sys
.stderr
, log_formatter
)
681 pred
= twisted
.logger
.LogLevelFilterPredicate(LogLevel
.error
)
682 stdsomething_obs
= twisted
.logger
.FilteringLogObserver(
683 stderr_obs
, [pred
], stdout_obs
685 global file_log_observer
686 file_log_observer
= twisted
.logger
.FilteringLogObserver(
687 stdsomething_obs
, [LogNotBoringTwisted()]
689 #log_observer = stdsomething_obs
690 twisted
.logger
.globalLogBeginner
.beginLoggingTo(
691 [ file_log_observer
, crash_on_critical
]
695 log_debug(DBG
.INIT
, 'entering reactor')
696 if not _crashing
: reactor
.run()
697 print('ENDED', file=sys
.stderr
)