Skip to content

Commit

Permalink
Merge pull request kubernetes#1 from XiShanYongYe-Chang/dev-cz
Browse files Browse the repository at this point in the history
Add resource routing logic to fleet-apiserver
  • Loading branch information
karmada-bot authored Dec 9, 2023
2 parents 7df7bf9 + a47b96f commit ecba69a
Show file tree
Hide file tree
Showing 41 changed files with 2,017 additions and 64 deletions.
8 changes: 7 additions & 1 deletion cmd/kube-apiserver/app/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"

karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
"k8s.io/klog/v2"

apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
Expand All @@ -36,6 +37,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
kubeexternalinformers "k8s.io/client-go/informers"
Expand All @@ -56,6 +58,7 @@ func createAggregatorConfig(
kubeAPIServerConfig genericapiserver.Config,
commandOptions controlplaneapiserver.CompletedOptions,
externalInformers kubeexternalinformers.SharedInformerFactory,
karmadaInformers karmadainformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
proxyTransport *http.Transport,
peerProxy utilpeerproxy.Interface,
Expand Down Expand Up @@ -96,7 +99,10 @@ func createAggregatorConfig(
etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
if etcdOptions.StorageConfig.StorageObjectCountTracker == nil {
etcdOptions.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker
}
if err := etcdOptions.ApplyWithStorageFactoryTo(&options.SimpleStorageFactory{StorageConfig: etcdOptions.StorageConfig}, &genericConfig, externalInformers, karmadaInformers, genericConfig.LoopbackClientConfig); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/kube-apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ControlPlane = controlPlane

apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, controlPlane.ExtraConfig.KarmadaInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, controlPlane.ExtraConfig.KarmadaInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/options/completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (opts *ServerRunOptions) Complete() (CompletedOptions, error) {
if err != nil {
return CompletedOptions{}, err
}
// todo: @wm775825 change alt dns?
controlplane, err := opts.Options.Complete([]string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP})
if err != nil {
return CompletedOptions{}, err
Expand Down
3 changes: 2 additions & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
) {
proxyTransport := CreateProxyTransport()

genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
genericConfig, versionedInformers, karmadaInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
opts.CompletedOptions,
[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
generatedopenapi.GetOpenAPIDefinitions,
Expand Down Expand Up @@ -256,6 +256,7 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
ExtendExpiration: opts.Authentication.ServiceAccounts.ExtendExpiration,

VersionedInformers: versionedInformers,
KarmadaInformers: karmadaInformers,
},
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/controlplane/apiserver/apiextensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/informers"

karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"

controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
)

func CreateAPIExtensionsConfig(
kubeAPIServerConfig server.Config,
kubeInformers informers.SharedInformerFactory,
karmadaInformers karmadainformers.SharedInformerFactory,
pluginInitializers []admission.PluginInitializer,
commandOptions controlplaneapiserver.CompletedOptions,
masterCount int,
Expand All @@ -58,7 +62,10 @@ func CreateAPIExtensionsConfig(
// prefer the more compact serialization (v1beta1) for storage until https://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
if etcdOptions.StorageConfig.StorageObjectCountTracker == nil {
etcdOptions.StorageConfig.StorageObjectCountTracker = genericConfig.StorageObjectCountTracker
}
if err := etcdOptions.ApplyWithStorageFactoryTo(&options.SimpleStorageFactory{StorageConfig: etcdOptions.StorageConfig}, &genericConfig, kubeInformers, karmadaInformers, genericConfig.LoopbackClientConfig); err != nil {
return nil, err
}

Expand All @@ -75,7 +82,7 @@ func CreateAPIExtensionsConfig(
SharedInformerFactory: kubeInformers,
},
ExtraConfig: apiextensionsapiserver.ExtraConfig{
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, genericConfig.StorageObjectCountTracker),
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions, genericConfig.ResourceTransformers, genericConfig.StorageObjectCountTracker, kubeInformers, karmadaInformers, genericConfig.LoopbackClientConfig),
MasterCount: masterCount,
AuthResolverWrapper: authResolverWrapper,
ServiceResolver: serviceResolver,
Expand Down
41 changes: 23 additions & 18 deletions pkg/controlplane/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"time"

karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
karmadainformers "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
oteltrace "go.opentelemetry.io/otel/trace"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -62,6 +64,7 @@ func BuildGenericConfig(
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
karmadaInformers karmadainformers.SharedInformerFactory,
storageFactory *serverstorage.DefaultStorageFactory,

lastErr error,
Expand All @@ -76,6 +79,10 @@ func BuildGenericConfig(
if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true

if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
return
}
Expand Down Expand Up @@ -115,32 +122,30 @@ func BuildGenericConfig(
s.Etcd.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}

kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

karmadaClientConfig := genericConfig.LoopbackClientConfig
karmadaClientset, lastErr := karmadaclientset.NewForConfig(karmadaClientConfig)
if lastErr != nil {
return
}
karmadaInformers = karmadainformers.NewSharedInformerFactory(karmadaClientset, 10*time.Minute)

storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
storageFactory, lastErr = storageFactoryConfig.Complete(s.Etcd).New()
if lastErr != nil {
return
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}

// Use protobufs for self-communication.
// Since not every generic apiserver has to support protobufs, we
// cannot default to it in generic apiserver and need to explicitly
// set it in kube-apiserver.
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
// Disable compression for self-communication, since we are going to be
// on a fast local network
genericConfig.LoopbackClientConfig.DisableCompression = true

kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig, versionedInformers, karmadaInformers, genericConfig.LoopbackClientConfig); lastErr != nil {
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/controlplane/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"time"

karmada "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"

admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
admissionregistrationv1alpha1 "k8s.io/api/admissionregistration/v1alpha1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
Expand Down Expand Up @@ -228,6 +230,7 @@ type ExtraConfig struct {
ServiceAccountPublicKeys []interface{}

VersionedInformers informers.SharedInformerFactory
KarmadaInformers karmada.SharedInformerFactory

// RepairServicesInterval interval used by the repair loops for
// the Services NodePort and ClusterIP resources
Expand Down Expand Up @@ -313,7 +316,7 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
cfg := completedConfig{
c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers, c.ExtraConfig.KarmadaInformers),
&c.ExtraConfig,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controlplane/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func setUp(t *testing.T) (*etcd3testing.EtcdTestServer, Config, *assert.Assertio
etcdOptions := options.NewEtcdOptions(storageConfig)
// unit tests don't need watch cache and it leaks lots of goroutines with etcd testing functions during unit tests
etcdOptions.EnableWatchCache = false
err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig)
err := etcdOptions.ApplyWithStorageFactoryTo(storageFactory, config.GenericConfig, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/registry/apps/deployment/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ func (i *scaleUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runti
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected existing object type to be Deployment, got %T", deployment))
}

if oldObj == nil {
return deployment, nil
}

// if zero-value, the existing object does not exist
if len(deployment.ResourceVersion) == 0 {
return nil, errors.NewNotFound(apps.Resource("deployments/scale"), i.name)
Expand Down
7 changes: 6 additions & 1 deletion pkg/registry/core/namespace/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@ func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptio
return r.store.Watch(ctx, options)
}

// Delete enforces life-cycle rules for namespace termination
// Delete forwards the DELETE request to store
func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
return r.store.Delete(ctx, name, deleteValidation, options)
}

// delete enforces life-cycle rules for namespace termination
func (r *REST) delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
nsObj, err := r.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
Expand Down
10 changes: 1 addition & 9 deletions pkg/registry/flowcontrol/rest/storage_flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,7 @@ func (p RESTStorageProvider) GroupName() string {

// PostStartHook returns the hook func that launches the config provider
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
bce := &bootstrapConfigurationEnsurer{
informersSynced: []cache.InformerSynced{
p.InformerFactory.Flowcontrol().V1beta3().PriorityLevelConfigurations().Informer().HasSynced,
p.InformerFactory.Flowcontrol().V1beta3().FlowSchemas().Informer().HasSynced,
},
fsLister: p.InformerFactory.Flowcontrol().V1beta3().FlowSchemas().Lister(),
plcLister: p.InformerFactory.Flowcontrol().V1beta3().PriorityLevelConfigurations().Lister(),
}
return PostStartHookName, bce.ensureAPFBootstrapConfiguration, nil
return PostStartHookName, func(context genericapiserver.PostStartHookContext) error { return nil }, nil
}

type bootstrapConfigurationEnsurer struct {
Expand Down
75 changes: 75 additions & 0 deletions pkg/registry/networking/ingress/storage/decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package storage

import (
"k8s.io/apimachinery/pkg/runtime"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/apis/networking"
)

func fleetDecorator(obj runtime.Object) {
switch obj.(type) {
case *networking.Ingress:
ing := obj.(*networking.Ingress)
setClusterNameSuffix(ing)
case *networking.IngressList:
items := obj.(*networking.IngressList).Items
for i := range items {
setClusterNameSuffix(&items[i])
}
default:
}
}

func setClusterNameSuffix(ing *networking.Ingress) {
if ing == nil || ing.Name == "" {
return
}

_, clusterName := genericregistry.ParseNameFromResourceName(ing.Name, false)
if clusterName == genericregistry.KarmadaCluster {
return
}

setSuffixForIngressClassName(ing.Spec.IngressClassName, clusterName)
setSuffixForBackend(ing.Spec.DefaultBackend, clusterName)
setSuffixForRules(ing.Spec.Rules, clusterName)
setSuffixForTLSConfigurations(ing.Spec.TLS, clusterName)
}

func setSuffixForIngressClassName(ingClass *string, clusterName string) {
if ingClass == nil {
return
}
*ingClass += ".clusterspace." + clusterName
}

func setSuffixForRules(rules []networking.IngressRule, clusterName string) {
for i := range rules {
rule := &rules[i]
if rule.HTTP == nil {
continue
}
paths := rule.HTTP.Paths
for j := range paths {
setSuffixForBackend(&paths[j].Backend, clusterName)
}
}
}

func setSuffixForBackend(backend *networking.IngressBackend, clusterName string) {
if backend == nil {
return
}
if svc := backend.Service; svc != nil {
svc.Name += ".clusterspace." + clusterName
}
if resource := backend.Resource; resource != nil {
resource.Name += ".clusterspace." + clusterName
}
}

func setSuffixForTLSConfigurations(tlsConfigs []networking.IngressTLS, clusterName string) {
for i := range tlsConfigs {
tlsConfigs[i].SecretName += ".clusterspace." + clusterName
}
}
2 changes: 2 additions & 0 deletions pkg/registry/networking/ingress/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
ResetFieldsStrategy: ingress.Strategy,

TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},

FleetDecorator: fleetDecorator,
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
Expand Down
Loading

0 comments on commit ecba69a

Please sign in to comment.