Source code for CovertMark.strategy.entropy_est

from .. import analytics, data
from .strategy import DetectionStrategy

import os
from sys import exit, argv
from datetime import date, datetime
from operator import itemgetter
from math import log1p, floor
import numpy as np


[docs]class EntropyEstimationStrategy(DetectionStrategy): """ Detecting high-entropy fully-encrypted based on estimation of sliding window entropy on TCP payloads in both directions. """ NAME = "Entropy Estimation Strategy" DESCRIPTION = "Detecting high-entropy PTs based on sliding window entropy estimation." _DEBUG_PREFIX = "EntropyEst" RUN_CONFIG_DESCRIPTION = ["Window Size", "Test Size", "Percentile Threshold"] MIN_TEST_SIZES = [1024, 512, 256, 128] WINDOW_SIZE = 64 # Default. WINDOW_SIZES = [32, 64, 96] THRESHOLDS = [0.1, 0.5, 1, 5] # %ile threshold for high-entropy proportions FALSE_POSITIVE_SCORE_WEIGHT = 0.5 TLS_HTTP_INCLUSION_THRESHOLD = 0.1 def __init__(self, pt_pcap, negative_pcap=None, debug=True): super().__init__(pt_pcap, negative_pcap, debug=debug) self._analyser = analytics.entropy.EntropyAnalyser() # Store intermediate results and cut-off thresholds. self._strategic_states['TPR'] = {} self._strategic_states['FPR'] = {} self._strategic_states['blocked_ips'] = {} self._strategic_states['cut_off'] = {} # Record disregards. self._disregard_tls = False self._disregard_http = False
[docs] def set_strategic_filter(self): """ The base strategy is to only observe TCP packets that do not have valid TLS records (as identified by dpkt) but do bear a non-blank payload. """ # TCP payload required here, with whether to include or disregard HTTP # and TLS packets are done by run when observing retrieved packet # patterns. self._strategic_packet_filter = {"tcp_info": {"$ne": None}, "tcp_info.payload": {"$ne": b''}}
[docs] def interpret_config(self, config_set): """ Block size, p-value threshold, and criteria distinguish entropy distribution tests. """ if config_set is not None: return "Entropy estimation test with byte block size {} and max tested payload size {}, subject to {} pct threshold. .".format(config_set[0], config_set[1], config_set[2]) else: return ""
[docs] def config_specific_penalisation(self, config_set): """ Byte block sizes and min test sizes for entropy uniformity and distribution tests will have already inversely proportionally affected the positive execution time. As the percentile threshold has no effect on the difficulty of strategy deployment, no strategy-specific penalisation is required. """ return 0
[docs] def test_validation_split(self, split_ratio): """ Not needed, as a fixed strategy is used. """ return ([], [])
[docs] def positive_run(self, **kwargs): """ Results from these tests estimate the presence of fully encrypted payloads by counting the number of sliding windows with large numbers of unique bytes. :param int window_size: the size of blocks of payload bytes tested in KS and AD. Default is set in :const:`BLOCK_SIZE`. :param int test_size: the minimum number of bytes tested in each payload for testing, with default set in :const:`TEST_SIZES`. :param int threshold: the percentile threshold for the proportion of high entropy packets considered as positives. """ window_size = self.WINDOW_SIZE if 'window_size' not in kwargs else kwargs['window_size'] test_size = min(self.MIN_TEST_SIZES) if 'test_size' not in kwargs else kwargs['test_size'] threshold = max(self.THRESHOLDS) if 'threshold' not in kwargs else kwargs['threshold'] config = (window_size, test_size, threshold) mtu_threshold = analytics.constants.MTU_FRAME_AVOIDANCE_THRESHOLD examined_packets = 0 detected = [] self._strategic_states['cut_off'][config] = 0 for t in self._pt_packets: payload = t['tcp_info']['payload'][:mtu_threshold] if len(payload) >= max(self._protocol_min_length, window_size, test_size): examined_packets += 1 detected.append(self._analyser.entropy_estimation(payload, window_size)) if examined_packets == 0: self.debug_print("Warning: no packets examined, TCP payload length threshold or input filters may be incorrect.") return 0 self._strategic_states['cut_off'][config] = floor(np.percentile(detected, threshold)) # Store result in the state space and register it. self._strategic_states['TPR'][config] = float(100 - threshold) / 100 # Fixed positive thresholding. self.register_performance_stats(config, TPR=self._strategic_states['TPR'][config]) return self._strategic_states['TPR'][config]
[docs] def negative_run(self, **kwargs): """ Test an identical configuration on negative packets. Reporting falsely blocked IPs. :param int window_size: the size of blocks of payload bytes tested in KS and AD. Default is set in :const:`BLOCK_SIZE`. :param int test_size: the minimum number of bytes tested in each payload for testing, with default set in :const:`TEST_SIZES`. :param int threshold: the percentile threshold for the proportion of high entropy packets considered as positives. """ window_size = self.WINDOW_SIZE if 'window_size' not in kwargs else kwargs['window_size'] test_size = min(self.MIN_TEST_SIZES) if 'test_size' not in kwargs else kwargs['test_size'] threshold = max(self.THRESHOLDS) if 'threshold' not in kwargs else kwargs['threshold'] config = (window_size, test_size, threshold) mtu_threshold = analytics.constants.MTU_FRAME_AVOIDANCE_THRESHOLD false_positives = 0 blocked_ips = set([]) for t in self._neg_packets: payload = t['tcp_info']['payload'][:mtu_threshold] if len(payload) >= max(self._protocol_min_length, window_size, test_size): high_entropy_proportion = self._analyser.entropy_estimation(payload, window_size) if high_entropy_proportion >= self._strategic_states['cut_off'][config]: blocked_ips.add(t['dst']) false_positives += 1 self._negative_blocked_ips = blocked_ips # Unlike the positive case, we consider the false positive rate to be # over all packets, rather than just the ones were are interested in. # Store all results in the state space. self._strategic_states['FPR'][config] = false_positives / self._neg_collection_total self._strategic_states['blocked_ips'][config] = blocked_ips self._false_positive_blocked_rate = float(len(blocked_ips)) / self._negative_unique_ips # Register the results. self.register_performance_stats(config, FPR=self._strategic_states['FPR'][config], ip_block_rate=self._false_positive_blocked_rate) return self._strategic_states['FPR'][config]
[docs] def report_blocked_ips(self): """ Return a Wireshark-compatible filter expression to allow viewing blocked packets in Wireshark. Useful for studying false positives. :returns: a Wireshark-compatible filter expression string. """ wireshark_output = "" if not self._disregard_tls: wireshark_output += "ssl && " else: wireshark_output += "!ssl && " if not self._disregard_http: wireshark_output += "http && " else: wireshark_output += "!http && " wireshark_output += "tcp_len >= " + str(self._protocol_min_length) + " && " wireshark_output += "(" for i, ip in enumerate(list(self._negative_blocked_ips)): wireshark_output += "ip.dst_host == \"" + ip + "\" " if i < len(self._negative_blocked_ips) - 1: wireshark_output += "|| " wireshark_output += ")" return wireshark_output
[docs] def run_strategy(self, **kwargs): """ PT input filters should be given as :const:`data.constants.IP_SRC` and :const:`data.constants.IP_DST`, and changed around if testing for downstream rather than upstream direction. Negative input filters specifying innocent clients should be given as an :const:`data.constants.IP_SRC`. :param int protocol_min_length: Optionally set the minimum handshake TCP payload length of packets in that direction, allowing disregard of short packets. """ protocol_min_length = 0 if 'protocol_min_length' not in kwargs else kwargs['protocol_min_length'] if not isinstance(protocol_min_length, int) or protocol_min_length < 0: self.debug_print("Assuming minimum protocol TCP payload length as 0.") self._protocol_min_length = 0 else: self._protocol_min_length = protocol_min_length # Check whether we should include or disregard TLS or HTTP packets. pt_tls_count = 0 pt_http_count = 0 for packet in self._pt_packets: if packet["tls_info"] is not None: pt_tls_count += 1 elif packet["http_info"] is not None: pt_http_count += 1 if float(pt_tls_count) / len(self._pt_packets) >= self.TLS_HTTP_INCLUSION_THRESHOLD: self.debug_print("Considering TLS packets based on PT trace observations only.") self._pt_packets = [i for i in self._pt_packets if i["tls_info"] is not None] self._neg_packets = [i for i in self._neg_packets if i["tls_info"] is not None] else: self.debug_print("Disregarding TLS packets based on PT trace observations.") self._pt_packets = [i for i in self._pt_packets if i["tls_info"] is None] self._neg_packets = [i for i in self._neg_packets if i["tls_info"] is None] self._disregard_tls = True if float(pt_http_count) / len(self._pt_packets) >= self.TLS_HTTP_INCLUSION_THRESHOLD: self.debug_print("Considering HTTP packets based on PT trace observations only.") self._pt_packets = [i for i in self._pt_packets if i["http_info"] is not None] self._neg_packets = [i for i in self._neg_packets if i["http_info"] is not None] else: self.debug_print("Disregarding HTTP packets based on PT trace observations.") self._pt_packets = [i for i in self._pt_packets if i["http_info"] is None] self._neg_packets = [i for i in self._neg_packets if i["http_info"] is None] self._disregard_http = True self.debug_print("- Running iterations of detection strategy on positive and negative test packets...") for s in self.MIN_TEST_SIZES: for b in self.WINDOW_SIZES: for c in self.THRESHOLDS: self.debug_print("- Calculating positive cut-off at {} pct, for min {} bytes, {} byte windows on positive packets...".format(c, s, b)) tp = self.run_on_positive((b, s, c), window_size=b, test_size=s, threshold=c) self.debug_print("- Testing min {} bytes, {} byte windows on negative packets...".format(s, b)) fp = self.run_on_negative((b, s, c), window_size=b, test_size=s, threshold=c) self.debug_print("min {} bytes, {} byte windows at {} pct cut-off gives false positive rate {}.".format(s, b, c, fp)) # Find the best true positive and false positive performance. tps = self._strategic_states['TPR'] fps = self._strategic_states['FPR'] best_true_positives = [i[0] for i in sorted(tps.items(), key=itemgetter(1), reverse=True)] # True positive in descending order. best_false_positives = [i[0] for i in sorted(fps.items(), key=itemgetter(1))] # False positive in ascending order. best_true_positive = best_true_positives[0] best_false_positive = best_false_positives[0] # Score the configurations based on their difference from the best one. # As it is guaranteed for the difference to be between 0 and 1, # log1p(100) - log1p(diff*100) is used to create a descending score # exponentially rewarding low difference values. configs = list(tps.keys()) true_positives_scores = [(log1p(100) - log1p(abs(tps[best_true_positive] - tps[i])*100)) for i in configs] false_positives_scores = [(log1p(100) - log1p(abs(tps[best_false_positive] - fps[i])*100)) for i in configs] average_scores = [(true_positives_scores[i] * (1-self.FALSE_POSITIVE_SCORE_WEIGHT) + false_positives_scores[i] * self.FALSE_POSITIVE_SCORE_WEIGHT) for i in range(len(true_positives_scores))] best_config = configs[average_scores.index(max(average_scores))] self._true_positive_rate = tps[best_config] self._false_positive_rate = fps[best_config] self._negative_blocked_ips = self._strategic_states["blocked_ips"] self.debug_print("Best classification performance:") self.debug_print("block size: {}, min test size: {}, positive cutoff threshold: {} pct.".format(best_config[0], best_config[1], best_config[2])) self.debug_print("True positive rate: {}; False positive rate: {}".format(self._true_positive_rate, self._false_positive_rate)) self._negative_blocked_ips = self._strategic_states['blocked_ips'][best_config] self._false_positive_blocked_rate = float(len(self._negative_blocked_ips)) / self._negative_unique_ips self.debug_print("This classification configuration blocked {:0.2f}% of IPs seen.".format(self._false_positive_blocked_rate*100)) return (self._true_positive_rate, self._false_positive_rate)
if __name__ == "__main__": parent_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) pt_path = os.path.join(parent_path, 'examples', 'local', argv[1]) unobfuscated_path = os.path.join(parent_path, 'examples', 'local', argv[2]) detector = EntropyEstimationStrategy(pt_path, unobfuscated_path, debug=True) detector.setup(pt_ip_filters=[(argv[3], data.constants.IP_SRC), (argv[4], data.constants.IP_DST)], negative_ip_filters=[(argv[5], data.constants.IP_SRC)], pt_collection=argv[6], negative_collection=argv[7]) detector.run(protocol_min_length=int(argv[8])) print(detector.report_blocked_ips()) score, best_config = detector._score_performance_stats() print("Score: {}, best config: {}.".format(score, detector.interpret_config(best_config))) print(detector.make_csv())