@@ -3,7 +3,6 @@ package transport
33import (
44 "context"
55 "fmt"
6- "log"
76 "net"
87 "sort"
98 "strconv"
@@ -160,7 +159,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e []
160159}
161160
162161func (c * Cluster ) NewControl (ctx context.Context ) (* Conn , error ) {
163- log . Printf ("cluster: open control connection" )
162+ c . cfg . Logger . Info ("cluster: open control connection" )
164163 var errs []string
165164 for addr := range c .knownHosts {
166165 conn , err := OpenConn (ctx , addr , nil , c .cfg )
@@ -184,7 +183,7 @@ func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
184183// refreshTopology creates new topology filled with the result of keyspaceQuery, localQuery and peerQuery.
185184// Old topology is replaced with the new one atomically to prevent dirty reads.
186185func (c * Cluster ) refreshTopology (ctx context.Context ) error {
187- log . Printf ("cluster: refresh topology" )
186+ c . cfg . Logger . Infoln ("cluster: refresh topology" )
188187 rows , err := c .getAllNodesInfo (ctx )
189188 if err != nil {
190189 return fmt .Errorf ("query info about nodes in cluster: %w" , err )
@@ -242,9 +241,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error {
242241 }
243242
244243 if ks , ok := t .keyspaces [c .cfg .Keyspace ]; ok {
245- t .policyInfo .Preprocess (t , ks )
244+ t .policyInfo .Preprocess (t , ks , c . cfg . Logger )
246245 } else {
247- t .policyInfo .Preprocess (t , keyspace {})
246+ t .policyInfo .Preprocess (t , keyspace {}, c . cfg . Logger )
248247 }
249248
250249 c .setTopology (t )
@@ -455,7 +454,7 @@ func (c *Cluster) setTopology(t *topology) {
455454// of registering handlers for them.
456455func (c * Cluster ) handleEvent (r response ) {
457456 if r .Err != nil {
458- log . Printf ("cluster: received event with error: %v" , r .Err )
457+ c . cfg . Logger . Infoln ("cluster: received event with error: %v" , r .Err )
459458 c .RequestReopenControl ()
460459 return
461460 }
@@ -467,17 +466,17 @@ func (c *Cluster) handleEvent(r response) {
467466 case * SchemaChange :
468467 // TODO: add schema change.
469468 default :
470- log . Printf ("cluster: unsupported event type: %v" , r .Response )
469+ c . cfg . Logger . Warnf ("cluster: unsupported event type: %v" , r .Response )
471470 }
472471}
473472
474473func (c * Cluster ) handleTopologyChange (v * TopologyChange ) {
475- log . Printf ("cluster: handle topology change: %+#v" , v )
474+ c . cfg . Logger . Infof ("cluster: handle topology change: %+#v" , v )
476475 c .RequestRefresh ()
477476}
478477
479478func (c * Cluster ) handleStatusChange (v * StatusChange ) {
480- log . Printf ("cluster: handle status change: %+#v" , v )
479+ c . cfg . Logger . Infof ("cluster: handle status change: %+#v" , v )
481480 m := c .Topology ().peers
482481 addr := v .Address .String ()
483482 if n , ok := m [addr ]; ok {
@@ -487,10 +486,10 @@ func (c *Cluster) handleStatusChange(v *StatusChange) {
487486 case frame .Down :
488487 n .setStatus (statusDown )
489488 default :
490- log . Printf ("cluster: status change not supported: %+#v" , v )
489+ c . cfg . Logger . Warnf ("cluster: status change not supported: %+#v" , v )
491490 }
492491 } else {
493- log . Printf ("cluster: unknown node %s received status change: %+#v in topology %v" , addr , v , m )
492+ c . cfg . Logger . Infof ("cluster: unknown node %s received status change: %+#v in topology %v, requesting topology refresh " , addr , v , m )
494493 c .RequestRefresh ()
495494 }
496495}
@@ -508,7 +507,7 @@ func (c *Cluster) loop(ctx context.Context) {
508507 case <- c .reopenControlChan :
509508 c .tryReopenControl (ctx )
510509 case <- ctx .Done ():
511- log . Printf ("cluster closing due to: %v" , ctx .Err ())
510+ c . cfg . Logger . Infof ("cluster closing due to: %v" , ctx .Err ())
512511 c .handleClose ()
513512 return
514513 case <- c .closeChan :
@@ -528,17 +527,17 @@ func (c *Cluster) tryRefresh(ctx context.Context) {
528527 if err := c .refreshTopology (ctx ); err != nil {
529528 c .RequestReopenControl ()
530529 time .AfterFunc (tryRefreshInterval , c .RequestRefresh )
531- log . Printf ("cluster: refresh topology: %v" , err )
530+ c . cfg . Logger . Infof ("cluster: refresh topology: %v" , err )
532531 }
533532}
534533
535534const tryReopenControlInterval = time .Second
536535
537536func (c * Cluster ) tryReopenControl (ctx context.Context ) {
538- log . Printf ("cluster: reopen control connection" )
537+ c . cfg . Logger . Infoln ("cluster: reopen control connection" )
539538 if control , err := c .NewControl (ctx ); err != nil {
540539 time .AfterFunc (tryReopenControlInterval , c .RequestReopenControl )
541- log . Printf ("cluster: failed to reopen control connection: %v" , err )
540+ c . cfg . Logger . Infof ("cluster: failed to reopen control connection: %v" , err )
542541 } else {
543542 c .control .Close ()
544543 c .control = control
@@ -547,7 +546,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) {
547546}
548547
549548func (c * Cluster ) handleClose () {
550- log . Printf ("cluster: handle cluster close" )
549+ c . cfg . Logger . Infoln ("cluster: handle cluster close" )
551550 c .control .Close ()
552551 m := c .Topology ().peers
553552 for _ , v := range m {
@@ -558,23 +557,23 @@ func (c *Cluster) handleClose() {
558557}
559558
560559func (c * Cluster ) RequestRefresh () {
561- log . Printf ("cluster: requested to refresh cluster topology" )
560+ c . cfg . Logger . Infoln ("cluster: requested to refresh cluster topology" )
562561 select {
563562 case c .refreshChan <- struct {}{}:
564563 default :
565564 }
566565}
567566
568567func (c * Cluster ) RequestReopenControl () {
569- log . Printf ("cluster: requested to reopen control connection" )
568+ c . cfg . Logger . Infoln ("cluster: requested to reopen control connection" )
570569 select {
571570 case c .reopenControlChan <- struct {}{}:
572571 default :
573572 }
574573}
575574
576575func (c * Cluster ) Close () {
577- log . Printf ("cluster: requested to close cluster" )
576+ c . cfg . Logger . Infoln ("cluster: requested to close cluster" )
578577 select {
579578 case c .closeChan <- struct {}{}:
580579 default :
0 commit comments