!iWusolgBmhtSRRYDov:gitter.im

aio-libs

891 Members
https://github.com/aio-libs: a set of asyncio-based libraries built with high quality. There is a matrix space with more rooms at https://matrix.to/#/#aio-libs-space:matrix.org
13 Servers

18 Sep 2024
@arcivanov:matrix.org Arcadiy Ivanov
In reply to @contactparthshah:gitter.im
I have a requirement where an incoming request triggers a fire-and-forget background task. I have tried using aiojobs spawn, but I am seeing the issues below.
i) Without await, I am not able to spawn the background task.
ii) If I use await spawn(background_task()), the task starts in the background successfully; however, while this task is still running, the next request is blocked.

What am I missing?
You can do asyncio.create_task
18:58:41
@arcivanov:matrix.org Arcadiy Ivanov
and if the task is not async and needs to run in a separate thread then it's asyncio.to_thread
18:59:54
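
To illustrate the asyncio.create_task suggestion: a minimal sketch of fire-and-forget scheduling for a coroutine, assuming plain asyncio with no aiohttp or aiojobs involved. The names background_job, fire_and_forget, and handle_request are hypothetical. The set of task references follows the asyncio documentation's advice to keep a strong reference to tasks that nobody awaits.

```python
import asyncio

# Strong references to tasks that nobody awaits, so they are not
# garbage-collected while still pending.
background_tasks = set()


async def background_job(name, log):
    # Hypothetical stand-in for real asynchronous work.
    await asyncio.sleep(0.05)
    log.append(f"finished {name}")


def fire_and_forget(coro):
    # Schedule the coroutine on the running loop and return immediately.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def handle_request(name, log):
    fire_and_forget(background_job(name, log))
    # The reply is produced without waiting for the job to finish.
    log.append(f"replied {name}")
    return f"accepted {name}"


async def main():
    log = []
    for name in ("a", "b"):
        await handle_request(name, log)
    # Only this demo waits for the jobs; a server would keep running instead.
    await asyncio.gather(*background_tasks)
    return log
```

Running asyncio.run(main()) should give a log in which both "replied" entries come before any "finished" entry, since handle_request returns without waiting for the job.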
@contactparthshah:gitter.im Parth Shah

Here is the example:

Now if this background_task has a blocking call, it blocks the response to the next request. What I am trying to achieve: once a request comes in, acknowledge it and start the background task; the background task should not block other incoming requests. What am I doing wrong here?

from aiohttp import web
from aiojobs.aiohttp import setup, spawn
import time

def background_task():
    print('Started bg code')
    time.sleep(5)
    print('Completed bg code')


async def hello(request):
    spawn(request, background_task())
    return web.Response(text="Hello, world")


app = web.Application()
app.add_routes([web.get('/', hello)])
setup(app)
web.run_app(app, port=12345)
20:16:46
@contactparthshah:gitter.im Parth Shah

Here is the example with asyncio.to_thread. I am still seeing the same issue: when I send a request, it should receive an immediate response, but that is not happening.

import asyncio
from aiohttp import web
from aiojobs.aiohttp import setup, spawn
import time

def background_task():
    print('Started bg code')
    time.sleep(5)
    print('Completed bg code')


async def hello(request):
    asyncio.to_thread(background_task())
    return web.Response(text="Hello, world")


app = web.Application()
app.add_routes([web.get('/', hello)])
setup(app)
web.run_app(app, port=12345)
20:22:39
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState
In reply to @contactparthshah:gitter.im

time.sleep() is blocking. Never use it on the event loop. Opt for asyncio.sleep() instead. You may need to study why blocking calls are harmful in async context.
20:51:06
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState
Ah, I misread it. You're actually doing that in a thread so it's not a problem.
20:51:56
19 Sep 2024
@sam:sambull.org Sam Bull
The first example is not using a thread, and the second example doesn't await the to_thread() call, so neither is correct. But, yeah, it looks like you need to learn the basics of asyncio. I summarised this problem in a SO answer here, which may get you started: https://stackoverflow.com/questions/78639773/python-aiohttp-web-server-not-handling-more-than-one-request-once/78642547#78642547
11:18:13
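
A hedged sketch of the to_thread() point, using plain asyncio so it runs without aiohttp. Here blocking_job and handler are hypothetical stand-ins for background_task and hello. asyncio.to_thread takes the function and its arguments separately; writing to_thread(background_task()) calls the function on the event loop thread first and only then hands its return value to to_thread. Wrapping the call in create_task is one possible choice for fire-and-forget: awaiting to_thread() directly in the handler would keep the loop free for other requests but would delay this request's own response until the job ends.

```python
import asyncio
import time


def blocking_job(seconds):
    # Blocking call; safe only because it runs in a worker thread.
    time.sleep(seconds)
    return "done"


async def handler(seconds, tasks):
    # Wrong: asyncio.to_thread(blocking_job(seconds)) would run
    # blocking_job right here, on the event loop thread.
    # Pass the callable and its arguments separately instead.
    task = asyncio.create_task(asyncio.to_thread(blocking_job, seconds))
    tasks.add(task)  # keep a reference until the job completes
    task.add_done_callback(tasks.discard)
    return "Hello, world"


async def main():
    tasks = set()
    start = time.perf_counter()
    reply = await handler(0.2, tasks)
    reply_latency = time.perf_counter() - start
    # Only this demo waits for the job; a server would keep serving requests.
    results = await asyncio.gather(*tasks)
    return reply, reply_latency, results
```

Calling asyncio.run(main()) returns the reply, the time the handler took, and the job results; the handler time should be far below the 0.2 seconds the job sleeps for.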
@lementsc:matrix.org withdraw joined the room. 13:04:25
@erickson.9:matrix.org Ope Ade joined the room. 13:58:08
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned @erickson.9:matrix.org Ope Ade (Spam). 17:24:09
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned @lementsc:matrix.org withdraw (Spam). 17:25:05
@opeade:matrix.org Ade Ope joined the room. 19:24:03
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned @opeade:matrix.org Ade Ope (Spam). 21:45:32
23 Sep 2024
@help.you.guys:matrix.org help.you.guys joined the room. 17:51:10
@sam:sambull.org Sam Bull banned @help.you.guys:matrix.org help.you.guys (Spam). 18:21:01
27 Sep 2024
@meshya:matrix.org meshya joined the room. 04:18:13
5 Oct 2024
@dhillonr-5df72a46d73408ce4fd42d7c:gitter.im dhillonr (Rohit Dhillon) joined the room. 08:56:17
8 Oct 2024
@dhillonr-5df72a46d73408ce4fd42d7c:gitter.im dhillonr (Rohit Dhillon)

I have recently started using asyncio and aiokafka.
My requirement is to run n different Kafka consumers consuming from different topics.
Each consumer, on consumption, pushes the data to a database and commits the offset once it's done.

I am using manual commits for the consumers to achieve at-least-once delivery.

But I get an error about committing a wrong offset because the group has already rebalanced.
Logs from the consumer are below:

 Heartbeat failed for group .....-redis-cache-dev-dc-03 because it is rebalancing

 Revoking previously assigned partitions frozenset...

 (Re-)joining group .......-redis-cache-dev-dc-03

 Joined group '.......-redis-cache-dev-dc-03' (generation 2123) with member_id aiokafka-0.8.0-a77413e1-a0ea-4f6c-80ea-44f8f665fd54

 Elected group leader -- performing partition assignments using roundrobin

 OffsetCommit failed for group .......-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin

OffsetCommit failed for group .....-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin

Caught exception: CommitFailedError: ('Commit cannot be completed since the group has already\n            rebalanced and assigned the partitions to another member.\n            This means that the time between subsequent calls to poll()\n            was longer than the configured max_poll_interval_ms, which\n            typically implies that the poll loop is spending too much\n            time message processing. You can address this either by\n            increasing the rebalance timeout with max_poll_interval_ms,\n            or by reducing the maximum size of batches returned in poll()\n            with max_poll_records.\n            ', 'Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member')

Shutting down...
09:31:55
@dhillonr-5df72a46d73408ce4fd42d7c:gitter.im dhillonr (Rohit Dhillon)

The code looks like this:

async def save_msg(redis_conn, cache_queue, message):
    await redis_conn.rpush(cache_queue, message.value)

async def handle_partn_batch(consumer, topic_partition, redis_conn, 
                             partn_batch, cache_queue, logger, log_flag):
    tasks = []
    for message in partn_batch:
        # do processing of message
        task = asyncio.create_task(save_msg(redis_conn, cache_queue, message))
        tasks.append(task)
    
    await asyncio.gather(*tasks)

    # pass the topic, partition and offset as a dict of TopicPartition: OffsetAndMetadata
    await consumer.commit({topic_partition: partn_batch[-1].offset + 1})


async def consume(topic_name, consumer_name, connector, redis_conn, cache_queue, batch_size, threshold,
                  log_flag):
    consumer =  await connector.create_async_kafka_consumer(topic_name, consumer_name)
    while True:
        # check space available in cache
        current_queue_length = await redis_conn.llen(cache_queue) 

        if (current_queue_length + batch_size <= threshold):
            message_batch =  await consumer.getmany(timeout_ms=1 * 1000)
            logger.info(f"Consumed {sum(len(sublist) for sublist in message_batch.values())} for cache {cache_queue}")
            
            batch_tasks = []
            for topic_partition, partition_batch in message_batch.items():
                if partition_batch:
                    task = asyncio.create_task(handle_partn_batch(consumer, topic_partition, redis_conn, 
                                                                  partition_batch, cache_queue, logger, 
                                                                  log_flag))
                    batch_tasks.append(task)
            await asyncio.gather(*batch_tasks)

        else:
            if logger and log_flag:
                logger.info(f"[[Redis Cache  {cache_queue} is Full Sleeping....]]")
            await asyncio.sleep(0.01)

09:38:03
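
The CommitFailedError text in the logs names two settings, max_poll_interval_ms and max_poll_records. A minimal sketch of how they might be set, assuming they are passed as keyword arguments to aiokafka's AIOKafkaConsumer inside the connector's create_async_kafka_consumer (whose body is not shown in this log). consumer_settings is a hypothetical helper, and the numbers are illustrative, not recommendations.

```python
def consumer_settings(worst_case_batch_seconds, batch_size):
    """Build keyword arguments for AIOKafkaConsumer (hypothetical helper)."""
    return {
        # Manual commits, as described in the message.
        "enable_auto_commit": False,
        # Assumption: allow twice the worst-case batch time between
        # two consecutive getmany() calls.
        "max_poll_interval_ms": int(worst_case_batch_seconds * 2 * 1000),
        # Cap how many records a single getmany() call may return.
        "max_poll_records": batch_size,
    }


# Illustrative values only; tune them against measured processing times.
settings = consumer_settings(worst_case_batch_seconds=120, batch_size=200)

# Assumed usage inside the connector (not run here):
# consumer = AIOKafkaConsumer(topic_name, group_id=consumer_name, **settings)
```

One more thing that may be worth checking, offered as an assumption rather than a diagnosis: the consume loop skips getmany() while the Redis queue is full, so a long full period could also let the time between polls exceed max_poll_interval_ms.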
10 Oct 2024
@matrix.helper.02:matrix.org matrix.helper.02 joined the room. 16:33:14
@webknjaz:matrix.org @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned @matrix.helper.02:matrix.org matrix.helper.02 (Spam). 21:53:12
11 Oct 2024
@sauceee:matrix.org sauceee joined the room. 09:29:56
