#!/usr/bin/env python

import sys
import string
import os
try:
    import anydbm
except ImportError:
    # Python 3 renamed the module; the open()/error API used in this file
    # is the same.
    import dbm as anydbm
import zlib

# The functions below read four module-level names that are assigned by the
# command-line driver later in this file: output_analysis, han_translations,
# db and handb.


def _text(value):
    """Return a database value as a native string (decoding bytes as UTF-8)."""
    if isinstance(value, str):
        return value
    return value.decode("utf-8")


def _byte_string(values):
    """Turn a sequence of integers in 0..255 into a byte string."""
    # Kept as a separate helper because output() and process_ucs() have a
    # parameter called "bytes" that hides the builtin of the same name.
    return bytes(bytearray(values))


def _write_raw(data):
    """Write a byte string to standard output without text encoding."""
    # Python 3 exposes the byte stream as sys.stdout.buffer; on Python 2
    # sys.stdout itself accepts byte strings.
    getattr(sys.stdout, "buffer", sys.stdout).write(data)


class zip_untangler:
    """Line reader for a deflated zip member at the start of a byte stream.

    The local file header is parsed lazily on the first readline() call to
    find the name/extra field lengths and the compressed size; after that
    the compressed data is inflated in chunks of at most 4096 bytes.

    file       -- object with a read(n) method returning byte strings
    datasofar  -- bytes of the stream already consumed by the caller
                  (must be fewer than 30, the fixed header size)
    """

    def __init__(self, file, datasofar):
        if not isinstance(datasofar, bytes):
            # Accept a text "" or "P" from callers that sniffed the file in
            # text mode.
            datasofar = datasofar.encode("latin-1")
        assert len(datasofar) < 30
        self.file = file
        self.header = datasofar
        self.data = b""          # inflated bytes not yet handed out
        self.dataleft = None     # compressed bytes still to read
        self.decompress = zlib.decompressobj()
        # Zlib header bytes, expected by decompress obj but not
        # present in zip file
        self.decompress.decompress(b"\x78\x9c")

    def _fill_header(self, size):
        """Read from the file until self.header holds at least size bytes."""
        while len(self.header) < size:
            chunk = self.file.read(size - len(self.header))
            if not chunk:
                # An assert here would be stripped by -O and leave this
                # loop spinning forever at end of file.
                raise EOFError("cvt-utf8: zip header is truncated")
            self.header = self.header + chunk

    def _read_header(self):
        """Consume the local file header and set self.dataleft."""
        self._fill_header(30)
        fixed = bytearray(self.header[:30])
        # Name length and extra length (little-endian 16-bit fields).
        namelen = fixed[26] | (fixed[27] << 8)
        extralen = fixed[28] | (fixed[29] << 8)
        self._fill_header(30 + namelen + extralen)
        # Compressed size: little-endian 32-bit field at offset 18.
        self.dataleft = (fixed[18] | (fixed[19] << 8) |
                         (fixed[20] << 16) | (fixed[21] << 24))

    def readline(self):
        """Return the next line of inflated data as a byte string.

        The line includes its trailing newline.  The final line may lack
        one, and an empty byte string is returned once the data runs out.
        Raises EOFError if the stream ends inside the zip header.
        """
        if self.dataleft is None:
            self._read_header()

        newline = self.data.find(b"\n")
        while newline < 0 and self.dataleft > 0:
            chunk = self.file.read(min(self.dataleft, 4096))
            if not chunk:
                break
            # Count what was actually delivered: read() may return fewer
            # bytes than requested (e.g. from a network stream).
            self.dataleft = self.dataleft - len(chunk)
            self.data = self.data + self.decompress.decompress(chunk)
            newline = self.data.find(b"\n")

        if newline < 0:
            line, self.data = self.data, b""
        else:
            line = self.data[:newline + 1]
            self.data = self.data[newline + 1:]
        return line


def hexstr(x):
    """Return hex(x) without its "0x" prefix or a trailing long-int "L"."""
    s = hex(x)
    if s[-1:] in ("L", "l"):
        s = s[:-1]
    if s[:2] in ("0x", "0X"):
        s = s[2:]
    return s


def charname(x):
    """Look up a description of code point x in the loaded databases.

    Returns "" when no database is loaded or the code point is not in it.
    When Han translations are enabled, an entry in handb takes precedence
    over the character name from db.
    """
    if db is None:
        return ""
    # Database keys are upper-case hex, zero-padded to at least 4 digits.
    key = hexstr(x).upper().rjust(4, "0")
    if han_translations:
        try:
            return " " + _text(handb[key])
        except KeyError:
            pass
    try:
        # db values are whole ";"-separated records; field 1 is returned.
        return _text(db[key]).split(";")[1]
    except KeyError:
        return ""


def output(char, bytes, errors):
    """Emit one decoded item.

    char    -- code point, or -1 if the bytes did not decode to one
    bytes   -- list of integer byte values that were consumed
    errors  -- text describing problems, "" if there were none

    In analysis mode one line is printed per item.  Otherwise the raw bytes
    are written to stdout, with anything problematic replaced by U+FFFD.
    """
    if output_analysis:
        if char == -1:
            s = " "
        else:
            s = "U-%08X " % char
        s = s + "".join([" %02X" % b for b in bytes])
        # Pad as if six bytes had been shown.
        s = s + " " * (6 - len(bytes))
        if char == -1:
            name = ""
        else:
            name = charname(char)
        if name != "":
            s = s + " " + name
        s = s + errors
        print(s)
    elif char == -1 or errors != "":
        # problem chars become U+FFFD REPLACEMENT CHARACTER
        _write_raw(b"\xEF\xBF\xBD")
    else:
        _write_raw(_byte_string(bytes))


def process_ucs(x, bytes=None, errors=""):
    """Encode code point x as UTF-8, diagnose it, and pass it to output().

    x       -- code point, 0 <= x < 0x80000000
    bytes   -- the byte values x was decoded from, if any; when they are
               longer than the shortest encoding they are reported as an
               overlong form and shown instead of the canonical bytes
    errors  -- error text accumulated so far

    Raises ValueError if x is outside the encodable range.
    """
    if not 0 <= x < 0x80000000:
        raise ValueError("cvt-utf8: code point %r is out of range" % (x,))

    if x < 0x80:
        utf8 = [x]
    else:
        if x < 0x800:
            lead, trail = 0xC0, 1
        elif x < 0x10000:
            lead, trail = 0xE0, 2
        elif x < 0x200000:
            lead, trail = 0xF0, 3
        elif x < 0x4000000:
            lead, trail = 0xF8, 4
        else:
            lead, trail = 0xFC, 5
        utf8 = [lead + (x >> (6 * trail))]
        for shift in range(trail - 1, -1, -1):
            utf8.append(0x80 + (0x3F & (x >> (shift * 6))))
    realbytes = len(utf8)

    if bytes and len(bytes) > realbytes:
        canonical = "".join([" %02X" % b for b in utf8])
        errors = errors + " (overlong form of" + canonical + ")"
        utf8 = bytes
    if 0xD800 <= x <= 0xDFFF:
        errors = errors + " (surrogate)"
    if 0xFFFE <= x <= 0xFFFF:
        errors = errors + " (invalid char)"
    output(x, utf8, errors)


# (highest lead byte, prefix bits to strip, continuation bytes expected)
_LEAD_BYTES = (
    (0xDF, 0xC0, 1),
    (0xEF, 0xE0, 2),
    (0xF7, 0xF0, 3),
    (0xFB, 0xF8, 4),
    (0xFD, 0xFC, 5),
)


def process_utf8(next):
    """Decode a byte stream as UTF-8 and report every item found.

    next -- callable returning the next byte value as an integer, or None
            at end of input
    """
    c = next()
    while c is not None:
        char = [c]
        if c < 0x80:
            process_ucs(c)  # single-byte char
            c = next()
        elif c >= 0xFE:
            output(-1, char, " (invalid UTF-8 byte)")
            c = next()
        elif c <= 0xBF:
            output(-1, char, " (unexpected continuation byte)")
            c = next()
        else:
            for last, prefix, cbytes in _LEAD_BYTES:
                if c <= last:
                    break
            acc = c & ~prefix
            interrupted = False
            while cbytes > 0:
                c = next()
                if c is None or c < 0x80 or c > 0xBF:
                    interrupted = True
                    break
                char.append(c)
                acc = (acc << 6) + (c & 0x3F)
                cbytes = cbytes - 1
            if cbytes > 0:
                output(-1, char, " (incomplete sequence)")
            else:
                process_ucs(acc, char)
            # When the sequence was cut short, c already holds the byte
            # (or None) that ended it and must be processed next.
            if not interrupted:
                c = next()


def _decode_pending(values):
    """Run process_utf8 over a list of integer byte values."""
    stream = iter(values)
    process_utf8(lambda: next(stream, None))


def do(args):
    """Analyse a list of command-line style arguments.

    Each argument is one of:
      U+hex or U-hex    a code point
      &#dec; or &#xhex; an SGML character entity (the ";" is optional)
      hex               one byte of UTF-8 input

    Consecutive byte arguments are decoded together as one UTF-8 stream.
    Raises ValueError for an argument that cannot be parsed.
    """
    pending = []
    for arg in args:
        if arg[:1].upper() == "U":
            if arg[1:2] not in ("+", "-"):
                raise ValueError("cvt-utf8: expected U+ or U- in %r" % (arg,))
            codepoint = int(arg[2:], 16)
        elif arg[:2] == "&#":
            # SGML character entity. Either &# followed by a
            # number, or &#x followed by a hex number.
            s = arg
            if s[-1:] == ";":
                s = s[:-1]
            if s[:3].upper() == "&#X":
                codepoint = int(s[3:], 16)
            else:
                codepoint = int(s[2:], 10)
        else:
            value = int(arg, 16)
            if not 0 <= value <= 0xFF:
                raise ValueError("cvt-utf8: %r is not a byte value" % (arg,))
            pending.append(value)
            continue

        # A code point argument ends any run of bytes collected so far.
        if pending:
            _decode_pending(pending)
            pending = []
        process_ucs(codepoint)

    if pending:
        _decode_pending(pending)


def usage(arg):
    """Print the help text; arg == "--help-admin" adds the admin options."""
    print("usage: cvt-utf8 [flags] ")
    print(" e.g. cvt-utf8 e2 82 ac")
    print(" or cvt-utf8 U+20ac")
    print(" or cvt-utf8 U-10ffff")
    # Entity syntax, as accepted by do(); kept ASCII-only because this file
    # declares no source encoding.
    print(" or cvt-utf8 '&#8211;'")
    print("")
    print("where: -o or --output just output well-formed UTF-8 instead of")
    print(" an analysis of the input data")
    print(" -h or --han also give Han definitions from unihan db")
    print("")
    print(" also: cvt-utf8 --test run Markus Kuhn's decoder stress tests")
    print(" cvt-utf8 --input (or -i)")
    print(" read, analyse and decode UTF-8 from stdin")
    if arg == "--help-admin":
        print(" cvt-utf8 --help display user help text")
        print(" cvt-utf8 --help-admin display admin help text (this one)")
        print(" cvt-utf8 --build ")
        print(" convert UnicodeData.txt to unicode db")
        print(" cvt-utf8 --build-unihan ")
        print(" convert Unihan.txt to unihan db")
        print(" cvt-utf8 --fetch-build ")
        print(" build unicode db by download from unicode.org")
        print(" cvt-utf8 --fetch-build-unihan ")
        print(" build Unihan db by download from unicode.org")
    else:
        print(" cvt-utf8 --help display this help text")
        print(" cvt-utf8 --help-admin display admin help text")
    print(" cvt-utf8 --version report version number")
    print(" cvt-utf8 --licence display (MIT) licence text")


_LICENCE_LINES = (
    "cvt-utf8 is copyright 2002-2004 Simon Tatham.",
    "",
    "Permission is hereby granted, free of charge, to any person",
    "obtaining a copy of this software and associated documentation files",
    "(the \"Software\"), to deal in the Software without restriction,",
    "including without limitation the rights to use, copy, modify, merge,",
    "publish, distribute, sublicense, and/or sell copies of the Software,",
    "and to permit persons to whom the Software is furnished to do so,",
    "subject to the following conditions:",
    "",
    "The above copyright notice and this permission notice shall be",
    "included in all copies or substantial portions of the Software.",
    "",
    "THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND,",
    "EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF",
    "MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND",
    "NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS",
    "BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN",
    "ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN",
    "CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE",
    "SOFTWARE.",
)


def licence():
    """Print the licence text."""
    for line in _LICENCE_LINES:
        print(line)
def version():
    """Print the revision number from the "$Revision$" keyword, if expanded."""
    rev = "$Revision$"
    rev = rev.replace(" ", "").replace("$", "")
    revs = rev.split(":")
    if len(revs) > 1:
        print("cvt-utf8 revision %s" % revs[1])
    else:
        print("cvt-utf8: unknown version")


def _native(line):
    """Return a line as a native string (decoding bytes as UTF-8)."""
    if isinstance(line, str):
        return line
    return line.decode("utf-8")


def _urlopen(url):
    """Open url for reading with whichever urllib this Python provides."""
    try:
        from urllib import urlopen
    except ImportError:
        from urllib.request import urlopen
    return urlopen(url)


# ----------------------------------------------------------------------
# Command-line driver.  output_analysis, han_translations, db and handb
# are module-level on purpose: the functions defined earlier in this file
# read them as globals.

args = sys.argv[1:]
output_analysis = 1
han_translations = 0
mode = "cmdline"

if args == []:
    usage("")
    sys.exit(0)

while len(args) > 0 and args[0][:1] == "-":
    opt = args[0]
    if opt in ("--help", "--help-admin"):
        usage(opt)
        sys.exit(0)
    elif opt in ("--licence", "--license"):
        licence()
        sys.exit(0)
    elif opt == "--version":
        version()
        sys.exit(0)
    elif opt in ("-o", "--output"):
        output_analysis = 0
        args = args[1:]
    elif opt in ("-h", "--han"):
        han_translations = 1
        args = args[1:]
    elif opt in ("--build", "--fetch-build"):
        if opt == "--build":
            if len(args) != 3:
                print("cvt-utf8: --build expects two filename arguments")
                sys.exit(1)
            infile = open(args[1], "r")
            outfile = args[2]
        else:
            if len(args) != 2:
                print("cvt-utf8: --fetch-build expects one filename argument")
                sys.exit(1)
            infile = _urlopen("http://www.unicode.org/Public/UNIDATA/UnicodeData.txt")
            outfile = args[1]
        # Now build the database.
        if outfile[-3:] == ".db":
            print("cvt-utf8: warning: you should not append .db to db name")
        try:
            db = anydbm.open(outfile, "n")
            try:
                while 1:
                    s = _native(infile.readline())
                    if s == "":
                        break
                    # Key is the first ";"-separated field; the value is
                    # the whole line.
                    db[s.split(";")[0]] = s
            finally:
                db.close()
        finally:
            infile.close()
        sys.exit(0)
    elif opt in ("--build-unihan", "--fetch-build-unihan"):
        if opt == "--build-unihan":
            if len(args) != 3:
                print("cvt-utf8: --build-unihan expects two filename arguments")
                sys.exit(1)
            # Binary mode: the file may turn out to be a zip archive.
            rawfile = open(args[1], "rb")
            s = rawfile.read(1)
            # Unihan.txt starts with a hash. If this file starts with a
            # P, we assume it's a zip file ("PK").
            if s == b"P":
                infile = zip_untangler(rawfile, s)
                s = b""
            else:
                infile = rawfile
            outfile = args[2]
        else:
            if len(args) != 2:
                print("cvt-utf8: --fetch-build-unihan expects one filename argument")
                sys.exit(1)
            rawfile = _urlopen("ftp://ftp.unicode.org/Public/UNIDATA/Unihan.zip")
            # We know this one is zipped.
            infile = zip_untangler(rawfile, b"")
            outfile = args[1]
            s = b""
        # Now build the database.
        if outfile[-3:] == ".db":
            print("cvt-utf8: warning: you should not append .db to db name")
        try:
            db = anydbm.open(outfile, "n")
            try:
                while 1:
                    # On the first pass s may hold the byte consumed while
                    # sniffing the file type; afterwards it is empty.
                    s = s + infile.readline()
                    if not s:
                        break
                    fields = _native(s.rstrip(b"\r\n")).split("\t")
                    if (len(fields) == 3 and fields[1] == "kDefinition"
                            and fields[0][:2] == "U+"):
                        db[fields[0][2:]] = fields[2]
                    s = b""
            finally:
                db.close()
        finally:
            rawfile.close()
        sys.exit(0)
    elif opt == "--test":
        mode = "test"
        args = args[1:]
    elif opt in ("--input", "-i"):
        mode = "input"
        args = args[1:]
    else:
        sys.stderr.write("cvt-utf8: unknown argument '%s'\n" % opt)
        sys.exit(1)

locations = [
    "/usr/share/unicode/unicode",
    "/usr/lib/unicode/unicode",
    "/usr/local/share/unicode/unicode",
    "/usr/local/lib/unicode/unicode",
]
# HOME may be unset (e.g. under cron); then only the system-wide
# locations are searched.
home = os.environ.get("HOME")
if home is not None:
    locations.append(home + "/share/unicode/unicode")
    locations.append(home + "/lib/unicode/unicode")

# Use the first database that opens; db stays None if none does, in which
# case character names are simply omitted.
db = None
for loc in locations:
    try:
        db = anydbm.open(loc, "r")
    except IOError:
        db = None
    except anydbm.error:
        db = None
    if db is not None:
        break

if han_translations:
    # The unihan db is looked for next to the last unicode db path tried.
    i = loc.rfind("/")
    assert i >= 0
    hanloc = loc[:i+1] + "unihan"
    handb = anydbm.open(hanloc, "r")
    # this has been explicitly required, so we don't squelch exceptions

if mode == "test":
    # One do() call per entry; each entry is a run of hex bytes.
    stress_tests = [
        "CE BA E1 BD B9 CF 83 CE BC CE B5",
        "00",
        "C2 80",
        "E0 A0 80",
        "F0 90 80 80",
        "F8 88 80 80 80",
        "FC 84 80 80 80 80",
        "7F",
        "DF BF",
        "EF BF BF",
        "F7 BF BF BF",
        "FB BF BF BF BF",
        "FD BF BF BF BF BF",
        "ED 9F BF",
        "EE 80 80",
        "EF BF BD",
        "F4 8F BF BF",
        "F4 90 80 80",
        "80",
        "BF",
        "80 BF",
        "80 BF 80",
        "80 BF 80 BF",
        "80 BF 80 BF 80",
        "80 BF 80 BF 80 BF",
        "80 BF 80 BF 80 BF 80",
        # Every continuation byte, 80 through BF.
        " ".join(["%02X" % b for b in range(0x80, 0xC0)]),
        # Every lead byte of each sequence length, each followed by a space.
        " ".join(["%02X 20" % b for b in range(0xC0, 0xE0)]),
        " ".join(["%02X 20" % b for b in range(0xE0, 0xF0)]),
        " ".join(["%02X 20" % b for b in range(0xF0, 0xF8)]),
        " ".join(["%02X 20" % b for b in range(0xF8, 0xFC)]),
        " ".join(["%02X 20" % b for b in range(0xFC, 0xFE)]),
        "C0",
        "E0 80",
        "F0 80 80",
        "F8 80 80 80",
        "FC 80 80 80 80",
        "DF",
        "EF BF",
        "F7 BF BF",
        "FB BF BF BF",
        "FD BF BF BF BF",
        "C0 E0 80 F0 80 80 F8 80 "
        "80 80 FC 80 80 80 80 "
        "DF EF BF F7 BF BF FB "
        "BF BF BF FD BF BF BF BF",
        "FE",
        "FF",
        "FE FE FF FF",
        "C0 AF",
        "E0 80 AF",
        "F0 80 80 AF",
        "F8 80 80 80 AF",
        "FC 80 80 80 80 AF",
        "C1 BF",
        "E0 9F BF",
        "F0 8F BF BF",
        "F8 87 BF BF BF",
        "FC 83 BF BF BF BF",
        "C0 80",
        "E0 80 80",
        "F0 80 80 80",
        "F8 80 80 80 80",
        "FC 80 80 80 80 80",
        "ED A0 80",
        "ED AD BF",
        "ED AE 80",
        "ED AF BF",
        "ED B0 80",
        "ED BE 80",
        "ED BF BF",
        "ED A0 80 ED B0 80",
        "ED A0 80 ED BF BF",
        "ED AD BF ED B0 80",
        "ED AD BF ED BF BF",
        "ED AE 80 ED B0 80",
        "ED AE 80 ED BF BF",
        "ED AF BF ED B0 80",
        "ED AF BF ED BF 8F",
        "EF BF BE",
        "EF BF BF",
    ]
    for vector in stress_tests:
        do(vector.split())
elif mode == "input":
    # Read bytes, not decoded text: Python 3 exposes the byte stream as
    # sys.stdin.buffer, Python 2's sys.stdin already yields byte strings.
    stdin_bytes = getattr(sys.stdin, "buffer", sys.stdin)

    def getchar():
        """Return the next byte of stdin as an integer, or None at EOF."""
        s = stdin_bytes.read(1)
        if not s:
            return None
        return ord(s) & 0xFF  # ensure it isn't negative

    process_utf8(getchar)
else:
    do(args)