feat: dashboard aggregates, notifications, websocket route tracking

This commit is contained in:
2026-04-17 17:02:48 +03:00
parent 0d8df9ed7d
commit 36dac1840b
11 changed files with 504 additions and 3 deletions

68
internal/handler/hub.go Normal file
View File

@@ -0,0 +1,68 @@
package handler
import (
"context"
"sync"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
type Hub struct {
mu sync.RWMutex
connections map[uuid.UUID][]*websocket.Conn
trackers map[uuid.UUID]context.CancelFunc
}
func NewHub() *Hub {
return &Hub{
connections: make(map[uuid.UUID][]*websocket.Conn),
trackers: make(map[uuid.UUID]context.CancelFunc),
}
}
func (h *Hub) Register(orderID uuid.UUID, conn *websocket.Conn) {
h.mu.Lock()
defer h.mu.Unlock()
h.connections[orderID] = append(h.connections[orderID], conn)
}
func (h *Hub) Unregister(orderID uuid.UUID, conn *websocket.Conn) {
h.mu.Lock()
defer h.mu.Unlock()
conns := h.connections[orderID]
for i, c := range conns {
if c == conn {
h.connections[orderID] = append(conns[:i], conns[i+1:]...)
break
}
}
}
func (h *Hub) Broadcast(orderID uuid.UUID, msg any) {
h.mu.RLock()
defer h.mu.RUnlock()
for _, conn := range h.connections[orderID] {
conn.WriteJSON(msg)
}
}
func (h *Hub) StartTracker(orderID uuid.UUID, fn func(ctx context.Context)) {
h.mu.Lock()
defer h.mu.Unlock()
if _, exists := h.trackers[orderID]; exists {
return
}
ctx, cancel := context.WithCancel(context.Background())
h.trackers[orderID] = cancel
go fn(ctx)
}
func (h *Hub) StopTracker(orderID uuid.UUID) {
h.mu.Lock()
defer h.mu.Unlock()
if cancel, exists := h.trackers[orderID]; exists {
cancel()
delete(h.trackers, orderID)
}
}

View File

@@ -1,6 +1,9 @@
package handler
import (
"bufio"
"fmt"
"net"
"net/http"
"strconv"
"time"
@@ -45,6 +48,18 @@ type statusRecorder struct {
status int
}
func (r *statusRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h, ok := r.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("hijack not supportd")
}
return h.Hijack()
}
func (r *statusRecorder) Unwrap() http.ResponseWriter {
return r.ResponseWriter
}
func (r *statusRecorder) WriteHeader(status int) {
r.status = status
r.ResponseWriter.WriteHeader(status)

View File

@@ -1,14 +1,68 @@
package handler
import (
"log/slog"
"net/http"
"github.com/anxi0uz/logiflow/internal/api"
"github.com/anxi0uz/logiflow/internal/models"
storage "github.com/anxi0uz/logiflow/pkg"
"github.com/huandu/go-sqlbuilder"
openapi_types "github.com/oapi-codegen/runtime/types"
)
func (s *Server) ListNotifications(w http.ResponseWriter, r *http.Request, params api.ListNotificationsParams) {
ctx := r.Context()
claims, ok := ctx.Value("user").(*Claims)
if !ok {
slog.ErrorContext(ctx, "Error while casting claims")
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
notifs, err := storage.GetAll[models.Notification](ctx, "notifications", s.DB, func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("user_id", claims.ID))
if params.UnreadOnly != nil && *params.UnreadOnly {
sb.Where(sb.EQ("is_read", false))
}
})
if err != nil {
slog.ErrorContext(ctx, "Error while getting notifications", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
s.JSON(w, r, http.StatusOK, notifs, RespSuccess)
}
func (s *Server) MarkNotificationRead(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {
ctx := r.Context()
claims, ok := ctx.Value("user").(*Claims)
if !ok {
slog.ErrorContext(ctx, "Error while casting claims")
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
notification, err := storage.GetOne[models.Notification](ctx, s.DB, "notifications", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("id", id))
})
if err != nil {
slog.ErrorContext(ctx, "Error while getting notification", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
if notification.UserID != claims.ID {
s.JSON(w, r, http.StatusForbidden, MsgForbidden, RespError)
return
}
notification.IsRead = true
if err := storage.Update(ctx, "notifications", *notification, s.DB, func(sb *sqlbuilder.UpdateBuilder) {
sb.Where(sb.EQ("id", id))
}); err != nil {
slog.ErrorContext(ctx, "Error while updating notification", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
s.JSON(w, r, http.StatusOK, "Updated", RespSuccess)
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/anxi0uz/logiflow/internal/services"
storage "github.com/anxi0uz/logiflow/pkg"
openapi_types "github.com/oapi-codegen/runtime/types"
"github.com/xuri/excelize/v2"
)
func (s *Server) ListOrders(w http.ResponseWriter, r *http.Request, params api.ListOrdersParams) {
@@ -135,6 +136,9 @@ func (s *Server) UpdateOrderStatus(w http.ResponseWriter, r *http.Request, id op
}
return
}
if req.Status == api.OrderStatusUpdateStatusInTransit {
go s.startRouteTracker(id)
}
s.JSON(w, r, http.StatusOK, order, RespSuccess)
}
@@ -157,7 +161,64 @@ func (s *Server) GetOrdersReport(w http.ResponseWriter, r *http.Request, params
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
s.JSON(w, r, http.StatusOK, orders, RespSuccess)
f := excelize.NewFile()
sheet := "Orders"
f.SetSheetName("Sheet1", sheet)
headers := []string{"ID", "Status", "Origin", "Destination", "Weight", "Volume", "Price", "Created At"}
for i, h := range headers {
cell, _ := excelize.CoordinatesToCellName(i+1, 1)
f.SetCellValue(sheet, cell, h)
}
for row, o := range orders {
values := []any{
o.ID.String(),
o.Status,
o.OriginAddress,
o.DestinationAddress,
o.WeightKg,
o.VolumeM3,
o.TotalPrice,
o.CreatedAt.Format("2006-01-02 15:04:05"),
}
for col, v := range values {
cell, _ := excelize.CoordinatesToCellName(col+1, row+2)
f.SetCellValue(sheet, cell, v)
}
}
buf, err := f.WriteToBuffer()
if err != nil {
slog.ErrorContext(ctx, "excel write failed", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
w.Header().Set("Content-Type", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
w.Header().Set("Content-Disposition", "attachment; filename=orders_report.xlsx")
w.WriteHeader(http.StatusOK)
w.Write(buf.Bytes())
}
func (s *Server) GetDashboard(w http.ResponseWriter, r *http.Request) {}
func (s *Server) GetDashboard(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
claims, ok := ctx.Value("user").(*Claims)
if !ok {
slog.ErrorContext(ctx, "error while casting claims")
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
report, err := s.OrderSerice.GetDashboard(ctx, claims.Role)
if err != nil {
if errors.Is(err, services.ErrForbidden) {
s.JSON(w, r, http.StatusForbidden, MsgForbidden, RespError)
return
}
slog.ErrorContext(ctx, "dashboard failed", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
s.JSON(w, r, http.StatusOK, report, RespSuccess)
}

View File

@@ -1,15 +1,23 @@
package handler
import (
"context"
"log/slog"
"net/http"
"time"
"github.com/anxi0uz/logiflow/internal/models"
storage "github.com/anxi0uz/logiflow/pkg"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/huandu/go-sqlbuilder"
openapi_types "github.com/oapi-codegen/runtime/types"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
func (s *Server) GetRoute(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {
ctx := r.Context()
route, err := storage.GetOne[models.Route](ctx, s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
@@ -23,4 +31,81 @@ func (s *Server) GetRoute(w http.ResponseWriter, r *http.Request, id openapi_typ
s.JSON(w, r, http.StatusOK, route, RespSuccess)
}
func (s *Server) RouteWebSocket(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {}
func (s *Server) RouteWebSocket(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {
ctx := r.Context()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
slog.ErrorContext(ctx, "ws upgrade failed", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
defer conn.Close()
orderID := id
s.Hub.Register(id, conn)
defer s.Hub.Unregister(orderID, conn)
s.Hub.mu.RLock()
_, trackerRunning := s.Hub.trackers[orderID]
s.Hub.mu.RUnlock()
if !trackerRunning {
go s.startRouteTracker(orderID)
}
route, err := storage.GetOne[models.Route](r.Context(), s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("order_id", orderID))
})
if err == nil {
coords, err := route.ParseCoordinates()
if err == nil && route.CurrentIndex < len(coords) {
conn.WriteJSON(map[string]any{
"current_index": route.CurrentIndex,
"coordinate": coords[route.CurrentIndex],
})
}
}
for {
_, _, err := conn.ReadMessage()
if err != nil {
break
}
}
}
func (s *Server) startRouteTracker(orderID uuid.UUID) {
s.Hub.StartTracker(orderID, func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
route, err := storage.GetOne[models.Route](ctx, s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("order_id", orderID))
})
if err != nil {
return
}
coords, err := route.ParseCoordinates()
if err != nil || len(coords) == 0 {
return
}
if route.CurrentIndex >= len(coords) {
s.Hub.StopTracker(orderID)
return
}
s.Hub.Broadcast(orderID, map[string]any{
"current_index": route.CurrentIndex,
"coordinate": coords[route.CurrentIndex],
})
route.CurrentIndex++
storage.Update(ctx, "routes", *route, s.DB, func(sb *sqlbuilder.UpdateBuilder) {
sb.Where(sb.EQ("order_id", orderID))
})
}
}
})
}

View File

@@ -55,6 +55,7 @@ type Server struct {
Redis *redis.Client
JwtKey []byte
OrderSerice services.OrderServicer
Hub *Hub
}
func NewServer(db *pgxpool.Pool, redis *redis.Client, cfg *config.Config) *Server {
@@ -65,6 +66,7 @@ func NewServer(db *pgxpool.Pool, redis *redis.Client, cfg *config.Config) *Serve
Config: cfg,
JwtKey: []byte(cfg.JwtOpt.Key),
OrderSerice: services.NewOrderService(db, *cfg),
Hub: NewHub(),
}
}

View File

@@ -0,0 +1,30 @@
package models
import "github.com/google/uuid"
type DashboardRevenue struct {
Total float64 `json:"total"`
ThisMonth float64 `json:"thisMonth"`
}
type DashboardOrderStatus struct {
Total int `json:"total`
Delivered int `json:"delivered"`
InTransit int `json:"inTransit`
Pending int `json:"pending"`
Cancelled int `json:"cancelled"`
}
type DashboardDriverStat struct {
ID uuid.UUID `json:"id"`
FullName string `json:"fullName"`
Status string `json:"status"`
Rating float64 `json:"rating"`
CompletedOrders int `json:"completedOrders"`
}
type DashboardReport struct {
Revenue DashboardRevenue `json:"revenue"`
Orders DashboardOrderStatus `json:"orders"`
Drivers []DashboardDriverStat `json:"drivers"`
}

View File

@@ -31,6 +31,7 @@ type OrderServicer interface {
CancelOrder(ctx context.Context, id uuid.UUID, userID uuid.UUID, role string) error
UpdateOrderStatus(ctx context.Context, id uuid.UUID, userID uuid.UUID, role string, req api.OrderStatusUpdate) (*models.Order, error)
GetOrdersReport(ctx context.Context, role string, params api.GetOrdersReportParams) ([]models.Order, error)
GetDashboard(ctx context.Context, role string) (*models.DashboardReport, error)
}
var ErrForbidden = errors.New("forbidden")
@@ -283,9 +284,20 @@ func (s *OrderService) UpdateOrderStatus(ctx context.Context, id uuid.UUID, user
}
order.DriverID = req.DriverId
order.AssignedAt = &now
driver, _ := storage.GetOne[models.Driver](ctx, s.db, "drivers", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("id", order.DriverID))
})
s.createNotification(ctx, driver.UserID, "Новый заказ", "Вам назначен новый заказ")
}
if req.Status == api.OrderStatusUpdateStatusInTransit {
if order.CreatedByID == nil {
return nil, fmt.Errorf("created by id needed")
}
s.createNotification(ctx, *order.CreatedByID, "Заказ в пути", "Ваш заказ передан водителю")
}
if req.Status == api.OrderStatusUpdateStatusDelivered {
order.DeliveredAt = &now
s.createNotification(ctx, *order.CreatedByID, "Заказ доставлен", "Ваш заказ успешно доставлен")
}
if err := storage.Update(ctx, "orders", *order, s.db, func(sb *sqlbuilder.UpdateBuilder) {
sb.Where(sb.EQ("id", id))
@@ -320,3 +332,89 @@ func (s *OrderService) GetOrdersReport(ctx context.Context, role string, params
}
return orders, nil
}
func (s *OrderService) GetDashboard(ctx context.Context, role string) (*models.DashboardReport, error) {
if role != "manager" && role != "admin" {
return nil, ErrForbidden
}
var report models.DashboardReport
g, gctx := errgroup.WithContext(ctx)
// query 1: order counts + revenue
g.Go(func() error {
row := s.db.QueryRow(gctx, `
SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE status = 'delivered') AS delivered,
COUNT(*) FILTER (WHERE status = 'in_transit') AS in_transit,
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'cancelled') AS cancelled,
COALESCE(SUM(total_price) FILTER (WHERE status = 'delivered'), 0) AS revenue_total,
COALESCE(SUM(total_price) FILTER (WHERE status = 'delivered'
AND created_at >= date_trunc('month', NOW())), 0) AS revenue_this_month
FROM orders
`)
return row.Scan(
&report.Orders.Total,
&report.Orders.Delivered,
&report.Orders.InTransit,
&report.Orders.Pending,
&report.Orders.Cancelled,
&report.Revenue.Total,
&report.Revenue.ThisMonth,
)
})
// query 2: top drivers by completed orders
g.Go(func() error {
rows, err := s.db.Query(gctx, `
SELECT
d.id,
u.full_name,
d.status,
d.rating,
COUNT(o.id) FILTER (WHERE o.status = 'delivered') AS completed_orders
FROM drivers d
JOIN users u ON u.id = d.user_id
LEFT JOIN orders o ON o.driver_id = d.id
GROUP BY d.id, u.full_name, d.status, d.rating
ORDER BY completed_orders DESC
LIMIT 10
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var ds models.DashboardDriverStat
if err := rows.Scan(&ds.ID, &ds.FullName, &ds.Status, &ds.Rating, &ds.CompletedOrders); err != nil {
return err
}
report.Drivers = append(report.Drivers, ds)
}
return rows.Err()
})
if err := g.Wait(); err != nil {
return nil, err
}
if report.Drivers == nil {
report.Drivers = []models.DashboardDriverStat{}
}
return &report, nil
}
func (s *OrderService) createNotification(ctx context.Context, userID uuid.UUID, title, body string) {
n := models.Notification{
ID: uuid.New(),
UserID: userID,
Title: title,
Body: &body,
IsRead: false,
CreatedAt: time.Now(),
}
if err := storage.Create(ctx, "notifications", n, s.db); err != nil {
slog.ErrorContext(ctx, "failed to create notification", slog.String("error", err.Error()))
}
}