Skip to content

Commit

Permalink
Kill py4j callback server properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Aug 18, 2014
1 parent 19ddcdd commit b47b5fd
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
# limitations under the License.
#

import time
import sys
from signal import signal, SIGTERM, SIGINT

from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
Expand Down Expand Up @@ -63,22 +64,31 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
"""

# launch call back server
if not gateway:
gateway = launch_gateway()
# gateway.restart_callback_server()

# Create the Python Sparkcontext
self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
pyFiles=pyFiles, environment=environment, batchSize=batchSize,
serializer=serializer, conf=conf, gateway=gateway)

# Start py4j callback server
SparkContext._gateway.restart_callback_server()
self._clean_up_trigger()
self._jvm = self._sc._jvm
self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

# Initialize StremaingContext in function to allow subclass specific initialization
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)

def _clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""

def clean_up_handler(*args):
SparkContext._gateway.shutdown()
sys.exit(0)

for sig in (SIGINT, SIGTERM):
signal(sig, clean_up_handler)

def start(self):
"""
Start the execution of the streams.
Expand Down

0 comments on commit b47b5fd

Please sign in to comment.