/*----- Imports -----------------------------------------------------------*/
-import java.io.{BufferedReader, Reader, Writer};
+import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
-import scala.collection.mutable.{HashMap, Publisher};
+import scala.collection.mutable.HashMap;
import scala.concurrent.Channel;
import scala.util.control.Breaks;
import Implicits._;
+import sys.{serverInput, serverOutput};
/*----- Classification of server messages ---------------------------------*/
class ConnectionLostException extends Exception;
-class Connection(val in: Reader, val out: Writer)
- extends Publisher[AsyncMessage]
+object Connection extends Hook[AsyncMessage]
{
/* Synchronization.
*
* hold the `Connection' lock before locking any individual `Job' objects.
*/
- var livep: Boolean = true; // Is this connection still alive?
- var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
- val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
- var bgseq = 0; // Next background job tag.
+ private var livep: Boolean = true; // Is this connection still alive?
+ private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
+ private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
+ private var bgseq = 0; // Next background job tag.
- type Pub = Connection;
+ private val in = new BufferedReader(new InputStreamReader(serverInput));
+ private val out = new OutputStreamWriter(serverOutput);
+
+ type Pub = Connection.type;
class Job extends Iterator[Seq[String]] {
private[Connection] val ch = new Channel[JobMessage];
private[this] var nextmsg: Option[JobMessage] = None;
private[this] def fetchNext()
- { if (nextmsg == None) nextmsg = Some(ch.read); }
+ { if (!nextmsg) nextmsg = Some(ch.read); }
override def hasNext: Boolean = {
fetchNext();
nextmsg match {
override def next(): Seq[String] = {
fetchNext();
nextmsg match {
- case None => ???
- case Some(JobOK) => throw new NoSuchElementException
- case Some(JobFail(msg)) => throw new CommandFailed(msg)
- case Some(JobLostConnection) => throw new ConnectionLostException
+ case None => unreachable;
+ case Some(JobOK) => throw new NoSuchElementException;
+ case Some(JobFail(msg)) => throw new CommandFailed(msg);
+ case Some(JobLostConnection) => throw new ConnectionLostException;
case Some(JobInfo(msg)) => nextmsg = None; msg
}
}
def submit(toks: String*): this.Job = submit(false, toks: _*);
- def close() { synchronized { out.close(); } }
-
/* These two expect the connection lock to be held. */
def foregroundJob: Job =
fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
j
}
case msg: AsyncMessage =>
- publish(msg);
+ callHook(msg);
case _: ServiceMessage =>
ok;
}
case None => ok;
}
}
- publish(ConnectionLost);
- in.close(); out.close();
+ callHook(ConnectionLost);
}
}
}