# network.py
  1. import itertools
  2. import ipaddress
  3. import logging
  4. import atexit
  5. import socket
  6. import time
  7. import threading
  8. import subprocess
  9. from os import path
  10. import pyroute2
  11. from pyroute2.netlink.rtnl import rtypes
  12. import docker
  13. from flask import request, jsonify
  14. from . import interface
  15. from . import NetDhcpError, udhcpc, app
  16. LIBRARY = 'IPR'
  17. OPTS_KEY = 'com.docker.network.generic'
  18. OPT_BRIDGE = 'bridge'
  19. OPT_IPV6 = 'ipv6'
  20. logger = logging.getLogger('net-dhcp')
  21. interface.InitializeLibrary(LIBRARY)
  22. dockerClient = None
  23. def get_docker_client():
  24. global dockerClient
  25. if dockerClient is None:
  26. dockerClient = docker.from_env()
  27. @atexit.register
  28. def close_docker_client():
  29. global dockerClient
  30. if dockerClient is not None:
  31. dockerClient.close()
  32. dockerClient = None
  33. gateway_hints = {}
  34. container_dhcp_clients = {}
  35. @atexit.register
  36. def cleanup_dhcp():
  37. for endpoint, dhcp in container_dhcp_clients.items():
  38. logger.warning('cleaning up orphaned container DHCP client (endpoint "%s")', endpoint)
  39. dhcp.stop()
  40. def veth_pair(e):
  41. return f'dh-{e[:12]}', f'{e[:12]}-dh'
  42. def iface_addrs(iface):
  43. return list(map(lambda a: ipaddress.ip_interface((a['address'], a['prefixlen'])), iface.ipaddr))
  44. def iface_nets(iface):
  45. return list(map(lambda n: n.network, iface_addrs(iface)))
  46. def get_bridges():
  47. get_docker_client()
  48. reserved_nets = set(map(ipaddress.ip_network, map(lambda c: c['Subnet'], \
  49. itertools.chain.from_iterable(map(lambda x: [] if x is None else x, \
  50. map(lambda i: i['Config'], filter(lambda i: i['Driver'] != 'net-dhcp', \
  51. map(lambda n: n.attrs['IPAM'], dockerClient.networks.list()))))))))
  52. return dict(map(lambda i: (i['ifname'], i), filter(lambda i: i['kind'] == 'bridge' and not \
  53. set(iface_nets(i)).intersection(reserved_nets), map(lambda i: interface.GetInterface(LIBRARY, i['ifname']), \
  54. interface.GetInterfaces(LIBRARY)))))
  55. def net_bridge(n):
  56. get_docker_client()
  57. return interface.GetInterface(LIBRARY, dockerClient.networks.get(n).attrs['Options'][OPT_BRIDGE])
  58. def ipv6_enabled(n):
  59. get_docker_client()
  60. options = dockerClient.networks.get(n).attrs['Options']
  61. return OPT_IPV6 in options and options[OPT_IPV6] == 'true'
  62. def endpoint_container_iface(n, e):
  63. get_docker_client()
  64. for cid, info in dockerClient.networks.get(n).attrs['Containers'].items():
  65. if info['EndpointID'] == e:
  66. container = dockerClient.containers.get(cid)
  67. netns = f'/proc/{container.attrs["State"]["Pid"]}/ns/net'
  68. with pyroute2.NetNS(netns) as rtnl:
  69. for link in rtnl.get_links():
  70. attrs = dict(link['attrs'])
  71. if attrs['IFLA_ADDRESS'] == info['MacAddress']:
  72. return {
  73. 'netns': netns,
  74. 'ifname': attrs['IFLA_IFNAME'],
  75. 'address': attrs['IFLA_ADDRESS']
  76. }
  77. break
  78. return None
  79. def await_endpoint_container_iface(n, e, timeout=5):
  80. start = time.time()
  81. iface = None
  82. while time.time() - start < timeout:
  83. try:
  84. iface = endpoint_container_iface(n, e)
  85. except docker.errors.NotFound:
  86. time.sleep(0.5)
  87. except KeyError:
  88. time.sleep(0.5)
  89. if not iface:
  90. raise NetDhcpError('Timed out waiting for container to become availabile')
  91. return iface
  92. def endpoint_container_hostname(n, e):
  93. get_docker_client()
  94. for cid, info in dockerClient.networks.get(n).attrs['Containers'].items():
  95. if info['EndpointID'] == e:
  96. return dockerClient.containers.get(cid).attrs['Config']['Hostname']
  97. return None
  98. def endpoint_container_network_gwpriority(n, e):
  99. get_docker_client()
  100. for cid, info in dockerClient.networks.get(n).attrs['Containers'].items():
  101. if info['EndpointID'] == e:
  102. networkName = dockerClient.networks.get(n).attrs['Name']
  103. if 'GwPriority' in dockerClient.containers.get(cid).attrs['NetworkSettings']['Networks'][networkName]:
  104. return dockerClient.containers.get(cid).attrs['NetworkSettings']['Networks'][networkName]['GwPriority']
  105. else:
  106. return 0
  107. return None
  108. def remove_veth(host_ifname):
  109. logger.info('Removing veth %s', host_ifname)
  110. if LIBRARY == 'NDB':
  111. if_host = interface.GetInterface(LIBRARY, if_host)
  112. bridge = net_bridge(req['NetworkID'])
  113. interface.DelPort(LIBRARY, bridge.ifname, if_host.ifname)
  114. interface.RemoveInterface(LIBRARY, if_host.ifname)
  115. logger.info('Removed veth for endpoint %s on ()', endpoint)
  116. else:
  117. logger.info('Deleting veth via shell: %s', host_ifname)
  118. try:
  119. subprocess.run(
  120. ['ip', 'link', 'del', host_ifname],
  121. stdout=subprocess.DEVNULL,
  122. stderr=subprocess.PIPE,
  123. check=True
  124. )
  125. logger.info('Deleted veth %s', host_ifname)
  126. except subprocess.CalledProcessError as e:
  127. if b'Cannot find device' in e.stderr:
  128. logger.info('Veth %s already gone', host_ifname)
  129. else:
  130. logger.warning(
  131. 'Failed to delete veth %s: %s',
  132. host_ifname,
  133. e.stderr.decode().strip()
  134. )
  135. @app.route('/NetworkDriver.GetCapabilities', methods=['POST'])
  136. def net_get_capabilities():
  137. return jsonify({
  138. 'Scope': 'local',
  139. 'ConnectivityScope': 'global'
  140. })
  141. @app.route('/NetworkDriver.CreateNetwork', methods=['POST'])
  142. def create_net():
  143. req = request.get_json(force=True)
  144. for data in req['IPv4Data']:
  145. if data['AddressSpace'] != 'null' or data['Pool'] != '0.0.0.0/0':
  146. return jsonify({'Err': 'Only the null IPAM driver is supported'}), 400
  147. options = req['Options'][OPTS_KEY]
  148. if OPT_BRIDGE not in options:
  149. return jsonify({'Err': 'No bridge provided'}), 400
  150. # We have to use a custom "enable IPv6" option because Docker's null IPAM driver doesn't support IPv6 and a plugin
  151. # IPAM driver isn't allowed to return an empty address
  152. if OPT_IPV6 in options and options[OPT_IPV6] not in ('', 'true', 'false'):
  153. return jsonify({'Err': 'Invalid boolean value for ipv6'}), 400
  154. desired = options[OPT_BRIDGE]
  155. bridges = get_bridges()
  156. if desired not in bridges:
  157. return jsonify({'Err': f'Bridge "{desired}" not found (or the specified bridge is already used by Docker)'}), 400
  158. logger.info('Creating network "%s" (using bridge "%s")', req['NetworkID'], desired)
  159. return jsonify({})
  160. @app.route('/NetworkDriver.DeleteNetwork', methods=['POST'])
  161. def delete_net():
  162. return jsonify({})
  163. @app.route('/NetworkDriver.CreateEndpoint', methods=['POST'])
  164. def create_endpoint():
  165. req = request.get_json(force=True)
  166. network_id = req['NetworkID']
  167. endpoint_id = req['EndpointID']
  168. req_iface = req['Interface']
  169. bridge = net_bridge(network_id)
  170. bridge_addrs = iface_addrs(bridge)
  171. if_host, if_container = veth_pair(endpoint_id)
  172. logger.info('creating veth pair %s <=> %s', if_host, if_container)
  173. if_host = interface.CreateInterface(LIBRARY, if_host, 'veth', if_container)
  174. if_host.Up()
  175. try:
  176. start = time.time()
  177. while isinstance(if_container, str) and time.time() - start < 10:
  178. try:
  179. if_container = interface.GetInterface(LIBRARY, if_container)
  180. if_container.Up()
  181. except KeyError:
  182. time.sleep(0.5)
  183. if isinstance(if_container, str):
  184. raise NetDhcpError(f'timed out waiting for {if_container} to appear in host')
  185. interface.AddPort(LIBRARY, bridge.ifname, if_host.ifname)
  186. res_iface = {
  187. 'MacAddress': '',
  188. 'Address': '',
  189. 'AddressIPv6': ''
  190. }
  191. if 'MacAddress' in req_iface and req_iface['MacAddress']:
  192. if_container.SetAddress(req_iface['MacAddress'])
  193. else:
  194. res_iface['MacAddress'] = if_container['address']
  195. def try_addr(type_):
  196. addr = None
  197. k = 'AddressIPv6' if type_ == 'v6' else 'Address'
  198. if k in req_iface and req_iface[k]:
  199. # TODO: Should we allow static IP's somehow?
  200. # Just validate the address, Docker will add it to the interface for us
  201. #addr = ipaddress.ip_interface(req_iface[k])
  202. #for bridge_addr in bridge_addrs:
  203. # if addr.ip == bridge_addr.ip:
  204. # raise NetDhcpError(400, f'Address {addr} is already in use on bridge {bridge["ifname"]}')
  205. raise NetDhcpError('Only the null IPAM driver is supported')
  206. else:
  207. dhcp = udhcpc.DHCPClient(if_container, v6=type_ == 'v6', once=True)
  208. addr = dhcp.finish()
  209. if not addr:
  210. return
  211. res_iface[k] = str(addr)
  212. if dhcp.gateway:
  213. gateway_hints[endpoint_id] = dhcp.gateway
  214. logger.info('Adding IP%s address %s to %s', type_, addr, if_container['ifname'])
  215. try_addr('v4')
  216. if ipv6_enabled(network_id):
  217. try_addr('v6')
  218. res = jsonify({
  219. 'Interface': res_iface
  220. })
  221. except Exception as e:
  222. logger.exception(e)
  223. if not isinstance(if_container, str):
  224. interface.DelPort(LIBRARY, bridge.ifname, if_host.ifname)
  225. interface.RemoveInterface(LIBRARY, if_host.ifname)
  226. if isinstance(e, NetDhcpError):
  227. res = jsonify({'Err': str(e)}), e.status
  228. else:
  229. res = jsonify({'Err': str(e)}), 500
  230. finally:
  231. return res
  232. @app.route('/NetworkDriver.EndpointOperInfo', methods=['POST'])
  233. def endpoint_info():
  234. req = request.get_json(force=True)
  235. bridge = net_bridge(req['NetworkID'])
  236. if_host, _if_container = veth_pair(req['EndpointID'])
  237. if_host = interface.GetInterface(LIBRARY, if_host)
  238. return jsonify({
  239. 'bridge': bridge['ifname'],
  240. 'if_host': {
  241. 'name': if_host['ifname'],
  242. 'mac': if_host['address']
  243. }
  244. })
  245. @app.route('/NetworkDriver.DeleteEndpoint', methods=['POST'])
  246. def delete_endpoint():
  247. req = request.get_json(force=True)
  248. if_host, _ = veth_pair(req['EndpointID'])
  249. remove_veth(if_host)
  250. return jsonify({})
  251. @app.route('/NetworkDriver.Join', methods=['POST'])
  252. def join():
  253. req = request.get_json(force=True)
  254. network = req['NetworkID']
  255. endpoint = req['EndpointID']
  256. bridge = net_bridge(req['NetworkID'])
  257. _if_host, if_container = veth_pair(req['EndpointID'])
  258. res = {
  259. 'InterfaceName': {
  260. 'SrcName': if_container,
  261. 'DstPrefix': 'eth'
  262. },
  263. 'StaticRoutes': []
  264. }
  265. if endpoint in gateway_hints:
  266. gateway = gateway_hints[endpoint]
  267. logger.info('Setting IPv4 gateway from DHCP (%s)', gateway)
  268. res['Gateway'] = str(gateway)
  269. del gateway_hints[endpoint]
  270. ipv6 = ipv6_enabled(network)
  271. for route in bridge.routes:
  272. if route['type'] != rtypes['RTN_UNICAST'] or \
  273. (route['family'] == socket.AF_INET6 and not ipv6):
  274. continue
  275. if route['dst'] in ('', '/0'):
  276. if route['family'] == socket.AF_INET and 'Gateway' not in res:
  277. logger.info('Adding IPv4 gateway %s', route['gateway'])
  278. res['Gateway'] = route['gateway']
  279. elif route['family'] == socket.AF_INET6 and 'GatewayIPv6' not in res:
  280. logger.info('Adding IPv6 gateway %s', route['gateway'])
  281. res['GatewayIPv6'] = route['gateway']
  282. elif route['gateway']:
  283. dst = f'{route["dst"]}/{route["dst_len"]}'
  284. logger.info('Adding route to %s via %s', dst, route['gateway'])
  285. res['StaticRoutes'].append({
  286. 'Destination': dst,
  287. 'RouteType': 0,
  288. 'NextHop': route['gateway']
  289. })
  290. container_dhcp_clients[endpoint] = ContainerDHCPManager(network, endpoint)
  291. return jsonify(res)
  292. @app.route('/NetworkDriver.Leave', methods=['POST'])
  293. def leave():
  294. req = request.get_json(force=True)
  295. endpoint = req['EndpointID']
  296. if endpoint in container_dhcp_clients:
  297. container_dhcp_clients[endpoint].stop()
  298. del container_dhcp_clients[endpoint]
  299. return jsonify({})
  300. # Trying to grab the container's attributes (to get the network namespace)
  301. # will deadlock (since Docker is waiting on us), so we must defer starting
  302. # the DHCP client
  303. class ContainerDHCPManager:
  304. def __init__(self, network, endpoint):
  305. self.network = network
  306. self.endpoint = endpoint
  307. self.ipv6 = ipv6_enabled(network)
  308. self.dhcp = None
  309. self.dhcp6 = None
  310. self.gwPriority = None
  311. self._thread = threading.Thread(target=self.run)
  312. self._thread.start()
  313. def _on_event(self, dhcp, event_type, _event):
  314. if event_type == udhcpc.EventType.DECONFIG:
  315. logger.info('[dhcp container] DECONFIG Event %s', _event)
  316. if path.lexists('{dhcp.netns}'):
  317. logger.info('[dhcp container] Flushing IP addresses')
  318. subprocess.check_call(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'address', 'flush', 'dev',
  319. str(dhcp.iface['ifname']), 'label', str(dhcp.iface['ifname'])],
  320. timeout=1, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
  321. else:
  322. logger.info('[dhcp container] Container gone, can\'t flush IP addresses')
  323. elif event_type == udhcpc.EventType.RENEW or event_type == udhcpc.EventType.BOUND:
  324. if event_type == udhcpc.EventType.RENEW:
  325. logger.info('[dhcp container] RENEW Event %s', _event)
  326. elif event_type == udhcpc.EventType.BOUND:
  327. logger.info('[dhcp container] BOUND Event %s', _event)
  328. result = subprocess.run(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'route', 'show', 'type', 'unicast', 'proto', 'static', 'table', 'all'], timeout=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
  329. if result.returncode == 0:
  330. routes = result.stdout
  331. routes = routes.splitlines()
  332. if routes:
  333. logger.info('[dhcp container] Store existing static routes')
  334. for route in routes:
  335. logger.info('[dhcp container] Stored static route: %s', route)
  336. logger.info('[dhcp container] Flushing IP addresses')
  337. subprocess.check_call(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'address', 'flush', 'dev',
  338. str(dhcp.iface['ifname']), 'label', str(dhcp.iface['ifname'])],
  339. timeout=1, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
  340. logger.info('[dhcp container] Adding IP addresses %s', dhcp.ip)
  341. subprocess.check_call(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'address', 'add', str(dhcp.ip), 'dev',
  342. str(dhcp.iface['ifname'])],
  343. timeout=1, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
  344. if dhcp.gateway and self.gwPriority >= 0:
  345. logger.info('[dhcp container] Replacing gateway with %s', dhcp.gateway)
  346. subprocess.check_call(['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'route', 'replace', 'default', 'via',
  347. str(dhcp.gateway)],
  348. timeout=1, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
  349. if routes:
  350. logger.info('[dhcp container] Restore static routes')
  351. for route in routes:
  352. parts = route.split()
  353. command = ['nsenter', f'-n{dhcp.netns}', '--', '/sbin/ip', 'route', 'add'] + parts + ['proto', 'static']
  354. output = subprocess.run(command, timeout=1, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  355. logger.info('[dhcp container] Restored static route: %s', route)
  356. else:
  357. logger.info('[dhcp container] Unhandled Event %s: %s', event_type,_event)
  358. def run(self):
  359. try:
  360. iface = await_endpoint_container_iface(self.network, self.endpoint)
  361. hostname = endpoint_container_hostname(self.network, self.endpoint)
  362. self.gwPriority = endpoint_container_network_gwpriority(self.network, self.endpoint)
  363. if self.gwPriority >= 0:
  364. self.dhcp = udhcpc.DHCPClient(iface, event_listener=self._on_event, hostname=hostname)
  365. else:
  366. self.dhcp = udhcpc.DHCPClient(iface, event_listener=self._on_event)
  367. logger.info('Starting DHCPv4 client on %s in container namespace %s', iface['ifname'], \
  368. self.dhcp.netns)
  369. if self.ipv6:
  370. self.dhcp6 = udhcpc.DHCPClient(iface, v6=True, event_listener=self._on_event, hostname=hostname)
  371. logger.info('Starting DHCPv6 client on %s in container namespace %s', iface['ifname'], \
  372. self.dhcp6.netns)
  373. except Exception as e:
  374. logger.exception(e)
  375. if self.dhcp:
  376. self.dhcp.finish(timeout=1)
  377. def stop(self):
  378. if not self.dhcp:
  379. return
  380. try:
  381. logger.info('Shutting down DHCPv4 client on %s in container namespace %s', \
  382. self.dhcp.iface['ifname'], self.dhcp.netns)
  383. self.dhcp.finish(timeout=1)
  384. finally:
  385. try:
  386. if self.ipv6:
  387. logger.info('Shutting down DHCPv6 client on %s in container namespace %s', \
  388. self.dhcp6.iface['ifname'], self.dhcp.netns)
  389. self.dhcp6.finish(timeout=1)
  390. finally:
  391. self._thread.join()
  392. # we have to do this since the docker client leaks sockets...
  393. close_docker_client()
  394. get_docker_client()