1
|
import os
|
2
|
|
3
|
from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, CollectorRegistry, generate_latest
|
4
|
from prometheus_client.multiprocess import MultiProcessCollector
|
5
|
from starlette.requests import Request
|
6
|
from starlette.responses import Response
|
7
|
|
8
|
|
9
|
def metrics(request: Request) -> Response:
|
10
|
if "prometheus_multiproc_dir" in os.environ:
|
11
|
registry = CollectorRegistry()
|
12
|
MultiProcessCollector(registry)
|
13
|
else:
|
14
|
registry = REGISTRY
|
15
|
|
16
|
return Response(generate_latest(registry), media_type=CONTENT_TYPE_LATEST)
|