注册与调用

Dapr Actor 注册与调用

这里以 dapr/java-sdk 中的示例为例介绍 Dapr Actor 的调用,首先我们可以声明 Actor 的状态存储库:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: 10.0.100.18:6379
    - name: redisPassword
      value: ""
    - name: actorStateStore
      value: "true"

Actor 注册

首先声明类型:

@ActorType(name = "DemoActor")
public interface DemoActor {

  void registerReminder();

  @ActorMethod(name = "echo_message")
  String say(String something);

  void clock(String message);

  @ActorMethod(returns = Integer.class)
  Mono<Integer> incrementAndGet(int delta);
}

然后注册实现:

/**
 * Implementation of the DemoActor for the server side.
 */
public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable<Integer> {

  /**
   * Format to output date and time.
   */
  private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  /**
   * This is the constructor of an actor implementation, while also registering a timer.
   * @param runtimeContext The runtime context object which contains objects such as the state provider.
   * @param id             The id of this actor.
   */
  public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
    super(runtimeContext, id);

    super.registerActorTimer(
        null,
        "clock",
        "ping!",
        Duration.ofSeconds(2),
        Duration.ofSeconds(1)).block();
  }

  /**
   * Registers a reminder.
   */
  @Override
  public void registerReminder() {
    super.registerReminder(
        "myremind",
        (int) (Integer.MAX_VALUE * Math.random()),
        Duration.ofSeconds(5),
        Duration.ofSeconds(2)).block();
  }

  /**
   * Prints a message and appends the timestamp.
   * @param something Something to be said.
   * @return What was said appended with timestamp.
   */
  @Override
  public String say(String something) {
    Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

    // Handles the request by printing message.
    System.out.println("Server say method for actor "
        + super.getId() + ": "
        + (something == null ? "" : something + " @ " + utcNowAsString));

    super.getActorStateManager().set("lastmessage", something).block();

    // Now respond with current timestamp.
    return utcNowAsString;
  }

  /**
   * Increments a persistent counter, saves and returns its updated value.
   * Example of method implemented with Reactor's Mono class.
   * This method could be rewritten with blocking calls in Mono, using block() method:
   *
   * <p>public int incrementAndGet(int delta) {
   *   int counter = 0;
   *   if (super.getActorStateManager().contains("counter").block()) {
   *     counter = super.getActorStateManager().get("counter", int.class).block();
   *   }
   *   counter = counter + 1;
   *   super.getActorStateManager().set("counter", counter).block();
   *   return counter;
   * }</p>
   * @param delta Amount to be added to counter.
   * @return Mono response for the incremented value.
   */
  @Override
  public Mono<Integer> incrementAndGet(int delta) {
    return super.getActorStateManager().contains("counter")
        .flatMap(exists -> exists ? super.getActorStateManager().get("counter", int.class) : Mono.just(0))
        .map(c -> c + delta)
        .flatMap(c -> super.getActorStateManager().set("counter", c).thenReturn(c));
  }

  /**
   * Method invoked by timer.
   * @param message Message to be printed.
   */
  @Override
  public void clock(String message) {
    Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

    // Handles the request by printing message.
    System.out.println("Server timer for actor "
        + super.getId() + ": "
        + (message == null ? "" : message + " @ " + utcNowAsString));
  }

  /**
   * Method used to determine reminder's state type.
   * @return Class for reminder's state.
   */
  @Override
  public TypeRef<Integer> getStateType() {
    return TypeRef.INT;
  }

  /**
   * Method used be invoked for a reminder.
   * @param reminderName The name of reminder provided during registration.
   * @param state        The user state provided during registration.
   * @param dueTime      The invocation due time provided during registration.
   * @param period       The invocation period provided during registration.
   * @return Mono result.
   */
  @Override
  public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) {
    return Mono.fromRunnable(() -> {
      Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
      String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

      String message = String.format("Server reminded actor %s of: %s for %d @ %s",
          this.getId(), reminderName, state, utcNowAsString);

      // Handles the request by printing message.
      System.out.println(message);
    });
  }
}

最后是在运行时中进行注册:

/**
 * Service for Actor runtime.
 * 1. Build and install jars:
 * mvn clean install
 * 2. cd to [repo-root]/examples
 * 3. Run the server:
 * dapr run --components-path ./components/actors --app-id demoactorservice --app-port 3000 \
 *   -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.DemoActorService -p 3000
 */
public class DemoActorService {

  /**
   * The main method of this app.
   * @param args The port the app will listen on.
   * @throws Exception An Exception.
   */
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addRequiredOption("p", "port", true, "Port the will listen to.");

    CommandLineParser parser = new DefaultParser();
    CommandLine cmd = parser.parse(options, args);

    // If port string is not valid, it will throw an exception.
    final int port = Integer.parseInt(cmd.getOptionValue("port"));

    // Idle timeout until actor instance is deactivated.
    ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofSeconds(30));
    // How often actor instances are scanned for deactivation and balance.
    ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(10));
    // How long to wait until for draining an ongoing API call for an actor instance.
    ActorRuntime.getInstance().getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(10));
    // Determines whether to drain API calls for actors instances being balanced.
    ActorRuntime.getInstance().getConfig().setDrainBalancedActors(true);

    // Register the Actor class.
    ActorRuntime.getInstance().registerActor(DemoActorImpl.class);

    // Start Dapr's callback endpoint.
    DaprApplication.start(port);
  }
}

服务调用

在调用侧,需要动态地根据 Actor 类型来创建 Actor:


/**
 * Client for Actor runtime to invoke actor methods.
 * 1. Build and install jars:
 * mvn clean install
 * 2. cd to [repo-root]/examples
 * 3. Run the client:
 * dapr run --components-path ./components/actors --app-id demoactorclient -- java -jar \
 * target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.DemoActorClient
 */
public class DemoActorClient {

  private static final int NUM_ACTORS = 3;

  /**
   * The main method.
   * @param args Input arguments (unused).
   * @throws InterruptedException If program has been interrupted.
   */
  public static void main(String[] args) throws InterruptedException {
    try (ActorClient client = new ActorClient()) {
      ActorProxyBuilder<DemoActor> builder = new ActorProxyBuilder(DemoActor.class, client);
      List<Thread> threads = new ArrayList<>(NUM_ACTORS);

      // Creates multiple actors.
      for (int i = 0; i < NUM_ACTORS; i++) {
        ActorId actorId = ActorId.createRandom();
        DemoActor actor = builder.build(actorId);

        // Start a thread per actor.
        Thread thread = new Thread(() -> callActorForever(actorId.toString(), actor));
        thread.start();
        threads.add(thread);
      }

      // Waits for threads to finish.
      for (Thread thread : threads) {
        thread.join();
      }
    }

    System.out.println("Done.");
  }

  /**
   * Makes multiple method calls into actor until interrupted.
   * @param actorId Actor's identifier.
   * @param actor Actor to be invoked.
   */
  private static final void callActorForever(String actorId, DemoActor actor) {
    // First, register reminder.
    actor.registerReminder();

    // Now, we run until thread is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
      // Invoke actor method to increment counter by 1, then build message.
      int messageNumber = actor.incrementAndGet(1).block();
      String message = String.format("Actor %s said message #%d", actorId, messageNumber);

      // Invoke the 'say' method in actor.
      String result = actor.say(message);
      System.out.println(String.format("Actor %s got a reply: %s", actorId, result));

      try {
        // Waits for up to 1 second.
        Thread.sleep((long) (1000 * Math.random()));
      } catch (InterruptedException e) {
        // We have been interrupted, so we set the interrupted flag to exit gracefully.
        Thread.currentThread().interrupt();
      }
    }
  }
}