
python-trio/general

Discussion of Trio, a friendly Python library for async concurrency and I/O

21 Dec 2020
@shamrin-54d39d83db8155e6700f7d06:gitter.im shamrin (Alexey Shamrin)

Hi! How do I wrap the asyncpg add_listener function in Trio? The add_listener function calls the provided callback every time NOTIFY happens in Postgres:

await conn.add_listener(channel, lambda *a: print("callback:", a))

I would like to wrap it for 'async for' consumption:

async for event in listen(channel): print("event:", event)

How do I do it?

(I've searched through the docs, Google, GitHub issues and the forum, but I couldn't find a solution.)

20:56:47
@goodboy:matrix.org lord_fomo we can do that 20:56:48
@goodboy:matrix.org lord_fomo andersea (Anders E. Andersen): https://github.com/goodboy/tractor/pull/121 20:57:15
@goodboy:matrix.org lord_fomo in fact you get pseudo-SC from it 20:57:29
@goodboy:matrix.org lord_fomo as long as you keep asyncio tasks mapped 1-to-1 with trio tasks 20:57:48
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) interesting 20:57:54
@goodboy:matrix.org lord_fomo yah guest mode ftw 20:58:03
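
(For context, "guest mode" here refers to trio.lowlevel.start_guest_run, which runs Trio on top of another event loop such as asyncio. A minimal sketch of the idea, with illustrative names and no error handling:)

import asyncio
import trio

async def trio_main():
    # Trio code running as a "guest" on top of the asyncio loop
    await trio.sleep(1)
    return "trio done"

async def asyncio_main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    outcomes = []

    def done_callback(outcome):
        # called when the guest run finishes; `outcome` wraps the result or error
        outcomes.append(outcome)
        done.set()

    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
        done_callback=done_callback,
    )

    await done.wait()
    print(outcomes[0].unwrap())

asyncio.run(asyncio_main())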
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) @shamrin have you seen triopg? 20:58:31
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) https://github.com/python-trio/triopg 20:58:44
@goodboy:matrix.org lord_fomo originally thought we would need anyio but realized it was simple enough to propagate errors through the event loop barrier 20:58:53
@shamrin-54d39d83db8155e6700f7d06:gitter.im shamrin (Alexey Shamrin) @andersea yes, but a quick search for "listen", "listener" and "add_listener" didn't return any results. 20:59:51
@goodboy:matrix.org lord_fomo even more cool is the debugger seems to just work with it as well 20:59:52
@goodboy:matrix.org lord_fomo even with multi-core/process 20:59:58
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen)

@shamrin Here is a utility function I am using in my code

from contextlib import asynccontextmanager

import orjson
import trio


@asynccontextmanager
async def listen(channel, conn):
    # buffer at most one pending notification; see the note on buffer size below
    send_channel, receive_channel = trio.open_memory_channel(1)

    # asyncpg invokes this with (connection, pid, channel, payload)
    def _listen_callback(c, pid, chan, payload):
        send_channel.send_nowait(orjson.loads(payload))

    await conn.add_listener(channel, _listen_callback)
    yield receive_channel
    await conn.remove_listener(channel, _listen_callback)

I have written a trigger that returns each inserted or updated record as json. This is then unpacked and sent to a memory channel, so I can use it in async code.

21:02:46
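
(The trigger itself isn't shown in the chat; the following is a hedged sketch of what such a NOTIFY trigger could look like, installed through the same connection's execute(). The table name my_table, the channel my_channel, and the function/trigger names are made up; note that NOTIFY payloads are capped at roughly 8000 bytes.)

CREATE_NOTIFY_TRIGGER = """
CREATE OR REPLACE FUNCTION notify_row_change() RETURNS trigger AS $$
BEGIN
    -- send the inserted/updated row as JSON on the listen/notify channel
    PERFORM pg_notify('my_channel', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS my_table_notify ON my_table;
CREATE TRIGGER my_table_notify
    AFTER INSERT OR UPDATE ON my_table
    FOR EACH ROW EXECUTE FUNCTION notify_row_change();
"""

async def install_notify_trigger(conn):
    # conn is the same triopg/asyncpg connection that add_listener is called on
    await conn.execute(CREATE_NOTIFY_TRIGGER)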
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) conn is a triopg connection proxy 21:03:15
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) This will obviously break if iterating the receive channel isn't fast enough, so you may want to consider what buffer size you need. 21:04:41
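
(Concretely, "break" here means trio.WouldBlock: send_nowait raises it as soon as the buffer is full. A small sketch of one possible policy, dropping notifications that don't fit; a bigger or unbounded buffer, e.g. trio.open_memory_channel(math.inf), is the other option:)

import trio

send_channel, receive_channel = trio.open_memory_channel(1)

def _listen_callback(c, pid, chan, payload):
    try:
        send_channel.send_nowait(payload)
    except trio.WouldBlock:
        # the consumer fell behind and the 1-item buffer is full;
        # here we simply drop the notification instead of crashing
        pass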
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) but this is an async context manager, so you use it like this:
async with listen('my_channel', conn) as receive_channel:
    async for message in receive_channel:
        print(message)
21:06:26
@shamrin-54d39d83db8155e6700f7d06:gitter.im shamrin (Alexey Shamrin) @andersea Nice! I hadn't realized triopg wraps all of asyncpg. Cool solution with the memory channel. 21:06:59
@shamrin-54d39d83db8155e6700f7d06:gitter.im shamrin (Alexey Shamrin) @andersea I guess it's time to get rid of my own clunky asyncpg wrapper and switch to triopg :) 21:09:21
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) I think triopg wraps everything, as far as I know :) 21:09:54
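
(Putting the pieces together, end-to-end use might look roughly like this. The DSN and channel name are placeholders, listen() is the helper from above, and triopg.connect() is assumed to be usable as an async context manager, as shown in the triopg README:)

import trio
import triopg

async def main():
    async with triopg.connect("postgresql://user:password@localhost/mydb") as conn:
        async with listen("my_channel", conn) as receive_channel:
            async for message in receive_channel:
                print(message)

trio.run(main)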
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im

In general, probably want:

async with send_channel:
    yield receive_channel

So the channel gets closed, right? (Though this is fine as-is in cases where both the producer and the consumer get cancelled at the same time, which is probably pretty often.)

21:14:35
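
(Folding that suggestion into the earlier helper might look like this; it reuses the imports and names from andersea's snippet above:)

@asynccontextmanager
async def listen(channel, conn):
    send_channel, receive_channel = trio.open_memory_channel(1)

    def _listen_callback(c, pid, chan, payload):
        send_channel.send_nowait(orjson.loads(payload))

    await conn.add_listener(channel, _listen_callback)
    # closing the send side on exit lets the consumer's `async for` finish
    # instead of blocking forever on a channel nobody will send to again
    async with send_channel:
        yield receive_channel
    await conn.remove_listener(channel, _listen_callback)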
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) You are right, if consumer and producer can run independently it can break, but I don't think I have any instances where that could happen. I use it like in the example I gave, where I immediately iterate the channel, so it should be pretty safe. 21:19:01
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im Gotcha 21:19:40
@mikenerone-59d01d7ad73408ce4f78141d:gitter.im I'm as much verifying my own understanding as anything else 😛 21:20:03
@andersea-552a785515522ed4b3dee39d:gitter.im andersea (Anders E. Andersen) Yeah, if you spawn two tasks and run the producer in one and the consumer in the other, it will definitely deadlock if you break out of the producer manually. But it is only an issue if you manually break out of the producer: Postgres has no way to tell you that no more messages are coming on a channel, so the stream is infinite by design. 21:24:14
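
(A tiny self-contained illustration of that failure mode, with plain memory channels and no Postgres involved: if the producer stops without closing the send side, the consumer's `async for` never ends and the nursery never exits.)

import trio

async def consumer(receive_channel):
    async for message in receive_channel:
        print("got:", message)
    print("consumer finished")

async def main():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer, receive_channel)
        for i in range(3):
            await send_channel.send(i)
        # comment out this aclose() and the consumer blocks forever on an
        # empty channel, so trio.run() never returns
        await send_channel.aclose()

trio.run(main)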
