Suppose canister A wants to send canister B a stream of messages.
The messages in the stream are ordered and should be processed by B in that order.
This package provides an implementation of a protocol for this purpose.
The protocol has the following properties:
- efficiency: messages from A are sent in batches to B
- order: preservation of order is guaranteed
- no gaps: messages are retried if needed to make sure there are no gaps in the stream
The package provides two classes, StreamSender for A and StreamReceiver for B.
In A, the canister code pushes items one by one to the StreamSender class.
In B, the StreamReceiver class invokes a callback for each arrived item.
The two classes manage everything in between including batching,
retries if any inter-canister calls fail and
managing concurrency (pipelining).
From the outside the protocol provides ordered, reliable messaging similar to TCP. The implementation is simpler than TCP. For example, the only state maintained by the receiver is the stream position (a single integer). In case of a gap, the receiver does not buffer items after the gap. Instead, the sender will automatically retry those items.
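The receiver rule just described can be sketched as a toy model. It assumes that each chunk carries the stream position of its first item. `ToyReceiver`, `deliver` and `position` are hypothetical names for illustration only. This is not the package's `StreamReceiver`, whose real message types differ.

```motoko
// Toy model only: NOT the package's StreamReceiver. All names are hypothetical.
// The receiver's entire state is `pos`, the position of the next expected item.
class ToyReceiver(process : (Nat, Text) -> ()) {
  var pos : Nat = 0;

  // `start` is the stream position of the chunk's first item (an assumption
  // of this model). Returns true if the chunk was accepted.
  public func deliver(start : Nat, items : [Text]) : Bool {
    // Gap or stale retry: reject without buffering anything.
    // The sender is expected to resend starting at `pos`.
    if (start != pos) return false;
    for (item in items.vals()) {
      process(pos, item);
      pos += 1;
    };
    true;
  };

  public func position() : Nat { pos };
};
```

Because the receiver keeps nothing but `pos`, a rejected chunk costs it no memory. The price is that the sender has to resend every item from the gap onward.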
The package is published on MOPS and GitHub. Please refer to the README on GitHub where it renders properly with formulas and tables.
The API documentation can be found on Mops.
Examples are documented in examples/README.md.
For updates, help, questions, feedback and other requests related to this package join us on:
Reliable, asynchronous communication between canisters is hard to get right because of the many edge cases that can occur if inter-canister calls fail. The purpose of this package is to hide that complexity from the developer by letting this library handle all of it.
Before instantiating the StreamSender class, the user needs to define a function sendFunc that makes an inter-canister call to the corresponding receiving endpoint of the receiver.
This is boilerplate code and is usually a one-line function.
Sender and receiver have to agree on the name of the endpoint.
The StreamSender forms each batch by taking unsent items from its internal queue.
The user has a way to control the batch size beyond simply defining a maximum number of items per batch.
For example, the user may want to count the byte size of the items in a batch
and limit the size of a batch by byte size.
This allows them to make better use of the maximum message size available for inter-canister calls.
To this end, the user provides a counterCreator function (typically a class constructor).
The StreamSender uses it to create a new counter instance for each batch.
It then feeds the items one-by-one to the counter's accept function before adding them to the batch.
When the accept function returns null, the StreamSender stops
and considers the batch "full".
Moreover, as an advanced feature, the counter can also transform the item from one type to a different type and the transformed type goes into the batch. In other words, the type of the items in the queue can differ from the type of the items in the batch.
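The following sketch shows a counter that limits batches by byte size and also transforms items. It assumes that the queue holds Text and that the batch holds the UTF-8 encoded Blob of each item. `ByteCounter` and `MAX_BATCH_BYTES` are hypothetical. Only the overall shape follows the description above: a parameterless class with an `accept` method that returns null when the batch is full.

```motoko
import Prim "mo:prim";

// Hypothetical byte budget per batch. A real value would sit somewhat below the
// inter-canister message size limit; it is tiny here so the effect is visible.
let MAX_BATCH_BYTES = 10;

// Sketch of a counterCreator: queue items are Text, batch items are the
// UTF-8 encoded Blob of each Text (the "transform" feature).
// A fresh instance is assumed per batch, so `bytes` starts at 0 for each batch.
class ByteCounter() {
  var bytes = 0;

  public func accept(item : Text) : ?Blob {
    let blob = Prim.encodeUtf8(item);
    // Batch is "full": refuse the item (assumed to remain queued for a later batch).
    // Also assumes no single item exceeds MAX_BATCH_BYTES on its own.
    if (bytes + blob.size() > MAX_BATCH_BYTES) return null;
    bytes += blob.size();
    ?blob;
  };
};
```

Counting bytes rather than characters matters for non-ASCII text. For example, "ñaaa" has 4 characters but encodes to 5 bytes. A counter like this would make the StreamSender's queue type and batch type differ (Text versus Blob). In the minimal example later in this README, the two type arguments are identical.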
sendFunc and counterCreator must be passed to the StreamSender constructor.
The StreamSender has four more settings that can be set dynamically at runtime via setter functions.
- maxQueueSize: the maximum number of elements that can simultaneously be in the StreamSender's queue. Default setting is infinity.
- maxWindowSize: the maximum number of concurrent sendChunk calls. Default setting is 5.
- keepAliveSeconds: the period in seconds after which the StreamSender should send a ping chunk in case there are no items to send. Default setting is not to ping.
- maxStreamLength: the maximum number of items a stream can ever accept. Default setting is infinity.
Methods:
- push is used to add an item to the stream.
- status is used to check the current status of the stream sender.
- sendChunk is used to trigger sending a chunk to the receiver side.
- Additional helper functions are provided.
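To make maxWindowSize and the retry behaviour more concrete, here is a toy model of sender-side bookkeeping. It is a simplification built on assumptions and is not the StreamSender's actual algorithm. `ToySender`, `nextChunk`, `onResponse` and `state` are hypothetical. Chunks are limited by a fixed item count instead of a counter, and a failed or rejected chunk simply rewinds to the last confirmed position (go-back-N).

```motoko
// Toy model of sender-side bookkeeping, NOT the package's StreamSender.
// Items are represented only by their positions 0, 1, 2, ...
// Assumes maxChunkItems > 0.
class ToySender(maxWindowSize : Nat, maxChunkItems : Nat) {
  var pushed = 0;    // number of items pushed so far
  var confirmed = 0; // every item below this position was accepted by the receiver
  var next = 0;      // position of the next item to send
  var inFlight = 0;  // chunks sent whose response has not arrived yet

  public func push() { pushed += 1 };

  // Returns (start, count) of the next chunk, or null if the window is full
  // or there is nothing unsent.
  public func nextChunk() : ?(Nat, Nat) {
    if (inFlight >= maxWindowSize or next >= pushed) return null;
    let available : Nat = pushed - next; // cannot underflow: next < pushed here
    let count = if (available < maxChunkItems) available else maxChunkItems;
    let start = next;
    next += count;
    inFlight += 1;
    ?(start, count);
  };

  // `accepted` is false if the call failed or the receiver reported a gap.
  public func onResponse(start : Nat, count : Nat, accepted : Bool) {
    inFlight -= 1;
    if (accepted) {
      if (start + count > confirmed) confirmed := start + count;
    } else {
      // Go back: resend everything from the last confirmed position on.
      next := confirmed;
    };
  };

  public func state() : (Nat, Nat, Nat) { (confirmed, next, inFlight) };
};
```

A larger window keeps more chunks in flight (pipelining), but after a failure every chunk sent behind the failed one is wasted and must be resent.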
StreamReceiver requires the following arguments in its constructor:
- itemCallback is the function that will be called on each individual received item.
- timeoutArg defines whether the stream should be stopped if too much time passes between two consecutive onChunk calls. null means no timeout. Otherwise, a duration and the time retrieval function are supplied.
The method onChunk of the StreamReceiver must be connected with an endpoint of the receiver canister.
It must be called with each arriving chunk from the sender.
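The effect of timeoutArg can be modelled with a toy in which time comes from an injected function. This mirrors the "time retrieval function" mentioned above. `ToyTimeout` and `chunkArrived` are hypothetical names, and the clock is assumed to be monotonic. In this model the first chunk only starts the measurement. The real StreamReceiver may start its timer differently.

```motoko
// Toy model of a receiver-side timeout, NOT the package's StreamReceiver.
// `now` plays the role of the time retrieval function; it is assumed monotonic.
class ToyTimeout(timeout : Nat, now : () -> Nat) {
  var last : ?Nat = null; // arrival time of the previous chunk, if any
  var stopped = false;

  // Called for every arriving chunk. Returns false once the stream is stopped.
  public func chunkArrived() : Bool {
    if (stopped) return false;
    let t = now();
    switch (last) {
      case (?prev) {
        // Nat subtraction: relies on the clock never going backwards.
        if (t - prev > timeout) {
          stopped := true;
          return false;
        };
      };
      case null {};
    };
    last := ?t;
    true;
  };
};
```

Injecting the clock keeps the logic testable: a test can advance a fake clock instead of waiting for real time to pass.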
You need mops installed. In your project directory run:
mops add stream
In the Motoko source file, import the package as:
import StreamSender "mo:stream/StreamSender";
import StreamReceiver "mo:stream/StreamReceiver";
The following sender example is taken from examples/minimal.
import Stream "../../../src/StreamSender";
import Prim "mo:prim";
persistent actor Alice {
// Read Bob's canister id once from an environment variable.
//
// Note: We don't allow the receiver to change later because that
// would risk corrupting the stream state. We would create a new
// stream instead if we have a new receiver.
let bob : Text = switch (Prim.envVar<system>("PUBLIC_CANISTER_ID:bob")) {
case (?id) id;
case _ Prim.trap("Environment variable 'PUBLIC_CANISTER_ID:bob' not set");
};
// Substitute your item type here
type Item = Nat;
// The endpoint (update method) on Bob's side must have this type:
type RecvFunc = shared Stream.ChunkMessage<Item> -> async Stream.ControlMessage;
// The endpoint (update method) on Bob's side is called "receive" in this example.
// Hence, this is the actor supertype for Bob that we use:
type ReceiverAPI = actor { receive : RecvFunc };
// This is Bob, the receiving actor, addressed by the canister id read above:
transient let B : ReceiverAPI = actor (bob);
// This is our `sendFunc` which is simply a wrapper around Bob's `receive` method.
// It is possible to wrap custom code around calling `B.receive` but we must not tamper
// with the response and we must not trap.
transient let send_ = func(x : Stream.ChunkMessage<Item>) : async* Stream.ControlMessage {
await B.receive(x);
};
// This is our `counterCreator`.
// A class with a single `accept` method which returns `null` when a batch is full.
// In this example we simply count the number of items and allow at most 3 items in a batch.
// We also do not transform the items, i.e. the type of items in the queue is identical to
// the type of items in the batch.
class counter_() {
var sum = 0;
let maxLength = 3;
public func accept(item : Item) : ?Item {
if (sum == maxLength) return null;
sum += 1;
return ?item;
};
};
// We can now define our `StreamSender` class, initialized with our `sendFunc` and `counterCreator`.
transient let sender = Stream.StreamSender<Item, Item>(send_, counter_);
// This function shows how we push to the `StreamSender`'s queue with `push`.
public shared func enqueue(n : Nat) : async () {
var i = 0;
while (i < n) {
ignore sender.push((i + 1) ** 2);
i += 1;
};
};
// This function shows how we trigger a chunk to be formed and sent with `sendChunk`.
public shared func batch() : async () {
await* sender.sendChunk();
};
};
The following receiver example is also taken from examples/minimal.
import Error "mo:core/Error";
import Principal "mo:core/Principal";
import Stream "../../../src/StreamReceiver";
import Prim "mo:prim";
persistent actor Bob {
// Read Alice's principal once from an environment variable.
//
// Note: We don't allow the sender to change later because that
// would risk corrupting the stream state. We would create a new
// stream instead if we have a new sender.
let sender = Principal.fromText(
switch (Prim.envVar<system>("PUBLIC_CANISTER_ID:alice")) {
case (?id) id;
case _ Prim.trap("Environment variable 'PUBLIC_CANISTER_ID:alice' not set");
}
);
// Substitute your item type here
type Item = Nat;
// We define a function to process each item.
// It accepts the item index (position) in the stream and the item itself.
// In this example the processing function simply logs the item.
// The function name can be freely chosen.
transient var log_ : Text = "";
func processItem(index : Nat, item : Item) : Bool {
// put your processing code here
log_ #= debug_show (index, item) # " ";
true;
};
// Now we can define our `StreamReceiver` by passing it the processing function defined above:
transient let receiver_ = Stream.StreamReceiver<Item>(processItem, null);
// We have to create the endpoint (update method) that Alice will call to send chunks.
// Here, both sides have agreed on the name "receive" for this endpoint.
// The type must be: `shared Stream.ChunkMessage<Item> -> async Stream.ControlMessage`
// It is possible to wrap custom code around calling `onChunk` but we must not tamper
// with the response and we must not trap.
public shared (msg) func receive(m : Stream.ChunkMessage<Item>) : async Stream.ControlMessage {
// Make sure only Alice can call this method
if (msg.caller != sender) throw Error.reject("not authorized");
receiver_.onChunk(m);
};
// A getter for the log to monitor the receiver in action
public func log() : async Text { log_ };
};
Install node (LTS recommended) including npm.
Required for mops.
Install mops:
npm install -g ic-mops
mops toolchain init
To build and test from source, run:
git clone git@github.com:research-ag/stream.git
cd stream
mops install
mops test
The examples/ directory contains executable examples.
For an overview and how to run them locally see: examples/README.md.
Minimal code required to get a sender and a receiver talking to each other.
Compared to the example above, this demonstrates:
- what a more sophisticated counter for batch preparation can look like
- how the queue type can differ from the sending type
- how to send chunks from a heartbeat
- how to persist the stream across canister upgrades
Compared to the main example, this demonstrates:
- how to use a Tracker connected to a stream
- how to persist the metrics across canister upgrades
You can watch the metrics from a browser at a URL like this:
http://txyno-ch777-77776-aaaaq-cai.raw.localhost:8000/metrics
where txyno-ch777-77776-aaaaq-cai is replaced by the canister id
that is shown during icp deploy.
Copyright MR Research AG, 2023 - 2026
Main author: Timo Hanke (timohanke).
Contributors: Andrii Stepanov (AStepanov25), Andy Gura (AndyGura).
License: Apache-2.0