# Source code for CovertMark.strategy.length_clustering

from .. import analytics, data
from .strategy import DetectionStrategy

import os
from sys import exit, argv
from datetime import date, datetime
from operator import itemgetter
from math import log1p

class LengthClusteringStrategy(DetectionStrategy):
    """
    Detecting polling-based PTs such as meek by clustering the payload length
    of TLS-loaded TCP packets, useful for PTs with frequent directional pings
    with small and not greatly varying lengths of payloads.
    """

    NAME = "Length Clustering Strategy"
    DESCRIPTION = "Detecting low-payload heartbeat messages."
    _DEBUG_PREFIX = "LenClustering"
    RUN_CONFIG_DESCRIPTION = ["MeanShift bandwidth", "Using top N clusters"]

    # NOTE(review): not referenced by any method visible in this file; confirm
    # whether it is still needed.
    TLS_INCLUSION_THRESHOLD = 0.1
    # Candidate MeanShift bandwidths and numbers of top clusters; every
    # combination is tried by run_strategy.
    MEANSHIFT_BWS = [1, 2, 3, 5, 10]
    USE_TOP_CLUSTERS = [1, 2]
    # While this method does not require high TPR, a minimum threshold needs to
    # be maintained to ensure fitness.
    MINIMUM_TPR = 0.40
    # Decide whether to use all packets, only TLS packets, or only non-TLS packets.
    TLS_MODES = ["all", "only", "none"]

    def __init__(self, pt_pcap, negative_pcap=None, debug=True):
        """
        Set up the per-configuration state used by the positive and negative runs.

        :param pt_pcap: passed through unchanged to the parent initialiser.
        :param negative_pcap: passed through unchanged to the parent initialiser.
        :param bool debug: passed through unchanged to the parent initialiser.
        """

        super().__init__(pt_pcap, negative_pcap, debug=debug)
        # Each state is keyed by a (bandwidth, clusters) configuration tuple.
        # Assumes the parent initialiser has created self._strategic_states.
        self._strategic_states['TPR'] = {}
        self._strategic_states['FPR'] = {}
        self._strategic_states['top_clusters'] = {}
        self._strategic_states['blocked'] = {}
        self._tls_mode = self.TLS_MODES[0]
        self._best_config = None # For wireshark reporting.
[docs] def set_strategic_filter(self): """ When detecting meek, it would be trivial to simply ignore all non-TLS packets. However for a generalised strategy use/disregard of TLS packets should be determined by inspecting the positive packets instead. Therefore it is only necessary to filter out TCP packets with no payload. """ self._strategic_packet_filter = {"tcp_info": {"$ne": None}, "tcp_info.payload": {"$ne": b''}}
[docs] def interpret_config(self, config_set): """ Bandwidth and number of clusters used distinguish length clustering runs. """ if config_set is not None: interpretation = "TCP payload length clustering at MeanShift bandwidth {} with top {} cluster(s). ".format(config_set[0], config_set[1]) best_clusters = [str(i) for i in self._strategic_states['top_clusters'][config_set]] interpretation += "This cluster contain the following TCP payload lengths: {}.".format(', '.join(best_clusters)) return interpretation else: return ""
[docs] def config_specific_penalisation(self, config_set): """ The smaller the cluster bandwidth, the easier it is to perform live TCP payload length-based interceptions. Therefore 2.5% of penalty for every 1 extra byte value in the cluster beyond the minimum cluster size used across the board. """ if config_set not in self._strategic_states['top_clusters'].keys(): return 0 best_cluster = self._strategic_states['top_clusters'][config_set] min_cluster_size = len(min(self._strategic_states['top_clusters'].values(), key=len)) return 0.025 * min(0, len(best_cluster) - min_cluster_size)
[docs] def test_validation_split(self, split_ratio): """ Not currently needed, as a fixed strategy is used. """ return ([], [])
[docs] def positive_run(self, **kwargs): """ Because this simple strategy is based on common global TCP payload lengths, the identified packet ratio is not very useful here and will be fairly low (33-80%). :param int bandwidth: the bandwidth used for meanshift clustering payload lengths. :param int clusters: the number of top length clusters to use in classification. """ bandwidth = 1 if 'bandwidth' not in kwargs else kwargs['bandwidth'] clusters = 1 if 'clusters' not in kwargs else kwargs['clusters'] if self._tls_mode == "only": most_frequent = analytics.traffic.ordered_tcp_payload_length_frequency(self._pt_packets, True, bandwidth) else: most_frequent = analytics.traffic.ordered_tcp_payload_length_frequency(self._pt_packets, False, bandwidth) top_clusters = most_frequent[0] for i in range(1, clusters): top_clusters = top_clusters.union(most_frequent[i]) identified = 0 for packet in self._pt_packets: if len(packet['tcp_info']['payload']) in top_clusters: identified += 1 # Pass the cluster to the negative run. self._strategic_states['top_clusters'][(bandwidth, clusters)] = top_clusters self._strategic_states['TPR'][(bandwidth, clusters)] = identified / len(self._pt_packets) self.debug_print("TCP payload lengths in the {} cluster(s): {}.".format(clusters, ', '.join([str(i) for i in list(top_clusters)]))) self.register_performance_stats((bandwidth, clusters), TPR=self._strategic_states['TPR'][(bandwidth, clusters)]) return self._strategic_states['TPR'][(bandwidth, clusters)]
[docs] def negative_run(self, **kwargs): """ Now we check the identified lengths against negative packets. Because TLS packets with TCP payload lengths as small as meek's are actually very rare, this simple strategy becomes very effective. :param int bandwidth: the bandwidth used for meanshift clustering payload lengths. :param int clusters: the number of top length clusters to use in classification. """ bandwidth = 1 if 'bandwidth' not in kwargs else kwargs['bandwidth'] clusters = 1 if 'clusters' not in kwargs else kwargs['clusters'] top_cluster = self._strategic_states['top_clusters'][(bandwidth, clusters)] falsely_identified = 0 self._strategic_states['blocked'][(bandwidth, clusters)] = set([]) for packet in self._neg_packets: if len(packet['tcp_info']['payload']) in top_cluster: falsely_identified += 1 self._strategic_states['blocked'][(bandwidth, clusters)].add(packet['dst']) # Unlike the positive case, we consider the false positive rate to be # over all packets, rather than just the ones were are interested in. self._strategic_states['FPR'][(bandwidth, clusters)] = float(falsely_identified) / self._neg_collection_total self._negative_blocked_ips = self._strategic_states['blocked'][(bandwidth, clusters)] self._false_positive_blocked_rate = float(len(self._negative_blocked_ips)) / self._negative_unique_ips self.register_performance_stats((bandwidth, clusters), FPR=self._strategic_states['FPR'][(bandwidth, clusters)], ip_block_rate=self._false_positive_blocked_rate) return self._strategic_states['FPR'][(bandwidth, clusters)]
[docs] def report_blocked_ips(self): """ Return a Wireshark-compatible filter expression to allow viewing blocked packets in Wireshark. Useful for studying false positives. :returns: a Wireshark-compatible filter expression string. """ if self._best_config is None: return "(No effective cluster found.)" if self._tls_mode == "all": wireshark_output = "tcp.payload && (" elif self._tls_mode == "only": wireshark_output = "ssl & tcp.payload && (" elif self._tls_mode == "none": wireshark_output = "!ssl & tcp.payload && (" for i, ip in enumerate(list(self._negative_blocked_ips)): wireshark_output += "ip.dst_host == \"" + ip + "\" " if i < len(self._negative_blocked_ips) - 1: wireshark_output += "|| " wireshark_output += ") && (" for i, l in enumerate(list(self._strategic_states['top_clusters'][self._best_config])): wireshark_output += "tcp.len == " + str(l) if i < len(self._strategic_states['top_clusters']) - 1: wireshark_output += " || " wireshark_output += ")" return wireshark_output
[docs] def run_strategy(self, **kwargs): """ PT clients and servers in the input PCAP should be specified via :const:`data.constants.IP_SRC` and :const:`data.constants.IP_DST` respectively, while negative clients should be specified via :const:`data.constants.IP_SRC`. :param str tls_mode: Optionally set tls_mode between "all", "only", or "none" to test all packets, TLS packets only, or non-TLS packets only. Set it as "guess" or omit this parameter for the strategy to guess. """ # Check whether we should include or disregard TLS packets. tls_mode = 'guess' if 'tls_mode' not in kwargs else kwargs['tls_mode'] if tls_mode not in self.TLS_MODES: # Specified but invalid. tls_mode = 'guess' if tls_mode == 'guess': self.debug_print("Studying PT packets to figure out about TLS packets") tls_packets = 0 for t in self._pt_packets: if 'tls_info' in t and t['tls_info'] is not None: tls_packets += 1 if float(tls_packets) / len(self._pt_packets) > 0.95: self._tls_mode = "only" elif float(tls_packets) / len(self._pt_packets) < 0.05: self._tls_mode = "none" else: self._tls_mode = "all" else: self._tls_mode = tls_mode if tls_mode == 'only': self.debug_print("Strategy TLS mode: examining TLS packets only.") self._pt_packets = [i for i in self._pt_packets if i["tls_info"] is not None] self._neg_packets = [i for i in self._neg_packets if i["tls_info"] is not None] elif tls_mode == 'none': self.debug_print("Strategy TLS mode: examining non-TLS packets only.") self._pt_packets = [i for i in self._pt_packets if i["tls_info"] is None] self._neg_packets = [i for i in self._neg_packets if i["tls_info"] is None] else: self.debug_print("Strategy TLS mode: examining all packets regardless of TLS status.") self.debug_print("- Testing the following bandwidths for MeanShift: {}".format(', '.join([str(i) for i in self.MEANSHIFT_BWS]))) for bw in self.MEANSHIFT_BWS: for c_size in self.USE_TOP_CLUSTERS: self.debug_print("- Running MeanShift on positives with bandwidth {} using top {} 
cluster(s)...".format(bw, c_size)) self.run_on_positive((bw, c_size), bandwidth=bw, clusters=c_size) tpr = self._strategic_states['TPR'][(bw, c_size)] self.debug_print("True positive rate on bandwidth {} for top {} cluster(s): {}".format(bw, c_size, tpr)) self.debug_print("- Checking MeanShift on negatives with bandwidth {} using top {} cluster(s)...".format(bw, c_size)) self.run_on_negative((bw, c_size), bandwidth=bw, clusters=c_size) fpr = self._strategic_states['FPR'][(bw, c_size)] self.debug_print("False positive rate on bandwidth {} for top {} cluster(s): {}".format(bw, c_size, fpr)) # Round performance to four decimal places. tps = self._strategic_states['TPR'] fps = self._strategic_states['FPR'] # Find the best true positive and false positive performance. # Descending order of TPR, then ascending by bandwidth and cluster size to maximise efficiency. best_true_positives = [i[0] for i in sorted(tps.items(), key=lambda x: (x[1], -x[0][0], -x[0][1]), reverse=True)] # False positive in ascending order, then by bandwidth and cluster size ascending. best_false_positives = [i[0] for i in sorted(fps.items(), key=lambda x: (x[1], x[0][0], x[0][1]))] # Walk down the list of lowest false positives to find the first config # satisfying the minimum true positive rate requirement. best_config = None for config in best_false_positives: if tps[config] >= self.MINIMUM_TPR: best_config = config break # If none satisfies the minimum true positive rate requirement, report # as failure. 
if best_config is None: self.debug_print("No bandwidth and cluster size achieved the minimum true positive rate required ({}), giving up.".format(self.MINIMUM_TPR)) return (None, None) self._best_config = best_config self._true_positive_rate = tps[best_config] self._false_positive_rate = fps[best_config] self.debug_print("Best classification performance:") self.debug_print("Bandwidth: {}, using top {} cluster(s).".format(best_config[0], best_config[1])) self.debug_print("True positive rate: {}; False positive rate: {}".format(self._true_positive_rate, self._false_positive_rate)) self._negative_blocked_ips = self._strategic_states['blocked'][best_config] self._false_positive_blocked_rate = float(len(self._negative_blocked_ips)) / self._negative_unique_ips self.debug_print("This classification configuration blocked {:0.2f}% of IPs seen.".format(self._false_positive_blocked_rate*100)) return (self._true_positive_rate, self._false_positive_rate)
if __name__ == "__main__":
    # Command-line usage: eight positional arguments, in the order listed
    # in the usage message below.
    if len(argv) < 9:
        exit("Usage: {} pt_pcap negative_pcap pt_src_ip pt_dst_ip negative_src_ip pt_collection negative_collection tls_mode".format(argv[0]))

    # Input captures are looked up under <parent of this directory>/examples/local/.
    parent_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    pt_path = os.path.join(parent_path, 'examples', 'local', argv[1])
    unobfuscated_path = os.path.join(parent_path, 'examples', 'local', argv[2])

    detector = LengthClusteringStrategy(pt_path, unobfuscated_path, debug=True)
    detector.setup(pt_ip_filters=[(argv[3], data.constants.IP_SRC),
                                  (argv[4], data.constants.IP_DST)],
                   negative_ip_filters=[(argv[5], data.constants.IP_SRC)],
                   pt_collection=argv[6], negative_collection=argv[7])
    # NOTE(review): the original text here was garbled ("argv[7])[8])"), with a
    # statement lost between the two index expressions. It is reconstructed as
    # a separate call passing argv[8] as tls_mode; confirm that `run` is the
    # intended entry point on DetectionStrategy.
    detector.run(tls_mode=argv[8])

    print(detector.report_blocked_ips())
    score, best_config = detector._score_performance_stats()
    print("Score: {}, best config: {}.".format(score, detector.interpret_config(best_config)))
    print(detector.make_csv())