18 Sep 2024 |
Arcadiy Ivanov | In reply to @contactparthshah:gitter.im
I have a requirement where an incoming request triggers a fire-and-forget background task. I have tried using aiojobs spawn, however I'm seeing the issues below: i) without await, I'm not able to spawn the background task; ii) if I use await spawn(background_task()), the task starts in the background successfully, but while it is still running, the next request is blocked. What am I missing?
You can do asyncio.create_task | 18:58:41 |
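A minimal sketch of the `asyncio.create_task` suggestion, in plain asyncio rather than aiohttp: `handle_request`, `background_job`, and the `results` list are hypothetical stand-ins. The handler schedules the coroutine and returns without waiting for it. Keeping each task in a set follows the asyncio documentation's advice to hold a reference, since the event loop keeps only weak references to tasks. This only helps if the job is async all the way down; a blocking call inside it still stalls the loop.

```python
import asyncio

# Strong references to in-flight tasks; the event loop itself only keeps
# weak references, so an unreferenced task could be garbage-collected.
background_tasks: set[asyncio.Task] = set()


async def background_job(results: list) -> None:
    # Hypothetical async work; asyncio.sleep yields control to the loop.
    await asyncio.sleep(0.2)
    results.append("done")


async def handle_request(results: list) -> str:
    # Hypothetical handler: schedule the job, then respond immediately.
    task = asyncio.create_task(background_job(results))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Hello, world"


async def main() -> None:
    results: list = []
    print(await handle_request(results), results)  # Hello, world []
    await asyncio.gather(*background_tasks)        # demo only: wait for the job
    print(results)                                 # ['done']


if __name__ == "__main__":
    asyncio.run(main())
```

The response is produced before the job has even started; the job then runs on the same event loop while other requests are served.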
Arcadiy Ivanov | And if the task is not async and needs to run in a separate thread, then it's asyncio.to_thread. | 18:59:54 |
Parth Shah | Here is the example:
Now, if this background_task has a blocking call, it blocks the response to the next request. What I am trying to achieve is that once a request comes in, I acknowledge it and start the background task; this background task should not block other incoming requests. What am I doing wrong here?
from aiohttp import web
from aiojobs.aiohttp import setup, spawn
import time

def background_task():
    print('Started bg code')
    time.sleep(5)
    print('Completed bg code')

async def hello(request):
    spawn(request, background_task())
    return web.Response(text="Hello, world")

app = web.Application()
app.add_routes([web.get('/', hello)])
setup(app)
web.run_app(app, port=12345)
| 20:16:46 |
Parth Shah | Here is the example with asyncio.to_thread; I'm still seeing the same issue. If I send a request, it should receive an immediate response, however that is not happening.
import asyncio
from aiohttp import web
from aiojobs.aiohttp import setup, spawn
import time

def background_task():
    print('Started bg code')
    time.sleep(5)
    print('Completed bg code')

async def hello(request):
    asyncio.to_thread(background_task())
    return web.Response(text="Hello, world")

app = web.Application()
app.add_routes([web.get('/', hello)])
setup(app)
web.run_app(app, port=12345)
| 20:22:39 |
@webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState | In reply to @contactparthshah:gitter.im
time.sleep() is blocking. Never use it on the event loop; opt for asyncio.sleep() instead. You may need to study why blocking calls are harmful in an async context. | 20:51:06 |
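To make the blocking point concrete, the sketch below (hypothetical names `blocking_job`, `cooperative_job`, `run_two`) runs two copies of a job concurrently on one event loop. With `time.sleep()` the second copy cannot start until the first releases the thread, so the total is roughly the sum of both sleeps; with `await asyncio.sleep()` the waits overlap. A blocking call inside an aiohttp handler, or inside a coroutine spawned from one, holds up other requests in the same way.

```python
import asyncio
import time


async def blocking_job() -> None:
    time.sleep(0.2)           # blocks the whole event-loop thread


async def cooperative_job() -> None:
    await asyncio.sleep(0.2)  # suspends only this coroutine


async def run_two(job) -> float:
    """Run two copies of `job` concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(job(), job())
    return time.perf_counter() - start


if __name__ == "__main__":
    # About 0.4 s: the two time.sleep() calls run one after the other.
    print(f"time.sleep:    {asyncio.run(run_two(blocking_job)):.2f}s")
    # About 0.2 s: the two asyncio.sleep() waits overlap.
    print(f"asyncio.sleep: {asyncio.run(run_two(cooperative_job)):.2f}s")
```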
@webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState | Ah, I misread it. You're actually doing that in a thread so it's not a problem. | 20:51:56 |
19 Sep 2024 |
Sam Bull | The first example is not using a thread, and the second example doesn't await the to_thread() call, so neither is correct.
But, yeah, it looks like you need to learn the basics of asyncio. I summarised this problem in a SO answer here, which may get you started:
https://stackoverflow.com/questions/78639773/python-aiohttp-web-server-not-handling-more-than-one-request-once/78642547#78642547 | 11:18:13 |
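Sam's two points can be shown in plain asyncio. In `asyncio.to_thread(background_task())`, the parentheses call `background_task` immediately on the event-loop thread (so its `time.sleep` blocks the loop), and the coroutine returned by `to_thread` is never awaited, so nothing runs in a worker thread. The sketch below passes the function itself and wraps the resulting coroutine in a task, so the handler can answer before the work finishes. `hello()` here is a hypothetical stand-in for the aiohttp handler body, `worker_threads` exists only for the demo, and the sleep is shortened to 0.2 s.

```python
import asyncio
import threading
import time

background_tasks: set[asyncio.Task] = set()
worker_threads: list[str] = []  # demo only: records which thread ran the job


def background_task() -> None:
    # Blocking work, as in the original example (sleep shortened to 0.2 s).
    worker_threads.append(threading.current_thread().name)
    time.sleep(0.2)


async def hello() -> str:
    # Pass the function, not its result: to_thread(background_task), no ().
    # create_task schedules the thread hand-off without waiting for it.
    task = asyncio.create_task(asyncio.to_thread(background_task))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Hello, world"


async def main() -> None:
    start = time.perf_counter()
    reply = await hello()
    print(reply, f"after {time.perf_counter() - start:.3f}s")  # near-instant
    await asyncio.gather(*background_tasks)  # demo only: wait for the job
    print("ran in:", worker_threads[0])      # a worker thread, not MainThread


if __name__ == "__main__":
    asyncio.run(main())
```

Awaiting `asyncio.to_thread(background_task)` directly in the handler would also keep the loop free, but that request's response would then wait for the job to finish. With aiojobs, the equivalent would presumably be `await spawn(request, asyncio.to_thread(background_task))`, since its `spawn` is itself a coroutine function; that is an assumption worth checking against the aiojobs docs.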
| withdraw joined the room. | 13:04:25 |
| Ope Ade joined the room. | 13:58:08 |
| @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned Ope Ade (Spam). | 17:24:09 |
| @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned withdraw (Spam). | 17:25:05 |
| Ade Ope joined the room. | 19:24:03 |
| @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned Ade Ope (Spam). | 21:45:32 |
23 Sep 2024 |
| help.you.guys joined the room. | 17:51:10 |
| Sam Bull banned help.you.guys (Spam). | 18:21:01 |
27 Sep 2024 |
| meshya joined the room. | 04:18:13 |
5 Oct 2024 |
| dhillonr (Rohit Dhillon) joined the room. | 08:56:17 |
8 Oct 2024 |
dhillonr (Rohit Dhillon) | I have recently started using asyncio and aiokafka. My requirement is to run n different Kafka consumers consuming from different topics. On consumption, each consumer pushes the data to a database and commits the offset once it's done.
I am using manual commits for the consumers to achieve at-least-once delivery.
But when committing the offset, I get an error saying the group has already rebalanced.
Logs from the consumer are below:
Heartbeat failed for group .....-redis-cache-dev-dc-03 because it is rebalancing
Revoking previously assigned partitions frozenset...
(Re-)joining group .......-redis-cache-dev-dc-03
Joined group '.......-redis-cache-dev-dc-03' (generation 2123) with member_id aiokafka-0.8.0-a77413e1-a0ea-4f6c-80ea-44f8f665fd54
Elected group leader -- performing partition assignments using roundrobin
OffsetCommit failed for group .......-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin
OffsetCommit failed for group .....-redis-cache-dev-dc-03 due to group error ([Error 22] IllegalGenerationError: .......-redis-cache-dev-dc-03), will rejoin
Caught exception: CommitFailedError: ('Commit cannot be completed since the group has already\n rebalanced and assigned the partitions to another member.\n This means that the time between subsequent calls to poll()\n was longer than the configured max_poll_interval_ms, which\n typically implies that the poll loop is spending too much\n time message processing. You can address this either by\n increasing the rebalance timeout with max_poll_interval_ms,\n or by reducing the maximum size of batches returned in poll()\n with max_poll_records.\n ', 'Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member')
Shutting down...
| 09:31:55 |
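The error text names two knobs: a longer `max_poll_interval_ms`, or smaller batches via `max_poll_records`. In aiokafka both are keyword arguments of `AIOKafkaConsumer`, alongside `enable_auto_commit=False` for manual commits. Per the error message, the time from one `getmany()` call to the next, including the database writes and commits, has to stay below `max_poll_interval_ms`. The values, broker address, and group name below are hypothetical placeholders, not recommendations.

```python
# Placeholder settings for one consumer; pass them as
# aiokafka.AIOKafkaConsumer(topic, **CONSUMER_CONFIG).
CONSUMER_CONFIG = {
    "bootstrap_servers": "localhost:9092",  # hypothetical broker address
    "group_id": "redis-cache-consumer",     # hypothetical consumer group
    "enable_auto_commit": False,            # commit manually after the write
    "max_poll_records": 100,                # fewer records per getmany() batch
    "max_poll_interval_ms": 600_000,        # longer allowed gap between polls
}
```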
dhillonr (Rohit Dhillon) | Code looks like this:
import asyncio
import logging

logger = logging.getLogger(__name__)  # module-level logger used by consume()

async def save_msg(redis_conn, cache_queue, message):
    await redis_conn.rpush(cache_queue, message.value)

async def handle_partn_batch(consumer, topic_partition, redis_conn,
                             partn_batch, cache_queue, logger, log_flag):
    tasks = []
    for message in partn_batch:
        # do processing of message
        task = asyncio.create_task(save_msg(redis_conn, cache_queue, message))
        tasks.append(task)
    await asyncio.gather(*tasks)
    # commit the next offset to read for this partition, as {TopicPartition: offset}
    await consumer.commit({topic_partition: partn_batch[-1].offset + 1})

async def consume(topic_name, consumer_name, connector, redis_conn, cache_queue, batch_size, threshold,
                  log_flag):
    consumer = await connector.create_async_kafka_consumer(topic_name, consumer_name)
    while True:
        # check space available in cache
        current_queue_length = await redis_conn.llen(cache_queue)
        if current_queue_length + batch_size <= threshold:
            message_batch = await consumer.getmany(timeout_ms=1 * 1000)
            logger.info(f"Consumed {sum(len(sublist) for sublist in message_batch.values())} for cache {cache_queue}")
            batch_tasks = []
            for topic_partition, partition_batch in message_batch.items():
                if partition_batch:
                    task = asyncio.create_task(handle_partn_batch(consumer, topic_partition, redis_conn,
                                                                  partition_batch, cache_queue, logger,
                                                                  log_flag))
                    batch_tasks.append(task)
            await asyncio.gather(*batch_tasks)
        else:
            if logger and log_flag:
                logger.info(f"[[Redis Cache {cache_queue} is Full Sleeping....]]")
            await asyncio.sleep(0.01)
| 09:38:03 |
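On the commit itself: Kafka's committed offset is the position of the next record the group should read, which is why `handle_partn_batch` commits `partn_batch[-1].offset + 1`. Below is a small pure-Python sketch of that per-partition calculation, with a hypothetical `FakeRecord` standing in for aiokafka's record type and tuple keys standing in for `TopicPartition`. Committing these offsets only after every write for the partition has finished is what gives at-least-once delivery: a crash before the commit means the batch is read again.

```python
from typing import Dict, Hashable, List, NamedTuple


class FakeRecord(NamedTuple):
    """Hypothetical stand-in for a consumed record; only `offset` matters here."""
    offset: int
    value: bytes


def offsets_to_commit(batches: Dict[Hashable, List[FakeRecord]]) -> Dict[Hashable, int]:
    # Committed offset = last processed offset + 1 (the next record to read).
    # Partitions with an empty batch are skipped, so their position is unchanged.
    return {tp: records[-1].offset + 1 for tp, records in batches.items() if records}


if __name__ == "__main__":
    batch = {
        ("orders", 0): [FakeRecord(41, b"a"), FakeRecord(42, b"b")],  # hypothetical topic
        ("orders", 1): [],
    }
    print(offsets_to_commit(batch))  # {('orders', 0): 43}
```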
10 Oct 2024 |
| matrix.helper.02 joined the room. | 16:33:14 |
| @webknjaz 🇺🇦 #StandWithUkraine | https://stand-with-ukraine.pp.ua | #russiaIsANaziState banned matrix.helper.02 (Spam). | 21:53:12 |
11 Oct 2024 |
| sauceee joined the room. | 09:29:56 |