keys.scala, etc.: Make merging public keys have a progress bar.
[tripe-android] / admin.scala
index cc82186..9796cd9 100644 (file)
@@ -27,14 +27,15 @@ package uk.org.distorted.tripe; package object admin {
 
 /*----- Imports -----------------------------------------------------------*/
 
-import java.io.{BufferedReader, Reader, Writer};
+import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
 import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
 
-import scala.collection.mutable.{HashMap, Publisher};
+import scala.collection.mutable.HashMap;
 import scala.concurrent.Channel;
 import scala.util.control.Breaks;
 
-import Magic._;
+import Implicits._;
+import sys.{serverInput, serverOutput};
 
 /*----- Classification of server messages ---------------------------------*/
 
@@ -42,33 +43,30 @@ sealed abstract class Message;
 
 sealed abstract class JobMessage extends Message;
 case object JobOK extends JobMessage;
-case class JobInfo(info: Seq[String]) extends JobMessage;
-case class JobFail(err: Seq[String]) extends JobMessage;
+final case class JobInfo(info: Seq[String]) extends JobMessage;
+final case class JobFail(err: Seq[String]) extends JobMessage;
 case object JobLostConnection extends JobMessage;
 
-case class BackgroundJobMessage(tag: String, msg: JobMessage)
+final case class BackgroundJobMessage(tag: String, msg: JobMessage)
        extends Message;
-case class JobDetached(tag: String) extends Message;
+final case class JobDetached(tag: String) extends Message;
 
 sealed abstract class AsyncMessage extends Message;
-case class Trace(msg: String) extends AsyncMessage;
-case class Warning(err: Seq[String]) extends AsyncMessage;
-case class Notify(note: Seq[String]) extends AsyncMessage;
+final case class Trace(msg: String) extends AsyncMessage;
+final case class Warning(err: Seq[String]) extends AsyncMessage;
+final case class Notify(note: Seq[String]) extends AsyncMessage;
 case object ConnectionLost extends AsyncMessage;
 
 sealed abstract class ServiceMessage extends Message;
-case class ServiceCancel(jobid: String) extends ServiceMessage;
-case class ServiceClaim(svc: String, version: String)
+final case class ServiceCancel(jobid: String) extends ServiceMessage;
+final case class ServiceClaim(svc: String, version: String)
        extends ServiceMessage;
-case class ServiceJob(jobid: String, svc: String,
+final case class ServiceJob(jobid: String, svc: String,
                      cmd: String, args: Seq[String])
        extends ServiceMessage;
 
 /*----- Main code ---------------------------------------------------------*/
 
-object Connection {
-}
-
 class ConnectionClosed extends Exception;
 
 class ServerFailed(msg: String) extends Exception(msg);
@@ -80,8 +78,7 @@ class CommandFailed(val msg: Seq[String]) extends Exception {
 
 class ConnectionLostException extends Exception;
 
-class Connection(val in: Reader, val out: Writer)
-       extends Publisher[AsyncMessage]
+object Connection extends Hook[AsyncMessage]
 {
   /* Synchronization.
    *
@@ -90,17 +87,22 @@ class Connection(val in: Reader, val out: Writer)
    * hold the `Connection' lock before locking any individual `Job' objects.
    */
 
-  var livep: Boolean = true;           // Is this connection still alive?
-  var fgjob: Option[this.Job] = None;  // Foreground job, if there is one.
-  val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
-  var bgseq = 0;                       // Next background job tag.
+  private var livep: Boolean = true;   // Is this connection still alive?
+  private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
+  private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
+  private var bgseq = 0;               // Next background job tag.
+
+  private val in = new BufferedReader(new InputStreamReader(serverInput));
+  private val out = new OutputStreamWriter(serverOutput);
+
+  type Pub = Connection.type;
 
   class Job extends Iterator[Seq[String]] {
     private[Connection] val ch = new Channel[JobMessage];
     private[this] var nextmsg: Option[JobMessage] = None;
 
     private[this] def fetchNext()
-      { if (nextmsg == None) nextmsg = Some(ch.read); }
+      { if (!nextmsg) nextmsg = Some(ch.read); }
     override def hasNext: Boolean = {
       fetchNext();
       nextmsg match {
@@ -111,10 +113,10 @@ class Connection(val in: Reader, val out: Writer)
     override def next(): Seq[String] = {
       fetchNext();
       nextmsg match {
-       case None => ???
-       case Some(JobOK) => throw new NoSuchElementException
-       case Some(JobFail(msg)) => throw new CommandFailed(msg)
-       case Some(JobLostConnection) => throw new ConnectionLostException
+       case None => unreachable;
+       case Some(JobOK) => throw new NoSuchElementException;
+       case Some(JobFail(msg)) => throw new CommandFailed(msg);
+       case Some(JobLostConnection) => throw new ConnectionLostException;
        case Some(JobInfo(msg)) => nextmsg = None; msg
       }
     }
@@ -184,8 +186,6 @@ println(";; write command");
 
   def submit(toks: String*): this.Job = submit(false, toks: _*);
 
-  def close() { synchronized { out.close(); } }
-
   /* These two expect the connection lock to be held. */
   def foregroundJob: Job =
     fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
@@ -248,9 +248,9 @@ println(s";; line: $line");
              j
            }
          case msg: AsyncMessage =>
-           publish(msg);
+           callHook(msg);
          case _: ServiceMessage =>
-           ();
+           ok;
        }
       }
     } catch {
@@ -264,11 +264,10 @@ println(s";; line: $line");
            j.ch.write(JobLostConnection);
            fgjob = None;
            notifyAll();
-         case None => ();
+         case None => ok;
        }
       }
-      publish(ConnectionLost);
-      in.close(); out.close();
+      callHook(ConnectionLost);
     }
   }
 }