Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,51 @@ void WaitBucket(std::shared_ptr<TKikimrRunner> kikimr, const TString& externalDa
UNIT_FAIL("Bucket isn't ready");
}

void CreateSecret(TString& secretName, const TString& secretValue, const bool useSchemaSecrets, NYdb::NTable::TSession& session) {
if (useSchemaSecrets) {
secretName = "/Root/" + secretName;
}

TString createSecretQuery;
if (useSchemaSecrets) {
createSecretQuery = "CREATE SECRET `" + secretName + "` WITH (value = \"" + secretValue + "\");";
} else {
createSecretQuery = TStringBuilder() << "CREATE OBJECT " << secretName << " (TYPE SECRET) WITH value = `" << secretValue << "`;";
}
auto createSecretQueryResult = session.ExecuteSchemeQuery(createSecretQuery).GetValueSync();
UNIT_ASSERT_C(createSecretQueryResult.GetStatus() == NYdb::EStatus::SUCCESS, createSecretQueryResult.GetIssues().ToString());
}

Y_UNIT_TEST_SUITE(S3AwsCredentials) {
Y_UNIT_TEST(ExecuteScriptWithEqSymbol) {
Y_UNIT_TEST_TWIN(ExecuteScriptWithEqSymbol, UseSchemaSecrets) {
const TString externalDataSourceName = "/Root/external_data_source";
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);
if (UseSchemaSecrets) {
kikimr->GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableSchemaSecrets(true);
}
kikimr->GetTestClient().GrantConnect("root1@builtin");
auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
TString id = "id";
TString key = "key";
CreateSecret(id, "minio", UseSchemaSecrets, session);
CreateSecret(key, "minio123", UseSchemaSecrets, session);
const TString query = fmt::format(R"(
CREATE OBJECT id (TYPE SECRET) WITH (value=`minio`);
CREATE OBJECT key (TYPE SECRET) WITH (value=`minio123`);
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="id",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="key",
AWS_ACCESS_KEY_ID_SECRET_NAME="{id}",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="{key}",
AWS_REGION="ru-central-1"
);
GRANT ALL ON `{external_source}` TO `root1@builtin`;
)",
"external_source"_a = externalDataSourceName,
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/"
"location"_a = "localhost:" + GetExternalPort("minio", "9000") + "/datalake/",
"id"_a = id,
"key"_a = key
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
Expand Down Expand Up @@ -171,6 +193,7 @@ Y_UNIT_TEST_SUITE(S3AwsCredentials) {

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Failed, readyOp.Status().GetIssues().ToString());
const TString errorMessage = UseSchemaSecrets ? Sprintf("secret `%s` not found", id.data()) : Sprintf("secret with name '%s' not found", id.data());
UNIT_ASSERT_STRING_CONTAINS_C(readyOp.Status().GetIssues().ToString(), "secret with name 'id' not found", readyOp.Status().GetIssues().ToString());
}
{
Expand Down
76 changes: 54 additions & 22 deletions ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_federated_query_actors.h"

#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/tx/scheme_board/subscriber.h>
#include <ydb/services/metadata/secret/fetcher.h>
#include <ydb/services/metadata/secret/snapshot.h>
#include <ydb/library/actors/core/log.h>
Expand Down Expand Up @@ -101,6 +102,10 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets
bool AskSent = false;
};

IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise) {
return new TDescribeSecretsActor(ownerUserId, secretIds, promise);
}

} // anonymous namespace

void TDescribeSchemaSecretsService::HandleIncomingRequest(TEvResolveSecret::TPtr& ev) {
Expand Down Expand Up @@ -173,6 +178,10 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS
return;
}

if (const auto it = SchemeBoardSubscribers.find(secretName); it == SchemeBoardSubscribers.end()) {
SchemeBoardSubscribers[secretName] = Register(CreateSchemeBoardSubscriber(SelfId(), secretName));
}

const auto& secretValue = rec.GetPathDescription().GetSecretDescription().GetValue();
const auto& secretVersion = rec.GetPathDescription().GetSecretDescription().GetVersion();
VersionedSecrets[secretName] = TVersionedSecret{
Expand All @@ -181,6 +190,7 @@ void TDescribeSchemaSecretsService::HandleSchemeShardResponse(NSchemeShard::TEvS
.Name = secretName,
.Value = secretValue,
};

++respIt->second.FilledSecretsCnt;

FillResponseIfFinished(ev->Cookie, respIt->second);
Expand All @@ -206,19 +216,19 @@ void TDescribeSchemaSecretsService::SaveIncomingRequestInfo(const TEvResolveSecr
ResolveInFlight[LastCookie] = std::move(ctx);
}

void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector<TString>& secretNames, const NACLib::TUserToken& userToken) {
void TDescribeSchemaSecretsService::SendSchemeCacheRequests(const TVector<TString>& secretNames, const TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
for (const auto& secretName : secretNames) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.Path = SplitPath(secretName);
if (userToken.GetUserSID()) {
if (userToken && userToken->GetUserSID()) {
entry.Access = NACLib::SelectRow;
}
request->ResultSet.emplace_back(entry);
}
if (userToken.GetUserSID()) {
request->UserToken = new NACLib::TUserToken(userToken);
if (userToken && userToken->GetUserSID()) {
request->UserToken = userToken;
}

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, LastCookie++);
Expand Down Expand Up @@ -262,16 +272,39 @@ void TDescribeSchemaSecretsService::FillResponseIfFinished(const ui64& requestId
std::vector<TString> secretValues;
secretValues.resize(responseCtx.Secrets.size());
for (const auto& secret : responseCtx.Secrets) {
const auto& secretPath = secret.first;
auto it = VersionedSecrets.find(secret.first);
Y_ENSURE(it != VersionedSecrets.end(), "Secrets values were not retrieved for response");
if (it == VersionedSecrets.end()) {
LOG_N("FillResponseIfFinished: request cookie=" << requestId << ", secret `" << secretPath << "` was dropped during request");
FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret `" + secretPath + "` not found") }));
return;
}

Y_ENSURE(secret.second < secretValues.size());
secretValues[secret.second] = it->second.Value;
}
FillResponse(requestId, TEvDescribeSecretsResponse::TDescription(secretValues));
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(const TVector<TString>& secretNames, const TString& ownerUserId, TActorSystem* actorSystem) {
void TDescribeSchemaSecretsService::HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) {
Y_UNUSED(ev);
}

void TDescribeSchemaSecretsService::HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev) {
const TString& secretName = CanonizePath(ev->Get()->Path);
VersionedSecrets.erase(secretName);

auto subscriberIt = SchemeBoardSubscribers.find(secretName);
Y_ENSURE(subscriberIt != SchemeBoardSubscribers.end());
Send(subscriberIt->second, new TEvents::TEvPoisonPill());
SchemeBoardSubscribers.erase(subscriberIt);
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(
const TVector<TString>& secretNames,
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TActorSystem* actorSystem
) {
auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>();
if (actorSystem->AppData<TAppData>()->FeatureFlags.GetEnableSchemaSecrets()) {
bool schemaSecrets = false;
Expand All @@ -284,58 +317,57 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(con
if (schemaSecrets) {
actorSystem->Send(
MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId),
new TDescribeSchemaSecretsService::TEvResolveSecret(ownerUserId, secretNames, promise));
new TDescribeSchemaSecretsService::TEvResolveSecret(userToken, secretNames, promise));
return promise.GetFuture();
}
}

actorSystem->Register(CreateDescribeSecretsActor(ownerUserId, secretNames, promise));
actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", secretNames, promise));
return promise.GetFuture();
}

IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise) {
return new TDescribeSecretsActor(ownerUserId, secretIds, promise);
}

void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, NActors::TActorSystem* actorSystem) {
auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>();
actorSystem->Register(CreateDescribeSecretsActor(ownerUserId, secretIds, promise));

promise.GetFuture().Subscribe([actorSystem, replyActorId](const NThreading::TFuture<TEvDescribeSecretsResponse::TDescription>& result){
TVector<TString> secretNames{secretIds.begin(), secretIds.end()};
auto future = DescribeSecret(secretNames, ownerUserId.empty() ? nullptr : new NACLib::TUserToken(ownerUserId, {}), actorSystem);
future.Subscribe([actorSystem, replyActorId](const NThreading::TFuture<TEvDescribeSecretsResponse::TDescription>& result){
actorSystem->Send(replyActorId, new TEvDescribeSecretsResponse(result.GetValue()));
});
}

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem) {
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(
const NKikimrSchemeOp::TAuth& authDescription,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TActorSystem* actorSystem
) {
switch (authDescription.identity_case()) {
case NKikimrSchemeOp::TAuth::kServiceAccount: {
const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName();
return DescribeSecret({saSecretId}, ownerUserId, actorSystem);
return DescribeSecret({saSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kNone:
return NThreading::MakeFuture(TEvDescribeSecretsResponse::TDescription({}));

case NKikimrSchemeOp::TAuth::kBasic: {
const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName();
return DescribeSecret({passwordSecretId}, ownerUserId, actorSystem);
return DescribeSecret({passwordSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kMdbBasic: {
const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName();
const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName();
return DescribeSecret({saSecretId, passwordSecreId}, ownerUserId, actorSystem);
return DescribeSecret({saSecretId, passwordSecreId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kAws: {
const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName();
const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName();
return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, ownerUserId, actorSystem);
return DescribeSecret({awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::kToken: {
const TString& tokenSecretId = authDescription.GetToken().GetTokenSecretName();
return DescribeSecret({tokenSecretId}, ownerUserId, actorSystem);
return DescribeSecret({tokenSecretId}, userToken, actorSystem);
}

case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
Expand Down
31 changes: 20 additions & 11 deletions ydb/core/kqp/federated_query/kqp_federated_query_actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

#include <ydb/core/kqp/common/events/script_executions.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/scheme_board/events.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/aclib/aclib.h>
#include <library/cpp/threading/future/future.h>

#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <library/cpp/threading/future/future.h>

namespace NKikimr::NKqp {

Expand All @@ -24,18 +25,18 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
struct TEvResolveSecret : public NActors::TEventLocal<TEvResolveSecret, EvResolveSecret> {
public:
TEvResolveSecret(
const TString& ownerUserId,
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
const TVector<TString>& secretNames,
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise
)
: UserToken(NACLib::TUserToken{ownerUserId, TVector<NACLib::TSID>{}})
: UserToken(userToken)
, SecretNames(secretNames)
, Promise(promise)
{
}

public:
const NACLib::TUserToken UserToken;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const TVector<TString> SecretNames;
NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Promise;
};
Expand All @@ -60,15 +61,20 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
hFunc(TEvResolveSecret, HandleIncomingRequest);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse);
hFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeShardResponse);
hFunc(TSchemeBoardEvents::TEvNotifyDelete, HandleNotifyDelete);
hFunc(TSchemeBoardEvents::TEvNotifyUpdate, HandleNotifyUpdate);
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
)

void HandleIncomingRequest(TEvResolveSecret::TPtr& ev);
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void HandleSchemeShardResponse(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev);
void HandleNotifyDelete(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
void HandleNotifyUpdate(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev);

void FillResponse(const ui64& requestId, const TEvDescribeSecretsResponse::TDescription& response);
void SaveIncomingRequestInfo(const TEvResolveSecret& req);
void SendSchemeCacheRequests(const TVector<TString>& secretNames, const NACLib::TUserToken& userToken);
void SendSchemeCacheRequests(const TVector<TString>& secretNames, const TIntrusiveConstPtr<NACLib::TUserToken> userToken);
bool LocalCacheHasActualVersion(const TVersionedSecret& secret, const ui64& cacheSecretVersion);
bool LocalCacheHasActualObject(const TVersionedSecret& secret, const ui64& cacheSecretPathId);
bool HandleSchemeCacheErrorsIfAny(const ui64& requestId, NSchemeCache::TSchemeCacheNavigate& result);
Expand All @@ -83,13 +89,16 @@ class TDescribeSchemaSecretsService: public NActors::TActorBootstrapped<TDescrib
ui64 LastCookie = 0;
THashMap<ui64, TResponseContext> ResolveInFlight;
THashMap<TString, TVersionedSecret> VersionedSecrets;
THashMap<TString, TActorId> SchemeBoardSubscribers;
};

IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise);

void RegisterDescribeSecretsActor(const TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, TActorSystem* actorSystem);

NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(const NKikimrSchemeOp::TAuth& authDescription, const TString& ownerUserId, TActorSystem* actorSystem);
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeExternalDataSourceSecrets(
const NKikimrSchemeOp::TAuth& authDescription,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TActorSystem* actorSystem
);

IActor* CreateDescribeSchemaSecretsService();

Expand Down
Loading