Skip to content

Commit 7c03f49

Browse files
Extend Firebase SDK with new APIs to consume streaming callable function response (#6602)
Extend Firebase SDK with new APIs to consume streaming callable function response. - Handling the server-sent event (SSE) parsing internally - Providing proper error handling and connection management - Maintaining memory efficiency for long-running streams --------- Co-authored-by: Rodrigo Lazo <[email protected]>
1 parent af5fd66 commit 7c03f49

File tree

8 files changed

+798
-4
lines changed

8 files changed

+798
-4
lines changed

Diff for: firebase-functions/api.txt

+17
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ package com.google.firebase.functions {
8484
method public com.google.android.gms.tasks.Task<com.google.firebase.functions.HttpsCallableResult> call(Object? data);
8585
method public long getTimeout();
8686
method public void setTimeout(long timeout, java.util.concurrent.TimeUnit units);
87+
method public org.reactivestreams.Publisher<com.google.firebase.functions.StreamResponse> stream();
88+
method public org.reactivestreams.Publisher<com.google.firebase.functions.StreamResponse> stream(Object? data = null);
8789
method public com.google.firebase.functions.HttpsCallableReference withTimeout(long timeout, java.util.concurrent.TimeUnit units);
8890
property public final long timeout;
8991
}
@@ -93,6 +95,21 @@ package com.google.firebase.functions {
9395
field public final Object? data;
9496
}
9597

98+
public abstract class StreamResponse {
99+
}
100+
101+
public static final class StreamResponse.Message extends com.google.firebase.functions.StreamResponse {
102+
ctor public StreamResponse.Message(com.google.firebase.functions.HttpsCallableResult message);
103+
method public com.google.firebase.functions.HttpsCallableResult getMessage();
104+
property public final com.google.firebase.functions.HttpsCallableResult message;
105+
}
106+
107+
public static final class StreamResponse.Result extends com.google.firebase.functions.StreamResponse {
108+
ctor public StreamResponse.Result(com.google.firebase.functions.HttpsCallableResult result);
109+
method public com.google.firebase.functions.HttpsCallableResult getResult();
110+
property public final com.google.firebase.functions.HttpsCallableResult result;
111+
}
112+
96113
}
97114

98115
package com.google.firebase.functions.ktx {

Diff for: firebase-functions/firebase-functions.gradle.kts

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ dependencies {
112112
implementation(libs.okhttp)
113113
implementation(libs.playservices.base)
114114
implementation(libs.playservices.basement)
115+
implementation(libs.reactive.streams)
116+
115117
api(libs.playservices.tasks)
116118

117119
kapt(libs.autovalue)
@@ -131,6 +133,7 @@ dependencies {
131133
androidTestImplementation(libs.truth)
132134
androidTestImplementation(libs.androidx.test.runner)
133135
androidTestImplementation(libs.androidx.test.junit)
136+
androidTestImplementation(libs.kotlinx.coroutines.reactive)
134137
androidTestImplementation(libs.mockito.core)
135138
androidTestImplementation(libs.mockito.dexmaker)
136139
kapt("com.google.dagger:dagger-android-processor:2.43.2")

Diff for: firebase-functions/src/androidTest/backend/functions/index.js

+107
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@
1414

1515
const assert = require('assert');
1616
const functions = require('firebase-functions');
17+
const functionsV2 = require('firebase-functions/v2');
18+
19+
/**
20+
* Pauses the execution for a specified amount of time.
21+
* @param {number} ms - The number of milliseconds to sleep.
22+
* @return {Promise<void>} A promise that resolves after the specified time.
23+
*/
24+
function sleep(ms) {
25+
return new Promise((resolve) => setTimeout(resolve, ms));
26+
}
1727

1828
exports.dataTest = functions.https.onRequest((request, response) => {
1929
assert.deepEqual(request.body, {
@@ -122,3 +132,100 @@ exports.timeoutTest = functions.https.onRequest((request, response) => {
122132
// Wait for longer than 500ms.
123133
setTimeout(() => response.send({data: true}), 500);
124134
});
135+
136+
const streamData = ['hello', 'world', 'this', 'is', 'cool'];
137+
138+
/**
139+
* Generates chunks of text asynchronously, yielding one chunk at a time.
140+
* @async
141+
* @generator
142+
* @yields {string} A chunk of text from the data array.
143+
*/
144+
async function* generateText() {
145+
for (const chunk of streamData) {
146+
yield chunk;
147+
await sleep(100);
148+
}
149+
}
150+
151+
exports.genStream = functionsV2.https.onCall(async (request, response) => {
152+
if (request.acceptsStreaming) {
153+
for await (const chunk of generateText()) {
154+
response.sendChunk(chunk);
155+
}
156+
}
157+
return streamData.join(' ');
158+
});
159+
160+
exports.genStreamError = functionsV2.https.onCall(
161+
async (request, response) => {
162+
// Note: The functions backend does not pass the error message to the
163+
// client at this time.
164+
throw Error("BOOM")
165+
});
166+
167+
const weatherForecasts = {
168+
Toronto: { conditions: 'snowy', temperature: 25 },
169+
London: { conditions: 'rainy', temperature: 50 },
170+
Dubai: { conditions: 'sunny', temperature: 75 }
171+
};
172+
173+
/**
174+
* Generates weather forecasts asynchronously for the given locations.
175+
* @async
176+
* @generator
177+
* @param {Array<{name: string}>} locations - An array of location objects.
178+
*/
179+
async function* generateForecast(locations) {
180+
for (const location of locations) {
181+
yield { 'location': location, ...weatherForecasts[location.name] };
182+
await sleep(100);
183+
}
184+
};
185+
186+
exports.genStreamWeather = functionsV2.https.onCall(
187+
async (request, response) => {
188+
const locations = request.data && request.data.data?
189+
request.data.data: [];
190+
const forecasts = [];
191+
if (request.acceptsStreaming) {
192+
for await (const chunk of generateForecast(locations)) {
193+
forecasts.push(chunk);
194+
response.sendChunk(chunk);
195+
}
196+
}
197+
return {forecasts};
198+
});
199+
200+
exports.genStreamEmpty = functionsV2.https.onCall(
201+
async (request, response) => {
202+
if (request.acceptsStreaming) {
203+
// Send no chunks
204+
}
205+
// Implicitly return null.
206+
}
207+
);
208+
209+
exports.genStreamResultOnly = functionsV2.https.onCall(
210+
async (request, response) => {
211+
if (request.acceptsStreaming) {
212+
// Do not send any chunks.
213+
}
214+
return "Only a result";
215+
}
216+
);
217+
218+
exports.genStreamLargeData = functionsV2.https.onCall(
219+
async (request, response) => {
220+
if (request.acceptsStreaming) {
221+
const largeString = 'A'.repeat(10000);
222+
const chunkSize = 1024;
223+
for (let i = 0; i < largeString.length; i += chunkSize) {
224+
const chunk = largeString.substring(i, i + chunkSize);
225+
response.sendChunk(chunk);
226+
await sleep(100);
227+
}
228+
}
229+
return "Stream Completed";
230+
}
231+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.functions
18+
19+
import androidx.test.core.app.ApplicationProvider
20+
import androidx.test.ext.junit.runners.AndroidJUnit4
21+
import com.google.common.truth.Truth.assertThat
22+
import com.google.firebase.Firebase
23+
import com.google.firebase.initialize
24+
import java.util.concurrent.TimeUnit
25+
import kotlinx.coroutines.delay
26+
import kotlinx.coroutines.reactive.asFlow
27+
import kotlinx.coroutines.runBlocking
28+
import kotlinx.coroutines.withTimeout
29+
import org.junit.Before
30+
import org.junit.Test
31+
import org.junit.runner.RunWith
32+
import org.reactivestreams.Subscriber
33+
import org.reactivestreams.Subscription
34+
35+
@RunWith(AndroidJUnit4::class)
36+
class StreamTests {
37+
38+
private lateinit var functions: FirebaseFunctions
39+
40+
@Before
41+
fun setup() {
42+
Firebase.initialize(ApplicationProvider.getApplicationContext())
43+
functions = Firebase.functions
44+
}
45+
46+
internal class StreamSubscriber : Subscriber<StreamResponse> {
47+
internal val messages = mutableListOf<StreamResponse.Message>()
48+
internal var result: StreamResponse.Result? = null
49+
internal var throwable: Throwable? = null
50+
internal var isComplete = false
51+
internal lateinit var subscription: Subscription
52+
53+
override fun onSubscribe(subscription: Subscription) {
54+
this.subscription = subscription
55+
subscription.request(Long.MAX_VALUE)
56+
}
57+
58+
override fun onNext(streamResponse: StreamResponse) {
59+
if (streamResponse is StreamResponse.Message) {
60+
messages.add(streamResponse)
61+
} else {
62+
result = streamResponse as StreamResponse.Result
63+
}
64+
}
65+
66+
override fun onError(t: Throwable?) {
67+
throwable = t
68+
}
69+
70+
override fun onComplete() {
71+
isComplete = true
72+
}
73+
}
74+
75+
@Test
76+
fun genStream_withPublisher_receivesMessagesAndFinalResult() = runBlocking {
77+
val input = mapOf("data" to "Why is the sky blue")
78+
val function = functions.getHttpsCallable("genStream")
79+
val subscriber = StreamSubscriber()
80+
81+
function.stream(input).subscribe(subscriber)
82+
83+
while (!subscriber.isComplete) {
84+
delay(100)
85+
}
86+
assertThat(subscriber.messages.map { it.message.data.toString() })
87+
.containsExactly("hello", "world", "this", "is", "cool")
88+
assertThat(subscriber.result).isNotNull()
89+
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("hello world this is cool")
90+
assertThat(subscriber.throwable).isNull()
91+
assertThat(subscriber.isComplete).isTrue()
92+
}
93+
94+
@Test
95+
fun genStream_withFlow_receivesMessagesAndFinalResult() = runBlocking {
96+
val input = mapOf("data" to "Why is the sky blue")
97+
val function = functions.getHttpsCallable("genStream")
98+
var isComplete = false
99+
var throwable: Throwable? = null
100+
val messages = mutableListOf<StreamResponse.Message>()
101+
var result: StreamResponse.Result? = null
102+
103+
val flow = function.stream(input).asFlow()
104+
try {
105+
withTimeout(1000) {
106+
flow.collect { response ->
107+
if (response is StreamResponse.Message) {
108+
messages.add(response)
109+
} else {
110+
result = response as StreamResponse.Result
111+
}
112+
}
113+
}
114+
isComplete = true
115+
} catch (e: Throwable) {
116+
throwable = e
117+
}
118+
119+
assertThat(messages.map { it.message.data.toString() })
120+
.containsExactly("hello", "world", "this", "is", "cool")
121+
assertThat(result).isNotNull()
122+
assertThat(result!!.result.data.toString()).isEqualTo("hello world this is cool")
123+
assertThat(throwable).isNull()
124+
assertThat(isComplete).isTrue()
125+
}
126+
127+
@Test
128+
fun genStreamError_receivesError() = runBlocking {
129+
val input = mapOf("data" to "test error")
130+
val function =
131+
functions.getHttpsCallable("genStreamError").withTimeout(2000, TimeUnit.MILLISECONDS)
132+
val subscriber = StreamSubscriber()
133+
134+
function.stream(input).subscribe(subscriber)
135+
136+
withTimeout(2000) {
137+
while (subscriber.throwable == null) {
138+
delay(100)
139+
}
140+
}
141+
142+
assertThat(subscriber.throwable).isNotNull()
143+
assertThat(subscriber.throwable).isInstanceOf(FirebaseFunctionsException::class.java)
144+
}
145+
146+
@Test
147+
fun genStreamWeather_receivesWeatherForecasts() = runBlocking {
148+
val inputData = listOf(mapOf("name" to "Toronto"), mapOf("name" to "London"))
149+
val input = mapOf("data" to inputData)
150+
151+
val function = functions.getHttpsCallable("genStreamWeather")
152+
val subscriber = StreamSubscriber()
153+
154+
function.stream(input).subscribe(subscriber)
155+
156+
while (!subscriber.isComplete) {
157+
delay(100)
158+
}
159+
160+
assertThat(subscriber.messages.map { it.message.data.toString() })
161+
.containsExactly(
162+
"{temperature=25, location={name=Toronto}, conditions=snowy}",
163+
"{temperature=50, location={name=London}, conditions=rainy}"
164+
)
165+
assertThat(subscriber.result).isNotNull()
166+
assertThat(subscriber.result!!.result.data.toString()).contains("forecasts")
167+
assertThat(subscriber.throwable).isNull()
168+
assertThat(subscriber.isComplete).isTrue()
169+
}
170+
171+
@Test
172+
fun genStreamEmpty_receivesNoMessages() = runBlocking {
173+
val function = functions.getHttpsCallable("genStreamEmpty")
174+
val subscriber = StreamSubscriber()
175+
176+
function.stream(mapOf("data" to "test")).subscribe(subscriber)
177+
178+
withTimeout(2000) { delay(500) }
179+
assertThat(subscriber.messages).isEmpty()
180+
assertThat(subscriber.result).isNull()
181+
}
182+
183+
@Test
184+
fun genStreamResultOnly_receivesOnlyResult() = runBlocking {
185+
val function = functions.getHttpsCallable("genStreamResultOnly")
186+
val subscriber = StreamSubscriber()
187+
188+
function.stream(mapOf("data" to "test")).subscribe(subscriber)
189+
190+
while (!subscriber.isComplete) {
191+
delay(100)
192+
}
193+
assertThat(subscriber.messages).isEmpty()
194+
assertThat(subscriber.result).isNotNull()
195+
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Only a result")
196+
}
197+
198+
@Test
199+
fun genStreamLargeData_receivesMultipleChunks() = runBlocking {
200+
val function = functions.getHttpsCallable("genStreamLargeData")
201+
val subscriber = StreamSubscriber()
202+
203+
function.stream(mapOf("data" to "test large data")).subscribe(subscriber)
204+
205+
while (!subscriber.isComplete) {
206+
delay(100)
207+
}
208+
assertThat(subscriber.messages).isNotEmpty()
209+
assertThat(subscriber.messages.size).isEqualTo(10)
210+
val receivedString =
211+
subscriber.messages.joinToString(separator = "") { it.message.data.toString() }
212+
val expectedString = "A".repeat(10000)
213+
assertThat(receivedString.length).isEqualTo(10000)
214+
assertThat(receivedString).isEqualTo(expectedString)
215+
assertThat(subscriber.result).isNotNull()
216+
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Stream Completed")
217+
}
218+
}

0 commit comments

Comments
 (0)