In modern web applications, we often use async fire and forget patterns to improve user experience: for example, logging an event when a user clicks an item on a page, or when an ML model makes a real-time prediction. These events are usually not critical, and we can afford to lose some of them, so fire and forget feels like a natural fit. Most of the time this works fine, because the systems backing those events/HTTP requests are designed to be resilient.
But what happens if the end system is down, restarting, or responding slowly?
In that case, something as innocuous as logging a prediction can become a big problem, and fast, degrading the user experience.
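To make the pattern concrete, here is a minimal sketch of what fire and forget typically looks like in asyncio. The endpoint URL and function names are made up for illustration; the point is that the caller schedules the logging call and returns without ever awaiting it, so nothing bounds how many of these tasks can pile up.

import asyncio
import aiohttp

async def log_prediction(payload: dict) -> None:
    # Best-effort logging call; the URL here is a placeholder.
    async with aiohttp.ClientSession() as session:
        async with session.post("http://metrics.internal/api/events", json=payload) as response:
            await response.text()

async def handle_request(payload: dict) -> str:
    # Fire and forget: schedule the logging task and return immediately.
    # The task reference is discarded and nothing limits how many can be pending.
    asyncio.create_task(log_prediction(payload))
    return "prediction served"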
Let's take a look at a script that simulates this behavior: it fires and forgets requests with ~0.1 MB payloads while the end system is down or restarting. The script schedules up to 5000 requests in batches, and the first ~30 seconds of its run are enough to see the memory usage grow.
import asyncio
import logging
import time
import json
import uuid
from typing import List

import psutil
import aiohttp
import pytest

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MemoryMonitor:
    def __init__(self):
        self.process = psutil.Process()
        self.initial_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.peak_memory = self.initial_memory
        self.current_memory = self.initial_memory
        self.memory_samples = []

    def update(self):
        """Update memory statistics and return current memory usage."""
        self.current_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.peak_memory = max(self.peak_memory, self.current_memory)
        self.memory_samples.append(self.current_memory)
        return self.current_memory

    def get_stats(self):
        """Get comprehensive memory statistics."""
        # Ensure we have the latest memory reading
        self.update()
        return {
            'initial_memory_mb': self.initial_memory,
            'current_memory_mb': self.current_memory,
            'peak_memory_mb': self.peak_memory,
            'memory_increase_from_initial_mb': self.current_memory - self.initial_memory,
            'peak_memory_increase_mb': self.peak_memory - self.initial_memory,
            'memory_samples_count': len(self.memory_samples),
            'min_memory_mb': min(self.memory_samples) if self.memory_samples else self.initial_memory,
            'max_memory_mb': max(self.memory_samples) if self.memory_samples else self.initial_memory,
            'avg_memory_mb': sum(self.memory_samples) / len(self.memory_samples) if self.memory_samples else self.initial_memory
        }


def generate_large_payload(size_mb: float = 0.1) -> dict:
    """Generate a payload of approximately `size_mb` MB of unique data."""
    # Create a unique request id and timestamp
    unique_id = str(uuid.uuid4())
    timestamp = time.time()

    # Calculate how many UUIDs we need to reach the desired size
    # (each UUID string is 36 characters)
    bytes_needed = int(size_mb * 1024 * 1024)  # Convert MB to bytes
    num_uuids = bytes_needed // 36

    # Create a list of unique UUID strings
    unique_strings = [str(uuid.uuid4()) for _ in range(num_uuids)]

    return {
        "data": unique_strings,
        "request_id": unique_id,
        "timestamp": timestamp,
        "metadata": {
            "size_mb": size_mb,
            "type": "test_payload",
            "num_uuids": num_uuids,
            "bytes_generated": len(str(unique_strings))  # Actual size of the payload
        }
    }


async def make_invalid_request(session: aiohttp.ClientSession, url: str, payload_size_mb: float = 1.0) -> None:
    try:
        payload = generate_large_payload(payload_size_mb)
        headers = {
            'Content-Type': 'application/json',
            'X-Test-Header': 'large-payload-test'
        }
        async with session.post(
            url,
            json=payload,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30)  # 30 second timeout for each request
        ) as response:
            await response.text()
    except asyncio.CancelledError:
        # Ignore cancelled requests
        pass
    except Exception as e:
        logger.debug(f"Request failed: {str(e)}")
        raise


async def create_concurrent_requests(num_requests: int = 5000, payload_size_mb: float = 0.1) -> None:
    invalid_urls = [
        f"http://invalid-domain-{i}.com/api/data"
        for i in range(num_requests)
    ]
    memory_monitor = MemoryMonitor()
    start_time = time.time()
    tasks: List[asyncio.Task] = []

    # Configure the aiohttp session with an appropriate timeout
    timeout = aiohttp.ClientTimeout(total=30)  # 30 seconds total timeout

    async with aiohttp.ClientSession(
        timeout=timeout,
        json_serialize=json.dumps
    ) as session:
        # Create tasks gradually over time
        for i in range(0, num_requests, 200):
            # Create 200 tasks every 5 seconds
            batch_urls = invalid_urls[i:i + 200]
            for url in batch_urls:
                task = asyncio.create_task(
                    make_invalid_request(session, url, payload_size_mb)
                )
                tasks.append(task)

            current_memory = memory_monitor.update()
            logger.info(
                f"Created {len(tasks)} tasks. "
                f"Current memory: {current_memory:.2f} MB, "
                f"Peak memory: {memory_monitor.peak_memory:.2f} MB"
            )

            # Wait 5 seconds before creating the next batch
            await asyncio.sleep(5)

        # After all tasks are created, monitor for the remaining time
        remaining_time = 30 - (time.time() - start_time)
        if remaining_time > 0:
            logger.info(f"All tasks created. Monitoring for {remaining_time:.1f} more seconds...")
            await asyncio.sleep(remaining_time)

        # Log final stats
        end_time = time.time()
        stats = memory_monitor.get_stats()
        stats['execution_time_seconds'] = end_time - start_time
        stats['total_data_sent_mb'] = num_requests * payload_size_mb
        stats['tasks_created'] = len(tasks)

        logger.info("\nTest completed with detailed memory stats:")
        for key, value in stats.items():
            if isinstance(value, float):
                logger.info(f"{key}: {value:.2f}")
            else:
                logger.info(f"{key}: {value}")


@pytest.mark.asyncio
async def test_async_exhaustion():
    """Demonstrate async task exhaustion with large POST requests to invalid URLs."""
    try:
        await create_concurrent_requests(num_requests=5000, payload_size_mb=0.1)
    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}")
        raise


if __name__ == "__main__":
    asyncio.run(test_async_exhaustion())
Running the script
python simulate_fire_and_forget_behavior.py
Output
INFO:__main__:Created 200 tasks. Current memory: 40.17 MB, Peak memory: 40.17 MB
INFO:__main__:Created 400 tasks. Current memory: 129.16 MB, Peak memory: 129.16 MB
INFO:__main__:Created 600 tasks. Current memory: 215.64 MB, Peak memory: 215.64 MB
INFO:__main__:Created 800 tasks. Current memory: 302.33 MB, Peak memory: 302.33 MB
The output above shows that this is not so innocuous after all: each pending task keeps its ~0.1 MB payload, plus task and connection overhead, alive until the request finally times out, so memory climbs by roughly 90 MB for every 200 tasks created. Left unchecked, this can bring down the system, pretty fast.
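If you want to confirm that it is the pile-up of pending tasks (and their payloads) that is holding the memory, one quick check is to log the number of live tasks next to the memory reading. This is a hypothetical addition to the batch loop in the script above, not part of the original:

# Inside the batch loop, next to the memory logging:
pending_tasks = sum(1 for t in asyncio.all_tasks() if not t.done())
logger.info(f"Pending tasks: {pending_tasks}, current memory: {memory_monitor.update():.2f} MB")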
So what can we do to avoid this?
There can be multiple solutions to this problem, but the most common one is to put a backpressure mechanism in place, so that the process never holds more pending requests than it can handle. If you are fine with dropping log events, one easy way to do this is an asyncio.Queue with a maxsize of your choice and a limited set of workers (sized to the throughput you need to handle).
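If you do not want a dedicated queue and workers, another lightweight variant of the same idea is to cap the number of pending fire and forget tasks and drop events once the cap is hit. The sketch below is illustrative only; the cap value and helper names are assumptions, not part of the scripts in this post.

import asyncio
import aiohttp

MAX_IN_FLIGHT = 100  # assumed cap; tune it to the throughput you need
_in_flight: set = set()

async def _send(session: aiohttp.ClientSession, url: str, payload: dict) -> None:
    try:
        async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=5)) as response:
            await response.text()
    except Exception:
        pass  # best-effort logging: losing the event is acceptable

def try_send_event(session: aiohttp.ClientSession, url: str, payload: dict) -> bool:
    # Drop the event instead of creating yet another pending task.
    if len(_in_flight) >= MAX_IN_FLIGHT:
        return False
    task = asyncio.create_task(_send(session, url, payload))
    _in_flight.add(task)
    task.add_done_callback(_in_flight.discard)  # also keeps a strong reference until the task finishes
    return True

This keeps at most MAX_IN_FLIGHT payloads in memory at any time, at the cost of dropping events under pressure, which is the same trade-off the bounded queue makes explicit.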
Back to the queue-based approach: the code below shows a simple implementation of a bounded queue that can be used to send requests to an endpoint.
import asyncio
import logging
import time
import json
import os
from typing import Dict, Any
from dataclasses import dataclass, field
from collections import defaultdict

import aiohttp
import psutil

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RequestPayload:
    topic: str
    data: Dict[str, Any]
    # default_factory ensures every instance gets a fresh timestamp and id,
    # instead of values computed once at class-definition time
    timestamp: float = field(default_factory=time.time)
    request_id: str = field(default_factory=lambda: f"req_{int(time.time() * 1000)}_{os.urandom(4).hex()}")


class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(int)
        self.start_time = time.time()
        self.last_metrics_time = time.time()
        self.last_metrics = defaultdict(int)

    def increment(self, metric_name: str, value: int = 1):
        self.metrics[metric_name] += value

    def _calculate_rate(self, current: int, last: int, time_diff: float) -> float:
        return (current - last) / time_diff if time_diff > 0 else 0.0

    def get_metrics(self) -> Dict[str, Any]:
        current_time = time.time()
        elapsed_time = current_time - self.start_time
        time_diff = current_time - self.last_metrics_time

        # Calculate rates
        total_rate = self._calculate_rate(
            self.metrics['total_requests'],
            self.last_metrics['total_requests'],
            time_diff
        )
        drop_rate = self._calculate_rate(
            self.metrics['dropped_requests'],
            self.last_metrics['dropped_requests'],
            time_diff
        )
        success_rate = self._calculate_rate(
            self.metrics['successful_requests'],
            self.last_metrics['successful_requests'],
            time_diff
        )
        failure_rate = self._calculate_rate(
            self.metrics['failed_requests'],
            self.last_metrics['failed_requests'],
            time_diff
        )

        # Update last metrics
        self.last_metrics = self.metrics.copy()
        self.last_metrics_time = current_time

        return {
            # Counter metrics
            'requests_total': self.metrics['total_requests'],
            'requests_successful_total': self.metrics['successful_requests'],
            'requests_failed_total': self.metrics['failed_requests'],
            'requests_dropped_total': self.metrics['dropped_requests'],
            'queue_full_total': self.metrics['queue_full_count'],
            # Rate metrics (per second)
            'requests_rate': total_rate,
            'requests_drop_rate': drop_rate,
            'requests_success_rate': success_rate,
            'requests_failure_rate': failure_rate,
            # Timing metrics
            'elapsed_time_seconds': elapsed_time,
            'uptime_seconds': elapsed_time,
            'memory_usage_mb': psutil.Process().memory_info().rss / 1024 / 1024
        }


class BoundedQueue:
    def __init__(self, maxsize: int, num_workers: int, endpoint: str, metrics_interval: float = 5.0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.workers = []
        self.maxsize = maxsize
        self.num_workers = num_workers
        self.endpoint = endpoint
        self.metrics = MetricsCollector()
        self.running = True
        self.metrics_interval = metrics_interval
        self.metrics_task = None

    async def start(self):
        """Start the worker tasks and metrics reporter."""
        self.workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(self.num_workers)
        ]
        self.metrics_task = asyncio.create_task(self._report_metrics())
        logger.info(f"Started {self.num_workers} workers and metrics reporter")

    async def stop(self):
        """Stop all workers and wait for them to finish."""
        self.running = False
        if self.metrics_task:
            self.metrics_task.cancel()
        for worker in self.workers:
            worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("All workers stopped")

    def put(self, topic: str, data: Dict[str, Any]) -> bool:
        """Try to put a request in the queue. Returns True if successful, False if the queue is full."""
        self.metrics.increment('total_requests')
        try:
            payload = RequestPayload(topic=topic, data=data)
            self.queue.put_nowait(payload)
            return True
        except asyncio.QueueFull:
            self.metrics.increment('queue_full_count')
            self.metrics.increment('dropped_requests')
            return False

    async def _report_metrics(self):
        """Periodically report metrics."""
        while self.running:
            try:
                metrics = self.get_metrics()
                logger.info(f"Periodic metrics: {json.dumps(metrics, indent=2)}")
                await asyncio.sleep(self.metrics_interval)
            except asyncio.CancelledError:
                break
            except Exception:
                await asyncio.sleep(0.2)

    async def _worker(self, worker_name: str):
        """Worker that processes requests from the queue."""
        async with aiohttp.ClientSession() as session:
            while self.running:
                try:
                    try:
                        payload = self.queue.get_nowait()
                    except asyncio.QueueEmpty:
                        await asyncio.sleep(0.1)
                        continue

                    try:
                        request_data = {
                            'topic': payload.topic,
                            'data': payload.data,
                            'timestamp': payload.timestamp,
                            'request_id': payload.request_id
                        }
                        async with session.post(
                            self.endpoint,
                            json=request_data,
                            timeout=aiohttp.ClientTimeout(total=30)
                        ) as response:
                            if response.status == 200:
                                self.metrics.increment('successful_requests')
                            else:
                                self.metrics.increment('failed_requests')
                    except Exception:
                        self.metrics.increment('failed_requests')
                    finally:
                        self.queue.task_done()
                except asyncio.CancelledError:
                    break

    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics, including queue and worker gauges."""
        metrics = self.metrics.get_metrics()
        metrics.update({
            # Gauge metrics
            'queue_size_current': self.queue.qsize(),
            'queue_size_max': self.maxsize,
            'workers_active': len(self.workers),
        })
        return metrics


async def main():
    queue = BoundedQueue(
        maxsize=1000,
        num_workers=5,
        endpoint="http://invalid-url-that-will-never-resolve.example.com/api/events",
        metrics_interval=5.0
    )
    await queue.start()

    try:
        for i in range(10000):
            topic = f"test-topic-{i % 5}"
            # Create a payload of approximately 0.1MB
            large_string = "x" * (100 * 1024)  # 100KB string
            data = {
                "value": i,
                "timestamp": time.time(),
                "metadata": {"source": "test"},
                "payload": large_string  # This makes the payload ~0.1MB
            }
            if not queue.put(topic, data):
                logger.warning(f"Failed to queue request {i}")
            await asyncio.sleep(0.001)
    finally:
        await queue.stop()
        metrics = queue.get_metrics()
        logger.info(f"Final metrics: {json.dumps(metrics, indent=2)}")


if __name__ == "__main__":
    asyncio.run(main())
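One design choice worth calling out: stop() above cancels the workers right away, so anything still sitting in the queue is dropped. If you would rather flush the queue on shutdown, a small variation is to wait (with a bound) for the queue to drain before cancelling. The sketch below is a method you could add to BoundedQueue; drain_timeout is an assumed parameter, not part of the original class.

async def stop_and_drain(self, drain_timeout: float = 10.0) -> None:
    """Give queued items a bounded chance to be processed, then stop the workers."""
    try:
        # queue.join() returns once task_done() has been called for every queued item.
        await asyncio.wait_for(self.queue.join(), timeout=drain_timeout)
    except asyncio.TimeoutError:
        logger.warning("Queue did not drain within %.1fs; dropping remaining items", drain_timeout)
    await self.stop()

With that in place, main() would call queue.stop_and_drain() in its finally block instead of queue.stop().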
Conclusion
In this post, we saw how async fire and forget patterns can quickly become a problem when the end system is down or restarting. We also saw a simple implementation of a bounded queue that can be used to send requests to an endpoint while shedding load once it fills up. Sometimes we can be too quick to fire and forget, and forget to consider the cost of doing so.