관리-도구
편집 파일: reduction.py
# # Module to allow connection and socket objects to be transferred # between processes # # multiprocessing/reduction.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle'] import os import sys import socket import threading import struct import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug from multiprocessing.util import is_exiting, sub_warning # # # if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and hasattr(socket, 'SCM_RIGHTS'))): raise ImportError('pickling of connections not supported') # # Platform specific definitions # if sys.platform == 'win32': # Windows __all__ += ['reduce_pipe_connection'] import _winapi def send_handle(conn, handle, destination_pid): dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) conn.send(dh) def recv_handle(conn): return conn.recv().detach() class DupHandle(object): def __init__(self, handle, access, pid=None): # duplicate handle for process with given pid if pid is None: pid = os.getpid() proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) try: self._handle = _winapi.DuplicateHandle( _winapi.GetCurrentProcess(), handle, proc, access, False, 0) finally: _winapi.CloseHandle(proc) self._access = access self._pid = pid def detach(self): # retrieve handle from process which currently owns it if self._pid == os.getpid(): return self._handle proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, self._pid) try: return _winapi.DuplicateHandle( proc, self._handle, _winapi.GetCurrentProcess(), self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) finally: _winapi.CloseHandle(proc) class DupSocket(object): def __init__(self, sock): new_sock = sock.dup() def send(conn, pid): share = new_sock.share(pid) conn.send_bytes(share) self._id = resource_sharer.register(send, new_sock.close) def detach(self): conn = resource_sharer.get_connection(self._id) try: share = conn.recv_bytes() return socket.fromshare(share) finally: conn.close() def reduce_socket(s): return rebuild_socket, (DupSocket(s),) def rebuild_socket(ds): return ds.detach() def reduce_connection(conn): handle = conn.fileno() with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: ds = DupSocket(s) return rebuild_connection, (ds, conn.readable, conn.writable) def rebuild_connection(ds, readable, writable): from .connection import Connection sock = ds.detach() return Connection(sock.detach(), readable, writable) def reduce_pipe_connection(conn): access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) dh = DupHandle(conn.fileno(), access) return rebuild_pipe_connection, (dh, conn.readable, conn.writable) def rebuild_pipe_connection(dh, readable, writable): from .connection import PipeConnection handle = dh.detach() return PipeConnection(handle, readable, writable) else: # Unix # On MacOSX we should acknowledge receipt of fds -- see Issue14669 ACKNOWLEDGE = sys.platform == 'darwin' def send_handle(conn, handle, destination_pid): with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack("@i", handle))]) if ACKNOWLEDGE and conn.recv_bytes() != b'ACK': raise RuntimeError('did not receive acknowledgement of fd') def recv_handle(conn): size = struct.calcsize("@i") with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size)) try: if ACKNOWLEDGE: conn.send_bytes(b'ACK') cmsg_level, cmsg_type, cmsg_data = ancdata[0] if (cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS): return struct.unpack("@i", cmsg_data[:size])[0] except (ValueError, IndexError, struct.error): pass raise RuntimeError('Invalid data received') class DupFd(object): def __init__(self, fd): new_fd = os.dup(fd) def send(conn, pid): send_handle(conn, new_fd, pid) def close(): os.close(new_fd) self._id = resource_sharer.register(send, close) def detach(self): conn = resource_sharer.get_connection(self._id) try: return recv_handle(conn) finally: conn.close() def reduce_socket(s): df = DupFd(s.fileno()) return rebuild_socket, (df, s.family, s.type, s.proto) def rebuild_socket(df, family, type, proto): fd = df.detach() s = socket.fromfd(fd, family, type, proto) os.close(fd) return s def reduce_connection(conn): df = DupFd(conn.fileno()) return rebuild_connection, (df, conn.readable, conn.writable) def rebuild_connection(df, readable, writable): from .connection import Connection fd = df.detach() return Connection(fd, readable, writable) # # Server which shares registered resources with clients # class ResourceSharer(object): def __init__(self): self._key = 0 self._cache = {} self._old_locks = [] self._lock = threading.Lock() self._listener = None self._address = None self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): with self._lock: if self._address is None: self._start() self._key += 1 self._cache[self._key] = (send, close) return (self._address, self._key) @staticmethod def get_connection(ident): from .connection import Client address, key = ident c = Client(address, authkey=current_process().authkey) c.send((key, os.getpid())) return c def stop(self, timeout=None): from .connection import Client with self._lock: if self._address is not None: c = Client(self._address, authkey=current_process().authkey) c.send(None) c.close() self._thread.join(timeout) if self._thread.is_alive(): sub_warn('ResourceSharer thread did not stop when asked') self._listener.close() self._thread = None self._address = None self._listener = None for key, (send, close) in self._cache.items(): close() self._cache.clear() def _afterfork(self): for key, (send, close) in self._cache.items(): close() self._cache.clear() # If self._lock was locked at the time of the fork, it may be broken # -- see issue 6721. Replace it without letting it be gc'ed. self._old_locks.append(self._lock) self._lock = threading.Lock() if self._listener is not None: self._listener.close() self._listener = None self._address = None self._thread = None def _start(self): from .connection import Listener assert self._listener is None debug('starting listener and thread for sending handles') self._listener = Listener(authkey=current_process().authkey) self._address = self._listener.address t = threading.Thread(target=self._serve) t.daemon = True t.start() self._thread = t def _serve(self): if hasattr(signal, 'pthread_sigmask'): signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() msg = conn.recv() if msg is None: break key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() conn.close() except: if not is_exiting(): import traceback sub_warning( 'thread for sharing handles raised exception :\n' + '-'*79 + '\n' + traceback.format_exc() + '-'*79 ) resource_sharer = ResourceSharer()