Сценарий выполняет автоматизированную проверку доступности списка заранее определённых хостов, таких как веб-сайты и локальные сетевые устройства. Он регулярно отправляет серии пингов к каждому из указанных хостов с заданными параметрами (количество пингов, размер пакета и таймаут ожидания ответа). На основе полученных результатов скрипт вычисляет среднее время отклика для успешных пингов и фиксирует количество неудачных попыток соединения.

Если определённый хост становится недоступным (все пинги неудачны), сценарий активирует тревожный сигнал и увеличивает счётчик потерь связи для этого хоста. В случае восстановления связи после периода недоступности, тревога снимается, и генерируется событие о восстановлении связи. Вся эта информация регистрируется и может быть доступна через веб-интерфейс контроллера.

Скриншот панели управления Скриншот панели управления

Возможности

  • Гибкая настройка: Позволяет изменять список хостов, интервал проверки (INTERVAL), количество пингов (PING_COUNT), размер пакета (PING_SIZE) и таймаут (PING_TIMEOUT).
  • Мониторинг в реальном времени: Отслеживает состояние сети и сохраняет данные в регистрах, доступных через веб-интерфейс.
  • Оповещения: Генерирует тревоги при потере связи и события при её восстановлении.
  • Простота интеграции: Не требует аналоговых или цифровых входов/выходов, работает только с сетью.

Область применения

Сценарий подходит для:

  • Мониторинга сетевой инфраструктуры: Проверка доступности серверов, роутеров или интернет-соединения в небольших офисах или домах.
  • Диагностики сети: Отслеживание стабильности подключения к ключевым узлам (например, шлюзу 192.168.0.1).
  • Автоматизации: Может использоваться как часть системы уведомлений о сбоях в IoT-проектах или умных домах.

Производительность

Замеры осуществлялись на тестовом образце

Время компиляции 30 мс
Среднее время выполнения тела цикла 344 мс
Использование стека 464 байт (5.6%)
Использование кучи 2672 байт (4.4%)

Сценарий

  from dev import count, webconf, Reg, Alarm, Event, run
from net import ping

# Список хостов для проверки доступности
hosts = [
    {'host': 'ya.ru'},
    {'host': 'mail.ru'},
    {'host': '192.168.0.1'},
]

# Определение констант с помощью функции const
INTERVAL     = const(30)      # Интервал проверки в секундах
PING_COUNT   = const(3)       # Количество пингов для каждого хоста
PING_SIZE    = const(64)      # Размер пакета пинга
PING_TIMEOUT = const(0.2)     # Таймаут ожидания ответа в секундах

# Вычисление общего количества хостов
hosts_len = len(hosts)

# Инициализация счетчиков и конфигурация веб-интерфейса
count(
    Reg=hosts_len * 2,    # Два регистра на каждый хост (Время ответа и Потери связи)
    Ai=0,                 # Количество аналоговых входов
    Ao=0,                 # Количество аналоговых выходов
    Di=0,                 # Количество цифровых входов
    Do=0,                 # Количество цифровых выходов
    Event=hosts_len,      # Количество событий
    Alarm=hosts_len       # Количество тревог
)
webconf(Reg=True, Pin=False, Display=False, Console=True)

# Инициализация объектов Alarm, Event и Reg для каждого хоста
for i, item in enumerate(hosts):
    host = item['host']
    # Инициализация тревоги для недоступности хоста
    item['alarm'] = Alarm(i).conf(name=f'Недоступен {host}')
    # Инициализация события восстановления связи
    item['event'] = Event(i).conf(name=f'Связь восстановлена {host}')
    # Инициализация регистра времени ответа
    item['rTime'] = Reg(i).conf(name=f'Время ответа {host}', type='uint', access='r')
    # Инициализация регистра счетчика потерь связи
    item['rFault'] = Reg(hosts_len + i).conf(name=f'Потери связи с {host}', type='uint', access='r')


def ping_ext(addr):
    """
    Функция для выполнения серии пингов к заданному адресу.
    :param addr: Адрес хоста для пинга.
    :return: Среднее время отклика и статус успешности.
    """
    time_total = 0
    success_count = 0
    
    for _ in range(PING_COUNT):
        t, res = ping(addr, size=PING_SIZE, timeout=PING_TIMEOUT)
        print(f'ping {addr} time {t}ms res {res}')
        
        if res == 0:
            time_total += t
            success_count += 1
    
    if success_count == 0:
        return 0, False  # Все пинги неудачные
    else:
        average_time = int(time_total / success_count)
        return average_time, True  # Среднее время отклика и успешность


def step():
    """
    Основная функция, выполняемая с заданным интервалом.
    Пингует каждый хост и обновляет соответствующие регистры и состояния тревог/событий.
    """
    for i, host in enumerate(hosts):
        addr = host['host']
        avg_time, success = ping_ext(addr)
        if success:
            # Обновление регистра времени ответа
            host['rTime'].val(avg_time)
            # Если ранее был установлен тревожный сигнал, снимаем его и добавляем событие восстановления
            if host['alarm'].val():
                host['alarm'].off()
                host['event'].add()
        else:
            # Установка тревожного сигнала и инкремент счетчика потерь связи
            host['alarm'].on()
            host['rFault'].inc()


# Запуск основной функции с указанным интервалом
run(step, INTERVAL)