Skip to content

Commit

Permalink
Refactor EnsureService because we have more deployment types now;
Browse files Browse the repository at this point in the history
  • Loading branch information
ycliuhw committed Feb 7, 2020
1 parent 6b3d67b commit 1b5e48b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 48 deletions.
14 changes: 14 additions & 0 deletions caas/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ type StatusCallbackFunc func(appName string, settableStatus status.Status, info
// DeploymentType defines a deployment type.
type DeploymentType string

// Validate validates if this deployment type is supported.
func (dt DeploymentType) Validate() error {
if dt == "" {
// TODO(caas): Change DeploymentType to be required.
return nil
}
if dt == DeploymentStateless ||
dt == DeploymentStateful ||
dt == DeploymentDaemon {
return nil
}
return errors.NotSupportedf("deployment type %q", dt)
}

const (
DeploymentStateless DeploymentType = "stateless"
DeploymentStateful DeploymentType = "stateful"
Expand Down
2 changes: 1 addition & 1 deletion caas/kubernetes/provider/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (c *controllerStack) createControllerStatefulset() error {
}
defer w.Kill()

if err = c.broker.createStatefulSet(spec); err != nil {
if _, err = c.broker.createStatefulSet(spec); err != nil {
return errors.Trace(err)
}

Expand Down
135 changes: 88 additions & 47 deletions caas/kubernetes/provider/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,10 @@ func (k *kubernetesClient) EnsureService(
}
}()

if err := params.Deployment.DeploymentType.Validate(); err != nil {
return errors.Trace(err)
}

logger.Debugf("creating/updating application %s", appName)
deploymentName := k.deploymentName(appName)

Expand Down Expand Up @@ -1051,36 +1055,23 @@ func (k *kubernetesClient) EnsureService(

// Add a deployment controller or stateful set configured to create the specified number of units/pods.
// Defensively check to see if a stateful set is already used.
var useStatefulSet bool
if params.Deployment.DeploymentType != "" {
useStatefulSet = params.Deployment.DeploymentType == caas.DeploymentStateful
} else {
useStatefulSet = len(params.Filesystems) > 0
}
statefulsets := k.client().AppsV1().StatefulSets(k.namespace)
existingStatefulSet, err := statefulsets.Get(deploymentName, v1.GetOptions{IncludeUninitialized: true})
if err != nil && !k8serrors.IsNotFound(err) {
return errors.Trace(err)
}
if !useStatefulSet {
useStatefulSet = err == nil
if useStatefulSet {
logger.Debugf("no updated filesystems but already using stateful set for %v", appName)
if params.Deployment.DeploymentType == "" {
// TODO(caas): we should really change `params.Deployment` to be required.
params.Deployment.DeploymentType = caas.DeploymentStateless
if len(params.Filesystems) > 0 {
params.Deployment.DeploymentType = caas.DeploymentStateful
}
}
var randPrefix string
if useStatefulSet {
// Include a random snippet in the pvc name so that if the same app
// is deleted and redeployed again, the pvc retains a unique name.
// Only generate it once, and record it on the stateful set.
if existingStatefulSet != nil {
randPrefix = existingStatefulSet.Annotations[labelApplicationUUID]
}
if randPrefix == "" {
randPrefix, err = k.randomPrefix()
if err != nil {
return errors.Trace(err)
}

if params.Deployment.DeploymentType != caas.DeploymentStateful {
// TODO(caas): remove this check once `params.Deployment` is changed to be required.
existingStatefulSet, err := k.getStatefulSet(deploymentName)
if err != nil && !k8serrors.IsNotFound(err) {
return errors.Trace(err)
}
if err == nil {
params.Deployment.DeploymentType = caas.DeploymentStateful
logger.Debugf("no updated filesystems but already using stateful set for %q", appName)
}
}

Expand Down Expand Up @@ -1118,20 +1109,26 @@ func (k *kubernetesClient) EnsureService(
}

numPods := int32(numUnits)
if useStatefulSet {
switch params.Deployment.DeploymentType {
case caas.DeploymentStateful:
if err := k.configureHeadlessService(appName, deploymentName, annotations.Copy()); err != nil {
return errors.Annotate(err, "creating or updating headless service")
}
cleanups = append(cleanups, func() { k.deleteService(headlessServiceName(deploymentName)) })
if err := k.configureStatefulSet(appName, deploymentName, randPrefix, annotations.Copy(), workloadSpec, params.PodSpec.Containers, &numPods, params.Filesystems); err != nil {
if err := k.configureStatefulSet(appName, deploymentName, annotations.Copy(), workloadSpec, params.PodSpec.Containers, &numPods, params.Filesystems); err != nil {
return errors.Annotate(err, "creating or updating StatefulSet")
}
cleanups = append(cleanups, func() { k.deleteDeployment(appName) })
} else {
case caas.DeploymentStateless:
if err := k.configureDeployment(appName, deploymentName, annotations.Copy(), workloadSpec, params.PodSpec.Containers, &numPods); err != nil {
return errors.Annotate(err, "creating or updating DeploymentController")
}
cleanups = append(cleanups, func() { k.deleteDeployment(appName) })
case caas.DeploymentDaemon:
// TODO
default:
// This should never happend because we have validated both in this method and in `charm.v6`.
return errors.NotSupportedf("deployment type %q not supported", params.Deployment.DeploymentType)
}
return nil
}
Expand Down Expand Up @@ -1530,7 +1527,7 @@ func getPodManagementPolicy(svc *specs.ServiceSpec) (out apps.PodManagementPolic
}

func (k *kubernetesClient) configureStatefulSet(
appName, deploymentName, randPrefix string, annotations k8sannotations.Annotation, workloadSpec *workloadSpec,
appName, deploymentName string, annotations k8sannotations.Annotation, workloadSpec *workloadSpec,
containers []specs.ContainerSpec, replicas *int32, filesystems []storage.KubernetesFilesystemParams,
) error {
logger.Debugf("creating/updating stateful set for %s", appName)
Expand All @@ -1540,6 +1537,26 @@ func (k *kubernetesClient) configureStatefulSet(
return applicationConfigMapName(deploymentName, fileSetName)
}

existingStatefulSet, err := k.getStatefulSet(deploymentName)
if err != nil && !errors.IsNotFound(err) {
return errors.Trace(err)
}
existing := err == nil

var randPrefix string
// Include a random snippet in the pvc name so that if the same app
// is deleted and redeployed again, the pvc retains a unique name.
// Only generate it once, and record it on the stateful set.
if existing {
randPrefix = existingStatefulSet.Annotations[labelApplicationUUID]
}
if randPrefix == "" {
randPrefix, err = k.randomPrefix()
if err != nil {
return errors.Trace(err)
}
}

statefulset := &apps.StatefulSet{
ObjectMeta: v1.ObjectMeta{
Name: deploymentName,
Expand Down Expand Up @@ -1574,24 +1591,24 @@ func (k *kubernetesClient) configureStatefulSet(
return errors.Annotatef(err, "configuring storage for %s", appName)
}
statefulset.Spec.Template.Spec = podSpec
return k.ensureStatefulSet(statefulset, existingPodSpec)
return k.ensureStatefulSet(statefulset, existing, existingPodSpec)
}

func (k *kubernetesClient) ensureStatefulSet(spec *apps.StatefulSet, existingPodSpec core.PodSpec) error {
api := k.client().AppsV1().StatefulSets(k.namespace)

_, err := api.Update(spec)
if err != nil {
if k8serrors.IsNotFound(err) {
_, err = api.Create(spec)
func (k *kubernetesClient) ensureStatefulSet(spec *apps.StatefulSet, existAlready bool, existingPodSpec core.PodSpec) error {
if !existAlready {
_, err := k.createStatefulSet(spec)
if errors.IsNotValid(err) {
return errors.NewNotValid(err, fmt.Sprintf("ensuring stateful set %q", spec.GetName()))
}
if err != nil && !errors.IsAlreadyExists(err) {
return errors.Trace(err)
}
return errors.Trace(err)
}

// The statefulset already exists so all we are allowed to update is replicas,
// template, update strategy. Juju may hand out info with a slightly different
// requested volume size due to trying to adapt the unit model to the k8s world.
existing, err := api.Get(spec.GetName(), v1.GetOptions{IncludeUninitialized: true})
existing, err := k.getStatefulSet(spec.GetName())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1601,14 +1618,38 @@ func (k *kubernetesClient) ensureStatefulSet(spec *apps.StatefulSet, existingPod
existing.Spec.Template.Spec.ServiceAccountName = existingPodSpec.ServiceAccountName
existing.Spec.Template.Spec.AutomountServiceAccountToken = existingPodSpec.AutomountServiceAccountToken
// NB: we can't update the Spec.ServiceName as it is immutable.
_, err = api.Update(existing)
_, err = k.updateStatefulSet(existing)
return errors.Trace(err)
}

// createStatefulSet deletes a statefulset resource.
func (k *kubernetesClient) createStatefulSet(spec *apps.StatefulSet) error {
_, err := k.client().AppsV1().StatefulSets(k.namespace).Create(spec)
return errors.Trace(err)
func (k *kubernetesClient) createStatefulSet(spec *apps.StatefulSet) (*apps.StatefulSet, error) {
out, err := k.client().AppsV1().StatefulSets(k.namespace).Create(spec)
if k8serrors.IsAlreadyExists(err) {
return nil, errors.AlreadyExistsf("stateful set %q", spec.GetName())
}
if k8serrors.IsInvalid(err) {
return nil, errors.NotValidf("stateful set %q", spec.GetName())
}
return out, errors.Trace(err)
}

func (k *kubernetesClient) updateStatefulSet(spec *apps.StatefulSet) (*apps.StatefulSet, error) {
out, err := k.client().AppsV1().StatefulSets(k.namespace).Update(spec)
if k8serrors.IsNotFound(err) {
return nil, errors.NotFoundf("stateful set %q", spec.GetName())
}
if k8serrors.IsInvalid(err) {
return nil, errors.NotValidf("stateful set %q", spec.GetName())
}
return out, errors.Trace(err)
}

func (k *kubernetesClient) getStatefulSet(name string) (*apps.StatefulSet, error) {
out, err := k.client().AppsV1().StatefulSets(k.namespace).Get(name, v1.GetOptions{IncludeUninitialized: true})
if k8serrors.IsNotFound(err) {
return nil, errors.NotFoundf("stateful set %q", name)
}
return out, errors.Trace(err)
}

// deleteStatefulSet deletes a statefulset resource.
Expand Down

0 comments on commit 1b5e48b

Please sign in to comment.