@@ -1026,32 +1026,36 @@ class THttpMonInitializator : public TActorBootstrapped<THttpMonInitializator> {
1026
1026
class THttpMonAuthorizedActorRequest : public TActorBootstrapped <THttpMonAuthorizedActorRequest> {
1027
1027
public:
1028
1028
NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr Event;
1029
- TActorId TargetActorId ;
1029
+ TMon::TRegisterHandlerFields Fields ;
1030
1030
TMon::TRequestAuthorizer Authorizer;
1031
- TVector<TString> AllowedSIDs ;
1031
+ NHttp::TEvHttpProxy::TEvSubscribeForCancel::TPtr CancelSubscriber ;
1032
1032
1033
- THttpMonAuthorizedActorRequest (NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, TActorId targetActorId , TMon::TRequestAuthorizer authorizer, const TVector<TString>& allowedSIDs )
1033
+ THttpMonAuthorizedActorRequest (NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const TMon::TRegisterHandlerFields& fields , TMon::TRequestAuthorizer authorizer)
1034
1034
: Event(std::move(event))
1035
- , TargetActorId(targetActorId )
1035
+ , Fields(fields )
1036
1036
, Authorizer(std::move(authorizer))
1037
- , AllowedSIDs(allowedSIDs)
1038
1037
{}
1039
1038
1040
1039
static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
1041
1040
return NKikimrServices::TActivity::HTTP_MON_AUTHORIZED_ACTOR_REQUEST;
1042
1041
}
1043
1042
1044
1043
void Bootstrap () {
1045
- if (Authorizer) {
1044
+ Send (Event->Sender , new NHttp::TEvHttpProxy::TEvSubscribeForCancel (), IEventHandle::FlagTrackDelivery);
1045
+ if (Fields.UseAuth && Authorizer) {
1046
1046
NActors::IEventHandle* handle = Authorizer (SelfId (), Event->Get ()->Request .Get ());
1047
1047
if (handle) {
1048
1048
Send (handle);
1049
1049
Become (&THttpMonAuthorizedActorRequest::StateWork);
1050
1050
return ;
1051
1051
}
1052
1052
}
1053
- Forward (Event, TargetActorId);
1054
- PassAway ();
1053
+ SendRequest ();
1054
+ Become (&THttpMonAuthorizedActorRequest::StateWork);
1055
+ }
1056
+
1057
+ void ReplyWith (NHttp::THttpOutgoingResponsePtr response) {
1058
+ Send (Event->Sender , new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse (response));
1055
1059
}
1056
1060
1057
1061
bool CredentialsProvided () {
@@ -1136,7 +1140,7 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
1136
1140
response << " Content-Length: " << body.size () << " \r\n " ;
1137
1141
response << " \r\n " ;
1138
1142
response << body;
1139
- Send (Event-> Sender , new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse ( request->CreateResponseString (response) ));
1143
+ ReplyWith ( request->CreateResponseString (response));
1140
1144
PassAway ();
1141
1145
}
1142
1146
@@ -1146,26 +1150,35 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
1146
1150
ReplyErrorAndPassAway (Ydb::StatusIds::UNAUTHORIZED, issues, true );
1147
1151
}
1148
1152
1149
- void SendRequest (const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult& result) {
1153
+ void SendRequest (const NKikimr::NGRpcService::TEvRequestAuthAndCheckResult* result = nullptr ) {
1150
1154
NHttp::THttpIncomingRequestPtr request = Event->Get ()->Request ;
1151
1155
if (Authorizer) {
1152
- TString user = result. UserToken ? result. UserToken ->GetUserSID () : " anonymous" ;
1156
+ TString user = ( result && result-> UserToken ) ? result-> UserToken ->GetUserSID () : " anonymous" ;
1153
1157
ALOG_NOTICE (NActorsServices::HTTP, (request->Address ? request->Address ->ToString () : " " )
1154
1158
<< " " << user
1155
1159
<< " " << request->Method
1156
1160
<< " " << request->URL );
1157
1161
}
1158
- if (result.UserToken ) {
1159
- Event->Get ()->UserToken = result.UserToken ->GetSerializedToken ();
1162
+ if (result && result->UserToken ) {
1163
+ Event->Get ()->UserToken = result->UserToken ->GetSerializedToken ();
1164
+ }
1165
+ Send (new IEventHandle (Fields.Handler , SelfId (), Event->ReleaseBase ().Release (), IEventHandle::FlagTrackDelivery, Event->Cookie ));
1166
+ }
1167
+
1168
+ void Cancelled () {
1169
+ if (CancelSubscriber) {
1170
+ Send (CancelSubscriber->Sender , new NHttp::TEvHttpProxy::TEvRequestCancelled (), 0 , CancelSubscriber->Cookie );
1160
1171
}
1161
- Forward (Event, TargetActorId);
1162
1172
PassAway ();
1163
1173
}
1164
1174
1165
- void HandleUndelivered (TEvents::TEvUndelivered::TPtr&) {
1175
+ void HandleUndelivered (TEvents::TEvUndelivered::TPtr& ev) {
1176
+ if (ev->Get ()->SourceType == NHttp::TEvHttpProxy::EvSubscribeForCancel) {
1177
+ return Cancelled ();
1178
+ }
1166
1179
NHttp::THttpIncomingRequestPtr request = Event->Get ()->Request ;
1167
- Send (Event-> Sender , new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse (
1168
- request-> CreateResponseServiceUnavailable ( TStringBuilder () << " Auth actor is not available" ) ));
1180
+ ReplyWith (request-> CreateResponseServiceUnavailable (
1181
+ TStringBuilder () << " Actor is not available" ));
1169
1182
PassAway ();
1170
1183
}
1171
1184
@@ -1174,17 +1187,41 @@ class THttpMonAuthorizedActorRequest : public TActorBootstrapped<THttpMonAuthori
1174
1187
if (result.Status != Ydb::StatusIds::SUCCESS) {
1175
1188
return ReplyErrorAndPassAway (result);
1176
1189
}
1177
- if (IsTokenAllowed (result.UserToken .Get (), AllowedSIDs)) {
1178
- SendRequest (result);
1190
+ if (IsTokenAllowed (result.UserToken .Get (), Fields. AllowedSIDs )) {
1191
+ SendRequest (& result);
1179
1192
} else {
1180
1193
return ReplyForbiddenAndPassAway (" SID is not allowed" );
1181
1194
}
1182
1195
}
1183
1196
1197
+ void Handle (NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr& ev) {
1198
+ bool endOfData = ev->Get ()->Response ->IsDone ();
1199
+ Forward (ev, Event->Sender );
1200
+ if (endOfData) {
1201
+ return PassAway ();
1202
+ }
1203
+ }
1204
+
1205
+ void Handle (NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk::TPtr& ev) {
1206
+ bool endOfData = ev->Get ()->DataChunk && ev->Get ()->DataChunk ->IsEndOfData () || ev->Get ()->Error ;
1207
+ Forward (ev, Event->Sender );
1208
+ if (endOfData) {
1209
+ PassAway ();
1210
+ }
1211
+ }
1212
+
1213
+ void Handle (NHttp::TEvHttpProxy::TEvSubscribeForCancel::TPtr& ev) {
1214
+ CancelSubscriber = std::move (ev);
1215
+ }
1216
+
1184
1217
STATEFN (StateWork) {
1185
1218
switch (ev->GetTypeRewrite ()) {
1186
1219
hFunc (TEvents::TEvUndelivered, HandleUndelivered);
1187
1220
hFunc (NKikimr::NGRpcService::TEvRequestAuthAndCheckResult, Handle);
1221
+ hFunc (NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
1222
+ hFunc (NHttp::TEvHttpProxy::TEvHttpOutgoingDataChunk, Handle);
1223
+ hFunc (NHttp::TEvHttpProxy::TEvSubscribeForCancel, Handle);
1224
+ cFunc (NHttp::TEvHttpProxy::EvRequestCancelled, Cancelled);
1188
1225
}
1189
1226
}
1190
1227
};
@@ -1276,11 +1313,7 @@ class THttpMonIndexService : public TActor<THttpMonIndexService> {
1276
1313
while (!url.empty ()) {
1277
1314
auto it = Handlers.find (TString (url));
1278
1315
if (it != Handlers.end ()) {
1279
- if (it->second .UseAuth ) {
1280
- Register (new THttpMonAuthorizedActorRequest (std::move (ev), it->second .Handler , Authorizer, it->second .AllowedSIDs ));
1281
- } else {
1282
- Forward (ev, it->second .Handler );
1283
- }
1316
+ Register (new THttpMonAuthorizedActorRequest (std::move (ev), it->second , Authorizer));
1284
1317
return ;
1285
1318
} else {
1286
1319
if (url.EndsWith (' /' )) {
0 commit comments