| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- import itertools
- import ipaddress
- import logging
- import atexit
- import socket
- import time
- import threading
- import subprocess
- import pyroute2
- from pyroute2.netlink.rtnl import rtypes
- import docker
- from flask import request, jsonify
- from . import interface
- from . import NetDhcpError, udhcpc, app
# Identifier handed as the first argument to every `interface.*` helper call
# made from this module.
LIBRARY = 'IPR'

# Key under which CreateNetwork nests the driver options inside the request's
# 'Options' object.
OPTS_KEY = 'com.docker.network.generic'

# Names of the per-network options read by this driver.
OPT_BRIDGE = 'bridge'
OPT_IPV6 = 'ipv6'

logger = logging.getLogger('net-dhcp')

# One-time, import-time initialisation of the interface helper module.
interface.InitializeLibrary(LIBRARY)

# Module-wide Docker client built from the process environment.
client = docker.from_env()
@atexit.register
def close_docker():
    """Close the module-level Docker client when the interpreter exits."""
    client.close()
# endpoint ID -> gateway learned from DHCP in create_endpoint(); read and
# removed again by join().
gateway_hints = dict()

# endpoint ID -> ContainerDHCPManager registered by join() and removed by
# leave().
container_dhcp_clients = dict()
@atexit.register
def cleanup_dhcp():
    """Stop every DHCP manager that is still registered at interpreter exit.

    Each remaining entry is logged as orphaned before its ``stop()`` method
    is called.
    """
    for endpoint_id, manager in container_dhcp_clients.items():
        logger.warning('cleaning up orphaned container DHCP client (endpoint "%s")', endpoint_id)
        manager.stop()
def veth_pair(e):
    """Return the ``(host, container)`` veth names for endpoint ID *e*.

    Both names embed the first 12 characters of the ID: the host side is
    prefixed with ``dh-`` and the container side is suffixed with ``-dh``.
    """
    short_id = e[:12]
    host_name = f'dh-{short_id}'
    container_name = f'{short_id}-dh'
    return host_name, container_name
def iface_addrs(iface):
    """Return the entries of ``iface.ipaddr`` as ``ipaddress`` interface objects.

    Each entry is expected to expose ``'address'`` and ``'prefixlen'`` keys;
    the pair is handed to ``ipaddress.ip_interface`` as a tuple.
    """
    return [
        ipaddress.ip_interface((entry['address'], entry['prefixlen']))
        for entry in iface.ipaddr
    ]
def iface_nets(iface):
    """Return the network of every address reported by ``iface_addrs(iface)``."""
    return [addr.network for addr in iface_addrs(iface)]
def get_bridges():
    """Return ``{ifname: interface}`` for every bridge this driver may use.

    A host interface qualifies when its ``'kind'`` is ``'bridge'`` and none of
    its networks is equal to a subnet configured on a Docker network whose
    IPAM driver is something other than ``'net-dhcp'``.
    """
    # Subnets already claimed through Docker's IPAM configuration.
    reserved_nets = set()
    for network in client.networks.list():
        ipam = network.attrs['IPAM']
        if ipam['Driver'] == 'net-dhcp':
            continue
        for config in ipam['Config']:
            reserved_nets.add(ipaddress.ip_network(config['Subnet']))

    bridges = {}
    for entry in interface.GetInterfaces(LIBRARY):
        iface = interface.GetInterface(LIBRARY, entry['ifname'])
        if iface['kind'] != 'bridge':
            continue
        # Set intersection compares networks for equality, not overlap.
        if reserved_nets.intersection(iface_nets(iface)):
            continue
        bridges[iface['ifname']] = iface
    return bridges
def net_bridge(n):
    """Return the interface named by the ``bridge`` option of Docker network *n*."""
    network_options = client.networks.get(n).attrs['Options']
    bridge_name = network_options[OPT_BRIDGE]
    return interface.GetInterface(LIBRARY, bridge_name)
def ipv6_enabled(n):
    """Return True when Docker network *n* has its ``ipv6`` option set to ``'true'``."""
    options = client.networks.get(n).attrs['Options']
    if OPT_IPV6 not in options:
        return False
    return options[OPT_IPV6] == 'true'
def endpoint_container_iface(n, e):
    """Locate the interface inside the container attached through endpoint *e*.

    The containers of Docker network *n* are searched for the one whose
    ``EndpointID`` equals *e*. That container's network namespace is then
    opened and its links are scanned for the one whose hardware address equals
    the endpoint's ``MacAddress``.

    Returns a dict with ``'netns'``, ``'ifname'`` and ``'address'`` keys, or
    ``None`` when no container or no link matches.
    """
    containers = client.networks.get(n).attrs['Containers']
    for container_id, details in containers.items():
        if details['EndpointID'] != e:
            continue

        pid = client.containers.get(container_id).attrs['State']['Pid']
        netns = f'/proc/{pid}/ns/net'

        with pyroute2.NetNS(netns) as rtnl:
            for link in rtnl.get_links():
                link_attrs = dict(link['attrs'])
                mac = link_attrs['IFLA_ADDRESS']
                if mac == details['MacAddress']:
                    return {
                        'netns': netns,
                        'ifname': link_attrs['IFLA_IFNAME'],
                        'address': mac,
                    }
        # The endpoint's container was found but no link matched; stop
        # looking at the remaining containers.
        break
    return None
def await_endpoint_container_iface(n, e, timeout=5):
    """Poll until the container interface for endpoint *e* can be found.

    ``endpoint_container_iface(n, e)`` is retried every half second until it
    returns a truthy result or *timeout* seconds have elapsed. A
    ``docker.errors.NotFound`` raised by a lookup is treated the same as
    "not there yet".

    Args:
        n: Docker network ID.
        e: Docker endpoint ID.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The dict produced by ``endpoint_container_iface``.

    Raises:
        NetDhcpError: if no interface was found before the timeout expired.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            iface = endpoint_container_iface(n, e)
        except docker.errors.NotFound:
            iface = None

        # Return as soon as the interface shows up instead of polling Docker
        # for the rest of the timeout.
        if iface:
            return iface

        # Back off between attempts for both "not found" and "no match yet".
        time.sleep(0.5)

    raise NetDhcpError('Timed out waiting for container to become availabile')
def endpoint_container_hostname(n, e):
    """Return the configured hostname of the container attached through *e*.

    The containers of Docker network *n* are searched for the one whose
    ``EndpointID`` equals *e*; ``None`` is returned when there is no match.
    """
    containers = client.networks.get(n).attrs['Containers']
    owner_id = next(
        (cid for cid, details in containers.items() if details['EndpointID'] == e),
        None,
    )
    if owner_id is None:
        return None
    return client.containers.get(owner_id).attrs['Config']['Hostname']
@app.route('/NetworkDriver.GetCapabilities', methods=['POST'])
def net_get_capabilities():
    """Answer GetCapabilities with a local scope and global connectivity scope."""
    capabilities = {
        'Scope': 'local',
        'ConnectivityScope': 'global',
    }
    return jsonify(capabilities)
@app.route('/NetworkDriver.CreateNetwork', methods=['POST'])
def create_net():
    """Validate a CreateNetwork request.

    The request is rejected with HTTP 400 and an ``Err`` message when:

    * any ``IPv4Data`` entry is not the null address space with pool
      ``0.0.0.0/0``;
    * the ``bridge`` option is missing;
    * the ``ipv6`` option is present but is not ``''``, ``'true'`` or
      ``'false'``;
    * the requested bridge is not among ``get_bridges()``.

    Otherwise the creation is logged and an empty JSON object is returned.
    """
    req = request.get_json(force=True)

    uses_other_ipam = any(
        data['AddressSpace'] != 'null' or data['Pool'] != '0.0.0.0/0'
        for data in req['IPv4Data']
    )
    if uses_other_ipam:
        return jsonify({'Err': 'Only the null IPAM driver is supported'}), 400

    options = req['Options'][OPTS_KEY]
    if OPT_BRIDGE not in options:
        return jsonify({'Err': 'No bridge provided'}), 400

    # We have to use a custom "enable IPv6" option because Docker's null IPAM driver doesn't support IPv6 and a plugin
    # IPAM driver isn't allowed to return an empty address
    ipv6_value_ok = OPT_IPV6 not in options or options[OPT_IPV6] in ('', 'true', 'false')
    if not ipv6_value_ok:
        return jsonify({'Err': 'Invalid boolean value for ipv6'}), 400

    desired = options[OPT_BRIDGE]
    available = get_bridges()
    if desired not in available:
        return jsonify({'Err': f'Bridge "{desired}" not found (or the specified bridge is already used by Docker)'}), 400

    logger.info('Creating network "%s" (using bridge "%s")', req['NetworkID'], desired)
    return jsonify({})
@app.route('/NetworkDriver.DeleteNetwork', methods=['POST'])
def delete_net():
    """Acknowledge DeleteNetwork with an empty JSON object; nothing is torn down here."""
    empty_response = {}
    return jsonify(empty_response)
@app.route('/NetworkDriver.CreateEndpoint', methods=['POST'])
def create_endpoint():
    """Create the veth pair for an endpoint and obtain its addresses via DHCP.

    Steps:

    1. Create a veth pair named by ``veth_pair(EndpointID)`` and bring the
       host side up.
    2. Wait up to 10 seconds for the container-side interface to become
       visible, bring it up, and add the host side to the network's bridge.
    3. Apply the requested MAC address, or report the interface's own one.
    4. Run a one-shot DHCP client for IPv4 (and IPv6 when enabled on the
       network) and record any gateway it reports in ``gateway_hints``.

    Returns:
        A JSON ``{'Interface': {...}}`` response on success. When any
        ``Exception`` is raised after the veth pair was created, the error is
        logged, the pair is removed again, and ``{'Err': ...}`` is returned
        with the error's ``status`` for ``NetDhcpError`` or 500 otherwise.
    """
    req = request.get_json(force=True)
    network_id = req['NetworkID']
    endpoint_id = req['EndpointID']
    req_iface = req['Interface']

    bridge = net_bridge(network_id)
    # Only referenced by the disabled static-address validation in try_addr().
    bridge_addrs = iface_addrs(bridge)

    if_host, if_container = veth_pair(endpoint_id)
    logger.info('creating veth pair %s <=> %s', if_host, if_container)
    if_host = interface.CreateInterface(LIBRARY, if_host, 'veth', if_container)
    if_host.Up()

    try:
        # `if_container` stays a plain name (str) until the lookup succeeds.
        start = time.time()
        while isinstance(if_container, str) and time.time() - start < 10:
            try:
                if_container = interface.GetInterface(LIBRARY, if_container)
                if_container.Up()
            except KeyError:
                time.sleep(0.5)
        if isinstance(if_container, str):
            raise NetDhcpError(f'timed out waiting for {if_container} to appear in host')

        interface.AddPort(LIBRARY, bridge.ifname, if_host.ifname)

        res_iface = {
            'MacAddress': '',
            'Address': '',
            'AddressIPv6': ''
        }

        if 'MacAddress' in req_iface and req_iface['MacAddress']:
            if_container.SetAddress(req_iface['MacAddress'])
        else:
            res_iface['MacAddress'] = if_container['address']

        def try_addr(type_):
            """Fill in the 'v4' or 'v6' address of ``res_iface`` via one-shot DHCP."""
            k = 'AddressIPv6' if type_ == 'v6' else 'Address'
            if k in req_iface and req_iface[k]:
                # TODO: Should we allow static IP's somehow?
                # Just validate the address, Docker will add it to the interface for us
                #addr = ipaddress.ip_interface(req_iface[k])
                #for bridge_addr in bridge_addrs:
                #    if addr.ip == bridge_addr.ip:
                #        raise NetDhcpError(400, f'Address {addr} is already in use on bridge {bridge["ifname"]}')
                raise NetDhcpError('Only the null IPAM driver is supported')

            dhcp = udhcpc.DHCPClient(if_container, v6=type_ == 'v6', once=True)
            addr = dhcp.finish()
            if not addr:
                # No lease obtained: leave the address field empty.
                return
            res_iface[k] = str(addr)

            if dhcp.gateway:
                gateway_hints[endpoint_id] = dhcp.gateway
            logger.info('Adding IP%s address %s to %s', type_, addr, if_container['ifname'])

        try_addr('v4')
        if ipv6_enabled(network_id):
            try_addr('v6')

        # Returning directly (rather than from a `finally`) lets anything that
        # is not an `Exception`, and any failure of the cleanup below,
        # propagate instead of being replaced by an unbound-variable error.
        return jsonify({
            'Interface': res_iface
        })
    except Exception as e:
        logger.exception(e)

        # The host side is only detached from the bridge once the container
        # side had been found; the veth pair itself is always removed.
        if not isinstance(if_container, str):
            interface.DelPort(LIBRARY, bridge.ifname, if_host.ifname)
        interface.RemoveInterface(LIBRARY, if_host.ifname)

        status = e.status if isinstance(e, NetDhcpError) else 500
        return jsonify({'Err': str(e)}), status
@app.route('/NetworkDriver.EndpointOperInfo', methods=['POST'])
def endpoint_info():
    """Report the bridge name and the host-side veth name and MAC for an endpoint."""
    req = request.get_json(force=True)

    bridge = net_bridge(req['NetworkID'])
    host_name, _container_name = veth_pair(req['EndpointID'])
    host_iface = interface.GetInterface(LIBRARY, host_name)

    payload = {'bridge': bridge['ifname']}
    payload['if_host'] = {
        'name': host_iface['ifname'],
        'mac': host_iface['address'],
    }
    return jsonify(payload)
@app.route('/NetworkDriver.DeleteEndpoint', methods=['POST'])
def delete_endpoint():
    """Detach the endpoint's host-side veth from the bridge and remove it."""
    req = request.get_json(force=True)

    bridge = net_bridge(req['NetworkID'])
    host_name, _container_name = veth_pair(req['EndpointID'])
    host_iface = interface.GetInterface(LIBRARY, host_name)

    interface.DelPort(LIBRARY, bridge.ifname, host_iface.ifname)
    interface.RemoveInterface(LIBRARY, host_iface.ifname)

    return jsonify({})
@app.route('/NetworkDriver.Join', methods=['POST'])
def join():
    """Build the Join response and start the endpoint's DHCP manager.

    The response names the container-side veth as the source interface and
    the bridge's name as the destination prefix. A gateway recorded in
    ``gateway_hints`` for this endpoint takes precedence; otherwise gateways
    come from the bridge's default routes. The bridge's remaining unicast
    routes that have a gateway are passed on as static routes. IPv6 routes are
    ignored unless IPv6 is enabled on the network.
    """
    req = request.get_json(force=True)
    network = req['NetworkID']
    endpoint = req['EndpointID']

    bridge = net_bridge(req['NetworkID'])
    _host_name, container_name = veth_pair(req['EndpointID'])

    res = {
        'InterfaceName': {
            'SrcName': container_name,
            'DstPrefix': bridge['ifname'],
        },
        'StaticRoutes': [],
    }

    # A gateway learned from DHCP while creating the endpoint is used once
    # and then forgotten.
    if endpoint in gateway_hints:
        hinted_gateway = gateway_hints[endpoint]
        logger.info('Setting IPv4 gateway from DHCP (%s)', hinted_gateway)
        res['Gateway'] = str(hinted_gateway)
        del gateway_hints[endpoint]

    ipv6 = ipv6_enabled(network)
    for route in bridge.routes:
        if route['type'] != rtypes['RTN_UNICAST']:
            continue
        if route['family'] == socket.AF_INET6 and not ipv6:
            continue

        is_default = route['dst'] in ('', '/0')
        if is_default:
            # Only the first default route per address family is used.
            if route['family'] == socket.AF_INET and 'Gateway' not in res:
                logger.info('Adding IPv4 gateway %s', route['gateway'])
                res['Gateway'] = route['gateway']
            elif route['family'] == socket.AF_INET6 and 'GatewayIPv6' not in res:
                logger.info('Adding IPv6 gateway %s', route['gateway'])
                res['GatewayIPv6'] = route['gateway']
            continue

        if not route['gateway']:
            continue

        dst = f'{route["dst"]}/{route["dst_len"]}'
        logger.info('Adding route to %s via %s', dst, route['gateway'])
        static_route = {
            'Destination': dst,
            'RouteType': 0,
            'NextHop': route['gateway'],
        }
        res['StaticRoutes'].append(static_route)

    container_dhcp_clients[endpoint] = ContainerDHCPManager(network, endpoint)
    return jsonify(res)
@app.route('/NetworkDriver.Leave', methods=['POST'])
def leave():
    """Stop and unregister the DHCP manager of the leaving endpoint, if any."""
    req = request.get_json(force=True)
    endpoint = req['EndpointID']

    try:
        manager = container_dhcp_clients[endpoint]
    except KeyError:
        # Nothing was registered for this endpoint.
        pass
    else:
        # The entry is only dropped once stop() has returned.
        manager.stop()
        del container_dhcp_clients[endpoint]

    return jsonify({})
- # Trying to grab the container's attributes (to get the network namespace)
- # will deadlock (since Docker is waiting on us), so we must defer starting
- # the DHCP client
class ContainerDHCPManager:
    """Run the DHCP client(s) for one endpoint from a background thread.

    Constructing an instance immediately starts a thread that waits for the
    endpoint's container interface to appear and then launches a DHCPv4
    client, plus a DHCPv6 client when IPv6 is enabled on the network.
    ``stop()`` shuts the clients down again.
    """

    def __init__(self, network, endpoint):
        """Record the IDs, look up the IPv6 setting and start the worker thread.

        Args:
            network: Docker network ID.
            endpoint: Docker endpoint ID.
        """
        self.network = network
        self.endpoint = endpoint
        self.ipv6 = ipv6_enabled(network)

        # Both clients are created by run(); they stay None until then.
        self.dhcp = None
        self.dhcp6 = None
        self._thread = threading.Thread(target=self.run)
        self._thread.start()

    def _on_event(self, dhcp, event_type, _event):
        """Replace the container's default route when a renewal reports a gateway.

        Events other than ``RENEW``, and renewals without a gateway, are
        ignored. The route is replaced by running ``ip route replace`` inside
        the client's network namespace through ``nsenter``.
        """
        if event_type != udhcpc.EventType.RENEW or not dhcp.gateway:
            return

        logger.info('[dhcp container] Replacing gateway with %s', dhcp.gateway)
        subprocess.check_call(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'route', 'replace', 'default', 'via',
            str(dhcp.gateway)], timeout=1, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL)

        # TODO: Adding default route with NDB seems to be broken (because of the dst syntax?)
        #for route in ndb.routes:
        #    if route['type'] != rtypes['RTN_UNICAST'] or \
        #        route['oif'] != dhcp.iface['index'] or \
        #        (route['family'] == socket.AF_INET6 and not self.ipv6) or \
        #        route['dst'] not in ('', '/0'):
        #        continue

        #    # Needed because Route.remove() doesn't like a blank destination
        #    logger.info('Removing default route via %s', route['gateway'])
        #    route['dst'] = '::' if route['family'] == socket.AF_INET6 else '0.0.0.0'
        #    (route
        #        .remove()
        #        .commit())

        #logger.info('Adding default route via %s', dhcp.gateway)
        #(ndb.routes.add({'oif': dhcp.iface['index'], 'gateway': dhcp.gateway})
        #    .commit())

    def run(self):
        """Thread body: wait for the container interface, then start the clients.

        Any exception is logged; if the DHCPv4 client had already been
        created it is finished again.
        """
        try:
            iface = await_endpoint_container_iface(self.network, self.endpoint)
            hostname = endpoint_container_hostname(self.network, self.endpoint)

            self.dhcp = udhcpc.DHCPClient(iface, event_listener=self._on_event, hostname=hostname)
            logger.info('Starting DHCPv4 client on %s in container namespace %s', iface['ifname'], \
                self.dhcp.netns)

            if self.ipv6:
                self.dhcp6 = udhcpc.DHCPClient(iface, v6=True, event_listener=self._on_event, hostname=hostname)
                logger.info('Starting DHCPv6 client on %s in container namespace %s', iface['ifname'], \
                    self.dhcp6.netns)
        except Exception as e:
            logger.exception(e)
            if self.dhcp:
                self.dhcp.finish(timeout=1)

    def stop(self):
        """Shut down the DHCP client(s), join the thread and recycle the Docker client.

        Does nothing when the DHCPv4 client was never created. The DHCPv6
        client is only shut down if it actually exists: run() can fail after
        creating the DHCPv4 client but before the DHCPv6 one, leaving
        ``self.dhcp6`` as None even though IPv6 is enabled.
        """
        global client

        if not self.dhcp:
            return

        try:
            logger.info('Shutting down DHCPv4 client on %s in container namespace %s', \
                self.dhcp.iface['ifname'], self.dhcp.netns)
            self.dhcp.finish(timeout=1)
        finally:
            try:
                if self.dhcp6 is not None:
                    logger.info('Shutting down DHCPv6 client on %s in container namespace %s', \
                        self.dhcp6.iface['ifname'], self.dhcp6.netns)
                    self.dhcp6.finish(timeout=1)
            finally:
                self._thread.join()

                # we have to do this since the docker client leaks sockets...
                client.close()
                client = docker.from_env()
|