Tracer - Performance Monitoring

The Tracer module provides a simple Timer utility for measuring and logging code execution time. It works as both a decorator and a context manager, making it easy to monitor performance in any part of your application.

Overview

The Timer class offers:

  • Dual usage - Works as decorator or context manager
  • Automatic logging - Execution time logged automatically
  • Millisecond reporting - Durations logged in milliseconds with sub-millisecond precision
  • Zero boilerplate - No manual time tracking needed
  • Logger integration - Uses the built-in Logger module

Basic Usage

As a Context Manager

Use with Timer() to measure a block of code:

import time
from tools.tracer import Timer

with Timer("database_query"):
    time.sleep(1)  # Simulate database query

Output:

2038-01-19 03:14:07,000 | DEBUG | database_query:__exit__:50 - executed in 1000.000000 ms

As a Decorator

Use @Timer() to measure function execution:

import time
from tools.tracer import Timer

@Timer("process_data")
def process_data(data):
    time.sleep(1)  # Simulate processing
    return data

result = process_data([1, 2, 3])

Output:

2038-01-19 03:14:07,000 | DEBUG | process_data:__exit__:50 - executed in 1000.000000 ms

Real-World Examples

API Endpoint Monitoring

Monitor API endpoint performance:

from fastapi import FastAPI
from tools.tracer import Timer

app = FastAPI()

@app.get("/users/{user_id}")
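# Timer sits below the route decorator so FastAPI registers the timed function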
@Timer("get_user_endpoint")
async def get_user(user_id: int):
    with Timer("database_lookup"):
        user = await db.get_user(user_id)

    with Timer("user_serialization"):
        return user.dict()

Output:

2038-01-19 03:14:07,000 | DEBUG | database_lookup:__exit__:50 - executed in 45.123000 ms
2038-01-19 03:14:07,100 | DEBUG | user_serialization:__exit__:50 - executed in 2.456000 ms
2038-01-19 03:14:07,150 | DEBUG | get_user_endpoint:__exit__:50 - executed in 50.789000 ms

Data Processing Pipeline

Monitor each stage of a data pipeline:

from tools.tracer import Timer
from tools.logger import Logger

logger = Logger(__name__)

@Timer("full_pipeline")
def process_dataset(data):
    logger.info(f"Processing {len(data)} records")

    with Timer("data_validation"):
        validated = validate_data(data)

    with Timer("data_transformation"):
        transformed = transform_data(validated)

    with Timer("data_enrichment"):
        enriched = enrich_data(transformed)

    with Timer("data_storage"):
        save_data(enriched)

    logger.info("Pipeline complete")
    return enriched

Database Operations

Monitor individual database operations:

from sqlalchemy.orm import Session
from tools.tracer import Timer

class UserRepository:
    def __init__(self, db: Session):
        self.db = db

    @Timer("user_create")
    def create_user(self, user_data: dict):
        user = User(**user_data)
        self.db.add(user)
        self.db.commit()
        return user

    @Timer("user_bulk_import")
    def import_users(self, users_data: list[dict]):
        with Timer("user_validation"):
            validated = [validate(u) for u in users_data]

        with Timer("user_db_insert"):
            self.db.bulk_insert_mappings(User, validated)
            self.db.commit()

File Processing

Track file I/O operations:

import json
from tools.tracer import Timer

@Timer("process_json_file")
def process_json_file(filepath: str):
    with Timer("file_read"):
        with open(filepath, 'r') as f:
            data = json.load(f)

    with Timer("data_processing"):
        processed = transform(data)

    with Timer("file_write"):
        with open(f"{filepath}.processed", 'w') as f:
            json.dump(processed, f)

    return processed

Nested Timers

You can nest timers to measure both overall and component timings:

from tools.tracer import Timer

@Timer("complete_analysis")
def analyze_data(dataset):
    with Timer("load_models"):
        model_a = load_model_a()
        model_b = load_model_b()

    with Timer("run_models"):
        with Timer("model_a_inference"):
            results_a = model_a.predict(dataset)

        with Timer("model_b_inference"):
            results_b = model_b.predict(dataset)

    with Timer("combine_results"):
        final = combine(results_a, results_b)

    return final

Output:

2038-01-19 03:14:07,000 | DEBUG | load_models:__exit__:50 - executed in 500.000000 ms
2038-01-19 03:14:07,550 | DEBUG | model_a_inference:__exit__:50 - executed in 120.000000 ms
2038-01-19 03:14:07,700 | DEBUG | model_b_inference:__exit__:50 - executed in 130.000000 ms
2038-01-19 03:14:07,850 | DEBUG | run_models:__exit__:50 - executed in 250.000000 ms
2038-01-19 03:14:07,900 | DEBUG | combine_results:__exit__:50 - executed in 50.000000 ms
2038-01-19 03:14:07,950 | DEBUG | complete_analysis:__exit__:50 - executed in 800.000000 ms

Integration with Logging

The Timer automatically uses the Logger module. You can combine them for comprehensive monitoring:

from tools.logger import Logger
from tools.tracer import Timer

logger = Logger(__name__)

@Timer("expensive_operation")
def expensive_operation(items: list):
    logger.info(f"Starting operation with {len(items)} items")

    with Timer("preprocessing"):
        preprocessed = preprocess(items)
        logger.debug(f"Preprocessed {len(preprocessed)} items")

    with Timer("main_processing"):
        results = process(preprocessed)
        logger.debug(f"Processed into {len(results)} results")

    logger.info("Operation complete")
    return results

Best Practices

1. Meaningful Timer Names

Use descriptive names that clearly indicate what's being measured:

# Good
with Timer("database_user_query"):
    user = db.query(User).filter_by(id=user_id).first()

# Less useful
with Timer("query"):
    user = db.query(User).filter_by(id=user_id).first()

2. Measure at Appropriate Granularity

Don't time trivial operations - focus on operations that matter:

# Good - measures significant operations
@Timer("process_large_dataset")
def process_dataset(data):
    return [transform(item) for item in data]

# Too granular - overhead not worth it
for item in data:
    with Timer("process_single_item"):  # Too fine-grained
        process(item)

3. Use Consistent Naming

Establish naming conventions for different operation types:

# Database operations
with Timer("db_query_users"):
    users = db.query(User).all()

# API calls
with Timer("api_call_external_service"):
    response = requests.get(url)

# File operations
with Timer("file_read_config"):
    config = load_config()

4. Combine with Monitoring

Use Timer data for performance monitoring and optimization:

import time
from tools.tracer import Timer
from tools.logger import Logger

logger = Logger(__name__)

SLOW_QUERY_THRESHOLD_MS = 1000

@Timer("database_query")
def query_users(filters):
    start = time.time()

    with Timer("db_execution"):
        results = db.query(User).filter_by(**filters).all()

    duration_ms = (time.time() - start) * 1000

    if duration_ms > SLOW_QUERY_THRESHOLD_MS:
        logger.warning(f"Slow query detected: {duration_ms:.2f}ms")

    return results

Implementation Details

The Timer class is built on Python's contextlib.ContextDecorator, which allows a single class to work as both a context manager and a decorator.

How It Works

  1. __enter__: Records start time
  2. Code execution: Your code runs
  3. __exit__: Calculates duration and logs it

The duration is automatically logged using the Logger module at DEBUG level.
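
For reference, here is a minimal sketch of how such a class could look. The attribute names (_start, _duration) and the exact log call are illustrative assumptions, not the actual tools.tracer source:

import time
from contextlib import ContextDecorator
from tools.logger import Logger

class Timer(ContextDecorator):
    """Minimal sketch of a Timer; not the actual implementation."""

    def __init__(self, name: str):
        self.name = name

    def __enter__(self):
        # Record the start time with a monotonic, high-resolution clock
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        # Calculate the duration and log it at DEBUG level
        self._duration = time.perf_counter() - self._start
        Logger(self.name).debug(f"executed in {self._duration * 1000:.6f} ms")
        return False  # never suppress exceptions from the timed block

One caveat worth verifying against the real implementation: a plain ContextDecorator wraps the synchronous call, so if Timer matches this sketch exactly, decorating an async def would time coroutine creation rather than its execution; inside async code the with form is the safer pattern.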

Advanced Usage

Custom Timer Subclass

Extend Timer to add custom behavior:

from tools.tracer import Timer

class MetricsTimer(Timer):
    """Timer that also records timings to a metrics system."""

    def __exit__(self, *exc):
        super().__exit__(*exc)

        # Record to the metrics client; assumes a `metrics` client is in
        # scope and that Timer exposes the label as `self.name` and the
        # elapsed seconds as `self._duration`.
        metrics.record_timing(self.name, self._duration * 1000)

# Usage
with MetricsTimer("api_call"):
    response = call_api()

Conditional Timing

Enable timing based on configuration:

from contextlib import nullcontext
from tools.config import Settings
from tools.tracer import Timer

settings = Settings()

def maybe_timer(name: str):
    """Return Timer if profiling is enabled, else no-op."""
    if settings.ENABLE_PROFILING:
        return Timer(name)
    return nullcontext()

# Usage
with maybe_timer("optional_timing"):
    process_data()
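
Note that nullcontext works only in a with statement; to apply conditional timing to a whole function, a small decorator wrapper is needed. A sketch, reusing the assumed settings object from above:

def maybe_timed(name: str):
    """Apply Timer as a decorator only when profiling is enabled."""
    def decorate(func):
        if settings.ENABLE_PROFILING:
            return Timer(name)(func)  # ContextDecorator instances wrap callables
        return func  # profiling off: return the function unchanged
    return decorate

@maybe_timed("optional_function")
def process_batch(items):
    return [transform(item) for item in items]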

Testing

Test code with timers by checking log output. With pytest's caplog fixture:

import logging
from tools.tracer import Timer

def test_timer_logging(caplog):
    with caplog.at_level(logging.DEBUG):
        with Timer("test_operation"):
            pass  # Instant operation

    assert "test_operation" in caplog.text
    assert "executed in" in caplog.text
    assert "ms" in caplog.text

Or with unittest's assertLogs, which captures the records in cm:

import logging
import unittest
from tools.tracer import Timer

class TestTimerLogging(unittest.TestCase):
    def test_timer_logging(self):
        with self.assertLogs(level=logging.DEBUG) as cm:
            with Timer("test_operation"):
                pass  # Instant operation

        assert "test_operation" in cm.output[0]
        assert "executed in" in cm.output[0]
        assert "ms" in cm.output[0]