#! @PYTHON@ ### ### Convert a directory tree of audio files ### ### (c) 2010 Mark Wooding ### ###----- Licensing notice --------------------------------------------------- ### ### This program is free software; you can redistribute it and/or modify ### it under the terms of the GNU General Public License as published by ### the Free Software Foundation; either version 2 of the License, or ### (at your option) any later version. ### ### This program is distributed in the hope that it will be useful, ### but WITHOUT ANY WARRANTY; without even the implied warranty of ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ### GNU General Public License for more details. ### ### You should have received a copy of the GNU General Public License ### along with this program; if not, write to the Free Software Foundation, ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. ###-------------------------------------------------------------------------- ### External dependencies. ## Language features. from __future__ import with_statement ## Standard Python libraries. import sys as SYS import os as OS import errno as E import time as T import unicodedata as UD import fnmatch as FN import re as RX import shutil as SH import optparse as OP import threading as TH import shlex as L from math import sqrt from contextlib import contextmanager ## eyeD3 tag fettling. import eyeD3 as E3 ## Gstreamer. It picks up command-line arguments -- most notably `--help' -- ## and processes them itself. Of course, its help is completely wrong. This ## kludge is due to Jonas Wagner. _argv, SYS.argv = SYS.argv, [] import gobject as G import gio as GIO import gst as GS SYS.argv = _argv ## Python Imaging. from PIL import Image as I ## Python parsing. import pyparsing as P ###-------------------------------------------------------------------------- ### Special initialization. VERSION = '@VERSION@' ## GLib. 
G.threads_init()

###--------------------------------------------------------------------------
### Eyecandy progress reports.

def charwidth(s):
  """
  Return the width of S, in characters.

  Specifically, this is the number of backspace characters required to
  overprint the string S.  If the current encoding for `stdout' appears to be
  Unicode then do a complicated Unicode thing; otherwise assume that
  characters take up one cell each.

  None of this handles tab characters in any kind of useful way.  Sorry.
  """

  ## If there's no encoding for stdout then we're doing something stupid.
  if SYS.stdout.encoding is None:
    return len(s)

  ## Turn the string into Unicode so we can hack on it properly.  Maybe that
  ## won't work out, in which case fall back to being stupid.
  try:
    u = s.decode(SYS.stdout.encoding)
  except UnicodeError:
    return len(s)

  ## Our main problem is combining characters, but we should also try to
  ## handle wide (mostly Asian) characters, and zero-width ones.  This hack
  ## is taken mostly from http://www.cl.cam.ac.uk/~mgk25/ucs/wcwidth.c
  w = 0
  for ch in u:
    cd = ord(ch)
    if UD.category(ch) in ['Cf', 'Me', 'Mn'] or \
          0x1160 <= cd <= 0x11ff:
      pass
    elif UD.east_asian_width(ch) in ['F', 'W']:
      w += 2
    else:
      w += 1

  ## Done.
  return w

class StatusLine (object):
  """
  Maintains a status line containing ephemeral progress information.

  The status line isn't especially important, but it keeps interactive users
  amused.

  There should be only one status line object in your program; otherwise
  they'll interfere with each other and get confused.

  The update algorithm (in `set') is fairly careful to do the right thing
  with long status `lines', and to work properly in an Emacs `shell' buffer.
  """

  def __init__(me):
    "Initialize the status line."
    me._last = ''
    me._lastlen = 0
    me.eyecandyp = OS.isatty(SYS.stdout.fileno())

  def set(me, line):
    """
    Set the status line contents to LINE, replacing what was there before.

    This only produces actual output if stdout is interactive.
    """
    n = len(line)

    ## Eyecandy update.
    if me.eyecandyp:

      ## If the old line was longer, we need to clobber its tail, so work out
      ## what that involves.
      if n < me._lastlen:
        b = charwidth(me._last[n:])
        pre = '\b'*b + ' '*b
      else:
        pre = ''

      ## Now figure out the length of the common prefix between what we had
      ## before and what we have now.  This reduces the amount of I/O done,
      ## which keeps network traffic down on SSH links, and keeps down the
      ## amount of work slow terminal emulators like Emacs have to do.
      i = 0
      m = min(n, me._lastlen)
      while i < m and line[i] == me._last[i]:
        i += 1

      ## Actually do the output, all in one syscall.
      b = charwidth(me._last[i:])
      SYS.stdout.write(pre + '\b'*b + line[i:])
      SYS.stdout.flush()

    ## Update our idea of what's gone on.
    me._lastlen = n
    me._last = line

  def clear(me):
    "Clear the status line.  Just like set('')."
    me.set('')

  def commit(me, line = None):
    """
    Commit the current status line, and maybe the string LINE.

    If the current status line is nonempty, then commit it to the transcript.
    If LINE is not None, then commit that to the transcript too.

    After all of this, we clear the status line to get back to a clean state.
    """
    if me._last:
      ## On a terminal the text is already on screen, so a newline is enough
      ## to keep it; otherwise nothing has been written yet, so write it now.
      if me.eyecandyp:
        SYS.stdout.write('\n')
      else:
        SYS.stdout.write(me._last + '\n')
    if line is not None:
      SYS.stdout.write(line + '\n')
    me._lastlen = 0
    me._last = ''

STATUS = StatusLine()

def filestatus(file, status):
  """
  Return a status-line string reporting STATUS for FILE.

  The result is the base name of FILE, indented by eight spaces and followed
  by a colon and STATUS.
  """
  return '%s%s: %s' % (' '*8, OS.path.basename(file), status)

class ProgressEyecandy (object):
  """
  Provide amusement while something big and complicated is happening.

  This is an abstract class.  Subclasses must provide a method `progress'
  returning a pair (CURRENT, MAX) indicating the current progress through the
  operation.
  """

  def __init__(me, what, silentp = False):
    """
    Initialize a progress meter.

    WHAT is a prefix string to be written before the progress eyecandy
    itself.  If SILENTP is true then `done' reports only failures.
    """
    me._what = what
    me._silentp = silentp
    me._spinner = 0
    me._start = T.time()

  def _fmt_time(me, t):
    "Format T as a time, in (maybe hours) minutes and seconds."
    s, t = t % 60, int(t/60)
    m, h = t % 60, int(t/60)
    if h > 0:
      return '%d:%02d:%02d' % (h, m, s)
    else:
      return '%02d:%02d' % (m, s)

  def show(me):
    "Show the current level of progress."

    ## If we're not showing pointless frippery, don't bother at all.
    if not STATUS.eyecandyp:
      return

    ## Update the spinner index.
    me._spinner = (me._spinner + 1)%4

    ## Fetch the current progress information.  Note that we always fetch
    ## both the current and maximum levels, because both might change if an
    ## operation revises its idea of how much work needs doing.
    cur, end = me.progress()

    ## If we couldn't get progress information, display something vaguely
    ## amusing anyway.  A zero or negative maximum, or a negative position,
    ## is treated the same way: dividing by it below would either raise
    ## ZeroDivisionError or draw a nonsensical bar.
    if cur is None or end is None or cur < 0 or end <= 0:
      STATUS.set('%s %c [unknown progress]' %
                 (me._what, r'/-\|'[me._spinner]))
      return

    ## Work out -- well, guess -- the time remaining.  Don't report a
    ## negative time if we've overshot the advertised maximum.
    if cur:
      t = T.time()
      eta = me._fmt_time((t - me._start)*max(end - cur, 0)/cur)
    else:
      eta = '???'

    ## Set the status bar.  The bar is 40 cells wide; don't let it overflow
    ## if the current position exceeds the maximum.
    n = min(40, 40*cur/end)
    STATUS.set('%s %c [%s%s] %3d%% (%s)' % \
                 (me._what,
                  r'/-\|'[me._spinner],
                  '='*n, ' '*(40 - n),
                  100*cur/end,
                  eta))

  def done(me, win = True):
    "Show a completion notice, or a failure if WIN is false."
    if not win:
      STATUS.set('%s FAILED!' % me._what)
    elif not me._silentp:
      STATUS.set('%s done (%s)' %
                 (me._what,
                  me._fmt_time(T.time() - me._start)))
    else:
      ## Silent success: leave the status line alone.
      return
    STATUS.commit()

###--------------------------------------------------------------------------
### Timeout handling.

KILLSWITCH = TH.Event()

def timeout(t0, t1):
  """
  Enforce a two-stage timeout.

  Sleep for T0 seconds and then set the KILLSWITCH event.  Then sleep for a
  further T1 seconds, complain, and terminate the process immediately with
  exit status 3.  This function never returns.
  """
  T.sleep(t0)
  KILLSWITCH.set()
  T.sleep(t1)
  moan('dying messily due to timeout')
  OS._exit(3)

###--------------------------------------------------------------------------
### Parsing utilities.

## Allow hyphens in identifiers.
IDCHARS = P.alphanums + '-_'
P.Keyword.setDefaultKeywordChars(IDCHARS)

## Some common kinds of tokens.
Name = P.Word(IDCHARS) Num = P.Word(P.nums).setParseAction(lambda toks: map(int, toks)) String = P.QuotedString('"', '\\') ## Handy abbreviations for constructed parser elements. def K(k): return P.Keyword(k).suppress() def D(d): return P.Literal(d).suppress() def R(p): return P.ZeroOrMore(p).setParseAction(lambda s, l, t: [t]) O = P.Optional ###-------------------------------------------------------------------------- ### Format identification and conversion. class IdentificationFailure (Exception): pass class FileCategory (object): """ A FileCategory represents a class of files. For example, it's sensible to consider audio, or image files as a category. A file category knows how to recognize member files from MIME content types. """ def __init__(me, name, mime_pats, ident): """ Construct a new category. The PATS are a list of `fnmatch' patterns to be compared with a MIME type. The IDENT is a function which produces an identification object given a file's name and first-guess MIME type. The object is passed to a Format's `check' method to see whether a file needs re-encoding, and to `convert' to assist with the conversion. An identification object must have an attribute `mime' which is a set of possible MIME types accumulated for the object. """ me.name = name me._mime_pats = mime_pats me._ident = ident CATEGORYMAP[name] = me def identify(me, file, mime): """ Attempt to identify FILE, given its apparent MIME type. If identification succeeds, return an identification object which can be used by associated file formats; otherwise return None. """ for p in me._mime_pats: if not FN.fnmatchcase(mime, p): continue try: return me._ident(file, mime) except IdentificationFailure: pass return None class BaseFormat (object): """ A BaseFormat object represents a particular encoding and parameters. The object can verify (the `check' method) whether a particular file matches its requirements, and if necessary (`encode') re-encode a file. 
Subclasses should define the following methods. check(ID) Answer whether the file identified by ID is acceptable according to the receiver's parameters. convert(MASTER, ID, TARGET) Convert the file MASTER, which has been identified as ID, according to the receiver's parameters, writing the output to TARGET. Subclasses should also provide these attributes. CATEGORY A FileCategory object for the category of files that this format lives within. EXT A file extension to be applied to encoded output files. NAME A user-facing name for the format. PROPS A parser element to parse a property definition. It should produce a pair NAME, VALUE to be stored in a dictionary. Subclasses for different kinds of file may introduce more subclass protocol. """ def fixup(me, path): """Post-encoding fixups.""" pass FORMATMAP = {} CATEGORYMAP = {} def defformat(name, cls): "Define a format NAME using class CLS." if not hasattr(cls, 'NAME'): raise ValueError, 'abstract class' if not hasattr(cls, 'CATEGORY'): raise ValueError, 'no category' FORMATMAP[name] = cls class FormatParser (P.ParserElement): """ Parse a format specifier: format-spec ::= string [format-properties] format-properties ::= `{' format-property (`,' format-property)* `}' The syntax of a format-property is determined by the PROPS attribute on the named format and its superclasses. """ ## We cache the parser elements we generate to avoid enormous consing. CACHE = {} def parseImpl(me, s, loc, actp = True): ## Firstly, determine the format name. loc, r = Name._parse(s, loc, actp) fmt = r[0] ## Look up the format class. try: fcls = FORMATMAP[fmt] except KeyError: raise P.ParseException(s, loc, "Unknown format `%s'" % fmt) ## Fetch the property-list parser from the cache, if possible; else ## construct it. 
try: pp = me.CACHE[fmt] except KeyError: seen = set() prop = None for c in fcls.mro(): try: p = c.PROPS except AttributeError: continue if p in seen: continue if prop is None: prop = p else: prop |= p seen.add(p) if prop is None: pp = me.CACHE[fmt] = None else: props = P.delimitedList(prop) props.setParseAction(lambda s, l, t: dict(t.asList())) pp = me.CACHE[fmt] = O(D('{') - props - D('}')) ## Parse the properties. if pp is None: pd = {} else: loc, r = pp._parse(s, loc, actp) if r: pd = r[0] else: pd = {} ## Construct the format object and return it. return loc, fcls(**pd) Format = FormatParser() def prop(kw, pval, tag = None): if tag is None: tag = kw if pval is None: p = K(kw) p.setParseAction(lambda s, l, t: (tag, True)) else: p = K(kw) + D('=') + pval p.setParseAction(lambda s, l, t: (tag, t[0])) return p ###-------------------------------------------------------------------------- ### Policies and actions. class Action (object): """ An Action object represents a conversion action to be performed. This class isn't intended to be instantiated directly. It exists to define some protocol common to all Action objects. Action objects have the following attributes. master The name of the master (source) file. target The name of the target (destination) file. PRIORITY The priority of the action, for deciding which of two actions to perform. Higher priorities are more likely to win. Converting an Action to a string describes the action in a simple user-readable manner. The `perform' method actually carries the action out. """ PRIORITY = 0 def __init__(me, master): "Stash the MASTER file name for later." me.master = master def choose(me, him): "Choose either ME or HIM and return one." if him is None or me.PRIORITY > him.PRIORITY: return me else: return him class CopyAction (Action): """ An Action object for simply copying a file. Actually we try to hardlink it first, falling back to a copy later. This is both faster and more efficient with regard to disk space. 
""" ## Copying is good. Linking is really good, but we can't tell the ## difference at this stage. PRIORITY = 10 def __init__(me, master, targetdir): "Initialize a CopyAction, from MASTER to the TARGETDIR directory." Action.__init__(me, master) me.target = OS.path.join(targetdir, OS.path.basename(master)) def __str__(me): return 'copy/link' def perform(me): "Actually perform a CopyAction." try: STATUS.set(filestatus(me.master, 'link')) OS.link(me.master, me.target) except OSError, err: if err.errno != E.EXDEV: raise STATUS.set(filestatus(me.master, 'copy')) new = me.target + '.new' SH.copyfile(me.master, new) OS.rename(new, me.target) STATUS.commit() class ConvertAction (Action): """ An Action object for converting a file to a given format. Additional attributes: id The identification object for the master file. format The format to which we're meant to conver the master. """ def __init__(me, master, targetdir, id, format): "Initialize a ConvertAction." Action.__init__(me, master) stem, ext = OS.path.splitext(OS.path.basename(master)) me.target = OS.path.join(targetdir, stem + '.' + format.EXT) me.id = id me.format = format def __str__(me): return 'convert to %s' % me.format.NAME def perform(me): "Acually perform a ConvertAction." STATUS.set(filestatus(me.master, me)) me.format.convert(me.master, me.id, me.target) Policy = P.Forward() class FormatPolicy (object): """ A FormatPolicy object represents a set of rules for how to convert files. Given a master file, the FormatPolicy will identify it and return a list of actions to be performed. The methods required of a FormatPolicy are: setcategory(CAT) Store CAT as the policy's category. Check that this is consistent with the policy as stored. 
actions(MASTER, TARGETDIR, ID, COHORT) Given a MASTER file, identified as ID, a target directory TARGETDIR, and a list COHORT of (FILE, ID) pairs for other files of the same category in the same directory, return a list of actions to be performed to get the target directory into the right form. The list might be empty if the policy object /rejects/ the file. """ class AndPolicy (FormatPolicy): """ A FormatPolicy which does the union of a bunch of other policies. Each subsidiary policy is invoked in turn. The highest-priority action for each target file is returned. """ def __init__(me, policies): me._policies = policies def setcategory(me, cat): me.cat = cat for p in me._policies: p.setcategory(cat) def actions(me, master, targetdir, id, cohort): tmap = {} for p in me._policies: for a in p.actions(master, targetdir, id, cohort): if a.target in tmap: tmap[a.target] = a.choose(tmap.get(a.target)) else: tmap[a.target] = a return tmap.values() And = K('and') - D('{') - R(Policy) - D('}') And.setParseAction(lambda s, l, t: AndPolicy(t[0])) class OrPolicy (FormatPolicy): """ A FormatPolicy which tries other policies and uses the first that accepts. Each subsidiary policy is invoked in turn. If any accepts, the actions it proposes are turned and no further policies are invoked. If none accepts then the file is rejected. """ def __init__(me, policies): me._policies = policies def setcategory(me, cat): me.cat = cat for p in me._policies: p.setcategory(cat) def actions(me, master, targetdir, id, cohort): for p in me._policies: aa = p.actions(master, targetdir, id, cohort) if aa: return aa else: return [] Or = K('or') - D('{') - R(Policy) - D('}') Or.setParseAction(lambda s, l, t: OrPolicy(t[0])) class AcceptPolicy (FormatPolicy): """ A FormatPolicy which copies files in a particular format. If all of the files in a cohort are recognized as being in a particular format (including this one), then accept it with a CopyAction; otherwise reject. 
""" def __init__(me, format): me._format = format def setcategory(me, cat): if me._format.CATEGORY is not cat: raise ValueError, \ "Accept format `%s' has category `%s', not `%s'" % \ (me._format.__class__.__name__, me._format.CATEGORY.name, cat.name) me.cat = cat def actions(me, master, targetdir, id, cohort): if me._format.check(id) and \ all(me._format.check(cid) for f, cid in cohort): return [CopyAction(master, targetdir)] else: return [] Accept = K('accept') - Format Accept.setParseAction(lambda s, l, t: AcceptPolicy(t[0])) class ConvertPolicy (FormatPolicy): """ A FormatPolicy which copies files in a particular format or converts if necessary. """ def __init__(me, format): me._format = format def setcategory(me, cat): if me._format.CATEGORY is not cat: raise ValueError, \ "Accept format `%s' has category `%s', not `%s'" % \ (me._format.__class__.__name__, me._format.CATEGORY.name, cat.name) me.cat = cat def actions(me, master, targetdir, id, cohort): if me._format.check(id): return [CopyAction(master, targetdir)] else: return [ConvertAction(master, targetdir, id, me._format)] Convert = K('convert') - Format Convert.setParseAction(lambda s, l, t: ConvertPolicy(t[0])) Policy << (And | Or | Accept | Convert) ###-------------------------------------------------------------------------- ### Audio handling, based on GStreamer. def make_element(factory, name = None, **props): "Return a new element from the FACTORY with the given NAME and PROPS." elt = GS.element_factory_make(factory, name) elt.set_properties(**props) return elt class GStreamerProgressEyecandy (ProgressEyecandy): """ Provide amusement while GStreamer is busy doing something. The GStreamerProgressEyecandy object is a context manager. Wrap it round your GStreamer loop to provide progress information for an operation. """ def __init__(me, what, elt, **kw): """ Initialize a progress meter. WHAT is a prefix string to be written before the progress eyecandy itself. 
ELT is a GStreamer element to interrogate to find the progress information. """ me._elt = elt ProgressEyecandy.__init__(me, what, **kw) def _update(me): "Called by GLib main event loop to update the eyecandy." me.show() return True def _timer(me): """ Update the progress meter. This is called periodically by the GLib main event-processing loop. """ me.show() return True def progress(me): "Return the current progress as a pair (CURRENT, MAX)." ## Fetch the current progress information. We get the duration each ## time, because (particularly with VBR-encoded MP3 inputs) the estimated ## duration can change as we progress. Hopefully it settles down fairly ## soon. try: t, hunoz = me._elt.query_position(GS.FORMAT_TIME) end, hukairz = me._elt.query_duration(GS.FORMAT_TIME) return t, end except GS.QueryError: return None, None def __enter__(me): "Enter context: attach progress meter display." ## If we're not showing pointless frippery, don't bother at all. if not STATUS.eyecandyp: return ## Update regularly. The pipeline runs asynchronously. me._id = G.timeout_add(200, me._update) def __exit__(me, ty, val, tb): "Leave context: remove display and report completion or failure." ## If we're not showing pointless frippery, there's nothing to remove. if STATUS.eyecandyp: G.source_remove(me._id) ## Report completion anyway. me.done(ty is None) ## As you were. return False class AudioIdentifier (object): """ Analyses and identifies an audio file. Important properties are: cap A capabilities structure describing the audio file data. The most interesting thing in here is probably its name, which is a MIME type describing the data. dcap A capabilities structure describing the decoded audio data. This is of interest during conversion. tags A dictionary containing metadata tags from the file. These are in GStreamer's encoding-independent format. bitrate An approximation to the stream's bitrate, in kilobits per second. 
This might be slow to work out for some files so it's computed on demand. """ def __init__(me, file, mime): "Initialize the object suitably for identifying FILE." ## Make some initial GStreamer objects. We'll want the pipeline later if ## we need to analyse a poorly tagged MP3 stream, so save it away. me._pipe = GS.Pipeline() me._file = file bus = me._pipe.get_bus() bus.add_signal_watch() loop = G.MainLoop() ## The basic recognition kit is based around `decodebin'. We must keep ## it happy by giving it sinks for the streams it's found, which it ## announces asynchronously. source = make_element('filesrc', 'file', location = file) decoder = make_element('decodebin', 'decode') sink = make_element('fakesink') def decoder_pad_arrived(elt, pad): if pad.get_caps()[0].get_name().startswith('audio/'): elt.link_pads(pad.get_name(), sink, 'sink') dpaid = decoder.connect('pad-added', decoder_pad_arrived) me._pipe.add(source, decoder, sink) GS.element_link_many(source, decoder) ## Arrange to collect tags from the pipeline's bus as they're reported. ## If we reuse the pipeline later, we'll want different bus-message ## handling, so make sure we can take the signal handler away. tags = {} fail = [] def bus_message(bus, msg): if msg.type == GS.MESSAGE_ERROR: fail[:] = (ValueError, msg.structure['debug'], None) loop.quit() elif msg.type == GS.MESSAGE_STATE_CHANGED: if msg.structure['new-state'] == GS.STATE_PAUSED and \ msg.src == me._pipe: loop.quit() elif msg.type == GS.MESSAGE_TAG: tags.update(msg.structure) bmid = bus.connect('message', bus_message) ## We want to identify the kind of stream this is. (Hmm. The MIME type ## recognizer has already done this work, but GStreamer is probably more ## reliable.) The `decodebin' has a `typefind' element inside which will ## announce the identified media type. All we need to do is find it and ## attach a signal handler. 
(Note that the handler might be run in the ## thread context of the pipeline element, but Python's GIL will keep ## things from being too awful.) me.cap = None me.dcap = None for e in decoder.elements(): if e.get_factory().get_name() == 'typefind': tfelt = e break else: assert False, 'failed to find typefind element' ## Crank up most of the heavy machinery. The message handler will stop ## the loop when things seem to be sufficiently well underway. me._pipe.set_state(GS.STATE_PAUSED) loop.run() bus.disconnect(bmid) decoder.disconnect(dpaid) if fail: me._pipe.set_state(GS.STATE_NULL) raise fail[0], fail[1], fail[2] ## Store the collected tags. me.tags = tags ## Gather the capabilities. The `typefind' element knows the input data ## type. The 'decodebin' knows the raw data type. me.cap = tfelt.get_pad('src').get_negotiated_caps()[0] me.mime = set([mime, me.cap.get_name()]) me.dcap = sink.get_pad('sink').get_negotiated_caps()[0] ## If we found a plausible bitrate then stash it. Otherwise note that we ## failed. If anybody asks then we'll work it out then. if 'nominal-bitrate' in tags: me._bitrate = tags['nominal-bitrate']/1000 elif 'bitrate' in tags and tags['bitrate'] >= 80000: me._bitrate = tags['bitrate']/1000 else: me._bitrate = None ## The bitrate computation wants the file size. Ideally we'd want the ## total size of the frames' contents, but that seems hard to dredge ## out. If the framing overhead is small, this should be close enough ## for our purposes. me._bytes = OS.stat(file).st_size def __del__(me): "Close the pipeline down so we don't leak file descriptors." me._pipe.set_state(GS.STATE_NULL) @property def bitrate(me): """ Return the approximate bit-rate of the input file. This might take a while if we have to work it out the hard way. """ ## If we already know the answer then just return it. if me._bitrate is not None: return me._bitrate ## Make up a new main loop. loop = G.MainLoop() ## Watch for bus messages. 
We'll stop when we reach the end of the ## stream: then we'll have a clear idea of how long the track was. fail = [] def bus_message(bus, msg): if msg.type == GS.MESSAGE_ERROR: fail[:] = (ValueError, msg.structure['debug'], None) loop.quit() elif msg.type == GS.MESSAGE_EOS: loop.quit() bus = me._pipe.get_bus() bmid = bus.connect('message', bus_message) ## Get everything moving, and keep the user amused while we work. me._pipe.set_state(GS.STATE_PLAYING) with GStreamerProgressEyecandy(filestatus(file, 'measure bitrate') % me._pipe, silentp = True): loop.run() bus.disconnect(bmid) if fail: me._pipe.set_state(GS.STATE_NULL) raise fail[0], fail[1], fail[2] ## Now we should be able to find out our position accurately and work out ## a bitrate. Cache it in case anybody asks again. t, hukairz = me._pipe.query_position(GS.FORMAT_TIME) me._bitrate = int(8*me._bytes*1e6/t) ## Done. return me._bitrate class AudioFormat (BaseFormat): """ An AudioFormat is a kind of Format specialized for audio files. Format checks are done on an AudioIdentifier object. """ PROPS = prop('bitrate', Num) ## libmagic reports `application/ogg' for Ogg Vorbis files. We've switched ## to GIO now, which reports either `audio/ogg' or `audio/x-vorbis+ogg' ## depending on how thorough it's trying to be. Still, it doesn't do any ## harm here; the main risk is picking up Ogg Theora files by accident, and ## we'll probably be able to extract the audio from them anyway. CATEGORY = FileCategory('audio', ['audio/*', 'application/ogg'], AudioIdentifier) def __init__(me, bitrate = None): "Construct an object, requiring an approximate bitrate." me.bitrate = bitrate def check(me, id): """ Return whether the AudioIdentifier ID is suitable for our purposes. Subclasses can either override this method or provide a property `MIMETYPES', which is a list (other thing that implements `__contains__') of GStreamer MIME types matching this format. 
""" return id.mime & me.MIMETYPES and \ (me.bitrate is None or id.bitrate <= me.bitrate * sqrt(2)) def encoder(me): """ Constructs a GStreamer element to encode audio input. Subclasses can either override this method (or replace `encode' entirely), or provide a method `encoder_chain' which returns a list of elements to be linked together in sequence. The first element in the chain must have a pad named `sink' and the last must have a pad named `src'. """ elts = me.encoder_chain() bin = GS.Bin() bin.add(*elts) GS.element_link_many(*elts) bin.add_pad(GS.GhostPad('sink', elts[0].get_pad('sink'))) bin.add_pad(GS.GhostPad('src', elts[-1].get_pad('src'))) return bin def convert(me, master, id, target): """ Encode audio from MASTER, already identified as ID, writing it to TARGET. See `encoder' for subclasses' responsibilities. """ ## Construct the necessary equipment. pipe = GS.Pipeline() bus = pipe.get_bus() bus.add_signal_watch() loop = G.MainLoop() ## Make sure that there isn't anything in the way of our output. We're ## going to write to a scratch file so that we don't get confused by ## half-written rubbish left by a crashed program. new = target + '.new' try: OS.unlink(new) except OSError, err: if err.errno != E.ENOENT: raise ## Piece together our pipeline. The annoying part is that the ## `decodebin' doesn't have any source pads yet, so our chain is in two ## halves for now. source = make_element('filesrc', 'source', location = master) decoder = make_element('decodebin', 'decode') convert = make_element('audioconvert', 'convert') encoder = me.encoder() sink = make_element('filesink', 'sink', location = new) pipe.add(source, decoder, convert, encoder, sink) GS.element_link_many(source, decoder) GS.element_link_many(convert, encoder, sink) ## Some decoders (e.g., the AC3 decoder) include channel-position ## indicators in their output caps. The Vorbis encoder interferes with ## this, and you end up with a beautifully encoded mono signal from a ## stereo source. 
From a quick butchers at the `vorbisenc' source, I ## /think/ that this is only a problem with stereo signals: mono signals ## are mono already, and `vorbisenc' accepts channel positions if there ## are more than two channels. ## ## So we have this bodge. We already collected the decoded audio caps ## during identification. So if we see 2-channel audio with channel ## positions, we strip the positions off forcibly by adding a filter. if id.dcap.get_name().startswith('audio/x-raw-') and \ id.dcap.has_field('channels') and \ id.dcap['channels'] == 2 and \ id.dcap.has_field('channel-positions'): dcap = GS.Caps() c = id.dcap.copy() c.remove_field('channel-positions') dcap.append(c) else: dcap = None ## Hook onto the `decodebin' so we can link together the two halves of ## our encoding chain. For now, we'll hope that there's only one audio ## stream in there, and just throw everything else away. def decoder_pad_arrived(elt, pad): if pad.get_caps()[0].get_name().startswith('audio/'): if dcap: elt.link_pads_filtered(pad.get_name(), convert, 'sink', dcap) else: elt.link_pads(pad.get_name(), convert, 'sink') decoder.connect('pad-added', decoder_pad_arrived) ## Watch the bus for completion messages. fail = [] def bus_message(bus, msg): if msg.type == GS.MESSAGE_ERROR: fail[:] = (ValueError, msg.structure['debug'], None) loop.quit() elif msg.type == GS.MESSAGE_EOS: loop.quit() bmid = bus.connect('message', bus_message) ## Get everything ready and let it go. pipe.set_state(GS.STATE_PLAYING) with GStreamerProgressEyecandy(filestatus(master, 'convert to %s' % me.NAME), pipe): loop.run() pipe.set_state(GS.STATE_NULL) if fail: raise fail[0], fail[1], fail[2] ## Fix up the output file if we have to. me.fixup(new) ## We're done. OS.rename(new, target) class OggVorbisFormat (AudioFormat): "AudioFormat object for Ogg Vorbis." 
## From https://en.wikipedia.org/wiki/Vorbis QMAP = [(-1, 45), ( 0, 64), ( 1, 80), ( 2, 96), ( 3, 112), ( 4, 128), ( 5, 160), ( 6, 192), ( 7, 224), ( 8, 256), ( 9, 320), (10, 500)] NAME = 'Ogg Vorbis' MIMETYPES = set(['application/ogg', 'audio/x-vorbis', 'audio/ogg', 'audio/x-vorbis+ogg']) EXT = 'ogg' def encoder_chain(me): encprops = {} if me.bitrate is not None: for q, br in me.QMAP: if br >= me.bitrate: break else: raise ValueError, 'no suitable quality setting found' encprops['quality'] = q/10.0 return [make_element('vorbisenc', **encprops), make_element('oggmux')] defformat('ogg-vorbis', OggVorbisFormat) class MP3Format (AudioFormat): "AudioFormat object for MP3." NAME = 'MP3' MIMETYPES = set(['audio/mpeg']) EXT = 'mp3' def encoder_chain(me): encprops = {} if me.bitrate is not None: encprops['vbr_mean_bitrate'] = me.bitrate return [make_element('lame', vbr = 4, **encprops), make_element('xingmux'), make_element('id3v2mux')] def fixup(me, path): """ Fix up MP3 files. GStreamer produces ID3v2 tags, but not ID3v1. This seems unnecessarily unkind to stupid players. """ tag = E3.Tag() tag.link(path) tag.setTextEncoding(E3.UTF_8_ENCODING) try: tag.update(E3.ID3_V1_1) except (UnicodeEncodeError, E3.tag.GenreException): pass defformat('mp3', MP3Format) ###-------------------------------------------------------------------------- ### Image handling, based on the Python Imaging Library. class ImageIdentifier (object): """ Analyses and identifies an image file. Simply leaves an Image object in the `img' property which can be inspected. """ def __init__(me, file, mime): ## Get PIL to open the file. It will magically work out what kind of ## file it is. try: me.img = I.open(file) except IOError, exc: ## Unhelpful thing to raise on identification failure. We can ## distinguish this from an actual I/O error because it doesn't have an ## `errno'. 
if exc.errno is None: raise IdentificationFailure raise me.mime = set([mime]) class ImageFormat (BaseFormat): """ An ImageFormat is a kind of Format specialized for image files. Subclasses don't need to provide anything other than the properties required by all concrete Format subclasses. However, there is a requirement that the `NAME' property match PIL's `format' name for the format. """ PROPS = prop('size', Num) CATEGORY = FileCategory('image', ['image/*'], ImageIdentifier) def __init__(me, size = None, **kw): """ Initialize an ImageFormat object. Additional keywords are used when encoding, and may be recognized by enhanced `check' methods in subclasses. """ me._size = size me._props = kw def check(me, id): "Check whether the ImageIdentifier ID matches our requirements." return id.img.format == me.NAME and \ (me._size is None or (id.img.size[0] <= me._size and id.img.size[1] <= me._size)) def convert(me, master, id, target): "Encode the file MASTER, identified as ID, writing the result to TARGET." ## Write to a scratch file. new = target + '.new' ## The ImageIdentifier already contains a copy of the open file. It ## would be wasteful not to use it. img = id.img STATUS.set(filestatus(master, 'convert to %s' % me.NAME)) ## If there's a stated maximum size then scale the image down to match. ## But thumbnailing clobbers the original, so take a copy. if me._size is not None and \ (img.size[0] > me._size or img.size[1] > me._size): img = img.copy() img.thumbnail((me._size, me._size), I.ANTIALIAS) ## Write the output image. img.save(new, me.NAME, **me._props) ## Fix it up if necessary. me.fixup(new) ## We're done. OS.rename(new, target) STATUS.commit() class JPEGFormat (ImageFormat): """ Image format for JPEG (actually JFIF) files. Interesting properties to set: optimize If present, take a second pass to select optimal encoder settings. progressive If present, make a progressive file. quality Integer from 1--100 (worst to best); default is 75. 
""" EXT = 'jpg' NAME = 'JPEG' PROPS = prop('optimize', None) \ | prop('progressive', None, 'progression') \ | prop('quality', Num) defformat('jpeg', JPEGFormat) class PNGFormat (ImageFormat): """ Image format for PNG files. Interesting properties: optimize If present, make a special effort to minimize the output file. """ EXT = 'png' NAME = 'PNG' PROPS = prop('optimize', None) defformat('png', PNGFormat) class BMPFormat (ImageFormat): """ Image format for Windows BMP files, as used by RockBox. No additional properties. """ NAME = 'BMP' EXT = 'bmp' defformat('bmp', BMPFormat) ###-------------------------------------------------------------------------- ### Remaining parsing machinery. Type = K('type') - Name - D('{') - R(Policy) - D('}') def build_type(s, l, t): try: cat = CATEGORYMAP[t[0]] except KeyError: raise P.ParseException(s, loc, "Unknown category `%s'" % t[0]) pols = t[1] if len(pols) == 1: pol = pols[0] else: pol = AndPolicy(pols) pol.setcategory(cat) return pol Type.setParseAction(build_type) TARGETS = [] class TargetJob (object): def __init__(me, targetdir, policies): me.targetdir = targetdir me.policies = policies def perform(me): TARGETS.append(me) Target = K('target') - String - D('{') - R(Type) - D('}') def build_target(s, l, t): return TargetJob(t[0], t[1]) Target.setParseAction(build_target) VARS = { 'master': None } class VarsJob (object): def __init__(me, vars): me.vars = vars def perform(me): for k, v in me.vars: VARS[k] = v Var = prop('master', String) Vars = K('vars') - D('{') - R(Var) - D('}') def build_vars(s, l, t): return VarsJob(t[0]) Vars.setParseAction(build_vars) TopLevel = Vars | Target Config = R(TopLevel) Config.ignore(P.pythonStyleComment) ###-------------------------------------------------------------------------- ### The directory grobbler. def grobble(master, targets, noact = False): """ Work through the MASTER directory, writing converted files to TARGETS. 
The TARGETS are a list of `TargetJob' objects, each describing a target directory and a policy to apply to it. If NOACT is true, then don't actually do anything permanent to the filesystem. """ ## Transform the targets into a more convenient data structure. tpolmap = [] for t in targets: pmap = {} tpolmap.append(pmap) for p in t.policies: pmap.setdefault(p.cat, []).append(p) ## Keep track of the current position in the master tree. dirs = [] ## And the files which haven't worked. broken = [] def grobble_file(master, pmap, targetdir, cohorts): ## Convert MASTER, writing the result to TARGETDIR. ## ## The COHORTS are actually (CAT, ID, COHORT) triples, where a COHORT is ## a list of (FILENAME, ID) pairs. ## ## Since this function might convert the MASTER file, the caller doesn't ## know the name of the output files, so we return then as a list. done = set() st_m = OS.stat(master) ## Work through each category listed and apply its policy. for cat, id, cohort in cohorts: ## Go through the category's policies and see if any match. If we fail ## here, see if there are more categories to try. for pol in pmap[cat]: acts = pol.actions(master, targetdir, id, cohort) if acts: break else: continue ## Work through the targets one by one. for a in acts: done.add(a.target) ## Find out whether the target file already exists and is up-to-date ## with respect to the master. (Caution here with low-resolution ## timestamps.) If it's OK, then just move on. try: st_t = OS.stat(a.target) if st_m.st_mtime < st_t.st_mtime or \ (st_m.st_ino, st_m.st_dev) == (st_t.st_ino, st_t.st_dev): continue except OSError, err: if err.errno not in (E.ENOENT, E.ENOTDIR): raise ## We have real work to do. If there's a current status message, ## it's the containing directory so flush it so that people know ## where we are. STATUS.commit() ## Remove the target. (A hardlink will fail if the target already ## exists.) 
if not noact: try: OS.unlink(a.target) except OSError, err: if err.errno not in (E.ENOENT, E.ENOTDIR): raise ## Do whatever it is we decided to do. if noact: STATUS.commit(filestatus(master, a)) else: a.perform() ## We're done. Return the names of the targets. return list(done) @contextmanager def wrap(masterfile): ## Handle exceptions found while trying to convert a particular file or ## directory. try: yield masterfile ## Something bad happened. Report the error, but continue. (This list ## of exceptions needs a lot of work.) except (IOError, OSError), exc: STATUS.clear() STATUS.commit(filestatus(masterfile, 'failed (%s)' % exc)) broken.append((masterfile, exc)) def grobble_dir(master, targets): ## Recursively convert files in MASTER, writing them to the TARGETS. ## Keep track of the subdirectories we encounter, because we'll need to ## do all of those in one go at the end. subdirs = set() ## Work through each target directory in turn. for target, pmap in zip(targets, tpolmap): ## Make sure the TARGET exists and is a directory. It's a fundamental ## assumption of this program that the entire TARGET tree is ## disposable, so if something exists but isn't a directory, we should ## kill it. if OS.path.isdir(target): pass else: if OS.path.exists(target): STATUS.commit(filestatus(target, 'clear nondirectory')) if not noact: OS.unlink(target) STATUS.commit(filestatus(target, 'create directory')) if not noact: OS.mkdir(target) ## Keep a list of things in the target. As we convert files, we'll ## check them off. Anything left over is rubbish and needs to be ## deleted. checklist = {} try: for i in OS.listdir(target): checklist[i] = False except OSError, err: if err.errno not in (E.ENOENT, E.ENOTDIR): raise ## Keep track of the files in each category. catmap = {} todo = [] done = [] ## Work through the master files. for f in sorted(OS.listdir(master)): ## If the killswitch has been pulled then stop. 
The whole idea is ## that we want to cause a clean shutdown if possible, so we don't ## want to do it in the middle of encoding because the encoding ## effort will have been wasted. This is the only place we need to ## check. If we've exited the loop, then clearing old files will ## probably be fast, and we'll either end up here when the recursive ## call returns or we'll be in the same boat as before, clearing old ## files, only up a level. If worst comes to worst, we'll be killed ## forcibly somewhere inside `SH.rmtree', and that can continue where ## it left off. if KILLSWITCH.is_set(): return ## Do something with the file. with wrap(OS.path.join(master, f)) as masterfile: ## If it's a directory then prepare to grobble it recursively, but ## don't do that yet. if OS.path.isdir(masterfile): subdirs.add(f) done.append(OS.path.join(target, f)) ## Otherwise it's a file. Work out what kind, and stash it under ## the appropriate categories. Later, we'll apply policy to the ## files, by category, and work out what to do with them all. else: gf = GIO.File(masterfile) mime = gf.query_info('standard::content-type').get_content_type() cats = [] for cat in pmap.iterkeys(): id = cat.identify(masterfile, mime) if id is None: continue catmap.setdefault(cat, []).append((masterfile, id)) cats.append((cat, id)) if not cats: catmap.setdefault(None, []).append((masterfile, id)) todo.append((masterfile, cats)) ## Work through the categorized files to see what actions to do for ## them. for masterfile, cats in todo: with wrap(masterfile): done += grobble_file(masterfile, pmap, target, [(cat, id, catmap[cat]) for cat, id in cats]) ## Check the results off the list so that we don't clear it later. for f in done: checklist[OS.path.basename(f)] = True ## Maybe there's stuff in the target which isn't accounted for. Delete ## it: either the master has changed, or the policy for this target has ## changed. Either way, the old files aren't wanted. 
for f in checklist: if not checklist[f]: STATUS.commit(filestatus(f, 'clear bogus file')) if not noact: bogus = OS.path.join(target, f) try: if OS.path.isdir(bogus): SH.rmtree(bogus) else: OS.unlink(bogus) except OSError, err: if err.errno != E.ENOENT: raise ## If there are subdirectories which want processing then do those. ## Keep the user amused by telling him where we are in the tree. for d in sorted(subdirs): dirs.append(d) STATUS.set('/'.join(dirs)) with wrap(OS.path.join(master, d)) as masterdir: try: grobble_dir(masterdir, [OS.path.join(target, d) for target in targets]) finally: dirs.pop() STATUS.set('/'.join(dirs)) ## Right. We're ready to go. grobble_dir(master, [t.targetdir for t in targets]) return broken ###-------------------------------------------------------------------------- ### Command-line interface. QUIS = OS.path.basename(SYS.argv[0]) def moan(msg): "Report a warning message to the user." SYS.stderr.write('%s: %s\n' % (QUIS, msg)) def die(msg): "Report a fatal error message to the user." moan(msg) SYS.exit(1) def parse_opts(args): """ Parse command-line arguments in ARGS. Returns a Grobbler object and the MASTER and TARGET directories to be grobbled. """ ## Build the option parser object. op = OP.OptionParser(prog = QUIS, version = VERSION, usage = '%prog [-in] [-t TIMEOUT] [-T TIMEOUT] ' 'CONFIG', description = """\ Convert a directory tree of files according to the configuration file CONFIG. """) ## Timeout handling. 
def cb_time(opt, ostr, arg, op): m = RX.match(r'\s*(\d+)\s*([dhms]?)\s*', arg) if not m: raise OP.OptionValueerror, 'bad time value `%s\'' % arg t, u = m.groups() t = int(t) * { '': 1, 's': 1, 'm': 60, 'h': 3600, 'd': 86400 }[u] setattr(op.values, opt.dest, t) op.add_option('-t', '--timeout', type = 'string', metavar = 'SECS', dest = 'timeout', help = 'stop processing nicely after SECS', action = 'callback', callback = cb_time) op.add_option('-T', '--timeout-nasty', type = 'string', metavar = 'SECS', dest = 'timeout_nasty', help = 'stop processing unpleasantly after further SECS', action = 'callback', callback = cb_time) ## Other options. op.add_option('-i', '--interactive', action = 'store_true', dest = 'tty', help = 'provide progress information') op.add_option('-n', '--no-act', action = 'store_true', dest = 'noact', help = 'don\'t actually modify the filesystem') ## Ready to rock. op.set_defaults(formats = [], noact = False, timeout = None, timeout_nasty = 300) opts, args = op.parse_args(args) ## Check that we got the non-option arguments that we want. if len(args) != 1: op.error('wrong number of arguments') ## Act on the options. if opts.tty: STATUS.eyecandyp = True if opts.timeout is not None: to = TH.Thread(target = timeout, args = (opts.timeout, opts.timeout_nasty)) to.daemon = True to.start() ## Parse the configuration file. with open(args[0]) as conf: jobs, = Config.parseFile(conf, True) for j in jobs: j.perform() return opts if __name__ == '__main__': opts = parse_opts(SYS.argv[1:]) if 'master' not in VARS: die("no master directory set") broken = grobble(VARS['master'], TARGETS, opts.noact) if broken: moan('failed to convert some files:') for file, exc in broken: moan('%s: %s' % (file, exc)) SYS.exit(1) ## This is basically a successful completion: we did what we were asked to ## do. It seems polite to report a message, though. ## ## Why don't we have a nonzero exit status? 
The idea would be that a ## calling script would be interested that we used up all of our time, and ## not attempt to convert some other directory as well. But that doesn't ## quite work. Such a script would need to account correctly for time we ## had spent even if we complete successfully. And if the script is having ## to watch the clock itself, it can do that without our help here. if KILLSWITCH.is_set(): moan('killed by timeout') ###----- That's all, folks --------------------------------------------------