Recall strategies: popularity, LBS, user tags, ItemCF, frequent-pattern mining, bipartite-graph mining, embeddings (word2vec, fastText, BERT), and deep match. Ranking strategies: the three learning-to-rank paradigms (pointwise, pairwise, listwise); common feature engineering (user, item, and context features, plus their crosses); CTR estimation (LR, GBDT, FM, FFM, DNN, Wide&Deep, DCN, DeepFM); and exploration/exploitation (bandits, Q-Learning, DQN).
The advantage of LSH is that it retrieves the top-K most similar vectors in roughly linear time. As in search engines and NLP systems, computing similarity over all pairs of a large data set is usually infeasible: brute-force comparison is O(n²), which is prohibitive for massive n. Locality-sensitive hashing is essentially a space-transformation idea resting on one assumption: if two vectors are similar in the original space, they still collide with high probability after being mapped by the hash functions; conversely, if they are dissimilar, they should remain dissimilar (rarely collide) after the transformation.
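To make that intuition concrete, here is a minimal standalone sketch (a hypothetical toy example; object and parameter names are my own, not part of the pipeline below) of the random-hyperplane family used for cosine similarity: each hyperplane contributes one sign bit, and two vectors agree on any given bit with probability 1 - theta/pi, where theta is the angle between them, so similar vectors share most of their signature bits.

import scala.util.Random

object RandomHyperplaneSketch {
  // k random hyperplanes in `dim` dimensions, components ~ N(0, 1).
  def hyperplanes(k: Int, dim: Int, rng: Random): Array[Array[Double]] =
    Array.fill(k)(Array.fill(dim)(rng.nextGaussian()))

  // One sign bit per hyperplane: which side of the plane the vector falls on.
  def signature(v: Array[Double], planes: Array[Array[Double]]): String =
    planes.map { p =>
      val dot = p.zip(v).map { case (a, b) => a * b }.sum
      if (dot >= 0) '1' else '0'
    }.mkString

  def main(args: Array[String]): Unit = {
    val planes = hyperplanes(k = 16, dim = 3, rng = new Random(42))
    println(signature(Array(1.0, 1.0, 1.0), planes))   // these two agree on
    println(signature(Array(0.9, 1.1, 1.0), planes))   // almost every bit
    println(signature(Array(-1.0, 0.2, -0.8), planes)) // this one mostly flips
  }
}

The job below follows the same idea at scale: it trains ALS latent factors with Spark MLlib and hands the factor matrix to the soundcloud cosine-lsh-join-spark library for an approximate all-pairs cosine join.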
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import scala.collection.mutable.ArrayBuffer

import com.soundcloud.lsh.Lsh
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.{DataFrame, SparkSession}
object CosineLSHJoinSpark {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CosineLSHJoinSpark")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    // Load the last 15 days of ORC logs.
    val df = spark.read.format("orc").load(getLastNDaysPath(15))
    df.show()

    // Turn the raw logs into Rating triples plus a map from ALS product
    // index back to the original raw id, used to decode the results.
    val (ratings, productMap) = datatransform_formkpixel(df)
    ratings.take(20).foreach(println)

    // Build the recommendation model using ALS: `rank` latent factors,
    // `numIterations` sweeps, regularization lambda = 0.02.
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.02)
    // Index each ALS product-factor vector by its product id, so the row ids
    // of the similarity matrix can be decoded back through productMap.
    val itemVectors = model.productFeatures.map { case (productId, features) =>
      IndexedRow(productId, Vectors.dense(features))
    }
    val idxmat: IndexedRowMatrix = new IndexedRowMatrix(itemVectors)
    /*
    // Expected shape of the input matrix, e.g.:
    val rows = Seq(
      IndexedRow(1, Vectors.dense(1.0, 1.0, 1.0)),
      IndexedRow(2, Vectors.dense(2.0, 2.0, 2.0)),
      IndexedRow(5, Vectors.dense(6.0, 3.0, 2.0))
    )
    val matrix = new IndexedRowMatrix(sc.parallelize(rows))
    */
    // LSH parameters: `dimensions` is the bit-signature length (number of
    // random hyperplanes), `numPermutations` the number of signature
    // permutations scanned for candidates, `numNeighbours` the window of
    // candidates kept per row; pairs below `minCosineSimilarity` are dropped.
    val lsh = new Lsh(
      minCosineSimilarity = 0.1,
      dimensions = 20,
      numNeighbours = 10,
      numPermutations = 2
    )
    val similarityMatrix = lsh.join(idxmat)

    // For each row i, sort its candidate neighbours by descending similarity.
    val orderTable = similarityMatrix.entries.groupBy(entry => entry.i).flatMap { case (_, entries) =>
      entries.toList.sortWith((a, b) => a.value > b.value)
    }
    val results = orderTable.map { entry =>
      "%s %s %.6f".format(productMap(entry.i), productMap(entry.j), entry.value)
    }
    results.take(20).foreach(println)
    results.saveAsTextFile("this is save path ") // placeholder output path
    // Sample output for the toy example matrix above:
    // item:2 item:5 cosine:0.91
    // item:1 item:5 cosine:0.91
    // item:1 item:2 cosine:1.00
  }
  // Variant for (ifa, bundle[]) rows: emits one implicit-feedback Rating per
  // (device, app-bundle) pair and returns the index -> bundle decode map.
  def datatransform(df: DataFrame) = {
    val r = df.rdd
    val affData = r.flatMap(row => {
      val result: ArrayBuffer[(String, String)] = new ArrayBuffer[(String, String)]()
      val ifa: String = if (row(0) != null) row.getString(0) else null
      val bundle: Array[String] = if (row(1) != null) row.getSeq[String](1).toArray[String] else null
      // Keep only devices with at least two bundles; a single interaction
      // carries no co-occurrence signal.
      if (bundle != null && bundle.length > 1) {
        bundle.foreach(b => result.append((ifa, b.trim)))
      }
      result
    }).filter(x => x._1 != null && x._2 != null)
    val stringData = affData.map(x => (x._1, x._2, 1)) // (ifa, bundle, implicit rating 1)
    // Dense integer ids for users (devices) and items (bundles); sorting
    // first makes the ids deterministic across runs.
    val ifaname = stringData.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val products = stringData.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val data_rating = stringData.map(r => Rating(ifaname(r._1).toInt, products(r._2).toInt, r._3))
    // Invert the item map so similarity output can be decoded back to bundles.
    val reproducts = products.map(line => (line._2, line._1))
    (data_rating, reproducts)
  }
  // Variant for the mkpixel logs: column 8 holds the device id (ifa) and
  // column 16 a comma-separated list of content ids. Note the swap below:
  // content ids play the ALS "user" role and devices the "product" role, so
  // the downstream LSH join computes device-to-device similarity.
  def datatransform_formkpixel(df: DataFrame) = {
    val r = df.rdd
    val affData = r.flatMap(row => {
      val result: ArrayBuffer[(String, String)] = new ArrayBuffer[(String, String)]()
      val ifa: String = if (row(8) != null) row.getString(8) else null
      val content = if (row(16) != null) row.getString(16) else null
      val content_ids: Array[String] = if (content != null) content.split(",") else null
      // Keep only devices with at least two content ids; a single interaction
      // carries no co-occurrence signal.
      if (content_ids != null && content_ids.length > 1) {
        content_ids.foreach(b => result.append((ifa, b.trim)))
      }
      result
    }).filter(x => x._1 != null && x._2 != null)
    val stringData = affData.map(x => (x._2, x._1, 1)) // (content id, ifa, 1)
    // Dense integer ids for both sides; sorting first makes them deterministic.
    val ifaname = stringData.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val products = stringData.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val data_rating = stringData.map(r => Rating(ifaname(r._1).toInt, products(r._2).toInt, r._3))
    // Invert the product map so similarity rows can be decoded back to ifas.
    val reproducts = products.map(line => (line._2, line._1))
    (data_rating, reproducts)
  }
  // Build a glob such as "<prefix>/{yyyyMMdd,...}/*/*" covering the last
  // `days` days, counting back from yesterday.
  def getLastNDaysPath(days: Int): String = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val today: Date = dateFormat.parse(dateFormat.format(new Date())) // truncated to midnight
    val cal: Calendar = Calendar.getInstance()
    val dateArr = new ArrayBuffer[String]()
    var i = 1
    while (i <= days) {
      cal.setTime(today)
      cal.add(Calendar.DATE, -i)
      dateArr.append(dateFormat.format(cal.getTime))
      i += 1
    }
    val path = "this is path prefix/{" + dateArr.mkString(",") + "}/*/*"
    println(path)
    path
  }
}
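To build the job, the cosine-lsh-join-spark artifact must be on the classpath. A minimal build.sbt sketch, assuming the Maven coordinates published in the library's README (the Scala, Spark, and library versions are illustrative placeholders; check https://github.com/soundcloud/cosine-lsh-join-spark for current ones):

name := "cosine-lsh-join-example"

scalaVersion := "2.11.12" // placeholder; match your Spark build

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-sql"   % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.4.8" % "provided",
  "com.soundcloud"   %% "cosine-lsh-join-spark" % "<version>" // placeholder version
)

Package the jar with sbt package (or assembly) and submit it with spark-submit --class CosineLSHJoinSpark as usual.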