Конечно программ для сканирования портов хоть пруд пруди, известные типа Nmap и не очень. Но все равно хочется изобрести велосипед и попробовать написать что нибудь свое, простое.
Первым делом напишем функцию самого сканирования портов. Будем использовать socket для попытки подключиться к порту.
import socket def scan_port(ip,port): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.5) try: connect = sock.connect((ip,port)) print('Port :',port,' its open.') sock.close() except: pass
Тут все просто, создаем сокет. Обернем в обработку исключений(try — except) попытку подключения. Если подключение удалось пишем что порт открыт. Нет , просто выходим. Sock.settimeout(0.5) задает тайм-аут подключения равный пол секунды, попытаемся ускорить процесс сканирования.
Теперь вызовем просто эту функцию в цикле с указанием IP адреса для сканирования и номера порта:
ip = '192.168.0.1'
for i in range(1000):
scan_port(ip,i)

Ура, все просто великолепно работает. Тысяча портов где то за 10 секунд, я бы сказал даже не плохой результат. Но что делать если нам надо просканировать тысяча IP и все порта ? Получиться очень долго, но мы можем добавить многопоточность. Будем каждый порт сканировать в отдельном потоке:
import threading
for i in range(1000):
potoc = threading.Thread(target=scan_port, args=(ip,i))
potoc.start()
target=scan_port говорить что функция scan_port будет вызываться в потоке и передадим ей аргументы args=(ip,i).
potoc.start() — запускает сам поток.

На сканирование тех же тысячу портов у нас ушло меньше секунды. Мне кажется это не плохой результат для такой не сложной программы. Ну, а все фичи для удобства оставляю на вашу фантазию.
Кому интересно как я засекал время вот код:
from datetime import datetime
start = datetime.now()
код
ends = datetime.now()
print('Time : {}'.format(ends-start))
Спасибо большое, статья отличная!!!
Спасибо, очень приятно.
for i in range(1000):
potoc = threading.Thread(target=scan_port, args=(ip,i))
potoc.start()
Очень подозрительно: Делает запрос к серверу на открытие порта диапазоном от 1 до 1000 порта за 1 секунду.
Сегодня тестировал этот код через ДБАГ — Обнаружил что некоторые порты он не успевает просканировать, нужно добавить функцию: join() — Чтобы ждал завершение ответа от каждого порта. Нужно добавить код.
if __name__ == ‘__main__’:
for i in range(1000):
potoc = threading.Thread(target=scan_port, args=(ip, i))
potoc.start()
potoc.join()
Спасибо «avtor_chekh (Автор)» этой статьи. Информация очень хорошо преподнесена.
К этой задачке лучше использовать multiprocessing, через потоки практически смысла нет… Потоки работают чередуя между собой.
Принцип работы ЦП → https://www.youtube.com/watch?v=gcAvhi9sOvA
Умм.. Интересно, а в дебаге он корректно отрабатывает сокет. А зачем нам ждать завершение потока если у нас нету ограничений на число потоков.Он сам отрабатывает и завершается. Я с многопоточностью работал поверхностно, многих нюансов не знаю. Я если честно так прям не тестировал, ко всем ли портам он пытается подключиться. Вообщем можно покопаться и даже разработать версию 2.0 ))) Жаль сейчас нету времени совсем экспериментировать и разбираться. Так что если заходите написать статью ;))) Связаться можно через обратную связь )