Skip to content

Experiment: Enable decode of base64 #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions src/jsonid/jsonid.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import argparse
import asyncio
import base64
import binascii
import json
import logging
import os
import sys
import time
from typing import Tuple
from typing import Any, Tuple

try:
import helpers
Expand Down Expand Up @@ -36,32 +38,48 @@
logger = logging.getLogger(__name__)


def decode(content: Any, decodeb64: bool):
"""Decode the given content stream."""
data = ""
try:
data = json.loads(content)
except json.decoder.JSONDecodeError:
if not decodeb64:
return False, None
try:
logger.debug("attempting to decode as base64")
data = base64.b64decode(content)
return decode(data, False)
except (binascii.Error, ValueError):
return False, None
return True, data


@helpers.timeit
async def identify_plaintext_bytestream(path: str) -> Tuple[bool, str]:
async def identify_plaintext_bytestream(path: str, decodeb64: bool) -> Tuple[bool, str]:
"""Ensure that the file is a palintext bytestream and can be
processed as JSON.
"""
logger.debug("attempting to open: %s", path)
data = ""
with open(path, "r", encoding="utf-8") as obj:
try:
data = json.loads(obj.read())
except (json.decoder.JSONDecodeError, UnicodeDecodeError):
content = obj.read()
return decode(content, decodeb64)
except UnicodeDecodeError:
return False, None
return True, data


async def identify_json(paths: list[str], binary: bool):
async def identify_json(paths: list[str], binary: bool, decodeb64: bool):
"""Identify objects"""
print("---")
for path in paths:
valid, data = await identify_plaintext_bytestream(path)
valid, data = await identify_plaintext_bytestream(path, decodeb64)
if not valid:
logger.debug("%s: is not plaintext", path)
if binary:
logger.warning("report on binary object...")
continue
if data != "":
print("---")
logger.debug("processing: %s", path)
res = registry.matcher(data)
print(f"file: {path}")
Expand All @@ -82,20 +100,20 @@ async def create_manifest(path: str) -> list[str]:
return paths


async def process_data(path: str, binary: bool):
async def process_data(path: str, binary: bool, decodeb64: bool):
"""Process all objects at a given path"""
logger.debug("processing: %s", path)
if not os.path.exists(path):
logger.error("path: '%s' does not exist", path)
sys.exit(1)
if os.path.isfile(path):
await identify_json([path], binary)
await identify_json([path], binary, decodeb64)
sys.exit(0)
paths = await create_manifest(path)
if not paths:
logger.info("no files in directory: %s", path)
sys.exit(1)
await identify_json(paths, binary)
await identify_json(paths, binary, decodeb64)


def main() -> None:
Expand All @@ -122,6 +140,12 @@ def main() -> None:
required=False,
action="store_true",
)
parser.add_argument(
"--base64",
help="attempt to decode contents as base64 (prototype)",
required=False,
action="store_true",
)
parser.add_argument(
"--registry",
help="path to a custom registry to lead into memory replacing the default",
Expand Down Expand Up @@ -150,6 +174,7 @@ def main() -> None:
process_data(
path=args.path,
binary=args.binary,
decodeb64=args.base64,
)
)

Expand Down