subprocess."""
pass
-_all_log_modes = ['debug', 'profile']
-_log_mode = os.environ.get('STGIT_SUBPROCESS_LOG', '')
-if _log_mode and not _log_mode in _all_log_modes:
- out.warn(('Unknown log mode "%s" specified in $STGIT_SUBPROCESS_LOG.'
- % _log_mode),
- 'Valid values are: %s' % ', '.join(_all_log_modes))
+def get_log_mode(spec):
+ if not ':' in spec:
+ spec += ':'
+ (log_mode, outfile) = spec.split(':', 1)
+ all_log_modes = ['debug', 'profile']
+ if log_mode and not log_mode in all_log_modes:
+ out.warn(('Unknown log mode "%s" specified in $STGIT_SUBPROCESS_LOG.'
+ % log_mode),
+ 'Valid values are: %s' % ', '.join(all_log_modes))
+ if outfile:
+ f = MessagePrinter(open(outfile, 'a'))
+ else:
+ f = out
+ return (log_mode, f)
+
+(_log_mode, _logfile) = get_log_mode(os.environ.get('STGIT_SUBPROCESS_LOG', ''))
class Run:
exc = RunException
if type(c) != str:
raise Exception, 'Bad command: %r' % (cmd,)
self.__good_retvals = [0]
- self.__env = None
+ self.__env = self.__cwd = None
self.__indata = None
self.__discard_stderr = False
def __log_start(self):
if _log_mode == 'debug':
- out.start('Running subprocess %s' % self.__cmd)
+ _logfile.start('Running subprocess %s' % self.__cmd)
+ if self.__cwd != None:
+ _logfile.info('cwd: %s' % self.__cwd)
+ if self.__env != None:
+ for k in sorted(self.__env.iterkeys()):
+ if k not in os.environ or os.environ[k] != self.__env[k]:
+ _logfile.info('%s: %s' % (k, self.__env[k]))
elif _log_mode == 'profile':
- out.start('Running subprocess %s' % self.__cmd[0])
+ _logfile.start('Running subprocess %s' % self.__cmd)
self.__starttime = datetime.datetime.now()
def __log_end(self, retcode):
if _log_mode == 'debug':
- out.done('return code: %d' % retcode)
+ _logfile.done('return code: %d' % retcode)
elif _log_mode == 'profile':
duration = datetime.datetime.now() - self.__starttime
- out.done('%1.3f s' % (duration.microseconds/1e6 + duration.seconds))
+ _logfile.done('%1.3f s' % (duration.microseconds/1e6
+ + duration.seconds))
def __check_exitcode(self):
+ if self.__good_retvals == None:
+ return
if self.exitcode not in self.__good_retvals:
raise self.exc('%s failed with code %d'
% (self.__cmd[0], self.exitcode))
"""Run with captured IO."""
self.__log_start()
try:
- p = subprocess.Popen(self.__cmd, env = self.__env,
+ p = subprocess.Popen(self.__cmd, env = self.__env, cwd = self.__cwd,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
self.__log_end(self.exitcode)
self.__check_exitcode()
return outdata
- def __run_noio(self, exitcode = True):
+ def __run_noio(self):
"""Run without captured IO."""
assert self.__indata == None
self.__log_start()
try:
- p = subprocess.Popen(self.__cmd, env = self.__env)
+ p = subprocess.Popen(self.__cmd, env = self.__env, cwd = self.__cwd)
self.exitcode = p.wait()
except OSError, e:
raise self.exc('%s failed: %s' % (self.__cmd[0], e))
self.__log_end(self.exitcode)
- if exitcode:
- self.__check_exitcode()
+ self.__check_exitcode()
def returns(self, retvals):
self.__good_retvals = retvals
return self
+ def discard_exitcode(self):
+ self.__good_retvals = None
+ return self
def discard_stderr(self, discard = True):
self.__discard_stderr = discard
return self
self.__env = dict(os.environ)
self.__env.update(env)
return self
+ def cwd(self, cwd):
+ self.__cwd = cwd
+ return self
def raw_input(self, indata):
self.__indata = indata
return self
def input_lines(self, lines):
self.__indata = ''.join(['%s\n' % line for line in lines])
return self
+ def input_nulterm(self, lines):
+ self.__indata = ''.join('%s\0' % line for line in lines)
+ return self
def no_output(self):
outdata = self.__run_io()
if outdata:
else:
raise self.exc('%s produced %d lines, expected 1'
% (self.__cmd[0], len(outlines)))
- def run(self, exitcode = True):
+ def run(self):
"""Just run, with no IO redirection."""
- self.__run_noio(exitcode = exitcode)
+ self.__run_noio()
def xargs(self, xargs):
"""Just run, with no IO redirection. The extra arguments are
appended to the command line a few at a time; the command is