Source code for CovertMark.data.utils

import os
import socket
import ipaddress
from json import load

from dpkt import tcp

[docs]def check_file_exists(file_path): """ Check whether the file at file_path exists. :param str file_path: full path to the file checked. :returns: a boolean value indicating whether the file exists. """ if os.path.isfile(file_path): return True else: return False
[docs]def get_full_path(file_path): """ Given the path to a file, returns the full path containing the file by expanding any user prefixes. This does not require the target file to exist. :param str file_path: full or user-prefix path to file. :returns: the path to the directory containing the specified file. None if the directory does not exist. """ directory = os.path.dirname(file_path) full_dir = os.path.expanduser(directory) if not os.path.isdir(full_dir): return None return os.path.join(full_dir, os.path.basename(file_path))
[docs]def read_mongo_credentials(): """ Reads and returns mongo credentials stored in mongo-auth.json. :returns: a dict containing `'username'` and `'password'` specified if read was successful, as well as `'auth_source'` for the authentication databse. Returns None if the JSON file does not exist or is invalid. """ file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mongo-auth.json') if not check_file_exists(file_path): return None try: creds = load(open(file_path, 'r')) except: return None for i in ['username', 'password', 'auth_source']: if i not in creds: return None return creds
[docs]def parse_ip(ip_bytes): """ Convert an IPv4/IPv6 address in bytes to a valid IP address in string format, if it is indeed valid. :param bytes ip_bytes: bytes of IPv4/IPv6 address. :returns: IP address in string format, None if input invalid. """ try: return str(ipaddress.ip_address(ip_bytes)) except ValueError: return None
[docs]def build_subnet(subnet_str): """ Convert an IPv4/IPv6 subnet in string format (e.g. 192.168.1.0/24) into an :class:`ipaddress.IPv4Network` or :class:`ipaddress.IPv6Network` object. :param str subnet_str: subnet in string format. :returns: :class:`ipaddress.IPv4Network` or :class:`ipaddress.IPv6Network` object depends on input subnet address type, or None if input invalid. """ try: network = ipaddress.IPv4Network(subnet_str) except ipaddress.AddressValueError: try: network = ipaddress.IPv6Network(subnet_str) except ipaddress.AddressValueError: return None return network
[docs]def parse_tcp_flags(flag_bits): """ Parse flags of a TCP packet. :param bytes flag_bytes: a byte of bits containing TCP packet flag, only the first 8 bits are in use. :returns: a dict of TCP flags and their values. """ flags = {} # Based on dpkt manual by Jeff Silverman. flags["FIN"] = (flag_bits & tcp.TH_FIN) != 0 flags["SYN"] = (flag_bits & tcp.TH_SYN) != 0 flags["RST"] = (flag_bits & tcp.TH_RST) != 0 flags["PSH"] = (flag_bits & tcp.TH_PUSH) != 0 flags["ACK"] = (flag_bits & tcp.TH_ACK) != 0 flags["URG"] = (flag_bits & tcp.TH_URG) != 0 flags["ECE"] = (flag_bits & tcp.TH_ECE) != 0 flags["CWR"] = (flag_bits & tcp.TH_CWR) != 0 return flags