feat: Implement SinglePartitionSubscriber. #22
This handles mapping a single partition to a Cloud Pub/Sub-like asynchronous subscriber.
sized_message = self._messages_by_offset[offset]
try:
    self._nack_handler.on_nack(sized_message.message,
        lambda: self._queue.put(requests.AckRequest(
Do we have to use AckRequests? It seems simpler to just call _handle_ack() here. What am I missing?
_handle_ack is a coroutine. The nack handler function is a Callable[[PubsubMessage], None] so that it can be called from threads that are not part of the event loop. Calling _handle_ack from here would be harder than just putting an AckRequest on the queue.
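A minimal sketch of the threading constraint described above. `AckRequest` and `NackHandler` here are hypothetical stand-ins, not the client library's classes, and the handler deliberately runs the ack callback on a separate thread. A thread-safe `queue.Queue` accepts the `put()` from that thread, whereas calling a coroutine function like `_handle_ack` there would only create a coroutine object that still has to be scheduled on the event loop.

```python
import queue
import threading
from typing import Callable, NamedTuple


class AckRequest(NamedTuple):
    """Hypothetical stand-in for the client library's AckRequest."""
    ack_id: str


class NackHandler:
    """Hypothetical nack handler: its ack callback is a plain function, not a coroutine."""

    def on_nack(self, message: str, ack: Callable[[], None]) -> None:
        # User code may do its work on another thread and ack from there.
        worker = threading.Thread(target=ack)
        worker.start()
        worker.join()


def main() -> AckRequest:
    # queue.Queue is thread-safe, so the callback may put() from any thread.
    # Calling a coroutine function from that thread would only create a
    # coroutine object; it would still need to be scheduled on the event loop.
    requests_queue: "queue.Queue[AckRequest]" = queue.Queue()
    handler = NackHandler()
    handler.on_nack("message-1", lambda: requests_queue.put(AckRequest(ack_id="1")))
    return requests_queue.get_nowait()


print(main())  # AckRequest(ack_id='1')
```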
SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before.
self.fail(e)
async def _handle_queue_message(self, message: Union[
        requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):
Can the queue really have all these kinds of messages? Do we ever want them?
It can. This is the protocol I worked backward from here: https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/message.py
Technically, drop and modack requests can be sent from the message, but as you can see on the line below, we immediately fail the client if those requests are sent.
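A simplified sketch of the dispatch described above, assuming hypothetical request types (plain `NamedTuple`s, not the library's classes) and a synchronous method instead of the PR's coroutine. Ack and nack requests are handled; drop and modack requests fail the client.

```python
from typing import List, NamedTuple, Optional, Union


# Hypothetical stand-ins for the client library's request types.
class AckRequest(NamedTuple):
    ack_id: str


class NackRequest(NamedTuple):
    ack_id: str


class DropRequest(NamedTuple):
    ack_id: str


class ModAckRequest(NamedTuple):
    ack_id: str
    seconds: int


QueueMessage = Union[AckRequest, DropRequest, ModAckRequest, NackRequest]


class SubscriberSketch:
    """Hypothetical, synchronous stand-in for the subscriber's dispatch logic."""

    def __init__(self) -> None:
        self.acked: List[str] = []
        self.nacked: List[str] = []
        self.error: Optional[Exception] = None

    def fail(self, e: Exception) -> None:
        # Record the failure so callers can inspect it.
        self.error = e

    def handle_queue_message(self, message: QueueMessage) -> None:
        if isinstance(message, (DropRequest, ModAckRequest)):
            # The message API allows these, but this subscriber does not support them.
            self.fail(ValueError(f"unsupported request: {type(message).__name__}"))
        elif isinstance(message, AckRequest):
            self.acked.append(message.ack_id)
        else:
            self.nacked.append(message.ack_id)
```

Failing fast keeps the unsupported paths visible instead of silently ignoring them.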
else:
    self._handle_nack(message)

async def _looper(self):
Why not use asyncio's default event_loop for asynchronicity?
This does use asyncio's default event loop. This is just a coroutine running in it that polls a queue.Queue until it has a message. There is asyncio.Queue (used elsewhere), which you can await on, but Message.ack/nack (from CPS's client library) must be callable from other threads, so this uses the thread-safe queue instead of the asyncio-enabled one.
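A minimal sketch of that polling pattern, assuming a fixed message count so the loop terminates. `looper`, `produce`, and the 10 ms poll interval are hypothetical, not taken from the PR. A producer thread plays the role of user code calling `Message.ack()` off the event loop, and the coroutine uses `get_nowait()` rather than a blocking `get()` so other coroutines keep running.

```python
import asyncio
import queue
import threading
from typing import List


async def looper(q: "queue.Queue[str]", handled: List[str], count: int) -> None:
    """Hypothetical polling coroutine that drains a thread-safe queue."""
    while len(handled) < count:
        try:
            # get_nowait() returns immediately; a blocking q.get() would stall
            # every other coroutine on this event loop until a message arrived.
            item = q.get_nowait()
        except queue.Empty:
            # Nothing yet: yield to the event loop, then poll again.
            await asyncio.sleep(0.01)
            continue
        handled.append(item)


def produce(q: "queue.Queue[str]") -> None:
    # Runs off the event loop, like user code acking from a worker thread.
    for i in range(3):
        q.put(f"ack-{i}")


async def main() -> List[str]:
    # asyncio.Queue is not thread-safe, so the cross-thread hand-off uses queue.Queue.
    q: "queue.Queue[str]" = queue.Queue()
    handled: List[str] = []
    producer = threading.Thread(target=produce, args=(q,))
    producer.start()
    await looper(q, handled, count=3)
    producer.join()
    return handled


print(asyncio.run(main()))  # ['ack-0', 'ack-1', 'ack-2']
```

The trade-off is latency of up to one poll interval per message in exchange for never blocking the loop.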
Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here?
@@ -0,0 +1,11 @@
from typing import NamedTuple
Meta comment: do you mind setting your editor's line length limit to 100 characters? The line breaks in your PRs make them hard to read. This is fine for now, but I would appreciate it for the next ones.
After talking with tianzi, I learned there's an existing auto-formatter. I'll run it after this chain of PRs is submitted, since it would be quite difficult to fix the commit chain history :/ I hope that's acceptable.
Yeah, totally fine. Thanks.