-
Notifications
You must be signed in to change notification settings - Fork 25
feat: call cluster service to create identity cluster relation while creating user #745
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,6 @@ package cluster | |
import ( | ||
"context" | ||
"net/http" | ||
"net/url" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -13,7 +12,6 @@ import ( | |
"github.com/fabric8-services/fabric8-auth/rest" | ||
"github.com/fabric8-services/fabric8-cluster-client/cluster" | ||
|
||
goaclient "github.com/goadesign/goa/client" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
|
@@ -100,7 +98,8 @@ func (c *cache) refreshCache(ctx context.Context) error { | |
|
||
// fetchClusters fetches a new list of clusters from Cluster Management Service | ||
func (c *cache) fetchClusters(ctx context.Context) (map[string]Cluster, error) { | ||
cln, err := c.createClientWithServiceAccountSigner(ctx) | ||
signer := NewJWTSASigner(ctx, c.config, c.options...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of exporting that NewJWTSASigner() function let's create a new method in the Cluster Service - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But I see that it would create cycle dependency between cache (cluster) and service (cluster/service)... We should get rid of this cache anyway btw. We should just load cluster by URL from cluster service every time and do not rely on the full list of clusters at all. But it's worth creating a separate issue and PR for that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but how do we get a list of all cluster URL to fetch it once? Or are you saying when there is a request for any cluster explicitly from any client, then we should get it from cluster service each time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. External clients (such as tenant service, oso-proxy, jenkins-idler, etc) are proxied to cluster service. The cache is not used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But as I said let's just create an issue for that and do it in a separate PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created #750 |
||
cln, err := signer.CreateSignedClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -148,38 +147,3 @@ func (c *cache) fetchClusters(ctx context.Context) (map[string]Cluster, error) { | |
} | ||
return clusterMap, nil | ||
} | ||
|
||
// createClientWithSASigner creates a client with a JWT signer which uses the Auth Service Account token | ||
func (c *cache) createClientWithServiceAccountSigner(ctx context.Context) (*cluster.Client, error) { | ||
cln, err := c.createClient(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
m, err := manager.DefaultManager(c.config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
signer := m.AuthServiceAccountSigner() | ||
cln.SetJWTSigner(signer) | ||
return cln, nil | ||
} | ||
|
||
func (c *cache) createClient(ctx context.Context) (*cluster.Client, error) { | ||
u, err := url.Parse(c.config.GetClusterServiceURL()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
httpClient := http.DefaultClient | ||
|
||
if c.options != nil { | ||
for _, opt := range c.options { | ||
opt(httpClient) | ||
} | ||
} | ||
cln := cluster.New(goaclient.HTTPClientDoer(httpClient)) | ||
|
||
cln.Host = u.Host | ||
cln.Scheme = u.Scheme | ||
return cln, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,15 @@ | ||
package cluster | ||
|
||
import ( | ||
"context" | ||
"github.com/fabric8-services/fabric8-auth/authorization/token/manager" | ||
"github.com/fabric8-services/fabric8-auth/rest" | ||
"github.com/fabric8-services/fabric8-cluster-client/cluster" | ||
goaclient "github.com/goadesign/goa/client" | ||
"net/http" | ||
"net/url" | ||
) | ||
|
||
// Cluster represents an OpenShift cluster configuration | ||
type Cluster struct { | ||
Name string `mapstructure:"name"` | ||
|
@@ -16,3 +26,52 @@ type Cluster struct { | |
AuthClientDefaultScope string `mapstructure:"auth-client-default-scope"` | ||
CapacityExhausted bool `mapstructure:"capacity-exhausted"` // Optional in oso-clusters.conf ('false' by default) | ||
} | ||
|
||
type SASigner interface { | ||
CreateSignedClient() (*cluster.Client, error) | ||
} | ||
|
||
type JWTSASigner struct { | ||
ctx context.Context | ||
config clusterConfig | ||
options []rest.HTTPClientOption | ||
} | ||
|
||
func NewJWTSASigner(ctx context.Context, config clusterConfig, options ...rest.HTTPClientOption) SASigner { | ||
return &JWTSASigner{ctx, config, options} | ||
} | ||
|
||
// CreateSignedClient creates a client with a JWT signer which uses the Auth Service Account token | ||
func (c JWTSASigner) CreateSignedClient() (*cluster.Client, error) { | ||
cln, err := c.createClient(c.ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
m, err := manager.DefaultManager(c.config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
signer := m.AuthServiceAccountSigner() | ||
cln.SetJWTSigner(signer) | ||
return cln, nil | ||
} | ||
|
||
func (c JWTSASigner) createClient(ctx context.Context) (*cluster.Client, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I mentioned above all these changes in cluster/cluster.go should be implementation details of the service package. |
||
u, err := url.Parse(c.config.GetClusterServiceURL()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
httpClient := http.DefaultClient | ||
|
||
if c.options != nil { | ||
for _, opt := range c.options { | ||
opt(httpClient) | ||
} | ||
} | ||
cln := cluster.New(goaclient.HTTPClientDoer(httpClient)) | ||
|
||
cln.Host = u.Host | ||
cln.Scheme = u.Scheme | ||
return cln, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,13 @@ import ( | |
"github.com/fabric8-services/fabric8-auth/application/service/base" | ||
servicecontext "github.com/fabric8-services/fabric8-auth/application/service/context" | ||
"github.com/fabric8-services/fabric8-auth/cluster" | ||
"github.com/fabric8-services/fabric8-auth/goasupport" | ||
"github.com/fabric8-services/fabric8-auth/log" | ||
"github.com/fabric8-services/fabric8-auth/rest" | ||
clusterclient "github.com/fabric8-services/fabric8-cluster-client/cluster" | ||
"github.com/pkg/errors" | ||
"github.com/satori/go.uuid" | ||
"net/http" | ||
) | ||
|
||
type clusterServiceConfig interface { | ||
|
@@ -77,6 +83,62 @@ func (s *clusterService) Stop() { | |
} | ||
} | ||
|
||
func (s *clusterService) AddIdentityToClusterLink(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add docs please explaining what that method does. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in ebaf3d2 |
||
signer := cluster.NewJWTSASigner(ctx, s.config, options...) | ||
remoteClusterService, err := signer.CreateSignedClient() | ||
if err != nil { | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe wrap this error? |
||
} | ||
identityToClusterData := &clusterclient.LinkIdentityToClusterData{ | ||
ClusterURL: clusterURL, | ||
IdentityID: identityID.String(), | ||
} | ||
res, err := remoteClusterService.LinkIdentityToClusterClusters(goasupport.ForwardContextRequestID(ctx), clusterclient.LinkIdentityToClusterClustersPath(), identityToClusterData) | ||
if err != nil { | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, wrap the error? |
||
} | ||
defer rest.CloseResponse(res) | ||
bodyString := rest.ReadBody(res.Body) // To prevent FDs leaks | ||
if res.StatusCode != http.StatusNoContent { | ||
log.Error(ctx, map[string]interface{}{ | ||
"identity_id": identityID, | ||
"cluster_url": clusterURL, | ||
"response_status": res.Status, | ||
"response_body": bodyString, | ||
}, "unable to link identity to cluster in cluster management service") | ||
return errors.Errorf("failed to link identity to cluster in cluster management service. Response status: %s. Response body: %s", res.Status, bodyString) | ||
} | ||
return nil | ||
} | ||
|
||
func (s *clusterService) RemoveIdentityToClusterLink(ctx context.Context, identityID uuid.UUID, clusterURL string, options ...rest.HTTPClientOption) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add docs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in ebaf3d2 |
||
signer := cluster.NewJWTSASigner(ctx, s.config, options...) | ||
remoteClusterService, err := signer.CreateSignedClient() | ||
if err != nil { | ||
return err | ||
} | ||
identityToClusterData := &clusterclient.UnLinkIdentityToClusterdata{ | ||
ClusterURL: clusterURL, | ||
IdentityID: identityID.String(), | ||
} | ||
res, err := remoteClusterService.RemoveIdentityToClusterLinkClusters(goasupport.ForwardContextRequestID(ctx), clusterclient.RemoveIdentityToClusterLinkClustersPath(), identityToClusterData) | ||
if err != nil { | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
} | ||
defer rest.CloseResponse(res) | ||
bodyString := rest.ReadBody(res.Body) // To prevent FDs leaks | ||
if res.StatusCode != http.StatusNoContent { | ||
log.Error(ctx, map[string]interface{}{ | ||
"identity_id": identityID, | ||
"cluster_url": clusterURL, | ||
"response_status": res.Status, | ||
"response_body": bodyString, | ||
}, "unable to remove identity cluster relationship in cluster management service") | ||
return errors.Errorf("failed to unlink identity to cluster in cluster management service. Response status: %s. Response body: %s", res.Status, bodyString) | ||
} | ||
return nil | ||
} | ||
|
||
// Start initializes the default Cluster cache if it's not initialized already | ||
// Cache initialization loads the list of clusters from the cluster management service and starts regular cache refresher | ||
func Start(ctx context.Context, factory service.ClusterCacheFactory, options ...rest.HTTPClientOption) (bool, error) { | ||
|
@@ -93,10 +155,10 @@ func Start(ctx context.Context, factory service.ClusterCacheFactory, options ... | |
} else { | ||
clusterCache = nil | ||
} | ||
return (clusterCache != nil && started == uint32(1)), err | ||
return clusterCache != nil && started == uint32(1), err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this code tries to start the cache only once, then maybe https://golang.org/pkg/sync/#Once.Do would be more appropriate ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code can start the cache multiple times. It should initialize the cache successfully only once. But if it failed then it should keep trying. This is why we can't use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
} | ||
} | ||
return (clusterCache != nil && started == uint32(1)), nil | ||
return clusterCache != nil && started == uint32(1), nil | ||
} | ||
|
||
// Clusters converts the given cluster map to an array slice | ||
|
@@ -109,9 +171,9 @@ func Clusters(clusters map[string]cluster.Cluster) []cluster.Cluster { | |
} | ||
|
||
func ClusterByURL(clusters map[string]cluster.Cluster, url string) *cluster.Cluster { | ||
for apiURL, cluster := range clusters { | ||
for apiURL, c := range clusters { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good work in avoiding var names with package names 👍 |
||
if strings.HasPrefix(rest.AddTrailingSlashToURL(url), apiURL) { | ||
return &cluster | ||
return &c | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
?
@sbryzak WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexeykazakov yes, I like your naming suggestions better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in ebaf3d2