Skip to content

Commit 74e7030

Browse files
authored
feat: Add hybrid server type, spins up both http/gRPC servers (#196)
feat: Add hybrid server type, spins up both http/gRPC servers
1 parent 65c96e8 commit 74e7030

6 files changed

Lines changed: 213 additions & 50 deletions

File tree

go/embedded/online_features.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int,
383383
log.Println("HTTP server terminated")
384384
}()
385385

386-
return ser.Serve(host, port)
386+
return ser.Serve(host, port, server.DefaultHttpHandlers(ser))
387387
}
388388

389389
func (s *OnlineFeatureService) StopHttpServer() {

go/internal/feast/server/grpc_server.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,22 @@ package server
33
import (
44
"context"
55
"fmt"
6+
67
"github.com/feast-dev/feast/go/internal/feast"
78
"github.com/feast-dev/feast/go/internal/feast/server/logging"
89
"github.com/feast-dev/feast/go/protos/feast/serving"
910
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
1011
"github.com/feast-dev/feast/go/types"
1112
"github.com/google/uuid"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/health"
1215
"google.golang.org/protobuf/types/known/timestamppb"
1316
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
17+
18+
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"google.golang.org/grpc/health/grpc_health_v1"
21+
grpcTrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
1422
)
1523

1624
const feastServerVersion = "0.0.1"
@@ -202,6 +210,21 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r
202210
return resp, nil
203211
}
204212

213+
// Register services used by the grpcServingServiceServer.
214+
func (s *grpcServingServiceServer) RegisterServices() (*grpc.Server, *health.Server) {
215+
grpcPromMetrics := grpcPrometheus.NewServerMetrics()
216+
prometheus.MustRegister(grpcPromMetrics)
217+
grpcServer := grpc.NewServer(
218+
grpc.ChainUnaryInterceptor(grpcTrace.UnaryServerInterceptor(), grpcPromMetrics.UnaryServerInterceptor()),
219+
)
220+
221+
serving.RegisterServingServiceServer(grpcServer, s)
222+
healthService := health.NewServer()
223+
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
224+
225+
return grpcServer, healthService
226+
}
227+
205228
func GenerateRequestId() string {
206229
id := uuid.New()
207230
return id.String()

go/internal/feast/server/http_server.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ type httpServer struct {
3030
server *http.Server
3131
}
3232

33+
// This represents mapping between a path and an http Handler.
34+
// Note a handler can be created out of any func with type signature
35+
// func(w http.ResponseWriter, r *http.Request) via HandleFunc()
36+
type Handler struct {
37+
path string
38+
handlerFunc http.Handler
39+
}
40+
3341
// Some Feast types aren't supported during JSON conversion
3442
type repeatedValue struct {
3543
stringVal []string
@@ -339,15 +347,17 @@ func recoverMiddleware(next http.Handler) http.Handler {
339347
})
340348
}
341349

342-
func (s *httpServer) Serve(host string, port int) error {
350+
func (s *httpServer) Serve(host string, port int, handlers []Handler) error {
343351
if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
344352
tracer.Start(tracer.WithRuntimeMetrics())
345353
defer tracer.Stop()
346354
}
347355
mux := httptrace.NewServeMux()
348-
mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))
349-
mux.Handle("/metrics", promhttp.Handler())
350-
mux.HandleFunc("/health", healthCheckHandler)
356+
357+
for _, handler := range handlers {
358+
mux.Handle(handler.path, handler.handlerFunc)
359+
}
360+
351361
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second}
352362
err := s.server.ListenAndServe()
353363
// Don't return the error if it's caused by graceful shutdown using Stop()
@@ -358,6 +368,23 @@ func (s *httpServer) Serve(host string, port int) error {
358368
return err
359369
}
360370

371+
func DefaultHttpHandlers(s *httpServer) []Handler {
372+
return []Handler{
373+
{
374+
path: "/get-online-features",
375+
handlerFunc: recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)),
376+
},
377+
{
378+
path: "/metrics",
379+
handlerFunc: promhttp.Handler(),
380+
},
381+
{
382+
path: "/health",
383+
handlerFunc: http.HandlerFunc(healthCheckHandler),
384+
},
385+
}
386+
}
387+
361388
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
362389
w.WriteHeader(http.StatusOK)
363390
fmt.Fprintf(w, "Healthy")
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// These contain configs/methods that are used by the hybrid server function.
2+
package server
3+
4+
import (
5+
"context"
6+
"net/http"
7+
"time"
8+
9+
"github.com/prometheus/client_golang/prometheus/promhttp"
10+
"google.golang.org/grpc/health"
11+
healthpb "google.golang.org/grpc/health/grpc_health_v1"
12+
)
13+
14+
var defaultCheckTimeout = 2 * time.Second
15+
16+
// Register default HTTP handlers specific to the hybrid server configuration.
17+
func DefaultHybridHandlers(s *httpServer, hs *health.Server) []Handler {
18+
return []Handler{
19+
{
20+
path: "/get-online-features",
21+
handlerFunc: recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)),
22+
},
23+
{
24+
path: "/metrics",
25+
handlerFunc: promhttp.Handler(),
26+
},
27+
{
28+
path: "/health",
29+
handlerFunc: http.HandlerFunc(combinedHealthCheck(hs)),
30+
},
31+
}
32+
}
33+
34+
// This function wraps an http.Handler that is registered during hybrid server creation.
35+
// Calls the grpc.server healthcheck check endpoint
36+
func combinedHealthCheck(hs *health.Server) http.HandlerFunc {
37+
return func(w http.ResponseWriter, r *http.Request) {
38+
ctx, cancel := context.WithTimeout(r.Context(), defaultCheckTimeout)
39+
defer cancel()
40+
41+
req := &healthpb.HealthCheckRequest{
42+
Service: "", // Empty string means that it will simply check overall servingStatus
43+
}
44+
45+
resp, err := hs.Check(ctx, req)
46+
if err != nil {
47+
http.Error(w, "gRPC health check failed", http.StatusInternalServerError)
48+
return
49+
}
50+
51+
// Use to map servingStatus to httpStatus
52+
var status int
53+
switch resp.Status {
54+
case healthpb.HealthCheckResponse_SERVING:
55+
status = http.StatusOK
56+
case healthpb.HealthCheckResponse_NOT_SERVING:
57+
status = http.StatusServiceUnavailable
58+
default:
59+
status = http.StatusInternalServerError
60+
}
61+
62+
w.WriteHeader(status)
63+
w.Write([]byte(resp.Status.String()))
64+
}
65+
}

go/internal/feast/server/server_commons.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package server
22

33
import (
4+
"os"
5+
46
"github.com/rs/zerolog"
57
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
6-
"os"
78
)
89

910
func LogWithSpanContext(span tracer.Span) zerolog.Logger {

0 commit comments

Comments
 (0)