import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
-import scala.collection.mutable.{HashMap, Publisher};
+import scala.collection.mutable.HashMap;
import scala.concurrent.Channel;
import scala.util.control.Breaks;
// Exception type thrown by `next()` when the pending job message is
// `JobLostConnection`.
class ConnectionLostException extends Exception;
-object Connection extends Publisher[AsyncMessage]
+object Connection extends Hook[AsyncMessage]
{
/* Synchronization.
*
// One-message buffer used by both `hasNext` and `next()`. `None` means no
// message is currently buffered (initial state, or the last `JobInfo` has
// been handed out by `next()`).
private[this] var nextmsg: Option[JobMessage] = None;
// Fills `nextmsg` with `ch.read` only when the buffer is empty, so repeated
// calls never discard an already-buffered message.
// NOTE(review): `ch` is declared outside this view; presumably a
// scala.concurrent.Channel whose `read` blocks until a message arrives - confirm.
// NOTE(review): the `+` variant applies `!` to an Option, which presumably
// relies on an implicit Option-to-Boolean conversion defined elsewhere - confirm
// it yields true exactly when `nextmsg` is `None`.
private[this] def fetchNext()
- { if (nextmsg == None) nextmsg = Some(ch.read); }
+ { if (!nextmsg) nextmsg = Some(ch.read); }
override def hasNext: Boolean = {
fetchNext();
nextmsg match {
// Returns the payload of the next `JobInfo` message, fetching one first if
// nothing is buffered.
//
// Outcome per buffered message:
//   JobInfo(msg)       - clears the buffer and returns `msg`
//   JobOK              - throws NoSuchElementException
//   JobFail(msg)       - throws CommandFailed(msg)
//   JobLostConnection  - throws ConnectionLostException
//
// Only the `JobInfo` branch resets `nextmsg`; in the throwing branches the
// message stays buffered, so a later call matches on that same message again.
override def next(): Seq[String] = {
fetchNext();
nextmsg match {
// `fetchNext()` has just run, so the buffer is expected to be non-empty here.
// NOTE(review): `unreachable` (the `+` variant) is defined outside this view -
// presumably it throws, like the `???` it replaces; confirm.
- case None => ???
- case Some(JobOK) => throw new NoSuchElementException
- case Some(JobFail(msg)) => throw new CommandFailed(msg)
- case Some(JobLostConnection) => throw new ConnectionLostException
+ case None => unreachable;
+ case Some(JobOK) => throw new NoSuchElementException;
+ case Some(JobFail(msg)) => throw new CommandFailed(msg);
+ case Some(JobLostConnection) => throw new ConnectionLostException;
// Consume the buffered message so the following call fetches a fresh one.
case Some(JobInfo(msg)) => nextmsg = None; msg
}
}
j
}
case msg: AsyncMessage =>
- publish(msg);
+ callHook(msg);
case _: ServiceMessage =>
ok;
}
case None => ok;
}
}
- publish(ConnectionLost);
+ callHook(ConnectionLost);
}
}
}