Skip to content

Route all handler responses through monitoring #21480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions ydb/core/mon/mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1026,32 +1026,36 @@ class THttpMonInitializator : public TActorBootstrapped<THttpMonInitializator> {
class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthorizedActorRequest> {
public:
NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
TActorId TargetActorId;
TMon::TRegisterHandlerFields Fields;
TMon::TRequestAuthorizer Authorizer;
TVector<TString> AllowedSIDs;
NHttp::TEvHttpProxy::TEvSubscribeForCancel::TPtr CancelSubscriber;

THttpMonAuthorizedActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TActorId targetActorId, TMon::TRequestAuthorizer authorizer, const TVector<TString>& allowedSIDs)
THttpMonAuthorizedActorRequest(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const TMon::TRegisterHandlerFields& fields, TMon::TRequestAuthorizer authorizer)
: Event(std::move(event))
, TargetActorId(targetActorId)
, Fields(fields)
, Authorizer(std::move(authorizer))
, AllowedSIDs(allowedSIDs)
{}

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::HTTP_MON_AUTHORIZED_ACTOR_REQUEST;
}

void Bootstrap() {
if (Authorizer) {
Send(Event->Sender, new NHttp::TEvHttpProxy::TEvSubscribeForCancel(), IEventHandle::FlagTrackDelivery);
if (Fields.UseAuth && Authorizer) {
NActors::IEventHandle* handle = Authorizer(SelfId(), Event->Get()->Request.Get());
if (handle) {
Send(handle);
Become(&THttpMonAuthorizedActorRequest::StateWork);
return;
}
}
Forward(Event, TargetActorId);
PassAway();
SendRequest();
Become(&THttpMonAuthorizedActorRequest::StateWork);
}

void ReplyWith(NHttp::THttpOutgoingResponsePtr response) {
Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
}

bool CredentialsProvided() {
Expand Down Expand Up @@ -1136,7 +1140,7 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
response << "Content-Length: " << body.size() << "\r\n";
response << "\r\n";
response << body;
Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(request->CreateResponseString(response)));
ReplyWith(request->CreateResponseString(response));
PassAway();
}

Expand All @@ -1146,26 +1150,35 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
ReplyErrorAndPassAway(Ydb::StatusIds::UNAUTHORIZED, issues, true);
}

void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) {
void SendRequest(const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr) {
NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
if (Authorizer) {
TString user = result.UserToken ? result.UserToken->GetUserSID() : "anonymous";
TString user = (result && result->UserToken) ? result->UserToken->GetUserSID() : "anonymous";
ALOG_NOTICE(NActorsServices::HTTP, (request->Address ? request->Address->ToString() : "")
<< " " << user
<< " " << request->Method
<< " " << request->URL);
}
if (result.UserToken) {
Event->Get()->UserToken = result.UserToken->GetSerializedToken();
if (result && result->UserToken) {
Event->Get()->UserToken = result->UserToken->GetSerializedToken();
}
Send(new IEventHandle(Fields.Handler, SelfId(), Event->ReleaseBase().Release(), IEventHandle::FlagTrackDelivery, Event->Cookie));
}

void Cancelled() {
if (CancelSubscriber) {
Send(CancelSubscriber->Sender, new NHttp::TEvHttpProxy::TEvRequestCancelled(), 0, CancelSubscriber->Cookie);
}
Forward(Event, TargetActorId);
PassAway();
}

void HandleUndelivered(TEvents::TEvUndelivered::TPtr&) {
void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
if (ev->Get()->SourceType == NHttp::TEvHttpProxy::EvSubscribeForCancel) {
return Cancelled();
}
NHttp::THttpIncomingRequestPtr request = Event->Get()->Request;
Send(Event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(
request->CreateResponseServiceUnavailable(TStringBuilder() << "Auth actor is not available")));
ReplyWith(request->CreateResponseServiceUnavailable(
TStringBuilder() << "Actor is not available"));
PassAway();
}

Expand All @@ -1174,17 +1187,41 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
if (result.Status != Ydb::StatusIds::SUCCESS) {
return ReplyErrorAndPassAway(result);
}
if (IsTokenAllowed(result.UserToken.Get(), AllowedSIDs)) {
SendRequest(result);
if (IsTokenAllowed(result.UserToken.Get(), Fields.AllowedSIDs)) {
SendRequest(&result);
} else {
return ReplyForbiddenAndPassAway("SID is not allowed");
}
}

void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) {
bool endOfData = ev->Get()->Response->IsDone();
Forward(ev, Event->Sender);
if (endOfData) {
return PassAway();
}
}

void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk::TPtr& ev) {
bool endOfData = ev->Get()->DataChunk && ev->Get()->DataChunk->IsEndOfData() || ev->Get()->Error;
Forward(ev, Event->Sender);
if (endOfData) {
PassAway();
}
}

void Handle(NHttp::TEvHttpProxy::TEvSubscribeForCancel::TPtr& ev) {
CancelSubscriber = std::move(ev);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
hFunc(NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle);
hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
hFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk, Handle);
hFunc(NHttp::TEvHttpProxy::TEvSubscribeForCancel, Handle);
cFunc(NHttp::TEvHttpProxy::EvRequestCancelled, Cancelled);
}
}
};
Expand Down Expand Up @@ -1276,11 +1313,7 @@ class THttpMonIndexService : public TActor<THttpMonIndexService> {
while (!url.empty()) {
auto it = Handlers.find(TString(url));
if (it != Handlers.end()) {
if (it->second.UseAuth) {
Register(new THttpMonAuthorizedActorRequest(std::move(ev), it->second.Handler, Authorizer, it->second.AllowedSIDs));
} else {
Forward(ev, it->second.Handler);
}
Register(new THttpMonAuthorizedActorRequest(std::move(ev), it->second, Authorizer));
return;
} else {
if (url.EndsWith('/')) {
Expand Down
Loading