Error Handling and Persistence
Fault Tolerance
Persistence
<dependency>
    <groupId>org.iq80.leveldb</groupId>
    <artifactId>leveldb</artifactId>
    <version>0.7</version>
</dependency>
Next, add the configuration for the persistence plugins (journal and snapshot store):
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false
To make an actor persistent, declare a class that extends AbstractPersistentActor:
class ExamplePersistentActor extends AbstractPersistentActor {}
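Then override createReceiveRecover and createReceive. AbstractPersistentActor also requires a persistenceId() implementation, which is omitted here: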
@Override
public Receive createReceiveRecover() {
    return receiveBuilder()
        // Replay events that were persisted after the last snapshot
        .match(Evt.class, e -> state.update(e))
        // Restore the state saved in the most recent snapshot
        .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
        .build();
}

@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(Cmd.class, c -> {
            final String data = c.getData();
            final Evt evt = new Evt(data + "-" + getNumEvents());
            // Persist the event; the callback runs only after the write succeeds
            persist(evt, (Evt event) -> {
                state.update(event);
                getContext().system().eventStream().publish(event);
            });
        })
        // Save a snapshot of the current state
        .matchEquals("snap", s -> saveSnapshot(state.copy()))
        .matchEquals("print", s -> System.out.println(state))
        .build();
}
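The handlers above rely on Cmd, Evt, and ExampleState, none of which are shown. A minimal sketch of what they might look like follows. The field names, the size() method, and the list-backed state are assumptions made for illustration, not definitions taken from the original code.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Command sent to the actor; carries the raw payload. (Assumed shape.)
class Cmd implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    Cmd(String data) { this.data = data; }

    String getData() { return data; }
}

// Event written to the journal once a command is accepted. (Assumed shape.)
class Evt implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    Evt(String data) { this.data = data; }

    String getData() { return data; }
}

// Actor state: the ordered list of event payloads applied so far. (Assumed shape.)
class ExampleState implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> events;

    ExampleState() { this(new ArrayList<String>()); }

    ExampleState(List<String> events) { this.events = new ArrayList<>(events); }

    // Defensive copy, so a saved snapshot is not affected by later updates.
    ExampleState copy() { return new ExampleState(events); }

    void update(Evt evt) { events.add(evt.getData()); }

    int size() { return events.size(); }

    @Override
    public String toString() { return events.toString(); }
}
```

Under these assumptions, getNumEvents() in the actor could simply return state.size(). All three classes implement Serializable because events and snapshots are written to storage; a production setup would more likely configure a dedicated serializer.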
From outside the actor, we can trigger a snapshot manually by sending it the "snap" message:
persistentActor.tell(new Cmd("foo"), ActorRef.noSender());
persistentActor.tell("snap", ActorRef.noSender());
persistentActor.tell(new Cmd("buzz"), ActorRef.noSender());
persistentActor.tell("print", ActorRef.noSender());
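To make the effect of this message sequence on a later restart easier to follow, here is a plain-Java model of snapshot-plus-replay recovery. It does not use Akka at all: RecoverySketch and every member in it are hypothetical names, and the in-memory lists only stand in for the journal and the snapshot store.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of snapshot-plus-replay recovery; no Akka involved.
class RecoverySketch {
    // Stand-in for the journal: every persisted event, in order.
    private final List<String> journal = new ArrayList<>();
    // Stand-in for the snapshot store: the last saved state and the
    // number of journal entries that state already covers.
    private List<String> snapshot = new ArrayList<>();
    private int snapshotSeqNr = 0;
    // Live in-memory state of the modeled actor.
    private List<String> state = new ArrayList<>();

    // Mirrors the Cmd handler: derive an event, persist it, then apply it.
    void command(String data) {
        String event = data + "-" + state.size();
        journal.add(event);
        state.add(event);
    }

    // Mirrors the "snap" handler: store a copy of the current state.
    void snap() {
        snapshot = new ArrayList<>(state);
        snapshotSeqNr = journal.size();
    }

    // Mirrors a restart: start from the snapshot, then replay newer events.
    List<String> recover() {
        List<String> recovered = new ArrayList<>(snapshot);
        recovered.addAll(journal.subList(snapshotSeqNr, journal.size()));
        state = recovered;
        return recovered;
    }

    // Runs the foo / snap / buzz sequence and returns the recovered state.
    static List<String> demo() {
        RecoverySketch actor = new RecoverySketch();
        actor.command("foo");
        actor.snap();
        actor.command("buzz");
        return actor.recover();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [foo-0, buzz-1]
    }
}
```

In this model the snapshot contributes foo-0 and only buzz-1 has to be replayed from the journal. That is the division of work between the SnapshotOffer and Evt branches of createReceiveRecover.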