X-Git-Url: https://git.distorted.org.uk/~mdw/tripe-android/blobdiff_plain/8eabb4ff13562f3550499ee599297f7e97fa8754..04a5abaece151705e9bd7026653f79938a7a2fbc:/util.scala diff --git a/util.scala b/util.scala index 8eba6f8..ea776e7 100644 --- a/util.scala +++ b/util.scala @@ -28,12 +28,15 @@ package uk.org.distorted; package object tripe { /*----- Imports -----------------------------------------------------------*/ import scala.concurrent.duration.{Deadline, Duration}; -import scala.util.control.Breaks; +import scala.util.control.{Breaks, ControlThrowable}; -import java.io.{BufferedReader, Closeable, File, Reader}; -import java.net.{URL, URLConnection}; +import java.io.{BufferedReader, Closeable, File, InputStream, Reader}; +import java.net.{HttpURLConnection, URL, URLConnection}; import java.nio.{ByteBuffer, CharBuffer}; +import java.nio.channels.{SelectionKey, Selector}; +import java.nio.channels.spi.{AbstractSelector, AbstractSelectableChannel}; import java.nio.charset.Charset; +import java.util.{Set => JSet}; import java.util.concurrent.locks.{Lock, ReentrantLock}; /*----- Miscellaneous useful things ---------------------------------------*/ @@ -41,21 +44,23 @@ import java.util.concurrent.locks.{Lock, ReentrantLock}; val rng = new java.security.SecureRandom; def unreachable(msg: String): Nothing = throw new AssertionError(msg); +def unreachable(): Nothing = unreachable("unreachable"); +final val ok = (); +final class Brand; /*----- Various pieces of implicit magic ----------------------------------*/ class InvalidCStringException(msg: String) extends Exception(msg); -type CString = Array[Byte]; -object Magic { +object Implicits { /* --- Syntactic sugar for locks --- */ implicit class LockOps(lk: Lock) { /* LK withLock { BODY } * LK.withLock(INTERRUPT) { BODY } - * LK.withLock(DUR, [INTERRUPT]) { BODY } orelse { ALT } - * LK.withLock(DL, [INTERRUPT]) { BODY } orelse { ALT } + * LK.withLock(DUR, [INTERRUPT]) { BODY } orElse { ALT } + * LK.withLock(DL, [INTERRUPT]) { BODY } orElse { ALT } * * Acquire a lock while executing a BODY. If a duration or deadline is * given then wait so long for the lock, and then give up and run ALT @@ -81,12 +86,12 @@ object Magic { def withLock[T](body: => T): T = withLock(true)(body); } - class PendingLock[T] private[Magic] + class PendingLock[T] private[Implicits] (val lk: Lock, val dur: Duration, val interrupt: Boolean, body: => T) { - /* An auxiliary class for LockOps; provides the `orelse' qualifier. */ + /* An auxiliary class for LockOps; provides the `orElse' qualifier. */ - def orelse(alt: => T): T = { + def orElse(alt: => T): T = { val locked = (dur, interrupt) match { case (Duration.Inf, true) => lk.lockInterruptibly(); true case (Duration.Inf, false) => lk.lock(); true @@ -98,86 +103,6 @@ object Magic { else try { body; } finally lk.unlock(); } } - - /* --- Conversion to/from C strings --- */ - - implicit class ConvertJStringToCString(s: String) { - /* Magic to convert a string into a C string (null-terminated bytes). */ - - def toCString: CString = { - /* Convert the receiver to a C string. - * - * We do this by hand, rather than relying on the JNI's built-in - * conversions, because we use the default encoding taken from the - * locale settings, rather than the ridiculous `modified UTF-8' which - * is (a) insensitive to the user's chosen locale and (b) not actually - * UTF-8 either. - */ - - val enc = Charset.defaultCharset.newEncoder; - val in = CharBuffer.wrap(s); - var sz: Int = (s.length*enc.averageBytesPerChar + 1).toInt; - var out = ByteBuffer.allocate(sz); - - while (true) { - /* If there's still stuff to encode, then encode it. Otherwise, - * there must be some dregs left in the encoder, so flush them out. - */ - val r = if (in.hasRemaining) enc.encode(in, out, true) - else enc.flush(out); - - /* Sift through the wreckage to figure out what to do. */ - if (r.isError) r.throwException(); - else if (r.isOverflow) { - /* No space in the buffer. Make it bigger. */ - - sz *= 2; - val newout = ByteBuffer.allocate(sz); - out.flip(); newout.put(out); - out = newout; - } else if (r.isUnderflow) { - /* All done. Check that there are no unexpected zero bytes -- so - * this will indeed be a valid C string -- and convert into a byte - * array that the C code will be able to pick apart. - */ - - out.flip(); val n = out.limit; val u = out.array; - if ({val z = u.indexOf(0); 0 <= z && z < n}) - throw new InvalidCStringException("null byte in encoding"); - val v = new Array[Byte](n + 1); - out.array.copyToArray(v, 0, n); - v(n) = 0; - return v; - } - } - - /* Placate the type checker. */ - unreachable("unreachable"); - } - } - - implicit class ConvertCStringToJString(v: CString) { - /* Magic to convert a C string into a `proper' string. */ - - def toJString: String = { - /* Convert the receiver to a C string. - * - * We do this by hand, rather than relying on the JNI's built-in - * conversions, because we use the default encoding taken from the - * locale settings, rather than the ridiculous `modified UTF-8' which - * is (a) insensitive to the user's chosen locale and (b) not actually - * UTF-8 either. - */ - - val inlen = v.indexOf(0) match { - case -1 => v.length - case n => n - } - val dec = Charset.defaultCharset.newDecoder; - val in = ByteBuffer.wrap(v, 0, inlen); - dec.decode(in).toString - } - } } /*----- Cleanup assistant -------------------------------------------------*/ @@ -215,13 +140,125 @@ def closing[T, U <: Closeable](thing: U)(body: U => T): T = try { body(thing) } finally { thing.close(); } +/*----- Control structures ------------------------------------------------*/ + +private case class ExitBlock[T](brand: Brand, result: T) + extends ControlThrowable; + +def block[T](body: (T => Nothing) => T): T = { + /* block { exit[T] => ...; exit(x); ... } + * + * Execute the body until it calls the `exit' function or finishes. + * Annoyingly, Scala isn't clever enough to infer the return type, so + * you'll have to write it explicitly. + */ + + val mybrand = new Brand; + try { body { result => throw new ExitBlock(mybrand, result) } } + catch { + case ExitBlock(brand, result) if brand eq mybrand => + result.asInstanceOf[T] + } +} + +def blockUnit(body: (=> Nothing) => Unit) { + /* blockUnit { exit => ...; exit; ... } + * + * Like `block'; it just saves you having to write `exit[Unit] => ...; + * exit(ok); ...'. + */ + + val mybrand = new Brand; + try { body { throw new ExitBlock(mybrand, null) }; } + catch { case ExitBlock(brand, result) if brand eq mybrand => ok; } +} + +def loop[T](body: (T => Nothing) => Unit): T = { + /* loop { exit[T] => ...; exit(x); ... } + * + * Repeatedly execute the body until it calls the `exit' function. + * Annoyingly, Scala isn't clever enough to infer the return type, so + * you'll have to write it explicitly. + */ + + block { exit => while (true) body(exit); unreachable } +} + +def loopUnit(body: (=> Nothing) => Unit): Unit = { + /* loopUnit { exit => ...; exit; ... } + * + * Like `loop'; it just saves you having to write `exit[Unit] => ...; + * exit(()); ...'. + */ + + blockUnit { exit => while (true) body(exit); } +} + +val BREAKS = new Breaks; +import BREAKS.{breakable, break}; + +/*----- Interruptably doing things ----------------------------------------*/ + +private class InterruptCatcher[T](body: => T, onWakeup: => Unit) + extends AbstractSelector(null) { + /* Hook onto the VM's thread interruption machinery. + * + * The `run' method is the only really interesting one. It will run the + * BODY, returning its result; if the thread is interrupted during this + * time, ONWAKEUP is invoked for effect. The expectation is that ONWAKEUP + * will somehow cause BODY to stop early. + * + * Credit for this hack goes to Nicholas Wilson: see + * . + */ + + private def nope: Nothing = + { throw new UnsupportedOperationException("can't do that"); } + protected def implCloseSelector() { } + protected def register(chan: AbstractSelectableChannel, + ops: Int, att: Any): SelectionKey = nope; + def keys(): JSet[SelectionKey] = nope; + def selectedKeys(): JSet[SelectionKey] = nope; + def select(): Int = nope; + def select(millis: Long): Int = nope; + def selectNow(): Int = nope; + + def run(): T = try { + begin(); + val ret = body; + if (Thread.interrupted()) throw new InterruptedException; + ret + } finally { + end(); + } + def wakeup(): Selector = { onWakeup; this } +} + +class PendingInterruptable[T] private[tripe](body: => T) { + /* This class exists to provide the `onInterrupt THUNK' syntax. */ + + def onInterrupt(thunk: => Unit): T = + new InterruptCatcher(body, thunk).run(); +} +def interruptably[T](body: => T) = { + /* interruptably { BODY } onInterrupt { THUNK } + * + * Execute BODY and return its result. If the thread receives an + * interrupt -- or is already in an interrupted state -- execute THUNK for + * effect; it is expected to cause BODY to return expeditiously, and when + * the BODY completes, an `InterruptedException' is thrown. + */ + + new PendingInterruptable(body); +} + /*----- A gadget for fetching URLs ----------------------------------------*/ class URLFetchException(msg: String) extends Exception(msg); trait URLFetchCallbacks { def preflight(conn: URLConnection) { } - def write(buf: Array[Byte], n: Int, len: Int): Unit; + def write(buf: Array[Byte], n: Int, len: Long): Unit; def done(win: Boolean) { } } @@ -229,50 +266,74 @@ def fetchURL(url: URL, cb: URLFetchCallbacks) { /* Fetch the URL, feeding the data through the callbacks CB. */ withCleaner { clean => - var win: Boolean = false; - clean { cb.done(win); } + var win: Boolean = false; clean { cb.done(win); } - /* Set up the connection, and run a preflight check. */ + /* Set up the connection. This isn't going to block, I think, and we + * need to use it in the interrupt handler. + */ val c = url.openConnection(); - cb.preflight(c); - - /* Start fetching data. */ - val in = c.getInputStream; clean { in.close(); } - val explen = c.getContentLength(); - /* Read a buffer at a time, and give it to the callback. Maintain a - * running total. + /* Java's default URL handlers don't respond to interrupts, so we have to + * take over this duty. */ - val buf = new Array[Byte](4096); - var n = 0; - var len = 0; - while ({n = in.read(buf); n >= 0 && (explen == -1 || len <= explen)}) { - cb.write(buf, n, len); - len += n; - } + interruptably { + /* Run the caller's preflight check. This must be done here, since it + * might well block while it discovers things like the content length. + */ + cb.preflight(c); - /* I can't find it documented anywhere that the existing machinery - * checks the received stream against the advertised content length. - * It doesn't hurt to check again, anyway. - */ - if (explen != -1 && explen != len) { - throw new URLFetchException( - s"received $len /= $explen bytes from `$url'"); - } + /* Start fetching data. */ + val in = c.getInputStream; clean { in.close(); } + val explen = c.getContentLength; - /* Glorious success is ours. */ - win = true; - } -} + /* Read a buffer at a time, and give it to the callback. Maintain a + * running total. + */ + var len: Long = 0; + blockUnit { exit => + for ((buf, n) <- blocks(in)) { + cb.write(buf, n, len); + len += n; + if (explen != -1 && len > explen) exit; + } + } -/*----- Running processes -------------------------------------------------*/ + /* I can't find it documented anywhere that the existing machinery + * checks the received stream against the advertised content length. + * It doesn't hurt to check again, anyway. + */ + if (explen != -1 && explen != len) { + throw new URLFetchException( + s"received $len /= $explen bytes from `$url'"); + } + + /* Glorious success is ours. */ + win = true; + } onInterrupt { + /* Oh. How do we do this? */ -//def runProgram( + c match { + case c: HttpURLConnection => + /* It's an HTTP connection (what happened to the case here?). + * HTTPS connections match too because they're a subclass. Getting + * the input stream will block, but there's an easier way. + */ + c.disconnect(); + + case _ => + /* It's something else. Let's hope that getting the input stream + * doesn't block. + */ + c.getInputStream.close(); + } + } + } +} /*----- Threading things --------------------------------------------------*/ -def thread[T](name: String, run: Boolean = true, daemon: Boolean = true) - (f: => T): Thread = { +def thread(name: String, run: Boolean = true, daemon: Boolean = true) + (f: => Unit): Thread = { /* Make a thread with a given name, and maybe start running it. */ val t = new Thread(new Runnable { def run() { f; } }, name); @@ -281,6 +342,28 @@ def thread[T](name: String, run: Boolean = true, daemon: Boolean = true) t } +class ValueThread[T](name: String, group: ThreadGroup = null, + stacksz: Long = 0)(body: => T) + extends Thread(group, null, name, stacksz) { + private[this] var exc: Throwable = _; + private[this] var ret: T = _; + + override def run() { + try { ret = body; } + catch { case e: Throwable => exc = e; } + } + def get: T = + if (isAlive) throw new IllegalArgumentException("still running"); + else if (exc != null) throw exc; + else ret; +} +def valueThread[T](name: String, run: Boolean = true) + (body: => T): ValueThread[T] = { + val t = new ValueThread(name)(body); + if (run) t.start(); + t +} + /*----- Quoting and parsing tokens ----------------------------------------*/ def quoteTokens(v: Seq[String]): String = { @@ -375,38 +458,118 @@ def splitTokens(s: String, pos: Int = 0): Seq[String] = { val b = List.newBuilder[String]; var i = pos; - while (nextToken(s, i) match { - case Some((w, j)) => b += w; i = j; true - case None => false - }) (); + loopUnit { exit => nextToken(s, i) match { + case Some((w, j)) => b += w; i = j; + case None => exit; + } } b.result } +/*----- Other random things -----------------------------------------------*/ + trait LookaheadIterator[T] extends BufferedIterator[T] { - private[this] var st: Option[T] = None; + /* An iterator in terms of a single `maybe there's another item' function. + * + * It seems like every time I write an iterator in Scala, the only way to + * find out whether there's a next item, for `hasNext', is to actually try + * to fetch it. So here's an iterator in terms of a function which goes + * off and maybe returns a next thing. It turns out to be easy to satisfy + * the additional requirements for `BufferedIterator', so why not? + */ + + /* Subclass responsibility. */ protected def fetch(): Option[T]; + + /* The machinery. `st' is `None' if there's no current item, null if we've + * actually hit the end, or `Some(x)' if the current item is x. + */ + private[this] var st: Option[T] = None; private[this] def peek() { + /* Arrange to have a current item. */ if (st == None) fetch() match { case None => st = null; case x@Some(_) => st = x; } } + + /* The `BufferedIterator' protocol. */ override def hasNext: Boolean = { peek(); st != null } - override def head(): T = + override def head: T = { peek(); if (st == null) throw new NoSuchElementException; st.get } - override def next(): T = { val it = head(); st = None; it } + override def next(): T = { val it = head; st = None; it } } -def lines(r: Reader) = new LookaheadIterator[String] { - /* Iterates over the lines of text in a `Reader' object. */ +def bufferedReader(r: Reader): BufferedReader = r match { + case br: BufferedReader => br + case _ => new BufferedReader(r) +} - private[this] val in = r match { - case br: BufferedReader => br; - case _ => new BufferedReader(r); +def lines(r: BufferedReader): BufferedIterator[String] = + new LookaheadIterator[String] { + /* Iterates over the lines of text in a `Reader' object. */ + override protected def fetch() = Option(r.readLine()); + } +def lines(r: Reader): BufferedIterator[String] = lines(bufferedReader(r)); + +def blocks(in: InputStream, blksz: Int): + BufferedIterator[(Array[Byte], Int)] = + /* Iterates over (possibly irregularly sized) blocks in a stream. */ + new LookaheadIterator[(Array[Byte], Int)] { + val buf = new Array[Byte](blksz) + override protected def fetch() = { + val n = in.read(buf); + if (n < 0) None + else Some((buf, n)) + } } - protected override def fetch(): Option[String] = Option(in.readLine); +def blocks(in: InputStream): + BufferedIterator[(Array[Byte], Int)] = blocks(in, 65536); + +def blocks(in: BufferedReader, blksz: Int): + BufferedIterator[(Array[Char], Int)] = + /* Iterates over (possibly irregularly sized) blocks in a reader. */ + new LookaheadIterator[(Array[Char], Int)] { + val buf = new Array[Char](blksz) + override protected def fetch() = { + val n = in.read(buf); + if (n < 0) None + else Some((buf, n)) + } + } +def blocks(in: BufferedReader): + BufferedIterator[(Array[Char], Int)] = blocks(in, 65536); +def blocks(r: Reader, blksz: Int): BufferedIterator[(Array[Char], Int)] = + blocks(bufferedReader(r), blksz); +def blocks(r: Reader): BufferedIterator[(Array[Char], Int)] = + blocks(bufferedReader(r)); + +def oxford(conj: String, things: Seq[String]): String = things match { + case Seq() => "" + case Seq(a) => a + case Seq(a, b) => s"$a $conj $b" + case Seq(a, tail@_*) => + val sb = new StringBuilder; + sb ++= a; sb ++= ", "; + def iter(rest: Seq[String]) { + rest match { + case Seq() => unreachable; + case Seq(a) => sb ++= conj; sb += ' '; sb ++= a; + case Seq(a, tail@_*) => sb ++= a; sb ++= ", "; iter(tail); + } + } + iter(tail); + sb.result } +def formatTime(t: Int): String = + if (t < -1) "???" + else { + val (s, t1) = (t%60, t/60); + val (m, h) = (t1%60, t1/60); + if (h > 0) f"$h%d:$m%02d:$s%02d" + else f"$m%02d:$s%02d" + } + /*----- That's all, folks -------------------------------------------------*/ }