Published: Fri 13 June 2025

"Fire but don't forget": The Hidden Costs of Async fire and forget patterns

In modern web applications, we often use async fire-and-forget patterns to improve user experience: for example, when a user clicks an item on a page, or an ML model makes a real-time prediction, we want to log the event. These events are usually not critical and we can afford to lose some of them, so we fire the request asynchronously and move on. Most of the time this works fine, because the end systems backing those events/HTTP requests are designed to be resilient.

But what happens if the end system is down, restarting, or responding slowly?

In this case, something as innocuous as logging a prediction can become a big problem, fast, degrading the user experience.
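
For concreteness, here is a minimal sketch of what the pattern typically looks like in asyncio. The log_event coroutine and the endpoint URL are hypothetical stand-ins for whatever call your application actually makes:

import asyncio
import aiohttp

async def log_event(payload: dict) -> None:
    # Hypothetical logging call: POST the event and ignore the result.
    async with aiohttp.ClientSession() as session:
        async with session.post("http://logging-backend.example.com/events",
                                json=payload) as resp:
            await resp.text()

def fire_and_forget(payload: dict) -> None:
    # Schedule the coroutine and drop the Task reference: nothing awaits it,
    # and nothing bounds how many of these can pile up if the backend is slow.
    asyncio.get_running_loop().create_task(log_event(payload))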

Let's take a look at a script that simulates the system behavior when we use async fire-and-forget patterns, send payloads of roughly 0.1 MB each, and the end system is down or restarting (simulated here with unresolvable URLs). Only the first 5000 requests are sent, created in batches over about 30 seconds, which is enough time to watch memory usage grow.

import asyncio
import logging
import time
import json
import uuid
from typing import List
import psutil
import aiohttp
import pytest

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MemoryMonitor:
    def __init__(self):
        self.process = psutil.Process()
        self.initial_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.peak_memory = self.initial_memory
        self.current_memory = self.initial_memory
        self.memory_samples = []

    def update(self):
        """Update memory statistics and return current memory usage."""
        self.current_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.peak_memory = max(self.peak_memory, self.current_memory)
        self.memory_samples.append(self.current_memory)
        return self.current_memory

    def get_stats(self):
        """Get comprehensive memory statistics."""
        # Ensure we have the latest memory reading
        self.update()

        return {
            'initial_memory_mb': self.initial_memory,
            'current_memory_mb': self.current_memory,
            'peak_memory_mb': self.peak_memory,
            'memory_increase_from_initial_mb': self.current_memory - self.initial_memory,
            'peak_memory_increase_mb': self.peak_memory - self.initial_memory,
            'memory_samples_count': len(self.memory_samples),
            'min_memory_mb': min(self.memory_samples) if self.memory_samples else self.initial_memory,
            'max_memory_mb': max(self.memory_samples) if self.memory_samples else self.initial_memory,
            'avg_memory_mb': sum(self.memory_samples) / len(self.memory_samples) if self.memory_samples else self.initial_memory
        }

def generate_large_payload(size_mb: float = 0.1) -> dict:
    """Generate a large payload of specified size in MB with unique data."""
    # Create a unique string with UUID and timestamp
    unique_id = str(uuid.uuid4())
    timestamp = time.time()

    # Calculate how many UUIDs we need to reach the desired size
    # Each UUID is 36 characters
    bytes_needed = int(size_mb * 1024 * 1024)  # Convert MB to bytes
    num_uuids = bytes_needed // 36  # Integer division to get number of UUIDs

    # Create a list of unique UUIDs
    unique_strings = [str(uuid.uuid4()) for _ in range(num_uuids)]

    return {
        "data": unique_strings,
        "request_id": unique_id,
        "timestamp": timestamp,
        "metadata": {
            "size_mb": size_mb,
            "type": "test_payload",
            "num_uuids": num_uuids,
            "bytes_generated": len(str(unique_strings))  # Actual size of the payload
        }
    }

async def make_invalid_request(session: aiohttp.ClientSession, url: str, payload_size_mb: float = 1.0) -> None:
    try:
        payload = generate_large_payload(payload_size_mb)
        headers = {
            'Content-Type': 'application/json',
            'X-Test-Header': 'large-payload-test'
        }

        async with session.post(
            url,
            json=payload,
            headers=headers,
            timeout=30  # Increased timeout to 30 seconds for each request
        ) as response:
            await response.text()
    except asyncio.CancelledError:
        # Ignore canceled errors
        pass
    except Exception as e:
        logger.debug(f"Request failed: {str(e)}")
        raise

async def create_concurrent_requests(num_requests: int = 5000, payload_size_mb: float = 0.1) -> None:
    invalid_urls = [
        f"http://invalid-domain-{i}.com/api/data" for i in range(num_requests)
    ]

    memory_monitor = MemoryMonitor()
    start_time = time.time()
    tasks: List[asyncio.Task] = []

    # Configure aiohttp session with appropriate timeout
    timeout = aiohttp.ClientTimeout(total=30)  # 30 seconds total timeout

    async with aiohttp.ClientSession(
        timeout=timeout,
        json_serialize=json.dumps
    ) as session:
        # Create tasks gradually over time
        for i in range(0, num_requests, 200):  # Create 200 tasks every 5 seconds
            batch_urls = invalid_urls[i:i+200]
            for url in batch_urls:
                task = asyncio.create_task(
                    make_invalid_request(session, url, payload_size_mb)
                )
                tasks.append(task)

            current_memory = memory_monitor.update()
            logger.info(
                f"Created {len(tasks)} tasks. "
                f"Current memory: {current_memory:.2f} MB, "
                f"Peak memory: {memory_monitor.peak_memory:.2f} MB"
            )

            # Wait 5 seconds before creating next batch
            await asyncio.sleep(5)

        # After all tasks are created, monitor for remaining time
        remaining_time = 30 - (time.time() - start_time)
        if remaining_time > 0:
            logger.info(f"All tasks created. Monitoring for {remaining_time:.1f} more seconds...")
            await asyncio.sleep(remaining_time)

        # Log final stats
        end_time = time.time()
        stats = memory_monitor.get_stats()
        stats['execution_time_seconds'] = end_time - start_time
        stats['total_data_sent_mb'] = num_requests * payload_size_mb
        stats['tasks_created'] = len(tasks)

        logger.info("\nTest completed with detailed memory stats:")
        for key, value in stats.items():
            if isinstance(value, float):
                logger.info(f"{key}: {value:.2f}")
            else:
                logger.info(f"{key}: {value}")

@pytest.mark.asyncio
async def test_async_exhaustion():
    """Test to demonstrate HTTP async task exhaustion with large POST requests to invalid URLs."""
    try:
        await create_concurrent_requests(num_requests=5000, payload_size_mb=0.1)
    except Exception as e:
        logger.error(f"Test failed with error: {str(e)}")
        raise

if __name__ == "__main__":
    asyncio.run(test_async_exhaustion())

Running the script

python simulate_fire_and_forget_behavior.py

Output

INFO:__main__:Created 200 tasks. Current memory: 40.17 MB, Peak memory: 40.17 MB
INFO:__main__:Created 400 tasks. Current memory: 129.16 MB, Peak memory: 129.16 MB
INFO:__main__:Created 600 tasks. Current memory: 215.64 MB, Peak memory: 215.64 MB
INFO:__main__:Created 800 tasks. Current memory: 302.33 MB, Peak memory: 302.33 MB

The above output shows that this is not so innocuous after all: each pending task keeps its payload in memory until the request completes or times out, so memory grows with every batch, and this can bring the system down pretty fast.

So what can we do to avoid this?

There can be multiple solutions to this problem, but the most common one is to put a backpressure mechanism in place, so that the system only holds as much pending work as it can handle. If you are fine with dropping some log events, one easy way to do this is an asyncio.Queue with a maxsize of your choice and a limited set of workers (sized for the throughput you need to handle), as sketched below.
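
Here is a minimal sketch of that core idea, assuming a hypothetical send_event coroutine that performs the actual HTTP call: producers enqueue with put_nowait and drop the event when the queue is full, while a fixed pool of workers drains the queue.

import asyncio

queue: asyncio.Queue = asyncio.Queue(maxsize=1000)

def enqueue_event(event: dict) -> bool:
    # Called from the hot path: never blocks, drops the event when the queue is full.
    try:
        queue.put_nowait(event)
        return True
    except asyncio.QueueFull:
        return False

async def worker(send_event) -> None:
    # send_event is a hypothetical coroutine that POSTs a single event.
    while True:
        event = await queue.get()
        try:
            await send_event(event)
        except Exception:
            pass  # the event is lost, but memory stays bounded
        finally:
            queue.task_done()

def start_workers(send_event, num_workers: int = 5) -> list:
    # Spawn a fixed pool of workers; throughput is capped by num_workers.
    return [asyncio.create_task(worker(send_event)) for _ in range(num_workers)]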

The code below shows a fuller implementation of a bounded queue that can be used to send requests to an endpoint, with workers, drop counting, and periodic metrics.

import asyncio
import logging
import time
import json
import uuid
from typing import List, Dict, Any
from dataclasses import dataclass, field
from collections import defaultdict
import aiohttp
import os
import psutil

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class RequestPayload:
    topic: str
    data: Dict[str, Any]
    # default_factory so every payload gets a fresh timestamp/id; a plain default
    # would be evaluated only once, at class definition time
    timestamp: float = field(default_factory=time.time)
    request_id: str = field(default_factory=lambda: f"req_{int(time.time() * 1000)}_{os.urandom(4).hex()}")

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(int)
        self.start_time = time.time()
        self.last_metrics_time = time.time()
        self.last_metrics = defaultdict(int)

    def increment(self, metric_name: str, value: int = 1):
        self.metrics[metric_name] += value

    def _calculate_rate(self, current: int, last: int, time_diff: float) -> float:
        return (current - last) / time_diff if time_diff > 0 else 0.0

    def get_metrics(self) -> Dict[str, Any]:
        current_time = time.time()
        elapsed_time = current_time - self.start_time
        time_diff = current_time - self.last_metrics_time

        # Calculate rates
        total_rate = self._calculate_rate(
            self.metrics['total_requests'],
            self.last_metrics['total_requests'],
            time_diff
        )
        drop_rate = self._calculate_rate(
            self.metrics['dropped_requests'],
            self.last_metrics['dropped_requests'],
            time_diff
        )
        success_rate = self._calculate_rate(
            self.metrics['successful_requests'],
            self.last_metrics['successful_requests'],
            time_diff
        )
        failure_rate = self._calculate_rate(
            self.metrics['failed_requests'],
            self.last_metrics['failed_requests'],
            time_diff
        )

        # Update last metrics
        self.last_metrics = self.metrics.copy()
        self.last_metrics_time = current_time

        return {
            # Counter metrics
            'requests_total': self.metrics['total_requests'],
            'requests_successful_total': self.metrics['successful_requests'],
            'requests_failed_total': self.metrics['failed_requests'],
            'requests_dropped_total': self.metrics['dropped_requests'],
            'queue_full_total': self.metrics['queue_full_count'],

            # Rate metrics (per second)
            'requests_rate': total_rate,
            'requests_drop_rate': drop_rate,
            'requests_success_rate': success_rate,
            'requests_failure_rate': failure_rate,

            # Timing metrics
            'elapsed_time_seconds': elapsed_time,
            'uptime_seconds': elapsed_time,
            'memory_usage_mb': psutil.Process().memory_info().rss / 1024 / 1024
        }

class BoundedQueue:
    def __init__(self, maxsize: int, num_workers: int, endpoint: str, metrics_interval: float = 5.0):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.workers = []
        self.maxsize = maxsize
        self.num_workers = num_workers
        self.endpoint = endpoint
        self.metrics = MetricsCollector()
        self.running = True
        self.metrics_interval = metrics_interval
        self.metrics_task = None

    async def start(self):
        """Start the worker tasks and metrics reporter."""
        self.workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(self.num_workers)
        ]
        self.metrics_task = asyncio.create_task(self._report_metrics())
        logger.info(f"Started {self.num_workers} workers and metrics reporter")

    async def stop(self):
        """Stop all workers and wait for them to finish."""
        self.running = False
        if self.metrics_task:
            self.metrics_task.cancel()
        for worker in self.workers:
            worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("All workers stopped")

    def put(self, topic: str, data: Dict[str, Any]) -> bool:
        """Try to put a request in the queue. Returns True if successful, False if queue is full."""
        self.metrics.increment('total_requests')
        try:
            payload = RequestPayload(topic=topic, data=data)
            self.queue.put_nowait(payload)
            return True
        except asyncio.QueueFull:
            self.metrics.increment('queue_full_count')
            self.metrics.increment('dropped_requests')
            return False

    async def _report_metrics(self):
        """Periodically report metrics."""
        while self.running:
            try:
                metrics = self.get_metrics()
                logger.info(f"Periodic metrics: {json.dumps(metrics, indent=2)}")
                await asyncio.sleep(self.metrics_interval)
            except asyncio.CancelledError:
                break
            except Exception:
                await asyncio.sleep(0.2)

    async def _worker(self, worker_name: str):
        """Worker that processes requests from the queue."""
        async with aiohttp.ClientSession() as session:
            while self.running:
                try:
                    try:
                        payload = self.queue.get_nowait()
                    except asyncio.QueueEmpty:
                        await asyncio.sleep(0.1)
                        continue

                    try:
                        request_data = {
                            'topic': payload.topic,
                            'data': payload.data,
                            'timestamp': payload.timestamp,
                            'request_id': payload.request_id
                        }

                        async with session.post(
                            self.endpoint,
                            json=request_data,
                            timeout=30
                        ) as response:
                            if response.status == 200:
                                self.metrics.increment('successful_requests')
                            else:
                                self.metrics.increment('failed_requests')
                    except Exception:
                        self.metrics.increment('failed_requests')
                    finally:
                        self.queue.task_done()

                except asyncio.CancelledError:
                    break

    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics, including queue gauges."""
        metrics = self.metrics.get_metrics()
        # Gauge metrics that only the queue itself knows about
        metrics.update({
            'queue_size_current': self.queue.qsize(),
            'queue_size_max': self.maxsize,
            'workers_active': len(self.workers),
        })
        return metrics

async def main():
    queue = BoundedQueue(
        maxsize=1000,
        num_workers=5,
        endpoint="http://invalid-url-that-will-never-resolve.example.com/api/events",
        metrics_interval=5.0
    )

    await queue.start()

    try:
        for i in range(10000):
            topic = f"test-topic-{i % 5}"
            # Create a payload of approximately 0.1MB
            large_string = "x" * (100 * 1024)  # 100KB string
            data = {
                "value": i,
                "timestamp": time.time(),
                "metadata": {"source": "test"},
                "payload": large_string  # This will make the payload ~0.1MB
            }

            if not queue.put(topic, data):
                logger.warning(f"Failed to queue request {i}")

            await asyncio.sleep(0.001)

    finally:
        await queue.stop()
        metrics = queue.get_metrics()
        logger.info(f"Final metrics: {json.dumps(metrics, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())

Conclusion

In this post, we saw how async fire-and-forget patterns can quickly become a problem when the end system is down or restarting. We also saw a simple implementation of a bounded queue that can be used to send requests to an endpoint with backpressure. Sometimes we are too quick to fire and forget, and forget to consider the cost of doing so.
