Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.

Commit 9f145f8

Browse files
authored
fix: avoid failure if closing AppendRowsStream before opening (#304)
* fix: avoid failure if closing AppendRowsStream before opening * don't create a thread if we're going to block anyway
1 parent 69e3fb8 commit 9f145f8

2 files changed

Lines changed: 12 additions & 11 deletions

File tree

google/cloud/bigquery_storage_v1beta2/writer.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,21 +259,12 @@ def close(self, reason: Optional[Exception] = None):
259259
260260
This method is idempotent. Additional calls will have no effect.
261261
262-
The method does not block, it delegates the shutdown operations to a background
263-
thread.
264-
265262
Args:
266263
reason: The reason to close this. If ``None``, this is considered
267264
an "intentional" shutdown. This is passed to the callbacks
268265
specified via :meth:`add_close_callback`.
269266
"""
270-
self._regular_shutdown_thread = threading.Thread(
271-
name=_REGULAR_SHUTDOWN_THREAD_NAME,
272-
daemon=True,
273-
target=self._shutdown,
274-
kwargs={"reason": reason},
275-
)
276-
self._regular_shutdown_thread.start()
267+
self._shutdown(reason=reason)
277268

278269
def _shutdown(self, reason: Optional[Exception] = None):
279270
"""Run the actual shutdown sequence (stop the stream and all helper threads).
@@ -293,7 +284,8 @@ def _shutdown(self, reason: Optional[Exception] = None):
293284
self._consumer.stop()
294285
self._consumer = None
295286

296-
self._rpc.close()
287+
if self._rpc is not None:
288+
self._rpc.close()
297289
self._rpc = None
298290
self._closed = True
299291
_LOGGER.debug("Finished stopping manager.")

tests/unit/test_writer_v1beta2.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from google.api_core import exceptions
2121
from google.cloud.bigquery_storage_v1beta2.services import big_query_write
2222
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
23+
from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
2324
from google.protobuf import descriptor_pb2
2425

2526

@@ -44,6 +45,14 @@ def test_constructor_and_default_state(module_under_test):
4445
assert manager._client is mock_client
4546

4647

48+
def test_close_before_open(module_under_test):
49+
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
50+
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
51+
manager.close()
52+
with pytest.raises(bqstorage_exceptions.StreamClosedError):
53+
manager.send(object())
54+
55+
4756
@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
4857
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
4958
def test_open(background_consumer, bidi_rpc, module_under_test):

0 commit comments

Comments
 (0)