ownsource: tie in (part 1) and cleanup stuff
[hippotat] / hippotatlib / __init__.py
CommitLineData
b0cfbfce
IJ
1# -*- python -*-
2
37ab4cdc
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
1321ad5f 6import sys
cae50358 7import os
1321ad5f 8
b83d422a
IJ
9from zope.interface import implementer
10
040ff511
IJ
11import twisted
12from twisted.internet import reactor
1d023c89 13import twisted.internet.endpoints
8c3b6620
IJ
14import twisted.logger
15from twisted.logger import LogLevel
16import twisted.python.constants
17from twisted.python.constants import NamedConstant
b0cfbfce
IJ
18
19import ipaddress
20from ipaddress import AddressValueError
21
ae7c7784 22from optparse import OptionParser
5510890e 23import configparser
ae7c7784
IJ
24from configparser import ConfigParser
25from configparser import NoOptionError
26
c13ee6e6
IJ
27from functools import partial
28
ae7c7784 29import collections
84e763c7 30import time
8c3b6620 31import codecs
eedc8b30 32import traceback
ae7c7784 33
1321ad5f
IJ
34import re as regexp
35
5a37bac8 36import hippotatlib.slip as slip
1321ad5f 37
class DBG(twisted.python.constants.Names):
    # Debug flags accepted by log_debug and by the --debug-select option.
    #
    # Declaration order is significant: dfs_less_detailed() in
    # common_startup selects flags with `df <= dl'.
    # NOTE(review): this presumably relies on twisted.python.constants
    # ordering constants by definition order, so that earlier flags count
    # as less detailed -- confirm against the Twisted documentation.
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    OWNSOURCE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
d579a048 54
# Encoder used by log_debug to render packet data as hex text.
_hex_codec = codecs.getencoder('hex_codec')

#---------- logging ----------

# sys.stderr as it was at import time; LogNotBoringTwisted prints its own
# tracebacks here.
org_stderr = sys.stderr

log = twisted.logger.Logger()

# DBG flags currently enabled; consulted by log_debug and
# LogNotBoringTwisted, modified by the debug options in common_startup.
debug_set = set()
# Detail level that the plain -D/--debug option enables (see ds_default
# in common_startup).
debug_def_detail = DBG.HTTP
3e35fc99 65
def log_debug(dflag, msg, idof=None, d=None):
    """Log msg at info level if debug flag dflag is enabled.

    dflag: DBG constant; nothing is logged unless it is in debug_set.
    msg:   message text.
    idof:  optional object whose id() is prefixed to the message in hex.
    d:     optional bytes-like data, appended to the message as hex.
           Unless DBG.DATA_COMPLETE is enabled, only the first 64 bytes
           are shown, followed by '...'.
    """
    if dflag not in debug_set:
        return

    if idof is not None:
        msg = '[%#x] %s' % (id(idof), msg)

    if d is not None:
        # len(d) is only consulted when truncation is permitted at all
        truncated = DBG.DATA_COMPLETE not in debug_set and len(d) > 64
        shown = d[0:64] if truncated else d
        hextext = _hex_codec(shown)[0].decode('ascii')
        msg += ' ' + hextext + ('...' if truncated else '')

    log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80
b83d422a
IJ
@implementer(twisted.logger.ILogFilterPredicate)
class LogNotBoringTwisted:
    """Log filter predicate which suppresses uninteresting info events.

    Events which are not at info level always pass.  An info event passes
    if its 'dflag' is False, if its 'dflag' is in debug_set, or if it has
    no 'dflag' and DBG.TWISTED is enabled.  If the decision itself fails,
    the traceback is printed to org_stderr and the event passes.
    """

    def __call__(self, event):
        verdict = twisted.logger.PredicateResult
        try:
            if event.get('log_level') != LogLevel.info:
                return verdict.yes

            dflag = event.get('dflag')
            if dflag is False:
                return verdict.yes
            if dflag in debug_set:
                return verdict.yes
            if dflag is None and DBG.TWISTED in debug_set:
                return verdict.yes
            return verdict.no

        except Exception:
            # never lose an event because of trouble in the filter
            print(traceback.format_exc(), file=org_stderr)
            return verdict.yes
97
98#---------- default config ----------
99
ca732796
IJ
# Built-in default configuration.  common_startup reads this into cfg
# (after stripping `#' comments) before it reads any configuration files.
# NOTE(review): the [SERVER] section sets `vvnetwork', but the visible
# code (cfg_process_vnetwork) reads `vnetwork'.  Confirm whether this is a
# deliberate placeholder or a typo.  Likewise `# vadd r' in the commentary
# presumably means `vaddr'.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
target_requests_outstanding = 3
http_timeout = 30
http_timeout_grace = 5
max_requests_outstanding = 6
max_batch_up = 4000
http_retry = 5
port = 80
vroutes = ''

#[server] or [<client>] overrides
ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s

# relating to virtual network
mtu = 1500

[SERVER]
server = SERVER
# addrs = 127.0.0.1 ::1
# url

# relating to virtual network
vvnetwork = 172.24.230.192
# vnetwork = <prefix>/<len>
# vadd r = <ipaddr>
# vrelay = <ipaddr>


# [<client-ip4-or-ipv6-address>]
# password = <password> # used by both, must match

[LIMIT]
max_batch_down = 262144
max_queue_time = 121
http_timeout = 121
target_requests_outstanding = 10
'''

# these need to be defined here so that they can be imported by import *
# strict=False: ConfigParser then tolerates duplicate sections and options
# within a single source.
cfg = ConfigParser(strict=False)
optparser = OptionParser()
144
# Translation table which exchanges `-' and the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    """Return s with every `-' and SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so the result of
    translating one cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
150
class ConfigResults:
    """Plain attribute container for the results of config processing."""

    def __init__(self):
        """Create an empty container; attributes are assigned later."""

    def __repr__(self):
        return 'ConfigResults(%r)' % (self.__dict__,)
156
def log_discard(packet, iface, saddr, daddr, why):
    """Record, under DBG.DROP, that packet has been discarded.

    iface, saddr, daddr and why are only formatted into the message;
    the packet itself is passed to log_debug as data.
    """
    description = 'discarded packet [%s] %s -> %s: %s' % (
        iface, saddr, daddr, why)
    log_debug(DBG.DROP, description, d=packet)
1321ad5f 161
b0cfbfce
IJ
162#---------- packet parsing ----------
163
def packet_addrs(packet):
    """Return (source, destination) addresses from an IP packet header.

    The IP version is taken from the top four bits of the first byte.
    Returns ipaddress.IPv4Address or ipaddress.IPv6Address objects.
    Raises ValueError if the version is neither 4 nor 6.
    """
    # version -> (address length, offset of source address, constructor)
    layouts = {
        4: (4, 3*4, ipaddress.IPv4Address),
        6: (16, 2*4, ipaddress.IPv6Address),
    }
    version = packet[0] >> 4
    if version not in layouts:
        raise ValueError('unsupported IP version %d' % version)
    addrlen, saddroff, factory = layouts[version]

    # the destination address immediately follows the source address
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
179
180#---------- address handling ----------
181
def ipaddr(input):
    """Parse input as an IPv4 address, or failing that as an IPv6 address.

    Raises AddressValueError if it is neither.
    """
    try:
        return ipaddress.IPv4Address(input)
    except AddressValueError:
        return ipaddress.IPv6Address(input)
188
def ipnetwork(input):
    """Parse input as an IPv4 network, or failing that as an IPv6 network.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Raises a ValueError subclass (AddressValueError or
    ipaddress.NetmaskValueError) if input is neither.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # The ipaddress module reports an unparseable address part as
        # AddressValueError; that is the case for every IPv6 network, so
        # it is the signal to try IPv6.  A valid IPv4 address with a bad
        # prefix length raises NetmaskValueError, which is left to
        # propagate as the more useful diagnostic.
        return ipaddress.IPv6Network(input)
040ff511
IJ
195
196#---------- ipif (SLIP) subprocess ----------
197
class SlipStreamDecoder():
    """Incremental decoder turning a SLIP byte stream into packets.

    desc is used only in debug messages.  on_packet is called with each
    nonempty decoded packet.
    """

    def __init__(self, desc, on_packet):
        self._buffer = b''
        self._on_packet = on_packet
        self._desc = desc
        self._log('__init__')

    def _log(self, msg, **kwargs):
        text = 'slip %s: %s' % (self._desc, msg)
        log_debug(DBG.SLIP_FULL, text, **kwargs)

    def inputdata(self, data):
        """Feed more stream data; deliver every packet completed by it."""
        self._log('inputdata', d=data)
        pending = self._buffer + data
        # forget the old buffer before decoding, so that it is not kept
        # if decoding fails
        self._buffer = b''
        pieces = slip.decode(pending, True)
        # the final piece is the as-yet-incomplete tail of the stream
        self._buffer = pieces.pop()
        for piece in pieces:
            self._maybe_packet(piece)
        self._log('bufremain', d=self._buffer)

    def _maybe_packet(self, packet):
        """Pass packet to the callback, unless it is empty."""
        self._log('maybepacket', d=packet)
        if not len(packet):
            return
        self._on_packet(packet)

    def flush(self):
        """Treat whatever is buffered as one final packet."""
        self._log('flush')
        leftover, self._buffer = self._buffer, b''
        pieces = slip.decode(leftover)
        assert(len(pieces) == 1)
        self._maybe_packet(pieces[0])
4f991c0c 230
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    SLIP data from the subprocess's stdout is decoded; each packet is
    passed to router(packet, saddr, daddr), except that packets with a
    link-local source or destination are discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        saddr, daddr = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
        else:
            self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # the subprocess is never expected to end: make that an error
        status.raiseException()
246
def start_ipif(command, router):
    """Spawn the ipif shell command and return its process protocol.

    command is run via `/bin/sh -xc'.  The child's stdin and stdout are
    connected to the returned protocol object; its stderr is our fd 2.
    router is called for each packet the child emits (see
    _IpifProcessProtocol).
    """
    protocol = _IpifProcessProtocol(router)
    argv = ['sh','-xc', command]
    fdmap = {0:'w', 1:'r', 2:2}
    reactor.spawnProcess(protocol, '/bin/sh', argv,
                         childFDs=fdmap,
                         env=None)
    return protocol
040ff511 254
def queue_inbound(ipif, packet):
    """Write packet, SLIP-encoded, to the ipif subprocess.

    The encoded packet is written with a SLIP delimiter before and after.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
260
650a3251
IJ
261#---------- packet queue ----------
262
class PacketQueue():
    """FIFO of packets which discards those that have waited too long.

    desc is a nonempty string used in debug messages.  max_queue_time is
    the maximum age, in time.monotonic() units, a packet may reach while
    still being offered.
    """

    def __init__(self, desc, max_queue_time):
        self._desc = desc
        assert(desc + '')
        self._max_queue_time = max_queue_time
        self._pq = collections.deque()  # of (time queued, packet)

    def _log(self, dflag, msg, **kwargs):
        log_debug(dflag, self._desc+' pq: '+msg, **kwargs)

    def append(self, packet):
        """Add packet to the back of the queue, stamped with the time."""
        self._log(DBG.QUEUE, 'append', d=packet)
        entry = (time.monotonic(), packet)
        self._pq.append(entry)

    def nonempty(self):
        """Drop expired packets from the front; return whether any remain."""
        self._log(DBG.QUEUE, 'nonempty ?')
        while self._pq:
            queued_at, packet = self._pq[0]
            waited = time.monotonic() - queued_at
            if waited > self._max_queue_time:
                # strip old packets off the front
                self._log(DBG.QUEUE, 'dropping (old)', d=packet)
                self._pq.popleft()
                continue
            self._log(DBG.QUEUE, 'nonempty ? nonempty.')
            return True

        self._log(DBG.QUEUE, 'nonempty ? empty.')
        return False

    def process(self, sizequery, moredata, max_batch):
        """Move packets from the front of the queue into a batch.

        sizequery() should return the size of the batch so far.
        moredata(s) should add s to the batch.
        Packets are SLIP-encoded and separated by SLIP delimiters.  A
        packet is held back if adding it to a nonempty batch would take
        the batch over max_batch; the first packet of a batch is always
        added.
        """
        self._log(DBG.QUEUE, 'process...')
        while True:
            if not self._pq:
                self._log(DBG.QUEUE, 'process... empty')
                return
            packet = self._pq[0][1]

            self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)

            encoded = slip.encode(packet)
            sofar = sizequery()

            progress = 'process... (sofar=%d, max=%d) encoded' % (
                sofar, max_batch)
            self._log(DBG.QUEUE_CTRL, progress, d=encoded)

            if sofar > 0:
                wanted = sofar + len(slip.delimiter) + len(encoded)
                if wanted > max_batch:
                    self._log(DBG.QUEUE_CTRL, 'process... overflow')
                    return
                moredata(slip.delimiter)

            moredata(encoded)
            # only now is the packet committed to the batch
            self._pq.popleft()
ae7c7784
IJ
322
323#---------- error handling ----------
324
b68c0739
IJ
# Set by crash(); common_run declines to start the reactor once it is set.
_crashing = False

def crash(err):
    """Report err on stderr, note that we are crashing, stop the reactor.

    It is not an error for the reactor not to be running.
    """
    global _crashing
    _crashing = True
    head = '========== CRASH =========='
    tail = '==========================='
    print(head, err, tail, file=sys.stderr)
    try:
        reactor.stop()
    except twisted.internet.error.ReactorNotRunning:
        pass
334
def crash_on_defer(defer):
    """Arrange that a failure of defer is passed to crash()."""
    def on_failure(err):
        return crash(err)
    defer.addErrback(on_failure)
337
def crash_on_critical(event):
    """Log observer which calls crash() for events at critical level."""
    level = event.get('log_level')
    if level >= LogLevel.critical:
        report = twisted.logger.formatEvent(event)
        crash(report)
341
87a7c0c7
IJ
342#---------- config processing ----------
343
c7fb640e
IJ
def _cfg_process_putatives():
    """Classify the config section names into putative servers and clients.

    Returns (servers, clients).  Each is a dict mapping an abstract
    object to the canonical name used for it in section names: servers
    maps server name to server name; clients maps a parsed IP address
    object to the address text as written in the config.

    Section name forms recognised:
      plan A  [LIMIT]                 ignored here
      plan B  [<client>]              a client
      plan C  [<servername>]          a server
      plan D  [<servername> <client>] a server and a client
      plan E  [<servername> LIMIT]    ignored here
      plan F  anything which looks like plan D but whose client part is
              not an address: ignored, with a warning on stderr
    Section names matching none of the patterns are ignored silently.
    """
    servers = { }
    clients = { }
    # maps from abstract object to canonical name for cs's

    def putative(cmap, abstract, canoncs):
        # the same abstract object must always be spelled the same way
        try:
            current_canoncs = cmap[abstract]
        except KeyError:
            pass
        else:
            assert(current_canoncs == canoncs)
        cmap[abstract] = canoncs

    server_pat = r'[-.0-9A-Za-z]+'
    client_pat = r'[.:0-9a-f]+'
    server_re = regexp.compile(server_pat)
    serverclient_re = regexp.compile(server_pat + r' ' + client_pat)

    for cs in cfg.sections():
        if cs == 'LIMIT':
            # plan A "[LIMIT]"
            continue

        try:
            # plan B "[<client>]" part 1
            ci = ipaddr(cs)
        except AddressValueError:
            pass
        else:
            # plan B "[<client>]" part 2
            putative(clients, ci, cs)
            continue

        if server_re.fullmatch(cs):
            # plan C "[<servername>]"
            putative(servers, cs, cs)
            continue

        if not serverclient_re.fullmatch(cs):
            # not a form we know about
            continue

        # plan D "[<servername> <client>]" part 1
        (pss, pcs) = cs.split(' ')

        if pcs == 'LIMIT':
            # plan E "[<servername> LIMIT]"
            # NOTE(review): client_pat admits only [.:0-9a-f], so as the
            # patterns stand a "<servername> LIMIT" section is already
            # skipped above and cannot reach this test.
            continue

        try:
            # plan D "[<servername> <client>]" part 2
            # (parse the client part, pcs, of the section name)
            ci = ipaddr(pcs)
        except AddressValueError:
            # plan F "[<some thing we do not understand>]"
            # well, we ignore this
            print('warning: ignoring config section %s' % cs, file=sys.stderr)
            continue

        # plan D "[<servername> <client>]" part 3
        putative(clients, ci, pcs)
        putative(servers, pss, pss)

    return (servers, clients)
407
def cfg_process_common(c, ss):
    """Set c.mtu from the integer `mtu' option of config section ss."""
    mtu = cfg.getint(ss, 'mtu')
    c.mtu = mtu
410
def cfg_process_saddrs(c, ss):
    """Set c.port and c.saddrs from options `port' and `addrs' of ss.

    c.saddrs becomes a list with one ServerAddr for each
    whitespace-separated entry in `addrs'.  Each entry must be an IPv4
    or IPv6 address; otherwise AddressValueError is raised.
    """
    class ServerAddr():
        """One address to listen on: .port, and .addr (an address object)."""

        def __init__(self, port, addrspec):
            self.port = port
            # also self.addr
            try:
                self.addr = ipaddress.IPv4Address(addrspec)
                self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
                self._inurl = b'%s'
            except AddressValueError:
                self.addr = ipaddress.IPv6Address(addrspec)
                self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
                # IPv6 address literals go in brackets in URLs
                self._inurl = b'[%s]'

        def make_endpoint(self):
            """Return a Twisted server endpoint bound to this address."""
            # The endpoint constructors are (reactor, port, backlog,
            # interface); the address must be passed as `interface', by
            # keyword, or it would be taken for the listen backlog.
            return self._endpointfactory(reactor, self.port,
                                         interface=str(self.addr))

        def url(self):
            """Return the http URL (as bytes) for this address and port."""
            url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
            if self.port != 80: url += b':%d' % self.port
            url += b'/'
            return url

    c.port = cfg.getint(ss,'port')
    c.saddrs = [ ]
    for addrspec in cfg.get(ss, 'addrs').split():
        sa = ServerAddr(c.port, addrspec)
        c.saddrs.append(sa)
437
def cfg_process_vnetwork(c, ss):
    """Set c.vnetwork from the `vnetwork' option of config section ss.

    Raises ValueError if the network has fewer than 5 addresses.
    """
    network = ipnetwork(cfg.get(ss,'vnetwork'))
    c.vnetwork = network
    minimum = 3 + 2
    if network.num_addresses < minimum:
        raise ValueError('vnetwork needs at least 2^3 addresses')
88487243 442
def cfg_process_vaddr(c, ss):
    """Set c.vaddr for config section ss.

    The `vaddr' option is used (as the string from the config) if there
    is one.  Otherwise c.vnetwork is set up with cfg_process_vnetwork and
    c.vaddr becomes the first host address of that network.
    """
    try:
        configured = cfg.get(ss,'vaddr')
    except NoOptionError:
        cfg_process_vnetwork(c, ss)
        hosts = c.vnetwork.hosts()
        c.vaddr = next(hosts)
    else:
        c.vaddr = configured
88487243 449
c7fb640e
IJ
def cfg_search_section(key,sections):
    """Return the first section in sections which has option key.

    Raises NoOptionError, naming the whole list of sections, if none has.
    """
    for candidate in sections:
        if not cfg.has_option(candidate, key):
            continue
        return candidate
    raise NoOptionError(key, repr(sections))
c7fb640e
IJ
455
def cfg_search(getter,key,sections):
    """Look key up with getter(section, key) in the first section having it.

    Raises NoOptionError if no section in sections has the option.
    """
    return getter(cfg_search_section(key,sections), key)
459
def cfg_process_client_limited(cc,ss,sections,key):
    """Set attribute key of cc to its configured value, capped by a limit.

    The value is the integer option key from the first of sections that
    has it.  The limit is the same option from "[<ss> LIMIT]" or else
    "[LIMIT]".  The smaller of the two is stored.
    """
    limit_sections = ['%s LIMIT' % ss, 'LIMIT']
    val = cfg_search(cfg.getint, key, sections)
    lim = cfg_search(cfg.getint, key, limit_sections)
    vars(cc)[key] = min(val,lim)
464
def cfg_process_client_common(cc,ss,cs,ci):
    """Fill in the settings of cc common to client and server ends.

    ss and cs are the server and client section names; ci is stored as
    cc.ci.  Returns the list of sections to search for this client's
    settings iff a password is defined for it, otherwise None.  In the
    latter case only cc.ci has been set.
    """
    cc.ci = ci

    # most specific first
    sections = [
        '%s %s' % (ss,cs),
        cs,
        ss,
        'DEFAULT',
    ]

    try:
        pwsection = cfg_search_section('password', sections)
    except NoOptionError:
        return None

    cc.password = cfg.get(pwsection, 'password').encode('utf-8')

    for key in ('target_requests_outstanding', 'http_timeout'):
        cfg_process_client_limited(cc,ss,sections,key)

    return sections
484
def cfg_process_ipif(c, sections, varmap):
    """Set c.ipif_command from the `ipif' option, interpolating from c.

    varmap is an iterable of (dest, source) attribute name pairs: each
    source attribute which c has is first copied to dest.  The `ipif'
    option is then taken from the first of sections which has it, with
    the attributes of c available for %(...)s substitution.
    """
    absent = object()
    for dest, source in varmap:
        value = getattr(c, source, absent)
        if value is absent:
            continue
        setattr(c, dest, value)

    section = cfg_search_section('ipif', sections)
    c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
88487243 495
ae7c7784
IJ
496#---------- startup ----------
497
def common_startup(process_cfg):
    """Parse the command line, read the configuration, set up logging.

    The built-in defaults (defcfg) are read first.  Each -c/--config
    option names a file or directory to read; if there are none,
    /etc/hippotat/config and /etc/hippotat/config.d are read if they
    exist.  Then process_cfg(putative_servers, putative_clients) is
    called with the two maps from _cfg_process_putatives.  If that fails
    with a configparser.Error or ValueError, the traceback is printed
    and the program exits with status 12.

    Finally twisted logging is started: events at error level and above
    go to stderr, others to stdout, filtered by LogNotBoringTwisted;
    critical events also crash the program.
    """
    # ConfigParser hates #-comments after values
    trailingcomments_re = regexp.compile(r'#.*')
    cfg.read_string(trailingcomments_re.sub('', defcfg))
    need_defcfg = True

    def readconfig(pathname, mandatory=True):
        # Read pathname into cfg.  It may be a file, or a directory, in
        # which case the entries with plain-looking names are read.

        def dbg(m, p=pathname):
            if DBG.CONFIG not in debug_set: return
            print('DBG.CONFIG: %s: %s' % (m, p))

        try:
            files = os.listdir(pathname)

        except FileNotFoundError:
            if mandatory: raise
            dbg('skipped')
            return

        except NotADirectoryError:
            cfg.read(pathname)
            dbg('read file')
            return

        # is a directory
        dbg('directory')
        badname_re = regexp.compile('[^-A-Za-z0-9_]')
        for f in files:
            if badname_re.search(f): continue
            subpath = pathname + '/' + f
            try:
                os.stat(subpath)
            except FileNotFoundError:
                # eg a dangling symlink
                dbg('entry skipped', subpath)
                continue
            cfg.read(subpath)
            dbg('entry read', subpath)

    # The option callbacks take optparse's standard callback arguments:
    # (option, opt_str, value, parser).

    def oc_config(option, opt_str, value, parser):
        nonlocal need_defcfg
        need_defcfg = False
        readconfig(value)

    def dfs_less_detailed(dl):
        return [df for df in DBG.iterconstants() if df <= dl]

    def ds_default(option, opt_str, value, parser):
        global debug_set
        debug_set = set(dfs_less_detailed(debug_def_detail))

    def ds_select(option, opt_str, spec, parser):
        for it in spec.split(','):

            if it.startswith('-'):
                mutator = debug_set.discard
                it = it[1:]
            else:
                mutator = debug_set.add

            if it == '+':
                dfs = DBG.iterconstants()

            else:
                if it.endswith('+'):
                    mapper = dfs_less_detailed
                    it = it[0:len(it)-1]
                else:
                    mapper = lambda x: [x]

                try:
                    dfspec = DBG.lookupByName(it)
                except ValueError:
                    # optparser.error does not return
                    optparser.error('unknown debug flag %s in --debug-select' % it)

                dfs = mapper(dfspec)

            for df in dfs:
                mutator(df)

    optparser.add_option('-D', '--debug',
                         nargs=0,
                         action='callback',
                         help='enable default debug (to stdout)',
                         callback= ds_default)

    optparser.add_option('--debug-select',
                         nargs=1,
                         type='string',
                         metavar='[-]DFLAG[+]|[-]+,...',
                         help=
'''enable (`-': disable) each specified DFLAG;
`+': do same for all "more interesting" DFLAGSs;
just `+': all DFLAGs.
  DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
                         action='callback',
                         callback= ds_select)

    optparser.add_option('-c', '--config',
                         nargs=1,
                         type='string',
                         metavar='CONFIGFILE',
                         dest='configfile',
                         action='callback',
                         callback= oc_config)

    (opts, args) = optparser.parse_args()
    if len(args): optparser.error('no non-option arguments please')

    if need_defcfg:
        readconfig('/etc/hippotat/config', False)
        readconfig('/etc/hippotat/config.d', False)

    try:
        (pss, pcs) = _cfg_process_putatives()
        process_cfg(pss, pcs)
    except (configparser.Error, ValueError):
        traceback.print_exc(file=sys.stderr)
        print('\nInvalid configuration, giving up.', file=sys.stderr)
        sys.exit(12)

    log_formatter = twisted.logger.formatEventAsClassicLogText
    stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
    stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
    pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
    # events passing pred go to stderr_obs, the rest to stdout_obs
    stdsomething_obs = twisted.logger.FilteringLogObserver(
        stderr_obs, [pred], stdout_obs
    )
    log_observer = twisted.logger.FilteringLogObserver(
        stdsomething_obs, [LogNotBoringTwisted()]
    )
    twisted.logger.globalLogBeginner.beginLoggingTo(
        [ log_observer, crash_on_critical ]
    )
ae7c7784 636
def common_run():
    """Run the reactor, unless a crash has already been recorded.

    Prints 'CRASHED (end)' to stderr afterwards in either case.
    """
    log_debug(DBG.INIT, 'entering reactor')
    crashed_already = _crashing
    if not crashed_already:
        reactor.run()
    print('CRASHED (end)', file=sys.stderr)