Skip to content

Commit

Permalink
covering log lines
Browse files Browse the repository at this point in the history
Signed-off-by: Ankur_Anand <[email protected]>
  • Loading branch information
AnkurAnand11 committed Jan 31, 2024
1 parent 7c9e5ab commit 3a63e5a
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 40 deletions.
5 changes: 1 addition & 4 deletions src/byte_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,7 @@ impl ByteStream {

impl Drop for ByteStream {
fn drop(&mut self) {
info!(
"Drop invoked on ByteStream {:?}, invoking flush",
self.stream
);
info!("Drop invoked on ByteStream {:?}, invoking flush", self.stream);
if let Err(e) = self.runtime_handle.block_on(self.writer.flush()) {
error!("Error while flushing byteStream {:?}", e);
}
Expand Down
25 changes: 5 additions & 20 deletions src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ impl StreamManager {
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
info!("creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
Expand Down Expand Up @@ -311,10 +308,7 @@ impl StreamManager {
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
info!("updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
Expand Down Expand Up @@ -346,10 +340,7 @@ impl StreamManager {
stream_name: &str,
) -> PyResult<Option<Vec<String>>> {
let handle = self.cf.runtime_handle();
info!(
"fetch tags for stream {:?} under scope {:?}",
stream_name, scope_name,
);
info!("fetch tags for stream {:?} under scope {:?}", stream_name, scope_name);
let stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand All @@ -370,10 +361,7 @@ impl StreamManager {
#[pyo3(text_signature = "($self, scope_name, stream_name)")]
pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"Sealing stream {:?} under scope {:?} ",
stream_name, scope_name
);
info!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand All @@ -395,10 +383,7 @@ impl StreamManager {
#[pyo3(text_signature = "($self, scope_name, stream_name)")]
pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.runtime_handle();
info!(
"Deleting stream {:?} under scope {:?} ",
stream_name, scope_name
);
info!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
Expand Down
10 changes: 2 additions & 8 deletions src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ impl StreamReaderGroup {
///```
///
pub fn create_reader(&self, reader_name: &str) -> PyResult<StreamReader> {
info!(
"Creating reader {:?} under reader group {:?}",
reader_name, self.reader_group.name
);
info!("Creating reader {:?} under reader group {:?}", reader_name, self.reader_group.name);
let reader = self
.runtime_handle
.block_on(self.reader_group.create_reader(reader_name.to_string()));
Expand All @@ -156,10 +153,7 @@ impl StreamReaderGroup {
///```
///
pub fn reader_offline(&self, reader_name: &str) -> PyResult<()> {
info!(
"Marking reader {:?} under reader group {:?} as offline",
reader_name, self.reader_group.name
);
info!("Marking reader {:?} under reader group {:?} as offline", reader_name, self.reader_group.name);
let res = self.runtime_handle.block_on(
self.reader_group
.reader_offline(reader_name.to_string(), None),
Expand Down
5 changes: 1 addition & 4 deletions src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,7 @@ impl StreamWriter {
let res = self.runtime_handle.block_on(x);
// fail fast on error.
if let Err(e) = res {
info!(
"RecvError observed while flushing events on stream {:?}",
self.stream
);
info!("RecvError observed while flushing events on stream {:?}", self.stream);
flush_result = Err(exceptions::PyValueError::new_err(format!(
"RecvError observed while writing an event {:?}",
e
Expand Down
5 changes: 1 addition & 4 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ impl StreamTransaction {
#[pyo3(text_signature = "($self, event, routing_key=None)")]
#[args(event, routing_key = "None", "*")]
pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option<String>) -> PyResult<()> {
trace!(
"Writing a single event to a transaction {:?}",
self.txn.txn_id()
);
trace!("Writing a single event to a transaction {:?}", self.txn.txn_id());
// to_vec creates an owned copy of the python byte array object.
let result: Result<(), TransactionError> = self
.runtime_handle
Expand Down
1 change: 1 addition & 0 deletions tests/pravega_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def test_writeEventAndRead(self):
self.assertEqual(b'test event', event.data(), "Invalid event data")
self.assertEqual(count, 2, "Two events are expected")
reader_group.reader_offline("reader-1")
reader_group.reader_offline("reader-1")
stream_manager.delete_reader_group("rg" + suffix, scope)

async def test_asyncEventwriteAndRead(self):
Expand Down

0 comments on commit 3a63e5a

Please sign in to comment.