Skip to content

PubSub: Errno 24: too many open files with multiple publishers #5523

@emmick4

Description

@emmick4

Ubuntu 16.04
Python version 3.5.2
google-cloud-pubsub==0.35.4
google-auth-oauthlib==0.2.0

I'm using multiprocessing + pubsub and after X amount of time get the stack trace shown below. Even stepping down to just 1 process causes it to fail. When I pull the publisher class out of the publish message function, the error stops being thrown, but the throughput plummets. My inclination is that publisher doesn't properly close its grpc channels and so each new publisher created stacks until failure. There doesn't seem to be a good way to pass a channel into the publisher to then manually close. Is there a way to make sure publishers are killed cleanly?

ERROR:root:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f304d6eb780>" raised exception!
Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 336, in ssl_wrap_socket
OSError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 343, in _make_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 849, in _validate_conn
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connection.py", line 356, in connect
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 338, in ssl_wrap_socket
urllib3.exceptions.SSLError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 445, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 638, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/retry.py", line 398, in increment
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 120, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 512, in request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 622, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 511, in send
requests.exceptions.SSLError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/grpc/_plugin_wrapping.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 65, in _get_authorization_headers
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/credentials.py", line 122, in before_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/service_account.py", line 322, in refresh
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 145, in jwt_grant
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 106, in _token_endpoint_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 124, in __call__
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

Code snippet:

import json
from google.cloud import pubsub_v1
import google.auth
import time
from multiprocessing import Process, Queue
import os


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ryan/Downloads/Multicoin Alpha-0b2b4e33e22d.json"
creds, project = google.auth.default()

def publish_messages(topic_name, messages, project="elegant-device-154517"):
    """Publishes multiple messages to a Pub/Sub topic. Messages as a list"""


    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1e7,  # ten megabytes (pub/sub max)
        max_latency=0.33,  # in seconds
        max_messages=1000
    )
    publisher_defined = False
    while not publisher_defined:
        try:
            publisher = pubsub_v1.PublisherClient(batch_settings, credentials=creds)
            publisher_defined = True
        except:
            time.sleep(0.5)

    topic_path = publisher.topic_path(project, topic_name)

    def publish(data):
        return publisher.publish(topic_path, data=data)

    encoded_messages = []
    for m in messages:
        data = json.dumps(m)
        # Data must be a bytestring
        data = data.encode('utf-8')
        encoded_messages.append(data)

    results = []
    for m in encoded_messages:
        results.append(publish(m))

    return results


class Example(object):
    def __init__(self, num):
        self.data = [{"test":"test1"} for n in range(1000)]
        self.messages = []
        self.successful_messages = []
        self.failed_messages = []

    def push_data(self):
        self.messages.extend(publish_messages("topic_name", self.data))

    def check_messages(self):
        for m in self.messages:
            if m.done():
                try:
                    self.successful_messages.append(int(m.result()))
                    self.messages.remove(m)
                except Exception:
                    self.failed_messages.append(m)
                    self.messages.remove(m)
        if len(self.messages) > 0:
            return False
        else:
            return True

    def push_and_check(self):
        self.push_data()
        total_messages = 0
        total_errors = 0
        complete = False
        while not complete: # while the messages have not all been verified to be sent correctly
            while not self.check_messages(): # while the messages haven't finished attempting to push
                time.sleep(.1)
            total_messages += len(self.failed_messages) + len(self.successful_messages)
            if len(self.failed_messages) == 0:
                complete = True
            else:
                total_errors += len(self.failed_messages)
                self.messages = []
                self.successful_messages = []
                self.failed_messages = []
                self.push_data()
        return total_messages

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = job(func, args)
        output.put(result)
def job(func, args):
    result = func(*args)
    return result


import_queue, done_queue = Queue(), Queue()

test_num = 100000
print("building queue")
for num in range(test_num):
    import_queue.put((Example(num).push_and_check, []))

numprocesses = 5
print("starting processes")
p_count = 0
for i in range(numprocesses):
    p_count += 1
    Process(target=worker, args=(import_queue, done_queue)).start()
    print("started process #", p_count)

count = 0
for i in range(test_num):
    done_queue.get()
    count += 1
    if count % 10 == 0:
        print(count)

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the Pub/Sub API.priority: p2Moderately-important priority. Fix may not be included in next release.status: awaiting informationtype: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions