At Polars I developed a new query engine which uses a hybrid of push and pull. I gave a short (and not very technical) talk about the engine at our first meetup recently, which can be viewed here: https://www.youtube.com/watch?v=Ndil-eLynh4.

Each operator is a (set of) async function(s) connected to its input(s) and output(s) through capacity-1 spsc async channels. An operator pulls input by reading from a channel and pushes output by writing into a channel. As an oversimplified example, consider a simple select operator:

    // Pull: suspend until the upstream operator sends the next morsel.
    while let Ok(morsel) = input.recv().await {
        let result = select(morsel);
        // Push: suspend until the downstream operator has room for the
        // result; stop if it has hung up.
        if output.send(result).await.is_err() {
            break;
        }
    }
Note how it has two await points: on receive and send. The nice part about this is that Rust will automatically transform these asynchronous functions into state machines which can pause execution whenever a send or receive blocks, returning control to our scheduler. In the above example the operator is able to pause both to wait for more data and to wait until the receiver is able to consume more data. This also makes it trivial to, for example, pause execution in the middle of a hash join probe to send some partial results onwards in the computational graph.
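To sketch that last point (simplified and illustrative, not our actual implementation; it assumes tokio-style bounded channels and made-up names), a probe operator that flushes partial results mid-morsel could look roughly like this:

    use std::collections::HashSet;
    use tokio::sync::mpsc::{Receiver, Sender};

    // Illustrative only: a probe-side operator that flushes partial results
    // mid-morsel. The two `.await`s are the points where the scheduler can
    // suspend us, either waiting for input or waiting for the consumer.
    async fn probe(
        mut input: Receiver<Vec<u64>>,   // morsels of probe-side keys
        output: Sender<Vec<u64>>,        // matched keys, sent in partial chunks
        build_side: HashSet<u64>,
    ) {
        while let Some(morsel) = input.recv().await {
            let mut partial = Vec::new();
            for key in morsel {
                if build_side.contains(&key) {
                    partial.push(key);
                }
                // Send partial results onwards mid-probe; if the capacity-1
                // channel is full, this suspends until downstream catches up.
                if partial.len() >= 1024 {
                    if output.send(std::mem::take(&mut partial)).await.is_err() {
                        return; // downstream hung up
                    }
                }
            }
            if !partial.is_empty() && output.send(partial).await.is_err() {
                return;
            }
        }
    }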

I'm not seeing how this is pull in any sense. Calling recv on the channel doesn't cause any result to be computed; the push from the previous operators is what causes the computation to continue.

EDIT: Ok, I guess because they are bounded to 1, the spsc channels will only let the pushing computation continue after the "puller" has read the result, but it's more like pushing with back-pressure.

It is pull in the sense that an operator can call `recv().await` (the equivalent of `input.next()` in the article) at any point, which can then block the execution of the operator until more data is available.

It is push in the sense that an operator can call `send(x).await` (the equivalent of `out(x)` in the article) at any point, which can then block the execution of the operator until the data is consumed.

So it is a hybrid of both pull and push. You can, at any point, block on either pulling data or pushing data.
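A tiny self-contained demo of that behaviour, using tokio's bounded mpsc channel as a stand-in for the capacity-1 spsc channels described above (illustrative only):

    use tokio::sync::mpsc;
    use tokio::time::{sleep, Duration};

    #[tokio::main]
    async fn main() {
        // Capacity 1: the producer can run at most one item ahead of the consumer.
        let (tx, mut rx) = mpsc::channel::<u32>(1);

        let producer = tokio::spawn(async move {
            for i in 0..3 {
                // "Push": suspends here once the buffer is full, until the
                // consumer has pulled the previous item.
                tx.send(i).await.unwrap();
                println!("sent {i}");
            }
        });

        let consumer = tokio::spawn(async move {
            // "Pull": suspends here until the producer has pushed something.
            while let Some(i) = rx.recv().await {
                sleep(Duration::from_millis(100)).await; // slow consumer
                println!("received {i}");
            }
        });

        let _ = tokio::join!(producer, consumer);
    }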

> In a setting where every operator has exactly one output, when to run an operator to produce some output is obvious: when your consumer needs it. This becomes, at the very least, messier with multiple outputs, since “requests for rows” and “computations to produce rows” are no longer one-to-one.

Don't push-based systems have the same issue for inputs? If you have multiple inputs then you may be handed one of the inputs without the other. The classic example with iterators is a `zip` operator, though this is probably not so common in database queries.
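For what it's worth, in the channel model above a zip is easy to write because the operator simply pulls from each input when it needs it; a rough sketch (tokio-style channels, illustrative names):

    use tokio::sync::mpsc::{Receiver, Sender};

    // Illustrative zip operator: it pulls one item from each input before
    // producing a pair, which is awkward if both upstreams independently push.
    async fn zip<A, B>(mut left: Receiver<A>, mut right: Receiver<B>, output: Sender<(A, B)>) {
        loop {
            // Either recv can suspend this operator until that input has data.
            let (Some(a), Some(b)) = (left.recv().await, right.recv().await) else {
                break; // one side is exhausted
            };
            if output.send((a, b)).await.is_err() {
                break; // downstream no longer wants data
            }
        }
    }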

It's pretty easy in a push-based model to let the 'pusher' know that no more data is required. It's just like unsubscribing from an event, or returning a 'no more' status from the callback. The push model does feel more natural to me, but perhaps that comes from familiarity with Linux piping.
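Something like this sketch, for example, where the callback's return value is the 'no more' status (illustrative, using std's ControlFlow):

    use std::ops::ControlFlow;

    // Push-based source: keeps calling the sink until it runs out of data or
    // the sink returns Break, i.e. "no more data required".
    fn push_all<T>(items: Vec<T>, mut sink: impl FnMut(T) -> ControlFlow<()>) {
        for item in items {
            if sink(item).is_break() {
                return; // the consumer "unsubscribed"
            }
        }
    }

    fn main() {
        let mut taken = Vec::new();
        // Take the first three items, then tell the pusher to stop.
        push_all(vec![1, 2, 3, 4, 5], |x| {
            taken.push(x);
            if taken.len() == 3 { ControlFlow::Break(()) } else { ControlFlow::Continue(()) }
        });
        assert_eq!(taken, vec![1, 2, 3]);
    }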

It's easy when the network is working.

If it isn't, the 'pusher' continues to fill memory buffers that can take minutes to dequeue. You need constant communication and TCP conspires against you on this. If your flow is primarily one-directional, you might be able to use keep-alives for this, but the defaults are terrible. Here's what I have used:

    SO_KEEPALIVE=1
    TCP_KEEPCNT=1+x(seconds)
    TCP_KEEPIDLE=1
    TCP_KEEPINTVL=1
    TCP_NODELAY=1
    TCP_USER_TIMEOUT=x(msec)
where 'x' is the anticipated RTT of ping+connect (usually 3-4x the measured RTT you get from TCP_INFO).
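As a rough Linux-only sketch of applying those options from Rust via the libc crate (the function and parameter names here are just illustrative, not from any particular codebase):

    use std::net::TcpStream;
    use std::os::unix::io::AsRawFd;

    // Linux-only sketch of applying the options above to a connected socket.
    // `x_secs` / `x_msecs` are the 'x' described above.
    fn apply_keepalive(stream: &TcpStream, x_secs: i32, x_msecs: i32) -> std::io::Result<()> {
        fn set(fd: i32, level: i32, opt: i32, val: i32) -> std::io::Result<()> {
            let ret = unsafe {
                libc::setsockopt(
                    fd,
                    level,
                    opt,
                    &val as *const i32 as *const libc::c_void,
                    std::mem::size_of::<i32>() as libc::socklen_t,
                )
            };
            if ret == 0 { Ok(()) } else { Err(std::io::Error::last_os_error()) }
        }

        let fd = stream.as_raw_fd();
        set(fd, libc::SOL_SOCKET, libc::SO_KEEPALIVE, 1)?;
        set(fd, libc::IPPROTO_TCP, libc::TCP_KEEPCNT, 1 + x_secs)?;
        set(fd, libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, 1)?;
        set(fd, libc::IPPROTO_TCP, libc::TCP_KEEPINTVL, 1)?;
        set(fd, libc::IPPROTO_TCP, libc::TCP_NODELAY, 1)?;
        set(fd, libc::IPPROTO_TCP, libc::TCP_USER_TIMEOUT, x_msecs)?;
        Ok(())
    }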

Remember: the moon is only about 1.5 seconds away, so unless you're pushing data to the moon these numbers are likely very small.

On older Windows, you've got `SIO_KEEPALIVE_VALS`, which is hardcoded at 10 ticks, so you need to divide your distance by 10, but I think newer Windows supports `TCP_USER_TIMEOUT` like Linux does.

Mac doesn't have these socket options. I think you can set net.inet.tcp.keep* with sysctl, but this affects/breaks everything, so I don't recommend xnu as a consumer of a high-volume push-stream.

I actually don't recommend any of this unless you have literally no control over higher-level parts of the protocol: TCP just isn't good enough for high-volume push-protocols, and so I haven't used this in my own stuff for decades now.

It was kind of a toy problem, but I had a lot of fun with Consul’s versioning data, making requests that didn’t return until the data had changed. It’s like GetIfModified, but you can have it wait if the data has not already been modified.

I don’t think it’s particularly good for multi-get but I didn’t get that far.
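Roughly what that looks like (a sketch using Consul's documented blocking-query parameters and the reqwest crate; not the code I actually wrote):

    use std::time::Duration;

    fn watch_key(key: &str) -> Result<(), Box<dyn std::error::Error>> {
        let client = reqwest::blocking::Client::builder()
            .timeout(Duration::from_secs(600)) // must exceed `wait` below
            .build()?;
        let mut index = "0".to_string();
        loop {
            // The request does not return until the key changes or `wait` expires.
            let resp = client
                .get(format!("http://127.0.0.1:8500/v1/kv/{key}"))
                .query(&[("index", index.as_str()), ("wait", "5m")])
                .send()?;
            // X-Consul-Index is the version to pass with the next request.
            if let Some(v) = resp.headers().get("X-Consul-Index") {
                index = v.to_str()?.to_owned();
            }
            println!("{}", resp.text()?); // possibly unchanged on timeout
        }
    }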

There are systems where you can report what you’ve seen so far when you reconnect, but those require a very different architecture on the sending end, because it’s not just a streaming-data situation at that point. And while it’s more efficient to store the data needed to derive the replay than to store the replay itself, it’s not infinitely more so, and a three-hour network hardware problem could really ruin your entire week.

Interesting reading!

Does this explain a big inefficiency in BigQuery, where it only ever does hash joins? Is it because it is push-based, so it never does merge joins even when the inputs are all sorted or clustered, etc.?

Although tbh I can't see why a system couldn't combine them both, with some of the edges being push+buffer and some being buffer+pull.

If you read Extensible Query Optimizers in Practice, they explain why [0] at a high level and link to further reading.

> Although both Snowflake and BigQuery do not consider as many alternatives as Fabric DW, the dynamic schemes of the above systems avoids the potential bad plans chosen by the optimizer due to errors in cost and cardinality estimations.

I understand it's mostly due to the difficulty of getting the cost estimation reliably correct, so it defaults to something predictable, which also simplifies the plan search.

[0] https://www.microsoft.com/en-us/research/wp-content/uploads/...