feat: Implement AckSetTracker which tracks message acknowledgements. #19
Note that it is awkward to structure this like the Java version, as there is no "AsyncCallable" type in Python.
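For context, the closest thing Python's typing module offers is a callable that returns an awaitable. The snippet below is only a sketch of that idea: the `AsyncCallable` alias, `run_twice`, and `make_one` are hypothetical names for illustration, not part of this PR or of any library.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical alias: a zero-argument callable whose result can be awaited.
AsyncCallable = Callable[[], Awaitable[int]]


async def run_twice(fn: AsyncCallable) -> int:
    """Await the callable twice and add the results."""
    first = await fn()
    second = await fn()
    return first + second


async def make_one() -> int:
    """Trivial coroutine function that satisfies the alias."""
    return 1


if __name__ == "__main__":
    print(asyncio.run(run_twice(make_one)))
```

An alias like this documents intent, but it is not a nominal type that callers can subclass, which may be part of why mirroring the Java structure feels awkward.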
self._acks = queue.PriorityQueue()

def track(self, offset: int):
    if len(self._receipts) > 0:
This leaves the AckTracker in an inconsistent state on exceptions. Also, there is no need to pop and push; you can just access the first element with [0].
"no need to pop and push"
Done.
"leaves the AckTracker in an inconsistent state on exceptions"
Not quite sure what you mean here, can you elaborate?
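One possible reading of the exception concern, sketched under assumptions: if the newest receipt is popped, validated, and only then pushed back, a validation error raised in between drops that receipt. The function names, the use of `ValueError`, and the choice to compare against the newest end of the deque are all hypothetical; the thread does not show the original body.

```python
from collections import deque
from typing import Deque


def track_pop_push(receipts: Deque[int], offset: int) -> None:
    """Pop the newest offset, validate, then push it back (fragile)."""
    if len(receipts) > 0:
        last = receipts.pop()
        if offset <= last:
            # Raising here skips the re-append below, so `last` is lost.
            raise ValueError(f"offset {offset} is not greater than {last}")
        receipts.append(last)
    receipts.append(offset)


def track_peek(receipts: Deque[int], offset: int) -> None:
    """Inspect the newest offset in place; nothing mutates before the check."""
    if len(receipts) > 0 and offset <= receipts[-1]:
        raise ValueError(f"offset {offset} is not greater than {receipts[-1]}")
    receipts.append(offset)
```

With the peeking variant, a rejected offset leaves the deque exactly as it was before the call.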
async def ack(self, offset: int):
    self._acks.put_nowait(offset)
    prefix_acked_offset: Optional[int] = None
    while len(self._receipts) != 0 and not self._acks.empty():
Can you peek the queues instead of popping and pushing?
PriorityQueue.queue[index] and deque[index].
I can, but this seems very implicit, as it relies on 1) the fact that the PriorityQueue class has a queue attribute (undocumented) and 2) the fact that PriorityQueue uses the heapq module, where the element at index 0 of a heap-managed list is the minimum one. I don't like that, but I can make this change if you prefer.
Yeah, I think that's fair. Let's leave as is.
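For reference, a third option that avoids both the pop-and-push pattern and the undocumented attribute is to manage a plain list with `heapq` directly, since the `heapq` documentation states that `heap[0]` is always the smallest item. This is a sketch only: `MinOffsetHeap` is a hypothetical helper, not something in this PR, and unlike `queue.PriorityQueue` it provides no locking.

```python
import heapq
from typing import List, Optional


class MinOffsetHeap:
    """Hypothetical min-heap of acked offsets with an explicit peek."""

    def __init__(self) -> None:
        self._heap: List[int] = []

    def push(self, offset: int) -> None:
        heapq.heappush(self._heap, offset)

    def peek(self) -> Optional[int]:
        # heapq documents that heap[0] is the smallest item; None if empty.
        return self._heap[0] if self._heap else None

    def pop(self) -> int:
        # Removes and returns the smallest offset.
        return heapq.heappop(self._heap)

    def __len__(self) -> int:
        return len(self._heap)
```

The trade-off is that thread safety becomes the caller's responsibility, which may or may not matter if the tracker is only touched from a single event loop.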
@@ -0,0 +1,46 @@
from asynctest.mock import MagicMock, CoroutineMock, call
CoroutineMock is unused, unless it does something to the global state?
@abstractmethod
def track(self, offset: int):
    """
    Track the provided offset.
Nit: leave a comment in the interface saying that tracked offsets should be strictly increasing? It would also be nice to describe the basic behavior in the class description, i.e., that it commits contiguous ranges.
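To make the requested behavior concrete, here is a minimal sketch of a tracker that documents the strictly-increasing requirement and commits only the contiguous acked prefix. It is not the PR's implementation: `SketchAckSetTracker`, the `commit` callback, the use of `ValueError`, and the choice to commit the last acked offset itself are assumptions made for illustration, and it uses a `heapq`-managed list rather than `queue.PriorityQueue`.

```python
import asyncio
import heapq
from collections import deque
from typing import Awaitable, Callable, Deque, List, Optional


class SketchAckSetTracker:
    """Tracks offsets and commits the contiguous prefix of acked offsets.

    Offsets passed to track() must be strictly increasing. An ack only
    triggers a commit once every earlier tracked offset has been acked.
    """

    def __init__(self, commit: Callable[[int], Awaitable[None]]) -> None:
        self._commit = commit  # hypothetical committer callback
        self._receipts: Deque[int] = deque()  # tracked, oldest first
        self._acks: List[int] = []  # min-heap of acked offsets

    def track(self, offset: int) -> None:
        # Validate before mutating so a rejected offset changes nothing.
        if self._receipts and offset <= self._receipts[-1]:
            raise ValueError(f"offset {offset} is not strictly increasing")
        self._receipts.append(offset)

    async def ack(self, offset: int) -> None:
        heapq.heappush(self._acks, offset)
        prefix_acked_offset: Optional[int] = None
        # Consume matching heads: oldest receipt equals smallest ack.
        while self._receipts and self._acks and self._receipts[0] == self._acks[0]:
            prefix_acked_offset = self._receipts.popleft()
            heapq.heappop(self._acks)
        if prefix_acked_offset is not None:
            # Assumption: commit the last acked offset of the prefix.
            await self._commit(prefix_acked_offset)


async def demo() -> List[int]:
    committed: List[int] = []

    async def record(offset: int) -> None:
        committed.append(offset)

    tracker = SketchAckSetTracker(record)
    for tracked in (1, 3, 5):
        tracker.track(tracked)
    await tracker.ack(3)  # 1 is still outstanding, so nothing is committed
    await tracker.ack(1)  # 1 and 3 now form a contiguous prefix
    await tracker.ack(5)  # 5 extends the prefix
    return committed
```

In `demo()`, the out-of-order ack of 3 is held back until 1 arrives, which is the "contiguous ranges" behavior the class description could spell out.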