#! /usr/bin/python3
### -*- mode: python; coding: utf-8 -*-
"""Generate extended-M3U playlists from episode-list files.

Each command-line argument names a list file (see `parse_list` for the
syntax).  Entries are resolved to DVD images (`dvd://` URLs) or audio files
(`file://` URLs) below `ROOT`, and the resulting playlist is written to
standard output.
"""

from contextlib import contextmanager
import os as OS
import re as RX
import sys as SYS


class ExpectedError (Exception):
    """An anticipated failure, reported as a one-line message."""
    pass


@contextmanager
def location(loc):
    """Make `loc' the current error-reporting location for the body.

    The previous location is restored only when the body completes
    normally.  If the body raises, `LOC' is left pointing at `loc', which is
    what lets the top-level handler report where the failure happened.
    NOTE(review): do not add `try'/`finally' here without changing how the
    top-level handler finds the location.
    """
    global LOC
    old, LOC = LOC, loc
    yield loc
    LOC = old


def filter(value, func = None, dflt = None):
    """Return `dflt' if `value' is None; otherwise `func(value)'.

    If `func' is None, a non-None `value' is returned unchanged.
    """
    if value is None:
        return dflt
    if func is None:
        return value
    return func(value)


def check(cond, msg):
    """Raise `ExpectedError(msg)' unless `cond' is true."""
    if not cond:
        raise ExpectedError(msg)


def getint(s):
    """Convert the string `s' to a nonnegative integer.

    Raises `ExpectedError' if `s' is empty or contains anything other than
    decimal digits.
    """
    ## `isdecimal' rather than `isdigit': the latter accepts characters such
    ## as superscript digits which `int' then rejects with `ValueError'.
    if not s.isdecimal():
        raise ExpectedError("bad integer `%s'" % s)
    return int(s)


class Words (object):
    """Split a string into whitespace-separated words, one at a time."""

    def __init__(me, s):
        me._s = s
        me._i, me._n = 0, len(s)

    def _wordstart(me):
        """Return the index of the next non-space character, or -1."""
        s = me._s
        for i in range(me._i, me._n):
            if not s[i].isspace():
                return i
        return -1

    def nextword(me):
        """Consume and return the next word, or None if none remain."""
        begin = me._wordstart()
        if begin < 0:
            return None
        s, n = me._s, me._n
        end = begin
        while end < n and not s[end].isspace():
            end += 1
        me._i = end
        return s[begin:end]

    def rest(me):
        """Return the unconsumed text, stripped, or None if it is blank.

        The read position is not advanced.
        """
        begin = me._wordstart()
        if begin < 0:
            return None
        return me._s[begin:].rstrip()


## Table indexed by byte value: true for bytes which `urlencode' leaves
## unescaped.
URL_SAFE_P = 256*[False]
for ch in (b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
           b"abcdefghijklmnopqrstuvwxyz"
           b"0123456789" b"!$%-.,/"):
    URL_SAFE_P[ch] = True


def urlencode(s):
    """Percent-escape the UTF-8 encoding of `s', except `URL_SAFE_P' bytes."""
    return "".join(chr(ch) if URL_SAFE_P[ch] else "%%%02x" % ch
                   for ch in s.encode("UTF-8"))


PROG = OS.path.basename(SYS.argv[0])


class BaseLocation (object):
    """Base class for error-reporting locations."""

    def report(me, exc):
        """Write `exc' to stderr, prefixed by program name and location."""
        SYS.stderr.write("%s: %s%s\n" % (PROG, me._loc(), exc))


class DummyLocation (BaseLocation):
    """The location used when no file is being processed."""

    def _loc(me):
        return ""


class FileLocation (BaseLocation):
    """A file name and line number."""

    def __init__(me, fn, lno = 1):
        me._fn, me._lno = fn, lno

    def _loc(me):
        return "%s:%d: " % (me._fn, me._lno)

    def stepline(me):
        """Advance to the next line."""
        me._lno += 1


LOC = DummyLocation()


class Source (object):
    """A media file which playlist entries can refer to.

    Subclasses set `PREFIX' (the URL scheme prefix) and the flags `TITLEP'
    and `CHAPTERP', which say whether URLs for this kind of source take a
    title number and a chapter number respectively.
    """

    PREFIX = ""
    TITLEP = CHAPTERP = False

    def __init__(me, fn):
        me.fn = fn
        me.neps = None
        me.used_titles = dict()
        me.used_chapters = set()
        me.nuses = 0

    def url(me, title = None, chapter = None):
        """Return the URL for this source, or for a title/chapter within it.

        Raises `ExpectedError' if a title or chapter is given where the
        source doesn't take one, if a title is missing where one is needed,
        or if the same (title, chapter) pair has already been requested.
        """
        if title is None:
            if me.TITLEP:
                raise ExpectedError("missing title number")
            if chapter is not None:
                raise ExpectedError("can't specify chapter without title")
            suffix = ""
        elif not me.TITLEP:
            raise ExpectedError("can't specify title with `%s'" % me.fn)
        elif chapter is None:
            suffix = "#%d" % title
        elif not me.CHAPTERP:
            raise ExpectedError("can't specify chapter with `%s'" % me.fn)
        else:
            suffix = "#%d:%d-%d:%d" % (title, chapter, title, chapter)

        if chapter is not None:
            key, used = (title, chapter), me.used_chapters
        else:
            key, used = title, me.used_titles
        if key in used:
            if title is None:
                raise ExpectedError("`%s' already used" % me.fn)
            elif chapter is None:
                raise ExpectedError("`%s' title %d already used" %
                                    (me.fn, title))
            else:
                raise ExpectedError("`%s' title %d chapter %d already used" %
                                    (me.fn, title, chapter))
        ## NOTE(review): only chapters are recorded; nothing ever adds to
        ## `used_titles', so the whole-file and whole-title checks above
        ## cannot currently fire.  Left as found -- confirm the intent.
        if chapter is not None:
            me.used_chapters.add((title, chapter))
        return me.PREFIX + ROOT + urlencode(me.fn) + suffix


class VideoDisc (Source):
    """A DVD image, addressed by title and optionally chapter."""

    PREFIX = "dvd://"
    TITLEP = CHAPTERP = True

    def __init__(me, fn, *args, **kw):
        super().__init__(fn, *args, **kw)
        ## Number of episodes found on this disc by `VideoDir'.
        me.neps = 0


class VideoSeason (object):
    """The episodes of one season found in a video directory."""

    def __init__(me, i, title):
        me.i = i
        me.title = title
        me.episodes = {}

    def set_episode_disc(me, i, disc):
        """Record that episode `i' is on `disc'; each episode only once."""
        if i in me.episodes:
            raise ExpectedError("season %d episode %d already taken" %
                                (me.i, i))
        me.episodes[i] = disc
        disc.neps += 1


def some_group(m, *gg):
    """Return the first of the groups `gg' of match `m' which matched."""
    for g in gg:
        s = m.group(g)
        if s is not None:
            return s
    return None


class VideoDir (object):
    """A directory of `.iso' files named by season and episode.

    After construction, `seasons' maps season numbers to `VideoSeason'
    objects, whose `episodes' map episode numbers to `VideoDisc' objects.
    """

    ## NOTE(review): the names of the groups below were missing from the
    ## text under review; they have been restored from the `group(...)'
    ## calls in `__init__'.  Confirm against version control.
    _R_ISO_PRE = RX.compile(r"""
        ^
        (?: S (?P<si> \d+)
              (?: \. \ (?P<st> .*) — (?: D \d+ \. \ )? |
                  D \d+ \. \ |
                  (?= E \d+ \. \ ) |
                  \. \ ) |
            \d+ \. \ )
        (?: (?P<eplist>
              (?: S \d+ \ )? E \d+ (?: – \d+)?
              (?: , \ (?: S \d+ \ )? E \d+ (?: – \d+)?)*) |
            (?P<epname> E \d+) \. \ .*)
        \. iso $
    """, RX.X)

    _R_ISO_EP = RX.compile(r"""
        ^
        (?: S (?P<si> \d+) \ )?
        E (?P<ei> \d+) (?: – (?P<ej> \d+))?
        $
    """, RX.X)

    def __init__(me, dir):
        """Scan directory `dir' (relative to `ROOT') for disc images.

        Files which don't match the naming scheme are ignored.  Raises
        `ExpectedError' if season numbers are not consecutive, a season is
        seen twice, season titles disagree, or an episode is claimed by
        more than one disc.
        """
        me.dir = dir
        seasons = {}
        season = None
        for fn in sorted(OS.listdir(OS.path.join(ROOT, dir))):
            if not fn.endswith(".iso"):
                continue
            m = me._R_ISO_PRE.match(fn)
            if not m:
                continue
            path = OS.path.join(dir, fn)

            ## Work out which season the file name itself declares.
            i = filter(m.group("si"), int)
            stitle = m.group("st")
            check(i is not None or stitle is None,
                  "explicit season title without number in `%s'" % fn)
            if i is not None:
                if season is None or i != season.i:
                    check(season is None or i == season.i + 1,
                          "season %d /= %d" %
                          (i, -1 if season is None else season.i + 1))
                    check(i not in seasons, "season %d already seen" % i)
                    seasons[i] = season = VideoSeason(i, stitle)
                else:
                    check(stitle == season.title,
                          "season title `%s' /= `%s'" %
                          (stitle, season.title))

            ## Assign the listed episodes to this disc.  `ts' is the season
            ## for the current range: an `S<n>' prefix on a range switches
            ## it, and the switch persists for later ranges in the name.
            disc = VideoDisc(path)
            ts = season
            seen_any, bad = False, False
            epname = m.group("epname")
            if epname is not None:
                eplist = [epname]
            else:
                eplist = m.group("eplist").split(", ")
            for eprange in eplist:
                mm = me._R_ISO_EP.match(eprange)
                if mm is None:
                    bad = True
                    continue
                seen_any = True
                i = filter(mm.group("si"), int)
                if i is not None:
                    try:
                        ts = seasons[i]
                    except KeyError:
                        ts = seasons[i] = VideoSeason(i, None)
                if ts is None:
                    ## No season named anywhere yet: default to season 1.
                    ts = season = seasons[1] = VideoSeason(1, None)
                start = filter(mm.group("ei"), int)
                end = filter(mm.group("ej"), int, start)
                for k in range(start, end + 1):
                    ts.set_episode_disc(k, disc)
            if seen_any and bad:
                raise ExpectedError("bad ep list in `%s'" % fn)
        me.seasons = seasons


class AudioDisc (Source):
    """An audio file addressed as a whole."""

    PREFIX = "file://"
    TITLEP = CHAPTERP = False


class AudioEpisode (Source):
    """A single audio file holding episode number `i'."""

    PREFIX = "file://"
    TITLEP = CHAPTERP = False

    def __init__(me, fn, i, *args, **kw):
        super().__init__(fn, *args, **kw)
        me.i = i


class AudioDir (object):
    """A directory of `.flac' files named `E<n>[. <title>].flac'.

    After construction, `episodes' maps episode numbers to `AudioEpisode'
    objects.
    """

    _R_FLAC = RX.compile(r"""
        ^
        E (\d+)
        (?: \. \ (.*))?
        \. flac $
    """, RX.X)

    def __init__(me, dir):
        """Scan directory `dir' (relative to `ROOT') for audio episodes.

        Raises `ExpectedError' unless the matching files, taken in sorted
        order, are numbered consecutively from 1.
        """
        me.dir = dir
        episodes = {}
        last_i = 0
        for fn in sorted(OS.listdir(OS.path.join(ROOT, dir))):
            if not fn.endswith(".flac"):
                continue
            m = me._R_FLAC.match(fn)
            if not m:
                continue
            i = int(m.group(1))
            check(i == last_i + 1, "episode %d /= %d" % (i, last_i + 1))
            episodes[i] = AudioEpisode(OS.path.join(dir, fn), i)
            last_i = i
        me.episodes = episodes


class Chapter (object):
    """A titled chapter `i' within an episode."""

    def __init__(me, episode, title, i):
        me.title, me.i = title, i
        me.url = episode.source.url(episode.tno, i)


class Episode (object):
    """A playlist entry: `neps' episodes starting at number `i'.

    `src' is the `Source' holding it and `tno' the title number within that
    source (or None).
    """

    def __init__(me, season, i, neps, title, src, tno = None):
        me.season = season
        me.i, me.neps, me.title = i, neps, title
        me.chapters = []
        me.source, me.tno = src, tno
        me.url = src.url(tno)

    def add_chapter(me, title, j):
        """Append and return chapter number `j' with the given title."""
        ch = Chapter(me, title, j)
        me.chapters.append(ch)
        return ch

    def label(me):
        """Return the display label, as formatted by the owning season."""
        return me.season._eplabel(me.i, me.neps, me.title)


class Season (object):
    """A numbered, optionally titled, season in a playlist.

    `implicitp' is true for the season created automatically when episodes
    appear before any season command; its number is left out of labels.
    """

    def __init__(me, playlist, title, i, implicitp = False):
        me.playlist = playlist
        me.title, me.i = title, i
        me.implicitp = implicitp
        me.episodes = []

    def add_episode(me, j, neps, title, src, tno):
        """Append and return a new `Episode'."""
        ep = Episode(me, j, neps, title, src, tno)
        me.episodes.append(ep)
        return ep

    def _eplabel(me, i, neps, title):
        """Format the label for `neps' episodes starting at `i'."""
        if neps == 1:
            epname, epn = me.playlist.epname, "%d" % i
        elif neps == 2:
            epname, epn = me.playlist.epnames, "%d, %d" % (i, i + 1)
        else:
            epname, epn = me.playlist.epnames, "%d–%d" % (i, i + neps - 1)

        if title is None:
            if me.implicitp:
                return "%s %s" % (epname, epn)
            elif me.title is None:
                return "%s %d.%s" % (epname, me.i, epn)
            else:
                return "%s—%s %s" % (me.title, epname, epn)
        else:
            if me.implicitp:
                return "%s. %s" % (epn, title)
            elif me.title is None:
                return "%d.%s. %s" % (me.i, epn, title)
            else:
                return "%s—%s. %s" % (me.title, epn, title)


class MovieSeason (object):
    """A pseudo-season of movies, which are labelled by title alone."""

    def __init__(me, playlist):
        me.playlist = playlist
        me.i = -1
        me.implicitp = False
        me.episodes = []

    def add_episode(me, j, neps, title, src, tno):
        """Append and return a new `Episode'; a title is mandatory."""
        if title is None:
            raise ExpectedError("movie must have a title")
        ep = Episode(me, j, neps, title, src, tno)
        me.episodes.append(ep)
        return ep

    def _eplabel(me, i, neps, title):
        return title


class Playlist (object):
    """An ordered collection of seasons, writable as extended M3U."""

    def __init__(me):
        me.seasons = []
        ## Singular and plural nouns used in episode labels.
        me.epname, me.epnames = "Episode", "Episodes"

    def add_season(me, title, i, implicitp = False):
        """Append and return a new `Season'."""
        season = Season(me, title, i, implicitp)
        me.seasons.append(season)
        return season

    def add_movies(me):
        """Append and return a new `MovieSeason'."""
        season = MovieSeason(me)
        me.seasons.append(season)
        return season

    def write(me, f):
        """Write the playlist to the text file object `f'.

        Seasons are separated by blank lines.  An episode with chapters
        produces one entry per chapter instead of one for the episode.
        """
        f.write("#EXTM3U\n")
        for season in me.seasons:
            f.write("\n")
            for ep in season.episodes:
                if not ep.chapters:
                    f.write("#EXTINF:0,,%s\n%s\n" % (ep.label(), ep.url))
                else:
                    for ch in ep.chapters:
                        f.write("#EXTINF:0,,%s: %s\n%s\n" %
                                (ep.label(), ch.title, ch.url))


## Sentinel: no title number was given on an episode line.
UNSET = ["UNSET"]


def _parse_episode_conf(conf, ep_i):
    """Parse the comma-separated settings word of an episode line.

    Returns `(tno, vdname, neps, fake_epi)': the title number (None for
    `-'), the video directory label, the episode count, and the episode
    number to display (defaulting to `ep_i').
    """
    tno, vdname, neps, fake_epi = UNSET, "-", 1, ep_i
    for c in conf.split(","):
        if c.isdecimal():
            tno = int(c)
        elif c == "-":
            tno = None
        else:
            k, eq, v = c.partition("=")
            check(eq != "", "bad assignment `%s'" % c)
            if k == "vd":
                vdname = v
            elif k == "n":
                neps = getint(v)
            elif k == "ep":
                fake_epi = getint(v)
            else:
                raise ExpectedError("unknown setting `%s'" % k)
    check(tno is not UNSET, "no title number")
    return tno, vdname, neps, fake_epi


def _episode_source(tno, vdname, season, ep_i, ads, iso, vds):
    """Find the `Source' for episode `ep_i' of `season'.

    With no title number the current audio directory `ads' is used;
    otherwise the current `iso' if set, else the video directory labelled
    `vdname' in `vds'.
    """
    if tno is None:
        check(ads, "no title, but no audio directory")
        check(season.implicitp, "audio source, but explicit season")
        try:
            return ads.episodes[ep_i]
        except KeyError:
            raise ExpectedError("episode %d not found in audio dir `%s'" %
                                (ep_i, ads.dir))
    if iso:
        return iso
    check(vdname in vds, "title, but no iso or video directory")
    vdir = vds[vdname]
    try:
        s = vdir.seasons[season.i]
    except KeyError:
        raise ExpectedError("season %d not found in video dir `%s'" %
                            (season.i, vdir.dir))
    try:
        return s.episodes[ep_i]
    except KeyError:
        raise ExpectedError("episode %d.%d not found in video dir `%s'" %
                            (season.i, ep_i, vdir.dir))


def parse_list(fn):
    """Parse the episode-list file `fn' and return a `Playlist'.

    Lines which are blank, or whose first non-space character is `;', are
    ignored.  Otherwise:

      * A line starting `!' is a command: `season N [TITLE]', `season -',
        `movie', `epname NAME[:PLURAL]', `epno N', `iso FILE|-',
        `vdir LABEL DIR|-', `adir DIR|-', or `end' (stop reading).

      * An unindented line is an episode: a comma-separated settings word
        followed by an optional title.  Settings are a title number, `-'
        (no title: take the episode from the audio directory),
        `vd=LABEL', `n=COUNT' and `ep=NUMBER'.

      * An indented line is the title of the next chapter of the current
        episode.

    Raises `ExpectedError' for malformed input, and for any disc in a
    still-registered video directory whose episode count differs from the
    number of times it was used.
    """
    playlist = Playlist()
    season, episode, chapter, ep_i = None, None, None, 1
    vds = {}
    ads = iso = None
    j = 0

    with location(FileLocation(fn, 0)) as floc:
        with open(fn, "r") as f:
            for line in f:
                floc.stepline()
                sline = line.lstrip()
                if sline == "" or sline.startswith(";"):
                    continue

                if line.startswith("!"):
                    ww = Words(line[1:])
                    cmd = ww.nextword()
                    check(cmd is not None, "missing command")

                    if cmd == "season":
                        v = ww.nextword()
                        check(v is not None, "missing season number")
                        if v == "-":
                            check(ww.rest() is None, "trailing junk")
                            season = playlist.add_movies()
                        else:
                            i = getint(v)
                            title = ww.rest()
                            season = playlist.add_season(title, i,
                                                         implicitp = False)
                        episode = chapter = None
                        ep_i = 1

                    elif cmd == "movie":
                        check(ww.rest() is None, "trailing junk")
                        season = playlist.add_movies()
                        episode = chapter = None
                        ep_i = 1

                    elif cmd == "epname":
                        name = ww.rest()
                        check(name is not None, "missing episode name")
                        try:
                            sep = name.index(":")
                        except ValueError:
                            names = name + "s"
                        else:
                            name, names = name[:sep], name[sep + 1:]
                        playlist.epname, playlist.epnames = name, names

                    elif cmd == "epno":
                        i = ww.rest()
                        check(i is not None, "missing episode number")
                        ep_i = getint(i)

                    elif cmd == "iso":
                        path = ww.rest()
                        check(path is not None, "missing filename")
                        if path == "-":
                            iso = None
                        else:
                            check(OS.path.exists(OS.path.join(ROOT, path)),
                                  "iso file `%s' not found" % path)
                            iso = VideoDisc(path)

                    elif cmd == "vdir":
                        name = ww.nextword()
                        check(name is not None, "missing name")
                        path = ww.rest()
                        check(path is not None, "missing directory")
                        if path == "-":
                            vds.pop(name, None)
                        else:
                            vds[name] = VideoDir(path)

                    elif cmd == "adir":
                        path = ww.rest()
                        check(path is not None, "missing directory")
                        if path == "-":
                            ads = None
                        else:
                            ads = AudioDir(path)

                    elif cmd == "end":
                        break

                    else:
                        raise ExpectedError("unknown command `%s'" % cmd)

                elif not line[0].isspace():
                    ## Episode line.
                    ww = Words(line)
                    conf = ww.nextword()
                    check(conf is not None, "missing config")
                    tno, vdname, neps, fake_epi = \
                        _parse_episode_conf(conf, ep_i)
                    title = ww.rest()
                    if season is None:
                        season = playlist.add_season(None, 1,
                                                     implicitp = True)
                    src = _episode_source(tno, vdname, season, ep_i,
                                          ads, iso, vds)
                    episode = season.add_episode(fake_epi, neps, title,
                                                 src, tno)
                    chapter = None
                    ep_i += neps
                    src.nuses += neps

                else:
                    ## Chapter line.
                    ww = Words(line)
                    title = ww.rest()
                    check(episode is not None, "no current episode")
                    check(episode.source.CHAPTERP,
                          "episode source doesn't allow chapters")
                    if chapter is None:
                        j = 1
                    else:
                        j += 1
                    chapter = episode.add_chapter(title, j)

        ## Every disc in a video directory must have been used exactly as
        ## many times as it has episodes.
        ## NOTE(review): the original indentation was lost; this check is
        ## placed inside the `location' body so that a failure is reported
        ## against the list file.  Confirm against version control.
        discs = {d
                 for vdir in vds.values()
                 for s in vdir.seasons.values()
                 for d in s.episodes.values()}
        for d in sorted(discs, key = lambda d: d.fn):
            if d.neps != d.nuses:
                raise ExpectedError("disc `%s' has %d episodes, used %d times" %
                                    (d.fn, d.neps, d.nuses))

    return playlist


ROOT = "/mnt/dvd/archive/"


def main():
    """Convert each list file named on the command line to a playlist.

    Expected errors are reported on stderr, prefixed by the location being
    processed at the time, and the program exits with status 2.
    """
    try:
        for f in SYS.argv[1:]:
            parse_list(f).write(SYS.stdout)
    except (ExpectedError, IOError, OSError) as e:
        LOC.report(e)
        SYS.exit(2)


if __name__ == "__main__":
    main()