|
12 | 12 | from oauthlib.common import urlencode
|
13 | 13 | from oauthlib.oauth2 import TokenExpiredError, OAuth2Error
|
14 | 14 | from oauthlib.oauth2 import MismatchingStateError
|
| 15 | +from oauthlib.oauth2.rfc6749.errors import CustomOAuth2Error |
15 | 16 | from oauthlib.oauth2 import WebApplicationClient, MobileApplicationClient
|
16 | 17 | from oauthlib.oauth2 import LegacyApplicationClient, BackendApplicationClient
|
17 | 18 | from requests_oauthlib import OAuth2Session, TokenUpdated
|
@@ -524,6 +525,63 @@ def fake_send(r, **kwargs):
|
524 | 525 | sess.fetch_token(url)
|
525 | 526 | self.assertTrue(sess.authorized)
|
526 | 527 |
|
| 528 | + def test_fetch_token_http_error_handling(self): |
| 529 | + """Test that HTTP errors are properly raised instead of parsing error responses as tokens.""" |
| 530 | + url = "https://example.com/token" |
| 531 | + |
| 532 | + # Test 400 error response (like the original issue) |
| 533 | + error_400 = {"messages": [{"logLevel": "Error", "text": "User not Found"}], "status": "Forbidden"} |
| 534 | + |
| 535 | + def fake_400_error(r, **kwargs): |
| 536 | + resp = mock.MagicMock() |
| 537 | + resp.text = json.dumps(error_400) |
| 538 | + resp.status_code = 400 |
| 539 | + resp.request = mock.MagicMock() |
| 540 | + resp.request.url = url |
| 541 | + resp.raise_for_status.side_effect = requests.exceptions.HTTPError( |
| 542 | + "400 Client Error", response=resp |
| 543 | + ) |
| 544 | + return resp |
| 545 | + |
| 546 | + for client in self.clients: |
| 547 | + sess = OAuth2Session(client=client) |
| 548 | + sess.send = fake_400_error |
| 549 | + |
| 550 | + if isinstance(client, LegacyApplicationClient): |
| 551 | + self.assertRaises( |
| 552 | + CustomOAuth2Error, |
| 553 | + sess.fetch_token, |
| 554 | + url, |
| 555 | + username="username1", |
| 556 | + password="password1", |
| 557 | + ) |
| 558 | + else: |
| 559 | + self.assertRaises(CustomOAuth2Error, sess.fetch_token, url) |
| 560 | + |
| 561 | + def test_refresh_token_http_error_handling(self): |
| 562 | + """Test that HTTP errors are properly raised instead of parsing error responses as tokens.""" |
| 563 | + url = "https://example.com/refresh" |
| 564 | + |
| 565 | + # Test 400 error response |
| 566 | + error_400 = {"error": "invalid_grant", |
| 567 | + "error_description": "Refresh token is invalid"} |
| 568 | + |
| 569 | + def fake_400_error(r, **kwargs): |
| 570 | + resp = mock.MagicMock() |
| 571 | + resp.text = json.dumps(error_400) |
| 572 | + resp.status_code = 400 |
| 573 | + resp.request = mock.MagicMock() |
| 574 | + resp.request.url = url |
| 575 | + resp.raise_for_status.side_effect = requests.exceptions.HTTPError( |
| 576 | + "400 Client Error", response=resp |
| 577 | + ) |
| 578 | + return resp |
| 579 | + |
| 580 | + for client in self.clients: |
| 581 | + sess = OAuth2Session(client=client, token=self.token) |
| 582 | + sess.send = fake_400_error |
| 583 | + self.assertRaises(CustomOAuth2Error, sess.refresh_token, url) |
| 584 | + |
527 | 585 |
|
528 | 586 | class OAuth2SessionNetrcTest(OAuth2SessionTest):
|
529 | 587 | """Ensure that there is no magic auth handling.
|
|
0 commit comments