from . import utils, constants, mongo
from os.path import isfile, abspath, expanduser
from base64 import b64encode, b64decode
import ipaddress
import dpkt
class PCAPParser:
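    """
    Parse packets from a PCAP file into dictionaries, with optional IP
    filtering, and store them in MongoDB via :class:`mongo.MongoDBManager`.

    Example (a minimal sketch; assumes a reachable MongoDB server as
    configured in :const:`constants.MONGODB_SERVER`, and a capture file at
    the hypothetical path ``~/traces/example.pcap``)::

        parser = PCAPParser("~/traces/example.pcap")
        parser.set_ip_filter([("192.0.2.0/24", constants.IP_EITHER)])
        collection_name = parser.load_and_insert_new(description="example trace")
    """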
def __init__(self, pcap_file):
pcap_file = abspath(expanduser(pcap_file)) # Expand user and relative paths.
if not utils.check_file_exists(pcap_file):
raise FileNotFoundError("PCAP file not found: " + pcap_file)
self._pcap_file = pcap_file
self.__db = mongo.MongoDBManager(db_server=constants.MONGODB_SERVER)
self.__filter = []
    def get_ip_filter(self):
"""
        Return the current IP filter configuration.

        :returns: a list of (subnet, position) tuples, where each subnet is
            an :mod:`ipaddress` network object and each position is one of
            :const:`constants.IP_SRC`, :const:`constants.IP_DST`, or
            :const:`constants.IP_EITHER`.
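
        Example (illustrative; assumes :meth:`set_ip_filter` was called as shown)::

            parser.set_ip_filter([("192.0.2.0/24", constants.IP_SRC)])
            parser.get_ip_filter() # e.g. [(IPv4Network('192.0.2.0/24'), constants.IP_SRC)]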
"""
return self.__filter
    def set_ip_filter(self, subjects):
"""
        Configure the parser to store a packet only if its source or
        destination address belongs to one of the specified addresses or
        subnets. Single addresses are always processed as single-host
        subnets for convenience. Calling this method overwrites the
        previous filter configuration.

        :param list subjects: a list of acceptable IPv4/IPv6 addresses or
            subnets in string format, together with their directions.
            Format: [(NET, POSITION)], where NET is the IPv4/IPv6 address
            or subnet to track, and POSITION is one of
            :const:`constants.IP_SRC`, :const:`constants.IP_DST`, or
            :const:`constants.IP_EITHER`.
            Precedence: a packet is accepted if it matches any
            :const:`constants.IP_EITHER` filter on either its source or its
            destination, superseding acceptance by
            :const:`constants.IP_SRC`/:const:`constants.IP_DST` filters
            covering the same subnets; otherwise, both its `src` and its
            `dst` must match the corresponding :const:`constants.IP_SRC`
            and :const:`constants.IP_DST` filters, with each direction
            accepted by default when no filters are set for it.
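
        Example (illustrative; uses documentation-reserved addresses)::

            parser.set_ip_filter([
                ("192.0.2.1", constants.IP_SRC), # A single host, stored as 192.0.2.1/32.
                ("2001:db8::/32", constants.IP_DST),
                ("198.51.100.0/24", constants.IP_EITHER),
            ]) # => 3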
        :returns: the number of successfully added filters (filters with
            overlapping subnets are represented and processed separately).
"""
self.__filter = []
self.__filter_src_rules = []
self.__filter_dst_rules = []
self.__filter_bidir_rules = []
for subject in subjects:
            if not isinstance(subject, tuple) or len(subject) != 2 or \
               (subject[1] not in [constants.IP_SRC, constants.IP_DST, constants.IP_EITHER]):
continue
subnet = utils.build_subnet(subject[0])
if subnet:
if subject[1] == constants.IP_SRC:
self.__filter_src_rules.append(subnet)
elif subject[1] == constants.IP_DST:
self.__filter_dst_rules.append(subnet)
elif subject[1] == constants.IP_EITHER:
                    # Either source or destination.
self.__filter_bidir_rules.append(subnet)
self.__filter.append((subnet, subject[1]))
return len(self.__filter)
    def load_packet_info(self):
"""
        Load and return information of raw packets.
        Non-IP/IPv6 packets are ignored.

        Format::

            [{
                type: v4/v6, dst: dst_ip, src: src_ip, len: packet_length,
                proto: protocol, time: time_stamp, ttl: TTL/hop_limit,
                tcp_info (None for non-TCP packets):
                    {sport: src_port, dport: dst_port, flags: tcp_flags,
                     opts: tcp_options, seq: tcp_seq, ack: tcp_ack,
                     payload: b64_encoded_payload},
                tls_info (None for non-TLS packets):
                    {type: tls_type, ver: tls_version, len: tls_data_length,
                     records: tls_num_records, data: [b64_encoded_tls_data],
                     data_length: [tls_data_lengths]},
                http_info (None for non-HTTP-request packets):
                    {headers: http_headers, uri: http_uri, version: http_version}
            }]
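
        Example (a sketch of consuming the output; assumes ``parser`` is a
        configured :class:`PCAPParser`)::

            for packet in parser.load_packet_info():
                if packet["tcp_info"] is not None:
                    print(packet["src"], "->", packet["dst"],
                          "dport:", packet["tcp_info"]["dport"])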
        :returns: a list of parsed packets, formatted as above.
"""
packet_list = []
        check_filter = len(self.__filter) > 0
with open(self._pcap_file, 'rb') as f:
for ts, buf in dpkt.pcap.Reader(f):
eth = dpkt.ethernet.Ethernet(buf)
packet_info = {}
# Generic IP information.
ip = eth.data
if eth.type == dpkt.ethernet.ETH_TYPE_IP:
packet_info["dst"] = utils.parse_ip(ip.dst)
packet_info["src"] = utils.parse_ip(ip.src)
packet_info["type"] = "IPv4"
packet_info["len"] = ip.len
packet_info["ttl"] = ip.ttl
elif eth.type == dpkt.ethernet.ETH_TYPE_IP6:
packet_info["dst"] = utils.parse_ip(ip.dst)
packet_info["src"] = utils.parse_ip(ip.src)
packet_info["type"] = "IPv6"
packet_info["len"] = ip.plen
packet_info["ttl"] = ip.hlim
else:
                    PCAPParser.log_invalid("Non-IP/IPv6 packet ignored: " + str(buf))
continue
# Drop this packet if filter rules exclude this packet.
if check_filter:
src_net = utils.build_subnet(packet_info["src"])
dst_net = utils.build_subnet(packet_info["dst"])
                    if len(self.__filter_src_rules) > 0:
                        src_match = any(s.overlaps(src_net) for s in self.__filter_src_rules)
                    else:
                        src_match = True # Accepted by default if no source filters are set.
                    if len(self.__filter_dst_rules) > 0:
                        dst_match = any(s.overlaps(dst_net) for s in self.__filter_dst_rules)
                    else:
                        dst_match = True # Accepted by default if no destination filters are set.
                    if len(self.__filter_bidir_rules) > 0:
                        bidir_match = any(s.overlaps(src_net) or s.overlaps(dst_net) for s in self.__filter_bidir_rules)
                    else:
                        bidir_match = False # No default acceptance for bidirectional filters.
                    # A bidirectional match supersedes source/destination filters
                    # covering the same subnets; otherwise both directions must match.
                    if not bidir_match:
                        if not (src_match and dst_match):
                            continue
packet_info["proto"] = type(ip.data).__name__
packet_info["time"] = "{0:.6f}".format(ts)
# Check and record TCP information if applicable.
                tcp_info = None
                if packet_info["proto"] == "TCP":
                    tcp_info = {
                        "sport": ip.data.sport,
                        "dport": ip.data.dport,
                        "flags": utils.parse_tcp_flags(ip.data.flags),
                        "opts": dpkt.tcp.parse_opts(ip.data.opts),
                        "ack": ip.data.ack,
                        "seq": ip.data.seq,
                        "payload": b64encode(ip.data.data),
                    }
                packet_info["tcp_info"] = tcp_info
# Check and record TLS information if applicable.
try:
tls = dpkt.ssl.TLS(ip.data.data)
tls_data = {}
tls_data["type"] = constants.TLS_TYPE[tls.type]
tls_data["ver"] = constants.TLS_VERSION[tls.version]
tls_data["len"] = tls.len
tls_data["records"] = len(tls.records) # Number of records.
tls_data["data"] = []
tls_data["data_length"] = []
for record in tls.records:
tls_data["data"].append(b64encode(record.data))
tls_data["data_length"].append(len(record.data))
                except Exception: # Not a (well-formed) TLS packet.
tls_data = None
packet_info["tls_info"] = tls_data
                # Check and record useful features of HTTP requests, if present.
try:
http_request = dpkt.http.Request(ip.data.data)
http_data = {}
http_data['headers'] = http_request.headers
http_data['uri'] = http_request.uri
http_data['version'] = http_request.version
                except Exception: # Not a (well-formed) HTTP request.
http_data = None
packet_info["http_info"] = http_data
packet_list.append(packet_info)
return packet_list
[docs] def load_and_insert_new(self, description=""):
"""
        Load raw packets from the PCAP file, and insert them into a new collection.
        The returned collection name **must** be verified to not be False.

        :param str description: a description of the new collection, empty by default.
        :returns: the name of the new collection, or False on failure.
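
        Example (a sketch; assumes ``parser`` is a configured :class:`PCAPParser`)::

            collection_name = parser.load_and_insert_new(description="example trace")
            if not collection_name:
                raise RuntimeError("Failed to store packets in a new collection.")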
"""
packets = self.load_packet_info()
        if len(packets) == 0: # No packets loaded (likely due to an incorrect IP filter).
return False
new_collection = self.__db.new_collection(description=description, input_filters=self.__filter)
if not new_collection:
return False
insertion_result = self.__db.insert_packets(packets, collection_name=new_collection)
if len(insertion_result["inserted"].inserted_ids) > 0:
return new_collection
else:
return False
    def load_and_insert_existing(self, collection_name):
"""
        Load raw packets from the PCAP file, and insert them into an existing collection.

        :param str collection_name: the name of the existing collection to insert into.
        :returns: True if the insertion was successful, False otherwise.
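
        Example (a sketch; assumes ``collection_name`` was returned by an
        earlier :meth:`load_and_insert_new` call)::

            if not parser.load_and_insert_existing(collection_name):
                raise RuntimeError("Failed to append packets to the collection.")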
"""
packets = self.load_packet_info()
        if len(packets) == 0: # No packets loaded (likely due to an incorrect IP filter).
return False
insertion_result = self.__db.insert_packets(packets, collection_name=collection_name)
        return len(insertion_result["inserted"].inserted_ids) > 0
    def clean_up(self, collection):
"""
        Drop the collection and its index to reclaim space, if the stored
        trace is only temporary.

        :param str collection: the name of the collection to be cleaned up.
"""
self.__db.delete_collection(collection)
    @staticmethod
def log_invalid(error_content):
"""
        Utility function to log information about invalid packets encountered
        during parsing.

        :param str error_content: the error message to append to the log file.
        :returns: None
"""
if constants.LOG_ERROR and isfile(constants.LOG_FILE):
with open(constants.LOG_FILE, "a") as log_file:
                log_file.write(error_content + "\n") # Newline keeps log entries separated.