Ufotofu is a Rust library of abstractions (“streams” and “sinks”) for asynchronously working with series of data — except we call them producers and consumers, and have strong opinions about the API designs.

An ufotofu sequence consists of an arbitrary number of repeated items, terminated by at most one final value. A producer yields these values one at a time; a consumer receives them one at a time. Both traits can emit an error on any method call.

trait Consumer {
  type Item;
  type Final;
  type Error;

  async fn consume(
    &mut self,
    value: Either<Self::Item, Self::Final>,
  ) -> Result<(), Self::Error>;

  // Empty out any inner buffers.
  async fn flush(&mut self) -> Result<(), Self::Error>;
}
trait Producer {
  type Item;
  type Final;
  type Error;

  async fn produce(&mut self) -> Result<
    Either<Self::Item, Self::Final>, Self::Error,
  >;


  // Fill up any inner buffers.
  async fn slurp(&mut self) -> Result<(), Self::Error;
}

On this website, we introduce both the what and the why of the core ufotofu APIs. You can also read the crate docs, which give a more thorough introduction to only the what without the why. If you would like to contribute, please see our issue tracker.

Design Choices for Sequence APIs

Why the Producer, Consumer, BulkProducer, and BulkConsumer traits are what they are.

Producers and consumers are dual

Adding an item to one end of a sequence and removing an item from the other end of the sequence are dual operations. Both transfer the same amount of information, both obey equally rigid access restrictions, and they ultimately cancel each other out.

To us, it follows that the corresponding traits — consumers and producers — should also be dual. Every feature of one API without a counterpart in the other API would indicate that one design is less expressive than it should be. Any imbalance would go against the principle that APIs should always strive for consistency.

While there are limits to how far we could take this symmetry, many design choices that would have been muddy in isolation became clear when interpreted as the duals of more clear-cut choices. Examples — which we describe later in greater detail — include the explicit error type for producers, or the precise notion of buffering for producers.

Being hellbent on the duality of our core traits leads to elegant notions of composition.

The first notion of composition is that of piping a producer into a consumer. Each successful call to produce is mirrored by a call to consume, and the error type is the sum type over both possible errors:

async fn pipe<P, C>(
  producer: &mut P,
  consumer: &mut C,
) -> Result<(), PipeError<P::Error, C::Error>>
where
  P: Producer,
  C: Consumer<Item = P::Item, Final = P::Final>,
{
  loop {
    match producer.produce().await
      .map_err(PipeError::Producer)? {
      Left(item) => {
        consumer.consume(Left(item)).await
          .map_err(PipeError::Consumer)?;
      }
      Right(fin) => {
        consumer.consume(Right(fin)).await
          .map_err(PipeError::Consumer)?;
      }
    }
  }
}

Whereas piping is mostly driven by the producer, the other natural notion of composition is driven by the consumer: an in-memory channel. Each consume call on the sender provides the data for the corresponding produce call on the receiver:

sender.consume(Left(17)).await?;
sender.consume(Left(42)).await?;

assert_eq!(receiver.produce().await?, Left(17));

sender.consume(Left(99)).await?;
sender.consume(Right("salmon")).await?;

assert_eq!(receiver.produce().await?, Left(42));
assert_eq!(receiver.produce().await?, Left(99));
assert_eq!(receiver.produce().await?, Right("salmon"));

Zero-copy bulk processing

On typical modern hardware, operating on slices of values is more efficient than repeatedly operating on individual values. Producers and consumers are limited to operating on individual items, however. Ufotofu provides the BulkProducer and BulkConsumer traits for slice-based processing:

// A consumer that can consume multiple items at a time.
trait BulkConsumer: Consumer {
  // Exposes a mutable slice of items to an async closure.
  // The closure writes items into the slice, and reports
  // to the consumer how many it wrote.
  // The closure also provides a proper return value,
  // which is simply forwarded by this method.
  async fn expose_slots<F, R>(&mut self, f: F)
    -> Result<R, Self::Error>
    where F: AsyncFnOnce(&mut [Self::Item]) -> (usize, R);
}
// A producer that can produce multiple items at a time.
trait BulkProducer: Producer {
  // Exposes a slice of items to an async closure.
  // The closure reads items from the slice, and reports
  // to the producer how many it read.
  // The closure also provides a proper return value,
  // which is simply forwarded by this method.
  async fn expose_items<F, R>(&mut self, f: F)
    -> Result<Either<R, Self::Final>, Self::Error>
    where F: AsyncFnOnce(&[Self::Item]) -> (usize, R);
}

Before we describe and discuss these APIs in more detail: our design is atypical. A more traditional approach to bulk processing is exemplified by std::io::Read and std::io::Write: you pass a slice of items to the bulk processor, and it writes items into the slice or reads items from the slice. In ufotofu, such a design would have looked like this:

// An alternate BulkConsumer design,
// analogous to `io::Write`.
trait TraditionalBulkConsumer: Consumer {
  // Reads items from the given slice; returns how many.
  async fn bulk_consume(&mut self, buf: &[Self::Item])
    -> Result<usize, Self::Error>
}
// An alternate BulkProducer design,
// analogous to `io::Read`.
trait TraditionalBulkProducer: Producer {
  // Writes items into the given slice; returns how many.
  async fn bulk_produce(&mut self, buf: &mut [Self::Item])
    -> Result<Either<usize, Self::Final>, Self::Error>
}

This approach can introduce needless copying, however. Consider the task of piping data from an io::Read into an io::Write: you need to supply an intermediate buffer, and the bytes will first be copied into that buffer and then copied out of it. Each byte is copied twice. Here is an analogous sketch of bulk piping if ufotofu had more traditional bulk processing APIs:

async fn bulk_pipe_traditional<P, C>(
  p: P,
  c: C,
  buf: &mut [P::Item],
) -> Result<(), PipeError<P::Error, C::Error>>
where
  P: TraditionalBulkProducer<Item: Clone>,
  C: TraditionalBulkConsumer<Item = P::Item, Final = P::Final>,
{
  loop {
    // Write data into the buffer.
    match p.bulk_produce(buf).await
      .map_err(PipeError::Producer)?
    {
      Left(amount) => {
        // Read all written data from the buffer.
        c.bulk_consume_all(&buf[..amount]).await
          .map_err(PipeError::Consumer)?;
      }
      Right(fin) => {
        return Ok(c.consume(Right(fin)).await
          .map_err(PipeError::Consumer)?);
      }
    }
  }
}

Instead of interacting with external buffers, our actual designs work by exposing internal buffers to the outside. This avoids unnecessary copying, for example when implementing the actual ufotofu bulk_pipe function:

async fn bulk_pipe<P, C>(
  p: P,
  c: C,
) -> Result<(), PipeError<P::Error, C::Error>>
where
  P: BulkProducer<Item: Clone>,
  C: BulkConsumer<Item = P::Item, Final = P::Final>,
{
  loop {
    // The producer exposes its buffer.
    match p.expose_items(async |items| {
      // The consumer copies the exposed items
      // into its own buffer, using a method
      // reimplementing the traditional API design.
      match c.bulk_consume(items).await {
        Ok(amount) => (amount, Ok(())),
        Err(consumer_error) => (0, Err(consumer_error)),
      }
    }).await.map_err(PipeError::Producer)?
    {
      Left(Ok(())) => {} // Go to the next loop iteration.
      Left(Err(consumer_err)) => {
        return Err(PipeError::Consumer(consumer_err));
      }
      Right(fin) => {
        return Ok(c.consume(Right(fin)).await
          .map_err(PipeError::Consumer)?);
      }
    }
  }
}

Ufotofu provides BulkProducerExt::bulk_produce and BulkConsumerExt::bulk_consume as simple and efficient reimplementations of the more traditional APIs based on external slices:

// Reads data from the given slice; returns how much.
async fn bulk_consume(&mut self, buf: &[Self::Item])
  -> Result<usize, Self::Error>
  where Self::Item: Clone,
{
  self.expose_slots(async |slots| {
    let amount = min(slots.len(), buf.len());
    slots[..amount].clone_from_slice(&buf[..amount]);
    (amount, amount)
  }).await
}
// Writes data into the given slice; returns how much.
async fn bulk_produce(&mut self, buf: &mut [Self::Item])
  -> Result<Either<usize, Self::Final>, Self::Error>
  where Self::Item: Clone
{
  self.expose_items(async |items| {
    let amount = min(items.len(), buf.len());
    buf[..amount].clone_from_slice(&items[..amount]);
    (amount, amount)
  }).await
}

Our traits do appear to suffer from a drawback: they can only be implemented by processors which internally store items in a contiguous slice. You cannot, for example, write a bulk producer adaptor which maps all produced items through a function and have the adaptor implement BulkProducer. While the inner producer exposes slices of items, individually applying a function to these items does not materialise a slice of mapped items that the adaptor could expose.

This limitation is actually a feature, not a bug. The point of bulk processing is the efficiency of operating on whole slices simultaneously. Our API design automatically excludes “false” bulk processors which operate on individual items behind the scenes (such as the example mapping adaptor). Traditional bulk APIs based on copying into or from external buffers, in contrast, admit implementation by processors which simply interact on a per-item-basis with the external buffer.

In the synchronous standard library, where producers (i.e., Iterators) and bulk producers (i.e., Readers) are completely unrelated APIs, a strict notion of “true” bulk processors would cause considerable friction. A mapping combinator that turned a Read into an Iterator would imply switching to a completely different toolset, with different degrees of expressivity around error reporting, final values, etc. In ufotofu, where the bulk processor traits simply extend the item-by-item traits, no such problems arise.


Fully generic bulk processing

Many API designs hardcode bulk operations to u8 as the only type of items, and std::io::Error as the only type of errors.

Why, though?


Bulk traits extend item-by-item traits

Bulk processing APIs allow producing or consuming multiple items with a single method call. Producing or consuming exactly one item is a special case — every bulk producer or bulk consumer is able to do it. Ufotofu reflects this on the type level: BulkProducer extends Producer, and BulkConsumer extends Consumer.

This design choice lets us elegantly specify the semantics of bulk operations: in terms of items, final values, and errors, bulk operations must be equivalent to repeated individual operations. This rule unambiguously resolves potential confusion:

Consider a bulk consumer which could successfully consume four items but would have to emit an error after consuming a fifth item. It is tasked to consume five items in bulk. Should it immediately report an error, or should it consume the first four items and signal success? Only the latter is compatible with the sequence of results of five individual consume operations.

Finally, note that this design choice directly implies fully generic bulk processing.


Bulk operations never process zero items

Bulk processing combines many individual operations into a single operation. Does “many” include “zero”? The answer is, emphatically, no.

When calling produce on a producer, you need to await the result. If the producer cannot procure its result immediately, it parks the task and resumes it once the result is available. This happens transparently, the calling code remains unaware whether any delays occured or not. Backpressure correctly and automatically propagates through the system.

Bulk operations which report processing of zero items would undermine everything. Imagine we tasked a bulk producer to bulk-produce several items at once, and it reported back “successful” production of zero items. This would not signal the end of the sequence, since the result was neither a final value nor an error. So we should probably ask the bulk producer to produce items again. But when should we do so? Continuously extracting zero items in a busy loop is unacceptable, but the bulk producer has no way of actively signalling when it has actual data available.

For this reason (and analogous reasoning for consumers), temporary inability to (bulk-) process items must always be signalled with a pending future in ufotofu. There might be very real reasons for wanting to return zero on a bulk operation. If you encounter one of these, then the ufotofu traits are simply not the traits you are looking for. The more ufotofu constrains the semantics of its traits, the more useful functionality it can provide.

While bulk processors must not expose empty slices, the closures passed to expose_items and expose_slots may in fact report back that zero items were processed. For bulk producers, this interestingly enables peeking at least one item without actually producing it — making bulk producers more expressive than regular producers in an unexpected way. We have decided to embrace this fact, because restricting the bulk processing closures to always reporting nonzero amounts of processed items would have inhibited composability in various places. Vexingly, whereas peeking is highly useful, the dual notion of staging at least one item in a bulk consumer is virtually useless.

// Let a closure mutate the next item to be consumed,
// without actually consuming it.
async fn stage<F, R>(&mut self, f: F)
  -> Result<R, Self::Error>
  where F: AsyncFnOnce(&mut Self::Item) -> R
  {
    self.expose_slots(async |slots| {
      (0, f(&mut slots[0]).await)
    }).await
  }
// Let a closure inspect the next item to be produced,
// without actually producing it.
async fn peek<F, R>(&mut self, f: F)
  -> Result<Either<R, Self::Final>, Self::Error>
  where F: AsyncFnOnce(&Self::Item) -> R
  {
    self.expose_items(async |items| {
      (0, f(&items[0]).await)
    }).await
  }

Buffering is abstracted-over in traits

Ufotofu supports buffering in its core trait designs, both for consumers and for producers. Consumers follow the conventional notion of being allowed to delay the side-effects of consumption, and surrounding code can explicitly trigger these side-effects by calling flush. Dually, producers are allowed to eagerly perform side-effects to speed up future production, and surrounding code can explicitly trigger such optimistic side-effects by calling slurp.

The ConsumerExt::to_buffered and ProducerExt::to_buffered methods can add buffering to arbitrary consumers or producers respectively. The backing storage for the buffered items can be provided by any type implementing our Queue trait:

trait Queue {
  type Item;

  fn len(&self) -> usize;
  fn is_full(&self) -> bool;

  fn enqueue(&mut self, item: Self::Item) -> Option<Self::Item>;
  async fn expose_slots<F, R>(&mut self, f: F) -> R
    where F: AsyncFnOnce(&mut [Self::Item]) -> (usize, R);

  fn dequeue(&mut self) -> Option<Self::Item>;
  async fn expose_items<F, R>(&mut self, f: F) -> R
    where F: AsyncFnOnce(&[Self::Item]) -> (usize, R);
}

All errors are fatal errors

Every error reported by a producer or consumer is fatal. You must not produce or consume more items after an error. Why is that?

Errors are the only mechanism by which a consumer can provide information to its users. By defining all errors as fatal, we restrict the information flow from consumers to their environments. This deliberately limits expressivity: if consumers reported back information with every consume operation, they would effectively turn into coroutines (fully equivalent to if produce received an argument). Nonfatal errors would elevate consumers to this degree of expressivity:

// A signature not befitting a consumer; it outputs
// just as much information as it receives!
async fn consume(
  &mut self,
  value: Either<Self::Item, Self::Final>,
) -> Output;

// A signature with even more expressive output. Oops.
async fn consume(
  &mut self,
  value: Either<Self::Item, Self::Final>,
) -> Result<(), NonFatalError>;

Without any error reporting, consumers would be almost useless — imagine, for example, a TCP implementation that silently drops bytes instead of reporting connection loss. Fatal-only errors for consumers are a compromise to make consumers useful while still maximally constraining them. And the more we constrain our traits, the more useful functionality we can provide for them.

From a purely theoretical perspective, it might seem like consume should not return any information at all: produce does not receive any information either, and isn’t ufotofu all about duality? You can jump forward to our section on effects to see how we weasel ourselves out of this one.

Since producers mirror consumers, they too have fatal errors. In fact, the consumer-producer duality is the sole reason why producers have explicit error support at all:


Generic final values

Ufotofu sequences end with an explicit final value. This stands in contrast to APIs such as Iterator, where the end of the sequence does not carry any additional information. We will justify our unusual choice through three increasingly more abstract points.

Our first point takes the form of a concrete example API. Consider a database of books. There is a method for querying all current books by author, and a method for subscribing to notifications about changes to the data set by author:

struct Book {
  title: String,
  author: String,
}

enum Event {
  Inserted(Book),
  Removed(Book),
}

struct Database {
  // Produce all current books by `author`.
  fn query(&self, author: &str)
    -> impl Producer<Item = Book, Final = (), Error = !> {
      todo!();
  }

  // Notify about all updates to books by `author`.
  fn subscribe(&self, author: &str)
    -> impl Producer<Item = Event, Final = (), Error = !> {
      todo!();
  }
}

This API has a problem: suppose I want to keep an up-to-date list of all books in the database by Ursula Le Guin. I might start by calling query, retrieve the books, and then call subscribe to keep up to date. But then there can be no guarantee that I am not missing any updates which happened in between those method calls. So I should call subscribe first. This will be correct, but the database might give me redundant information.

Instead, I would like to add a query_and_subscribe method to the database, so that the database can give me complete yet non-redundant information. What should the return type of this method be? Thanks to arbitrary final values, we can give the exact right type:

struct Database {
  // First produce all current books, then transition
  // to a stream of updates.
  fn query_and_subscribe(&self, author: &str) -> impl Producer<
    Item = Book,
    Final = impl Producer<Item = Event, Final = (), Error = !>,
    Error = !,
  >
}

Our second point concerns two special cases of final values: the unit type (), and the empty type !. A producer whose type of final values it the unit type describes finite sequences — you get some number of items of the same type, and at some point it stops. A producer whose type of final values is the empty type can never emit a final value. In other words, this is how we can indicate infinite sequences on the type level.

In the standard library, in contrast, iterators cannot statically distinguish between finite and infinite sequences. A separate InfiniteIterator trait solves the problem if you really need this (there’s a crate already), but ufotofu will never have this problem in the first place.

Our third point is a rather theoretical one, and centers on the desired level of expressivity of sequence APIs. As a prelude, consider that we work with eager sequences (i.e., sequences which exist in memory all at once) all the time: whenever we define a data type, there is an associated sequential layout of its parts in memory. Rust basically offers three operators for describing eager sequences: choice (either some type or another, in Rust via enums), concatenation (some type followed by another, in Rust via tuples), and homogeneous repetition (an unknown but finite number of repetitions of some type, in Rust via slices).

Squint a bit, and you see exactly the operators that characterise the regular languages (with slices corresponding to the Kleene star).

If the absolute bread and butter of typical programming languages is to classify eager sequences at the level of expressivity of regular languages, that strongly indicates that API designs for lazy sequences should at least match this level of expressivity. But APIs that hardcode the final value of sequences are not expressive enough: while the choice operator can be expressed through sum types of items, and the sequence API itself takes on the role of the Kleene star, there is no proper concatenation.

A parameterisable final value type is exactly the missing piece; it enables concatenation by using a different instance of the sequence API as the final value. This is exactly what our book database example demonstrated.

// A simple regular expression:
// (A + B)* C*

// A corresponding type:
type Exp = (&[Either<A, B>], &[C]);

// A corresponding producer type bound:
impl Producer<
  Item = Either<A, B>,
  Final = impl Producer<Item = C, Final = ()>,
>

// A corresponding consumer type bound:
impl Consumer<
  Item = Either<A, B>,
  Final = impl Consumer<Item = C, Final = ()>,
>

Because lazy sequences can be infinite, theory land offers another natural level of expressivity: the ω-regular languages. Using the empty type as a type of final values allows ufotofu to indeed express the ω-regular languages, in addition to the “regular” regular languages.


Frugal Async Rust

Ufotofu adheres to the Frugal Async Rust principles. In short:

See the Frugal Async Rust page for the reasoning behind those choices.


Alternate Design Choices

What could have been — and might yet be.

Support for multi-threaded executors

We decided against supporting multi-threaded executors, in order to not require a Send bound on the futures returned by all async trait methods.

While the default functionality of ufotofu will always stay single-threaded, we are open to eventually providing a module with separate multi-threaded variants of, well, everything. See the corresponding issue for more details.

trait SendConsumer {
  type Item;
  type Final;
  type Error;

  async fn consume(
    &mut self,
    value: Either<Self::Item, Self::Final>,
  ) -> impl Future<
      Output = Result<(), Self::Error>,
    > + Send;

  async fn flush(&mut self)
    -> impl Future<
      Output = Result<(), Self::Error>,
    > + Send;
}
trait SendProducer {
  type Item;
  type Final;
  type Error;

  async fn produce(&mut self) -> impl Future<
    Output = Result<
      Either<Self::Item, Self::Final>, Self::Error,
    >,
  > + Send;

  
  async fn slurp(&mut self)
    -> impl Future<
      Output = Result<(), Self::Error>,
    > + Send;
}

Synchronous trait variants

Ufotofu is a library for asynchronous processing of lazy sequences, but all our design principles can also be applied to synchronous (i.e., blocking) code. Just like with support for multi-threaded executors, we would be open to having a separate, macro-generated module of ufotofu traits and adaptors. We even wrote that macro already. Our corresponding issue provides more details.

trait SynchronousConsumer {
  type Item;
  type Final;
  type Error;

  fn consume(
    &mut self,
    value: Either<Self::Item, Self::Final>,
  ) -> Result<(), Self::Error>;

  fn flush(&mut self) -> Result<(), Self::Error>;
}
trait SynchronousProducer {
  type Item;
  type Final;
  type Error;

  fn produce(&mut self) -> Result<
      Either<Self::Item, Self::Final>,
      Self::Error,
    >;

  fn slurp(&mut self) -> Result<(), Self::Error>;
}

Given that synchronous ufotofu APIs would be in direct competition with the standard library (Iterator, std::io::Read, and std::io::Write) synchronous ufotofu feels a bit preposterous. Then again, synchronous Rust could really benefit from more flexible consumer abstractions. As a concrete example, consider that serde does not have a unified way for Serializers to emit their bytes. A synchronous equivalent of BulkConsumer would fill the exact gap that causes the awkwardness of serde in this respect.

See also our discussion of effects for a significantly more principled take on a synchronous variant of ufotofu, sadly not readily expressed in idiomatic Rust.


Simplified zero-copy bulk processing

The zero-copy bulk processing model of ufotofu is arguably the most drastic departure from more conventional APIs. We are confident in our choice of exposing internal buffers to improve efficiency, but our design is not the only way to achieve this.

Our functions take an async closure to operate on the exposed slice; a more straightforward design has two separate methods: one for accessing the slice, and one for notifying the producer or consumer that some number of items should be considered produced or consumed respectively. The bytes crate provides an example of this style in its bytes::buf::Buf::chunk and bytes::buf::Buf::advance methods (hey, look, another real-world use-case for synchronous ufotofu).

trait SimplifiedBulkConsumer: Consumer {
  // Expose a mutable slice of item slots.
  async fn expose_slots(&mut self)
    -> Result<&mut [Self::Item], Self::Error>;

  // Inform the consumer how many of its exposed
  // slots it should now consume.
  async fn consider_consumed(&mut self, amount: usize)
    -> Result<(), Self::Error>;
}
trait SimplifiedBulkProducer: Producer {
  // Expose a slice of items.
  async fn expose_items(&mut self)
    -> Result<&[Self::Item], Self::Error>;

  // Inform the producer how many of its exposed
  // items it should consider as having been produced.
  async fn consider_produced(&mut self, amount: usize)
    -> Result<(), Self::Error>;
}

We decided against this design because it can cause unexpected behaviour when the same bulk processor is used from multiple parts of a codebase. Imagine storing a bulk producer in an Rc, in an appropriate async-aware variant of a RefCell. Suppose some part of the codebase asks the producer to expose some items, but then yields on an .await expression. Having yielded, a different task now asks for a slice of items; it reads five of them and notifies the the producer; then later the first task also notifies the producer that it read some number of items.

As a result, some items got “produced” twice, while others are completely skipped. With our design, this cannot happen — the cell-like type providing mutually exclusive mutable access would have prevented the critical double-usage.

This is one of the few instances where we opted for the less obvious and less convenient choice among possible API designs. But we believe that statically excluding the full class of bugs represented by this example is worth it.


Type-level nonempty slices

Bulk producers and bulk consumers must not pass empty slices to the closures passed to expose_items and expose_slots respectively. Wouldn’t it be nice to enforce this on the type level? We tried it, and the ergonomics just felt bad. There exist other crates implementing nonempty slices, but none have caught on. Until the standard library provides an official, better-designed mechanism for nonempty slices, ufotofu will go with the less precise but significantly more usable API of plain slices.

Likewise, the closures passed to expose_items and expose_slots return usizes instead of NonZeroUsize for ergonomics.


Write-only slices

When a bulk producer exposes a slice of items, the closure can read from that slice, but it cannot write into that slice. When a bulk consumer exposes a slice of items, the closure can write into that slice, and it can read from that slice. This breaks duality. Blasphemy!

This asymmetry is rooted deeply in Rust: Rust has read-only references, but it does not have write-only references. We decided to not fight the language on this point: while it is possible to introduce a WriteOnlySlice type, it is cumbersome to use, and the benefits are fairly low.

In particular, write-only slices would not allow bulk consumers to expose uninitialised memory without risking unsafe behaviour, because an incorrectly reported number of filled slots would cause such a bulk consumer to treat still-uninitialised memory as initialised.


Linear typing

Ufotofu forbids interacting with a producer or consumer after the final value was processed or an error was emitted. Wouldn’t it be nice to enforce this on the type level?

Unlike most mainstream languages, Rust actually supports linear typing to the degree that the usage invariants could be enforced on the type level. For example, produce could take self by value, and return (Item, Self) when producing a regular item, but simply drop self when returning a final value or error.

trait LinearConsumer {
  type Item;
  type Final;
  type Error;

  // To accurately type consumption, we would need
  // to split it up into two separate methods:

  // Consume a regular item, and return back `self`.
  async fn consume(self, item: Self::Item)
    -> Result<Self, Self::Error>;
  
  // Consume a final value, do not return back `self`.
  async fn close(self, fin: Self::Final)
    -> Result<(), Self::Error>;

  async fn flush(self) -> Result<Self, Self::Error>;
}
trait LinearProducer {
  type Item;
  type Final;
  type Error;

  async fn produce(self) -> Result<
      Either<(Self::Item, Self), Self::Final>,
      Self::Error,
    >;






    
  async fn slurp(self) -> Result<Self, Self::Error>;
}

We tried it, and the ergonomics just felt bad. This is deeply rooted in core: poll could have adopted the same linear-typing trick to prevent poll-after-completion, but it did not (and that is probably the right decision). Had poll made a different choice, then ufotofu would have followed it. But going against the very foundation of async Rust-as-is would cause an inacceptable amount of friction.


Consumers with fewer associated types

Our Consumer trait specifies the types of items and final values it accepts as an associated type. In principle, these could have been generic parameters instead. The futures::Sink trait, for example, has an associated error type but the type of items is a regular type parameter.

trait GenericConsumer<Item, Final> {
  type Error;

  async fn consume(&mut self, value: Either<Item, Final>)
    -> Result<(), Self::Error>;

  async fn flush(&mut self) -> Result<(), Self::Error>;
}

We tried this in early iterations of utofu, but it never felt good. We never encountered a situation where the added flexibility payed off, and having to specify the type parameter everywhere was cumbersome. And after fully homing in on the consumer-producer duality, breaking symmetry this way did not make sense any longer.


Unsafe bulk APIs

Our bulk processing API designs enable better efficiency than conventional reader and writer designs. We could have done even better — but only with unsafe trait methods.

In ufotofu, bulk consumers must expose slices of initialised memory. It can be necessary to explicitly initialise a buffer, only so that the freshly initialised memory can be overwritten again. A straightforward fix would be to have expose_slots expose a slice of MaybeUninit<Item>. But expose_slots would additionally have to be unsafe, because the bulk consumer cannot reliably check which item slots have been filled with initialised memory by the closure. The bulk consumer would have to trust the return value of the closure to distinguish between initialised and uninitialised memory. And since the closure might report garbage, this assumption would have to be unsafe.

Bulk producers provide a dual opportunity for an unsafe optimisation. When a bulk producer exposes a slice of items, the closure cannot move those items out of the slice — if it wants ownership of the exposed items, it has to clone them. Moving items out of the slice would require the bulk producer to trust the reported number of moved items in order to treat some memory as “empty” (to know to not drop its contents). Since double-dropping can cause undefined behaviour, such an API design would inherently be unsafe as well.

trait UnsafeBulkConsumer: Consumer {
  unsafe async fn expose_slots<F, R>(&mut self, f: F)
    -> Result<R, Self::Error>
    where F: AsyncFnOnce(&mut [MaybeUninit<
      Self::Item,
    >]) -> (usize, R);
}
trait UnsafeBulkProducer: Producer {
  unsafe async fn expose_items<F, R>(&mut self, f: F)
    -> Result<Either<R, Self::Final>, Self::Error>
    where F: AsyncFnOnce(&[MaybeUninit<
      Self::Item,
    >]) -> (usize, R);
}

These unsafe APIs might well be useful for performance-critical code, and the actual ufotofu APIs could be built as safe wrappers on top of these unsafe APIs. We have not done this; we prioritised to keep things simple, more teachable, and less scary. But, in principle, this is a direction ufotofu could explore.


Random access

A producer or consumer interacts only with a single endpoint of a sequence. In the real world, many sequences require random access. The conventional pattern for random access to lazy sequences is to seek to a particular index. The Rust standard library exposes this pattern in the std::io::Seek trait. What about ufotofu?

Truth to be told, we simply do not feel confident enough about deciding how to abstract over random access yet. It is likely going to be the seek pattern. But there are some not-entirely-obvious choices to be made: can and should Producer and Consumer work with a single common Seek trait? Should producers and consumers advance the cursor in the same direction or in opposite directions? Should it be possible to freely choose the direction in which to consume or produce? How exactly do errors factor into everything? What about final values? Can seekable sequences have a final value at each end? How is seeking past the end or start of a sequence handled?

For now, we simply punt on these choices. Ufotofu has been simmering for years now, and it feels needlessly hasty to make risky choices about the seek pattern at this point. But we are interested in any experiments with ufotofu-compatible seek traits, and would love for you to open an issue sharing your designs.

All that aside, note how sequences with random seeking are essentially classic Turing machines, which is pretty neat (and/or foreboding). This immediately suggests a whole related class of abstractions: Turing machines with an elastic tape, where items can be deleted or inserted in addition to the traditional file-like operations of overwriting and reading.

The Turing machine perspective also opens up questions of trait hierarchies. Consumers and producers can be considered as highly restrictive forms of Turing machines. And they complect both reading/writing and movement in a single method call. Perhaps the core traits of a seek-aware library should be three independent traits for seeking, reading from the current position, and writing to the current position — not Producer, Consumer, and Seeker. Would it even be advisable to combine these two worlds?


Hypothetical Design Choices

For the theory nerds.

Dedicated syntax

In order to code a new producer or consumer, you need to explicitly define some data type and then manually implement the required traits. Could we do better?

Generator syntax can be adapted for creating anonymous producers — each yield expression produces an item, and we can trivially add final and throw expressions for producing final values and errors respectively. Dually, we can imagine a dedicated syntax for creating consumers, with a consume expression that matches agains an Either<Item, Final>:

// Defines a consumer that adds the numbers it
// consumes to some `&mut u64`.
consumer![|x: &mut u64| {
  loop {
    consume!{
      Left(summand) => x += summand,
      Right(()) => break,
    }
  }
}];
// Defines a producer that produces the numbers from
// zero to `x-1`, then the final value `"done"`.
producer![|x: usize| {
  for i in 0..x {
    yield!(i);
  }

  final!("done");
}]

Even more interesting than custom syntax for regular producer and consumers would be syntax for defining bulk producers and bulk consumers. In principle, there could be slice-based versions of yield and consume expressions, with some mechanism for automatically feeding the number of processed items back into the generated state machine. And since bulk processors are strict generalisations of regular processors, a slice-based defintion would suffice for generating the logic for item-by-item processing as well.


Conducers

Adaptors for producers and for consumers often share some functionality. Consider a MapItem adaptor which maps items through a function. This could be either a producer adaptor that maps the items emitted by a wrapped producer, or a consumer adaptor that maps the items it consumes before passing them on to a wrapped consumer. If we want to implement both, we need to duplicate code.

We propose the theoretical notion of a conducer (consumer + producer) to solve this. A conducer maps a sequence of consume operations to a sequence of produce operations. For example, a MapItem conducer would map a single consumption of an item into a single production of the transformed item. Such a conducer can be composed both with producers and with consumers:

You can chain a conducer c after a producer p1 to obtain a new producer p2: the original producer p1 provides a sequence of produce operations, the conducer c consumes these, and outputs the produce operations of the resulting producer p2. Dually, you can chain a conducer c before a consumer c1 to obtain a new consumer c2: the resulting consumer c2 consumes some items, the conducer c maps these consume operations to some produce operations, and these are fed into the original consumer c1.

In this model, you need to define the core logic of the transformation only once, and you automatically obtain both a producer adaptor and a consumer adaptor.

The generalisation to bulk conducers is straightforward (in theory): a bulk conducer maps sequences of bulk consumptions to sequences of bulk productions.

All this is nice in theory, but how would you actually represent a “mapping from a sequence of consume operations to a sequence of produce operations” in Rust? The obvious choice would be a type that implements both Producer and Consumer. Unfortunately, in order to compose such a value with another producer or consumer, you would need to concurrently await both a consume and a produce call on the conducer. This would require two simultaneous mutable references to the conducer, which Rust forbids.

As a workaround, we suggest using a unified syntax from which both a consumer adaptor and a producer adaptor can be generated. In fact, we can simply combine our (hypothetical) dedicated syntaxes for producers and consumers: each consume expression defines a consume operation, and each yield or final expression defines a produce operation.

// Defines a conducer for mapping items through a function.
// Rust would generate two types from this definition:
// a consumer adaptor and a producer adaptor.
conducer!{|f: impl FnMut(T) -> U| {
    consume!{
        Left(it) => yield!(f(it)),
        Right(fin) => final!(fin),
    }
}};

Any dedicated syntax for creation bulk consumers and bulk producers could be similarly combined to obtain syntax for bulk conducers.

Note that it would be necessary — and possible! — to generate correct implementations of flush and slurp for the adaptors. Implementing these by hand is quite cumbersome, especially for producer adaptors, which need to explicitly buffer final values and errors encountered when slurping.

Dedicated conducer syntax also relates to coroutines in an interesting way. Consider the coroutines of Lua, where a single yield operator not only passes a value to the caller, but also, inside the coroutine, evaluates to a value passed in from the caller. In other words, the yield operator of lua coroutines corresponds to a conducer yield expression immediately followed by a receive expression. Conducers generalise these coroutines, because conducers can use yield and receive in other combinations than only in matching pairs. An example without a direct one-to-one correspondence would be an encoder, mapping the consumption of a single item to the production of several bytes:

// Defines a conducer for big-endian `u32` encoding.
// The producer adaptor turns a producer of `u32`
// into one of `u8` by encoding every original `u32`.
// The consumer adaptor consumes `u32` items and
// feeds them to a wrapped consumer of `u8` items.
conducer!{|| {
    receive!{
        Left(it: u32) => {
            for byte in it.to_be_bytes() {
                yield!(byte);
            }
        }
        Right(fin) => final!(fin),
    }
}};

If we leave the world of Rust, we can easily dream up a dynamically typed language with first-class support for conducers, by generalising Lua’s approach to coroutines. Add an always-available event loop to the language à la javascript, make the conducers await-able, and you get a pretty compelling design. Oh, and if the values of the language could be the valuable values augmented with (async) closures and conducers, that would be nice. Does anyone have time or funding for this project?


Duality and effects

We have discussed ufotofu’s focus on duality already, but we have not given a formal notion of duality satisfied by ufotofu. Let us remedy that.

Category theory tells us that you obtain a dual category by swapping the direction of all arrows. Clearly, by swapping the arrows on all trait methods, i.e., by swapping arguments and return values, we should be able to switch between producers and consumers. This might sound a bit silly, but — barring some caveats — is actually what we shall do.

To demonstrate, let us first consider ufotofu without error handling. In this simplified world, the Producer trait looks like this:

trait ErrorlessProducer {
    type Item;
    type Final;

    async fn produce(&mut self)
        -> Either<Self::Item, Self::Error>;
    async fn slurp(&mut self) -> ();
}

If we swap arguments and return types (keeping &mut self as an argument for obvious reasons), we do indeed obtain a correct Consumer trait without errors.

trait ErrorlessConsumer {
    type Item;
    type Final;

    async fn consume(
        &mut self,
        value: Either<Self::Item, Self::Error>,
    ) -> ();
    async fn flush(&mut self) -> ();
}

To the readers who might be disappointed in us keeping &mut self as an argument, the equivalent, purely-functional signatures should serve as assurance that “flipping the arrows” does indeed work:

trait PurelyFunctionalConsumer {
  type Item;
  type Final;

  async fn consume(
    self,
    value: Either<Self::Item, Self::Final>,
  ) -> Self;

  async fn flush(self) -> Self;
}
trait PurelyFunctionalProducer {
  type Item;
  type Final;

  async fn produce(self) -> Either<
    (Self::Item, Self),
    Self::Final,
  >;

  async fn slurp(self) -> Self;
}

Introducing error variants breaks this pattern: both producers and consumers return results, neither takes them as arguments. But there is an easy interpretation to preserve the duality formalism even in the presence of error handling: we interpret error handling as an effect tied to the notion of a function type, fully independent of its argument and return types — Rust merely forces us to encode this effect by wrapping the return type in a Result.

If this feels like cheating to you, know that we already applied the exact same trick in our previous examples: asynchrony is also an effect. If Rust did not have the async keyword, our return values would be Futures, and swapping those to being arguments would make little to no sense.

Once we start looking beyond Rust, the interpretation of asynchrony and error handling as effects opens up a more general API design, in which the API abstracts over the kinds of effects supported by a sequence processor. In a programming language with proper support for effects (whether monadic, algebraic, or anything else), you could define an effect-parametric lazy sequence library. It would be accessible only to a handful of nerds, but hey, it would also be pretty cool. In particular, having the same APIs for both synchronous and async code would be quite useful. But Rust is not the language for these kinds of experiments.


Greater expressivity

Ufotofu operates on sequences whose successive value types form regular languages. This suggests two natural ways of generalising ufotofu from a purely theoretical perspective: moving from sequences to more complex directed graphs, or moving from regular languages to more expressive classes of languages. Setting aside the question of whether such generalisations would be useful, they are definitely interesting.

The first interesting graph class to come to mind are the arborescences (“out-trees”). What do traits for lazily working with trees in a principled way look like? This could be interesting for incremental compilers (or dev tooling such as language servers), for example. From directed trees, the next obvious graph class are the directed acyclic graphs. Again, what could principled APIs for working with those look like? Is this graph class already too unstructured (unbounded directed treewidth, and all that) to admit usefully constrained APIs?

When considering lazy graph navigation, an interesting question is who determines where in the graph to navigate. Regular ufotofu tightly constrains the navigation of its sequencesitems steadily progress, there is no backtracking. A seek-like extension would give full power to the calling code. For a tree-based API, this would mean that the code using a tree-producer would get to determine which branches to explore in which order. An alternate design could be one where the tree-producer itself defines which new nodes (or updates to existing tree nodes) to present. Such an API could power, for example, the interface between an incremental syntax highlighter and a text editor UI.

Instead of considering APIs for working with larger graph classes, we can also try generalising the class of languages which can be described on the type level. Recall that ufotofu trait bounds where the types of final values may themselves again be constrained to ufotofu trait bounds, describe regular languages (plus the option of describing infinite sequences when allowing the empty type as a final value type). There is a neat way for seeing this: if you squint a lot (and always use the empty type for errors), the signature of produce looks like a production rule in a regular grammar, and a producer type-bound with recursive producer type-bounds on final values is essentially isomorphic to a regular grammar. And the isomorphism between producers and consumers gives you an isomorphism between regular grammars and consumers for free.

Can we design other APIs with natural isomorphisms to certain classes of grammars? For example, what would ufotofu-style APIs that are isomorphic to the grammars in Chomsky normal form look like, and which kinds of sequences (or even graphs?) would they describe? We leave this as an exercise to the interested reader who has more time on their hands for these kinds of questions than we do.


Related work

Ufotofu grew out of working with and studying a diverse set of sequence APIs in the wild, primarily in the javascript and Rust ecosystems. But there are a couple bits of academic related work that anyone who read this far will probably find interesting as well.

First and foremost, Oleg Kiselyov has several collections of writing related to iteration and sequence abstractions on their website. The iterIO Haskell library, which is based on Kiselyov’s itarees, stands out as one of few sequence abstraction libraries rooted in rigorous design principles. Their design is highly asymmetric, in direct contrast to what we did with ufotofu. Fun!

Second, there is probably an interesting relation between ufotofu and session types — especially from the perspective of interpreting ufotofu as a peculiar way of encoding regular languages on the type level.

And finally, there is Gibbons, Jeremy, and Bruno C. D. S. Oliveira. "The essence of the iterator pattern." Journal of functional programming 19.3-4 (2009): 377-402, which generalises iterators to a specific kind of traversal in applicative functors. Sadly this work considers neither dual notions to iteration, nor effects, nor sequential composition of heterogenous types. And those are exactly the aspects which make ufotofu relatively unique.

Not quite related work, but not entirely unrelated either is Béla Fleck and the Flecktones’s 1992 song (and album) UFO Tofu.


This project was funded through the NGI Assure Fund, a fund established by NLnet with financial support from the European Commission’s Next Generation Internet programme, under the aegis of DG Communications Networks, Content and Technology under grant agreement № 101092990.