21 Dec 2020 |
shamrin (Alexey Shamrin) | Hi! How do I wrap the asyncpg add_listener function in Trio? The add_listener function calls the provided callback every time NOTIFY happens in Postgres: await conn.add_listener(channel, lambda *a: print("callback:", a)) I would like to wrap it for 'async for' consumption: async for event in listen(channel): print("event:", event) How do I do it? (I've searched through the docs, Google, GitHub issues and the forum, but I couldn't find a solution.) | 20:56:47 |
lord_fomo | we can do that | 20:56:48 |
lord_fomo | andersea (Anders E. Andersen): https://github.com/goodboy/tractor/pull/121 | 20:57:15 |
lord_fomo | in fact you get pseudo-SC from it | 20:57:29 |
lord_fomo | as long as you keep asyncio tasks mapped 1-to-1 with trio tasks | 20:57:48 |
andersea (Anders E. Andersen) | interesting | 20:57:54 |
lord_fomo | yah guest mode ftw | 20:58:03 |
andersea (Anders E. Andersen) | @shamrin have you seen triopg? | 20:58:31 |
andersea (Anders E. Andersen) | https://github.com/python-trio/triopg | 20:58:44 |
lord_fomo | originally thought we would need anyio but realized it was simple enough to propagate errors through the event loop barrier | 20:58:53 |
shamrin (Alexey Shamrin) | @andersea yes, but a quick search for "listen", "listener" and "add_listener" didn't return any results. | 20:59:51 |
lord_fomo | even more cool is the debugger seems to just work with it as well | 20:59:52 |
lord_fomo | even with multi-core/process | 20:59:58 |
andersea (Anders E. Andersen) | @shamrin Here is a utility function I am using in my code (imports added for completeness):

    from contextlib import asynccontextmanager

    import orjson
    import trio

    @asynccontextmanager
    async def listen(channel, conn):
        send_channel, receive_channel = trio.open_memory_channel(1)

        def _listen_callback(c, pid, chan, payload):
            # called synchronously for each NOTIFY on the channel
            send_channel.send_nowait(orjson.loads(payload))

        await conn.add_listener(channel, _listen_callback)
        yield receive_channel
        await conn.remove_listener(channel, _listen_callback)

I have written a trigger that returns each inserted or updated record as JSON. This is then unpacked and sent to a memory channel, so I can use it in async code. | 21:02:46 |
andersea (Anders E. Andersen) | conn is a triopg connection proxy | 21:03:15 |
andersea (Anders E. Andersen) | This will obviously break if iterating the receive channel isn't fast enough, so you may want to consider what buffer size you need. | 21:04:41 |
andersea (Anders E. Andersen) | But this is an async context manager, so you use it like this:

    async with listen('my_channel', conn) as receive_channel:
        async for message in receive_channel:
            print(message)
| 21:06:26 |
shamrin (Alexey Shamrin) | @andersea Nice! I hadn't realized triopg wraps all of asyncpg. Cool solution with the memory channel. | 21:06:59 |
shamrin (Alexey Shamrin) | @andersea I guess it's time to get rid of my own clunky asyncpg wrapper and switch to triopg :) | 21:09:21 |
andersea (Anders E. Andersen) | I think triopg wraps everything, as far as I know.. :) | 21:09:54 |
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im | In general, you probably want:

    async with send_channel:
        yield receive_channel

So the channel gets closed, right? (Though this is fine as-is in cases where both the producer and the consumer get cancelled at the same time, which is probably pretty often.) | 21:14:35 |
andersea (Anders E. Andersen) | You are right, if the consumer and producer can run independently it can break, but I don't think I have any instances where that could happen. I use it like in the example I gave, where I immediately iterate the channel, so it should be pretty safe. | 21:19:01 |
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im | Gotcha | 21:19:40 |
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im | I'm as much verifying my own understanding as anything else 😛 | 21:20:03 |
andersea (Anders E. Andersen) | Yeah, if you spawn two tasks and run the producer in one of them and the consumer in another, it will definitely deadlock if you break out of the producer manually. And it is only an issue if you break out manually: Postgres has no way to tell you that no more messages are coming on a channel, so the stream is infinite by design. | 21:24:14 |