-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.py
More file actions
1024 lines (806 loc) · 35.8 KB
/
server.py
File metadata and controls
1024 lines (806 loc) · 35.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# server.py
import socket
import time
import sys
import threading
import os
import json
import select
import copy
import google.generativeai as genai
from dotenv import load_dotenv
from queue import Queue
from shared import message, NETWORK_SERVER_PORT, MAX_SERVER_NUM, DELAY, TIMEOUT_TIME
from key_value import KeyValue
# Local key-value store holding every context (queries and chosen answers)
keyValue = KeyValue()
# TCP socket to the network server; assigned by connect_server()
networkServer = None
# Set to ask every loop/thread in this file to stop
stop_event = threading.Event()
# This server's number; -1 until server_init_message() assigns it
SERVER_NUM = -1
# Consensus Variables
# Server number of the currently known leader; -1 means no leader is known
leader = -1
# Represents the maximum known ballot by this server
ballot_number = {
'seq_num': 0,
'pid': -1,
'op_num': 0
}
# Server's last accepted value (or leader's current command); -1 means none
accept_val = -1
# Ballot of server's last accepted value (leader's current ballot); -1 means none
accept_num = -1
# Operations waiting for the leader to propose them (consumed by run_leader)
pending_operations = Queue()
# Number of PROMISE messages received during the current election
num_leader_promises = 0
# ACCEPTED tally per ballot, keyed by the ballot_to_string() form of the ballot
consensus_accepted = {}
# Set to 1 by server_leader_ack_message(); reset to 0 by get_consensus() before forwarding
leader_ack = 0
# Candidate LLM responses, format = {context_id: list(responses)}
response_dict = {}
# ------ SERVER ------
def connect_server():
    """
    Dakota
    Connect to the network server and start the receive thread.

    Makes up to five attempts, half a second apart, to open a TCP connection
    to 127.0.0.1:NETWORK_SERVER_PORT. On success the connected socket is left
    in the module-level networkServer and get_server_message() is started on
    a new thread. A failed attempt closes its socket before the next try, so
    retries do not leak file descriptors.

    Only TimeoutError and ConnectionRefusedError are retried; any other
    exception propagates to the caller.
    TODO: Implement a method to restore context/kv data in case of a crash.
    """
    global networkServer
    max_retries = 5
    interval = 0.5
    for attempt in range(1, max_retries + 1):
        # A fresh TCP socket per attempt: a socket whose connect() failed is not reused
        networkServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            networkServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            networkServer.connect(('127.0.0.1', NETWORK_SERVER_PORT))
        except (TimeoutError, ConnectionRefusedError):
            # Release the descriptor of the failed attempt before retrying or giving up
            networkServer.close()
            if attempt < max_retries:
                time.sleep(interval)
            else:
                print(f"FAILED To Connect to Network Server on {NETWORK_SERVER_PORT}")
        else:
            print(f"Connected To Network Server on {NETWORK_SERVER_PORT}")
            threading.Thread(target=get_server_message).start()
            return
def send_server_message(message_type, dest_server, message_args=None):
    """
    Dakota
    Send one JSON message through the network server and log it.

    The wire format is a JSON object with the keys "dest_server",
    "sending_server" (this server's SERVER_NUM), "message_type" (the enum
    value) and "args" (message_args, or {} when none are given).

    Args:
        message_type: Member of the shared `message` enum.
        dest_server: Destination server number; -1 is logged as "to ALL".
        message_args: Optional dict of message-specific arguments.

    LLM_RESPONSE messages are sent but not logged.
    """
    # Create uniform message datastructure
    message_data = {
        "dest_server": dest_server,
        "sending_server": SERVER_NUM,
        "message_type": message_type.value,
        "args": message_args or {},
    }
    # sendall() keeps writing until the whole payload is out; a bare send()
    # may transmit only part of a large message (e.g. a full context dump).
    networkServer.sendall(json.dumps(message_data).encode('utf-8'))
    if message_type == message.LLM_RESPONSE:
        return  # LLM responses are not logged

    # Extract the simple name from message_type (remove any prefix like "message.")
    log_parts = ["Sending " + str(message_type).split(".")[-1]]

    # Add formatted ballot to the log line if present
    if message_args and "ballot_number" in message_args:
        ballot_string = ballot_to_string(message_args["ballot_number"])
        if ballot_string:
            log_parts.append(ballot_string)

    # Add accept_val to the log line if present
    if message_args and "accept_val" in message_args:
        accept_val_txt = message_args["accept_val"]
        if accept_val_txt == -1:
            accept_val_string = "Bottom Bottom"
        else:
            accept_val_string = f"{accept_val_txt}"
            # Query values carry a trailing ".<server_num>"; hide it by cutting
            # at the LAST period so periods inside the query text survive.
            if accept_val_txt.startswith("query") and "." in accept_val_txt:
                accept_val_string = accept_val_string.rsplit(".", 1)[0]
        if accept_val_string:
            log_parts.append(accept_val_string)

    # Format the destination for the log line
    log_parts.append("to ALL" if dest_server == -1 else f"to Server {dest_server}")
    print(" ".join(log_parts))
def _handle_server_message(message_data):
    """Log one decoded message and pass it to the handler for its message type."""
    message_type = message(message_data["message_type"])
    sending_server = message_data["sending_server"]
    args = message_data.get("args", {})

    # Special handling for LLM_RESPONSE: no printing
    if message_type == message.LLM_RESPONSE:
        server_llm_response(message_data)
        return

    # Extract ballot info if present
    ballot_string = ""
    if "ballot_number" in args:
        ballot_string = ballot_to_string(args["ballot_number"])

    # Extract accept_val if present
    accept_val_string = ""
    if "accept_val" in args:
        accept_val_txt = args["accept_val"]
        accept_val_string = f"{accept_val_txt}" if accept_val_txt != -1 else "Bottom Bottom"
        # Query values end in ".<server_num>"; cut at the LAST period so
        # periods inside the query text are kept in the log line.
        if accept_val_txt != -1 and accept_val_txt.startswith("query") and "." in accept_val_txt:
            accept_val_string = accept_val_string.rsplit(".", 1)[0]

    # Extract user_message if present (same trailing ".<server_num>" convention)
    user_message = args.get("user_message", "")
    if user_message and user_message.startswith("query") and "." in user_message:
        user_message = user_message.rsplit(".", 1)[0]

    # Format the sending server name
    sending_server_name = "Network Server" if sending_server == -1 else f"Server {sending_server}"
    simple_message_type = str(message_type).split(".")[-1]  # Extract simple name
    print(
        f"Received {simple_message_type}"
        f"{f' {ballot_string}' if ballot_string else ''}"
        f"{f' {accept_val_string}' if accept_val_string else ''}"
        f"{f' {user_message}' if user_message else ''} from {sending_server_name}"
    )

    # Call the appropriate function based on message type
    if message_type == message.SERVER_INIT:
        server_init_message(message_data)
    elif message_type == message.SERVER_KILL:
        server_kill_message()
    elif message_type == message.PREPARE:
        server_leader_prepare_message(message_data)
    elif message_type == message.PROMISE:
        server_leader_promise_message(message_data)
    elif message_type == message.LEADER_FORWARD:
        server_leader_forward_message(message_data)
    elif message_type == message.LEADER_ACK:
        server_leader_ack_message(message_data)
    elif message_type == message.ACCEPT:
        server_consensus_accept_message(message_data)
    elif message_type == message.ACCEPTED:
        server_consensus_accepted_message(message_data)
    elif message_type == message.DECIDE:
        server_consensus_decide_message(message_data)
    elif message_type == message.UPDATE_CONTEXT:
        server_update_context(message_data)


def get_server_message():
    """
    Continuously receive messages from the network server until stop_event
    is set, the peer disconnects, or the socket fails.

    Received bytes are decoded incrementally and appended to a text buffer;
    every complete JSON object at the front of the buffer is removed and
    handled, and an incomplete tail waits for more data. An exception raised
    while handling a message is printed and the loop keeps running. The
    socket is closed when the loop ends.
    """
    import codecs  # function-scope import: only the receive loop needs it

    # An incremental decoder holds back a multi-byte UTF-8 character that is
    # split across two recv() chunks instead of failing on the first half.
    # Invalid bytes become U+FFFD rather than discarding the whole chunk.
    utf8_decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    json_decoder = json.JSONDecoder()
    buffer = ""  # Text received so far that has not yet formed a complete message
    while not stop_event.is_set():
        try:
            # Receive data from the socket in chunks
            chunk = networkServer.recv(1024)
        except OSError as e:
            # The socket is unusable (closed locally on exit, or reset by the
            # peer); retrying recv() would only fail again in a tight loop.
            if not stop_event.is_set():
                print(f"Exception Thrown Getting Server Message: {e}")
            break
        if not chunk:
            print("Server Disconnected")
            break
        try:
            # raw_decode() does not skip leading whitespace, so trim it here
            buffer = (buffer + utf8_decoder.decode(chunk)).lstrip()
            while True:
                try:
                    # Attempt to parse a complete JSON object from the buffer
                    message_data, end_idx = json_decoder.raw_decode(buffer)
                except json.JSONDecodeError:
                    # Incomplete JSON message; wait for more data
                    break
                buffer = buffer[end_idx:].strip()  # Remove the processed message
                _handle_server_message(message_data)
        except Exception as e:
            print(f"Exception Thrown Getting Server Message: {e}")
            continue
    networkServer.close()
def server_init_message(message_data):
    """
    Dakota
    Record the server number assigned by the network server.

    Reads message_data["args"]["server_num"], stores it in the module-level
    SERVER_NUM and as the "pid" of this server's ballot, then prints it.
    """
    global SERVER_NUM
    SERVER_NUM = message_data["args"]["server_num"]
    # This server's ballots are identified by its server number
    ballot_number["pid"] = SERVER_NUM
    print(f"Assigned Server Number {SERVER_NUM}")
def server_kill_message():
    """
    Dakota
    Handle a SERVER_KILL message by setting stop_event, which the loops in
    this file poll to decide when to stop.
    """
    stop_event.set()
def server_new_context(user_message):
    """
    Nik
    Create a new, empty context in the local key-value store.

    Args:
        user_message (str): The decided command, 'create <context_id>'.

    Prints an error and does nothing when the ID is empty or already exists.
    Any exception is caught and printed.
    """
    try:
        # Drop only the leading command word. str.replace("create", "") would
        # also delete "create" wherever it appears inside the context ID.
        context_id = user_message.strip()
        if context_id.startswith("create"):
            context_id = context_id[len("create"):].strip()
        if not context_id:
            print("Error Getting Context_id")
            return
        if context_id in keyValue.data:
            print(f"Error: Context ID '{context_id}' already exists. Please use a unique ID.")
            return
        # Create the new context using the keyValue object
        keyValue.create_context(context_id)
        print(f"NEW_CONTEXT {context_id}")
    except Exception as e:
        print(f"Error occurred while processing NEW_CONTEXT: {e}")
def server_create_query(message_data):
    """
    Nik
    Apply a decided query: add it to its context, ask Gemini for an answer,
    and route the answer to the server that issued the query.

    Args:
        message_data (dict): Message whose args["accept_val"] has the form
            'query <context_id> <query_string>.<requesting_server_num>'.

    The answer is stored locally via save_response_to_dict() when this
    server issued the query; otherwise it is sent to the requesting server
    as an LLM_RESPONSE. Any exception is caught and printed.
    """
    try:
        args = message_data.get("args", {})
        # The requesting server number was appended after the LAST period.
        # Splitting at the first period would cut a query that itself
        # contains "." and make the int() conversion below fail.
        user_message, request_server = args.get("accept_val").rsplit(".", 1)
        request_server = int(request_server)

        parts = user_message.split(" ", 2)  # 'query', '<context_id>', '<query_string>'
        if len(parts) != 3 or parts[0] != "query":
            print("Error: Invalid input format. Use 'query <context_id> <query_string>'")
            return
        context_id, query_string = parts[1].strip(), parts[2].strip()
        if not context_id or not query_string:
            print("Error: Context ID and query cannot be empty.")
            return

        # Step 1: Add the query to the specified context
        keyValue.create_query(context_id, query_string)

        # Step 2: Retrieve the context as a string
        context_string = keyValue.view(context_id)
        if not context_string:
            print(f"Error: Context '{context_id}' not found.")
            return
        # Check if the context exists in KeyValue
        if context_id not in keyValue.data:
            print(f"Error: Context ID '{context_id}' does not exist. Please create the context first.")
            return

        # Print updated context with query
        print(f"NEW_QUERY on {context_id} with {context_string}")

        # Step 3: Generate a response by querying Gemini
        prompt_answer = ""
        response = query_gemini(context_string + "\n" + prompt_answer)

        # Step 4: Keep the candidate locally, or return it to the requester
        if request_server == SERVER_NUM:
            save_response_to_dict(context_id, response)
        else:
            response_message = {
                "context_id": context_id,
                "query_string": query_string,
                "response": response,
            }
            send_server_message(message.LLM_RESPONSE, request_server, response_message)
    except Exception as e:
        print(f"Error occurred while processing CREATE_QUERY: {e}")
def server_llm_response(message_data):
    """
    Store an LLM response received from another server.

    Reads "context_id" and "response" from the message args and hands them
    to save_response_to_dict(), which records and prints the candidate.
    """
    payload = message_data.get("args", {})
    save_response_to_dict(payload.get("context_id"), payload.get("response"))
def save_response_to_dict(context_id, response):
    """
    Append a candidate response for context_id to response_dict and print it.

    The candidate number shown is the response's index in that context's
    list (0 for the first response stored).
    """
    # Fetch the context's candidate list, creating it on first use
    candidates = response_dict.setdefault(context_id, [])
    # The new entry's index equals the list length before appending
    candidate_num = len(candidates)
    candidates.append(response)
    print(f"Context '{context_id}' - Candidate {candidate_num}: {response}")
def server_choose_response(message_data):
    """
    Apply a decided 'choose' command by saving the chosen answer.

    Args:
        message_data (dict): Message whose args["accept_val"] has the form
            'choose <context_id> <response>'.

    Prints an error and returns without saving when the command is malformed
    or either field is empty.
    """
    command = message_data.get("args", {}).get("accept_val")
    # Split into 'choose', '<context_id>', and '<response_string>'
    pieces = command.split(" ", 2)
    well_formed = len(pieces) == 3 and pieces[0] == "choose"
    if not well_formed:
        print("Error: Invalid input format. Use 'choose <context_id> <response>'")
        return
    context_id = pieces[1].strip()
    response = pieces[2].strip()
    if not (context_id and response):
        print("Error: Context ID and query cannot be empty.")
        return
    keyValue.save_answer(context_id, response)
    print(f"CHOSEN ANSWER on {context_id} with {response}")
# ------ USER ------
def get_user_input():
    """
    Dakota
    Read commands from stdin until stop_event is set and dispatch them.

    Recognised commands: 'exit', 'create ...', 'query ...', 'choose ...',
    'viewall' and 'view ...'. Query commands get '.<SERVER_NUM>' appended so
    the answer can be routed back to this server. stdin is polled with a
    0.5 second timeout so the loop notices stop_event promptly.

    The loop also ends when stdin reaches end-of-file.
    """
    while not stop_event.is_set():
        # Check for input with a timeout so stop_event is re-checked regularly
        ready, _, _ = select.select([sys.stdin], [], [], 0.5)
        if not ready:
            continue
        line = sys.stdin.readline()
        if not line:
            # EOF: stdin stays "readable" forever and readline() keeps
            # returning "", so stop reading instead of spinning on it.
            break
        user_input = line.strip()
        if user_input.lower() == 'exit':
            stop_event.set()
            if networkServer is not None:
                networkServer.close()
            break
        elif user_input.startswith("create"):
            user_new_context(user_input)
        elif user_input.startswith("query"):
            user_input_with_server = f"{user_input}.{SERVER_NUM}"
            user_create_query(user_input_with_server)
        elif user_input.startswith("choose"):
            user_select_answer(user_input)
        elif user_input.startswith("viewall"):
            # Must be tested before "view", which is a prefix of "viewall"
            user_view_all_context()
        elif user_input.startswith("view"):
            user_view_context(user_input)
        else:
            print(f"UNCRECOGNIZED INPUT {user_input}")
def user_new_context(user_message):
    """
    Nik
    Validate a 'create <context_id>' command and submit it for consensus.

    Args:
        user_message (str): The command as typed by the user.

    Prints an error and returns when no context ID follows the command
    word; otherwise passes the unchanged command to get_consensus().
    """
    # Strip only the leading command word; str.replace("create", "") would
    # also remove "create" from inside the context ID.
    context_id = user_message.strip()
    if context_id.startswith("create"):
        context_id = context_id[len("create"):].strip()
    if not context_id:
        print("Error: Context ID cannot be empty.")
        return
    # Get consensus from all servers
    get_consensus(user_message)
def user_create_query(user_message):
    """
    Nik
    Validate a 'query <context_id> <query_string>' command, discard the
    candidate responses held for that context, and submit the command for
    consensus.

    Args:
        user_message (str): The query command passed in by get_user_input().

    Prints an error and returns without submitting when the command is
    malformed or either field is empty.
    """
    # Split into 'query', '<context_id>', and '<query_string>'
    pieces = user_message.split(" ", 2)
    well_formed = len(pieces) == 3 and pieces[0] == "query"
    if not well_formed:
        print("Error: Invalid input format. Use 'query <context_id> <query_string>'")
        return
    context_id = pieces[1].strip()
    query_string = pieces[2].strip()
    if not (context_id and query_string):
        print("Error: Context ID and query cannot be empty.")
        return
    # Old candidates for this context are stale once a new query is issued
    stale_candidates = response_dict.get(context_id)
    if stale_candidates is not None:
        stale_candidates.clear()
    # Get consensus from all servers
    get_consensus(user_message)
def user_select_answer(user_message):
    """
    Validate a 'choose <context_id> <response_number>' command and submit
    the selected response for consensus.

    Args:
        user_message (str): The command as typed by the user.

    The response number indexes the candidates stored in response_dict for
    that context. On any validation failure an error is printed and nothing
    is submitted. Otherwise 'choose <context_id> <response text>' is passed
    to get_consensus().
    """
    parts = user_message.split()
    # Check the message has the form 'choose <context_id> <response_number>'
    if len(parts) != 3:
        print("Invalid choose format: must be 'choose context_id response_number")
        return
    context_id = parts[1]
    try:
        response_number = int(parts[2])
    except ValueError:
        print("Invalid format. context_id and response_number must be integers.")
        # Stop here: response_number is unbound, so continuing would crash
        return
    # Check if context_id in saved answers
    if context_id not in response_dict:
        print(f"Context_id {context_id} not in saved response_dict")
        return
    # Check if valid Response Number
    if response_number < 0 or response_number >= len(response_dict[context_id]):
        print(f"Invalide Response Number {response_number}")
        return
    # Replace the index with the selected response text
    new_user_message = f"choose {context_id} {response_dict[context_id][response_number]}"
    get_consensus(new_user_message)
def user_view_context(user_message):
    """
    Print the contents of one context.

    Args:
        user_message (str): The command as typed, 'view <context_id>'.

    Uses keyValue.view(context_id) to fetch the context text; prints a
    not-found message when that returns nothing.
    """
    # Strip only the leading command word; str.replace("view", "") would
    # also remove "view" from inside the context ID (e.g. "review1").
    context_id = user_message.strip()
    if context_id.startswith("view"):
        context_id = context_id[len("view"):].strip()
    context_data = keyValue.view(context_id)  # Call the KeyValue store's view method
    if not context_data:
        print(f"Context '{context_id}' not found or context is empty.")
        return
    # Print the context data wrapped in triple quotes after its ID
    print(f"{context_id} = \"\"\"\n{context_data}\n\"\"\"")
def user_view_all_context():
    """
    Print every context returned by keyValue.view_all().

    Each context is shown as '<context_id> = \"\"\"', one 'Query:'/'Answer:'
    pair per stored item, and a closing triple quote. Prints
    "No contexts available." when view_all() returns nothing.
    """
    all_contexts = keyValue.view_all()
    if not all_contexts:
        print("No contexts available.")
        return
    lines = []
    for context_id, entries in all_contexts.items():
        lines.append(f'{context_id} = """')
        # Each entry is read through its 'query' and 'answer' keys
        lines.extend(
            f"Query: {entry['query']}\nAnswer: {entry['answer']}" for entry in entries
        )
        lines.append('"""')
    print("\n".join(lines))
# ------ CONSENSUS ------
def ballot_to_string(ballot_num):
    """
    Render a ballot dictionary as text.

    Args:
        ballot_num (dict): A dictionary with keys 'seq_num', 'pid', and 'op_num'.
    Returns:
        str: The ballot in the format "<seq_num, pid, op_num>".
    """
    return "<{}, {}, {}>".format(
        ballot_num['seq_num'], ballot_num['pid'], ballot_num['op_num']
    )
# --- Election Phase ---
def leader_init():
    """Start a leader election when no leader is known (leader == -1)."""
    if leader != -1:
        return
    start_leader_election()
def start_leader_election():
    """
    Run one election round with this server as the proposer.

    Bumps the ballot's seq_num, stamps it with this server's number,
    broadcasts PREPARE, and then polls num_leader_promises every 0.1 s until
    it reaches MAX_SERVER_NUM - 2.

    - On timeout (TIMEOUT_TIME seconds): if the known ballot still carries
      this server's pid, leader_init() is called to try again; otherwise the
      function just returns.
    - If stop_event is set while waiting, returns.
    - Once enough promises arrived: if the known ballot now belongs to
      another server, that server is recorded as leader; otherwise this
      server becomes leader and run_leader() is started on a new thread.
    """
    global num_leader_promises, ballot_number, leader
    num_leader_promises = 0

    # Build a new ballot from the highest one known so far
    ballot_number["seq_num"] += 1
    ballot_number["pid"] = SERVER_NUM
    send_server_message(message.PREPARE, -1, {"ballot_number": ballot_number})

    began = time.time()
    while num_leader_promises < MAX_SERVER_NUM - 2:
        time.sleep(0.1)
        if time.time() - began > (TIMEOUT_TIME):
            print("TIMEOUT: Leader promises not received.")
            # Retry only while the known ballot is still this server's own
            if ballot_number["pid"] == SERVER_NUM:
                leader_init()
            return
        if stop_event.is_set():
            return

    if ballot_number["pid"] != SERVER_NUM:
        # The known ballot was replaced by another server's while waiting
        leader = ballot_number["pid"]
        return
    leader = SERVER_NUM
    threading.Thread(target=run_leader).start()
def server_leader_prepare_message(message_data):
    """
    Handle a PREPARE message from a server that wants to become leader.

    - If this server is the leader, it answers with UPDATE_CONTEXT (its
      key-value contents, op_num and leader) instead of a promise.
    - Otherwise, if the received ballot has a higher seq_num, or the same
      seq_num and a pid greater than or equal to the known one:
        * a proposer with a lower op_num is sent UPDATE_CONTEXT;
        * any other proposer gets a PROMISE carrying this server's
          accept_val/accept_num, and the received ballot becomes the
          known ballot.
    - Lower ballots are ignored.
    """
    global ballot_number
    args = message_data.get("args", {})
    ballot = args.get("ballot_number")
    sending_server = message_data.get("sending_server")

    def send_context_update():
        # Ship the full local state so the sender can catch up
        send_server_message(message.UPDATE_CONTEXT, sending_server, {
            "context": keyValue.to_dict(),  # Serialize the KeyValue object
            "op_num": ballot_number["op_num"],
            "leader": leader,
        })

    # Handle case that non-leader failed and is trying to get context
    if leader == SERVER_NUM:
        send_context_update()
        return

    same_round = ballot["seq_num"] == ballot_number["seq_num"]
    outranks = ballot["seq_num"] > ballot_number["seq_num"] or (
        same_round and ballot["pid"] >= ballot_number["pid"]
    )
    if not outranks:
        return

    # A proposer that has completed fewer operations is brought up to date first
    if ballot["op_num"] < ballot_number["op_num"]:
        send_context_update()
        return

    # Flag that this acceptor has completed fewer operations than the proposer
    help_needed = ballot["op_num"] > ballot_number["op_num"]
    # Set maximum known ballot to the received ballot
    ballot_number = ballot
    send_server_message(message.PROMISE, sending_server, {
        "ballot_number": ballot_number,
        "accept_val": accept_val,
        "accept_num": accept_num,
        "help": help_needed,
    })
def server_update_context(message_data):
    """
    Handle UPDATE_CONTEXT, sent to a server whose op_num is behind.

    Adopts the sender's op_num, replaces the local key-value store with the
    sender's serialized contents, and records the leader named in the
    message when one is given and it is not this server.

    Args:
        message_data (dict): Message whose args may hold "leader", "op_num"
            and "context". Missing entries leave the matching local state
            unchanged.
    """
    global keyValue, leader
    args = message_data.get("args", {})

    # Not every UPDATE_CONTEXT sender includes "leader"; indexing it directly
    # raised KeyError and the whole update was dropped.
    reported_leader = args.get("leader")
    if reported_leader is not None and reported_leader != SERVER_NUM:
        leader = reported_leader

    # Update op_num; never store None, which would break later comparisons
    new_op_num = args.get("op_num")
    if new_op_num is not None:
        ballot_number["op_num"] = new_op_num

    received_data = args.get("context")
    if received_data:
        keyValue = KeyValue.from_dict(received_data)  # Rebuild KeyValue object
    # TODO: maybe restart the election (leader_init) after catching up
def server_leader_promise_message(message_data):
    """
    Handle a PROMISE received after this server sent PREPARE.

    - If the promiser set "help", it is sent UPDATE_CONTEXT with this
      server's key-value contents, op_num and leader.
    - If the promiser reports an accepted ballot (accept_num != -1), its
      accept_num/accept_val are adopted when this server has none, or when
      the reported ballot is higher by seq_num, then pid.
    - The promise is counted in num_leader_promises.
    """
    global accept_val, accept_num, num_leader_promises
    args = message_data.get("args", {})
    their_accept_num = args.get("accept_num")
    their_accept_val = args.get("accept_val")

    # An acceptor that is behind in completed operations gets the full context
    if args.get("help"):
        send_server_message(message.UPDATE_CONTEXT, message_data.get("sending_server"), {
            "context": keyValue.to_dict(),  # Serialize the KeyValue object
            "op_num": ballot_number["op_num"],
            "leader": leader,
        })
        # NOTE(review): the source's indentation was lost; this pause is
        # assumed to belong to the help branch only - confirm.
        time.sleep(.2)

    if their_accept_num != -1:
        # Short-circuit keeps the subscripts away from an accept_num of -1
        should_adopt = accept_num == -1 or (
            their_accept_num["seq_num"] > accept_num["seq_num"]
            or (
                their_accept_num["seq_num"] == accept_num["seq_num"]
                and their_accept_num["pid"] > accept_num["pid"]
            )
        )
        if should_adopt:
            accept_num = their_accept_num
            accept_val = their_accept_val

    num_leader_promises += 1
# --- Decision Phase ---
# def insert_operation_to_queue(user_message, ballot):
# #Insert message and ballot to queue
# pending_operations.put((user_message, ballot))
def insert_operation_to_queue(user_message):
    """Queue user_message for the leader unless an equal message is already waiting."""
    already_queued = user_message in pending_operations.queue
    if already_queued:
        return
    pending_operations.put(user_message)
def get_consensus(user_message):
    """
    Submit user_message to the consensus protocol.

    Ensures a leader is known (running an election when none is). If this
    server is the leader, the message is queued locally. Otherwise it is
    forwarded to the leader with LEADER_FORWARD and this function waits up
    to TIMEOUT_TIME seconds for LEADER_ACK, returning as soon as the ack
    arrives. With no ack in time the leader is presumed failed, forgotten,
    and the whole procedure is repeated.

    Returns early, without retrying, if stop_event is set while waiting.
    """
    global leader, leader_ack
    # A loop replaces the original self-recursion so that repeated timeouts
    # cannot grow the call stack.
    while True:
        leader_init()
        # If leader, add operation to operation queue
        if leader == SERVER_NUM:
            insert_operation_to_queue(user_message)
            return

        # If not, send message to leader to do so
        leader_ack = 0
        send_server_message(message.LEADER_FORWARD, leader, {"user_message": user_message})

        # Poll for the ack instead of always sleeping the full timeout, so
        # the caller is not blocked once the leader has answered.
        deadline = time.time() + TIMEOUT_TIME
        while leader_ack == 0 and time.time() < deadline:
            if stop_event.is_set():
                return
            time.sleep(0.1)
        if leader_ack != 0:
            return

        print(f"TIMEOUT: Leader Acknowledge Not Received from {leader} for message: {user_message}")
        # Assume leader failed, set leader to none and go around again
        leader = -1
def run_leader():
    """
    Leader loop: propose queued operations one at a time until stop_event
    is set or leadership is lost.

    For each round, while the queue is non-empty:
      1. Take a private copy of the current ballot.
      2. If no value is pending (accept_val == -1), pop the next queued
         operation into accept_val; otherwise re-propose the pending value.
      3. Return, recording the new leader, if the known ballot now belongs
         to another server.
      4. Broadcast ACCEPT and poll every 0.1 s until MAX_SERVER_NUM - 2
         ACCEPTED messages are counted for this ballot.
         On timeout: forget the leader, resubmit the value through
         get_consensus(), clear accept_val/accept_num and return.
      5. Broadcast DECIDE and apply the operation locally.
    """
    global accept_val, accept_num, leader
    while not stop_event.is_set():
        if pending_operations.empty():
            # Nothing to propose: sleep briefly rather than spin at full CPU
            time.sleep(0.1)
            continue

        ball_num = copy.deepcopy(ballot_number)
        # The copy is never modified, so its string key is computed once
        ballot_key = ballot_to_string(ball_num)
        if accept_val == -1:
            # Pop the next operation
            accept_val = pending_operations.get()

        # Use leader's ballot_number and accept_val
        accept_message_args = {
            "ballot_number": ball_num,
            "accept_val": accept_val,
        }
        consensus_accepted[ballot_key] = 0

        # Added by Nik: step down if another server's ballot took over
        if ballot_number["pid"] != SERVER_NUM:
            leader = ballot_number["pid"]
            return

        # Send Accept? message to all servers
        send_server_message(message.ACCEPT, -1, accept_message_args)

        # Wait for enough ACCEPTED messages, with a timeout
        start_time = time.time()
        while consensus_accepted[ballot_key] < MAX_SERVER_NUM - 2:
            time.sleep(0.1)
            if time.time() - start_time > (TIMEOUT_TIME):
                print("TIMEOUT: Accepted messages not received, running new leader election again.")
                leader = -1
                # Resubmitting with no known leader triggers a new election
                get_consensus(accept_val)
                accept_val = -1
                accept_num = -1
                return
            # If program gets told Kill message, exit gracefully
            if stop_event.is_set():
                return
        del consensus_accepted[ballot_key]

        # Broadcast consensus decide
        decide_message_args = {
            "accept_val": accept_val,
        }
        send_server_message(message.DECIDE, -1, decide_message_args)

        # Do operation locally (mimic message with minimum pieces needed)
        server_consensus_decide_message({"args": decide_message_args})
def server_leader_forward_message(message_data):
    """
    Handle LEADER_FORWARD: as the leader, queue the forwarded operation and
    acknowledge it to the sender with LEADER_ACK.

    A server that does not consider itself the leader stays silent.
    """
    if SERVER_NUM != leader:
        # Don't respond if this server doesn't know that it is leader
        return
    args = message_data.get("args", {})
    insert_operation_to_queue(args.get("user_message"))
    # The ack carries the same args that were received
    send_server_message(message.LEADER_ACK, message_data.get("sending_server"), args)
def server_leader_ack_message(message_data):
    """Handle LEADER_ACK by setting leader_ack to 1; message_data is not inspected."""
    global leader_ack
    leader_ack = 1
def server_consensus_accepted_message(message_data):
    """
    Handle ACCEPTED: count the acceptance for its ballot.

    If the acceptor set "help", it is first sent UPDATE_CONTEXT with this
    server's key-value contents, op_num and leader. The tally in
    consensus_accepted is incremented only while an entry for that ballot
    still exists.
    """
    args = message_data.get("args", {})
    # An acceptor that is behind in completed operations gets the full context
    if args.get("help"):
        send_server_message(message.UPDATE_CONTEXT, message_data.get("sending_server"), {
            "context": keyValue.to_dict(),  # Serialize the KeyValue object
            "op_num": ballot_number["op_num"],
            "leader": leader,
        })
        # NOTE(review): the source's indentation was lost; this pause is
        # assumed to belong to the help branch only - confirm.
        time.sleep(.2)

    # Increment the accepted counter for the ballot, if it is still tracked
    ballot_key = ballot_to_string(args.get("ballot_number"))
    if ballot_key in consensus_accepted:
        consensus_accepted[ballot_key] += 1
def server_consensus_accept_message(message_data):
    """
    Handle ACCEPT from a proposer.

    The ballot is considered only when it has a higher seq_num than the
    known ballot, or the same seq_num and a pid greater than or equal to
    the known one; anything lower is ignored.

    - A proposer with a lower op_num is sent UPDATE_CONTEXT (context and
      op_num) and only the known seq_num is raised to the proposer's.
    - Otherwise the value is accepted: accept_val/accept_num are recorded,
      the received ballot becomes the known ballot, ACCEPTED is sent back,
      and the sender is remembered as leader.
    """
    global ballot_number, accept_val, accept_num, leader
    args = message_data.get("args", {})
    ballot = args.get("ballot_number")
    sending_server = message_data.get("sending_server")

    same_round = ballot["seq_num"] == ballot_number["seq_num"]
    outranks = ballot["seq_num"] > ballot_number["seq_num"] or (
        same_round and ballot["pid"] >= ballot_number["pid"]
    )
    if not outranks:
        return

    if ballot["op_num"] < ballot_number["op_num"]:
        # Proposer has completed fewer operations: send it this server's state.
        # Unlike the other UPDATE_CONTEXT senders in this file, this payload
        # carries no "leader" entry.
        send_server_message(message.UPDATE_CONTEXT, sending_server, {
            "context": keyValue.to_dict(),
            "op_num": ballot_number["op_num"],
        })
        ballot_number["seq_num"] = ballot["seq_num"]
        return

    # Server accepts value and logs it in case leader fails
    accept_val = args.get("accept_val")
    accept_num = ballot
    # Flag that this acceptor has completed fewer operations than the leader
    help_needed = ballot["op_num"] > ballot_number["op_num"]
    # Set maximum known ballot number to the received ballot
    ballot_number = ballot
    send_server_message(message.ACCEPTED, sending_server, {
        "ballot_number": ballot_number,
        "accept_val": accept_val,
        "help": help_needed,
    })
    # Set sending server to leader for future reference.
    # NOTE(review): the source's indentation was lost; this assignment is
    # assumed to belong to the accept branch only - confirm.
    leader = sending_server
def server_consensus_decide_message(message_data):
    """
    Handle DECIDE: apply the decided operation locally.

    Increments the ballot's op_num, clears accept_val/accept_num, then
    routes the decided value by its leading word: 'create', 'query' or
    'choose'. A value of -1 is a no-op; anything else is reported as
    unsupported.
    """
    global accept_val, accept_num
    # One more operation has been decided
    ballot_number["op_num"] += 1
    # Nothing is pending once a value is decided
    accept_val = -1
    accept_num = -1

    user_message = message_data.get("args", {}).get("accept_val")
    if user_message == -1:
        return

    # Checked in order; the first matching prefix wins
    routes = (
        ("create", lambda: server_new_context(user_message)),
        ("query", lambda: server_create_query(message_data)),
        ("choose", lambda: server_choose_response(message_data)),
    )
    for prefix, apply_operation in routes:
        if user_message.startswith(prefix):
            apply_operation()
            return
    print(f"UNSUPPORTED SERVER CONSENSUS DECIDE MESSAGE: {user_message}")
# ------ GEMINI ------
def setup_gemini():
    """Load the .env file and configure the Gemini client with GEMINI_API_KEY."""
    # Pull variables from .env into the process environment
    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY")
    genai.configure(api_key=api_key)
def query_gemini(context, prompt_answer="Answer: "):
"""
Query the Gemini LLM with a given context and prompt.
Args:
context (str): The context string to send to Gemini.
prompt_answer (str): The prompt indicating where Gemini should generate a response.
Returns:
str: The generated response text from Gemini.
"""