[工具]Modbus-Fuzzer

作者: BI1IHA 分类: 安全工具 发布时间: 2018-03-21 16:16

Modbus Fuzzer v0.5 Alpha

It is basic Modbus Fuzzer. 

In this version for faster operation we target, diagnosis, write coil and write registers function code seprately as user input.

It is still under development, Me and Myfriend are trying to fix the problem of Generation Algorithm. 

Update log: v0.5: Added fuzzing feature for specific function code, Apr 30, 2014 v0.5 and logging feature

Update log: v0.2 Modified and added scanning function, Dec 14, 2013 v0.2


#!/usr/bin/python
'''
Created on Apr 16, 2013 v0.1
Modified and added scanning function, Dec 14, 2013 v0.2
Added fuzzing feature for specific function code, Apr 30, 2014 v0.5
@author: Ali, TJ
'''
import socket
import sys
from types import *
import struct
import time
import logging
HOST = '127.0.0.1'    # The remote host
dest_port = 502       # The same port as used by the server
TANGO_DOWN = ''
sock = None
dumbflagset = 0;
logging.basicConfig(filename='./fuzzer.log', filemode='a', level=logging.DEBUG, format='[%(asctime)s][%(levelname)s] %(message)s')
def create_connection(dest_ip, port):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error, msg:
        sys.stderr.write("[ERROR] %s\n" % msg[1])
        sys.exit(1)
    HOST = dest_ip
    try:
        sock.settimeout(0.5)
        sock.connect((HOST, dest_port))
    except socket.error, msg:
        logging.exception("Connection Failed!")
    else:
        logging.info("Connected to Server: %s" % dest_ip)
    return sock
def hexstr(s):
    return '-'.join('%02x' % ord(c) for c in s)
def dumb_fuzzing(dest_ip):
  sock = create_connection(dest_ip, dest_port)
  unitID = 0
  protoID = 0
  transID = 0
  lengthOfFunctionData = 1
  prevField = ""
  for functionCode in range(0,255):
    for functionData6 in range(0, 255):
      for functionData5 in range(0, 255):
        for functionData4 in range(0, 255):
          for functionData3 in range(0, 255):
            for functionData2 in range(0, 255):
              for functionData1 in range(0, 255):
                functionDataField = prevField + struct.pack(">B", functionData1)
                #print"%s" % hexstr(functionDataField)
                length = 2 + lengthOfFunctionData
                ModbusPacket = struct.pack(">H", transID) + \
                     struct.pack(">H", protoID) + \
                     struct.pack(">H", length) + \
                     struct.pack(">B", unitID) + \
                     struct.pack(">B", functionCode) + \
                     functionDataField
                logging.debug("%s" % hexstr(ModbusPacket))
                #print"%s" % hexstr(ModbusPacket)
                try:
                  sock.send(ModbusPacket)
                except socket.timeout:
                  logging.exception("Sending Timed Out!")
                except socket.error:
                  logging.exception("Sending Failed!")
                  sock.close()
                  sock = create_connection(dest_ip, dest_port)
                  logging.info("Try to Reconnect...")
                else:
                  logging.debug("Sent Packet: %s" % hexstr(ModbusPacket))
                  print "Sent: %s" % hexstr(ModbusPacket)
'''                      try:
                        data = sock.recv(1024)
                        print 'Received %s:' % repr(data)
                      except socket.timeout:
                        print ''
                      except socket.error:
                        sock.close()
                        sock = create_connection(dest_ip, dest_port)
  sock.close()
'''
def smart_fuzzing_with_user_input(dest_ip, msg):
    sock = create_connection(dest_ip, dest_port)
    strInput = msg
    dataSend = ""
    shortInput = ""
    cnt = 1
    for chInput in strInput:
    shortInput += chInput
        if cnt%2 == 0:
           intInput = int(shortInput,16)
           dataSend += struct.pack(">B", intInput)
           shortInput = ""
        cnt += 1
    try:
      sock.send(dataSend)
      print 'sent: %s' % hexstr(dataSend)
    except socket.error:
      sock.close()
      print 'trying to create connection again'
      sock = create_connection(dest_ip, dest_port)
    try:
      dataRecv = sock.recv(1024)
      print >>sys.stderr, 'received: %s' % hexstr(dataRecv)
    except socket.timeout:
      print 'recv timed out!'
    except socket.error:
      sock.close()
      sock = create_connection(dest_ip, dest_port)
    sock.close()
def simulator(dest_ip):
    value1 = 0
    value2 = 100
    transID = 0
    while True:
      strTransID = "%0.4x" % transID
      strHex1 = "%0.4x" % value1
      msg1 = strTransID + "0000000B01060000" + strHex1
      smart_fuzzing_with_user_input(dest_ip, msg1)
      transID += 1
      strTransID = "%0.4x" % transID
      strHex2 = "%0.4x" % value2
      msg2 = strTransID + "0000000B01060001" + strHex2
      smart_fuzzing_with_user_input(dest_ip, msg2)
      value1 += 1
      value2 -= 1
      transID += 1
      if (value1 > 100):
        value1 = 0
      if (value2 < 0):
        value2 = 100
      time.sleep(0.3)
def smart_fuzzing_for_func08h(dest_ip):
    sock = create_connection(dest_ip, dest_port)
    transID = 0
    protocolID = 0
    length = 6
    unitID = 0
    funcCode = 8 # Diagnostic
    subFunction = 13 # sub function code start from 0x0000
    dataField = 0
    while True:
      packet = struct.pack(">H", transID) + struct.pack(">H", protocolID) + struct.pack(">H", length) + \
               struct.pack(">B", unitID) + struct.pack(">B", funcCode) + struct.pack(">H", subFunction) + \
               struct.pack(">H", dataField)
      try:
        sock.send(packet)
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      try:
        dataRecv = sock.recv(1024)
      except socket.timeout:
        sys.stdout.write('1.time out\n')
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      if len(dataRecv) > 0:
         print "Sent: %s" % hexstr(packet)
         print "Recv: %s" % hexstr(dataRecv)
#      if len(dataRecv) < 1:
#        sock.close()
#        sock = create_connection(dest_ip, dest_port)
#        try:
#          sock.send(packet)
#          print "Sent2: %s" % hexstr(packet)
#        except socket.error:
#          print 'FAILED TO SEND2'
#        try:
#          dataRecv = sock.recv(1024)
#          print "Recv2 : %s" % hexstr(dataRecv)
#        except socket.timeout:
#          sys.stdout.write('2.time out\n')
#        except socket.error:
#          print 'FAILED TO RECV2'
      transID = transID + 1
#      subFunction = subFunction + 1
      dataField = dataField + 1
def smart_fuzzing_for_func0Fh(dest_ip):
    sock = create_connection(dest_ip, dest_port)
    transID = 0
    protocolID = 0
    length = 8
    unitID = 0
    funcCode = 15 # Write Multiple coils
    startAddr = 0 # start from 0x0000
#    startAddr = 122 # start from 0xFFFF
    quantityOutputs = 8
    if (quantityOutputs % 8 == 0):
        byteCount = quantityOutputs / 8
    else:
        byteCount = quantityOutputs / 8 + 1
    value = 255
    loopCounter = 0
    while True:
      packet = struct.pack(">H", transID) + struct.pack(">H", protocolID) + struct.pack(">H", length) + \
               struct.pack(">B", unitID) + struct.pack(">B", funcCode) + struct.pack(">H", startAddr) + \
               struct.pack(">H", quantityOutputs) + struct.pack(">B", byteCount) + struct.pack(">B", value) + \
               struct.pack(">B", 255)*loopCounter
      try:
        sock.send(packet)
        print "Sent: %s" % hexstr(packet)
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      try:
        dataRecv = sock.recv(1024)
        print "Recv : %s" % hexstr(dataRecv)
      except socket.timeout:
        sys.stdout.write('1.time out\n')
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      if len(dataRecv) < 1:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
        try:
          sock.send(packet)
          print "Sent2: %s" % hexstr(packet)
        except socket.error:
          print 'FAILED TO SEND2'
        try:
          dataRecv = sock.recv(1024)
          print "Recv2 : %s" % hexstr(dataRecv)
        except socket.timeout:
          sys.stdout.write('2.time out\n')
        except socket.error:
          print 'FAILED TO RECV2'
      transID = transID + 1
      loopCounter = loopCounter + 1
def smart_fuzzing_for_func10h(dest_ip):
    sock = create_connection(dest_ip, dest_port)
    transID = 0
    protocolID = 0
    length = 9
    unitID = 0
    funcCode = 16 # Write Multiple registers
#    startAddr = 0 # start from 0x0000
    startAddr = 122 # start from 0xFFFF
    quantityReg = 1
    byteCount = 2*quantityReg
    value = 65535
    loopCounter = 0
    while True:
      packet = struct.pack(">H", transID) + struct.pack(">H", protocolID) + struct.pack(">H", length) + \
               struct.pack(">B", unitID) + struct.pack(">B", funcCode) + struct.pack(">H", startAddr) + \
               struct.pack(">H", quantityReg) + struct.pack(">B", byteCount) + struct.pack(">H", value) + \
               struct.pack(">B", 255)*loopCounter
      try:
        sock.send(packet)
        print "Sent: %s" % hexstr(packet)
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      try:
        dataRecv = sock.recv(1024)
        print "Recv : %s" % hexstr(dataRecv)
      except socket.timeout:
        sys.stdout.write('1.time out\n')
      except socket.error:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
      if len(dataRecv) < 1:
        sock.close()
        sock = create_connection(dest_ip, dest_port)
        try:
          sock.send(packet)
          print "Sent2: %s" % hexstr(packet)
        except socket.error:
          print 'FAILED TO SEND2'
        try:
          dataRecv = sock.recv(1024)
          print "Recv2 : %s" % hexstr(dataRecv)
        except socket.timeout:
          sys.stdout.write('2.time out\n')
        except socket.error:
          print 'FAILED TO RECV2'
      transID = transID + 1
      loopCounter = loopCounter + 1
#    sock.close()
 
def atod(a): # ascii_to_decimal
    return struct.unpack("!L",socket.inet_aton(a))[0]
def dtoa(d): # decimal_to_ascii
    return socket.inet_ntoa(struct.pack("!L", d))
def scan_device(ip_range):
    net,_,mask = ip_range.partition('/')
    mask = int(mask)
    net = atod(net)
    for dest_ip in (dtoa(net+n) for n in range(0, 1<<32-mask)):
        try:
            sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        except socket.error, msg:
            sock.close()
        try:
            sock.settimeout(0.2)
            sock.connect((dest_ip, dest_port))
        except socket.error, msg:
            print "connection error at %s" % dest_ip
            continue
        except socket.timeout:
            print 'ip %s timeout error' % dest_ip
            continue
        unitID = 0
        dataRecv = '' 
        while True:
            dataSend =  struct.pack(">H", 0) \
                            + struct.pack(">H", 0) \
                            + struct.pack(">H", 6) \
                            + struct.pack(">B", unitID) \
                            + struct.pack(">B", 3) \
                            + struct.pack(">H", 0) \
                            + struct.pack(">H", 1)
            try:
                sock.send(dataSend)
                print "Sent: %s to %s" % (repr(dataSend), dest_ip)
            except socket.error:
                print 'FAILED TO SEND'
                #sock.close()
                #continue
            try:
                dataRecv = sock.recv(1024)
                print "Recv : %s" % repr(dataRecv)
            except socket.timeout:
                sys.stdout.write('.')
            if len(dataRecv) < 1:
                sys.stdout.write('.')
                #print "."
                unitID += 1
            else:
                print '\nunit ID %d found at IP %s' % (unitID, dest_ip)
                if dumbflagset == 1 :
                    print 'now starting dumb fuzzing'
                    dumb_fuzzing(dest_ip)
                break
    sock.close()
# main starts here
if len(sys.argv) < 3:
    print "modbus fuzzer v0.5"
    print ""
    print "Usage: python modFuzzer.py [-D] [destination_IP]"
    print "                           [-I] [destination_IP] [packet]"
    print "                           [-S] [IP_range]"
    print "                           [-SD] [IP_range]"
    print "                           [-S08] [destination_IP]"
    print "                           [-S0F] [destination_IP]"
    print "                           [-S10] [destination_IP]"
    print "                           [-SIM] [destination_IP]"
    print " "
    print "Commands:"
    print "Either long or short options are allowed."
    print "  --dumb    -D   Fuzzing in dumb way"
    print "  --input   -I   Fuzzing with given modbus packet"
    print "  --scan    -S   Scan the modbus device(s) in given IP range"
    print "  --sc_dumb -SD  Scan the device(s) and doing dumb fuzzing"
    print "  --f08     -F08 Fuzzing using function code 0x08"
    print "  --f0f     -F0F Fuzzing using function code 0x0F"
    print "  --f10     -F10 Fuzzing using function code 0x10"
    print "  --sim     -SIM Working in simulator mode"
#    print " "
#    print "Option:"
#    print "  --port    -p  Port number"
    print " "
    print "Example:"
    print "python modFuzzer.py -D 192.168.0.123"
#    print "python modFuzzer.py -D 192.168.0.123 -p 8888"
    print "python modFuzzer.py -I 192.168.0.23 0000000000060103000A0001"
    print "python modFuzzer.py -S 192.168.0.0/24"
    print "python modFuzzer.py -F10 192.168.0.0"
    print ""
    exit(1)
argv1 = sys.argv[1]
argv2 = sys.argv[2]
argv3 = ''
if len(sys.argv) > 3:
    argv3 = sys.argv[3]
if (argv1=='-D') or (argv1=='--dumb'):# dumb fuzzing
    dumb_fuzzing(argv2)
    sys.exit(1)
elif (argv1=='-I') or (argv1=='--input'):# smart user input
    smart_fuzzing_with_user_input(argv2, argv3)
elif (argv1=='-S') or (argv1=='--scan') or (argv1=='-SD'):       # scan device
    if argv1 =='-SD' :
        dumbflagset = 1 
    scan_device(argv2)
elif (argv1=='-F08') or (argv1=='--f08'): # smart fuzzing for function code 0x08
    smart_fuzzing_for_func08h(argv2)
elif (argv1=='-S0F') or (argv1=='--f0f'): # smart fuzzing for function code 0x0F
    smart_fuzzing_for_func0Fh(argv2)
elif (argv1=='-S10') or (argv1=='--f10'): # smart fuzzing for function code 0x10
    smart_fuzzing_for_func10h(argv2)
elif (argv1=='--sim') or (argv1=='--sim'):
    simulator(argv2)
sys.exit(0)

Modbus-Fuzzer.rar

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

邮箱地址不会被公开。 必填项已用*标注