| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- from enum import Enum
- import ipaddress
- import json
- import struct
- import binascii
- import os
- from os import path
- from select import select
- import threading
- import subprocess
- import logging
- from eventfd import EventFD
- import posix_ipc
- HANDLER_SCRIPT = path.join(path.dirname(__file__), 'udhcpc_handler.py')
- AWAIT_INTERVAL = 0.1
- VENDOR_ID = 'docker'
- class EventType(Enum):
- BOUND = 'bound'
- RENEW = 'renew'
- DECONFIG = 'deconfig'
- LEASEFAIL = 'leasefail'
- logger = logging.getLogger('net-dhcp')
- class DHCPClientError(Exception):
- pass
- def _nspopen_wrapper(netns):
- return lambda cmd, *args, **kwargs: subprocess.Popen(['nsenter', f'-n{netns}', '--'] + cmd, *args, **kwargs)
- class DHCPClient:
- def __init__(self, iface, v6=False, once=False, hostname=None, event_listener=None):
- self.iface = iface
- self.v6 = v6
- self.once = once
- self.event_listeners = [DHCPClient._attr_listener]
- if event_listener:
- self.event_listeners.append(event_listener)
- self.netns = None
- if isinstance(iface, dict):
- self.netns = iface['netns']
- logger.debug('udhcpc using netns %s', self.netns)
- Popen = _nspopen_wrapper(self.netns) if self.netns else subprocess.Popen
- bin_path = '/usr/bin/udhcpc6' if v6 else '/sbin/udhcpc'
- cmdline = [bin_path, '-s', HANDLER_SCRIPT, '-i', iface['ifname'], '-f']
- cmdline.append('-q' if once else '-R')
- if hostname:
- cmdline.append('-x')
- if v6:
- # TODO: We encode the fqdn for DHCPv6 because udhcpc6 seems to be broken
- # flags: S bit set (see RFC4704)
- enc_hostname = hostname.encode('utf-8')
- enc_hostname = struct.pack('BB', 0b0001, len(enc_hostname)) + enc_hostname
- enc_hostname = binascii.hexlify(enc_hostname).decode('ascii')
- hostname_opt = f'0x27:{enc_hostname}'
- else:
- hostname_opt = f'hostname:{hostname}'
- cmdline.append(hostname_opt)
- if not v6:
- cmdline += ['-V', VENDOR_ID]
- self._suffix = '6' if v6 else ''
- self._event_queue = posix_ipc.MessageQueue(f'/udhcpc{self._suffix}_{iface["address"].replace(":", "_")}', \
- flags=os.O_CREAT | os.O_EXCL, max_messages=2, max_message_size=1024)
- self.proc = Popen(cmdline, env={'EVENT_QUEUE': self._event_queue.name}, stdin=subprocess.DEVNULL,
- stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
- if hostname:
- logger.debug('[udhcpc%s#%d] using hostname "%s"', self._suffix, self.proc.pid, hostname)
- self._has_lease = threading.Event()
- self.ip = None
- self.gateway = None
- self.domain = None
- self._shutdown_event = EventFD()
- self.shutdown = False
- self._event_thread = threading.Thread(target=self._read_events)
- self._event_thread.start()
- def _attr_listener(self, event_type, event):
- if event_type in (EventType.BOUND, EventType.RENEW):
- self.ip = ipaddress.ip_interface(event['ip'])
- if 'gateway' in event:
- self.gateway = ipaddress.ip_address(event['gateway'])
- else:
- self.gateway = None
- self.domain = event.get('domain')
- self._has_lease.set()
- elif event_type == EventType.DECONFIG:
- self._has_lease.clear()
- self.ip = None
- self.gateway = None
- self.domain = None
- def _read_events(self):
- while True:
- r, _w, _e = select([self._shutdown_event, self._event_queue.mqd], [], [])
- if self._shutdown_event in r:
- break
- msg, _priority = self._event_queue.receive()
- event = json.loads(msg.decode('utf-8'))
- try:
- event['type'] = EventType(event['type'])
- except ValueError:
- logger.warning('udhcpc%s#%d unknown event "%s"', self._suffix, self.proc.pid, event)
- continue
- logger.debug('[udhcp%s#%d event] %s', self._suffix, self.proc.pid, event)
- for listener in self.event_listeners:
- try:
- listener(self, event['type'], event)
- except Exception as ex:
- logger.exception(ex)
- self.shutdown = True
- del self._shutdown_event
- def await_ip(self, timeout=10):
- if not self._has_lease.wait(timeout=timeout):
- raise DHCPClientError(f'Timed out waiting for lease from udhcpc{self._suffix}')
- return self.ip
- def finish(self, timeout=5):
- if self.shutdown or self._shutdown_event.is_set():
- return
- try:
- if self.proc.returncode is not None and (not self.once or self.proc.returncode != 0):
- raise DHCPClientError(f'udhcpc{self._suffix} exited early with code {self.proc.returncode}')
- if self.once:
- self.await_ip()
- else:
- self.proc.terminate()
- if self.proc.wait(timeout=timeout) != 0:
- raise DHCPClientError(f'udhcpc{self._suffix} exited with non-zero exit code {self.proc.returncode}')
- return self.ip
- finally:
- self._shutdown_event.set()
- self._event_thread.join()
- self._event_queue.close()
- self._event_queue.unlink()
|