_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

2024-03-07

I want to do sentiment analysis with Kafka and Spark. The idea is to read streaming data from Kafka, batch it with Spark, and then analyze each batch with a sentimentPredict() function I built with TensorFlow. This is what I have so far...

import os  
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json

from multiprocessing import Lock
lock = Lock()

numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000

import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import DataPreprocessing as proc
import time

with open('dictionary.pickle', 'rb') as handle:
    wordsList = pickle.load(handle)
wordVectors = np.load('final_embeddings.npy')

import tensorflow as tf
tf.reset_default_graph()

labels = tf.placeholder(tf.float32, [batchSize, numClasses])
input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])

data = tf.Variable(tf.zeros([batchSize, maxSeqLength, numDimensions]),dtype=tf.float32)
data = tf.nn.embedding_lookup(wordVectors,input_data)

lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=0.25)
value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)

weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
value = tf.transpose(value, [1, 0, 2])
last = tf.gather(value, int(value.get_shape()[0]) - 1)
prediction = (tf.matmul(last, weight) + bias)

correctPred = tf.equal(tf.argmax(prediction,1), tf.argmax(labels,1))
accuracy = tf.reduce_mean(tf.cast(correctPred, tf.float32))

sess = tf.InteractiveSession()
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('models'))

def getSentenceMatrix(sentence):
    arr = np.zeros([batchSize, maxSeqLength])
    sentenceMatrix = np.zeros([batchSize,maxSeqLength], dtype='int32')
    cleanedSentence = proc.cleanSentences(sentence)
    split = cleanedSentence.split()
    for indexCounter, word in enumerate(split):
        try:
            if word in wordsList:
                sentenceMatrix[0, indexCounter] = wordsList[word]
            else:
                sentenceMatrix[0, indexCounter] = 0  # Vector for unknown words
        except ValueError:
            sentenceMatrix[0, indexCounter] = 399999  # Vector for unknown words
    return sentenceMatrix

def sentimentCorrect(data):
    try:
        sentiment_results = {}
        #sentences = data['sentences']
        string = data.split(' ')
        exact = [(spell.correction(word)) for word in string]
        exact = ' '.join(exact)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(exact))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ",predictedSentiment[0])
        print("Negative : ",predictedSentiment[1])
        if (predictedSentiment[0] > predictedSentiment[1]):
            result = "Positive"
        else:
            result = "Negative"

        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result

        return sentiment_results
    except:
        print("Delay for 5 seconds")
        time.sleep(5)

def sentimentPredict(data):
        try:
            sentiment_results = {}
            #sentences = data['sentences']
            #string = sentences.split(' ')
            #exact = [get_exact_words(word) for word in string]
            #exact = ' '.join(exact)
            inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(data))))
            predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
            # predictedSentiment[0] represents output score for positive sentiment
            # predictedSentiment[1] represents output score for negative sentiment
            print("Positive : ",predictedSentiment[0])
            print("Negative : ",predictedSentiment[1])
            if (predictedSentiment[0] > predictedSentiment[1]):
                result = "Positive"
            else:
                result = "Negative"

            sentiment_results["sentences"] = data
            sentiment_results["positiveScores"] = str(predictedSentiment[0])
            sentiment_results["negativeScores"] = str(predictedSentiment[1])
            sentiment_results["sentiment"] = result

            return sentiment_results
        except TypeError:
            raise

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")  
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 2)  
#kafkaStream = KafkaUtils.createStream(ssc, 'NLP:2181', 'spark-streaming', {'weblogs':1})
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['weblogs'], kafkaParams = {"metadata.broker.list": "NLP:9092"})
# Parse each inbound message as JSON
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.saveAsTextFiles("file:///D:/spark-kafka.txt")

id_twitter = parsed.map(lambda tweet: tweet["id"])
id_twitter.saveAsTextFiles("file:///D:/id-tweet.txt")
id_twitter.count().map(lambda x:'ID in this batch: %s' % x).pprint()

name = parsed.map(lambda tweet: tweet["name"])
name.saveAsTextFiles("file:///D:/name-tweet.txt")
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

text = parsed.map(lambda tweet: tweet["text"])
text.saveAsTextFiles("file:///D:/text-tweet.txt")

sentiment = text.mapPartitions(sentimentPredict)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")

#sentiment_result = text.map(sentimentPredict(text))
#sentiment_result = text.flatMap(sentimentPredict(text))
#print(sentiment_result)

#parsed.map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.encode("utf-8").pprint()
#print(parsed)
#print(soup.encode("utf-8"))
#sentiment_result.saveAsTextFiles("file:///D:/spark-kafka.txt")
ssc.start()
ssc.awaitTermination()

However, when I run the code from the terminal with spark-submit, I get this error...

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
2018-04-09 16:21:48 ERROR JobScheduler:91 - Error generating jobs for time 1523265708000 ms
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
    return r._jrdd
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
    self._jrdd_deserializer, profiler)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
    return cloudpickle.dumps(obj, 2)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Traceback (most recent call last):
  File "D:/PROJECT_MABESPOLRI/progress_spark_sentiment/spark+sentiment.py", line 171, in <module>
    ssc.awaitTermination()
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 206, in awaitTermination
    self._jssc.awaitTermination()
  File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
    return r._jrdd
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
    self._jrdd_deserializer, profiler)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
    return cloudpickle.dumps(obj, 2)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Can anyone tell me how to fix this? Thanks.


Try moving your Spark processing code into a separate function that takes only the parameters it actually needs. When you run a Spark operation, Spark tries to pickle everything in the enclosing scope (in your case, the top level of the module) so it can ship it to the executors, and it throws this error as soon as it hits an object that cannot be pickled. In your case, I suspect the error is probably caused by the variable "lock".
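
A rough sketch of that idea (not a tested fix): build the TensorFlow graph and session inside the function that mapPartitions ships to the executors, so that neither the session nor the module-level lock ever has to be pickled. The function name predictPartition is made up here; the sketch also assumes dictionary.pickle, final_embeddings.npy, the models checkpoint directory and the DataPreprocessing module are available on every worker node, and in practice you would cache the session per worker instead of rebuilding it for every partition.

def predictPartition(sentences):
    # Everything heavyweight is created here, on the executor, so Spark only
    # has to pickle plain data (ints, the numpy embeddings, the word dict).
    import tensorflow as tf
    graph = tf.Graph()
    with graph.as_default():
        input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])
        data = tf.nn.embedding_lookup(wordVectors, input_data)
        lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
        lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=0.25)
        value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)
        weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
        bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
        value = tf.transpose(value, [1, 0, 2])
        last = tf.gather(value, int(value.get_shape()[0]) - 1)
        prediction = tf.matmul(last, weight) + bias
        sess = tf.Session(graph=graph)
        tf.train.Saver().restore(sess, tf.train.latest_checkpoint('models'))

    for sentence in sentences:
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(sentence))))
        scores = sess.run(prediction, {input_data: inputMatrix})[0]
        yield {"sentences": sentence,
               "positiveScores": str(scores[0]),
               "negativeScores": str(scores[1]),
               "sentiment": "Positive" if scores[0] > scores[1] else "Negative"}

# The driver script then keeps only picklable objects at the top level
# (no tf.InteractiveSession(), no multiprocessing Lock) and wires it up as:
sentiment = text.mapPartitions(predictPartition)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")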
