Files
logiflow/internal/handler/route.go

120 lines
3.2 KiB
Go

package handler
import (
"context"
"log/slog"
"net/http"
"time"
"github.com/anxi0uz/logiflow/internal/models"
storage "github.com/anxi0uz/logiflow/pkg"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/huandu/go-sqlbuilder"
openapi_types "github.com/oapi-codegen/runtime/types"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
func (s *Server) GetRoute(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {
ctx := r.Context()
route, err := storage.GetOne[models.Route](ctx, s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("order_id", id))
})
if err != nil {
slog.ErrorContext(ctx, "Error while getting route", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
s.JSON(w, r, http.StatusOK, route, RespSuccess)
}
func (s *Server) RouteWebSocket(w http.ResponseWriter, r *http.Request, id openapi_types.UUID) {
ctx := r.Context()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
slog.ErrorContext(ctx, "ws upgrade failed", slog.String("error", err.Error()))
s.JSON(w, r, http.StatusInternalServerError, MsgInternalError, RespError)
return
}
defer func() {
if err := conn.Close(); err != nil {
slog.Warn("ws conn close error", slog.String("error", err.Error()))
}
}()
orderID := id
s.Hub.Register(id, conn)
defer s.Hub.Unregister(orderID, conn)
s.Hub.mu.RLock()
_, trackerRunning := s.Hub.trackers[orderID]
s.Hub.mu.RUnlock()
if !trackerRunning {
go s.startRouteTracker(orderID)
}
route, err := storage.GetOne[models.Route](r.Context(), s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("order_id", orderID))
})
if err == nil {
coords, err := route.ParseCoordinates()
if err == nil && route.CurrentIndex < len(coords) {
if err := conn.WriteJSON(map[string]any{
"current_index": route.CurrentIndex,
"coordinate": coords[route.CurrentIndex],
}); err != nil {
slog.ErrorContext(ctx, "error while writing date to clients", slog.String("error", err.Error()))
}
}
}
for {
_, _, err := conn.ReadMessage()
if err != nil {
break
}
}
}
func (s *Server) startRouteTracker(orderID uuid.UUID) {
s.Hub.StartTracker(orderID, func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
route, err := storage.GetOne[models.Route](ctx, s.DB, "routes", func(sb *sqlbuilder.SelectBuilder) {
sb.Where(sb.EQ("order_id", orderID))
})
if err != nil {
return
}
coords, err := route.ParseCoordinates()
if err != nil || len(coords) == 0 {
return
}
if route.CurrentIndex >= len(coords) {
s.Hub.StopTracker(orderID)
return
}
s.Hub.Broadcast(orderID, map[string]any{
"current_index": route.CurrentIndex,
"coordinate": coords[route.CurrentIndex],
})
route.CurrentIndex++
if err := storage.Update(ctx, "routes", *route, s.DB, func(sb *sqlbuilder.UpdateBuilder) {
sb.Where(sb.EQ("order_id", orderID))
}); err != nil {
slog.ErrorContext(ctx, "error while updating route", slog.String("id", route.ID.String()), slog.String("error", err.Error()))
}
}
}
})
}