-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Analytics streaming #551
base: main
Are you sure you want to change the base?
Analytics streaming #551
Conversation
59d027e
to
643b3ae
Compare
262f517
to
276b6a5
Compare
src/cli/analytics.rs
Outdated
let scope: Option<String> = call.get_flag(engine_state, stack, "scope")?; | ||
let with_meta = call.has_flag(engine_state, stack, "with-meta")?; | ||
let _with_meta = call.has_flag(engine_state, stack, "with-meta")?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is with_meta no longer used? If we don't want it to be used then should we remove it from the command?
src/cli/analytics.rs
Outdated
} | ||
|
||
pub async fn send_analytics_query_stream( | ||
client: HTTPClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we bypassing the standard interface we use for sending requests?
src/client/http_client.rs
Outdated
@@ -336,6 +338,61 @@ impl HTTPClient { | |||
}) | |||
} | |||
|
|||
pub async fn analytics_query_stream_request( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just use analytics_query_request and always return a streaming response? It should be fairly to add a helper to consume the stream at a higher level for things like index management.
src/client/http_handler.rs
Outdated
.await | ||
} | ||
|
||
pub async fn http_do_stream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another case where it seems like we don't need both http_do and http_do_stream and http_do just streams anyway. Either post, put, get etc... should also return a stream or we leave post_stream in and the others just consume the stream before returning.
23ece3b
to
5d9a1b6
Compare
)?; | ||
|
||
let json_stream = JsonRowStream::new(stream); | ||
let mut json_streamer = RawJsonRowStreamer::new(json_stream, "".to_string()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit suspicious, why is there no rows attrib?
rt.block_on(async { | ||
// Read prelude and signature | ||
json_streamer.read_prelude().await.unwrap(); | ||
json_streamer.read_row().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i suspect that these read_row calls are needed because of no specifying a rows attrib, I don't think they should be needed.
result_stream, | ||
span, | ||
Some(ctrl_c.clone()), | ||
))) | ||
} | ||
|
||
pub fn send_analytics_query( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having send and do is really confusing, should we rename these to make intent clear?
) -> Result<HttpResponse, ClientError> { | ||
let rt = Runtime::new().unwrap(); | ||
rt: Arc<Runtime>, | ||
) -> Result<(Pin<Box<ResultsStream>>, u16), ClientError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit confusing to me, why doesn't this return HttpStreamResponse?
self.status | ||
} | ||
|
||
// pub fn endpoint(&self) -> Endpoint { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was this used by that doesn't need it anymore?
&self, | ||
uri: &str, | ||
payload: Option<Vec<u8>>, | ||
headers: HashMap<&str, &str>, | ||
deadline: Instant, | ||
ctrl_c: Arc<AtomicBool>, | ||
) -> Result<(String, u16), ClientError> { | ||
self.http_do(uri, HttpVerb::Put, payload, headers, deadline, ctrl_c) | ||
) -> Result<(Pin<Box<ResultsStream>>, u16), ClientError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be nice to type alias this Pin<Box> type
Addresses: #535