|
1 | 1 | package gocql
|
2 | 2 |
|
3 | 3 | import (
|
4 |
| - "fmt" |
5 |
| - "log" |
| 4 | + "net" |
6 | 5 | "testing"
|
7 |
| - "time" |
8 | 6 | )
|
9 | 7 |
|
10 |
| -func TestDisableHostLookup(t *testing.T) { |
| 8 | +func TestDisableHostLookupRingRefresh(t *testing.T) { |
11 | 9 | cluster := createCluster()
|
12 |
| - cluster.DisableHostLookup = true |
| 10 | + cluster.DisableInitialHostLookup = true |
13 | 11 |
|
14 | 12 | cluster.NumConns = 1
|
15 | 13 | session := createSessionFromCluster(cluster, t)
|
16 | 14 | defer session.Close()
|
17 | 15 |
|
18 |
| - if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.test_table (id int primary key)"); err != nil { |
19 |
| - t.Fatal(err) |
20 |
| - } |
21 |
| - if err := session.Query("insert into gocql_test.test_table (id) values (?)", 123).Exec(); err != nil { |
22 |
| - t.Error(err) |
23 |
| - return |
24 |
| - } |
25 |
| - |
26 |
| - go simulateHeartbeatFailure(session) |
27 |
| - |
28 |
| - queryCycles := 20 |
29 |
| - for ; queryCycles > 0; queryCycles-- { |
30 |
| - time.Sleep(2 * time.Second) |
31 |
| - queryTestKeyspace(session) |
32 |
| - triggerSchemaChange(session) |
33 |
| - } |
34 |
| -} |
35 |
| - |
36 |
| -func queryTestKeyspace(session *Session) { |
37 |
| - iter := session.Query("SELECT * FROM gocql_test.test_table").Iter() |
38 |
| - |
39 |
| - var id string |
40 |
| - for iter.Scan(&id) { |
41 |
| - fmt.Printf("id: %s\n", id) |
42 |
| - } |
43 |
| - if err := iter.Close(); err != nil { |
44 |
| - log.Printf("error querying: %v", err) |
45 |
| - } |
46 |
| -} |
47 |
| - |
48 |
| -func triggerSchemaChange(session *Session) { |
49 |
| - // Create a keyspace to trigger schema agreement |
50 |
| - err := session.Query(`CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}`).Exec() |
51 |
| - if err != nil { |
52 |
| - log.Printf("unable to create keyspace: %v", err) |
53 |
| - } |
| 16 | + oldHosts := make(map[string]*HostInfo) |
54 | 17 |
|
55 |
| - // Alter the keyspace to trigger schema agreement |
56 |
| - err = session.Query(`ALTER KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2}`).Exec() |
57 |
| - if err != nil { |
58 |
| - log.Printf("unable to alter keyspace: %v", err) |
| 18 | + for key, host := range session.ring.hosts { |
| 19 | + host.broadcastAddress = net.ParseIP("10.10.10.10") |
| 20 | + oldHosts[key] = host |
59 | 21 | }
|
60 | 22 |
|
61 |
| - // Drop the keyspace after schema agreement |
62 |
| - err = session.Query(`DROP KEYSPACE IF EXISTS test_keyspace`).Exec() |
| 23 | + // if DisableInitialHostLookup == true - host.broadcastAddress must not be updated. |
| 24 | + err := session.refreshRing() |
63 | 25 | if err != nil {
|
64 |
| - log.Printf("unable to drop keyspace: %v", err) |
| 26 | + t.Fatal(err) |
65 | 27 | }
|
66 |
| -} |
67 | 28 |
|
68 |
| -func simulateHeartbeatFailure(session *Session) { |
69 |
| - time.Sleep(20 * time.Second) // simulate 20 seconds of normal operation |
70 |
| - log.Println("Simulating heartbeat failure...") |
71 |
| - cluster := session.cfg |
72 |
| - session.Close() // close the session to simulate a heartbeat failure |
| 29 | + for key, host := range session.ring.hosts { |
| 30 | + oldHost, ok := oldHosts[key] |
| 31 | + if !ok { |
| 32 | + t.Fatalf("old host not found for key: %s", key) |
| 33 | + } |
73 | 34 |
|
74 |
| - time.Sleep(10 * time.Second) // wait for 10 seconds to simulate downtime |
75 |
| - |
76 |
| - // Reconnect |
77 |
| - log.Println("Reconnecting...") |
78 |
| - newSession, err := cluster.CreateSession() |
79 |
| - if err != nil { |
80 |
| - log.Fatalf("unable to create session: %v", err) |
| 35 | + if !oldHost.broadcastAddress.Equal(host.broadcastAddress) { |
| 36 | + t.Fatalf("broadcast addresses do not match for key: %s", key) |
| 37 | + } |
81 | 38 | }
|
82 |
| - *session = *newSession |
83 |
| - log.Println("Reconnected") |
84 | 39 | }
|
0 commit comments