Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto repare Network error #184

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

import logging
from numbers import Integral
from struct import Struct

from six import iteritems

from Hbase_thrift import TScan
from six import iteritems
from struct import Struct
from thriftpy.transport import TTransportException

from .util import thrift_type_to_dict, bytes_increment, OrderedDict
from .batch import Batch
from .util import OrderedDict, bytes_increment, thrift_type_to_dict

logger = logging.getLogger(__name__)

Expand All @@ -38,12 +38,26 @@ def make_ordered_row(sorted_columns, include_timestamp):
return od


def safe_call(function):
def safe(self, *args, **kwargs):
try:
return function(self, *args, **kwargs)
except (BrokenPipeError, TTransportException):
logger.debug("Network error: refresh thrift connection")
self.connection._refresh_thrift_client()
self.connection.open()
return function(self, *args, **kwargs)

return safe


class Table(object):
"""HBase table abstraction class.

This class cannot be instantiated directly; use :py:meth:`Connection.table`
instead.
"""

def __init__(self, name, connection):
self.name = name
self.connection = connection
Expand All @@ -55,6 +69,7 @@ def __repr__(self):
self.name,
)

@safe_call
def families(self):
"""Retrieve the column families for this table.

Expand All @@ -68,11 +83,13 @@ def families(self):
families[name] = thrift_type_to_dict(descriptor)
return families

@safe_call
def _column_family_names(self):
"""Retrieve the column family names for this table (internal use)"""
names = self.connection.client.getColumnDescriptors(self.name).keys()
return [name.rstrip(b':') for name in names]

@safe_call
def regions(self):
"""Retrieve the regions for this table.

Expand All @@ -86,6 +103,7 @@ def regions(self):
# Data retrieval
#

@safe_call
def row(self, row, columns=None, timestamp=None, include_timestamp=False):
"""Retrieve a single row of data.

Expand Down Expand Up @@ -131,6 +149,7 @@ def row(self, row, columns=None, timestamp=None, include_timestamp=False):

return make_row(rows[0].columns, include_timestamp)

@safe_call
def rows(self, rows, columns=None, timestamp=None,
include_timestamp=False):
"""Retrieve multiple rows of data.
Expand Down Expand Up @@ -176,6 +195,7 @@ def rows(self, rows, columns=None, timestamp=None,
return [(r.row, make_row(r.columns, include_timestamp))
for r in results]

@safe_call
def cells(self, row, column, versions=None, timestamp=None,
include_timestamp=False):
"""Retrieve multiple versions of a single cell from the table.
Expand Down Expand Up @@ -219,6 +239,7 @@ def cells(self, row, column, versions=None, timestamp=None,
for c in cells
]

@safe_call
def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
include_timestamp=False, batch_size=1000, scan_batching=None,
Expand Down Expand Up @@ -439,7 +460,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
#
# Data manipulation
#

@safe_call
def put(self, row, data, timestamp=None, wal=True):
"""Store data in the table.

Expand All @@ -463,6 +484,7 @@ def put(self, row, data, timestamp=None, wal=True):
with self.batch(timestamp=timestamp, wal=wal) as batch:
batch.put(row, data)

@safe_call
def delete(self, row, columns=None, timestamp=None, wal=True):
"""Delete data from the table.

Expand All @@ -483,6 +505,7 @@ def delete(self, row, columns=None, timestamp=None, wal=True):
with self.batch(timestamp=timestamp, wal=wal) as batch:
batch.delete(row, columns)

@safe_call
def batch(self, timestamp=None, batch_size=None, transaction=False,
wal=True):
"""Create a new batch operation for this table.
Expand Down Expand Up @@ -529,6 +552,7 @@ def batch(self, timestamp=None, batch_size=None, transaction=False,
# Atomic counters
#

@safe_call
def counter_get(self, row, column):
"""Retrieve the current value of a counter column.

Expand All @@ -550,6 +574,7 @@ def counter_get(self, row, column):
# is correctly initialised if didn't exist yet.
return self.counter_inc(row, column, value=0)

@safe_call
def counter_set(self, row, column, value=0):
"""Set a counter column to a specific value.

Expand All @@ -567,6 +592,7 @@ def counter_set(self, row, column, value=0):
"""
self.put(row, {column: pack_i64(value)})

@safe_call
def counter_inc(self, row, column, value=1):
"""Atomically increment (or decrements) a counter column.

Expand All @@ -586,6 +612,7 @@ def counter_inc(self, row, column, value=1):
return self.connection.client.atomicIncrement(
self.name, row, column, value)

@safe_call
def counter_dec(self, row, column, value=1):
"""Atomically decrement (or increments) a counter column.

Expand Down