我的需求是数据库中有一个表,其字段为,id,词,主题,主题,该词在该主题下出现的概率(LDA生成的内容)。我要为利用数据库中的内容为每一条微博计算其在主题上的概率,于是我需要不停的做select,进行全表扫描。
我以id和topic作为联合主键,将type建立了索引。原本是一个txt文件,读入数据库后数据量达到了3600w,所以我有200个主题,就以每五个一个主题一个分表建立了分表,这样查询更加方便,存入数据库的时候也不会因为数据量过大出现各种问题。
在之后的select过程中,遇到了种种问题。
我的微博信息存储在一个txt里面,有很多这样的txt;每个txt中的一行都是一条分好词的微博。我使用了Guava IO来进行读取。
1. 最初我的思路是,拿一条微博查询这条微博中的词在各个主题下的概率,然后写入txt便于我后续处理,get connection,建立一个statement,然后去找结果,之后将resultset读出,写入txt。但是这样速度非常慢;
2. 于是我优化了我的select语句,我使用了order by topic,我将我需要的这条微博里的词的概率一次筛选出来,然后再进行处理。我使用的依旧是statement,但是这样很快就遇到了问题;报错信息如下:
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
Last packet sent to the server was 39967 ms ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:353)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1074)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2583)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:2871)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1601)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:1710)
at com.mysql.jdbc.Connection.execSQL(Connection.java:2436)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1402)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1556)
at com.sl.mixmodel.LMModel.getIndexno(LMModel.java:71)
at com.sl.mixmodel.LMModel.main(LMModel.java:30)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:1955)
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2372)
... 8 more
Exception in thread "main" java.lang.NullPointerException
at com.sl.mixmodel.LMModel.getIndexno(LMModel.java:79)
at com.sl.mixmodel.LMModel.main(LMModel.java:30)
这个就是传说中的Connection reset问题,出现这个问题之后,数据库链接被耗尽,只有重启才能再使用mysql,但是再次运行程序,依旧是这个问题。我在百度上搜索了好几天,也找不到这个问题的答案。我看到了stackflow上的回答:
http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql
出现这个问题的原因有很多:
1. 如果你收到这个包的时间是0ms,那么你的数据库连接是有问题的,更多的是网络问题导致没有连接上数据库,你需要检查你的端口号,连接名,检查bind-address;
2. 如果你是运行了一段时间之后才出现问题,那你的问题就和我比较类似了,是因为大量的查询导致了mysql那边堵塞,最后那边卡死推出,这边再发送请求来请求数据,就会导致链接重置的问题;
3. 如果你的数据库跑了一夜,导致了该问题的发生,那么你需要通过设置mysql的connection的时间来解决链接重置的问题。
4. 当然也可能是jdk的版本问题,或者你的mysql-connection的包的问题,你需要仔细检查,防止这些小地方出毛病。
于是我又开始了再一次的优化,我使用了preparedstatement,但是由于我要预编译的句子不是固定的,也就是说不能确定那个?有多少个,所以,我简单的以为,我可以直接通过字符串拼接完成这个sql,问题依旧。我觉得问题应该就是mysql那边由于访问被卡死了,我试着来手动清空mysql那边查询到的,并且我已经写完释放的数据,但是并没有这样的方法,都是自己回收的。
我看到了这个,http://www.cnblogs.com/xhan/p/3958521.html,我希望通过分批次的读写,减少resultset读取过来的阻碍,但是依旧没用。
直到今天,我找到了,http://www.importnew.com/5660.html,我才发现我硬性的去预编译一条sql语句效率反而下降了。我修改了mysql的配置,给mysq更多的max connection,保证不会因为链接过多而阻塞退出。
这是我最终的代码:
package com.sl.mixmodel;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.sl.util.DB;
public class LMModel {
/**
* @param args
*/
public static void main(String[] args) {
LMModel lmmodel = new LMModel();
Collection<File> c = lmmodel
.getAllUserResult("E:/myeclipse_workplace_sl/personalizedsearch2/sr/after/");
Connection conn = DB.getConn();
for (File f : c) {
lmmodel.insertIndexno(conn, f,
"E:/myeclipse_workplace_sl/personalizedsearch2/sr/"
,f.getName());
}
DB.close(conn);
// Collection<File> files = FileUtils.listFiles(new File("E:/myeclipse_workplace_sl/personalizedsearch2/sr/after/"), null, false);
// for(File f : files){
// System.out.println(f.getName());
// System.out.println(FilenameUtils.getBaseName(f.getAbsolutePath()));
// }
}
public Collection<File> getAllUserResult(String source) {
Collection<File> c = FileUtils.listFiles(new File(source), null, false);
return c;
}
public void insertIndexno(Connection conn, File f, String bathpath, String filename) {
List<String> resultlist = null;
try {
resultlist = FileUtils.readLines(f);
} catch (IOException e) {
e.printStackTrace();
}
int rank = 0;
for (String resultline : resultlist) {
Splitter splitter = Splitter.on(" ").trimResults();
List<String> list = splitter.splitToList(resultline);
Joiner joiner = Joiner.on("','").skipNulls();
String j = "('" + joiner.join(list) + "')";
String path = bathpath + FilenameUtils.getBaseName(filename) + "_" + rank + FilenameUtils.getExtension(filename);
String sql = "SELECT topic,type,oprop INTO OUTFILE '" + path
+ "' FROM phi_p WHERE type IN " + j
+ " ORDER BY topic";
System.out.println(sql);
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
} catch (SQLException e2) {
e2.printStackTrace();
}
ResultSet rs = null;
try {
pstmt.setFetchSize(Integer.MIN_VALUE);
rs = pstmt.executeQuery();
} catch (SQLException e2) {
e2.printStackTrace();
}
rank++;
DB.close(rs);
DB.close(pstmt);
}
}
public void getIndexno(Connection conn, File f, String resultprop) {
// read the file to list
File finalprop = new File(resultprop);
List<String> props = Lists.newArrayList();
// batch parameters
List<String> resultlist = null;
try {
resultlist = FileUtils.readLines(f);
} catch (IOException e) {
e.printStackTrace();
}
int rank = 0;
for (String resultline : resultlist) {
int linecounter = 0;
int batchsize = 2000;
Splitter splitter = Splitter.on(" ").trimResults();
List<String> list = splitter.splitToList(resultline);
int listsize = list.size();
Joiner joiner = Joiner.on("','").skipNulls();
String j = "('" + joiner.join(list) + "')";
String s = "?";
List<String> quotes = Lists.newArrayList();
for(int i = 0; i < listsize; i++){
quotes.add(s);
}
Joiner forsql = Joiner.on(",");
String quote = "(" + forsql.join(quotes) + ")";
String sql = "SELECT topic,type,oprop FROM phi_p WHERE type IN "
+ quote + " ORDER BY topic";
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
int count = 1;
for(String str : list){
pstmt.setString(count, str);
count++;
}
} catch (SQLException e3) {
e3.printStackTrace();
}
ResultSet rs = null;
try {
rs = pstmt.executeQuery();
} catch (SQLException e2) {
e2.printStackTrace();
}
// Statement stmt = DB.getStatement(conn);
// ResultSet rs = DB.getResultSet(stmt, sql);
try {
while (rs.next()) {
linecounter++;
String s2 = rank + " " + rs.getString("topic") + " "
+ rs.getString("type") + " "
+ rs.getDouble("oprop");
System.out.println(s2);
props.add(s2);
if (linecounter % batchsize == 0) {
try {
FileUtils.writeLines(finalprop, props, true);
} catch (IOException e) {
e.printStackTrace();
}
props.clear();
}
}
} catch (SQLException e1) {
e1.printStackTrace();
}
if (linecounter % batchsize > 0) {
try {
FileUtils.writeLines(finalprop, props, true);
} catch (IOException e) {
e.printStackTrace();
}
}
props.clear();
try {
pstmt.execute("RESET QUERY CACHE");
} catch (SQLException e) {
e.printStackTrace();
}
rank++;
DB.close(rs);
DB.close(pstmt);
}
}
}
communication reset以及Communications link failure问题的原因有很多,要看情况而定,分析问题产生的原因,单纯的看那些博客随便搞搞不是办法。现在百度上多数的帖子依旧是设置time_out的方法。
更多的解决方案,请找到自己问题所在,看看这个:http://stackoverflow.com/questions/6865538/solving-a-communications-link-failure-with-jdbc-and-mysql