Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2995cd2
fix: client shutting down should not impact the server (#120)
yhl25 Aug 11, 2025
79a834c
feat: Session window and Accumulator (#118)
yhl25 Aug 13, 2025
78baec0
feat: refactor (#119)
adarsh0728 Aug 13, 2025
97a61a1
chore: add new examples to CI (#123)
yhl25 Aug 13, 2025
52de69f
chore: Create CODEOWNERS (#127)
vigith Aug 14, 2025
70a3798
chore: upgrade rust + latest tonic + cargo fmt (#126)
vigith Aug 14, 2025
d6e18fa
chore: use workspace for dependencies (#128)
yhl25 Aug 14, 2025
eaed7ab
chore: support at least last 5 versions of Rust (#129)
vigith Aug 14, 2025
d55e91f
feat: panic hook to capture stack trace (#125)
adarsh0728 Aug 19, 2025
c7e860d
chore: panic handling in reduce (#130)
adarsh0728 Aug 22, 2025
5a3e76e
chore: update makefile (#131)
yhl25 Sep 4, 2025
07ddfbe
chore: improve error handling for batchmap (#137)
404salad Sep 21, 2025
cf6b3fd
chore: generalize server to avoid redundant implementations (#138)
vigith Sep 21, 2025
c1a3060
feat: Nack Support For Source (#136)
yhl25 Sep 22, 2025
a116f04
fix: server shutdowns immediately if user's shutdown oneshot is none …
yhl25 Sep 22, 2025
48147a2
chore(ci): explicitly add rustfmt and clippy (#142)
vigith Sep 27, 2025
e6640c4
chore: export a few consts (#135)
vigith Oct 2, 2025
f78b707
fix: reduce visibility (#145)
vigith Oct 5, 2025
21279eb
chore: reduce more visibility (#146)
vigith Oct 5, 2025
a438f1f
chore: update packages and bump up version (#147)
vigith Oct 12, 2025
3adf2b5
chore: expose accumulator fields (#148)
vigith Oct 13, 2025
fde3dea
feat: Support for OnSuccess sink (#152)
vaibhavtiwari33 Nov 3, 2025
000388c
feat: metadata implementations for source, transformer, map and sink …
adarsh0728 Nov 23, 2025
42a1e81
feat: add reduce streaming (#153)
vigith Nov 24, 2025
3ab1122
chore: Update cron schedule for sideinput example (#154)
vaibhavtiwari33 Dec 4, 2025
5d2a927
doc: add more details (#155)
vigith Dec 6, 2025
0b85673
chore: expose and export side-input path (#156)
vigith Dec 7, 2025
0a496b1
test: Implement redis sink in Rust for on-success sink e2e test (#157)
vaibhavtiwari33 Dec 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# These owners will be the default owners for everything in
# the repo. Unless a later match takes precedence
* @yhl25 @vigith
3 changes: 2 additions & 1 deletion .github/workflows/build-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ jobs:
"examples/map-cat", "examples/map-tickgen-serde", "examples/mapt-event-time-filter",
"examples/reduce-counter", "examples/sideinput", "examples/sideinput/udf",
"examples/simple-source", "examples/sink-log", "examples/source-transformer-now",
"examples/batchmap-flatmap", "examples/batchmap-cat",
"examples/batchmap-flatmap", "examples/batchmap-cat", "examples/map-even-odd",
"examples/session-counter", "examples/stream-sorter","examples/flatmap-stream",
]

steps:
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt, clippy
cache-workspaces: |
. -> target

Expand All @@ -34,7 +35,7 @@ jobs:
echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV

- name: Run sccache-cache
uses: mozilla-actions/sccache-action@v0.0.5
uses: mozilla-actions/sccache-action@v0.0.9

- name: Install dependencies
run: sudo apt-get install -y protobuf-compiler
Expand Down
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
edition = "2021"
edition = "2024"
25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,27 @@ default-members = ["numaflow"]
resolver = "2"

[workspace.package]
rust-version = "1.84"
rust-version = "1.85.0"
edition = "2024"

[workspace.dependencies]
tonic = "0.14.2"
tonic-prost = "0.14.2"
tonic-prost-build = "0.14.2"
tonic-types = "0.14.2"
prost = "0.14.1"
prost-types = "0.14.1"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.16"
tokio-stream = { version = "0.1.17", features = ["net"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
chrono = "0.4.42"
tracing = "0.1.41"
tracing-subscriber = "0.3"
uuid = { version = "1.18.1", features = ["v4"] }
thiserror = "2.0.17"
hyper-util = "0.1.17"
notify = "8.2.0"
tempfile = "3.23.0"
tower = { version = "0.5.2", features = ["util"] }
30 changes: 27 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
.PHONY: lint fmt test-fmt clippy test codegen
.PHONY: lint fmt test-fmt clippy test codegen build clean-proto codegen-clean

.PHONY: build
build:
cargo build --workspace

fmt:
cargo fmt --all
Expand All @@ -12,14 +16,34 @@ test-fmt:

.PHONY: clippy
clippy:
cargo clippy --workspace -- -D warnings
cargo clippy --workspace -- -D warnings -A clippy::module_inception

# run cargo test on the repository root
.PHONY: test
test:
@echo "Running tests"
cargo test --workspace
@echo "Running panic tests sequentially..."
cargo test --workspace --features test-panic sink::tests::sink_panic
cargo test --workspace --features test-panic sourcetransform::tests::source_transformer_panic
cargo test --workspace --features test-panic map::tests::map_server_panic
cargo test --workspace --features test-panic mapstream::tests::map_stream_server_panic
cargo test --workspace --features test-panic batchmap::tests::batchmap_panic
cargo test --workspace --features test-panic reduce::tests::panic_tests::panic_in_reduce
cargo test --workspace --features test-panic reduce::tests::panic_tests::panic_with_multiple_keys
cargo test --workspace --features test-panic session_reduce::tests::panic_tests::test_panic_in_session_reduce
cargo test --workspace --features test-panic accumulator::tests::panic_tests::test_panic_in_accumulate
cargo test --workspace --features test-panic shared::panic_tests::test_panic_hook_functionality

.PHONY: codegen
codegen:
# Change timestamps so that tonic_build code generation will always be triggered.
cd numaflow && mkdir -p src/servers && touch proto/* && PROTO_CODE_GEN=1 cargo build
cd numaflow && touch proto/* && PROTO_CODE_GEN=1 cargo build

.PHONY: clean-proto
clean-proto:
# Clean generated proto files
cd numaflow && rm -rf src/generated

.PHONY: codegen-clean
codegen-clean: clean-proto codegen
1 change: 1 addition & 0 deletions examples/all-sinks/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
12 changes: 12 additions & 0 deletions examples/all-sinks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "all-sinks"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true

[dependencies]
rand = "0.9.2"
tonic.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
numaflow = { path = "../../numaflow" }
20 changes: 20 additions & 0 deletions examples/all-sinks/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/all-sinks

# build for release
RUN cargo build --release

# our final base
FROM debian:bullseye AS all-sinks

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/all-sinks .

# set the startup command to run your binary
CMD ["./all-sinks"]
16 changes: 16 additions & 0 deletions examples/all-sinks/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/all-sinks:${TAG}
DOCKER_FILE_PATH = examples/all-sinks/Dockerfile

.PHONY: update
update:
cargo check
cargo update

.PHONY: image
image: update
cd ../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
33 changes: 33 additions & 0 deletions examples/all-sinks/manifests/all-sinks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: all-sinks
spec:
vertices:
- name: in
source:
# A self data generating source
generator:
rpu: 300
duration: 1s
keyCount: 5
value: 5
- name: out
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/all-sinks:stable
imagePullPolicy: IfNotPresent
fallback:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
imagePullPolicy: IfNotPresent
onSuccess:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
imagePullPolicy: IfNotPresent
edges:
- from: in
to: out
60 changes: 60 additions & 0 deletions examples/all-sinks/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use numaflow::sink::KeyValueGroup;
use numaflow::sink::{self, Message, Response, SinkRequest};
use rand::Rng;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
tracing_subscriber::fmt::init();
sink::Server::new(SinkHandler).start().await
}

struct SinkHandler;

#[tonic::async_trait]
impl sink::Sinker for SinkHandler {
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();

while let Some(datum) = input.recv().await {
let response = match primary_sink_write_status() {
// Writing to primary sink was successful, we want to send a message to on_success sink
// The message sent to on_success sink can be different from the original message
true => {
// build user metadata for on_success sink message payload
let user_metadata = HashMap::from([(
String::from("key1"),
KeyValueGroup::from(HashMap::from([(
datum.id.clone(),
datum.value.clone(),
)])),
)]);
Response::on_success(
datum.id,
// optional message, send original message to on_success sink in case `None` is provided
Message::new(datum.value) // required value
.with_keys(vec!["key1".to_string()]) // optional keys
.with_user_metadata(user_metadata) // optional metadata
.build(),
)
}
// Writing to primary sink was not successful, we want to send a message to fallback sink
// The message sent to the fallback sink will be the original message
false => Response::fallback(datum.id),
};

// return the responses
responses.push(response);
}

responses
}
}

/// Get status of writing to primary sink
/// true: write successful
/// false: write failed
fn primary_sink_write_status() -> bool {
// dummy logic to simulate write status
rand::rng().random_range(0..=1) == 1
}
7 changes: 4 additions & 3 deletions examples/batchmap-cat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]
name = "batchmap-cat"
version = "0.1.0"
edition = "2021"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic = "0.13.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
2 changes: 1 addition & 1 deletion examples/batchmap-cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.84-bullseye AS build
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand Down
7 changes: 4 additions & 3 deletions examples/batchmap-flatmap/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]
name = "batchmap-flatmap"
version = "0.1.0"
edition = "2021"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic = "0.13.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
2 changes: 1 addition & 1 deletion examples/batchmap-flatmap/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.84-bullseye AS build
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand Down
7 changes: 4 additions & 3 deletions examples/flatmap-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]
name = "flatmap-stream"
version = "0.1.0"
edition = "2021"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic = "0.13.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
2 changes: 1 addition & 1 deletion examples/flatmap-stream/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.84-bullseye AS build
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand Down
7 changes: 4 additions & 3 deletions examples/map-cat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]
name = "map-cat"
version = "0.1.0"
edition = "2021"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic = "0.13.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
2 changes: 1 addition & 1 deletion examples/map-cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.84-bullseye AS build
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand Down
6 changes: 5 additions & 1 deletion examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use numaflow::map;
use numaflow::shared::grpc_server::ServerExtras;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
map::Server::new(Cat).start().await
map::Server::new(Cat)
.with_max_message_size(10240)
.start()
.await
}

struct Cat;
Expand Down
1 change: 1 addition & 0 deletions examples/map-even-odd/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
10 changes: 10 additions & 0 deletions examples/map-even-odd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "map-even-odd"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true

[dependencies]
tonic.workspace = true
tokio.workspace = true
numaflow = { path = "../../numaflow" }
20 changes: 20 additions & 0 deletions examples/map-even-odd/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM rust:1.85-bullseye AS build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

WORKDIR /numaflow-rs
COPY ./ ./
WORKDIR /numaflow-rs/examples/map-even-odd

# build for release
RUN cargo build --release

# our final base
FROM debian:bullseye AS map-even-odd

# copy the build artifact from the build stage
COPY --from=build /numaflow-rs/target/release/map-even-odd .

# set the startup command to run your binary
CMD ["./map-even-odd"]
Loading