Source code for powerline.segments.common.net

import re
import os
import socket

from powerline.lib.url import urllib_read
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.shell import readlines
from powerline.segments import with_docstring
from powerline.theme import requires_segment_info

[docs]@requires_segment_info class NetworkManagerSegment(ThreadedSegment): interval = 100 def set_state(self, **kwargs): super(NetworkManagerSegment, self).set_state(**kwargs) def update(self, *args, **kwargs): raw_data = [a.split(':', 1) for a in readlines(cmd = ['nmcli','--terse', '--fields=general,ap,ip4,ip6', 'device', 'show'], cwd=None)] nm_data = {} i = 0 while i < len(raw_data): cdev = raw_data[i] if len(cdev) != 2 or cdev[0] != 'GENERAL.DEVICE': continue cdev = cdev[1] cdata = {} i = i + 1 while i < len(raw_data): if len(raw_data[i]) == 1: # Ignore empty lines i = i + 1 continue if raw_data[i][0] == 'GENERAL.DEVICE': # Found new device break cdata[raw_data[i][0].lower() \ .replace('[', '').replace(']','').replace('.','_')] = raw_data[i][1] if 'IP' in raw_data[i][0] and 'ADDRESS' in raw_data[i][0]: cdata[raw_data[i][0].lower() \ .replace('[', '').replace(']','').replace('.','_') + '_short'] = \ raw_data[i][1].split('/')[0] i = i + 1 cdata['device'] = cdev cdata['type'] = cdata['general_type'] cdata['connection'] = cdata['general_connection'] \ if cdata['general_connection'] != '' else None try: cdata['quality'] = 0 cdata['essid'] = None cdata['security'] = None cdata['channel'] = None apcnt = 1 while 'ap{0}_in-use'.format(apcnt) in cdata: if cdata['ap{0}_in-use'.format(apcnt)] == '*': cdata['quality'] = min(100, int(cdata['ap{0}_signal'.format(apcnt)])*100//80) if 'ap{0}_signal'.format(apcnt) in cdata else 0 cdata['essid'] = cdata['ap{0}_ssid'.format(apcnt)] if 'ap{0}_ssid'.format(apcnt) in cdata else None cdata['security'] = cdata['ap{0}_security'.format(apcnt)] if 'ap{0}_security'.format(apcnt) in cdata else None cdata['channel'] = int(cdata['ap{0}_chan'.format(apcnt)]) if 'ap{0}_chan'.format(apcnt) in cdata else None break apcnt = apcnt + 1 except Exception: pass nm_data[cdev] = cdata return nm_data def render(self, nm_data, segment_info, name='status', device=None, format='{device} {type} {connection}', short_format='', format_down=None, auto_shrink=False, device_types=None, **kwargs): channel_name = 'net.nm_' + name if auto_shrink and not ('payloads' in segment_info and channel_name in segment_info['payloads'] and segment_info['payloads'][channel_name]): format = short_format if not nm_data: return None if device: nm_data = { a: nm_data[a] for a in nm_data if a == device } if device_types: nm_data = { a: nm_data[a] for a in nm_data if nm_data['type'] in device_types } extra_groups_up = [] extra_groups_down = [] if name == 'wifi': # Filter everything that is not of type 'wifi' extra_groups_up = ['wireless:quality', 'wireless:down'] extra_groups_down = ['wireless:down', 'wireless:quality'] nm_data = { a: nm_data[a] for a in nm_data if nm_data[a]['type'] == 'wifi' } if name == 'ethernet': extra_groups_up = ['ethernet:up'] extra_groups_down = ['ethernet:down'] nm_data = { a: nm_data[a] for a in nm_data if nm_data[a]['type'] == 'ethernet' } if format_down == None: return [{'contents': format.format(**nm_data[a]), 'draw_inner_divider': True, 'gradient_level': 100 - nm_data[a]['quality'], 'highlight_groups': extra_groups_up + ['net:' + name], 'click_values': nm_data[a], 'payload_name': channel_name} for a in nm_data if nm_data[a]['connection']] else: return [{'contents': format.format(**nm_data[a]) if nm_data[a]['connection'] else format_down.format(**nm_data[a]), 'draw_inner_divider': True, 'gradient_level': 100 - nm_data[a]['quality'], 'highlight_groups': (extra_groups_up if nm_data[a]['connection'] else extra_groups_down) + ['net:' + name], 'click_values': nm_data[a], 'payload_name': channel_name} for a in nm_data]
network_manager = with_docstring(NetworkManagerSegment(), '''Return what NetworkManager knows about the current connection. Requires ``nmcli`` :param string name: the name of the segment, defaults to ``status``. Setting this value changes the highlight groups used. ======== ================================================================================= Name Highlight Groups Used ======== ================================================================================= status ``net:status`` wifi ``wireless:quality``, `net:wifi` or ``wireless:down``, ``net:wifi`` ethernet ``ethernet:up``, ``ethernet:down`` ======== ================================================================================= :param string device: the device to use. Per default this segment will list data for all active devices. :param string format: the output format :param string short_format: optional shorter format when the powerline needs to shrink segments :param string format_down: if set to any other value than ``None``, it will be shown when no connection is present on the specified device :param bool auto_shrink: if set to true, this segment will use ``short_format`` per default, only using ``format`` when any message is present on the ``net.nm_<name>`` message channel. :param list device_types: filter for the given device types. May include ``wifi``, ``ethernet``, ``gsm``, ``lo``, etc Consult ``man nmcli`` for a comprehensive list. Highlight groups used: ``ethernet:up`` or ``net:ethernet``, ``ethernet:down`` or ``net:ethernet``, ``wireless:quality`` (gradient) or ``net:wifi``, ``wireless:down`` or ``net:wifi``, ``net:status`` Click values supplied: (any value available in format) ''')
[docs]@requires_segment_info def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False): '''Returns the current hostname. :param bool only_if_ssh: only return the hostname if currently in an SSH session :param bool exclude_domain: return the hostname without domain if there is one No special highlight groups used. ''' if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'): return None if exclude_domain: return socket.gethostname().split('.')[0] return socket.gethostname()
[docs]@requires_segment_info def wireless(pl, segment_info, device=None, format='{quality:3.0%} at {essid}', short_format='{quality:3.0%}', format_down=None, auto_shrink=False): '''Returns the current connection quality. :param string device: the device to use. Per default this segment will try to be smart. :param string format: the output format :param string short_format: optional shorter format when the powerline needs to shrink segments :param string format_down: if set to any other value than ``None``, it will be shown when no wireless connection is present. :param bool auto_shrink: if set to true, this segment will use ``short_format`` per default, only using ``format`` when any message is present on the ``net.wireless`` message channel. Highlight groups used: ``wireless:quality`` (gradient), ``wireless:down`` alternatively ``wireless:quality`` (gradient) Click values supplied: ``quality`` (int), ``essid`` (string) ''' payload_name = 'net.wireless' if not device: for interface in os.listdir('/sys/class/net'): if interface.startswith('wl'): device = interface break try: import iwlib except ImportError: pl.info("Couldn't load iwlib") return None if not format_down else [{ 'contents': format_down.format(quality=0, essid=None, frequency=0), 'highlight_groups': ['wireless:down', 'wireless:quality', 'quality_gradient'], 'gradient_level': 100, 'payload_name': payload_name }] stats = iwlib.get_iwconfig(device) stats = {a.lower(): stats[a].decode() if isinstance(stats[a], bytes) else stats[a] for a in stats} quality = 0 essid = '' if 'essid' in stats: essid = stats['essid'] if 'stats' in stats and 'quality' in stats['stats']: quality = stats['stats']['quality'] if essid == '' or quality == 0: return None if not format_down else [{ 'contents': format_down.format(quality=0, essid=None, **stats), 'highlight_groups': ['wireless:down', 'wireless:quality', 'quality_gradient'], 'gradient_level': 100, 'payload_name': payload_name }] if not auto_shrink or ('payloads' in segment_info and payload_name in segment_info['payloads'] and segment_info['payloads'][payload_name]): return [{ 'contents': format.format(quality=quality/70, **stats), 'highlight_groups': ['wireless:quality', 'quality_gradient'], 'gradient_level': 100 * (70 - quality) / 70, 'click_values': {'essid': essid, 'quality': quality * 100 / 70}, 'payload_name': payload_name }] return [{ 'contents': short_format.format(quality=quality/70, **stats), 'highlight_groups': ['wireless:quality', 'quality_gradient'], 'gradient_level': 100 * (70 - quality) / 70, 'click_values': {'essid': essid, 'quality': quality * 100 / 70}, 'payload_name': payload_name, 'truncate': lambda a,b,c: short_format.format(quality=quality/70, **stats) }]
def _external_ip(query_url='http://ipv6.icanhazip.com/'): return urllib_read(query_url).strip()
[docs]class ExternalIpSegment(ThreadedSegment): interval = 300 def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs): self.query_url = query_url super(ExternalIpSegment, self).set_state(**kwargs) def update(self, old_ip): return _external_ip(query_url=self.query_url) def render(self, ip, **kwargs): if not ip: return None return [{'contents': ip, 'divider_highlight_group': 'background:divider', 'click_values': {'external_ip': ip}}]
external_ip = with_docstring(ExternalIpSegment(), '''Return external IP address. :param str query_url: URI to query for IP address, should return only the IP address as a text string Suggested URIs: * http://ipv4.icanhazip.com/ * http://ipv6.icanhazip.com/ * http://icanhazip.com/ (returns IPv6 address if available, else IPv4) Divider highlight group used: ``background:divider``. Click values supplied: ``external_ip`` (string) ''') try: import netifaces except ImportError: def internal_ip(pl, interface='auto', ipv=4): return None else: _interface_starts = { 'eth': 10, # Regular ethernet adapters : eth1 'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0 'en': 10, # OS X : en0 'ath': 9, # Atheros WiFi adapters : ath0 'wlan': 9, # Other WiFi adapters : wlan1 'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0 'teredo': 1, # miredo interface : teredo 'lo': -10, # Loopback interface : lo 'docker': -5, # Docker bridge interface : docker0 'vmnet': -5, # VMWare bridge interface : vmnet1 'vboxnet': -5, # VirtualBox bridge interface : vboxnet0 } _interface_start_re = re.compile(r'^([a-z]+?)(\d|$)') def _interface_key(interface): match = _interface_start_re.match(interface) if match: try: base = _interface_starts[match.group(1)] * 100 except KeyError: base = 500 if match.group(2): return base - int(match.group(2)) else: return base else: return 0
[docs] def internal_ip(pl, format="{addr}", interface='auto', ipv=4): family = netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET if interface == 'auto': try: interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True))) except StopIteration: pl.info('No network interfaces found') return None elif interface == 'default_gateway': try: interface = netifaces.gateways()['default'][family][1] except KeyError: pl.info('No default gateway found for IPv{0}', ipv) return None addrs = netifaces.ifaddresses(interface) try: addr = addrs[family][0]['addr'] netmask = addrs[family][0]['netmask'] if "netmask" in addrs[family][0] else None if ipv == 6 and netmask is not None: # netifaces reports IPv6 subnet mask with suffix (e.g. /64) cidr = netmask.split("/")[1] elif netmask is not None: # Turn subnet mask into a bitmask, then count the ones bitmask = "".join("{0:08b}".format(int(number)) for number in netmask.split(".")) cidr = bitmask.count("1") else: cidr = 0 return format.format(addr=addr, netmask=netmask, cidr=cidr) except (KeyError, IndexError): pl.info("No IPv{0} address found for interface {1}", ipv, interface) return None
internal_ip = with_docstring(internal_ip, '''Return internal IP address Requires ``netifaces`` module to work properly. :param str interface: Interface on which IP will be checked. Use ``auto`` to automatically detect interface. In this case interfaces with lower numbers will be preferred over interfaces with similar names. Order of preference based on names: #. ``eth`` and ``enp`` followed by number or the end of string. #. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string. #. ``teredo`` followed by number or the end of string. #. Any other interface that is not ``lo*``. #. ``lo`` followed by number or the end of string. Use ``default_gateway`` to detect the interface based on the machine's `default gateway <https://en.wikipedia.org/wiki/Default_gateway>`_ (i.e., the router to which it is connected). :param string format: Format string. Use ``addr`` to show the address, ``netmask`` to show the subnet mask, and ``cidr`` to show the subnet in CIDR notation :param int ipv: 4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you need exactly. ''') try: import psutil def _get_bytes(interface): try: io_counters = psutil.net_io_counters(pernic=True) except AttributeError: io_counters = psutil.network_io_counters(pernic=True) if_io = io_counters.get(interface) if not if_io: return None return if_io.bytes_recv, if_io.bytes_sent def _get_interfaces(): try: io_counters = psutil.net_io_counters(pernic=True) except AttributeError: io_counters = psutil.network_io_counters(pernic=True) for interface, data in io_counters.items(): if data: yield interface, data.bytes_recv, data.bytes_sent except ImportError: def _get_bytes(interface): with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj: rx = int(file_obj.read()) with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj: tx = int(file_obj.read()) return (rx, tx) def _get_interfaces(): for interface in os.listdir('/sys/class/net'): x = _get_bytes(interface) if x is not None: yield interface, x[0], x[1]
[docs]class NetworkLoadSegment(KwThreadedSegment): interfaces = {} replace_num_pat = re.compile(r'[a-zA-Z]+') @staticmethod def key(interface='auto', **kwargs): return interface def compute_state(self, interface): if interface == 'auto': proc_exists = getattr(self, 'proc_exists', None) if proc_exists is None: proc_exists = self.proc_exists = os.path.exists('/proc/net/route') if proc_exists: # Look for default interface in routing table with open('/proc/net/route', 'rb') as f: for line in f.readlines(): parts = line.split() if len(parts) > 1: iface, destination = parts[:2] if not destination.replace(b'0', b''): interface = iface.decode('utf-8') break if interface == 'auto': # Choose interface with most total activity, excluding some # well known interface names interface, total = 'eth0', -1 for name, rx, tx in _get_interfaces(): base = self.replace_num_pat.match(name) if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'): continue activity = rx + tx if activity > total: total = activity interface = name try: idata = self.interfaces[interface] try: idata['prev'] = idata['last'] except KeyError: pass except KeyError: idata = {} if self.run_once: idata['prev'] = (monotonic(), _get_bytes(interface)) self.shutdown_event.wait(self.interval) self.interfaces[interface] = idata idata['last'] = (monotonic(), _get_bytes(interface)) return idata.copy() def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs): if not idata or 'prev' not in idata: return None t1, b1 = idata['prev'] t2, b2 = idata['last'] measure_interval = t2 - t1 if None in (b1, b2): return None r = [] for i, key in zip((0, 1), ('recv', 'sent')): format = locals()[key + '_format'] try: value = (b2[i] - b1[i]) / measure_interval except ZeroDivisionError: self.warn('Measure interval zero.') value = 0 max_key = key + '_max' is_gradient = max_key in kwargs hl_groups = ['network_load_' + key, 'network_load'] if is_gradient: hl_groups[:0] = (group + '_gradient' for group in hl_groups) r.append({ 'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)), 'divider_highlight_group': 'network_load:divider', 'highlight_groups': hl_groups }) if is_gradient: max = kwargs[max_key] if value >= max: r[-1]['gradient_level'] = 100 else: r[-1]['gradient_level'] = value * 100.0 / max return r
network_load = with_docstring(NetworkLoadSegment(), '''Return the network load. Uses the ``psutil`` module if available for multi-platform compatibility, falls back to reading :file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`. :param str interface: Network interface to measure (use the special value "auto" to have powerline try to auto-detect the network interface). :param str suffix: String appended to each load string. :param bool si_prefix: Use SI prefix, e.g. MB instead of MiB. :param str recv_format: Format string that determines how download speed should look like. Receives ``value`` as argument. :param str sent_format: Format string that determines how upload speed should look like. Receives ``value`` as argument. :param float recv_max: Maximum number of received bytes per second. Is only used to compute gradient level. :param float sent_max: Maximum number of sent bytes per second. Is only used to compute gradient level. Divider highlight group used: ``network_load:divider``. Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``. ''')