Skip to content

Commit 2f86436

Browse files
Check email in breaches module (#56)
* Add unfinished email_breach module * Add email breach module * Remove debug code * Skip tests on 429. Close session on error * Make function for handling errors. Pretify get_random_string function. * Fix response messages a little Co-authored-by: manmolecular <regwebghost@yandex.ru>
1 parent 4417844 commit 2f86436

File tree

5 files changed

+186
-0
lines changed

5 files changed

+186
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import sys
2+
from pathlib import Path
3+
4+
__root_dir = Path(__file__).parents[4]
5+
sys.path.append(str(__root_dir))
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/usr/bin/env python3
2+
3+
from pprint import pprint
4+
from sys import argv
5+
6+
from src.core.utils.module import run_module
7+
from .module import Runner
8+
9+
result = run_module(
10+
Runner, args=argv, arg_name="email", arg_default="johndoe@gmail.com"
11+
)
12+
pprint(result)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#!/usr/bin/env python3
2+
3+
from hashlib import sha1
4+
from re import findall
5+
6+
from bs4 import BeautifulSoup
7+
from requests import Session
8+
9+
from src.core.base.osint import OsintRunner, PossibleKeys
10+
from src.core.utils.response import ScriptResponse
11+
from src.core.utils.validators import validate_kwargs
12+
13+
14+
class Defaults:
15+
headers = {
16+
"Content-Type": "application/x-www-form-urlencoded",
17+
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
18+
"Chrome/84.0.4147.105 Safari/537.36",
19+
}
20+
21+
22+
class Runner(OsintRunner):
23+
"""
24+
Check email in public breaches.
25+
"""
26+
27+
required = ["email"]
28+
29+
def __init__(self, logger: str = __name__):
30+
super(Runner, self).__init__(logger)
31+
32+
@validate_kwargs(PossibleKeys.KEYS)
33+
def run(self, *args, **kwargs) -> ScriptResponse.success or ScriptResponse.error:
34+
"""
35+
Take email and look it up in breaches.
36+
Breaches info is provided by monitor.firefox.com (haveibeenpwned.com)
37+
"""
38+
email = kwargs.get("email")
39+
if not isinstance(email, str):
40+
return ScriptResponse.error(
41+
result=None,
42+
message=f"Can't make query. Incorrect input type (got {type(email)}, need {type('')}).",
43+
)
44+
email_hash = sha1(email.encode()).hexdigest()
45+
46+
with Session() as session:
47+
session.headers.update(Defaults.headers)
48+
49+
resp = session.get(r"https://monitor.firefox.com")
50+
if resp.status_code != 200:
51+
return ScriptResponse.error(
52+
message=f"Can't look up email in breaches. Server response: {resp.status_code}.",
53+
)
54+
55+
csrf_re = findall(r'(?<="_csrf" value=").*(?=">)', resp.text)
56+
if not csrf_re:
57+
return ScriptResponse.error(message=f"Can't find csrf token.")
58+
59+
csrf = csrf_re[0]
60+
resp = session.post(
61+
r"https://monitor.firefox.com/scan",
62+
data={"_csrf": csrf, "emailHash": email_hash},
63+
)
64+
if resp.status_code != 200:
65+
return ScriptResponse.error(
66+
message=f"Can't look up email in breaches. Server response: {resp.status_code}.",
67+
)
68+
69+
breaches = []
70+
soup = BeautifulSoup(resp.text, "html.parser")
71+
for breach in soup.find_all("a", class_="breach-card"):
72+
title = breach.find("span", class_="breach-title").text
73+
info = breach.find_all("span", class_="breach-value")
74+
if len(info) < 2:
75+
continue
76+
breaches.append(
77+
{"title": title, "date": info[0].text, "compromised": info[1].text}
78+
)
79+
80+
return ScriptResponse.success(
81+
result=breaches,
82+
message=f"Email {email} is found in {len(breaches)} breaches.",
83+
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
beautifulsoup4==4.9.1
2+
certifi==2020.6.20
3+
chardet==3.0.4
4+
colorama==0.4.3
5+
commonmark==0.9.1
6+
idna==2.10
7+
Pygments==2.6.1
8+
requests==2.24.0
9+
rich==5.2.1
10+
soupsieve==2.0.1
11+
typing-extensions==3.7.4.2
12+
urllib3==1.25.10
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#!/usr/bin/env python3
2+
3+
from unittest import TestCase, SkipTest
4+
from random import choices
5+
from string import ascii_uppercase
6+
7+
from .module import Runner
8+
9+
10+
def get_random_string(length: int = 30) -> str:
11+
"""
12+
Generates random string.
13+
:param length: string length
14+
:return: random string
15+
"""
16+
return "".join(choices(ascii_uppercase, k=length))
17+
18+
19+
def check_response_msg(message: str = "") -> None:
20+
"""
21+
Check that response message is valid and applicable
22+
:param message: message of the response
23+
:return: None
24+
"""
25+
if "429" in message:
26+
raise SkipTest("Server respond with 429 (Too many requests)")
27+
28+
29+
class EmailBreachTest(TestCase):
30+
"""
31+
Defines basic tests for the email breach script.
32+
"""
33+
34+
def setUp(self):
35+
"""
36+
Setup something before each test function
37+
:return: None
38+
"""
39+
self.runner = Runner()
40+
41+
def test_request(self) -> None:
42+
"""
43+
Test email breach on johndoe@gmail.com.
44+
:return: None
45+
"""
46+
response = self.runner.run(email="johndoe@gmail.com")
47+
check_response_msg(response.get("message", ""))
48+
49+
self.assertIn("found in", response.get("message"))
50+
self.assertGreaterEqual(len(response.get("result")), 10)
51+
self.assertIn(
52+
{
53+
"compromised": "Passwords, Email addresses",
54+
"date": "December 4, 2013",
55+
"title": "Adobe",
56+
},
57+
response.get("result"),
58+
)
59+
60+
def test_random_string(self) -> None:
61+
"""
62+
Test email breach on random string request.
63+
"""
64+
response = self.runner.run(email=get_random_string())
65+
check_response_msg(response.get("message", ""))
66+
self.assertIn("found in", response.get("message"))
67+
68+
def test_unexpected_input(self) -> None:
69+
"""
70+
Test email breach on unexpected input type
71+
"""
72+
response = self.runner.run(email=None)
73+
check_response_msg(response.get("message", ""))
74+
self.assertIn("Can't make query", response.get("message"))

0 commit comments

Comments
 (0)