MongoDB: 유저별 독립 커넥션 전략 완벽 가이드
작성일: 2025-11-04
버전: 1.0
목적: 유저별 별도 데이터베이스 + 독립 커넥션으로 동시성 문제 해결
목차
1. 문제 정의
1.1 시나리오
유저별로 별도의 데이터베이스 사용
- User A → user_a_db
- User B → user_b_db
- User C → user_c_db
...
- User N → user_n_db (N = 수천~수만)
1.2 단일 연결 방식의 문제점
# ❌ 단일 연결 + useDb() 방식
client = MongoClient("mongodb://localhost:27017/")
# 요청 1: User A
db_a = client['user_a_db']
db_a.posts.insert_one({...}) # 연결 점유
# 요청 2: User B (동시에 발생)
db_b = client['user_b_db']
db_b.posts.insert_one({...}) # 같은 연결 사용 → 대기!
# 요청 3, 4, 5... (동시 요청 증가)
# → 연결 풀이 고갈되고 병목 발생
문제점:
- 동시성 제한: 하나의 연결에 여러 DB 요청이 순차 처리
- 연결 풀 경쟁: 모든 유저가 같은 연결 풀 공유
- 성능 저하: 유저 수가 증가하면 대기 시간 증가
- 불공평한 리소스 배분: 특정 유저의 무거운 쿼리가 다른 유저 영향
1.3 해결 방안: 유저별 독립 커넥션
# ✅ 유저별 독립 연결 방식
connections = {
'user_a': MongoClient("mongodb://localhost:27017/", maxPoolSize=10),
'user_b': MongoClient("mongodb://localhost:27017/", maxPoolSize=10),
'user_c': MongoClient("mongodb://localhost:27017/", maxPoolSize=10),
}
# 요청 1: User A
client_a = connections['user_a']
db_a = client_a['user_a_db']
db_a.posts.insert_one({...}) # 독립 연결 풀 사용
# 요청 2: User B (동시 발생)
client_b = connections['user_b']
db_b = client_b['user_b_db']
db_b.posts.insert_one({...}) # 독립 연결 풀 사용 → 대기 없음!
장점:
- ✅ 유저 간 완전한 격리
- ✅ 동시성 극대화
- ✅ 특정 유저의 부하가 다른 유저에게 영향 없음
- ✅ 유저별 연결 풀 크기 조정 가능
2. 연결 방식 비교
2.1 방식 1: 단일 연결 + useDb() ❌
"""
단일 MongoClient로 여러 DB 접근
"""
client = MongoClient("mongodb://localhost:27017/", maxPoolSize=100)
def get_user_db(user_id: str):
return client[f'user_{user_id}_db']
# 모든 유저가 같은 연결 풀(100개) 공유
db_a = get_user_db('user_a')
db_b = get_user_db('user_b')
장점:
- 구현 간단
- 연결 수 적음 (1개 클라이언트)
- 메모리 사용 적음
단점:
- ❌ 동시성 제한 (연결 풀 100개를 모든 유저가 공유)
- ❌ 유저 간 간섭 (한 유저의 슬로우 쿼리가 전체 영향)
- ❌ 확장성 제한
- ❌ 유저별 QoS(Quality of Service) 불가능
적합한 경우:
- 유저 수 < 10
- 동시 접속 < 50
- 트래픽 예측 가능
- 단순한 애플리케이션
2.2 방식 2: 유저별 독립 연결 ✅
"""
유저별 독립 MongoClient 생성
"""
class UserConnectionManager:
def __init__(self):
self.connections = {}
self.lock = threading.RLock()
def get_client(self, user_id: str) -> MongoClient:
"""유저별 독립 클라이언트 반환"""
with self.lock:
if user_id not in self.connections:
self.connections[user_id] = MongoClient(
"mongodb://localhost:27017/",
maxPoolSize=10 # 유저당 10개 연결
)
return self.connections[user_id]
def get_database(self, user_id: str):
"""유저 DB 반환"""
client = self.get_client(user_id)
return client[f'user_{user_id}_db']
# 각 유저가 독립 연결 풀(10개) 사용
manager = UserConnectionManager()
db_a = manager.get_database('user_a') # 독립 연결 풀
db_b = manager.get_database('user_b') # 독립 연결 풀
장점:
- ✅ 유저 간 완전한 격리
- ✅ 동시성 극대화
- ✅ 유저별 QoS 가능 (Premium 유저는 더 큰 풀)
- ✅ 특정 유저 장애 격리
- ✅ 무한 확장 가능
단점:
- ⚠️ 연결 수 증가 (유저당 N개)
- ⚠️ 메모리 사용 증가
- ⚠️ 관리 복잡도 증가
적합한 경우:
- 유저 수 > 10
- 동시 접속 > 50
- 높은 동시성 요구
- 유저별 SLA 보장 필요
2.3 방식 3: 하이브리드 (그룹별 연결) 🔀
"""
유저를 그룹으로 묶어서 그룹별 연결
"""
def get_user_group(user_id: str) -> str:
"""해시로 유저 그룹 결정"""
return f"group_{hash(user_id) % 10}" # 10개 그룹
class GroupConnectionManager:
def __init__(self):
self.connections = {}
# 10개 그룹 = 10개 클라이언트
for i in range(10):
self.connections[f'group_{i}'] = MongoClient(
"mongodb://localhost:27017/",
maxPoolSize=50 # 그룹당 50개
)
def get_database(self, user_id: str):
group = get_user_group(user_id)
client = self.connections[group]
return client[f'user_{user_id}_db']
# 유저를 10개 그룹으로 분산
# 총 연결 수 = 10 * 50 = 500
장점:
- ✅ 적당한 격리 수준
- ✅ 연결 수 제한 (그룹 수로 제한)
- ✅ 관리 가능한 복잡도
단점:
- ⚠️ 같은 그룹 내 유저는 간섭 가능
- ⚠️ 그룹 불균형 시 문제
적합한 경우:
- 중간 규모 (100-1000 유저)
- 연결 수 제한 필요
- 완벽한 격리는 불필요
2.4 성능 비교
시나리오: 1000명 동시 접속, 각자 1초간 쿼리
┌─────────────────────┬──────────────┬──────────────┬──────────────┐
│ 방식 │ 연결 수 │ 처리 시간 │ 격리 수준 │
├─────────────────────┼──────────────┼──────────────┼──────────────┤
│ 단일 연결 │ 100 │ ~10초 │ ⭐ │
│ (maxPoolSize=100) │ │ (순차 처리) │ │
├─────────────────────┼──────────────┼──────────────┼──────────────┤
│ 그룹별 연결 │ 500 │ ~2초 │ ⭐⭐⭐ │
│ (10 그룹 * 50) │ │ │ │
├─────────────────────┼──────────────┼──────────────┼──────────────┤
│ 유저별 연결 │ 10,000 │ ~1초 │ ⭐⭐⭐⭐⭐ │
│ (1000 유저 * 10) │ │ (병렬 처리) │ │
└─────────────────────┴──────────────┴──────────────┴──────────────┘
3. 유저별 독립 커넥션 구현
3.1 기본 구현 (Python)
"""
유저별 독립 연결 관리자 - 프로덕션 수준
Copyright (c) 2024 VCANUS
"""
import threading
import weakref
import time
from typing import Dict, Optional
from pymongo import MongoClient
from pymongo.database import Database
import logging
logger = logging.getLogger(__name__)
class UserConnectionManager:
"""
유저별 독립 MongoDB 연결 관리
특징:
- 유저별 독립 MongoClient 생성
- 연결 풀 자동 관리
- Thread-safe
- 자동 정리 (idle timeout)
- 메모리 효율적 (weak reference 옵션)
"""
def __init__(
self,
connection_string: str,
user_pool_size: int = 10,
idle_timeout: int = 3600, # 1시간
use_weak_ref: bool = False,
**mongo_options
):
"""
Args:
connection_string: MongoDB 연결 문자열
user_pool_size: 유저당 연결 풀 크기
idle_timeout: 유휴 연결 타임아웃 (초)
use_weak_ref: WeakValueDictionary 사용 여부
**mongo_options: MongoClient 추가 옵션
"""
self.connection_string = connection_string
self.user_pool_size = user_pool_size
self.idle_timeout = idle_timeout
# 연결 저장소
if use_weak_ref:
# 약한 참조 - 사용 안할 때 자동 GC
self._clients: Dict[str, MongoClient] = weakref.WeakValueDictionary()
else:
# 강한 참조 - 명시적 제거 전까지 유지
self._clients: Dict[str, MongoClient] = {}
# 마지막 접근 시간 추적
self._last_access: Dict[str, float] = {}
# Thread-safe 락
self._lock = threading.RLock()
# 기본 MongoDB 옵션
self.mongo_options = {
'maxPoolSize': user_pool_size,
'minPoolSize': 1,
'maxIdleTimeMS': idle_timeout * 1000,
'connectTimeoutMS': 5000,
'serverSelectionTimeoutMS': 5000,
**mongo_options
}
# 정리 스레드 시작
self._cleanup_thread = threading.Thread(
target=self._cleanup_idle_connections,
daemon=True
)
self._cleanup_thread.start()
logger.info(
f"UserConnectionManager initialized: "
f"pool_size={user_pool_size}, idle_timeout={idle_timeout}s"
)
def get_client(self, user_id: str) -> MongoClient:
"""
유저의 독립 MongoClient 반환
없으면 새로 생성, 있으면 재사용
Args:
user_id: 유저 ID
Returns:
MongoClient 인스턴스
"""
with self._lock:
# 접근 시간 업데이트
self._last_access[user_id] = time.time()
# 기존 클라이언트 확인
if user_id in self._clients:
client = self._clients[user_id]
# 연결 상태 확인
try:
client.admin.command('ping')
logger.debug(f"Reusing connection for user: {user_id}")
return client
except Exception as e:
logger.warning(f"Dead connection for {user_id}, recreating: {e}")
# 죽은 연결 제거
del self._clients[user_id]
# 새 클라이언트 생성
logger.info(f"Creating new connection for user: {user_id}")
client = MongoClient(
self.connection_string,
**self.mongo_options
)
self._clients[user_id] = client
return client
def get_database(self, user_id: str, db_name: Optional[str] = None) -> Database:
"""
유저의 데이터베이스 반환
Args:
user_id: 유저 ID
db_name: 데이터베이스 이름 (기본값: user_{user_id}_db)
Returns:
Database 인스턴스
"""
client = self.get_client(user_id)
if db_name is None:
db_name = f"user_{user_id}_db"
return client[db_name]
def close_user_connection(self, user_id: str) -> bool:
"""
특정 유저의 연결 종료
Args:
user_id: 유저 ID
Returns:
성공 여부
"""
with self._lock:
if user_id in self._clients:
try:
self._clients[user_id].close()
del self._clients[user_id]
if user_id in self._last_access:
del self._last_access[user_id]
logger.info(f"Closed connection for user: {user_id}")
return True
except Exception as e:
logger.error(f"Failed to close connection for {user_id}: {e}")
return False
return False
def _cleanup_idle_connections(self):
"""
유휴 연결 자동 정리 (백그라운드 스레드)
"""
while True:
time.sleep(60) # 1분마다 체크
try:
with self._lock:
now = time.time()
idle_users = []
for user_id, last_access in self._last_access.items():
if now - last_access > self.idle_timeout:
idle_users.append(user_id)
for user_id in idle_users:
logger.info(f"Closing idle connection for user: {user_id}")
self.close_user_connection(user_id)
except Exception as e:
logger.error(f"Error in cleanup thread: {e}")
def get_stats(self) -> dict:
"""
연결 통계 조회
Returns:
통계 정보
"""
with self._lock:
return {
'total_connections': len(self._clients),
'user_pool_size': self.user_pool_size,
'idle_timeout': self.idle_timeout,
'active_users': list(self._clients.keys()),
'total_potential_connections': len(self._clients) * self.user_pool_size
}
def close_all(self):
"""모든 연결 종료"""
with self._lock:
logger.info(f"Closing all {len(self._clients)} connections...")
for user_id, client in list(self._clients.items()):
try:
client.close()
except Exception as e:
logger.error(f"Error closing connection for {user_id}: {e}")
self._clients.clear()
self._last_access.clear()
logger.info("All connections closed")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close_all()
# ============= 사용 예시 =============
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# 매니저 초기화
manager = UserConnectionManager(
connection_string="mongodb://localhost:27017/",
user_pool_size=10, # 유저당 10개 연결
idle_timeout=3600 # 1시간 유휴 시 자동 종료
)
# User A - 독립 연결
db_a = manager.get_database('user_a')
db_a.posts.insert_one({
'title': 'Hello from User A',
'content': 'First post'
})
# User B - 독립 연결 (User A와 간섭 없음)
db_b = manager.get_database('user_b')
db_b.posts.insert_one({
'title': 'Hello from User B',
'content': 'First post'
})
# User C - 독립 연결
db_c = manager.get_database('user_c')
db_c.posts.insert_one({
'title': 'Hello from User C',
'content': 'First post'
})
# 통계 확인
stats = manager.get_stats()
print(f"\nConnection Stats:")
print(f" Total connections: {stats['total_connections']}")
print(f" Active users: {stats['active_users']}")
print(f" Potential connections: {stats['total_potential_connections']}")
# 특정 유저 연결 종료
manager.close_user_connection('user_a')
# 모든 연결 종료
manager.close_all()
3.2 Flask 통합
"""
Flask 애플리케이션과 통합
"""
from flask import Flask, g, request, jsonify
from functools import wraps
app = Flask(__name__)
# 전역 매니저
connection_manager = UserConnectionManager(
connection_string="mongodb://localhost:27017/",
user_pool_size=10,
idle_timeout=3600
)
def get_user_id_from_request() -> str:
"""
요청에서 유저 ID 추출
우선순위:
1. JWT 토큰
2. X-User-ID 헤더
3. 쿼리 파라미터
"""
# 방법 1: JWT에서 추출
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header[7:]
# payload = decode_jwt(token)
# return payload['user_id']
# 방법 2: 헤더
user_id = request.headers.get('X-User-ID')
if user_id:
return user_id
# 방법 3: 쿼리 파라미터
user_id = request.args.get('user_id')
if user_id:
return user_id
raise ValueError("User ID not found in request")
def require_user_db(f):
"""
유저 DB 주입 데코레이터
각 유저마다 독립 연결 사용
"""
@wraps(f)
def decorated_function(*args, **kwargs):
try:
# 유저 ID 추출
user_id = get_user_id_from_request()
# 독립 연결로 DB 가져오기
g.user_id = user_id
g.db = connection_manager.get_database(user_id)
return f(*args, **kwargs)
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
logger.error(f"Database error: {e}", exc_info=True)
return jsonify({'error': 'Internal server error'}), 500
return decorated_function
# ============= API 엔드포인트 =============
@app.route('/api/posts', methods=['GET'])
@require_user_db
def get_posts():
"""포스트 목록 조회"""
posts = list(g.db.posts.find({}, {'_id': 0}).limit(20))
return jsonify({
'user_id': g.user_id,
'posts': posts
})
@app.route('/api/posts', methods=['POST'])
@require_user_db
def create_post():
"""포스트 생성"""
data = request.get_json()
# 유저의 독립 DB에 저장
result = g.db.posts.insert_one(data)
return jsonify({
'user_id': g.user_id,
'post_id': str(result.inserted_id),
'message': 'Post created successfully'
}), 201
@app.route('/api/posts/<post_id>', methods=['GET'])
@require_user_db
def get_post(post_id):
"""포스트 조회"""
from bson import ObjectId
post = g.db.posts.find_one({'_id': ObjectId(post_id)})
if not post:
return jsonify({'error': 'Post not found'}), 404
post['_id'] = str(post['_id'])
return jsonify({
'user_id': g.user_id,
'post': post
})
# ============= 관리자 엔드포인트 =============
@app.route('/admin/connections', methods=['GET'])
def get_connection_stats():
"""연결 통계"""
stats = connection_manager.get_stats()
return jsonify(stats)
@app.route('/admin/connections/<user_id>', methods=['DELETE'])
def close_user_connection(user_id):
"""특정 유저 연결 종료"""
success = connection_manager.close_user_connection(user_id)
if success:
return jsonify({'message': f'Connection closed for {user_id}'})
else:
return jsonify({'error': f'No connection found for {user_id}'}), 404
if __name__ == '__main__':
app.run(debug=True)
3.3 FastAPI 통합
"""
FastAPI 애플리케이션과 통합
"""
from fastapi import FastAPI, Depends, Header, HTTPException
from typing import Optional
from pydantic import BaseModel
app = FastAPI()
# 전역 매니저
connection_manager = UserConnectionManager(
connection_string="mongodb://localhost:27017/",
user_pool_size=10
)
# ============= Pydantic 모델 =============
class Post(BaseModel):
title: str
content: str
tags: Optional[list] = []
# ============= Dependency =============
async def get_user_database(
x_user_id: str = Header(..., description="User ID")
):
"""
유저 독립 DB 의존성
각 유저마다 독립 연결 사용
"""
if not x_user_id:
raise HTTPException(
status_code=400,
detail="X-User-ID header is required"
)
# 독립 연결로 DB 반환
db = connection_manager.get_database(x_user_id)
return {
'user_id': x_user_id,
'db': db
}
# ============= API 엔드포인트 =============
@app.get("/api/posts")
async def get_posts(
user_data = Depends(get_user_database),
limit: int = 20
):
"""포스트 목록 조회"""
posts = list(
user_data['db'].posts
.find({}, {'_id': 0})
.limit(limit)
)
return {
'user_id': user_data['user_id'],
'posts': posts
}
@app.post("/api/posts", status_code=201)
async def create_post(
post: Post,
user_data = Depends(get_user_database)
):
"""포스트 생성"""
result = user_data['db'].posts.insert_one(post.dict())
return {
'user_id': user_data['user_id'],
'post_id': str(result.inserted_id),
'message': 'Post created successfully'
}
@app.get("/api/posts/{post_id}")
async def get_post(
post_id: str,
user_data = Depends(get_user_database)
):
"""포스트 조회"""
from bson import ObjectId
post = user_data['db'].posts.find_one({'_id': ObjectId(post_id)})
if not post:
raise HTTPException(status_code=404, detail="Post not found")
post['_id'] = str(post['_id'])
return {
'user_id': user_data['user_id'],
'post': post
}
# ============= 관리자 엔드포인트 =============
@app.get("/admin/connections")
async def get_connection_stats():
"""연결 통계"""
return connection_manager.get_stats()
@app.delete("/admin/connections/{user_id}")
async def close_user_connection(user_id: str):
"""특정 유저 연결 종료"""
success = connection_manager.close_user_connection(user_id)
if success:
return {'message': f'Connection closed for {user_id}'}
else:
raise HTTPException(
status_code=404,
detail=f'No connection found for {user_id}'
)
4. 연결 풀 전략
4.1 풀 크기 결정
"""
유저별 연결 풀 크기 계산
"""
def calculate_pool_size(
concurrent_users: int,
requests_per_user: int = 5,
available_connections: int = 10000
) -> tuple:
"""
적절한 풀 크기 계산
Args:
concurrent_users: 동시 접속 유저 수
requests_per_user: 유저당 동시 요청 수
available_connections: 서버에서 허용하는 최대 연결 수
Returns:
(user_pool_size, max_users)
"""
# 1. 이상적인 유저당 풀 크기
ideal_pool_size = requests_per_user
# 2. 서버 제약 고려
max_users_with_ideal = available_connections // ideal_pool_size
if max_users_with_ideal >= concurrent_users:
# 충분한 연결 수
return (ideal_pool_size, concurrent_users)
else:
# 연결 수 부족 - 풀 크기 줄임
adjusted_pool_size = available_connections // concurrent_users
return (adjusted_pool_size, concurrent_users)
# 예시
concurrent_users = 1000
pool_size, max_users = calculate_pool_size(
concurrent_users=concurrent_users,
requests_per_user=5,
available_connections=10000
)
print(f"Recommended pool size: {pool_size}")
print(f"Max users supported: {max_users}")
print(f"Total connections: {pool_size * max_users}")
# 출력:
# Recommended pool size: 5
# Max users supported: 1000
# Total connections: 5000
4.2 동적 풀 크기 조정
"""
유저 등급별 다른 풀 크기
"""
class TieredUserConnectionManager(UserConnectionManager):
"""
등급별 차등 연결 풀
"""
def __init__(self, connection_string: str, **options):
super().__init__(connection_string, **options)
# 등급별 풀 크기
self.tier_pool_sizes = {
'free': 3,
'basic': 5,
'premium': 10,
'enterprise': 20
}
def get_user_tier(self, user_id: str) -> str:
"""
유저 등급 조회 (실제로는 DB에서 가져와야 함)
"""
# TODO: 실제 구현에서는 DB 조회
# 여기서는 예시로 user_id 기반 반환
if user_id.startswith('ent_'):
return 'enterprise'
elif user_id.startswith('pre_'):
return 'premium'
elif user_id.startswith('bas_'):
return 'basic'
else:
return 'free'
def get_client(self, user_id: str) -> MongoClient:
"""등급에 맞는 풀 크기로 클라이언트 생성"""
with self._lock:
if user_id in self._clients:
return self._clients[user_id]
# 유저 등급 확인
tier = self.get_user_tier(user_id)
pool_size = self.tier_pool_sizes.get(tier, 5)
logger.info(
f"Creating connection for user {user_id} "
f"(tier: {tier}, pool_size: {pool_size})"
)
# 등급별 풀 크기로 생성
client = MongoClient(
self.connection_string,
maxPoolSize=pool_size,
minPoolSize=1
)
self._clients[user_id] = client
self._last_access[user_id] = time.time()
return client
# 사용
manager = TieredUserConnectionManager(
connection_string="mongodb://localhost:27017/"
)
# Free tier: 3개 연결
db_free = manager.get_database('user_123')
# Premium tier: 10개 연결
db_premium = manager.get_database('pre_user_456')
# Enterprise tier: 20개 연결
db_enterprise = manager.get_database('ent_user_789')
4.3 연결 풀 모니터링
"""
연결 풀 상태 모니터링
"""
class MonitoredUserConnectionManager(UserConnectionManager):
"""
모니터링 기능 추가
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 메트릭 저장
self.metrics = {
'total_connections_created': 0,
'total_connections_closed': 0,
'connection_errors': 0,
'slow_queries': 0
}
def get_client(self, user_id: str) -> MongoClient:
"""연결 생성 추적"""
with self._lock:
if user_id not in self._clients:
self.metrics['total_connections_created'] += 1
return super().get_client(user_id)
def close_user_connection(self, user_id: str) -> bool:
"""연결 종료 추적"""
result = super().close_user_connection(user_id)
if result:
self.metrics['total_connections_closed'] += 1
return result
def get_pool_stats(self, user_id: str) -> dict:
"""
특정 유저의 연결 풀 통계
"""
try:
client = self._clients.get(user_id)
if not client:
return {'error': 'No connection found'}
# MongoDB 서버 상태 조회
server_status = client.admin.command('serverStatus')
return {
'user_id': user_id,
'connections': server_status.get('connections', {}),
'pool_size': self.user_pool_size,
'last_access': self._last_access.get(user_id, 0)
}
except Exception as e:
logger.error(f"Failed to get pool stats: {e}")
return {'error': str(e)}
def get_all_metrics(self) -> dict:
"""전체 메트릭 조회"""
stats = self.get_stats()
return {
**stats,
**self.metrics,
'avg_connections_per_user': (
stats['total_potential_connections'] / stats['total_connections']
if stats['total_connections'] > 0
else 0
)
}
5. 동시성 처리
5.1 동시 요청 처리 테스트
"""
동시성 테스트: 단일 연결 vs 독립 연결
"""
import concurrent.futures
import time
from pymongo import MongoClient
# ============= 테스트 1: 단일 연결 =============
def test_single_connection():
"""단일 MongoClient로 여러 유저 처리"""
client = MongoClient("mongodb://localhost:27017/", maxPoolSize=100)
def process_user(user_id: int):
db = client[f'user_{user_id}_db']
start = time.time()
# 시뮬레이션: 1초 걸리는 쿼리
db.posts.insert_one({'user_id': user_id, 'timestamp': start})
time.sleep(0.1) # 쿼리 시뮬레이션
return time.time() - start
# 100명 동시 처리
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(process_user, i) for i in range(100)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
total_time = time.time() - start_time
print(f"\n=== Single Connection ===")
print(f"Total time: {total_time:.2f}s")
print(f"Avg per user: {sum(results)/len(results):.2f}s")
print(f"Max wait: {max(results):.2f}s")
client.close()
# ============= 테스트 2: 독립 연결 =============
def test_independent_connections():
"""유저별 독립 MongoClient"""
manager = UserConnectionManager(
connection_string="mongodb://localhost:27017/",
user_pool_size=10
)
def process_user(user_id: int):
db = manager.get_database(f'user_{user_id}')
start = time.time()
# 시뮬레이션: 1초 걸리는 쿼리
db.posts.insert_one({'user_id': user_id, 'timestamp': start})
time.sleep(0.1)
return time.time() - start
# 100명 동시 처리
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(process_user, i) for i in range(100)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
total_time = time.time() - start_time
print(f"\n=== Independent Connections ===")
print(f"Total time: {total_time:.2f}s")
print(f"Avg per user: {sum(results)/len(results):.2f}s")
print(f"Max wait: {max(results):.2f}s")
manager.close_all()
# 실행
if __name__ == "__main__":
print("Running concurrency tests...")
test_single_connection()
test_independent_connections()
# 예상 결과:
# Single Connection - Total time: ~2-3s (병목 발생)
# Independent Connections - Total time: ~1s (병렬 처리)
5.2 부하 테스트 시나리오
"""
실전 부하 테스트
"""
import asyncio
import aiohttp
import time
async def load_test(
base_url: str,
num_users: int,
requests_per_user: int
):
"""
부하 테스트
Args:
base_url: API 서버 URL
num_users: 동시 유저 수
requests_per_user: 유저당 요청 수
"""
async def user_requests(session, user_id):
"""단일 유저의 요청들"""
headers = {'X-User-ID': f'user_{user_id}'}
timings = []
for _ in range(requests_per_user):
start = time.time()
async with session.post(
f'{base_url}/api/posts',
json={'title': f'Post from user {user_id}'},
headers=headers
) as response:
await response.json()
timings.append(time.time() - start)
return timings
# 모든 유저 동시 실행
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [
user_requests(session, user_id)
for user_id in range(num_users)
]
all_timings = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# 통계
flat_timings = [t for timings in all_timings for t in timings]
print(f"\n=== Load Test Results ===")
print(f"Total users: {num_users}")
print(f"Requests per user: {requests_per_user}")
print(f"Total requests: {len(flat_timings)}")
print(f"Total time: {total_time:.2f}s")
print(f"Requests/sec: {len(flat_timings)/total_time:.2f}")
print(f"Avg response time: {sum(flat_timings)/len(flat_timings):.3f}s")
print(f"Min response time: {min(flat_timings):.3f}s")
print(f"Max response time: {max(flat_timings):.3f}s")
# 백분위수
sorted_timings = sorted(flat_timings)
p50 = sorted_timings[len(sorted_timings) // 2]
p95 = sorted_timings[int(len(sorted_timings) * 0.95)]
p99 = sorted_timings[int(len(sorted_timings) * 0.99)]
print(f"P50: {p50:.3f}s")
print(f"P95: {p95:.3f}s")
print(f"P99: {p99:.3f}s")
# 실행
if __name__ == "__main__":
asyncio.run(load_test(
base_url="http://localhost:5000",
num_users=100,
requests_per_user=10
))
6. 성능 최적화
6.1 연결 재사용 최대화
"""
연결 재사용 전략
"""
class OptimizedConnectionManager(UserConnectionManager):
"""
최적화된 연결 관리
"""
def __init__(self, *args, **kwargs):
# 재사용 카운터
self.reuse_count = {}
super().__init__(*args, **kwargs)
def get_client(self, user_id: str) -> MongoClient:
"""재사용 추적"""
client = super().get_client(user_id)
# 재사용 카운트
with self._lock:
if user_id not in self.reuse_count:
self.reuse_count[user_id] = 0
else:
self.reuse_count[user_id] += 1
return client
def get_reuse_stats(self) -> dict:
"""재사용 통계"""
with self._lock:
total_reuses = sum(self.reuse_count.values())
total_connections = len(self.reuse_count)
return {
'total_connections': total_connections,
'total_reuses': total_reuses,
'avg_reuses_per_connection': (
total_reuses / total_connections
if total_connections > 0
else 0
),
'reuse_rate': (
total_reuses / (total_reuses + total_connections)
if (total_reuses + total_connections) > 0
else 0
)
}
6.2 메모리 최적화
"""
메모리 사용 최적화
"""
import sys
import gc
class MemoryEfficientConnectionManager(UserConnectionManager):
"""
메모리 효율적인 연결 관리
"""
def __init__(
self,
connection_string: str,
max_connections: int = 100, # 최대 연결 수 제한
**kwargs
):
self.max_connections = max_connections
super().__init__(connection_string, **kwargs)
def get_client(self, user_id: str) -> MongoClient:
"""최대 연결 수 제한"""
with self._lock:
# 연결 수 제한 체크
if (user_id not in self._clients and
len(self._clients) >= self.max_connections):
# LRU: 가장 오래 사용 안한 연결 제거
oldest_user = min(
self._last_access.items(),
key=lambda x: x[1]
)[0]
logger.info(
f"Max connections reached. "
f"Closing LRU connection: {oldest_user}"
)
self.close_user_connection(oldest_user)
# 강제 GC
gc.collect()
return super().get_client(user_id)
def get_memory_usage(self) -> dict:
"""메모리 사용량 조회"""
import psutil
import os
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
return {
'rss_mb': mem_info.rss / 1024 / 1024,
'vms_mb': mem_info.vms / 1024 / 1024,
'connections': len(self._clients),
'avg_memory_per_connection': (
mem_info.rss / len(self._clients) / 1024 / 1024
if len(self._clients) > 0
else 0
)
}
6.3 캐싱 전략
"""
쿼리 결과 캐싱
"""
from functools import lru_cache
from datetime import datetime, timedelta
class CachedDatabaseWrapper:
"""
DB 래퍼 with 캐싱
"""
def __init__(self, db, cache_ttl: int = 300):
self.db = db
self.cache_ttl = cache_ttl
self._cache = {}
self._cache_times = {}
def find_one_cached(self, collection: str, query: dict):
"""캐시된 find_one"""
cache_key = f"{collection}:{str(query)}"
# 캐시 확인
if cache_key in self._cache:
cache_time = self._cache_times[cache_key]
if datetime.now() - cache_time < timedelta(seconds=self.cache_ttl):
return self._cache[cache_key]
# 캐시 미스 - DB 조회
result = self.db[collection].find_one(query)
# 캐싱
self._cache[cache_key] = result
self._cache_times[cache_key] = datetime.now()
return result
def invalidate_cache(self, collection: str = None):
"""캐시 무효화"""
if collection:
# 특정 컬렉션만
keys_to_remove = [
k for k in self._cache.keys()
if k.startswith(f"{collection}:")
]
for key in keys_to_remove:
del self._cache[key]
del self._cache_times[key]
else:
# 전체 캐시
self._cache.clear()
self._cache_times.clear()
7. 실전 예제
7.1 블로그 플랫폼
"""
유저별 독립 블로그 DB
"""
class BlogPlatform:
"""
멀티 유저 블로그 플랫폼
각 유저마다 독립 DB 사용
"""
def __init__(self):
self.connection_manager = UserConnectionManager(
connection_string="mongodb://localhost:27017/",
user_pool_size=10,
idle_timeout=3600
)
def create_post(self, user_id: str, title: str, content: str):
"""포스트 생성"""
db = self.connection_manager.get_database(user_id)
post = {
'title': title,
'content': content,
'created_at': datetime.now(),
'updated_at': datetime.now(),
'views': 0,
'likes': 0
}
result = db.posts.insert_one(post)
return str(result.inserted_id)
def get_user_posts(self, user_id: str, page: int = 1, page_size: int = 10):
"""유저의 포스트 목록"""
db = self.connection_manager.get_database(user_id)
skip = (page - 1) * page_size
posts = list(
db.posts
.find({}, {'_id': 0})
.sort('created_at', -1)
.skip(skip)
.limit(page_size)
)
return posts
def increment_views(self, user_id: str, post_id: str):
"""조회수 증가"""
from bson import ObjectId
db = self.connection_manager.get_database(user_id)
db.posts.update_one(
{'_id': ObjectId(post_id)},
{'$inc': {'views': 1}}
)
def get_stats(self, user_id: str):
"""유저 통계"""
db = self.connection_manager.get_database(user_id)
return {
'total_posts': db.posts.count_documents({}),
'total_views': list(db.posts.aggregate([
{'$group': {'_id': None, 'total': {'$sum': '$views'}}}
]))[0]['total'] if db.posts.count_documents({}) > 0 else 0,
'total_likes': list(db.posts.aggregate([
{'$group': {'_id': None, 'total': {'$sum': '$likes'}}}
]))[0]['total'] if db.posts.count_documents({}) > 0 else 0
}
# 사용
blog = BlogPlatform()
# User A
post_id_a = blog.create_post(
'user_a',
'First Post',
'Hello World!'
)
# User B (독립 DB, User A와 간섭 없음)
post_id_b = blog.create_post(
'user_b',
'My First Post',
'Welcome to my blog!'
)
# 통계
stats_a = blog.get_stats('user_a')
print(f"User A stats: {stats_a}")
7.2 SaaS 애플리케이션
"""
SaaS: 회사별 독립 DB
"""
class SaaSPlatform:
"""
멀티 테넌트 SaaS 플랫폼
각 회사마다 독립 DB
"""
def __init__(self):
self.connection_manager = TieredUserConnectionManager(
connection_string="mongodb://localhost:27017/"
)
def onboard_company(self, company_id: str, plan: str = 'basic'):
"""회사 온보딩"""
db = self.connection_manager.get_database(company_id)
# 초기 컬렉션 생성
db.create_collection('users')
db.create_collection('projects')
db.create_collection('tasks')
# 인덱스 생성
db.users.create_index([('email', 1)], unique=True)
db.projects.create_index([('name', 1)])
db.tasks.create_index([('project_id', 1), ('status', 1)])
# 메타데이터
db._metadata.insert_one({
'company_id': company_id,
'plan': plan,
'onboarded_at': datetime.now(),
'status': 'active'
})
logger.info(f"Company {company_id} onboarded with {plan} plan")
def add_user(self, company_id: str, email: str, name: str, role: str):
"""유저 추가"""
db = self.connection_manager.get_database(company_id)
user = {
'email': email,
'name': name,
'role': role,
'created_at': datetime.now()
}
result = db.users.insert_one(user)
return str(result.inserted_id)
def create_project(self, company_id: str, name: str, description: str):
"""프로젝트 생성"""
db = self.connection_manager.get_database(company_id)
project = {
'name': name,
'description': description,
'created_at': datetime.now(),
'status': 'active'
}
result = db.projects.insert_one(project)
return str(result.inserted_id)
def get_company_analytics(self, company_id: str):
"""회사 분석 데이터"""
db = self.connection_manager.get_database(company_id)
return {
'company_id': company_id,
'total_users': db.users.count_documents({}),
'total_projects': db.projects.count_documents({}),
'total_tasks': db.tasks.count_documents({}),
'active_projects': db.projects.count_documents({'status': 'active'}),
'completed_tasks': db.tasks.count_documents({'status': 'completed'})
}
# 사용
saas = SaaSPlatform()
# Company A 온보딩
saas.onboard_company('company_a', plan='premium')
saas.add_user('company_a', 'alice@company-a.com', 'Alice', 'admin')
saas.create_project('company_a', 'Project Alpha', 'First project')
# Company B 온보딩 (독립 DB)
saas.onboard_company('company_b', plan='enterprise')
saas.add_user('company_b', 'bob@company-b.com', 'Bob', 'admin')
# 분석
analytics_a = saas.get_company_analytics('company_a')
print(f"Company A analytics: {analytics_a}")
8. 요약
8.1 핵심 포인트
| 항목 | 단일 연결 | 유저별 연결 |
|---|---|---|
| 구현 복잡도 | ⭐ 낮음 | ⭐⭐⭐ 높음 |
| 동시성 | ⭐⭐ 제한적 | ⭐⭐⭐⭐⭐ 최대 |
| 격리 수준 | ⭐⭐ 낮음 | ⭐⭐⭐⭐⭐ 완벽 |
| 연결 수 | 적음 (100개) | 많음 (유저*10) |
| 메모리 사용 | 적음 | 많음 |
| 확장성 | 제한적 | 무한 |
| QoS | 불가능 | 가능 |
8.2 의사결정 트리
유저별 독립 DB 필요?
└─ YES
│
├─ 동시 접속 < 50 → 단일 연결 + useDb()
│ └─ 구현 간단, 비용 적음
│
├─ 동시 접속 50-500 → 그룹별 연결
│ └─ 균형잡힌 접근
│
└─ 동시 접속 > 500 → 유저별 독립 연결 ⭐
└─ 최대 동시성, 완벽한 격리
8.3 최종 권장사항
소규모 (< 50 동시 접속)
# 단일 연결로 충분
client = MongoClient(uri, maxPoolSize=100)
db = client[f'user_{user_id}_db']
중규모 (50-500 동시 접속)
# 그룹별 연결
manager = GroupConnectionManager(groups=10)
db = manager.get_database(user_id)
대규모 (> 500 동시 접속)
# 유저별 독립 연결 ⭐
manager = UserConnectionManager(
connection_string=uri,
user_pool_size=10,
idle_timeout=3600
)
db = manager.get_database(user_id)
9. 체크리스트
구현 전
- 동시 접속 유저 수 예측
- 유저당 평균 요청 수 분석
- 서버 연결 수 제한 확인
- 메모리 용량 확인
- 연결 방식 선택 (단일/그룹/유저별)
구현 중
- UserConnectionManager 구현
- 연결 풀 크기 설정
- 유휴 연결 자동 정리 구현
- 모니터링 시스템 구축
- 에러 처리 구현
- 로깅 시스템 구축
운영
- 연결 수 모니터링
- 메모리 사용량 모니터링
- 응답 시간 모니터링
- 동시성 테스트
- 부하 테스트
- 장애 대응 계획
이 가이드가 도움이 되셨나요? 실전 경험과 피드백을 환영합니다!
Copyright (c) 2024 VCANUS. All rights reserved.