Skip to content

Commit

Permalink
feat postgres: mark task as expired in case of dp
Browse files Browse the repository at this point in the history
commit_hash:f2055a67679203302bc3bbb0f5e2f4d30434198c
  • Loading branch information
segoon committed Feb 10, 2025
1 parent 738190a commit 3693b5c
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 7 deletions.
3 changes: 3 additions & 0 deletions postgresql/functional_tests/basic_chaos/postgres_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ PostgresHandler::HandleRequestThrow(const server::http::HttpRequest& request, se

transaction.Commit();
return fmt::format("[{}]", fmt::join(result, ", "));
} else if (type == "sleep") {
pg_cluster_->Execute(storages::postgres::ClusterHostType::kMaster, "SELECT pg_sleep(1);");
return {};
} else {
UINVARIANT(false, fmt::format("Unknown chaos test request type '{}'", type));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,30 @@ async def test_expired(service_client, dynamic_config):
assert 'storages::postgres::ConnectionInterrupted' in text, text


async def test_timeout(service_client, dynamic_config):
assert dynamic_config.get('POSTGRES_DEADLINE_PROPAGATION_VERSION') == 1

pipeline_enabled = dynamic_config.get(
'POSTGRES_CONNECTION_PIPELINE_EXPERIMENT',
)
if not pipeline_enabled:
pytest.skip('Disabled in configuration')
return

async with service_client.capture_logs() as capture:
response = await service_client.post(
'/chaos/postgres?type=sleep',
headers={'X-YaTaxi-Client-TimeoutMs': '100'},
)
assert response.status == 498
assert response.text == 'Deadline expired'

logs = [log for log in capture.select() if log['text'].startswith('Statement')]
assert len(logs) == 1
text = logs[0]['text']
assert 'was cancelled by deadline propagation' in text, text


@pytest.mark.config(POSTGRES_DEADLINE_PROPAGATION_VERSION=0)
async def test_expired_dp_disabled(service_client, dynamic_config):
async with service_client.capture_logs() as capture:
Expand Down
16 changes: 14 additions & 2 deletions postgresql/src/storages/postgres/deadline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ void CheckDeadlineIsExpired(const dynamic_config::Snapshot& config) {

const auto inherited_deadline = server::request::GetTaskInheritedDeadline();
if (inherited_deadline.IsReached()) {
server::request::MarkTaskInheritedDeadlineExpired();
throw ConnectionInterrupted("Cancelled by deadline");
}
}

TimeoutDuration AdjustTimeout(TimeoutDuration timeout) {
TimeoutDuration AdjustTimeout(TimeoutDuration timeout, bool& adjusted) {
adjusted = false;
if (!USERVER_NAMESPACE::utils::impl::kPgDeadlinePropagationExperiment.IsEnabled()) {
return timeout;
}
Expand All @@ -30,7 +32,17 @@ TimeoutDuration AdjustTimeout(TimeoutDuration timeout) {
if (!inherited_deadline.IsReachable()) return timeout;

auto left = std::chrono::duration_cast<TimeoutDuration>(inherited_deadline.TimeLeft());
return std::min(timeout, left);
if (left.count() < 0) {
server::request::MarkTaskInheritedDeadlineExpired();
throw ConnectionInterrupted("Cancelled by deadline");
}

if (timeout > left) {
adjusted = true;
return left;
} else {
return timeout;
}
}

} // namespace storages::postgres
Expand Down
2 changes: 1 addition & 1 deletion postgresql/src/storages/postgres/deadline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace storages::postgres {
/// @throws ConnectionInterrupted if deadline is expired.
void CheckDeadlineIsExpired(const dynamic_config::Snapshot&);

TimeoutDuration AdjustTimeout(TimeoutDuration timeout);
TimeoutDuration AdjustTimeout(TimeoutDuration timeout, bool& adjusted);

} // namespace storages::postgres

Expand Down
16 changes: 12 additions & 4 deletions postgresql/src/storages/postgres/detail/connection_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <string_view>
#include <userver/error_injection/hook.hpp>
#include <userver/logging/log.hpp>
#include <userver/server/request/task_inherited_data.hpp>
#include <userver/testsuite/testpoint.hpp>
#include <userver/tracing/span.hpp>
#include <userver/tracing/tags.hpp>
Expand Down Expand Up @@ -671,7 +672,7 @@ TimeoutDuration ConnectionImpl::CurrentExecuteTimeout() const {
void ConnectionImpl::SetConnectionStatementTimeout(TimeoutDuration timeout, engine::Deadline deadline) {
timeout = testsuite_pg_ctl_.MakeStatementTimeout(timeout);
if (IsPipelineActive()) {
timeout = AdjustTimeout(timeout);
timeout = AdjustTimeout(timeout, deadline_propagation_is_active_);
}
if (current_statement_timeout_ != timeout) {
SetParameter(
Expand All @@ -684,7 +685,7 @@ void ConnectionImpl::SetConnectionStatementTimeout(TimeoutDuration timeout, engi
void ConnectionImpl::SetStatementTimeout(TimeoutDuration timeout, engine::Deadline deadline) {
timeout = testsuite_pg_ctl_.MakeStatementTimeout(timeout);
if (IsPipelineActive()) {
timeout = AdjustTimeout(timeout);
timeout = AdjustTimeout(timeout, deadline_propagation_is_active_);
}
if (current_statement_timeout_ != timeout) {
SetParameter(
Expand All @@ -698,6 +699,8 @@ void ConnectionImpl::SetStatementTimeout(TimeoutDuration timeout, engine::Deadli
}

void ConnectionImpl::SetStatementTimeout(OptionalCommandControl cmd_ctl) {
deadline_propagation_is_active_ = false;

if (!!cmd_ctl) {
SetConnectionStatementTimeout(cmd_ctl->statement, testsuite_pg_ctl_.MakeExecuteDeadline(cmd_ctl->execute));
} else if (!!transaction_cmd_ctl_) {
Expand Down Expand Up @@ -1058,8 +1061,13 @@ ResultSet ConnectionImpl::WaitResult(
throw;
} catch (const QueryCancelled& e) {
++stats_.execute_timeout;
LOG_LIMITED_WARNING() << "Statement `" << statement << "` was cancelled: " << e << ". Statement timeout was "
<< current_statement_timeout_.count() << "ms";
bool cancelled_by_dp = deadline_propagation_is_active_;
if (cancelled_by_dp) {
server::request::MarkTaskInheritedDeadlineExpired();
}
LOG_LIMITED_WARNING() << "Statement `" << statement << "` was cancelled"
<< (cancelled_by_dp ? " by deadline propagation" : "") << ": " << e
<< ". Statement timeout was " << current_statement_timeout_.count() << "ms";
span.AddTag(tracing::kErrorFlag, true);
throw;
} catch (const FeatureNotSupported& e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class ConnectionImpl {
bool is_discard_prepared_pending_ = false;
ConnectionSettings settings_;
std::optional<std::chrono::steady_clock::time_point> expires_at_;
bool deadline_propagation_is_active_{false};

CommandControl default_cmd_ctl_{{}, {}};
DefaultCommandControls default_cmd_ctls_;
Expand Down

0 comments on commit 3693b5c

Please sign in to comment.