注册与调用
Dapr Actor 注册与调用
这里以 dapr/java-sdk 中的示例为例,介绍 Dapr Actor 的注册与调用。首先,我们需要声明供 Actor 使用的状态存储组件(State Store):
# Dapr component manifest declaring the Redis-backed state store named "statestore".
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
    # host:port of the Redis instance; adjust to your environment.
    - name: redisHost
      value: 10.0.100.18:6379
    # Empty password, i.e. Redis is expected to accept unauthenticated connections.
    - name: redisPassword
      value: ""
    # Flags this component as the state store to be used for actors.
    - name: actorStateStore
      value: "true"
Actor 注册
首先声明类型:
/**
 * Contract of the demo actor, declared with the actor type name "DemoActor".
 *
 * <p>This interface is what the client builds a proxy for and what the
 * server-side implementation implements.
 */
@ActorType(name = "DemoActor")
public interface DemoActor {

  /**
   * Registers a reminder for this actor.
   */
  void registerReminder();

  /**
   * Passes a message to the actor and returns its reply.
   *
   * <p>NOTE(review): the annotation gives this method the actor method name
   * "echo_message" instead of the Java name {@code say} - confirm against the SDK.
   *
   * @param something Message to be said.
   * @return The actor's reply.
   */
  @ActorMethod(name = "echo_message")
  String say(String something);

  /**
   * Callback that receives a message to print.
   *
   * @param message Message to be printed.
   */
  void clock(String message);

  /**
   * Adds {@code delta} to a counter and returns the updated value.
   *
   * <p>NOTE(review): {@code returns = Integer.class} presumably tells the SDK the
   * payload type wrapped by the {@code Mono} - confirm against the SDK.
   *
   * @param delta Amount to be added to the counter.
   * @return Mono emitting the updated counter value.
   */
  @ActorMethod(returns = Integer.class)
  Mono<Integer> incrementAndGet(int delta);
}
然后注册实现:
/**
 * Implementation of the DemoActor for the server side.
 *
 * <p>Besides the {@link DemoActor} methods it implements {@code Remindable<Integer>}
 * so it can receive the reminder registered by {@link #registerReminder()}.
 */
public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable<Integer> {

  /**
   * Format to output date and time, always rendered in UTC.
   *
   * <p>{@code DateTimeFormatter} is immutable and thread-safe, so a single shared
   * instance can be used by every actor instance. The zone is pinned explicitly:
   * formatting a {@code java.util.Date} taken from a GMT {@code Calendar} would
   * still use the JVM default time zone.
   */
  private static final java.time.format.DateTimeFormatter UTC_TIMESTAMP_FORMAT =
      java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
          .withZone(java.time.ZoneOffset.UTC);

  /**
   * This is the constructor of an actor implementation, while also registering a timer.
   *
   * <p>The timer is registered without a name (null), with "clock" as the callback
   * method and "ping!" as the state passed to it. The call blocks until the
   * registration completes.
   *
   * @param runtimeContext The runtime context object which contains objects such as the state provider.
   * @param id The id of this actor.
   */
  public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
    super(runtimeContext, id);

    super.registerActorTimer(
        null,
        "clock",
        "ping!",
        Duration.ofSeconds(2),
        Duration.ofSeconds(1)).block();
  }

  /**
   * Registers a reminder named "myremind" with a random non-negative int as its state.
   * Blocks until the registration completes.
   */
  @Override
  public void registerReminder() {
    super.registerReminder(
        "myremind",
        (int) (Integer.MAX_VALUE * Math.random()),
        Duration.ofSeconds(5),
        Duration.ofSeconds(2)).block();
  }

  /**
   * Prints a message and appends the timestamp.
   * The message is also saved in the actor state under the key "lastmessage".
   *
   * @param something Something to be said.
   * @return The UTC timestamp at which the message was handled.
   */
  @Override
  public String say(String something) {
    String utcNowAsString = utcNowAsString();

    // Handles the request by printing message.
    System.out.println("Server say method for actor "
        + super.getId() + ": "
        + (something == null ? "" : something + " @ " + utcNowAsString));

    super.getActorStateManager().set("lastmessage", something).block();

    // Now respond with current timestamp.
    return utcNowAsString;
  }

  /**
   * Increments a persistent counter, saves and returns its updated value.
   * Example of method implemented with Reactor's Mono class.
   * This method could be rewritten with blocking calls in Mono, using block() method:
   *
   * <pre>{@code
   * public int incrementAndGet(int delta) {
   *   int counter = 0;
   *   if (super.getActorStateManager().contains("counter").block()) {
   *     counter = super.getActorStateManager().get("counter", int.class).block();
   *   }
   *   counter = counter + delta;
   *   super.getActorStateManager().set("counter", counter).block();
   *   return counter;
   * }
   * }</pre>
   *
   * @param delta Amount to be added to counter.
   * @return Mono response for the incremented value.
   */
  @Override
  public Mono<Integer> incrementAndGet(int delta) {
    return super.getActorStateManager().contains("counter")
        // A counter that was never stored starts at 0.
        .flatMap(exists -> exists ? super.getActorStateManager().get("counter", int.class) : Mono.just(0))
        .map(c -> c + delta)
        .flatMap(c -> super.getActorStateManager().set("counter", c).thenReturn(c));
  }

  /**
   * Method invoked by timer.
   *
   * @param message Message to be printed.
   */
  @Override
  public void clock(String message) {
    String utcNowAsString = utcNowAsString();

    // Handles the request by printing message.
    System.out.println("Server timer for actor "
        + super.getId() + ": "
        + (message == null ? "" : message + " @ " + utcNowAsString));
  }

  /**
   * Method used to determine reminder's state type.
   *
   * @return Class for reminder's state.
   */
  @Override
  public TypeRef<Integer> getStateType() {
    return TypeRef.INT;
  }

  /**
   * Method invoked for a reminder.
   *
   * @param reminderName The name of reminder provided during registration.
   * @param state The user state provided during registration.
   * @param dueTime The invocation due time provided during registration.
   * @param period The invocation period provided during registration.
   * @return Mono result.
   */
  @Override
  public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) {
    // The timestamp is taken inside the runnable so it reflects when the Mono is
    // actually subscribed, not when this method is called.
    return Mono.fromRunnable(() -> {
      String message = String.format("Server reminded actor %s of: %s for %d @ %s",
          this.getId(), reminderName, state, utcNowAsString());

      // Handles the request by printing message.
      System.out.println(message);
    });
  }

  /**
   * Formats the current instant in UTC using {@link #UTC_TIMESTAMP_FORMAT}.
   *
   * @return Current UTC time as "yyyy-MM-dd HH:mm:ss.SSS".
   */
  private static String utcNowAsString() {
    return UTC_TIMESTAMP_FORMAT.format(java.time.Instant.now());
  }
}
最后是在运行时中进行注册:
/**
 * Entry point that hosts the demo actor on the server side.
 * 1. Build and install jars:
 * mvn clean install
 * 2. cd to [repo-root]/examples
 * 3. Run the server:
 * dapr run --components-path ./components/actors --app-id demoactorservice --app-port 3000 \
 * -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.DemoActorService -p 3000
 */
public class DemoActorService {

  /**
   * Parses the required port option, configures the actor runtime, registers
   * the actor implementation and starts the application on that port.
   *
   * @param args Command line arguments; must carry the port via -p / --port.
   * @throws Exception If the arguments cannot be parsed or startup fails.
   */
  public static void main(String[] args) throws Exception {
    Options cliOptions = new Options();
    cliOptions.addRequiredOption("p", "port", true, "Port the will listen to.");
    CommandLine parsedArgs = new DefaultParser().parse(cliOptions, args);

    // A non-numeric port value makes parseInt throw.
    final int port = Integer.parseInt(parsedArgs.getOptionValue("port"));

    ActorRuntime actorRuntime = ActorRuntime.getInstance();

    // Idle time after which an actor instance is deactivated.
    actorRuntime.getConfig().setActorIdleTimeout(Duration.ofSeconds(30));

    // Interval between scans for actors to deactivate and balance.
    actorRuntime.getConfig().setActorScanInterval(Duration.ofSeconds(10));

    // Maximum wait for an ongoing API call on an actor instance to drain.
    actorRuntime.getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(10));

    // Drain API calls of actor instances that are being balanced.
    actorRuntime.getConfig().setDrainBalancedActors(true);

    // Make the actor implementation known to the runtime.
    actorRuntime.registerActor(DemoActorImpl.class);

    // Start Dapr's callback endpoint.
    DaprApplication.start(port);
  }
}
服务调用
在调用侧,需要通过 ActorProxyBuilder 根据 Actor 类型和 ActorId 动态创建 Actor 代理,再经由该代理调用 Actor 的方法:
/**
 * Client for Actor runtime to invoke actor methods.
 * 1. Build and install jars:
 * mvn clean install
 * 2. cd to [repo-root]/examples
 * 3. Run the client:
 * dapr run --components-path ./components/actors --app-id demoactorclient -- java -jar \
 * target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.DemoActorClient
 */
public class DemoActorClient {

  /** Number of actors (and caller threads) created by {@link #main(String[])}. */
  private static final int NUM_ACTORS = 3;

  /**
   * The main method.
   *
   * <p>Builds {@code NUM_ACTORS} actor proxies with random ids, starts one thread
   * per proxy and waits for all of them. Each thread loops until it is
   * interrupted, so the joins do not return on their own.
   *
   * @param args Input arguments (unused).
   * @throws InterruptedException If program has been interrupted.
   */
  public static void main(String[] args) throws InterruptedException {
    try (ActorClient client = new ActorClient()) {
      // Diamond instead of a raw type keeps the builder type-checked.
      ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder<>(DemoActor.class, client);
      List<Thread> threads = new ArrayList<>(NUM_ACTORS);

      // Creates multiple actors.
      for (int i = 0; i < NUM_ACTORS; i++) {
        ActorId actorId = ActorId.createRandom();
        DemoActor actor = builder.build(actorId);

        // Start a thread per actor.
        Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
        thread.start();
        threads.add(thread);
      }

      // Waits for threads to finish.
      for (Thread thread : threads) {
        thread.join();
      }
    }

    System.out.println("Done.");
  }

  /**
   * Makes multiple method calls into actor until interrupted.
   *
   * @param actorId Actor's identifier.
   * @param actor Actor to be invoked.
   */
  private static void callActorForever(String actorId, DemoActor actor) {
    // First, register reminder.
    actor.registerReminder();

    // Now, we run until thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
      // Invoke actor method to increment counter by 1, then build message.
      int messageNumber = actor.incrementAndGet(1).block();
      String message = String.format("Actor %s said message #%d", actorId, messageNumber);

      // Invoke the 'say' method in actor.
      String result = actor.say(message);
      System.out.println(String.format("Actor %s got a reply: %s", actorId, result));

      try {
        // Waits for up to 1 second.
        Thread.sleep((long) (1000 * Math.random()));
      } catch (InterruptedException e) {
        // We have been interrupted, so we set the interrupted flag to exit gracefully.
        Thread.currentThread().interrupt();
      }
    }
  }
}