Opensource Mail Gateway ürünü olan proxmox mailgateway’in(PMG) kendi içerisinde bir çok filtreleme katmanı bulunuyor. Bunlara ek olarak bir çok ücretli mail gateway yazılımında dahi bulunmayan bir entegrasyonu hep birlikte yapalım. Bunun için PMG’nin Custom Check Interface özelliğinden faydalanacağiz.
Öncelikle;
virustotal üyeliği açarak api yönlendirmelerini takip edip üyeliğimize ait bir api key almamız gerekmektedir. virustotal’e sorgu atarken bu api keyden faydalanacağız.
Önemli Not: virustotal free üyelikde günde 500 sorguya kadar izin vermektedir. premium fiyatları dolar bazlı olduğundan ve astarı yüzünü geçtiğinden şöyle bir çözüm ürettim.
bir url’i virustotal’e sorguladıktan sonra sonucunu proxmox içerisinde yer alan postgres db’ye yazdım. ve aynı url tekrar geldiğinde son 7 gün içerisinde sorgulamışsam, tekrar virustotal’den sorgulamadım yerel db’imde yer alan sonuçları referans aldım.
/etc/pmg/pmg.conf dosyasında admin bölümünde custom check’i enable edelim.
section: admin
custom_check 1
Custom check interface bash scriptini default ayarlarda “/usr/local/bin/pmg-custom-check” path’ine yazmamız gerekiyor. İsterseniz bu path’i de değiştirebilirsiniz. Burada oluşturduğumuz bir bash script ile asıl işi yapacak olan python dosyasını çağıracağız.
#!/bin/bash
parametre1=$1
parametre2=$2
python_sonuc=$(python3 /home/pmg-custom-check.py $parametre1 $parametre2)
echo "v1"
echo "$python_sonuc"
exit 0
Bash script dosyamız ile /home/pmg-custom-check.py
yolundaki python dosyamızı çağırıyoruz. “echo "$python_sonuc"
satırında python dosyamızdan aldığımız yanıtı proxmox’a geri gönderiyoruz.
python dosyamız;
import sys
from urlextract import URLExtract
import re
import json
import requests
import base64
import tldextract
import psycopg2
import datetime
# Dışarıdan gelen parametreleri al ve mail dosyasını oku
api_version = sys.argv[1] if len(sys.argv) > 1 else None
current_mail = sys.argv[2] if len(sys.argv) > 2 else None
mail_icerik_file = open(current_mail, "r")
mail_icerik = mail_icerik_file.read()
## tanımlamalar
virus_total_api_key = "xxxxxxxxxxxxxxxx"
guvenilen_domainler = ["microsoft.com","w3.org"]
kontrol_edilecek_domainler = []
kontrol_edilecek_subdomainler = []
kontrol_edilecek_urller = []
base64_anahtar_kelime = "Content-Transfer-Encoding: base64"
base64_count = mail_icerik.count(base64_anahtar_kelime)
#db tanımlamaları
db_user = "virustotallocal_db_user"
db_password = "xxxxxx"
db_host = "127.0.0.1"
db_name = "virustotallocal_db"
## fonksiyonlar
def db_select(tur,data):
today = datetime.date.today()
connection = psycopg2.connect(user=db_user,
password=db_password,
host=db_host,
port="5432",
database=db_name)
cursor = connection.cursor()
sorgu = """select "malicious","suspicious" from sorgu_sonuclari
WHERE tur = %s and data = %s and gecerlilik_tarihi > %s"""
cursor.execute((sorgu),(tur,data,today))
sorgu_sonuc = cursor.fetchall()
return sorgu_sonuc
def db_insert(tur,data,malicious,suspicious):
otuzgun = datetime.date.today() + datetime.timedelta(30)
connection = psycopg2.connect(user=db_user,
password=db_password,
host=db_host,
port="5432",
database=db_name)
connection.autocommit = True
cursor = connection.cursor()
sqlquery = """INSERT INTO sorgu_sonuclari(tur, data, malicious, suspicious, gecerlilik_tarihi, created_date) VALUES (%s, %s, %s, %s, %s, %s)"""
cursor.execute(sqlquery,(tur,data,malicious,suspicious,otuzgun, datetime.datetime.now()))
connection.commit()
connection.close()
def url_id_bul(url):
virus_total_sorgu_url = "https://www.virustotal.com/api/v3/urls"
payload = { "url": url }
headers = {
"accept": "application/json",
"x-apikey": virus_total_api_key,
"content-type": "application/x-www-form-urlencoded"
}
response = requests.post(virus_total_sorgu_url, data=payload, headers=headers)
response_json = json.loads(response.text)
id = response_json['data']['id']
return id
def id_sonuc_bul(id):
virus_total_sorgu_url = "https://www.virustotal.com/api/v3/analyses/{}".format(id)
headers = {
"accept": "application/json",
"x-apikey": virus_total_api_key
}
response = requests.get(virus_total_sorgu_url, headers=headers)
sonuc = json.loads(response.text)
stats = sonuc['data']['attributes']['stats']
return stats
def mail_icerik_find_urls(mail_icerik):
## base64 maili string'e çevir
if base64_count > 0:
search_baslangici = 0
for x in range(base64_count):
try:
baslangic = mail_icerik.find(base64_anahtar_kelime, search_baslangici)
bosluk_index = mail_icerik.find("--", baslangic+len(base64_anahtar_kelime))
search_baslangici = bosluk_index
sonuc = mail_icerik[baslangic+len(base64_anahtar_kelime):bosluk_index].strip()
base64_message = sonuc
base64_bytes = base64_message.encode('ascii')
message_bytes = base64.b64decode(base64_bytes)
message = message_bytes.decode('utf-8')
mail_icerik += message
except:
pass
mail_icerik = mail_icerik.replace('\n', '').replace('=', '').replace('--', ' ').replace('</a>', ' ').replace('<', ' ').replace('>', ' ').replace(']', ' ')
urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', mail_icerik)
else:
mail_icerik = mail_icerik.replace('\n', '').replace('=', '').replace('--', ' ').replace('</a>', ' ')
urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', mail_icerik)
return urls
def url_to_domain(url): #verilen url'nin domainini ayrıştırır ve güvenilir domainler listesinde değilse kontrol edilecek domainler listesine ekler.
ayristirilmis_url = tldextract.extract(url)
domain = ayristirilmis_url.domain+'.'+ayristirilmis_url.suffix
if domain in guvenilen_domainler or domain in kontrol_edilecek_domainler:
pass
else:
kontrol_edilecek_domainler.append(domain)
def url_to_subdomain(url): #verilen url'nin subdomainini ayrıştırır ve güvenilir subdomainler listesinde değilse kontrol edilecek subdomainler listesine ekler.
ayristirilmis_url = tldextract.extract(url)
domain = ayristirilmis_url.domain+'.'+ayristirilmis_url.suffix
subdomain = ayristirilmis_url.subdomain+'.'+ayristirilmis_url.domain+'.'+ayristirilmis_url.suffix
if domain in guvenilen_domainler or subdomain in kontrol_edilecek_subdomainler:
pass
else:
kontrol_edilecek_subdomainler.append(subdomain)
def url_append_kontrol_edilecek(url): #verilen url'nin domaini güvenilir domainler listesinde var mı kontrol eder, yoksa kontrol edilecek urller listesine ekler
ayristirilmis_url = tldextract.extract(url)
domain = ayristirilmis_url.domain+'.'+ayristirilmis_url.suffix
if domain in guvenilen_domainler or url in kontrol_edilecek_urller:
pass
else:
kontrol_edilecek_urller.append(url)
def spam_score_hesapla(malicious,suspicious):
skor = float(malicious) + (float(suspicious * 0.50))
return skor
## mail içerisindeki url'leri bulma
urls = mail_icerik_find_urls(mail_icerik)
urls = list(dict.fromkeys(urls))
for url in urls:
url_append_kontrol_edilecek(url)
url_to_domain(url)
url_to_subdomain(url)
def domainler_spam_score(kontrol_edilecek_domainler):
skor = 0
for domain in kontrol_edilecek_domainler:
domain_select = db_select("domain",domain)
if len(domain_select) > 0:
domain_malicious = domain_select[0][0]
domain_suspicious = domain_select[0][1]
skor += spam_score_hesapla(domain_malicious,domain_suspicious)
else:
domain_id=url_id_bul(domain)
domain_sonuc = id_sonuc_bul(domain_id)
domain_malicious = domain_sonuc["malicious"]
domain_suspicious = domain_sonuc["suspicious"]
skor += spam_score_hesapla(domain_malicious,domain_suspicious)
db_insert("domain",domain,domain_malicious,domain_suspicious)
return skor
def sub_domainler_spam_score(kontrol_edilecek_subdomainler):
skor = 0
for subdomain in kontrol_edilecek_subdomainler:
subdomain_select = db_select("subdomain",subdomain)
if len(subdomain_select) > 0:
subdomain_malicious = subdomain_select[0][0]
subdomain_suspicious = subdomain_select[0][1]
skor += spam_score_hesapla(subdomain_malicious,subdomain_suspicious)
else:
subdomain_id=url_id_bul(subdomain)
subdomain_sonuc = id_sonuc_bul(subdomain_id)
subdomain_malicious = subdomain_sonuc["malicious"]
subdomain_suspicious = subdomain_sonuc["suspicious"]
skor += spam_score_hesapla(subdomain_malicious,subdomain_suspicious)
db_insert("subdomain",subdomain,subdomain_malicious,subdomain_suspicious)
return skor
def sub_domainler_spam_score(kontrol_edilecek_subdomainler):
skor = 0
for subdomain in kontrol_edilecek_subdomainler:
subdomain_select = db_select("subdomain",subdomain)
if len(subdomain_select) > 0:
subdomain_malicious = subdomain_select[0][0]
subdomain_suspicious = subdomain_select[0][1]
skor += spam_score_hesapla(subdomain_malicious,subdomain_suspicious)
else:
subdomain_id=url_id_bul(subdomain)
subdomain_sonuc = id_sonuc_bul(subdomain_id)
subdomain_malicious = subdomain_sonuc["malicious"]
subdomain_suspicious = subdomain_sonuc["suspicious"]
skor += spam_score_hesapla(subdomain_malicious,subdomain_suspicious)
db_insert("subdomain",subdomain,subdomain_malicious,subdomain_suspicious)
return skor
def urller_spam_score(kontrol_edilecek_subdomainler):
skor = 0
for url in kontrol_edilecek_urller:
url_select = db_select("url",url)
if len(url_select) > 0:
url_malicious = url_select[0][0]
url_suspicious = url_select[0][1]
skor += spam_score_hesapla(url_malicious,url_suspicious)
else:
url_id=url_id_bul(url)
url_sonuc = id_sonuc_bul(url_id)
url_malicious = url_sonuc["malicious"]
url_suspicious = url_sonuc["suspicious"]
skor += spam_score_hesapla(url_malicious,url_suspicious)
db_insert("url",url,url_malicious,url_suspicious)
return skor
domainler_spam_score(kontrol_edilecek_domainler)
sub_domainler_spam_score(kontrol_edilecek_subdomainler)
urller_spam_score(kontrol_edilecek_urller)
mail_spam_score = domainler_spam_score(kontrol_edilecek_domainler) + sub_domainler_spam_score(kontrol_edilecek_subdomainler) + urller_spam_score(kontrol_edilecek_urller)
if mail_spam_score > 1:
print("SCORE: 10")
else:
print("OK")