TungDaDev's Blog

CQRS Pattern

Cqrs.png
Published on
/7 mins read/

Bài viết này sẽ phân tích cách áp dụng kiến trúc CQRS (Command Query Responsibility Segregation), Saga Pattern và các Design Patterns để giải quyết những bài toán cốt lõi trong một hệ thống đặt vé xem phim (Movie Ticket Booking) có lượng truy cập đồng thời cao.

# tính khả thi của cqrs

Một hệ thống đặt vé xem phim luôn đối mặt với sự chênh lệch lớn giữa lưu lượng đọc (Read) và ghi (Write). Hàng ngàn người có thể cùng lúc tra cứu lịch chiếu và sơ đồ ghế, nhưng chỉ một phần trong số đó thực sự tiến hành thanh toán và giữ chỗ.

Phân rã Service và Mức độ phù hợp với CQRS:

ServiceMức độ phù hợpPhân tích bài toán
Ticketing (Đặt vé)Rất caoPhân tách rõ ràng giữa Write (giữ chỗ, xuất vé, xử lý concurrency) bằng PostgreSQLRead (xem sơ đồ ghế trống, lịch sử vé) bằng MongoDB.
Payment (Thanh toán)CaoCommand (khởi tạo thanh toán, callback) độc lập. State machine phức tạp cần lưu vết chính xác qua Event Sourcing.
Movie Catalog (Danh mục)Trung bìnhChủ yếu là Read-heavy (tìm kiếm phim, xem trailer). Write rất ít (Admin thêm phim mới). Chỉ cần Read Replicas hoặc Cache là đủ.
Notification (Thông báo)ThấpChủ yếu consume event từ Kafka/RabbitMQ để gửi Email/SMS. Không có logic Domain phức tạp để áp dụng CQRS.

Vấn đề của kiến trúc Monolithic/CRUD truyền thống

Nếu chỉ dùng một TicketService ôm đồm mọi thứ (God Service), chúng ta sẽ đối mặt với:

  • Nút thắt cổ chai Database: Cùng một Transaction vừa phải lock row để giữ ghế (Pessimistic Locking), vừa phải join nhiều bảng để trả về dữ liệu lịch chiếu.
  • Domain Intent bị che khuất: Các method như updateTicketStatus(String id, int status) không thể hiện rõ nghiệp vụ `(status = 2 là đã thanh toán hay đã hủy?).
  • Khó mở rộng (Scale): Không thể scale độc lập database cho việc đọc (hiển thị sơ đồ ghế) và việc ghi (thanh toán).

# cqrs thuần (lightweight)

Trước khi đưa một framework đồ sộ vào hệ thống, việc áp dụng 1CQRS pattern1 thuần với Spring BootJava 21 là bước đệm hoàn hảo để làm sạch Clean Architecture.

# thiết lập command/query bus

Mô hình này cô lập logic thay đổi trạng thái (Command) và logic truy vấn dữ liệu (Query).

API Gateway / Controller
   │
   ├── CommandBus ──► CommandHandler ──► Domain Model ──► PostgreSQL (Write-side)
   │                                                    └► Event Publisher (Kafka)
   │
   └── QueryBus  ──► QueryHandler  ──► MongoDB / Redis (Read-side)

# triển khai thực tế cho ticketing service

command model (thể hiện rõ business intent): Sử dụng record của Java 21 để định nghĩa các Command bất biến.

// Command: Rõ ràng, mang đậm ngôn ngữ nghiệp vụ (Ubiquitous Language)
public record ReserveSeatsCommand(
    UUID showtimeId,
    UUID userId,
    List<String> seatCodes
) {}
 
// Command Handler: Chỉ chứa logic xử lý thay đổi trạng thái
@Component
@RequiredArgsConstructor
public class ReserveSeatsCommandHandler {
    private final ShowtimeRepository writeRepository;
    private final ApplicationEventPublisher eventPublisher;
 
    @Transactional
    public ReservationResult handle(ReserveSeatsCommand cmd) {
        Showtime showtime = writeRepository.findByIdWithLock(cmd.showtimeId())
            .orElseThrow(() -> new ShowtimeNotFoundException(cmd.showtimeId()));
 
        // Domain logic: Giữ ghế và ném lỗi nếu ghế đã bị người khác đặt
        Reservation reservation = showtime.reserveSeats(cmd.userId(), cmd.seatCodes());
        writeRepository.save(showtime);
 
        // Publish Event để Read-side cập nhật lại sơ đồ ghế
        eventPublisher.publishEvent(new SeatsReservedEvent(
            cmd.showtimeId(), cmd.seatCodes()
        ));
 
        return new ReservationResult(reservation.getId(), reservation.getExpiresAt());
    }
}

query model (tối ưu hóa cho việc đọc):

public record GetAvailableSeatsQuery(UUID showtimeId) {}
 
@Component
@RequiredArgsConstructor
public class GetAvailableSeatsQueryHandler {
    private final SeatMapProjectionRepository readRepository;
 
    @Cacheable(value = "seat-maps", key = "#query.showtimeId()")
    public SeatMapDTO handle(GetAvailableSeatsQuery query) {
        // Truy vấn thẳng vào MongoDB - Dữ liệu đã được denormalize, không cần JOIN
        return readRepository.findByShowtimeId(query.showtimeId())
            .orElseThrow(() -> new SeatMapNotFoundException(query.showtimeId()));
    }
}

# kiến trúc axon framework & saga pattern

Khi quy trình nghiệp vụ trải dài qua nhiều microservices, quản lý Transaction phân tán (Distributed Transaction) trở thành bài toán hóc búa. Lúc này, việc đưa Axon Framework vào để triển khai Event SourcingSaga Pattern là một quyết định chiến lược.

# quản lý lifecycle đặt vé với saga

Quy trình đặt vé: Giữ chỗ (Ticketing) -> Thanh toán (Payment) -> Xuất vé (Ticketing) -> Gửi SMS (Notification).

Thay vì gọi API chéo nhau (Synchronous HTTP) dễ dẫn đến fail rớt dây chuyền, ta dùng Orchestration-based Saga.

@Saga
@Slf4j
public class TicketBookingSaga {
 
    @Autowired
    private transient CommandGateway commandGateway;
 
    // Bắt đầu quy trình khi ghế được giữ thành công
    @StartSaga
    @SagaEventHandler(associationProperty = "reservationId")
    public void on(SeatsReservedEvent event) {
        log.info("Ghế đã giữ, yêu cầu thanh toán cho Reservation: {}", event.reservationId());
        commandGateway.send(new CreatePaymentCommand(
            event.reservationId(), event.totalAmount()
        ));
    }
 
    // Xử lý khi thanh toán thành công
    @SagaEventHandler(associationProperty = "reservationId")
    public void on(PaymentCompletedEvent event) {
        log.info("Thanh toán thành công, tiến hành xuất vé");
        commandGateway.send(new IssueTicketCommand(event.reservationId()));
    }
 
    // Xử lý thất bại (Compensating Transaction)
    @SagaEventHandler(associationProperty = "reservationId")
    public void on(PaymentFailedEvent event) {
        log.warn("Thanh toán thất bại, nhả ghế đang giữ");
        commandGateway.send(new ReleaseSeatsCommand(event.reservationId()));
    }
 
    // Kết thúc Saga khi vé đã xuất
    @EndSaga
    @SagaEventHandler(associationProperty = "reservationId")
    public void on(TicketIssuedEvent event) {
        log.info("Quy trình đặt vé hoàn tất: {}", event.ticketId());
    }
}

# các design patterns cốt lõi

# strategy pattern: tính toán giá vé linh hoạt

Giá vé xem phim thay đổi liên tục: vé cuối tuần, vé VIP, vé sinh viên, vé ngày Lễ. Đừng dùng chuỗi if/else dài dằng dặc.

public interface PricingStrategy {
    boolean isApplicable(BookingContext context);
    BigDecimal calculate(BigDecimal basePrice, BookingContext context);
}
 
@Component
public class WeekendPricingStrategy implements PricingStrategy { /*...+20%...*/ }
 
@Component
public class StudentPricingStrategy implements PricingStrategy { /*...-15%...*/ }
 
// Trong BookingService
private final List<PricingStrategy> pricingStrategies; // Spring tự inject tất cả các bean implement interface này
 
public BigDecimal calculateFinalPrice(BigDecimal basePrice, BookingContext context) {
    return pricingStrategies.stream()
        .filter(strategy -> strategy.isApplicable(context))
        .findFirst()
        .map(strategy -> strategy.calculate(basePrice, context))
        .orElse(basePrice);
}

# transactional outbox pattern: đảm bảo tính nhất quán của event

Khi ReserveSeatsCommand lưu database thành công nhưng gửi message qua Kafka bị lỗi (Network timeout), dữ liệu sẽ bị bất đồng bộ.

Giải pháp: Lưu event vào chung database rDBMS (PostgreSQL) cùng chung 1 Transaction. Một worker (Poller) sẽ đọc bảng outbox này và đẩy lên Kafka.

@Transactional
public void reserveAndPublish(ReserveSeatsCommand cmd) {
    // 1. Cập nhật state vào nghiệp vụ
    showtimeRepository.save(showtime);
 
    // 2. Lưu event vào bảng Outbox cùng Transaction
    OutboxEvent event = OutboxEvent.builder()
        .aggregateId(showtime.getId())
        .eventType("SEAT_RESERVED")
        .payload(objectMapper.writeValueAsString(cmd))
        .build();
    outboxRepository.save(event);
}
// Poller background sẽ đọc outboxRepository và push vào Kafka cluster, đảm bảo At-Least-Once Delivery.

# read model projection (materialized view)

Sơ đồ ghế trống là dữ liệu được truy vấn nhiều nhất hệ thống. Việc join bảng Room, Seat và Reservation liên tục trên bảng write sẽ làm sập database.

Giải pháp: Bắt lắng nghe các Domain Event (SeatsReservedEvent, SeatsReleasedEvent) để cập nhật trực tiếp vào một Document trên MongoDB.

@Document(collection = "showtime_seat_maps")
public class ShowtimeSeatMapProjection {
    @Id
    private UUID showtimeId;
    private String movieName;
    private List<SeatInfo> availableSeats;
    private List<SeatInfo> reservedSeats;
    // Dữ liệu đã được "nấu" sẵn, query API chỉ cần bốc ra trả về JSON dưới 10ms.
}

Bài viết mang tính chất "ghi chú - chia sẻ và phi lợi nhuận". Nếu thấy hữu ích, hãy chia sẻ nó tới bạn bè và đồng nghiệp của bạn nhé!

Happy coding 😎 👍🏻 🚀 🔥.

← Previous postgRPC - toàn tập
Next post →list in java