Description
I have a simple module that downloads Avro data files (~50-200 files, each between 1 and 50 MB) from S3 and indexes the data into Elasticsearch.
This module runs in a Docker container (within Kubernetes).
I was trying to use aiomultiprocess to speed up the process by running it in parallel with more resources (4 cores).
I have noticed that the module gets stuck too often (it keeps running but does nothing), and after a lot of research I found that it is a memory issue (although I didn't get an Out Of Memory kill event from Kubernetes).
Is there a way to raise an exception in such a case? I want to be alerted if my app gets stuck so I can tune the memory and rerun the tasks.
Below you can see my attempt to reproduce this behavior with a simple task (just a dumb loop that fills memory) instead of downloading files and indexing them into a database.
Running the code below (and also here) with a memory limit of 2g never ends, while changing it to 3g finishes successfully.
docker build -t aiomultiprocess_mem .
docker run --rm --memory=2g --cpus=5 --name=mem aiomultiprocess_mem
Output (at some point only the heartbeat logs keep appearing, indefinitely):
2023-03-19 15:31:20,457 1 INFO creating pool [app.py:38]
2023-03-19 15:31:20,524 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:20,584 14 INFO T000 started! [app.py:27]
2023-03-19 15:31:20,589 18 INFO T003 started! [app.py:27]
2023-03-19 15:31:20,596 22 INFO T006 started! [app.py:27]
2023-03-19 15:31:20,600 26 INFO T009 started! [app.py:27]
2023-03-19 15:31:21,525 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:22,525 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:23,527 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,387 18 INFO T003 100000 [app.py:31]
2023-03-19 15:31:24,386 26 INFO T009 100000 [app.py:31]
2023-03-19 15:31:24,391 22 INFO T006 100000 [app.py:31]
2023-03-19 15:31:24,527 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,610 31 INFO T012 started! [app.py:27]
2023-03-19 15:31:25,528 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:25,695 31 INFO T012 100000 [app.py:31]
2023-03-19 15:31:26,261 31 INFO T013 started! [app.py:27]
...
...
2023-03-19 15:33:34,095 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:35,096 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:36,097 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:37,097 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:38,098 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:39,099 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:40,100 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:41,101 1 INFO heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:42,101 1 INFO heartbeat alive 4 processes [app.py:22]
...
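One possible explanation for output like this (heartbeats continue, tasks stop) is that the kernel OOM killer terminated a worker process rather than PID 1, in which case Kubernetes would not necessarily report the container as OOM-killed. That is an unconfirmed guess. A minimal sketch for checking it from inside the container follows. It assumes cgroup v2, where the counters live in `/sys/fs/cgroup/memory.events`; cgroup v1 uses a different layout. The function names are hypothetical.

```python
from pathlib import Path

# Assumption: cgroup v2, with the container's own cgroup mounted at this path.
MEMORY_EVENTS = Path("/sys/fs/cgroup/memory.events")


def parse_memory_events(text):
    """Turn 'key value' lines into a dict of integer counters."""
    counters = {}
    for line in text.splitlines():
        key, _, value = line.partition(" ")
        if value.strip().isdigit():
            counters[key] = int(value)
    return counters


def oom_kill_count(path=MEMORY_EVENTS):
    """Return the cgroup's oom_kill counter, or None if the file is unreadable."""
    try:
        return parse_memory_events(path.read_text()).get("oom_kill", 0)
    except OSError:
        return None
```

Logging `oom_kill_count()` from the heartbeat coroutine would show whether the counter increases around the time the tasks stop making progress.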
Code:
├── Dockerfile
└── multi
    ├── __init__.py
    ├── __main__.py
    └── app.py
Dockerfile:
FROM python:3.11.2-slim-bullseye
WORKDIR /
RUN pip install --no-cache-dir --upgrade pip==23.0.1 \
    && pip install --no-cache-dir aiomultiprocess==0.9.0
COPY multi /multi
CMD ["python", "-m", "multi"]
__main__.py:
import asyncio

from multi.app import main

if __name__ == "__main__":
    asyncio.run(main())
app.py:
import asyncio
import logging
import os
import sys

from aiomultiprocess import Pool

logger = logging.getLogger(__name__)


def init_logger():
    logging.basicConfig(
        format="%(asctime)-15s %(process)d %(levelname)-18.18s %(message)s [%(filename)s:%(lineno)d]",
        stream=sys.stdout
    )
    logging.root.setLevel(logging.INFO)


async def is_alive(pool):
    while True:
        if pool is not None:
            logger.info(f"heartbeat alive {len(pool.processes.keys())} processes")
        await asyncio.sleep(1)


async def my_mem_task(task_id):
    logger.info(f"T{task_id:03} started!")
    data = []
    for i in range(100_000):
        data.append([i] * 1_000)
    logger.info(f"T{task_id:03} {len(data)}")


async def main():
    init_logger()
    number_of_processes = int(os.getenv("NUMBER_OF_PROCESSES", "4"))
    number_of_async_tasks = int(os.getenv("NUMBER_OF_ASYNC_TASKS", "3"))
    logger.info("creating pool")
    async with Pool(
        processes=number_of_processes,
        childconcurrency=number_of_async_tasks,
        initializer=init_logger,
    ) as pool:
        asyncio.create_task(is_alive(pool))
        task_ids = [task_id for task_id in range(150)]
        await pool.map(my_mem_task, task_ids)
        pool.close()
        logger.info("processes pool closed")
        await pool.join()
        logger.info("all processes are done")
UPDATE:
The same happens with the non-aio multiprocessing version :( (I added a related question on SO)
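For comparison, the standard library's `concurrent.futures.ProcessPoolExecutor` is documented to raise `BrokenProcessPool` when a worker process terminates abruptly, instead of waiting forever. The sketch below demonstrates only that behavior and is not a drop-in replacement for the code above. `signal.raise_signal(SIGKILL)` stands in for the kernel OOM killer, the `fork` start method assumes Linux, and `detects_worker_death` is a hypothetical name.

```python
import multiprocessing
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def detects_worker_death():
    """Return True if the parent gets an exception when a worker is SIGKILLed."""
    # Assumption: Linux, so the "fork" start method is available.
    context = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=1, mp_context=context) as executor:
        # The worker sends SIGKILL to itself, as an OOM kill would from outside.
        future = executor.submit(signal.raise_signal, signal.SIGKILL)
        try:
            future.result(timeout=30)
        except BrokenProcessPool:
            return True
    return False
```

Catching `BrokenProcessPool` in the parent gives a place to log the failure, raise an alert, and reschedule the batch with a higher memory limit.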
Details
- OS:
- Python version: 3.11.2
- aiomultiprocess version: 0.9.0
- Can you repro on master? yes
- Can you repro in a clean virtualenv? yes