/*----- Imports -----------------------------------------------------------*/
+import scala.language.{existentials, implicitConversions};
+
+import scala.collection.mutable.{HashSet, WeakHashMap};
import scala.concurrent.duration.{Deadline, Duration};
-import scala.util.control.Breaks;
+import scala.util.control.{Breaks, ControlThrowable};
-import java.io.{BufferedReader, Closeable, File, Reader};
-import java.net.{URL, URLConnection};
+import java.io.{BufferedReader, Closeable, File, InputStream, Reader};
+import java.net.{HttpURLConnection, URL, URLConnection};
import java.nio.{ByteBuffer, CharBuffer};
+import java.nio.channels.{SelectionKey, Selector};
+import java.nio.channels.spi.{AbstractSelector, AbstractSelectableChannel};
import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.{Set => JSet};
import java.util.concurrent.locks.{Lock, ReentrantLock};
/*----- Miscellaneous useful things ---------------------------------------*/
val rng = new java.security.SecureRandom;
def unreachable(msg: String): Nothing = throw new AssertionError(msg);
+def unreachable(): Nothing = unreachable("unreachable");
+final val ok = ();
+class Brand(val what: String) {
+  /* A labelled token.  WHAT is only a description, shown by `toString';
+   * code such as `block' tells brands apart by object identity.
+   */
+
+  override def toString(): String =
+    "<" + getClass.getName + " " + what + ">";
+}
/*----- Various pieces of implicit magic ----------------------------------*/
else try { body; } finally lk.unlock();
}
}
+
+  /* Implicit conversions to `Boolean'.  I miss the way C integers and
+   * pointers convert to boolean, so let's do that here.
+   *
+   * Numeric zero, null, and empty containers are all false; other objects
+   * are true.  This includes a null `Option' reference, which is false just
+   * like `None'.
+   */
+  implicit def truish(n: Byte): Boolean = n != 0;
+  implicit def truish(n: Char): Boolean = n != 0;
+  implicit def truish(n: Short): Boolean = n != 0;
+  implicit def truish(n: Int): Boolean = n != 0;
+  implicit def truish(n: Long): Boolean = n != 0;
+  implicit def truish(n: Float): Boolean = n != 0;
+  implicit def truish(n: Double): Boolean = n != 0;
+  implicit def truish(x: AnyRef): Boolean = x != null;
+  implicit def truish(s: String): Boolean = s != null && s != "";
+  implicit def truish(o: Option[_]): Boolean = o != null && o.isDefined;
+  implicit def truish(i: Iterator[_]): Boolean = i != null && i.hasNext;
+  implicit def truish(c: Traversable[_]): Boolean =
+    c != null && c.nonEmpty;
+
+  /* Some additional bitwise operators.
+   *
+   * For now, just the `bic' operator `&~', because typing `& ~' is
+   * inconsistent with my current style.  `x &~ y' means `x & ~y': the bits
+   * of X with those set in Y cleared.  The result is a `Long' if either
+   * operand is a `Long', and an `Int' otherwise.
+   */
+  class BitwiseIntImplicits(x: Int) {
+    def &~(y: Byte): Int = x & ~y;
+    def &~(y: Char): Int = x & ~y;
+    def &~(y: Short): Int = x & ~y;
+    def &~(y: Int): Int = x & ~y;
+    def &~(y: Long): Long = x & ~y;
+  }
+  class BitwiseLongImplicits(x: Long) {
+    def &~(y: Byte): Long = x & ~y;
+    def &~(y: Char): Long = x & ~y;
+    def &~(y: Short): Long = x & ~y;
+    def &~(y: Int): Long = x & ~y;
+    def &~(y: Long): Long = x & ~y;
+  }
+
+  /* The conversions which make `&~' available on the integer types.  The
+   * result types are written out explicitly so that they don't depend on
+   * inference.
+   */
+  implicit def bitwiseImplicits(x: Byte): BitwiseIntImplicits =
+    new BitwiseIntImplicits(x);
+  implicit def bitwiseImplicits(x: Char): BitwiseIntImplicits =
+    new BitwiseIntImplicits(x);
+  implicit def bitwiseImplicits(x: Short): BitwiseIntImplicits =
+    new BitwiseIntImplicits(x);
+  implicit def bitwiseImplicits(x: Int): BitwiseIntImplicits =
+    new BitwiseIntImplicits(x);
+  implicit def bitwiseImplicits(x: Long): BitwiseLongImplicits =
+    new BitwiseLongImplicits(x);
}
+import Implicits.truish;
+
/*----- Cleanup assistant -------------------------------------------------*/
class Cleaner {
try { body(thing) }
finally { thing.close(); }
+/*----- Control structures ------------------------------------------------*/
+
+/* The throwable which `block' and `blockUnit' use to leave their bodies
+ * early.  BRAND says which block is being left: each block makes a fresh
+ * `Brand' and only catches exits carrying that same object.  RESULT is the
+ * value to deliver (`blockUnit' passes null, and ignores it).
+ */
+private case class ExitBlock[T](brand: Brand, result: T)
+  extends ControlThrowable;
+
+def block[T](body: (T => Nothing) => T): T = {
+  /* block { exit[T] => ...; exit(x); ... }
+   *
+   * Run the body, handing it an `exit' function.  The value of the `block'
+   * form is either the argument passed to `exit', if the body calls it, or
+   * the body's own value if it finishes normally.  Scala can't infer the
+   * result type, so it has to be written out.
+   *
+   * Each invocation makes its own brand, so an `exit' belonging to some
+   * other (e.g., enclosing) block passes straight through this one.
+   */
+
+  val tag = new Brand("block-exit");
+  def leave(x: T): Nothing = throw new ExitBlock(tag, x);
+  try { body(leave _) }
+  catch {
+    case e: ExitBlock[_] if e.brand eq tag => e.result.asInstanceOf[T]
+  }
+}
+
+def blockUnit(body: (=> Nothing) => Unit) {
+  /* blockUnit { exit => ...; exit; ... }
+   *
+   * The `Unit' flavour of `block': here `exit' takes no argument, which
+   * saves writing `exit[Unit] => ...; exit(ok); ...'.
+   */
+
+  val tag = new Brand("block-exit");
+  def leave: Nothing = throw new ExitBlock(tag, null);
+  try { body(leave); }
+  catch { case e: ExitBlock[_] if e.brand eq tag => (); }
+}
+
+def loop[T](body: (T => Nothing) => Unit): T = {
+  /* loop { exit[T] => ...; exit(x); ... }
+   *
+   * Run the body over and over; the only way out is for the body to call
+   * `exit', whose argument becomes the value of the `loop' form.  Scala
+   * can't infer the result type, so it has to be written out.
+   */
+
+  block[T] { exit =>
+    while (true) { body(exit); }
+    unreachable()
+  }
+}
+
+def loopUnit(body: (=> Nothing) => Unit): Unit = {
+  /* loopUnit { exit => ...; exit; ... }
+   *
+   * The `Unit' flavour of `loop': here `exit' takes no argument, which
+   * saves writing `exit[Unit] => ...; exit(()); ...'.
+   */
+
+  blockUnit { exit =>
+    while (true) { body(exit); }
+  }
+}
+
+val BREAKS = new Breaks;
+import BREAKS.{breakable, break};
+
+/*----- Interruptably doing things ----------------------------------------*/
+
+private class InterruptCatcher[T](body: => T, onWakeup: => Unit)
+        extends AbstractSelector(null) {
+  /* Hook onto the VM's thread interruption machinery.
+   *
+   * The `run' method is the only really interesting one.  It will run the
+   * BODY, returning its result; if the thread is interrupted during this
+   * time, ONWAKEUP is invoked for effect.  The expectation is that ONWAKEUP
+   * will somehow cause BODY to stop early.
+   *
+   * Credit for this hack goes to Nicholas Wilson: see
+   * <https://github.com/NWilson/javaInterruptHook>.
+   */
+
+  /* This is a selector in name only.  Apart from `wakeup', every selector
+   * operation below is refused with `UnsupportedOperationException', and
+   * closing does nothing.
+   */
+  private def nope: Nothing =
+    { throw new UnsupportedOperationException("can't do that"); }
+  protected def implCloseSelector() { }
+  protected def register(chan: AbstractSelectableChannel,
+                         ops: Int, att: Any): SelectionKey = nope;
+  def keys(): JSet[SelectionKey] = nope;
+  def selectedKeys(): JSet[SelectionKey] = nope;
+  def select(): Int = nope;
+  def select(millis: Long): Int = nope;
+  def selectNow(): Int = nope;
+
+  def run(): T = try {
+    /* Bracket BODY with the inherited `begin' and `end'.  Per the
+     * `AbstractSelector' documentation, this is the window during which an
+     * interrupt is turned into a call to `wakeup'; `end' is in the `finally'
+     * clause so that it happens however BODY leaves.
+     */
+    begin();
+    val ret = body;
+
+    /* BODY has finished, but if an interrupt is pending then report that
+     * instead of returning the result.  `Thread.interrupted' clears the
+     * thread's interrupt status as a side effect.
+     */
+    if (Thread.interrupted()) throw new InterruptedException;
+    ret
+  } finally {
+    end();
+  }
+
+  /* Run ONWAKEUP for effect.  Presumably this is called by the interruption
+   * machinery rather than by users directly.
+   */
+  def wakeup(): Selector = { onWakeup; this }
+}
+
+class PendingInterruptable[T] private[tripe](body: => T) {
+  /* The intermediate object behind the `interruptably { BODY } onInterrupt
+   * { THUNK }' syntax.  It holds the unevaluated BODY until `onInterrupt'
+   * supplies the THUNK; only then is anything actually run.
+   */
+
+  def onInterrupt(thunk: => Unit): T = {
+    val catcher = new InterruptCatcher(body, thunk);
+    catcher.run()
+  }
+}
+def interruptably[T](body: => T) = {
+  /* interruptably { BODY } onInterrupt { THUNK }
+   *
+   * Run BODY and return its result.  Should the thread be interrupted --
+   * or be in an interrupted state already -- THUNK is run for effect; its
+   * job is to make BODY return promptly.  Once BODY completes after an
+   * interrupt, an `InterruptedException' is thrown.
+   *
+   * Nothing happens here by itself: this only wraps up BODY, unevaluated,
+   * to await the `onInterrupt' part.
+   */
+
+  val pending = new PendingInterruptable(body);
+  pending
+}
+
/*----- A gadget for fetching URLs ----------------------------------------*/
class URLFetchException(msg: String) extends Exception(msg);
trait URLFetchCallbacks {
def preflight(conn: URLConnection) { }
- def write(buf: Array[Byte], n: Int, len: Int): Unit;
+ def write(buf: Array[Byte], n: Int, len: Long): Unit;
def done(win: Boolean) { }
}
/* Fetch the URL, feeding the data through the callbacks CB. */
withCleaner { clean =>
- var win: Boolean = false;
- clean { cb.done(win); }
+ var win: Boolean = false; clean { cb.done(win); }
- /* Set up the connection, and run a preflight check. */
+ /* Set up the connection. This isn't going to block, I think, and we
+ * need to use it in the interrupt handler.
+ */
val c = url.openConnection();
- cb.preflight(c);
- /* Start fetching data. */
- val in = c.getInputStream; clean { in.close(); }
- val explen = c.getContentLength();
-
- /* Read a buffer at a time, and give it to the callback. Maintain a
- * running total.
+ /* Java's default URL handlers don't respond to interrupts, so we have to
+ * take over this duty.
*/
- val buf = new Array[Byte](4096);
- var n = 0;
- var len = 0;
- while ({n = in.read(buf); n >= 0 && (explen == -1 || len <= explen)}) {
- cb.write(buf, n, len);
- len += n;
- }
+ interruptably {
+ /* Run the caller's preflight check. This must be done here, since it
+ * might well block while it discovers things like the content length.
+ */
+ cb.preflight(c);
- /* I can't find it documented anywhere that the existing machinery
- * checks the received stream against the advertised content length.
- * It doesn't hurt to check again, anyway.
- */
- if (explen != -1 && explen != len) {
- throw new URLFetchException(
- s"received $len /= $explen bytes from `$url'");
- }
+ /* Start fetching data. */
+ val in = c.getInputStream; clean { in.close(); }
+ val explen = c.getContentLength;
- /* Glorious success is ours. */
- win = true;
- }
-}
+ /* Read a buffer at a time, and give it to the callback. Maintain a
+ * running total.
+ */
+ var len: Long = 0;
+ blockUnit { exit =>
+ for ((buf, n) <- blocks(in)) {
+ cb.write(buf, n, len);
+ len += n;
+ if (explen != -1 && len > explen) exit;
+ }
+ }
-/*----- Running processes -------------------------------------------------*/
+ /* I can't find it documented anywhere that the existing machinery
+ * checks the received stream against the advertised content length.
+ * It doesn't hurt to check again, anyway.
+ */
+ if (explen != -1 && explen != len) {
+ throw new URLFetchException(
+ s"received $len /= $explen bytes from `$url'");
+ }
-//def runProgram(
+ /* Glorious success is ours. */
+ win = true;
+ } onInterrupt {
+ /* Oh. How do we do this? */
+
+ c match {
+ case c: HttpURLConnection =>
+ /* It's an HTTP connection (what happened to the case here?).
+ * HTTPS connections match too because they're a subclass. Getting
+ * the input stream will block, but there's an easier way.
+ */
+ c.disconnect();
+
+ case _ =>
+ /* It's something else. Let's hope that getting the input stream
+ * doesn't block.
+ */
+ c.getInputStream.close();
+ }
+ }
+ }
+}
/*----- Threading things --------------------------------------------------*/
-def thread[T](name: String, run: Boolean = true, daemon: Boolean = true)
- (f: => T): Thread = {
+def thread(name: String, run: Boolean = true, daemon: Boolean = true)
+ (f: => Unit): Thread = {
/* Make a thread with a given name, and maybe start running it. */
val t = new Thread(new Runnable { def run() { f; } }, name);
t
}
+class ValueThread[T](name: String, group: ThreadGroup = null,
+                     stacksz: Long = 0)(body: => T)
+        extends Thread(group, null, name, stacksz) {
+  /* A thread which evaluates BODY and keeps hold of the outcome.
+   *
+   * Once the thread is no longer alive, `get' returns the value of BODY, or
+   * rethrows whatever BODY threw.  Calling `get' while the thread is still
+   * running raises `IllegalArgumentException'.
+   */
+
+  private[this] var failure: Throwable = _;
+  private[this] var value: T = _;
+
+  override def run() {
+    /* Everything thrown is captured, to be rethrown from `get'. */
+    try { value = body; }
+    catch { case e: Throwable => failure = e; }
+  }
+
+  def get: T = {
+    if (isAlive) throw new IllegalArgumentException("still running");
+    if (failure != null) throw failure;
+    value
+  }
+}
+def valueThread[T](name: String, run: Boolean = true)
+               (body: => T): ValueThread[T] = {
+  /* Make a `ValueThread' called NAME to evaluate BODY, and start it unless
+   * RUN is false.  Return the thread either way.
+   */
+
+  val vt = new ValueThread(name)(body);
+  if (run) { vt.start(); }
+  vt
+}
+
/*----- Quoting and parsing tokens ----------------------------------------*/
def quoteTokens(v: Seq[String]): String = {
if (i >= n) return None;
/* There is something there. Unpick the quoting and escaping. */
- while (i < n && (q != 0 || !s(i).isWhitespace)) {
+ while (i < n && (q || !s(i).isWhitespace)) {
s(i) match {
case '\\' =>
if (i + 1 >= n) throw new InvalidQuotingException("trailing `\\'");
b += s(i + 1); i += 2;
case ch@('"' | ''') =>
- if (q == 0) q = ch;
+ if (!q) q = ch;
else if (q == ch) q = 0;
else b += ch;
i += 1;
}
/* Check that the quoting was valid. */
- if (q != 0) throw new InvalidQuotingException(s"unmatched `$q'");
+ if (q) throw new InvalidQuotingException(s"unmatched `$q'");
/* Skip whitespace before the next token. */
while (i < n && s(i).isWhitespace) i += 1;
val b = List.newBuilder[String];
var i = pos;
- while (nextToken(s, i) match {
- case Some((w, j)) => b += w; i = j; true
- case None => false
- }) ();
+ loopUnit { exit => nextToken(s, i) match {
+ case Some((w, j)) => b += w; i = j;
+ case None => exit;
+ } }
b.result
}
+/*----- Hooks -------------------------------------------------------------*/
+
+/* This is a really simple publisher/subscriber system. The only slight
+ * tweak -- and the reason I'm not just using the Scala machinery -- is that
+ * being attached to a hook doesn't prevent the client from being garbage
+ * collected.
+ */
+
+trait BaseHookClient[E] {
+  /* The bare minimum needed to receive events of type E from a hook.
+   * Honestly, you should be using `HookClient' instead.
+   */
+
+  /* The type of hook we attach to. */
+  type H = Hook[E];
+
+  /* Called with each event EVT from the hook HK. */
+  def hook(hk: H, evt: E): Unit;
+}
+
+trait HookClient[E] extends BaseHookClient[E] {
+  /* The properly cooked hook client.  It remembers which hooks it has been
+   * attached to, so that they can all be released in one go.
+   */
+
+  private val hooks = new HashSet[H];
+
+  protected def attachHook(hk: H) {
+    hk.addHookClient(this);
+    hooks.add(hk);
+  }
+  protected def detachHook(hk: H) {
+    hk.rmHookClient(this);
+    hooks.remove(hk);
+  }
+  protected def detachAllHooks() {
+    hooks.foreach { hk => hk.rmHookClient(this); }
+    hooks.clear();
+  }
+}
+
+trait Hook[E] {
+  /* The publishing end of the hook system.
+   *
+   * Clients are held in a `WeakHashMap', keyed by client, so being
+   * registered here doesn't keep a client alive.
+   */
+
+  type C = BaseHookClient[E];
+  private val clients = new WeakHashMap[C, Unit];
+  def addHookClient(c: C) { clients(c) = (); }
+  def rmHookClient(c: C) { clients -= c; }
+
+  protected def callHook(evt: E) {
+    /* Deliver EVT to every registered client.
+     *
+     * Work from a snapshot of the client set: a client may well attach or
+     * detach clients (itself included) from within its `hook' method, and
+     * modifying the map while iterating over it directly would fail.  A
+     * client which has been removed by the time its turn comes is skipped.
+     */
+
+    val snapshot = clients.keys.toList;
+    for (c <- snapshot if clients.contains(c)) c.hook(this, evt);
+  }
+}
+
+/*----- Fluid variables ---------------------------------------------------*/
+
+object BaseFluid {
+  /* The multi-fluid `let' form is defined here so that it can access the
+   * `capture' method of individual fluids, but users should use the
+   * package-level veneer.
+   */
+
+  private[tripe] def let[U](fxs: (BaseFluid[T], T) forSome { type T }*)
+                           (body: => U): U = {
+    /* See the package-level `let' for details. */
+
+    /* Capture every fluid's current state first, before any of them is
+     * changed.
+     */
+    val binds = for ((f, _) <- fxs) yield f.capture;
+
+    /* Install the new values and run BODY; the captured states are put
+     * back however BODY exits.
+     */
+    try { for ((f, x) <- fxs) f.v = x; body }
+    finally { for (b <- binds) b.restore(); }
+  }
+}
+def let[U](fxs: (BaseFluid[T], T) forSome { type T }*)(body: => U): U =
+  /* let(F -> X, ...) { BODY }
+   *
+   * Evaluate BODY, and return its value, in a dynamic context where each
+   * fluid F is bound to the corresponding value X.  This is only a veneer
+   * over the implementation in the `BaseFluid' object.
+   */
+  BaseFluid.let(fxs: _*)(body)
+
+trait BaseFluid[T] {
+  /* The basic fluid protocol. */
+
+  override def toString(): String =
+    f"${getClass.getName}%s@${hashCode}%x($v%s)";
+
+  protected trait Binding {
+    /* A snapshot of the fluid's state which can be reinstated later.
+     * Providing an implementation is the subclass's job.
+     */
+
+    /* Put the fluid back into the state recorded by this snapshot. */
+    def restore();
+  }
+
+  /* Read and write the current binding. */
+  def v: T;
+  def v_=(x: T);
+
+  /* Capture the current state of the fluid. */
+  protected def capture: Binding;
+
+  def let[U](x: T)(body: => U): U = {
+    /* let(X) { BODY }
+     *
+     * Evaluate BODY, and return its value, in a dynamic context where the
+     * fluid is bound to X.  The previous state is reinstated afterwards,
+     * however BODY exits.
+     */
+
+    val saved = capture;
+    try {
+      v = x;
+      body
+    } finally {
+      saved.restore();
+    }
+  }
+}
+
+class SharedFluid[T](init: T) extends BaseFluid[T] {
+  /* A simple global fluid: the value lives in a single plain variable.
+   * It's probably a mistake to try to access a `SharedFluid' from multiple
+   * threads without serious synchronization.
+   */
+
+  var v: T = init;
+
+  /* A binding just remembers the value which was current when it was
+   * captured.
+   */
+  private class Binding(saved: T) extends super.Binding {
+    def restore() { v = saved; }
+  }
+  protected def capture: super.Binding = {
+    val current = v;
+    new Binding(current)
+  }
+}
+
+class ThreadFluid[T](init: T) extends BaseFluid[T] {
+  /* A thread-aware fluid.  The top-level binding is truly global, shared by
+   * all threads, but `let'-bindings are thread-local.
+   *
+   * Each thread has a stack of cells, one for each `let' currently active
+   * on that thread, innermost first.  Reading and writing `v' affects the
+   * innermost cell if there is one, and the global value otherwise.  The
+   * stack starts out empty in every thread, courtesy of `initialValue', so
+   * a thread other than the one which made the fluid sees the global value
+   * rather than an uninitialized thread-local.
+   *
+   * Capturing the state pushes a new cell, so that the assignment which
+   * `let' makes next lands in the cell and not in the global value;
+   * restoring pops the cell again.
+   */
+
+  private final class Cell(var x: T);
+
+  private[this] var global: T = init;
+  private[this] val bound = new ThreadLocal[List[Cell]] {
+    override def initialValue(): List[Cell] = Nil;
+  };
+
+  def v: T = bound.get match { case Nil => global; case c::_ => c.x; };
+  def v_=(x: T) { bound.get match {
+    case Nil => global = x;
+    case c::_ => c.x = x;
+  } }
+
+  private class Binding(cell: Cell) extends super.Binding {
+    def restore() {
+      /* Pop our cell and anything stacked on top of it.  If the cell has
+       * gone already then there's nothing to do: this keeps things sane if
+       * bindings are restored in a different order from their capture.
+       */
+      val stack = bound.get;
+      val i = stack.indexWhere(_ eq cell);
+      if (i >= 0) bound.set(stack.drop(i + 1));
+    }
+  }
+  protected def capture: super.Binding = {
+    /* The new cell starts with the current value, so the visible value
+     * doesn't change until somebody assigns to `v'.
+     */
+    val cell = new Cell(v);
+    bound.set(cell::bound.get);
+    new Binding(cell)
+  }
+}
+
+/*----- Other random things -----------------------------------------------*/
+
trait LookaheadIterator[T] extends BufferedIterator[T] {
- private[this] var st: Option[T] = None;
+ /* An iterator in terms of a single `maybe there's another item' function.
+ *
+ * It seems like every time I write an iterator in Scala, the only way to
+ * find out whether there's a next item, for `hasNext', is to actually try
+ * to fetch it. So here's an iterator in terms of a function which goes
+ * off and maybe returns a next thing. It turns out to be easy to satisfy
+ * the additional requirements for `BufferedIterator', so why not?
+ */
+
+ /* Subclass responsibility. */
protected def fetch(): Option[T];
+
+ /* The machinery. `st' is `None' if there's no current item, null if we've
+ * actually hit the end, or `Some(x)' if the current item is x.
+ */
+ private[this] var st: Option[T] = None;
private[this] def peek() {
+ /* Arrange to have a current item. */
if (st == None) fetch() match {
case None => st = null;
case x@Some(_) => st = x;
}
}
+
+ /* The `BufferedIterator' protocol. */
override def hasNext: Boolean = { peek(); st != null }
- override def head(): T =
+ override def head: T =
{ peek(); if (st == null) throw new NoSuchElementException; st.get }
- override def next(): T = { val it = head(); st = None; it }
+ override def next(): T = { val it = head; st = None; it }
}
-def lines(r: Reader) = new LookaheadIterator[String] {
- /* Iterates over the lines of text in a `Reader' object. */
+def bufferedReader(r: Reader): BufferedReader =
+  /* Return a `BufferedReader' for R: R itself if it's one already, or a new
+   * wrapper around it if not.
+   */
+  r match {
+    case already: BufferedReader => already
+    case other => new BufferedReader(other)
+  }
- private[this] val in = r match {
- case br: BufferedReader => br;
- case _ => new BufferedReader(r);
+def lines(r: BufferedReader): BufferedIterator[String] =
+ new LookaheadIterator[String] {
+ /* Iterates over the lines of text in a `Reader' object. */
+ override protected def fetch() = Option(r.readLine());
}
- protected override def fetch(): Option[String] = Option(in.readLine);
+def lines(r: Reader): BufferedIterator[String] = lines(bufferedReader(r));
+
+def blocks(in: InputStream, blksz: Int):
+        BufferedIterator[(Array[Byte], Int)] =
+  /* Iterates over (possibly irregularly sized) blocks in a stream.
+   *
+   * Each item is a pair (BUF, N), meaning that the first N bytes of BUF
+   * were just read.  The same array, BLKSZ bytes long, is used for every
+   * item, so deal with the data before asking for the next block.
+   */
+  new LookaheadIterator[(Array[Byte], Int)] {
+    val buf = new Array[Byte](blksz);
+    override protected def fetch() = in.read(buf) match {
+      case n if n < 0 => None
+      case n => Some((buf, n))
+    }
+  }
+def blocks(in: InputStream):
+        BufferedIterator[(Array[Byte], Int)] =
+  /* As above, with a block size of 65536 bytes. */
+  blocks(in, 65536);
+
+def blocks(in: BufferedReader, blksz: Int):
+        BufferedIterator[(Array[Char], Int)] =
+  /* Iterates over (possibly irregularly sized) blocks in a reader.
+   *
+   * Each item is a pair (BUF, N), meaning that the first N characters of
+   * BUF were just read.  The same array, BLKSZ characters long, is used
+   * for every item, so deal with the data before asking for the next
+   * block.
+   */
+  new LookaheadIterator[(Array[Char], Int)] {
+    val buf = new Array[Char](blksz);
+    override protected def fetch() = in.read(buf) match {
+      case n if n < 0 => None
+      case n => Some((buf, n))
+    }
+  }
+def blocks(in: BufferedReader):
+        BufferedIterator[(Array[Char], Int)] =
+  /* As above, with a block size of 65536 characters. */
+  blocks(in, 65536);
+
+/* The same again for arbitrary readers, which are passed through
+ * `bufferedReader' first.
+ */
+def blocks(r: Reader, blksz: Int): BufferedIterator[(Array[Char], Int)] = {
+  val br = bufferedReader(r);
+  blocks(br, blksz)
+}
+def blocks(r: Reader): BufferedIterator[(Array[Char], Int)] = {
+  val br = bufferedReader(r);
+  blocks(br)
+}
+
+def oxford(conj: String, things: Seq[String]): String =
+  /* Join THINGS into an English-style list, with CONJ before the final
+   * item and a comma before CONJ when there are three or more items:
+   * `a', `a CONJ b', `a, b, CONJ c'.  An empty list is `<nothing>'.
+   */
+  things.length match {
+    case 0 => "<nothing>"
+    case 1 => things.head
+    case 2 => s"${things(0)} $conj ${things(1)}"
+    case _ =>
+      /* Every item except the last is followed by `, '. */
+      things.init.mkString("", ", ", ", ") + conj + " " + things.last
+  }
+val datefmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z");
+
+def formatDuration(t: Int): String =
+  /* Format a duration T, in seconds, as `MM:SS', or as `H:MM:SS' if it
+   * runs to an hour or more.  Any negative T is shown as `???' rather than
+   * as nonsense with negative fields.
+   */
+  if (t < 0) "???"
+  else {
+    val (s, t1) = (t%60, t/60);
+    val (m, h) = (t1%60, t1/60);
+    if (h > 0) f"$h%d:$m%02d:$s%02d"
+    else f"$m%02d:$s%02d"
+  }
+
/*----- That's all, folks -------------------------------------------------*/
}