Lolipop - purely functional Raft implementation
Lolipop is a basic implementation of Raft algorithm using a purely functional approach. It is effect agnostic meaning user can choose your own effect type (eg. cats IO, Future, ZIO)
It does not support the following feature (todo)
- Dynamic membership
- Log compaction
Usage
Lolipop is rather minimal, users need to implement several interfaces to be able to use it, below are interfaces that are required
1. Implement StateMachine
Raft algorithm is used to implement a replicated state machine, represented by StateMachine in our code.
StateMachine is application specific, it manages the State that we are trying to reach consensus for. It could be as simple as an Int, or it could be a collection of documents.
Below is a sample implementation, there are 2 method to implement
import cats.effect._
import cats.implicits._
import raft.algebra.StateMachine
case class ChangeCount(i: Int)
val counter: IO[StateMachine[IO, ChangeCount, Int]] = for {
state <- Ref[IO].of(0)
} yield {
new StateMachine[IO, ChangeCount, Int] {
override def execute(cmd: ChangeCount): IO[Int] = {
state.modify { i =>
val j = i + cmd.i
j -> j
}
}
override def getCurrent: IO[Int] = state.get
}
}
2. Implement LogIO
Raft algorithm reach consensus by replicating logs in the cluster, and logs are expected to be durable so that consensus are maintain in spite of server crash.
LogIO
represent the ability for the application to interact with log files, Lolipop let user to implement this because there are different ways to achieve durable logs (eg. file system, remote database).
Below is an example implemented using SwayDB
import cats.effect.IO
import raft.algebra.io.LogIO
import raft.model.RaftLog
class FileLogIO(db: swaydb.Map[Int, RaftLog[ChangeCount], IO]) extends LogIO[IO, ChangeCount] {
override def getByIdx(idx: Int): IO[Option[Log]] = db.get(idx)
override def overwrite(logs: Seq[Log]): IO[Unit] = {
db.put(logs.map(s => s.idx -> s)).map(_ => ())
}
override def lastLog: IO[Option[Log]] = db.lastOption.map { opt =>
opt.map(_._2)
}
/**
* @param idx - inclusive
*/
override def takeFrom(idx: Int): IO[Seq[Log]] = {
db.fromOrAfter(idx).takeWhile(_ => true).map(_._2).materialize
}
override def append(log: Log): IO[Unit] = db.put(log.idx, log).map(_ => ())
}
3. Implement NetworkIO
NetworkIO
represents the two kind of network communication in a Raft cluster, AppendRequestRPC
and VoteRequestRPC
.
User has to provide their own implementations, here’s an example using http4s (Note: HTTP has relatively high overhead and thus is not recommended, this only serves as a reference)
import cats.effect.Sync
import io.circe.Encoder
import io.circe.generic.auto._
import org.http4s.Method._
import org.http4s._
import org.http4s.circe._
import org.http4s.client._
import org.http4s.client.dsl.Http4sClientDsl
import raft.algebra.io.NetworkIO
import raft.model._
class HttpNetwork[F[_]: Sync, Cmd: Encoder](networkConfig: Map[String, Uri], client: Client[F])
extends NetworkIO[F, Cmd]
with Http4sClientDsl[F]
with CirceEntityEncoder
with CirceEntityDecoder {
override def sendAppendRequest(nodeID: String, appendReq: AppendRequest[Cmd]): F[AppendResponse] = {
val uri = networkConfig(nodeID) / "append"
client.expect[AppendResponse](POST.apply(body = appendReq, uri = uri))
}
override def sendVoteRequest(nodeID: String, voteRq: VoteRequest): F[VoteResponse] = {
val uri = networkConfig(nodeID) / "vote"
client.expect[VoteResponse](POST.apply(body = voteRq, uri = uri))
}
}
4. Implement PersistentIO
Each raft node need to store some metadata durable, other than logs, the interaction with these metadata is a lot simpler, it’s basic get and put, it is represented by PersistentIO
Here’s an example
import cats.Monad
import cats.effect.concurrent.MVar
import raft.algebra.io.PersistentIO
import raft.model.Persistent
class SwayDBPersist[F[_]: Monad](db: swaydb.Map[Int, Persistent, F], lock: MVar[F, Unit]) extends PersistentIO[F] {
val singleKey = 1
override def get: F[Persistent] = db.get(singleKey).map(_.get)
/**
* The implementatFn of this method must persist
* the `Persistent` atomically
*
* possible implementatFn:
* - JVM FileLock
* - embedded database
*/
override def update(f: Persistent => Persistent): F[Unit] =
for {
_ <- lock.take
old <- get
new_ = f(old)
_ <- db.put(singleKey, new_)
_ <- lock.put(())
} yield ()
}
5. Implement EventLogger
(Optional)
EventLogger is useful for debugging, it is being used to record important events in a Raft process. There is a JsonEventLogger
provided out of the box.
TODO: Provide a slf4j event logger
6. Create `RaftProcess
Once we have the above dependencies, we can build a RaftProcess using methods on RaftProcess object, it requires all dependencies mentioned above, with an additional ClusterConfig
, which is a case class containing names of each node
WARNING: ClusterConfig
will change once we support dynamic membership
import raft._
val clusterConfig = ClusterConfig("node0", Set("node1", "node2"))
val proc = RaftProcess.simple(
stateMachine,
clusterConfig,
logIO,
networkIO,
eventLogger,
persistentIO
)
7. Expose raft api to the network
Once you get a RaftProcess
, you can do the following
Bind client API to network
RaftProcess exposes RaftApi
trait, which contains multiple method that should be binded to network
def write(cmd: Cmd): F[WriteResponse]
def read: F[ReadResponse[State]]
def requestAppend(req: AppendRequest[Cmd]): F[AppendResponse]
def requestVote(req: VoteRequest): F[VoteResponse]
User need to route network request to these methods
Here’s an example of exposing methods by http api
def raftProtocol(api: RaftApi[F, Cmd]): HttpRoutes[F] =
HttpRoutes
.of[F] {
case req @ POST -> Root / "append" =>
for {
app <- req.as[AppendRequest[Cmd]]
reply <- api.requestAppend(app)
res <- Ok(reply.asJson)
} yield res
case req @ POST -> Root / "vote" =>
for {
vote <- req.as[VoteRequest]
reply <- api.requestVote(vote)
res <- Ok(reply.asJson)
} yield res
case req @ POST -> Root / "cmd" =>
for {
change <- req.as[Cmd]
reply <- api.incoming(change)
res <- Ok(reply.asJson)
} yield res
case GET -> Root / "state" =>
for {
reply <- api.read
res <- Ok(reply.asJson)
} yield res
}
val server = BlazeServerBuilder[F]
.bindHttp(9000,"localhost")
// hook api to HTTP4S
.withHttpApp(raftProtocol(raftProc.api).orNotFound)
.serve
Start the Raft Server
Once the methods are exposed, we can start the raft process, by doing this, raft process will run continuously to check if each nodes are reacheable.
You can start server by calling def startRaft: Stream[F, Unit]
on RaftProcess, we typically evaluate the stream in Main class.